基于Netty实现TCP连接的服务端、客户端

    科技2022-07-11  85

    简介

    Netty是一个异步事件驱动的网络应用框架,可快速开发可维护的高性能协议服务器和客户端。基于NIO实现的高性能网络IO框架,极大简化基于常用网络协议的编程(TCP、UDP等)。

    架构模型

    完整版:

    根据自己的理解,简单版的工作模型架构图:

    如上图所示,Netty是基于主从Reactor的架构模型;主EventLoopGroup负责处理客户端的连接请求事件,客户端连接成功后交由从EventLoopGroup,具体的数据IO由从EventLoopGroup的EventLoop处理。

    EventLoop顾名思义为事件循环,它的工作就是循环监听事件;

    每个EventLoop中有自己的Channel(通道),Channel中包含Pipeline(管道),Pipeline是数据传输的管道,二者为一一对应关系;在Pipeline中可添加多个handler,对Pipeline中的出站、入站数据处理,所有handler由Pipeline中的一个双向链表管理,入站数据顺着链表顺序依次被所有的ChannelInboundHandler处理,出站数据逆着链表顺序被所有的ChannelOutboundHandler处理。

    入站:对服务端或者客户端来说,数据传入则为入站

    出站:对服务端或者客户端来说,数据写出则为出站

    比如,客户端向服务端发送数据时,对客户端来说是出站,对服务端来说是入站。

    主要模块

    BootStrap、ServerBootStrap

    Netty的启动器类,将多个EventLoopGroup组合起来,并引导其启动运行。

    NioEventLoopGroup

    类似一个线程池,管理内部的EventLoop的调度。

    NioEventLoop

    顾名思义,是一个事件循环,其工作就是循环的获取连接设备的事件。其内部维护一个Selector用来监听Channel,和一个TaskQueue用于处理IO事件。

    Channel

    数据传输的通道,更多的是对数据传输通道的状态、行为控制;

    ChannelFuture

    异步任务的返回值,相当于是一个票据,凭票据可以在未来获取异步任务的执行结果。和JUC中的Future一样。

    ChannelPipeline

    管理出站入站的各类ChannelHandler,将管理的ChannelHandler应用到对应通道的出站入站数据,对数据进行处理。

    ChannelHandler

    处理具体的IO事件,一般用来对IO数据流编解码、转换、处理等操作。


    下面开始实操,实现一个TCP服务器

    服务端实现

    启动引导类实现
    public class ChatServer { private Integer port; private ServerBootstrap serverBootstrap; public ChatServer(Integer port) { if (null == port) { throw new NettyChatException("服务端口为空"); } this.port = port; this.serverBootstrap = setup(); } private ServerBootstrap setup() { //负责接收客户端连接请求的group EventLoopGroup bossGroup = new NioEventLoopGroup(1); //负责和客户端数据交互的group EventLoopGroup workerGroup = new NioEventLoopGroup(); //ServerBootstrap是一个启动器类,用来启动EventLoopGroup return new ServerBootstrap().group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); //添加解码器,将数据从byte解析为对象 pipeline.addLast(new ByteToChatMessageDecoder()); //自定义handler,处理业务相关 pipeline.addLast(new ServerChatMessageReadHandler()); } }); } public void run() throws InterruptedException { serverBootstrap.bind(port).sync() .channel().closeFuture().sync(); } }
    自定义解码器,解决粘包、拆包

    什么是粘包、拆包?

    通过TCP协议进行数据发送时候,由于数据的大小不一定,数据太小时有可能会将多个数据包一起发送,数据太大有可能会把数据包拆分后发送。但是对于数据接收端来说,并不知道数据究竟是如何划分,有可能接收的是一个被拆分后的数据包,也有可能是一个由多个数据包拼接一起的数据包,这就是拆包和粘包。

    Netty实现的粘包、拆包解决方案

    FixedLengthFrameDecoder 定长读取LineBasedFrameDecoder 按行读取DelimiterBasedFrameDecoder 按自定义分割符读取LengthFieldBasedFrameDecoder 将数据包的长度作为头部传输,接收方得到长度后在按长度读取

    下面的自定义解码器,解决粘包、拆包的方案类似于LengthFieldBasedFrameDecoder,通过自定义协议将数据长度和数据一起发送。

    //主要功能: //1.将接收到的ByteBuf转换为数据字符串 //2.解决粘包、拆包的问题 //3.将处理后的对象交到handler链中下一个handler处理 public class ByteToChatMessageDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in == null) return; //因为使用自定义协议解决粘包、拆包问题,自定义协议内部先传输数据长度,所以需在这里先接收长度 if (in.readableBytes() <= 4) return;; in.markReaderIndex(); int len = in.readInt(); if (in.readableBytes() < len) { in.resetReaderIndex(); return; } byte[] bytes = new byte[len]; in.readBytes(bytes); String json = new String(bytes, CharsetUtil.UTF_8); in.markReaderIndex(); //将处理后的数据,交由handler链中的下一个处理 out.add(json); } } public class ServerChatMessageReadHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //得到在上一个Handler中处理完成的数据,用于实现自己的业务逻辑 } }

    客户端

    启动引导类实现
    public class ChatClient { private String host; private Integer port; private Bootstrap bootstrap; public ChatClient(String host, Integer port) { this.host = host; this.port = port; this.bootstrap = setup(); } private Bootstrap setup() { EventLoopGroup group = new NioEventLoopGroup(); return new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ChatMessageToByteEncoder()); pipeline.addLast(new ClientChatMessageSendHandler()); } }); } public void run() throws InterruptedException { bootstrap.connect(host, port).sync() .channel().closeFuture().sync(); } }
    自定义编码器
    //业务处理类 //通过该类发送数据后,数据会交由handler链中的上一个handler处理(出站数据逆序执行) public class ClientChatMessageSendHandler extends ChannelInboundHandlerAdapter { private Channel channel; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); channel = ctx.channel(); ChatTextMessage message = new ChatTextMessage("今天天气下雨了,有点微冷☔️!!!"); message.setFromIp("67.5.5.4"); message.setFromUid("7834875435"); for (int i = 0; i < 100; i++) { sendMessage(message); } } public void sendMessage(TransferPackage transferPackage) { if (channel != null && channel.isWritable()) { TransferPackageWrap transfer = new TransferPackageWrap<>(transferPackage); channel.writeAndFlush(transfer); } } }

    为了解决服务端(接收端)的粘包、拆包问题,通过自定义协议发送数据长度和数据包到接收端。

    public class ChatMessageToByteEncoder extends MessageToByteEncoder<TransferPackageWrap> { @Override protected void encode(ChannelHandlerContext ctx, TransferPackageWrap msg, ByteBuf out) throws Exception { //这里发送数据顺序必须和数据接收端(服务端)的接收顺序保持一致(TCP保证数据顺序) //发送数据包长度 out.writeInt(msg.getLength()); //发送数据包 out.writeBytes(msg.getBytes()); } }

    代码附件

    Processed: 0.029, SQL: 8