ZHE知识库 ZHE知识库
首页
JAVA
中间件
易学
小说漫画
其他
Source (opens new window)
首页
JAVA
中间件
易学
小说漫画
其他
Source (opens new window)
  • JUC入门笔记
  • JVM入门笔记
  • Java微服务

  • Netty笔记
    • Bootstrap
      • 基础代码结构
      • 参数选项
    • ByteBuf
      • 方法 API 笔记
      • 内存释放
      • 零拷贝与深拷贝
    • EventLoop
    • ChannelFuture
    • Pipeline&Handler
      • Pipeline 执行顺序
      • Handler 执行逻辑
      • Handler 生命周期
      • Handler 的共享
    • 常用 Handler 记录
      • 黏包半包帧解码器
      • 简单类型处理器
      • HTTP 编解码器
      • 自定义编解码器
      • 读写双向处理器
      • 空闲检测&心跳
    • ~~NIO 基础知识~~
      • ByteBuffer
      • 方法 API 笔记
      • Buffer&String 转换
      • 练习作业代码
    • 调试工具类
  • Java
张涵哲
2025-03-16
目录

Netty笔记

# Bootstrap

# 基础代码结构

这里记录一份基础代码结构,用到的时候随时过来看一眼或者干脆直接复制,防止忘了怎么写

TCP服务端基础代码结构

public class SimpleNettyServer {

    public static final String HOST = "127.0.0.1";
    public static final int PORT = 8080;

    public static void main(String[] args) throws InterruptedException {
        // 创建两个事件循环组,boss处理连接事件,work处理消息读写,分工明确
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        // 创建服务器对象
        ServerBootstrap serverBootstrap = new ServerBootstrap()
                .group(bossGroup, workGroup)  // 绑定线程组
                .channel(NioServerSocketChannel.class)  // 使用非阻塞的服务端SocketChannel作为连接建立通道
                .childHandler(new ChannelInitializer<NioSocketChannel>() {  // 通过NioSocketChannel进行通信
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // 使用pipeline添加Handler用于处理消息以及执行业务逻辑
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println("服务器收到消息:" + msg);
                                super.channelRead(ctx, msg);
                            }
                        });
                        pipeline.addLast(new StringEncoder());
                    }
                });
        // 服务器绑定IP地址并启动(异步)
        ChannelFuture channelFuture = serverBootstrap.bind(HOST, PORT); // 可以 .sync() 同步
        // 在回调中编写连接建立后的业务逻辑
        channelFuture.addListener(sf -> {
            System.out.println("Netty 服务器启动成功:" + PORT);
            ChannelFuture successFuture = (ChannelFuture) sf;
            // 处理服务端关闭连接的善后工作
            successFuture.channel().closeFuture().addListener(closeFuture -> {
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
                System.out.println("Netty 服务器已关闭");
            });
        });
    }

}

TCP客户端基础代码结构

public class SimpleNettyClient {

    public static final String HOST = "127.0.0.1";
    public static final int PORT = 8080;

    public static void main(String[] args) throws InterruptedException {
        // 客户端只管负责连接服务器,不需要考虑处理连接事件,单个事件循环组足矣
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        // 创建客户端对象
        Bootstrap bootstrap = new Bootstrap()
                .group(eventLoopGroup)  // 绑定线程组
                .channel(NioSocketChannel.class)  // 使用非阻塞的SocketChannel作为连接建立通道,不需要Server功能
                .handler(new ChannelInitializer<NioSocketChannel>() {  // 通过NioSocketChannel进行通信
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println("收到服务器发回的消息:" + msg);
                                super.channelRead(ctx, msg);
                            }
                        });
                        pipeline.addLast(new StringEncoder());
                    }
                });
        // 启动客户端连接服务端(异步)
        ChannelFuture channelFuture = bootstrap.connect(HOST, PORT);  // .sync() 可以同步
        // 在回调中编写连接建立后的业务逻辑
        channelFuture.addListener(cf -> {
            System.out.println("成功连接到服务器");
            ChannelFuture connectFuture = (ChannelFuture) cf;
            // 处理客户端关闭连接的善后工作
            connectFuture.channel().closeFuture().addListener(closeFuture -> {
                eventLoopGroup.shutdownGracefully();
                System.out.println("连接已关闭");
            });
        });
    }

}

# 参数选项

Netty 的 Bootstrap 可以配置一些参数对连接进行设置,这里有些注意事项需要说清楚

  1. ServerBootstrap 配置参数的方法有两个,分别是option和childOption,因为服务端有两种类型的连接通道需要关注,服务端本身是ServerSocketChannel,服务端自己成功启动才能与客户端建立连接,所以服务端的 option 用于配置服务端参数
  2. 服务端通道启动成功后会等待客户端连接,客户端发起连接后会通过SocketChannel建立连接通道进行通信,所以服务端的 childOption 用于配置客户端参数
  3. 客户端不需要考虑太多,只需要考虑自己本身的 Channel 通道即可,所以客户端的 option 方法是用于配置客户端参数
serverBootstrap
    .option(ChannelOption.SO_RCVBUF, 10 * 1024) // 设置TCP接收缓冲区大小
    // ------------------------------------------------------------------
    .childOption(ChannelOption.RCVBUF_ALLOCATOR, ...) // 设置ByteBuf缓冲区大小
// ----------------------------------------------------------------------
bootstrap
    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10 * 1000) // 连接超时时间10秒

# ByteBuf

ByteBuf 用于承载数据,为 Netty 提供的增强的缓冲区对象,主要有以下几点优势

  • 分别使用读指针(rIndex)和写指针(wIndex)省去读写模式反复切换的步骤
  • 4.1 版本后的 Netty 支持 ByteBuf 池化技术,对创建慢而读写快的直接内存(Direct)很友好
    • 对 4.1 以后的非安卓版本的 Netty 生效
    • 通过创建 ByteBuf 对象查看他的 Class 可以检查是否启用了池化技术
    • 控制池化是否启用:-Dio.netty.allocator.type={unpooled|pooled}
  • ByteBuf 对象默认容量为 256 字节,写满后会自动扩容,最大扩容容量为Integer.MAX_VALUE可自定义设置
    • 小于 512 字节的轻量级数据读写,触发扩容后会自动找到比当前容量大的下一个 16 的整数倍
    • 超过 512 字节的读写,会按照 2^n 次方扩容
  • ByteBuf 读写时注意字节序,是大端读写(默认)还是小端读写(LE)

# 方法 API 笔记

创建对象

方法名 作用
ByteBufAllocator.DEFAULT.buffer() 创建缓冲区对象(默认直接内存)
ByteBufAllocator.DEFAULT.heapBuffer() 创建堆内存缓冲区对象
ByteBufAllocator.DEFAULT.directBuffer() 创建直接内存缓冲区对象
Unpooled.buffer() 非池化的缓冲区对象(默认堆内存)
Unpooled.directBuffer() 非池化的直接内存缓冲区对象
ctx.alloc().buffer() 在 pipeline 的 handler 中推荐用这种方式创建缓冲区对象

读写操作

方法名 作用
buf.wirte*() 向缓冲区写入各种数据(Byte、Int、Double 等)
buf.set*() 指定下标向缓冲区写入各种数据,不会移动指针
buf.read*() 在缓冲区读取各种数据(Byte、Int、Double 等)
buf.get*() 指定下标在缓冲区读取各种数据,不会移动指针

指针操作

方法名 作用
buf.mark*Index() 在当前位置设置标记,可分别设置 Reader 读标记和 Writer 写标记
buf.reset*Index() 将读指针 Reader 或写指针 Writer 重置到之前设置标记的位置

# 内存释放

ByteBuf 分为三个主要实现:

  • UnpooledHeapByteBuf:非池化堆内存缓冲区,依靠 GC 垃圾回收机制释放内存
  • UnpooledDirectByteBuf:非池化直接内存缓冲区,GC 无法处理他的内存释放需要特殊处理
  • PooledByteBuf:池化的缓冲区对象,他的内存释放更复杂一些

方案 1:针对各种类型的 ByteBuf 的内存释放,他们实现了共同的接口ReferenceCounted,通过引用技术法使用统一规范的 API 来释放内存,每个 ByteBuf 都存有一个引用数量,当该 ByteBuf 的引用数量归 0 就会被回收

方案 2:Netty 的 pipeline 除了开发者手动添加的 handler 之外,他的头部和尾部内置了head和tail处理器,如果 ByteBuf 没有在开发者的 Handler 中被释放,而是一路传到了头尾,Netty 会帮咱们释放

方法名 作用
buf.retain() 引用计数器 +1
buf1.release() 引用计数器 -1
buf.refCnt() 获取当前计数器的引用数量
ReferenceCountUtil.release(buf) 无视计数器引用数量,释放内存

# 零拷贝与深拷贝

方法 类型 范围 内存 其他
buf.slice 零拷贝 自定义&全部 共享内存 建议计数器 +1 后自行维护
buf.duplicate 零拷贝 全部 共享内存 建议计数器 +1 后自行维护
buf.copy 深拷贝 自定义&全部 独立内存 独立维护
compositeBuffe 合并 —— 共享内存 合并后的 buf 与原 buf 同时释放
Unpooled.wrappedBuffer 合并 —— 未测试 未测试(非池化)

# EventLoop

EventLoop 事件循环对象,反复接收事件对象并进行处理

  • 继承了ScheduledExecutorService接口,本质上是个单线程的线程池
  • 继承了 EventLoopGroup 事件循环组,组中可维护一个或多个事件循环对象,用来处理任务
  • NioEventLoop 与 Channel 线程绑定,一旦负责某个 Channel 就会负责到底,不会被其他线程接手
  • DefaultEventLoop 相比 NioEventLoop 没有处理 IO 事件的能力,但是可以协助做一些其他的工作
  • EventLoop 会持续等待任务,需要调用shutdownGracefully()才会被关闭,通常配合Future使用

# ChannelFuture

JDK 自带的 Future 功能比较简单,Netty 写了个增强的 Future 以及 Promise,可以更好的处理线程工作问题,Channel 的连接关闭方法都是异步,会返回 Netty 的 Future 对象,通过该对象实现同步效果

// 服务端的启动,客户端的启动,以及关闭连接通道都是异步操作,都会返回Future对象
ChannelFuture future = serverBootstrap.bind(HOST, PORT);
ChannelFuture future = bootstrap.connect(HOST, PORT);
ChannelFuture future = channel.closeFuture();

// 通过Future的回调可以在服务启动后、服务关闭后进行一些处理
future.addListener(fu -> {
    // 必要情况下fu需要强转为ChannelFuture才可以使用
    ChannelFuture future2 = (ChannelFuture) fu;
    System.out.println("在这里编写业务逻辑");
});
// 不想使用回调的情况下,也可以通过 .sync 方法同步等待
serverBootstrap.bind(HOST, PORT).sync();
bootstrap.connect(HOST, PORT).sync();
channel.closeFuture().sync();

# Pipeline&Handler

# Pipeline 执行顺序

客户端也好服务端也罢,将代码结构搭建起来即可,Netty 的核心功能在于 pipeline 和 handler,pipeline 利用责任链设计模式,将每个功能抽象为一个个的 handler,Channel 接收到的 ByteBuf 数据会经过每一个 handler,handler 可自行决定是否对当前数据进行操作,handler 的执行顺序由 pipeline 的添加顺序决定

ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new CunstomHandler1() {});
pipeline.addLast(new StringEncoder());

pipeline 通过 add 方法向责任链中添加处理器,常用的addLast代表向责任链结尾添加,对应的也有addBefore向头部添加,代码的顺序以及添加的位置决定最终的执行顺序:

  • 在 Channel 中接收数据为入站,入站的数据会根据排好的顺序自上而下执行
  • 在 Channel 中发送数据为出站,出站的数据会根据排号的顺序自下而上执行

# Handler 执行逻辑

而 Handler 处理器又分为两个类型:进站处理器(Inbound)和出站处理器(Outbound):

  • 接收数据时会自上而下找入站处理器依次执行
  • 发送数据时会自下而上找出站处理器依次执行
  • 通过查看 Handler 的继承关系可以区分出是进站还是出站

除了使用 pipeline 手动添加进去的处理器之外,pipeline 的头尾分别内置了处理器 head 和 tail,如果 ByteBuf 没有被我们手动释放,那么当他们传到内置的 handler 中时,Netty 会帮我们释放

head → decode → handler → ... → handler → decode → tail

还有一点需要注意,如果在 handler 执行途中需要发送数据,有两种情况:

pipeline.addLast(new ChannelInboundHandlerAdapter() {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 该方法发送数据会从当前位置向上找出站处理器,如果编码器在下面会出问题
        ctx.writeAndFlush("Funny mud go pee");
        // 该方法发送数据会从tail位置向上经过包括编码器在内的所有出站编码器
        ctx.channel().writeAndFlush("Funny mud go pee");
    }
});

# Handler 生命周期

Channel 的作用是建立网络连接通道,他有一套完整的生命周期:

  • 注册 register:通道被注册到 EventLoop 循环事件对象,等待或准备连接
  • 激活 active:连接建立成功,通道属于激活状态
  • 读写交互 read/write:这部分就是由之前提到的 pipeline 中添加的 handler 进行处理,属于核心部分
  • 非活跃 nactive:这个状态表示网络波动中断、或者某一方 close 关闭了连接
  • 注销 unregistered:连接弃用,从 EventLoop 中取消注册

读写交互属于 Channel 生命周期的一部分,上面提到的 handler 全名是 ChannelHandler,他不仅可以实现读写,还可以检测并处理 Channel 中生命周期事件

入站处理器

/**
 * LifecycleHandler 生命周期
 *
 * @author 张涵哲
 * @since 2025-03-23 20:01:08
 */
public class LifecycleHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        System.out.println("Channel 已注册");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("连接已建立");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("接收到数据: " + msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        System.out.println("数据读取完成");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        System.out.println("Channel 处于非激活状态");
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) {
        System.out.println("Channel 已注销");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.out.println("发生异常: " + cause.getMessage());
        ctx.close();
    }

}

出站处理器

/**
 * LifecycleHandler 生命周期
 *
 * @author 张涵哲
 * @since 2025-03-23 20:01:08
 */
public class LifecycleHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
        System.out.println("Channel 绑定到地址: " + localAddress);
    }

    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        System.out.println("Channel 连接到地址: " + remoteAddress);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        System.out.println("写入数据: " + msg);
    }

    @Override
    public void flush(ChannelHandlerContext ctx) {
        System.out.println("刷新数据到网络");
    }

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
        System.out.println("Channel 关闭");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.out.println("发生异常: " + cause.getMessage());
    }

}

# Handler 的共享

目前 Handler 的用法都是pipeline.addLast(new xxxHandler())创建对象来使用的,如果当前 Handler 在其他地方也需要,将它单独提取出来作为成员变量使用,默认不允许这种写法是会报错的

Handler 本身是线程安全的,但如果在 Handler 中维护了成员变量多线程下就会有问题,直接共享会报错也是提醒开发者在共享之前处理好线程安全问题,如果已经明确解决线程安全问题,可以在 Handler 上使用注解让他支持共享

@ChannelHandler.Sharable // 使用该注解让Handler支持共享
public class XxxHandler extends ChannelInboundHandlerAdapter {
    ...
}

@ChannelHandler.Sharable可以让大多数 Handler 支持共享使用,但还是有一部分 Handler 使用该注解也无法共享,例如用来处理黏包半包的帧解码器,如果当前收到的是半包的消息,Netty 会将该 ByteBuf 未读数据保留等待下次数据后合成完整消息,如果帧解码器被共享会导致消息解析异常,这点需要注意

# 常用 Handler 记录

# 黏包半包帧解码器

固定长度帧解码器

在特定场景下,服务端与客户端约定好每次收发数据长度都固定时,可以使用该解码器

// 每5KB拆分成一条消息
pipeline.addLast(new FixedLengthFrameDecoder(5 * 1024));

行帧解码器

LineBasedFrameDecoder是换行符的行解码器,他可以读取\n以及\r\n并进行拆分,形成一条完整消息

// 参数1:单条消息的最大限制:10KB
pipeline.addLast(new LineBasedFrameDecoder(10 * 1024));

// 参数2:消息中是否移除换行符;参数3:是否快速异常
pipeline.addLast(new LineBasedFrameDecoder(10 * 1024, true, false));

DelimiterBasedFrameDecoder为分隔符解码器,不同于之前的换行符解码器,该解码器可以使用任意字符作为分隔符

// 自定义消息分隔符
ByteBuf delimiter = Unpooled.buffer().writeBytes("\r\n".getBytes(StandardCharsets.UTF_8));
// 参数列表同 LineBasedFrameDecoder
pipeline.addLast(new DelimiterBasedFrameDecoder(10 * 1024, delimiter));

长度字段协议帧解码器

根据客户端与服务端约好的规则,在消息开头告诉 Netty 该消息有多长,然后按照给出的大小分配缓冲区

/**
 * 参数列表介绍
 * 1. byteOrder:采用大端序进行解码(Netty 默认排序)
 * 2. maxFrameLength:最大支持10KB大小的单挑消息
 * 3. lengthFieldOffset:从消息开头需要跳过几个字节才能找到长度信息
 * 4. lengthFieldLength:长度信息本身占用几个字节
 * 5. lengthAdjustment:从长度信息结尾来看,需要跳过几个字节才到正文
 * 6. initialBytesToStrip:消息拆分完成后,需要从开头剥离几个字节(0表示保留整条消息)
 * 7. failFast:是否快速异常,如果读取消息长度后发现消息长度大于maxFrameLength的情况下:
 *    7.1 true:启用快速异常,检测无法接收完整消息后直接抛出异常,节省系统资源
 *    7.2 false:关闭快速异常:仍然继续接收不断扩容,直至容量超出maxFrameLength后抛出异常
 */
pipeline.addLast(new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, 10 * 1024, 2, 2, 2, 0, true));

# 简单类型处理器

在一些业务场景下,消息解码并进行处理后,可能会拆分出多种业务对象:

// 在该Handler中分别将消息拆分为user对象,org对象以及post对象
pipeline.addLast(new ChannelInboundHandlerAdapter() {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Optional.ofNullable(msg)
            .map(item -> Convert.toMap(String.class, Object.class, item))
            .filter(MapUtil::isNotEmpty)
            .ifPresent(data -> {
                SysUser user = new SysUser();
                SysOrg org = new SysOrg();
                SysPost post = new SysPost();
                user.setId(MapUtil.getLong(data, "userId"));
                user.setName(MapUtil.getStr(data, "userName"));
                org.setId(MapUtil.getLong(data, "orgId"));
                org.setName(MapUtil.getStr(data, "orgName"));
                post.setId(MapUtil.getLong(data, "postId"));
                post.setName(MapUtil.getStr(data, "postName"));
                ctx.fireChannelRead(user);
                ctx.fireChannelRead(org);
                ctx.fireChannelRead(post);
            });
    }
});

上面的 handler 处理消息执行了一次,调用了三次ctx.fireChannelRead分别传入不同类型的对象,就会产生后面的 handler 都会执行三次,每次接收到的对象都不一样,需要在 handler 中判断类型进行处理

// 在handler中判断对象类型处理业务逻辑
pipeline.addLast(new ChannelInboundHandlerAdapter() {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof SysUser) {
            // 处理user业务
        } else if (msg instanceof SysOrg) {
            // 处理org业务
        } else if (msg instanceof SysPost) {
            // 处理post业务
        }
    }
});

// 或者写三个handler,每个进行单独判断,对业务进行解耦
pipeline.addLast(new ChannelInboundHandlerAdapter() {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof SysUser) {
            // 处理user业务
        }
    }
});
pipeline.addLast(new ChannelInboundHandlerAdapter() {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof SysOrg) {
            // 处理org业务
        }
    }
});
pipeline.addLast(new ChannelInboundHandlerAdapter() {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof SysPost) {
            // 处理post业务
        }
    }
});

针对这种情况可以使用SimpleChannelInboundHandler<T>简单入站处理器,通过泛型指定仅处理对应的数据

// user
pipeline.addLast(new SimpleChannelInboundHandler<SysUser>() {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, SysUser user) throws Exception {}
});
// org
pipeline.addLast(new SimpleChannelInboundHandler<SysOrg>() {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, SysOrg org) throws Exception {}
});
// post
pipeline.addLast(new SimpleChannelInboundHandler<SysPost>() {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, SysPost post) throws Exception {}
});

# HTTP 编解码器

上面提到的各种处理处理黏包半包的处理器,无论是定长、分隔符、还是长度字段,都是在代码编写之前双方约定好的开发协议,除了双方约定协议编写代码之外,市面上有很多成型并广泛使用的协议,例如 HTTP 协议,他远比自己编写的服务端和客户端复杂的多,对于这种比较复杂且使用广泛的协议,Netty 提供了内置的 HTTP 编解码器HttpClientCodec

// 命名规范以Codec结尾,属于组合处理器,既能编码也能解码
// HttpClientCodec会将请求拆分为两个对象传递给后面,分别是HttpRequest请求头信息和HttpContent请求体
pipeline.addLast(new HttpClientCodec());
// pipeline.addLast(new SimpleChannelInboundHandler<HttpRequest>() {});
pipeline.addLast(new SimpleChannelInboundHandler<HttpContent>() {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpContent content) throws Exception {
        // 读取请求体数据并打印
        ByteBuf byteBuf = content.content();
        String requestBodyJson = byteBuf.toString(StandardCharsets.UTF_8);
        System.out.println(requestBodyJson);
        // 构造响应对象
        Map<String, Object> data = MapUtil.newHashMap();
        data.put("code", 200);
        data.put("msg", "success");
        byte[] responseBytes = JSONUtil.toJsonStr(data).getBytes(StandardCharsets.UTF_8);
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        response.headers()
            .set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=UTF-8")
            .set(HttpHeaderNames.CONTENT_LENGTH, responseBytes.length);
        // 返回响应数据
        ctx.writeAndFlush(response);
    }
});

# 自定义编解码器

之前用到的很多都是 Inbound 或者 Outbound 的单向处理器,HTTP 协议解码器则是双向解码器,放在 Pipeline 开头既可以解码数据,又可以在发送数据后进行编码,我们自己也可以创建双向的编解码器,自定义编解码器可选择继承两个父类:

ByteToMessageCodec<I>

/**
 * 双向编解码器建议都以Codec结尾
 *
 * @author 张涵哲
 * @since 2025-03-24 12:39:21
 */
public class CustomCodec extends ByteToMessageCodec<Map<String, Object>> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Map<String, Object> msg, ByteBuf out) throws Exception {
        // 消息编码逻辑
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 消息解码逻辑
    }

}

该编解码器负责处理 ByteBuf 和 Java 对象的转换,涉及到黏包半包问题,所以该类不允许被共享使用

MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>

该类为消息转换器类,可处理任意两种类型的数据转换,允许被共享,可以利用他来实现共享的编解码器

/**
 * CustomCodec
 *
 * @author 张涵哲
 * @since 2025-03-24 12:43:56
 */
@ChannelHandler.Sharable  // 允许被共享
public class CustomCodec extends MessageToMessageCodec<ByteBuf, Map<String, Object>> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Map<String, Object> msg, List<Object> out) throws Exception {
        // 消息编码逻辑
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        // 消息解码逻辑
    }

}

# 读写双向处理器

自定义编解码器中提到了双向 Handler 的概念,通过继承xxxCodec类的双向操作实现编解码,但 xxxCodec 是编解码器需要继承的父类,如果想写一个正常的双向 handler 可以直接使用或继承ChannelDuplexHandler来实现效果

channel.pipeline().addLast(new ChannelDuplexHandler() {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 读数据事件
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // 写数据事件
    }
});

# 空闲检测&心跳

Netty 中提供了IdleStateHandler类检测 Channel 连接通过空闲时间,例如 10 秒内没有收到消息会触发事件,10 秒内没有写出消息也会触发事件,也可以同时检测读写空闲时间,利用空闲时间检测可以实现心跳机制,防止连接假死占用系统资源```

/**
 * IdleStateHandler这里使用全参数构造演示
 * 1. readerIdleTime:读消息空闲时间,这里配置为10
 * 2. writerIdleTime:写消息空闲时间,这里配置为0表示不检测
 * 3. allIdleTime:读写双向空闲时间,这里配置为0表示不检测
 * 4. timeUnit:前面配置的数值所对应的时间单位
 */
channel.pipeline().addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS));

IdleStateHandler空闲检测触发会将事件传递给下一个 Handler 的事件触发器userEventTriggered

// 触发空闲检测
channel.pipeline().addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS));
// 创建Handler,在事件触发器中捕获事件,判断并进行处理
channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // 确保监听到的是空闲检测事件
        if (evt instanceof IdleStateEvent event) {
            if (event.state() == IdleState.READER_IDLE) {
                // 长时间没有接收到消息
            } else if (event.state() == IdleState.WRITER_IDLE) {
                // 长时间没有发送消息
            } else if (event.state() == IdleState.ALL_IDLE) {
                // 长时间没有读写
            } else {
                System.out.println(evt);
            }
        }
    }
});

# NIO 基础知识

PS: 懒得学,笔记不全,这部分看了也白看

NIO 三大组件:Channel 通道负责通信,ByteBuffer 负责承载数据,Selector 负责多路复用

# ByteBuffer

ByteBuffer 在 NIO 的 Channel 连接通道属于传输数据的载体,叫做字节缓冲区,里面有三个重要属性

  • position:当前指针所在位置,NIO 的 Buffer 为单指针操作,读写都会基于该指针
  • limit:读写边界位置
    • 读模式时 limit 的位置会在最新写入的位置,只会在这个位置之内读取,超过这个范围读不到数据
    • 写模式下与 capacity 一致,表示最多能把缓冲区写满
  • capacity:缓冲区最大容量

注意:ByteBuffer 不支持动态扩容,一旦创建长度就会固定

# 方法 API 笔记

创建对象

方法名 作用
ByteBuffer.allocate(64) 指定缓冲区大小创建
ByteBuffer.allocateDirect(1024) 指定缓冲区大小创建(系统内存,不受 JVM 控制)
ByteBuffer.wrap(array) 将数组包装为缓冲区对象
StandardCharsets.UTF_8.encode() 将字符串编码为缓冲区对象

读写操作

方法名 作用
buffer.get() 读取数据(指定下标操作不走指针)
channel.write(buffer) 用 Channel 通道读取缓存区内的数据
buffer.put() 写入数据(指定下标操作不走指针)
channel.read(buffer) Channel 通道内的数据写入到缓存区

指针操作

方法名 作用
buffer.position() 获取当前指针所在位置
buffer.limit() 获取缓存区内数据的指针边界
buffer.capacity() 获取缓存区最大容量
buffer.rewind() 重置指针及标记,适用于重复读数据
buffer.mark() 在某个位置设置标记,方便随时跳转回来
buffer.reset() 跳转到之前标记的位置

模式切换 & 释放内存

方法名 作用
buffer.flip() 将 buffer 切换为读模式
buffer.clear() 将 buffer 切换为写模式,放弃所有数据
buffer.compact() 将 buffer 切换为写模式,保留未读数据

# Buffer&String 转换

public class ConvertDemo {

    public static void main(String[] args) {
        // stringToByteBuffer();
        // byteBufferToString();
    }

    /**
     * 字符串转ByteBuffer
     *
     * @author 张涵哲
     * @since 2025-03-16 16:49:22
     */
    public static void stringToByteBuffer() {
        // 1. 将bytes数组添加进去,写入后buffer处于写模式
        ByteBuffer buffer1 = ByteBuffer.allocate(8);
        buffer1.put("12345678".getBytes(StandardCharsets.UTF_8));
        // 2. 基于bytes数组创建,创建后buffer处于读模式
        ByteBuffer buffer2 = ByteBuffer.wrap("12345678".getBytes(StandardCharsets.UTF_8));
        // 3. 通过字符集工具类,创建后buffer处于读模式
        ByteBuffer buffer3 = StandardCharsets.UTF_8.encode("12345678");
        // 打印buffer
        ByteBufferUtil.debugAll(buffer1);
        ByteBufferUtil.debugAll(buffer2);
        ByteBufferUtil.debugAll(buffer3);
    }

    /**
     * ByteBuffer转字符串
     *
     * @author 张涵哲
     * @since 2025-03-16 16:49:35
     */
    public static void byteBufferToString() {
        // 获取ByteBuffer,该方式获取的buffer默认处于读模式
        ByteBuffer buffer = StandardCharsets.UTF_8.encode("12345678");
        // 1. 将buffer读到bytes数组中转换
        byte[] bytes = new byte[buffer.limit()];
        buffer.get(bytes);
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
        // 2. 通过字符集工具类
        buffer.rewind();
        System.out.println(StandardCharsets.UTF_8.decode(buffer).toString());
    }

}

# 练习作业代码

黏包半包

public class SplitMessage {

    public static void main(String[] args) {
        try (FileChannel fileChannel = FileChannel.open(Path.of("message.txt"), StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(128);
            // 读取文件数据,写入到buffer中
            while (fileChannel.read(buffer) != -1) {
                // 切换为读模式
                buffer.flip();
                for (int index = buffer.position(); index < buffer.limit(); index++) {
                    // 读到换行符表示,截止到这有一条完整的消息
                    if ((char) buffer.get(index) == '\n') {
                        // 这里计算length要+1,别把\n落下
                        int length = index + 1 - buffer.position();
                        ByteBuffer allocate = ByteBuffer.allocate(length);
                        for (int i = 0; i < length; i++) {
                            allocate.put(buffer.get());
                        }
                        allocate.flip();
                        print(allocate);
                    }
                }
                // 保留未读数据,将buffer修改为写模式
                buffer.compact();
            }
            buffer.flip();
            print(buffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 打印结果
     *
     * @author 张涵哲
     * @since 2025-03-16 17:28:03
     */
    public static void print(ByteBuffer buffer) {
        String string = StandardCharsets.UTF_8.decode(buffer).toString();
        System.out.print(string);
    }

}

Channel:TODO...

Selector:TODO...

# 调试工具类

  1. 笔记中使用的第三方工具类库均来自 hutool
  2. ByteBuffer & ByteBuf 的调试工具类,作者为黑马讲师满一航
package site.hanzhe.util;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.internal.MathUtil;
import io.netty.util.internal.StringUtil;

import java.nio.ByteBuffer;


/**
 * ByteBuffer & ByteBuf 的调试工具类
 *
 * @author Panwen Chen
 * @since 2021/4/12 15:59
 */
public class NettyUtil {

    private static final char[] BYTE2CHAR = new char[256];
    private static final char[] HEXDUMP_TABLE = new char[256 * 4];
    private static final String[] HEXPADDING = new String[16];
    private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
    private static final String[] BYTE2HEX = new String[256];
    private static final String[] BYTEPADDING = new String[16];

    static {
        final char[] DIGITS = "0123456789abcdef".toCharArray();
        for (int i = 0; i < 256; i++) {
            HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
            HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
        }

        int i;

        // Generate the lookup table for hex dump paddings
        for (i = 0; i < HEXPADDING.length; i++) {
            int padding = HEXPADDING.length - i;
            StringBuilder buf = new StringBuilder(padding * 3);
            for (int j = 0; j < padding; j++) {
                buf.append("   ");
            }
            HEXPADDING[i] = buf.toString();
        }

        // Generate the lookup table for the start-offset header in each row (up to 64KiB).
        for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {
            StringBuilder buf = new StringBuilder(12);
            buf.append(StringUtil.NEWLINE);
            buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
            buf.setCharAt(buf.length() - 9, '|');
            buf.append('|');
            HEXDUMP_ROWPREFIXES[i] = buf.toString();
        }

        // Generate the lookup table for byte-to-hex-dump conversion
        for (i = 0; i < BYTE2HEX.length; i++) {
            BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
        }

        // Generate the lookup table for byte dump paddings
        for (i = 0; i < BYTEPADDING.length; i++) {
            int padding = BYTEPADDING.length - i;
            StringBuilder buf = new StringBuilder(padding);
            for (int j = 0; j < padding; j++) {
                buf.append(' ');
            }
            BYTEPADDING[i] = buf.toString();
        }

        // Generate the lookup table for byte-to-char conversion
        for (i = 0; i < BYTE2CHAR.length; i++) {
            if (i <= 0x1f || i >= 0x7f) {
                BYTE2CHAR[i] = '.';
            } else {
                BYTE2CHAR[i] = (char) i;
            }
        }
    }

    /**
     * 打印所有内容
     * @param buffer
     */
    public static void debugAll(ByteBuffer buffer) {
        int oldlimit = buffer.limit();
        buffer.limit(buffer.capacity());
        StringBuilder origin = new StringBuilder(256);
        appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
        System.out.println("+--------+-------------------- all ------------------------+----------------+");
        System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);
        System.out.println(origin);
        buffer.limit(oldlimit);
    }

    /**
     * 打印可读取内容
     * @param buffer
     */
    public static void debugRead(ByteBuffer buffer) {
        StringBuilder builder = new StringBuilder(256);
        appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());
        System.out.println("+--------+-------------------- read -----------------------+----------------+");
        System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());
        System.out.println(builder);
    }

    private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {
        if (MathUtil.isOutOfBounds(offset, length, buf.capacity())) {
            throw new IndexOutOfBoundsException(
                    "expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length
                            + ") <= " + "buf.capacity(" + buf.capacity() + ')');
        }
        if (length == 0) {
            return;
        }
        dump.append(
                "         +-------------------------------------------------+" +
                        StringUtil.NEWLINE + "         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |" +
                        StringUtil.NEWLINE + "+--------+-------------------------------------------------+----------------+");

        final int startIndex = offset;
        final int fullRows = length >>> 4;
        final int remainder = length & 0xF;

        // Dump the rows which have 16 bytes.
        for (int row = 0; row < fullRows; row++) {
            int rowStartIndex = (row << 4) + startIndex;

            // Per-row prefix.
            appendHexDumpRowPrefix(dump, row, rowStartIndex);

            // Hex dump
            int rowEndIndex = rowStartIndex + 16;
            for (int j = rowStartIndex; j < rowEndIndex; j++) {
                dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
            }
            dump.append(" |");

            // ASCII dump
            for (int j = rowStartIndex; j < rowEndIndex; j++) {
                dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
            }
            dump.append('|');
        }

        // Dump the last row which has less than 16 bytes.
        if (remainder != 0) {
            int rowStartIndex = (fullRows << 4) + startIndex;
            appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);

            // Hex dump
            int rowEndIndex = rowStartIndex + remainder;
            for (int j = rowStartIndex; j < rowEndIndex; j++) {
                dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
            }
            dump.append(HEXPADDING[remainder]);
            dump.append(" |");

            // Ascii dump
            for (int j = rowStartIndex; j < rowEndIndex; j++) {
                dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
            }
            dump.append(BYTEPADDING[remainder]);
            dump.append('|');
        }

        dump.append(StringUtil.NEWLINE +
                "+--------+-------------------------------------------------+----------------+");
    }

    private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {
        if (row < HEXDUMP_ROWPREFIXES.length) {
            dump.append(HEXDUMP_ROWPREFIXES[row]);
        } else {
            dump.append(StringUtil.NEWLINE);
            dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
            dump.setCharAt(dump.length() - 9, '|');
            dump.append('|');
        }
    }

    public static short getUnsignedByte(ByteBuffer buffer, int index) {
        return (short) (buffer.get(index) & 0xFF);
    }

    public static void log(ByteBuf buffer) {
        int length = buffer.readableBytes();
        int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
        StringBuilder buf = new StringBuilder(rows * 80 * 2)
                .append("read index:").append(buffer.readerIndex())
                .append(" write index:").append(buffer.writerIndex())
                .append(" capacity:").append(buffer.capacity())
                .append(StringUtil.NEWLINE);
        ByteBufUtil.appendPrettyHexDump(buf, buffer);
        System.out.println(buf);
    }

}
分布式事务

← 分布式事务

Theme by Vdoing | Copyright © 2023-2025 Zhe | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式