Netty笔记
# Bootstrap
# 基础代码结构
这里记录一份基础代码结构,用到的时候随时过来看一眼或者干脆直接复制,防止忘了怎么写
TCP服务端基础代码结构
public class SimpleNettyServer {
public static final String HOST = "127.0.0.1";
public static final int PORT = 8080;
public static void main(String[] args) throws InterruptedException {
// 创建两个事件循环组,boss处理连接事件,work处理消息读写,分工明确
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
// 创建服务器对象
ServerBootstrap serverBootstrap = new ServerBootstrap()
.group(bossGroup, workGroup) // 绑定线程组
.channel(NioServerSocketChannel.class) // 使用非阻塞的服务端SocketChannel作为连接建立通道
.childHandler(new ChannelInitializer<NioSocketChannel>() { // 通过NioSocketChannel进行通信
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 使用pipeline添加Handler用于处理消息以及执行业务逻辑
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器收到消息:" + msg);
super.channelRead(ctx, msg);
}
});
pipeline.addLast(new StringEncoder());
}
});
// 服务器绑定IP地址并启动(异步)
ChannelFuture channelFuture = serverBootstrap.bind(HOST, PORT); // 可以 .sync() 同步
// 在回调中编写连接建立后的业务逻辑
channelFuture.addListener(sf -> {
System.out.println("Netty 服务器启动成功:" + PORT);
ChannelFuture successFuture = (ChannelFuture) sf;
// 处理服务端关闭连接的善后工作
successFuture.channel().closeFuture().addListener(closeFuture -> {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
System.out.println("Netty 服务器已关闭");
});
});
}
}
TCP客户端基础代码结构
public class SimpleNettyClient {
public static final String HOST = "127.0.0.1";
public static final int PORT = 8080;
public static void main(String[] args) throws InterruptedException {
// 客户端只管负责连接服务器,不需要考虑处理连接事件,单个事件循环组足矣
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
// 创建客户端对象
Bootstrap bootstrap = new Bootstrap()
.group(eventLoopGroup) // 绑定线程组
.channel(NioSocketChannel.class) // 使用非阻塞的SocketChannel作为连接建立通道,不需要Server功能
.handler(new ChannelInitializer<NioSocketChannel>() { // 通过NioSocketChannel进行通信
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到服务器发回的消息:" + msg);
super.channelRead(ctx, msg);
}
});
pipeline.addLast(new StringEncoder());
}
});
// 启动客户端连接服务端(异步)
ChannelFuture channelFuture = bootstrap.connect(HOST, PORT); // .sync() 可以同步
// 在回调中编写连接建立后的业务逻辑
channelFuture.addListener(cf -> {
System.out.println("成功连接到服务器");
ChannelFuture connectFuture = (ChannelFuture) cf;
// 处理客户端关闭连接的善后工作
connectFuture.channel().closeFuture().addListener(closeFuture -> {
eventLoopGroup.shutdownGracefully();
System.out.println("连接已关闭");
});
});
}
}
# 参数选项
Netty 的 Bootstrap 可以配置一些参数对连接进行设置,这里有些注意事项需要说清楚
- ServerBootstrap 配置参数的方法有两个,分别是
option
和childOption
,因为服务端有两种类型的连接通道需要关注,服务端本身是ServerSocketChannel,服务端自己成功启动才能与客户端建立连接,所以服务端的 option 用于配置服务端参数 - 服务端通道启动成功后会等待客户端连接,客户端发起连接后会通过
SocketChannel
建立连接通道进行通信,所以服务端的 childOption 用于配置客户端参数 - 客户端不需要考虑太多,只需要考虑自己本身的 Channel 通道即可,所以客户端的 option 方法是用于配置客户端参数
serverBootstrap
.option(ChannelOption.SO_RCVBUF, 10 * 1024) // 设置TCP接收缓冲区大小
// ------------------------------------------------------------------
.childOption(ChannelOption.RCVBUF_ALLOCATOR, ...) // 设置ByteBuf缓冲区大小
// ----------------------------------------------------------------------
bootstrap
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10 * 1000) // 连接超时时间10秒
# ByteBuf
ByteBuf 用于承载数据,为 Netty 提供的增强的缓冲区对象,主要有以下几点优势
- 分别使用读指针(rIndex)和写指针(wIndex)省去读写模式反复切换的步骤
- 4.1 版本后的 Netty 支持 ByteBuf 池化技术,对创建慢而读写快的直接内存(Direct)很友好
- 对 4.1 以后的非安卓版本的 Netty 生效
- 通过创建 ByteBuf 对象查看他的 Class 可以检查是否启用了池化技术
- 控制池化是否启用:
-Dio.netty.allocator.type={unpooled|pooled}
- ByteBuf 对象默认容量为 256 字节,写满后会自动扩容,最大扩容容量为
Integer.MAX_VALUE
可自定义设置- 小于 512 字节的轻量级数据读写,触发扩容后会自动找到比当前容量大的下一个 16 的整数倍
- 超过 512 字节的读写,会按照 2^n 次方扩容
- ByteBuf 读写时注意字节序,是大端读写(默认)还是小端读写(LE)
# 方法 API 笔记
创建对象
方法名 | 作用 |
---|---|
ByteBufAllocator.DEFAULT.buffer() | 创建缓冲区对象(默认直接内存) |
ByteBufAllocator.DEFAULT.heapBuffer() | 创建堆内存缓冲区对象 |
ByteBufAllocator.DEFAULT.directBuffer() | 创建直接内存缓冲区对象 |
Unpooled.buffer() | 非池化的缓冲区对象(默认堆内存) |
Unpooled.directBuffer() | 非池化的直接内存缓冲区对象 |
ctx.alloc().buffer() | 在 pipeline 的 handler 中推荐用这种方式创建缓冲区对象 |
读写操作
方法名 | 作用 |
---|---|
buf.wirte*() | 向缓冲区写入各种数据(Byte、Int、Double 等) |
buf.set*() | 指定下标向缓冲区写入各种数据,不会移动指针 |
buf.read*() | 在缓冲区读取各种数据(Byte、Int、Double 等) |
buf.get*() | 指定下标在缓冲区读取各种数据,不会移动指针 |
指针操作
方法名 | 作用 |
---|---|
buf.mark*Index() | 在当前位置设置标记,可分别设置 Reader 读标记和 Writer 写标记 |
buf.reset*Index() | 将读指针 Reader 或写指针 Writer 重置到之前设置标记的位置 |
# 内存释放
ByteBuf 分为三个主要实现:
- UnpooledHeapByteBuf:非池化堆内存缓冲区,依靠 GC 垃圾回收机制释放内存
- UnpooledDirectByteBuf:非池化直接内存缓冲区,GC 无法处理他的内存释放需要特殊处理
- PooledByteBuf:池化的缓冲区对象,他的内存释放更复杂一些
方案 1:针对各种类型的 ByteBuf 的内存释放,他们实现了共同的接口ReferenceCounted
,通过引用技术法使用统一规范的 API 来释放内存,每个 ByteBuf 都存有一个引用数量,当该 ByteBuf 的引用数量归 0 就会被回收
方案 2:Netty 的 pipeline 除了开发者手动添加的 handler 之外,他的头部和尾部内置了head
和tail
处理器,如果 ByteBuf 没有在开发者的 Handler 中被释放,而是一路传到了头尾,Netty 会帮咱们释放
方法名 | 作用 |
---|---|
buf.retain() | 引用计数器 +1 |
buf1.release() | 引用计数器 -1 |
buf.refCnt() | 获取当前计数器的引用数量 |
ReferenceCountUtil.release(buf) | 无视计数器引用数量,释放内存 |
# 零拷贝与深拷贝
方法 | 类型 | 范围 | 内存 | 其他 |
---|---|---|---|---|
buf.slice | 零拷贝 | 自定义&全部 | 共享内存 | 建议计数器 +1 后自行维护 |
buf.duplicate | 零拷贝 | 全部 | 共享内存 | 建议计数器 +1 后自行维护 |
buf.copy | 深拷贝 | 自定义&全部 | 独立内存 | 独立维护 |
compositeBuffe | 合并 | —— | 共享内存 | 合并后的 buf 与原 buf 同时释放 |
Unpooled.wrappedBuffer | 合并 | —— | 未测试 | 未测试(非池化) |
# EventLoop
EventLoop 事件循环对象,反复接收事件对象并进行处理
- 继承了
ScheduledExecutorService
接口,本质上是个单线程的线程池 - 继承了 EventLoopGroup 事件循环组,组中可维护一个或多个事件循环对象,用来处理任务
- NioEventLoop 与 Channel 线程绑定,一旦负责某个 Channel 就会负责到底,不会被其他线程接手
- DefaultEventLoop 相比 NioEventLoop 没有处理 IO 事件的能力,但是可以协助做一些其他的工作
- EventLoop 会持续等待任务,需要调用
shutdownGracefully()
才会被关闭,通常配合Future
使用
# ChannelFuture
JDK 自带的 Future 功能比较简单,Netty 写了个增强的 Future 以及 Promise,可以更好的处理线程工作问题,Channel 的连接关闭方法都是异步,会返回 Netty 的 Future 对象,通过该对象实现同步效果
// 服务端的启动,客户端的启动,以及关闭连接通道都是异步操作,都会返回Future对象
ChannelFuture future = serverBootstrap.bind(HOST, PORT);
ChannelFuture future = bootstrap.connect(HOST, PORT);
ChannelFuture future = channel.closeFuture();
// 通过Future的回调可以在服务启动后、服务关闭后进行一些处理
future.addListener(fu -> {
// 必要情况下fu需要强转为ChannelFuture才可以使用
ChannelFuture future2 = (ChannelFuture) fu;
System.out.println("在这里编写业务逻辑");
});
// 不想使用回调的情况下,也可以通过 .sync 方法同步等待
serverBootstrap.bind(HOST, PORT).sync();
bootstrap.connect(HOST, PORT).sync();
channel.closeFuture().sync();
# Pipeline&Handler
# Pipeline 执行顺序
客户端也好服务端也罢,将代码结构搭建起来即可,Netty 的核心功能在于 pipeline 和 handler,pipeline 利用责任链设计模式,将每个功能抽象为一个个的 handler,Channel 接收到的 ByteBuf 数据会经过每一个 handler,handler 可自行决定是否对当前数据进行操作,handler 的执行顺序由 pipeline 的添加顺序决定
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new CunstomHandler1() {});
pipeline.addLast(new StringEncoder());
pipeline 通过 add 方法向责任链中添加处理器,常用的addLast
代表向责任链结尾添加,对应的也有addBefore
向头部添加,代码的顺序以及添加的位置决定最终的执行顺序:
- 在 Channel 中接收数据为入站,入站的数据会根据排好的顺序自上而下执行
- 在 Channel 中发送数据为出站,出站的数据会根据排号的顺序自下而上执行
# Handler 执行逻辑
而 Handler 处理器又分为两个类型:进站处理器(Inbound)和出站处理器(Outbound):
- 接收数据时会自上而下找入站处理器依次执行
- 发送数据时会自下而上找出站处理器依次执行
- 通过查看 Handler 的继承关系可以区分出是进站还是出站
除了使用 pipeline 手动添加进去的处理器之外,pipeline 的头尾分别内置了处理器 head 和 tail,如果 ByteBuf 没有被我们手动释放,那么当他们传到内置的 handler 中时,Netty 会帮我们释放
head → decode → handler → ... → handler → decode → tail
还有一点需要注意,如果在 handler 执行途中需要发送数据,有两种情况:
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 该方法发送数据会从当前位置向上找出站处理器,如果编码器在下面会出问题
ctx.writeAndFlush("Funny mud go pee");
// 该方法发送数据会从tail位置向上经过包括编码器在内的所有出站编码器
ctx.channel().writeAndFlush("Funny mud go pee");
}
});
# Handler 生命周期
Channel 的作用是建立网络连接通道,他有一套完整的生命周期:
- 注册 register:通道被注册到 EventLoop 循环事件对象,等待或准备连接
- 激活 active:连接建立成功,通道属于激活状态
- 读写交互 read/write:这部分就是由之前提到的 pipeline 中添加的 handler 进行处理,属于核心部分
- 非活跃 nactive:这个状态表示网络波动中断、或者某一方 close 关闭了连接
- 注销 unregistered:连接弃用,从 EventLoop 中取消注册
读写交互属于 Channel 生命周期的一部分,上面提到的 handler 全名是 ChannelHandler,他不仅可以实现读写,还可以检测并处理 Channel 中生命周期事件
入站处理器
/**
* LifecycleHandler 生命周期
*
* @author 张涵哲
* @since 2025-03-23 20:01:08
*/
public class LifecycleHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
System.out.println("Channel 已注册");
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("连接已建立");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("接收到数据: " + msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
System.out.println("数据读取完成");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
System.out.println("Channel 处于非激活状态");
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
System.out.println("Channel 已注销");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.out.println("发生异常: " + cause.getMessage());
ctx.close();
}
}
出站处理器
/**
* LifecycleHandler 生命周期
*
* @author 张涵哲
* @since 2025-03-23 20:01:08
*/
public class LifecycleHandler extends ChannelOutboundHandlerAdapter {
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
System.out.println("Channel 绑定到地址: " + localAddress);
}
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
System.out.println("Channel 连接到地址: " + remoteAddress);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
System.out.println("写入数据: " + msg);
}
@Override
public void flush(ChannelHandlerContext ctx) {
System.out.println("刷新数据到网络");
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
System.out.println("Channel 关闭");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.out.println("发生异常: " + cause.getMessage());
}
}
# Handler 的共享
目前 Handler 的用法都是pipeline.addLast(new xxxHandler())
创建对象来使用的,如果当前 Handler 在其他地方也需要,将它单独提取出来作为成员变量使用,默认不允许这种写法是会报错的
Handler 本身是线程安全的,但如果在 Handler 中维护了成员变量多线程下就会有问题,直接共享会报错也是提醒开发者在共享之前处理好线程安全问题,如果已经明确解决线程安全问题,可以在 Handler 上使用注解让他支持共享
@ChannelHandler.Sharable // 使用该注解让Handler支持共享
public class XxxHandler extends ChannelInboundHandlerAdapter {
...
}
@ChannelHandler.Sharable
可以让大多数 Handler 支持共享使用,但还是有一部分 Handler 使用该注解也无法共享,例如用来处理黏包半包的帧解码器,如果当前收到的是半包的消息,Netty 会将该 ByteBuf 未读数据保留等待下次数据后合成完整消息,如果帧解码器被共享会导致消息解析异常,这点需要注意
# 常用 Handler 记录
# 黏包半包帧解码器
固定长度帧解码器
在特定场景下,服务端与客户端约定好每次收发数据长度都固定时,可以使用该解码器
// 每5KB拆分成一条消息
pipeline.addLast(new FixedLengthFrameDecoder(5 * 1024));
行帧解码器
LineBasedFrameDecoder
是换行符的行解码器,他可以读取\n
以及\r\n
并进行拆分,形成一条完整消息
// 参数1:单条消息的最大限制:10KB
pipeline.addLast(new LineBasedFrameDecoder(10 * 1024));
// 参数2:消息中是否移除换行符;参数3:是否快速异常
pipeline.addLast(new LineBasedFrameDecoder(10 * 1024, true, false));
DelimiterBasedFrameDecoder
为分隔符解码器,不同于之前的换行符解码器,该解码器可以使用任意字符作为分隔符
// 自定义消息分隔符
ByteBuf delimiter = Unpooled.buffer().writeBytes("\r\n".getBytes(StandardCharsets.UTF_8));
// 参数列表同 LineBasedFrameDecoder
pipeline.addLast(new DelimiterBasedFrameDecoder(10 * 1024, delimiter));
长度字段协议帧解码器
根据客户端与服务端约好的规则,在消息开头告诉 Netty 该消息有多长,然后按照给出的大小分配缓冲区
/**
* 参数列表介绍
* 1. byteOrder:采用大端序进行解码(Netty 默认排序)
* 2. maxFrameLength:最大支持10KB大小的单挑消息
* 3. lengthFieldOffset:从消息开头需要跳过几个字节才能找到长度信息
* 4. lengthFieldLength:长度信息本身占用几个字节
* 5. lengthAdjustment:从长度信息结尾来看,需要跳过几个字节才到正文
* 6. initialBytesToStrip:消息拆分完成后,需要从开头剥离几个字节(0表示保留整条消息)
* 7. failFast:是否快速异常,如果读取消息长度后发现消息长度大于maxFrameLength的情况下:
* 7.1 true:启用快速异常,检测无法接收完整消息后直接抛出异常,节省系统资源
* 7.2 false:关闭快速异常:仍然继续接收不断扩容,直至容量超出maxFrameLength后抛出异常
*/
pipeline.addLast(new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, 10 * 1024, 2, 2, 2, 0, true));
# 简单类型处理器
在一些业务场景下,消息解码并进行处理后,可能会拆分出多种业务对象:
// 在该Handler中分别将消息拆分为user对象,org对象以及post对象
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Optional.ofNullable(msg)
.map(item -> Convert.toMap(String.class, Object.class, item))
.filter(MapUtil::isNotEmpty)
.ifPresent(data -> {
SysUser user = new SysUser();
SysOrg org = new SysOrg();
SysPost post = new SysPost();
user.setId(MapUtil.getLong(data, "userId"));
user.setName(MapUtil.getStr(data, "userName"));
org.setId(MapUtil.getLong(data, "orgId"));
org.setName(MapUtil.getStr(data, "orgName"));
post.setId(MapUtil.getLong(data, "postId"));
post.setName(MapUtil.getStr(data, "postName"));
ctx.fireChannelRead(user);
ctx.fireChannelRead(org);
ctx.fireChannelRead(post);
});
}
});
上面的 handler 处理消息执行了一次,调用了三次ctx.fireChannelRead
分别传入不同类型的对象,就会产生后面的 handler 都会执行三次,每次接收到的对象都不一样,需要在 handler 中判断类型进行处理
// 在handler中判断对象类型处理业务逻辑
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof SysUser) {
// 处理user业务
} else if (msg instanceof SysOrg) {
// 处理org业务
} else if (msg instanceof SysPost) {
// 处理post业务
}
}
});
// 或者写三个handler,每个进行单独判断,对业务进行解耦
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof SysUser) {
// 处理user业务
}
}
});
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof SysOrg) {
// 处理org业务
}
}
});
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof SysPost) {
// 处理post业务
}
}
});
针对这种情况可以使用SimpleChannelInboundHandler<T>
简单入站处理器,通过泛型指定仅处理对应的数据
// user
pipeline.addLast(new SimpleChannelInboundHandler<SysUser>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, SysUser user) throws Exception {}
});
// org
pipeline.addLast(new SimpleChannelInboundHandler<SysOrg>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, SysOrg org) throws Exception {}
});
// post
pipeline.addLast(new SimpleChannelInboundHandler<SysPost>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, SysPost post) throws Exception {}
});
# HTTP 编解码器
上面提到的各种处理处理黏包半包的处理器,无论是定长、分隔符、还是长度字段,都是在代码编写之前双方约定好的开发协议,除了双方约定协议编写代码之外,市面上有很多成型并广泛使用的协议,例如 HTTP 协议,他远比自己编写的服务端和客户端复杂的多,对于这种比较复杂且使用广泛的协议,Netty 提供了内置的 HTTP 编解码器HttpClientCodec
// 命名规范以Codec结尾,属于组合处理器,既能编码也能解码
// HttpClientCodec会将请求拆分为两个对象传递给后面,分别是HttpRequest请求头信息和HttpContent请求体
pipeline.addLast(new HttpClientCodec());
// pipeline.addLast(new SimpleChannelInboundHandler<HttpRequest>() {});
pipeline.addLast(new SimpleChannelInboundHandler<HttpContent>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpContent content) throws Exception {
// 读取请求体数据并打印
ByteBuf byteBuf = content.content();
String requestBodyJson = byteBuf.toString(StandardCharsets.UTF_8);
System.out.println(requestBodyJson);
// 构造响应对象
Map<String, Object> data = MapUtil.newHashMap();
data.put("code", 200);
data.put("msg", "success");
byte[] responseBytes = JSONUtil.toJsonStr(data).getBytes(StandardCharsets.UTF_8);
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.headers()
.set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=UTF-8")
.set(HttpHeaderNames.CONTENT_LENGTH, responseBytes.length);
// 返回响应数据
ctx.writeAndFlush(response);
}
});
# 自定义编解码器
之前用到的很多都是 Inbound 或者 Outbound 的单向处理器,HTTP 协议解码器则是双向解码器,放在 Pipeline 开头既可以解码数据,又可以在发送数据后进行编码,我们自己也可以创建双向的编解码器,自定义编解码器可选择继承两个父类:
ByteToMessageCodec<I>
/**
* 双向编解码器建议都以Codec结尾
*
* @author 张涵哲
* @since 2025-03-24 12:39:21
*/
public class CustomCodec extends ByteToMessageCodec<Map<String, Object>> {
@Override
protected void encode(ChannelHandlerContext ctx, Map<String, Object> msg, ByteBuf out) throws Exception {
// 消息编码逻辑
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 消息解码逻辑
}
}
该编解码器负责处理 ByteBuf 和 Java 对象的转换,涉及到黏包半包问题,所以该类不允许被共享使用
MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>
该类为消息转换器类,可处理任意两种类型的数据转换,允许被共享,可以利用他来实现共享的编解码器
/**
* CustomCodec
*
* @author 张涵哲
* @since 2025-03-24 12:43:56
*/
@ChannelHandler.Sharable // 允许被共享
public class CustomCodec extends MessageToMessageCodec<ByteBuf, Map<String, Object>> {
@Override
protected void encode(ChannelHandlerContext ctx, Map<String, Object> msg, List<Object> out) throws Exception {
// 消息编码逻辑
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
// 消息解码逻辑
}
}
# 读写双向处理器
自定义编解码器中提到了双向 Handler 的概念,通过继承xxxCodec
类的双向操作实现编解码,但 xxxCodec 是编解码器需要继承的父类,如果想写一个正常的双向 handler 可以直接使用或继承ChannelDuplexHandler
来实现效果
channel.pipeline().addLast(new ChannelDuplexHandler() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 读数据事件
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// 写数据事件
}
});
# 空闲检测&心跳
Netty 中提供了IdleStateHandler
类检测 Channel 连接通过空闲时间,例如 10 秒内没有收到消息会触发事件,10 秒内没有写出消息也会触发事件,也可以同时检测读写空闲时间,利用空闲时间检测可以实现心跳机制,防止连接假死占用系统资源```
/**
* IdleStateHandler这里使用全参数构造演示
* 1. readerIdleTime:读消息空闲时间,这里配置为10
* 2. writerIdleTime:写消息空闲时间,这里配置为0表示不检测
* 3. allIdleTime:读写双向空闲时间,这里配置为0表示不检测
* 4. timeUnit:前面配置的数值所对应的时间单位
*/
channel.pipeline().addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS));
IdleStateHandler
空闲检测触发会将事件传递给下一个 Handler 的事件触发器userEventTriggered
// 触发空闲检测
channel.pipeline().addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS));
// 创建Handler,在事件触发器中捕获事件,判断并进行处理
channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 确保监听到的是空闲检测事件
if (evt instanceof IdleStateEvent event) {
if (event.state() == IdleState.READER_IDLE) {
// 长时间没有接收到消息
} else if (event.state() == IdleState.WRITER_IDLE) {
// 长时间没有发送消息
} else if (event.state() == IdleState.ALL_IDLE) {
// 长时间没有读写
} else {
System.out.println(evt);
}
}
}
});
# NIO 基础知识
PS: 懒得学,笔记不全,这部分看了也白看
NIO 三大组件:Channel 通道负责通信,ByteBuffer 负责承载数据,Selector 负责多路复用
# ByteBuffer
ByteBuffer 在 NIO 的 Channel 连接通道属于传输数据的载体,叫做字节缓冲区,里面有三个重要属性
- position:当前指针所在位置,NIO 的 Buffer 为单指针操作,读写都会基于该指针
- limit:读写边界位置
- 读模式时 limit 的位置会在最新写入的位置,只会在这个位置之内读取,超过这个范围读不到数据
- 写模式下与 capacity 一致,表示最多能把缓冲区写满
- capacity:缓冲区最大容量
注意:ByteBuffer 不支持动态扩容,一旦创建长度就会固定
# 方法 API 笔记
创建对象
方法名 | 作用 |
---|---|
ByteBuffer.allocate(64) | 指定缓冲区大小创建 |
ByteBuffer.allocateDirect(1024) | 指定缓冲区大小创建(系统内存,不受 JVM 控制) |
ByteBuffer.wrap(array) | 将数组包装为缓冲区对象 |
StandardCharsets.UTF_8.encode() | 将字符串编码为缓冲区对象 |
读写操作
方法名 | 作用 |
---|---|
buffer.get() | 读取数据(指定下标操作不走指针) |
channel.write(buffer) | 用 Channel 通道读取缓存区内的数据 |
buffer.put() | 写入数据(指定下标操作不走指针) |
channel.read(buffer) | Channel 通道内的数据写入到缓存区 |
指针操作
方法名 | 作用 |
---|---|
buffer.position() | 获取当前指针所在位置 |
buffer.limit() | 获取缓存区内数据的指针边界 |
buffer.capacity() | 获取缓存区最大容量 |
buffer.rewind() | 重置指针及标记,适用于重复读数据 |
buffer.mark() | 在某个位置设置标记,方便随时跳转回来 |
buffer.reset() | 跳转到之前标记的位置 |
模式切换 & 释放内存
方法名 | 作用 |
---|---|
buffer.flip() | 将 buffer 切换为读模式 |
buffer.clear() | 将 buffer 切换为写模式,放弃所有数据 |
buffer.compact() | 将 buffer 切换为写模式,保留未读数据 |
# Buffer&String 转换
public class ConvertDemo {
public static void main(String[] args) {
// stringToByteBuffer();
// byteBufferToString();
}
/**
* 字符串转ByteBuffer
*
* @author 张涵哲
* @since 2025-03-16 16:49:22
*/
public static void stringToByteBuffer() {
// 1. 将bytes数组添加进去,写入后buffer处于写模式
ByteBuffer buffer1 = ByteBuffer.allocate(8);
buffer1.put("12345678".getBytes(StandardCharsets.UTF_8));
// 2. 基于bytes数组创建,创建后buffer处于读模式
ByteBuffer buffer2 = ByteBuffer.wrap("12345678".getBytes(StandardCharsets.UTF_8));
// 3. 通过字符集工具类,创建后buffer处于读模式
ByteBuffer buffer3 = StandardCharsets.UTF_8.encode("12345678");
// 打印buffer
ByteBufferUtil.debugAll(buffer1);
ByteBufferUtil.debugAll(buffer2);
ByteBufferUtil.debugAll(buffer3);
}
/**
* ByteBuffer转字符串
*
* @author 张涵哲
* @since 2025-03-16 16:49:35
*/
public static void byteBufferToString() {
// 获取ByteBuffer,该方式获取的buffer默认处于读模式
ByteBuffer buffer = StandardCharsets.UTF_8.encode("12345678");
// 1. 将buffer读到bytes数组中转换
byte[] bytes = new byte[buffer.limit()];
buffer.get(bytes);
System.out.println(new String(bytes, StandardCharsets.UTF_8));
// 2. 通过字符集工具类
buffer.rewind();
System.out.println(StandardCharsets.UTF_8.decode(buffer).toString());
}
}
# 练习作业代码
黏包半包
public class SplitMessage {
public static void main(String[] args) {
try (FileChannel fileChannel = FileChannel.open(Path.of("message.txt"), StandardOpenOption.READ)) {
ByteBuffer buffer = ByteBuffer.allocate(128);
// 读取文件数据,写入到buffer中
while (fileChannel.read(buffer) != -1) {
// 切换为读模式
buffer.flip();
for (int index = buffer.position(); index < buffer.limit(); index++) {
// 读到换行符表示,截止到这有一条完整的消息
if ((char) buffer.get(index) == '\n') {
// 这里计算length要+1,别把\n落下
int length = index + 1 - buffer.position();
ByteBuffer allocate = ByteBuffer.allocate(length);
for (int i = 0; i < length; i++) {
allocate.put(buffer.get());
}
allocate.flip();
print(allocate);
}
}
// 保留未读数据,将buffer修改为写模式
buffer.compact();
}
buffer.flip();
print(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 打印结果
*
* @author 张涵哲
* @since 2025-03-16 17:28:03
*/
public static void print(ByteBuffer buffer) {
String string = StandardCharsets.UTF_8.decode(buffer).toString();
System.out.print(string);
}
}
Channel:TODO...
Selector:TODO...
# 调试工具类
- 笔记中使用的第三方工具类库均来自 hutool
- ByteBuffer & ByteBuf 的调试工具类,作者为黑马讲师满一航
package site.hanzhe.util;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.internal.MathUtil;
import io.netty.util.internal.StringUtil;
import java.nio.ByteBuffer;
/**
* ByteBuffer & ByteBuf 的调试工具类
*
* @author Panwen Chen
* @since 2021/4/12 15:59
*/
public class NettyUtil {
private static final char[] BYTE2CHAR = new char[256];
private static final char[] HEXDUMP_TABLE = new char[256 * 4];
private static final String[] HEXPADDING = new String[16];
private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
private static final String[] BYTE2HEX = new String[256];
private static final String[] BYTEPADDING = new String[16];
static {
final char[] DIGITS = "0123456789abcdef".toCharArray();
for (int i = 0; i < 256; i++) {
HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
}
int i;
// Generate the lookup table for hex dump paddings
for (i = 0; i < HEXPADDING.length; i++) {
int padding = HEXPADDING.length - i;
StringBuilder buf = new StringBuilder(padding * 3);
for (int j = 0; j < padding; j++) {
buf.append(" ");
}
HEXPADDING[i] = buf.toString();
}
// Generate the lookup table for the start-offset header in each row (up to 64KiB).
for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {
StringBuilder buf = new StringBuilder(12);
buf.append(StringUtil.NEWLINE);
buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
buf.setCharAt(buf.length() - 9, '|');
buf.append('|');
HEXDUMP_ROWPREFIXES[i] = buf.toString();
}
// Generate the lookup table for byte-to-hex-dump conversion
for (i = 0; i < BYTE2HEX.length; i++) {
BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
}
// Generate the lookup table for byte dump paddings
for (i = 0; i < BYTEPADDING.length; i++) {
int padding = BYTEPADDING.length - i;
StringBuilder buf = new StringBuilder(padding);
for (int j = 0; j < padding; j++) {
buf.append(' ');
}
BYTEPADDING[i] = buf.toString();
}
// Generate the lookup table for byte-to-char conversion
for (i = 0; i < BYTE2CHAR.length; i++) {
if (i <= 0x1f || i >= 0x7f) {
BYTE2CHAR[i] = '.';
} else {
BYTE2CHAR[i] = (char) i;
}
}
}
/**
* 打印所有内容
* @param buffer
*/
public static void debugAll(ByteBuffer buffer) {
int oldlimit = buffer.limit();
buffer.limit(buffer.capacity());
StringBuilder origin = new StringBuilder(256);
appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
System.out.println("+--------+-------------------- all ------------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);
System.out.println(origin);
buffer.limit(oldlimit);
}
/**
* 打印可读取内容
* @param buffer
*/
public static void debugRead(ByteBuffer buffer) {
StringBuilder builder = new StringBuilder(256);
appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());
System.out.println("+--------+-------------------- read -----------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());
System.out.println(builder);
}
private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {
if (MathUtil.isOutOfBounds(offset, length, buf.capacity())) {
throw new IndexOutOfBoundsException(
"expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length
+ ") <= " + "buf.capacity(" + buf.capacity() + ')');
}
if (length == 0) {
return;
}
dump.append(
" +-------------------------------------------------+" +
StringUtil.NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +
StringUtil.NEWLINE + "+--------+-------------------------------------------------+----------------+");
final int startIndex = offset;
final int fullRows = length >>> 4;
final int remainder = length & 0xF;
// Dump the rows which have 16 bytes.
for (int row = 0; row < fullRows; row++) {
int rowStartIndex = (row << 4) + startIndex;
// Per-row prefix.
appendHexDumpRowPrefix(dump, row, rowStartIndex);
// Hex dump
int rowEndIndex = rowStartIndex + 16;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(" |");
// ASCII dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append('|');
}
// Dump the last row which has less than 16 bytes.
if (remainder != 0) {
int rowStartIndex = (fullRows << 4) + startIndex;
appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);
// Hex dump
int rowEndIndex = rowStartIndex + remainder;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(HEXPADDING[remainder]);
dump.append(" |");
// Ascii dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append(BYTEPADDING[remainder]);
dump.append('|');
}
dump.append(StringUtil.NEWLINE +
"+--------+-------------------------------------------------+----------------+");
}
private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {
if (row < HEXDUMP_ROWPREFIXES.length) {
dump.append(HEXDUMP_ROWPREFIXES[row]);
} else {
dump.append(StringUtil.NEWLINE);
dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
dump.setCharAt(dump.length() - 9, '|');
dump.append('|');
}
}
public static short getUnsignedByte(ByteBuffer buffer, int index) {
return (short) (buffer.get(index) & 0xFF);
}
public static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(StringUtil.NEWLINE);
ByteBufUtil.appendPrettyHexDump(buf, buffer);
System.out.println(buf);
}
}