netty解决粘包和半包

    科技2025-12-17  12

    一、使用系统分隔符

    客户端

    /** * 作者:yun * 创建日期:2018/08/26 * 类说明: */ public class LineBaseEchoClient { private final String host; public LineBaseEchoClient(String host) { this.host = host; } public void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup();/*线程组*/ try { final Bootstrap b = new Bootstrap();;/*客户端启动必须*/ b.group(group)/*将线程组传入*/ .channel(NioSocketChannel.class)/*指定使用NIO进行网络传输*/ .remoteAddress(new InetSocketAddress(host,LineBaseEchoServer.PORT))/*配置要连接服务器的ip地址和端口*/ .handler(new ChannelInitializerImp()); ChannelFuture f = b.connect().sync(); System.out.println("已连接到服务器....."); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } private static class ChannelInitializerImp extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { // 加分割符:系统回车符 ch.pipeline().addLast(new LineBasedFrameDecoder(1024*100000)); ch.pipeline().addLast(new LineBaseClientHandler()); } } public static void main(String[] args) throws InterruptedException { new LineBaseEchoClient("127.0.0.1").start(); } }

    客户端handler

    /** * 作者:yun * 创建日期:2018/08/26 * 类说明: */ public class LineBaseClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private AtomicInteger counter = new AtomicInteger(0); /*** 客户端读取到网络数据后的处理*/ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8) +"] and the counter is:"+counter.incrementAndGet()); } /*** 客户端被通知channel活跃后,做事*/ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf msg = null; String request = "Mark,Lison,Peter,James,Deer" + System.getProperty("line.separator"); for(int i=0;i<100;i++){ msg = Unpooled.buffer(request.length()); msg.writeBytes(request.getBytes()); ctx.writeAndFlush(msg); } } /*** 发生异常后的处理*/ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

    服务端

    /** * 作者:yun * 创建日期:2018/08/25 * 类说明: */ public class LineBaseEchoServer { public static final int PORT = 9998; public static void main(String[] args) throws InterruptedException { LineBaseEchoServer lineBaseEchoServer = new LineBaseEchoServer(); System.out.println("服务器即将启动"); lineBaseEchoServer.start(); } public void start() throws InterruptedException { final LineBaseServerHandler serverHandler = new LineBaseServerHandler(); EventLoopGroup group = new NioEventLoopGroup();/*线程组*/ try { ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/ b.group(group)/*将线程组传入*/ .channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/ .localAddress(new InetSocketAddress(PORT))/*指定服务器监听端口*/ /*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel, 所以下面这段代码的作用就是为这个子channel增加handle*/ .childHandler(new ChannelInitializerImp()); ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/ System.out.println("服务器启动完成,等待客户端的连接和数据....."); f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/ } finally { group.shutdownGracefully().sync();/*优雅关闭线程组*/ } } private static class ChannelInitializerImp extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); ch.pipeline().addLast(new LineBaseServerHandler()); } } }

    服务端handler

    /** * 作者:yun * 创建日期:2018/08/25 * 类说明:自己的业务处理 */ @ChannelHandler.Sharable public class LineBaseServerHandler extends ChannelInboundHandlerAdapter { private AtomicInteger counter = new AtomicInteger(0); /*** 服务端读取到网络数据后的处理*/ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf)msg; String request = in.toString(CharsetUtil.UTF_8); System.out.println("Server Accept["+request +"] and the counter is:"+counter.incrementAndGet()); String resp = "Hello,"+request+". Welcome to Netty World!" + System.getProperty("line.separator"); ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes())); } /*** 发生异常后的处理*/ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

    二、使用自定义分隔符

    无论是客户端还是服务端只有ChannelInitializerImp和DelimiterClientHandler有区别,其它和第一种方法的代码完全一样

    ChannelInitializerImp有改变的代码:

    //自定义的分割符 public static final String DELIMITER_SYMBOL = "MarkJames"; private static class ChannelInitializerImp extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { //1) 加分割符 自定义的 ByteBuf delimiter = Unpooled.copiedBuffer( DelimiterEchoServer.DELIMITER_SYMBOL.getBytes()); ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024,delimiter)); ch.pipeline().addLast(new DelimiterClientHandler()); } }

    DelimiterClientHandler有改变的代码:

    /*** 客户端被通知channel活跃后,做事*/ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf msg = null; String request = "Mark,Lison,Peter,James,Deer" + DelimiterEchoServer.DELIMITER_SYMBOL; for(int i=0;i<10;i++){ msg = Unpooled.buffer(request.length()); msg.writeBytes(request.getBytes()); ctx.writeAndFlush(msg); } }

    三、使用定长消息

    无论是客户端还是服务端只有ChannelInitializerImp和DelimiterClientHandler有区别,其它和第一种方法的代码完全一样

    ChannelInitializerImp有改变的代码:

    private static class ChannelInitializerImp extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { //消息定长 ch.pipeline().addLast( new FixedLengthFrameDecoder( FixedLengthEchoServer.RESPONSE.length())); ch.pipeline().addLast(new FixedLengthClientHandler()); } }

    DelimiterClientHandler有改变的代码:

    /*** 客户端被通知channel活跃后,做事*/ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf msg = null; for(int i=0;i<10;i++){ msg = Unpooled.buffer(FixedLengthEchoClient.REQUEST.length()); msg.writeBytes(FixedLengthEchoClient.REQUEST.getBytes()); ctx.writeAndFlush(msg); } }

    四、使用发送数据和数据长度

    Processed: 0.014, SQL: 9