努力写更多优质文章,欢迎关注CC_且听风吟~
一句话总结:Netty是一个异步的、基于事件驱动的网络应用框架。
Netty采用了改进的主从Reactor模式
Netty抽象出两组线程池BossGroup专门负责接收客户端的连接,WorkerGroup专门负责网络的读写
BossGroup和WorkerGroup类型都是NioEventLoopGroup
NioEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环,每个事件循环是NioEventLoop
NioEventLoop表示一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个Selector,用于监听绑定在其上的Socket的网络通讯
NioEventLoopGroup可以有多个线程,即可以含有多个NioEventLoop
每个Boss NioEventLoop执行的步骤:
轮询accept事件处理accept事件,与client连接,生成NioSocketChannel,并将其注册到某个Worker上的Selector处理任务队列的任务runAllTasks每个Worker NioEventLoop循环执行的步骤
轮询read和write事件在对应的NioSocketChannel上处理I/O事件处理任务队列的任务,即runAllTasks每个Worker NioEventLoop在处理业务时,会使用pipeline管道;pipeline中包含了channel,即通过pipeline可以获取到对应通道,管道中包含了channel,即通过pipeline可以获取到对应通道
https://www.jianshu.com/p/38b56531565d
我们想要做一个服务端和客户端,配置相应的内容
另外还需要写自定义的事件处理器Handler
NettyServer服务端类
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class NettyServer { public static void main(String[] args) { // 创建bossGroup和workerGroup // 这两个都是无限循环 NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 创建服务器端的启动对象 ServerBootstrap(配置参数) ServerBootstrap bootstrap = new ServerBootstrap(); // 设置两个线程组 bootstrap.group(bossGroup, workerGroup) // 选择服务器通道实现 .channel(NioServerSocketChannel.class) // 设置线程队列得到连接个数 .option(ChannelOption.SO_BACKLOG, 128) // 设置保持活动连接状态 .childOption(ChannelOption.SO_KEEPALIVE, true) // 创建一个通道测试对象 .childHandler(new ChannelInitializer<SocketChannel>() { // 给pipeline设置处理器 protected void initChannel(SocketChannel socketChannel) throws Exception { // 添加自己的Handler socketChannel.pipeline().addLast(new NettyServerHandler()); } }); System.out.println("Server has ready"); // 绑定一个端口并且同步 ChannelFuture future = bootstrap.bind(8888).sync(); // 对关闭通道进行监听 future.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { // 异常处理结束关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }NettyServerHandler服务端处理类
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; /** * 自定义的Handler消息处理器 * 需要继承某些Netty规定好的handler类 */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { /** * 读取数据 * @param ctx 上下文对象,包含pipeline和channel * @param msg 客户端发送的数据,默认是Object */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("server ctx: " + ctx); // 将msg转换为ByteBuf (io.netty.buffer.ByteBuf提供的) ByteBuf buf = (ByteBuf) msg; System.out.println("msg: " + buf.toString(CharsetUtil.UTF_8)); System.out.println("address: " + ctx.channel().remoteAddress()); } /** * 数据读取完毕 */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // 将数据写入到缓存并且刷新 ctx.writeAndFlush(Unpooled.copiedBuffer("这是客户端返回消息...", CharsetUtil.UTF_8)); } /** * 处理异常,一般需要关闭通道 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }NettyClient客户端类
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class NettyClient { public static void main(String[] args) { // 客户端只需要一个事件循环组 NioEventLoopGroup group = new NioEventLoopGroup(); try { // 创建客户端的启动对象 Bootstrap bootstrap = new Bootstrap(); // 设置相关参数 bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { // 加入客户端事件处理器 socketChannel.pipeline().addLast(new NettyClientHandler()); } }); System.out.println("Client has ready"); // 启动客户端去连接服务器端 ChannelFuture future = bootstrap.connect("localhost", 8888).sync(); future.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { group.shutdownGracefully(); } } }NettyClientHandler客户端处理类
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; public class NettyClientHandler extends ChannelInboundHandlerAdapter { /** * 当通道就会触发该方法 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client ctx: " + ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("这是客户端消息", CharsetUtil.UTF_8)); } /** * 当通道有读取事件时触发 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("服务器回复消息:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("服务器地址:" + ctx.channel().remoteAddress()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }输出结果如下:
Server has ready server ctx: ChannelHandlerContext(NettyServerHandler#0, [id: 0xf5af8a05, L:/127.0.0.1:8888 - R:/127.0.0.1:52216]) msg: 这是客户端消息 address: /127.0.0.1:52216 Client has ready client ctx: ChannelHandlerContext(NettyClientHandler#0, [id: 0x8b995363, L:/127.0.0.1:52216 - R:localhost/127.0.0.1:8888]) 服务器回复消息:这是客户端返回消息... 服务器地址:localhost/127.0.0.1:8888在Netty的线程模型里面,每个NioEventLoop都有一个TaskQueue任务队列
任务队列(类似于消息队列)可以有以下内容:
用户程序自定义的普通任务用户自定义的定时任务非当前Reactor线程调用Channel的各种方法比如推送系统的业务线程里面,根据用户标识找到对应的Channel引用然后调用Write类方法向该用户推送消息,就会使用到任务队列
定义任务提交到taskQueue中
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import java.util.concurrent.TimeUnit; public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("server READ事件"); // 创建一个自定义线程任务 ctx.channel().eventLoop().execute(()->{ try { TimeUnit.SECONDS.sleep(10); System.out.println("自定义任务结束..."); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println("server 事件结束"); } }上面这个例子,server事件会立马结束,需要的任务会进入任务队列等待执行
自定义定时任务提交到scheduleTaskQueue中
ctx.channel().eventLoop().schedule( ()->System.out.println("5s后执行任务..."), 5, TimeUnit.SECONDS);服务端每获取到一个连接,就使用一个集合来保存这个连接的SocketChannel,当需要执行任务或者消息推送的时候,获取每个SocketChannel的channel然后加入这个需要执行的任务
Future-Listener 机制
对象刚刚创建时,处于未完成状态,此时可以通过返回的ChannelFuture来获取操作执行的状态,并且注册监听函数来执行某一步结束后的操作
// 绑定一个端口并且同步 ChannelFuture future = bootstrap.bind(8888).sync(); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception{ if (future.isSuccess()){ System.out.println("监听端口成功..."); }else{ System.out.println("失败..."); } } });异步处理线程不会阻塞
一个Netty程序通常由一个Bootstrap开始,主要作用是配置整个Netty程序,串联各个组件
Bootstrap类是客户端的启动引导类,ServerBootstrap是服务端的启动引导类
// 创建服务器端的启动对象 ServerBootstrap(配置参数) ServerBootstrap bootstrap = new ServerBootstrap(); // 设置两个线程组 bootstrap.group(bossGroup, workerGroup) // 选择服务器通道实现 .channel(NioServerSocketChannel.class) // 设置线程队列得到连接个数 .option(ChannelOption.SO_BACKLOG, 128) // 设置保持活动连接状态 .childOption(ChannelOption.SO_KEEPALIVE, true) // 创建一个通道测试对象 .childHandler(new ChannelInitializer<SocketChannel>() { // 给pipeline设置处理器 protected void initChannel(SocketChannel socketChannel) throws Exception { // 添加自己的Handler socketChannel.pipeline().addLast(new NettyServerHandler()); } });handler和childHandler分别给bossGroup、workerGroup添加监听器
常用方法:
方法说明group(bossGroup, workerGroup)服务端绑定2个EventLoopgroup(workerGroup)客户端绑定1个EventLoopchannel()设置通道实现option()ServerChannel设置OptionchildOption()接收到的Channel设置Optionhandler()bossGroup设置handlerchildHandler()workerGroup设置handlerbind(port)服务端设置端口connect(ip, port)客户端连接服务器Netty中所有的IO操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等他执行完成或者直接注册一个监听,具体的实现就是通过Future和ChannelFuture来实现
注册一个监听任务,当操作执行成功或者失败时候会触发注册的监听事件。
常用方法:
方法说明channel()返回正在进行的I/O通道sync()将异步操作转换为同步操作Channel是Netty网络通信的组件,可以用于执行网络IO操作
使用Channel可以获得当前网络连接的通道的状态
可以获得网络连接的配置参数
提供异步的网络I/O操作,调用后立即返回一个ChannelFuture实例
不同协议不同的阻塞类型的连接都有不同的Channel与之对应
NioSocketChannel,异步的客户端TCP Socket连接NioServerSocketChannel,异步的服务器端TCP Socket连接NioDatagramChanne,异步的UDP连接NioSctpChannel,异步的客户端Sctp连接NioSctpServerChannel,异步的Sctp服务端连接使用Selector对象实现I/O多路复用,通过一个Selector线程可以监听多个连接的Channel事件
处理I/O事件或拦截I/O操作的handler,并将其转发到其ChannelPipline中的下一个处理程序
可以继承ChannelHandler的实现类来实现自定义的处理功能:
ChannelPipline是一个Handler集合,负责处理和拦截Inbound或者outbound的事件和操作
ChannelPipline实现了一种高级形式的拦截过滤器模式,使得用户可以完全控制事件的处理方式,以及Channel中各个的ChannelHandler如何相互交互
保存Channel相关的上下文信息,真实类型是DefaultChannelHandlerContext
可以通过ctx获取pipline、channel、handler等信息
常用方法
Netty在创建Channel实例后,一般都需要设置ChannelOption参数
ChannelOption.SO_BACKLOG:对应TCP/IP协议中listen函数中的backlog参数,用来初始化服务器可连接队列大小ChannelOption.SO_KEEPALIVE:一直保持连接活动状态EventLoopGroup是一组EventLoop的抽象对象,Netty为了更好的利用多核CPU资源,一般会有多个EventLoop同时工作,每个EventLoop维护着一个Selector实例
Netty提供一个专门用来操作缓冲区ByteBuf的工具类
public class NettyByteBuf { public static void main(String[] args) { // 创建一个大小为10字节的buffer ByteBuf byteBuf = Unpooled.buffer(10); for (int i = 0; i < 10; i++) { byteBuf.writeByte(i); } // 不需要 翻转 // 输出 for (int i = 0; i < byteBuf.capacity(); i++) { System.out.println(byteBuf.readByte()); } } }这个buf不需要翻转,因为其底层维护了readerIndex和writerIndex两个值
当使用writeByte方法,writerIndex加1,同理当使用readByte方法,readerIndex加1
从字符串创建ByteBuf
public class NettyByteBuf { public static void main(String[] args) { ByteBuf byteBuf = Unpooled.copiedBuffer("你好!", CharsetUtil.UTF_8); if (byteBuf.hasArray()){ byte[] array = byteBuf.array(); System.out.println(new String(array, CharsetUtil.UTF_8)); System.out.println("byteBuf: " + byteBuf); System.out.println(byteBuf.arrayOffset()); System.out.println(byteBuf.readerIndex()); System.out.println(byteBuf.writerIndex()); System.out.println(byteBuf.capacity()); System.out.println(byteBuf.readByte()); System.out.println(byteBuf.readerIndex()); } } }心跳机制是服务端用来检测客户端的活动状态的手段,每隔一段时间,服务端会向客户端发送心跳包来确定客户端是否存活,存活的状态有三种:读空闲,写空闲,读写空闲
下面是Netty实现心跳检测机制的实例:
new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS); // 添加心跳检测处理器 /** * 心跳检测服务端 */ public class HeartBeatServer { private int PORT; public HeartBeatServer(int PORT) { this.PORT = PORT; } public void run(){ NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 创建服务器端的启动对象 ServerBootstrap(配置参数) ServerBootstrap bootstrap = new ServerBootstrap(); // 设置两个线程组 bootstrap.group(bossGroup, workerGroup) // 选择服务器通道实现 .channel(NioServerSocketChannel.class) // 设置线程队列得到连接个数 .option(ChannelOption.SO_BACKLOG, 128) // 设置保持活动连接状态 .childOption(ChannelOption.SO_KEEPALIVE, true) // 创建一个通道测试对象 .childHandler(new ChannelInitializer<SocketChannel>() { // 给pipeline设置处理器 protected void initChannel(SocketChannel socketChannel) throws Exception { // 添加自己的Handler socketChannel.pipeline() .addLast("decoder", new StringDecoder()) .addLast("encoder", new StringEncoder()) .addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS)) // 添加心跳检测处理器 .addLast(new HeartBeatHandler()); } }); System.out.println("服务端就绪"); // 绑定一个端口并且同步 ChannelFuture future = bootstrap.bind(PORT); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (future.isSuccess()){ System.out.println("监听端口" +PORT+ "成功..."); }else{ System.out.println("监听端口失败..."); } } }); // 对关闭通道进行监听 future.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { // 异常处理结束关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) { HeartBeatServer heartBeatServer = new HeartBeatServer(8888); heartBeatServer.run(); } }其中readerIdleTime、writerIdleTime、allIdleTime分别为3s、5s、7s
下面是心跳处理handler的实现:
/** * 心跳处理器 */ public class HeartBeatHandler extends ChannelInboundHandlerAdapter { /** * 处理心跳事件 */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent) evt; String msg = ""; switch (event.state()){ case READER_IDLE: msg = "读空闲"; break; case WRITER_IDLE: msg = "写空闲"; break; case ALL_IDLE: msg = "读写空闲"; break; } System.out.println(ctx.channel().remoteAddress() + " 发生了:" + msg); } } }效果如下:
需要注意的是,IdleStateHandler获取到的事件信息会传递到下一个handler的userEventTriggered方法进行处理,因此自定义的心跳时间处理器需要添加在IdleStateHandler之后
使用websocket实现长连接,这个技术经常用来做在线聊天。。
首先得有一个网页,上面是输入框,并且可以发送websocket请求
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>在线聊天</title> </head> <body> <label for="chat-msg">消息框:</label><textarea rows="10" readonly id="chat-msg" style="width: 200px;"></textarea> <button type="button" onclick="document.getElementById('chat-msg').value = ''">清空消息</button> <br> <label for="chat-input">输入框:</label><textarea rows="10" id="chat-input" style="width: 200px;"></textarea> <button type="button" onclick="send(document.getElementById('chat-input').value)">发送消息</button> </body> </html>这个网页使用js发送websocket请求到服务端即可,以下是js代码:
var socket; if (!window.WebSocket){ alert("浏览器不支持WebSocket!") }else{ socket = new WebSocket("ws://localhost:8888/chat"); // 这里绑定/chat // 接收到消息 socket.onmessage = function (message) { var element = document.getElementById("chat-msg"); element.value = element.value + "\n" + message.data; } socket.onopen = function () { var element = document.getElementById("chat-msg"); element.value = "连接已建立..." } socket.onclose = function () { var element = document.getElementById("chat-msg"); element.value = "连接关闭..." } socket.onerror = function () { var element = document.getElementById("chat-msg"); element.value = "连接异常..." } } function send(message) { if (!window.socket){ alert("websocket服务尚未启动!"); return; } if (socket.readyState === socket.OPEN){ // 连接已经建立,发送数据 socket.send(message); }else{ alert("websocket尚未连接!"); } }然后编写服务端:
/** * 使用websocket的长连接server */ public class WebSocketServer { private int PORT; public WebSocketServer(int PORT) { this.PORT = PORT; } public void run(){ NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 创建服务器端的启动对象 ServerBootstrap(配置参数) ServerBootstrap bootstrap = new ServerBootstrap(); // 设置两个线程组 bootstrap.group(bossGroup, workerGroup) // 选择服务器通道实现 .channel(NioServerSocketChannel.class) // 设置线程队列得到连接个数 .option(ChannelOption.SO_BACKLOG, 128) // 设置保持活动连接状态 .childOption(ChannelOption.SO_KEEPALIVE, true) // 创建一个通道测试对象 .childHandler(new ChannelInitializer<SocketChannel>() { // 给pipeline设置处理器 protected void initChannel(SocketChannel socketChannel) throws Exception { // 添加自己的Handler socketChannel.pipeline() // Http解码器 .addLast(new HttpServerCodec()) // 块方式写处理器 .addLast(new ChunkedWriteHandler()) // 分段数据合并器 .addLast(new HttpObjectAggregator(8192)) // 定义websocket请求的uri,将http协议转换为ws协议,保持长连接 .addLast(new WebSocketServerProtocolHandler("/chat")) // 自定义处理器 .addLast(new WebSocketServerHandler()); } }); System.out.println("服务端就绪"); // 绑定一个端口并且同步 ChannelFuture future = bootstrap.bind(PORT); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (future.isSuccess()){ System.out.println("监听端口" +PORT+ "成功..."); }else{ System.out.println("监听端口失败..."); } } }); // 对关闭通道进行监听 future.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { // 异常处理结束关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) { WebSocketServer webSocketServer = new WebSocketServer(8888); webSocketServer.run(); } }在服务端代码里面需要加入自定义的handler,下面是自定义websocket handler代码:
/** * 自定义websocket请求处理器 * 泛型类TextWebSocketFrame表示这个websocket传递的帧frame类型是text */ public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception { System.out.println("服务端收到消息:" + textWebSocketFrame.text()); // 转发消息 // 转发回去的类型是frame类型,因为websocket是以帧为单位进行传输的 channelHandlerContext.channel().writeAndFlush(new TextWebSocketFrame(LocalDateTime.now() + textWebSocketFrame.text())); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("建立了新的连接:" + ctx.channel().id().asLongText()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("断开了连接:" + ctx.channel().id().asLongText()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("发生了异常"); ctx.close(); } }需要注意的是,返回的消息都是以Frame的形式包装返回,因为WebSocket是基于帧为单位
接下来,启动服务端,打开网页:
以上就是Netty实现长连接的实例,如果想要发送给多人,就跟上面的群聊实例一样维护一个channelGroup然后再处理消息的分发即可