Netty是一个异步事件驱动的网络应用框架,可快速开发可维护的高性能协议服务器和客户端。基于NIO实现的高性能网络IO框架,极大简化基于常用网络协议的编程(TCP、UDP等)。
完整版:
根据自己的理解,简单版的工作模型架构图:
如上图所示,Netty是基于主从Reactor的架构模型;主EventLoopGroup负责处理客户端的连接请求事件,客户端连接成功后交由从EventLoopGroup,具体的数据IO由从EventLoopGroup的EventLoop处理。
EventLoop顾名思义为事件循环,它的工作就是循环监听事件;
每个EventLoop中有自己的Channel(通道),Channel中包含Pipeline(管道),Pipeline是数据传输的管道,二者为一一对应关系;在Pipeline中可添加多个handler,对Pipeline中的出站、入站数据处理,所有handler由Pipeline中的一个双向链表管理,入站数据顺着链表顺序依次被所有的ChannelInboundHandler处理,出站数据逆着链表顺序被所有的ChannelOutboundHandler处理。
入站:对服务端或者客户端来说,数据传入则为入站
出站:对服务端或者客户端来说,数据写出则为出站
比如,客户端向服务端发送数据时,对客户端来说是出站,对服务端来说是入站。
BootStrap、ServerBootStrap
Netty的启动器类,将多个EventLoopGroup组合起来,并引导其启动运行。
NioEventLoopGroup
类似一个线程池,管理内部的EventLoop的调度。
NioEventLoop
顾名思义,是一个事件循环,其工作就是循环的获取连接设备的事件。其内部维护一个Selector用来监听Channel,和一个TaskQueue用于处理IO事件。
Channel
数据传输的通道,更多的是对数据传输通道的状态、行为控制;
ChannelFuture
异步任务的返回值,相当于是一个票据,凭票据可以在未来获取异步任务的执行结果。和JUC中的Future一样。
ChannelPipeline
管理出站入站的各类ChannelHandler,将管理的ChannelHandler应用到对应通道的出站入站数据,对数据进行处理。
ChannelHandler
处理具体的IO事件,一般用来对IO数据流编解码、转换、处理等操作。
下面开始实操,实现一个TCP服务器
什么是粘包、拆包?
通过TCP协议进行数据发送时候,由于数据的大小不一定,数据太小时有可能会将多个数据包一起发送,数据太大有可能会把数据包拆分后发送。但是对于数据接收端来说,并不知道数据究竟是如何划分,有可能接收的是一个被拆分后的数据包,也有可能是一个由多个数据包拼接一起的数据包,这就是拆包和粘包。
Netty实现的粘包、拆包解决方案
FixedLengthFrameDecoder 定长读取LineBasedFrameDecoder 按行读取DelimiterBasedFrameDecoder 按自定义分割符读取LengthFieldBasedFrameDecoder 将数据包的长度作为头部传输,接收方得到长度后在按长度读取下面的自定义解码器,解决粘包、拆包的方案类似于LengthFieldBasedFrameDecoder,通过自定义协议将数据长度和数据一起发送。
//主要功能: //1.将接收到的ByteBuf转换为数据字符串 //2.解决粘包、拆包的问题 //3.将处理后的对象交到handler链中下一个handler处理 public class ByteToChatMessageDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in == null) return; //因为使用自定义协议解决粘包、拆包问题,自定义协议内部先传输数据长度,所以需在这里先接收长度 if (in.readableBytes() <= 4) return;; in.markReaderIndex(); int len = in.readInt(); if (in.readableBytes() < len) { in.resetReaderIndex(); return; } byte[] bytes = new byte[len]; in.readBytes(bytes); String json = new String(bytes, CharsetUtil.UTF_8); in.markReaderIndex(); //将处理后的数据,交由handler链中的下一个处理 out.add(json); } } public class ServerChatMessageReadHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //得到在上一个Handler中处理完成的数据,用于实现自己的业务逻辑 } }为了解决服务端(接收端)的粘包、拆包问题,通过自定义协议发送数据长度和数据包到接收端。
public class ChatMessageToByteEncoder extends MessageToByteEncoder<TransferPackageWrap> { @Override protected void encode(ChannelHandlerContext ctx, TransferPackageWrap msg, ByteBuf out) throws Exception { //这里发送数据顺序必须和数据接收端(服务端)的接收顺序保持一致(TCP保证数据顺序) //发送数据包长度 out.writeInt(msg.getLength()); //发送数据包 out.writeBytes(msg.getBytes()); } }代码附件