一文打穿Netty使用详解

    科技2024-10-16  32

    努力写更多优质文章,欢迎关注CC_且听风吟~

    文章目录

    NettyNetty是什么Netty应用场景 Netty线程模型Netty简易实例TaskQueue任务队列用户程序自定义的普通任务用户自定义的定时任务非当前线程调用Channel的各种方法 ChannelFuture异步模型Netty核心组件Bootstrap与ServerBootstrapFuture与ChannelFutureChannelSelectorChannelHandlerPipline和ChannelPiplineChannelHandlerContextChannelOptionEventLoopGroupUnpooled Netty群聊实例Netty心跳机制Netty长连接

    Netty

    Netty是什么

    一句话总结:Netty是一个异步的、基于事件驱动的网络应用框架。

    Netty应用场景

    Netty线程模型

    Netty采用了改进的主从Reactor模式

    Netty抽象出两组线程池BossGroup专门负责接收客户端的连接,WorkerGroup专门负责网络的读写

    BossGroup和WorkerGroup类型都是NioEventLoopGroup

    NioEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环,每个事件循环是NioEventLoop

    NioEventLoop表示一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个Selector,用于监听绑定在其上的Socket的网络通讯

    NioEventLoopGroup可以有多个线程,即可以含有多个NioEventLoop

    每个Boss NioEventLoop执行的步骤:

    轮询accept事件处理accept事件,与client连接,生成NioSocketChannel,并将其注册到某个Worker上的Selector处理任务队列的任务runAllTasks

    每个Worker NioEventLoop循环执行的步骤

    轮询read和write事件在对应的NioSocketChannel上处理I/O事件处理任务队列的任务,即runAllTasks

    每个Worker NioEventLoop在处理业务时,会使用pipeline管道;pipeline中包含了channel,即通过pipeline可以获取到对应通道,管道中包含了channel,即通过pipeline可以获取到对应通道

    https://www.jianshu.com/p/38b56531565d

    Netty简易实例

    我们想要做一个服务端和客户端,配置相应的内容

    另外还需要写自定义的事件处理器Handler

    NettyServer服务端类

    import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class NettyServer { public static void main(String[] args) { // 创建bossGroup和workerGroup // 这两个都是无限循环 NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 创建服务器端的启动对象 ServerBootstrap(配置参数) ServerBootstrap bootstrap = new ServerBootstrap(); // 设置两个线程组 bootstrap.group(bossGroup, workerGroup) // 选择服务器通道实现 .channel(NioServerSocketChannel.class) // 设置线程队列得到连接个数 .option(ChannelOption.SO_BACKLOG, 128) // 设置保持活动连接状态 .childOption(ChannelOption.SO_KEEPALIVE, true) // 创建一个通道测试对象 .childHandler(new ChannelInitializer<SocketChannel>() { // 给pipeline设置处理器 protected void initChannel(SocketChannel socketChannel) throws Exception { // 添加自己的Handler socketChannel.pipeline().addLast(new NettyServerHandler()); } }); System.out.println("Server has ready"); // 绑定一个端口并且同步 ChannelFuture future = bootstrap.bind(8888).sync(); // 对关闭通道进行监听 future.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { // 异常处理结束关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }

    NettyServerHandler服务端处理类

    import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; /** * 自定义的Handler消息处理器 * 需要继承某些Netty规定好的handler类 */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { /** * 读取数据 * @param ctx 上下文对象,包含pipeline和channel * @param msg 客户端发送的数据,默认是Object */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("server ctx: " + ctx); // 将msg转换为ByteBuf (io.netty.buffer.ByteBuf提供的) ByteBuf buf = (ByteBuf) msg; System.out.println("msg: " + buf.toString(CharsetUtil.UTF_8)); System.out.println("address: " + ctx.channel().remoteAddress()); } /** * 数据读取完毕 */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // 将数据写入到缓存并且刷新 ctx.writeAndFlush(Unpooled.copiedBuffer("这是客户端返回消息...", CharsetUtil.UTF_8)); } /** * 处理异常,一般需要关闭通道 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }

    NettyClient客户端类

    import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class NettyClient { public static void main(String[] args) { // 客户端只需要一个事件循环组 NioEventLoopGroup group = new NioEventLoopGroup(); try { // 创建客户端的启动对象 Bootstrap bootstrap = new Bootstrap(); // 设置相关参数 bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { // 加入客户端事件处理器 socketChannel.pipeline().addLast(new NettyClientHandler()); } }); System.out.println("Client has ready"); // 启动客户端去连接服务器端 ChannelFuture future = bootstrap.connect("localhost", 8888).sync(); future.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { group.shutdownGracefully(); } } }

    NettyClientHandler客户端处理类

    import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; public class NettyClientHandler extends ChannelInboundHandlerAdapter { /** * 当通道就会触发该方法 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client ctx: " + ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("这是客户端消息", CharsetUtil.UTF_8)); } /** * 当通道有读取事件时触发 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("服务器回复消息:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("服务器地址:" + ctx.channel().remoteAddress()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

    输出结果如下:

    Server has ready server ctx: ChannelHandlerContext(NettyServerHandler#0, [id: 0xf5af8a05, L:/127.0.0.1:8888 - R:/127.0.0.1:52216]) msg: 这是客户端消息 address: /127.0.0.1:52216 Client has ready client ctx: ChannelHandlerContext(NettyClientHandler#0, [id: 0x8b995363, L:/127.0.0.1:52216 - R:localhost/127.0.0.1:8888]) 服务器回复消息:这是客户端返回消息... 服务器地址:localhost/127.0.0.1:8888

    TaskQueue任务队列

    在Netty的线程模型里面,每个NioEventLoop都有一个TaskQueue任务队列

    任务队列(类似于消息队列)可以有以下内容:

    用户程序自定义的普通任务用户自定义的定时任务非当前Reactor线程调用Channel的各种方法

    比如推送系统的业务线程里面,根据用户标识找到对应的Channel引用然后调用Write类方法向该用户推送消息,就会使用到任务队列

    用户程序自定义的普通任务

    定义任务提交到taskQueue中

    import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import java.util.concurrent.TimeUnit; public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("server READ事件"); // 创建一个自定义线程任务 ctx.channel().eventLoop().execute(()->{ try { TimeUnit.SECONDS.sleep(10); System.out.println("自定义任务结束..."); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println("server 事件结束"); } }

    上面这个例子,server事件会立马结束,需要的任务会进入任务队列等待执行

    用户自定义的定时任务

    自定义定时任务提交到scheduleTaskQueue中

    ctx.channel().eventLoop().schedule( ()->System.out.println("5s后执行任务..."), 5, TimeUnit.SECONDS);

    非当前线程调用Channel的各种方法

    服务端每获取到一个连接,就使用一个集合来保存这个连接的SocketChannel,当需要执行任务或者消息推送的时候,获取每个SocketChannel的channel然后加入这个需要执行的任务

    ChannelFuture异步模型

    Future-Listener 机制

    对象刚刚创建时,处于未完成状态,此时可以通过返回的ChannelFuture来获取操作执行的状态,并且注册监听函数来执行某一步结束后的操作

    // 绑定一个端口并且同步 ChannelFuture future = bootstrap.bind(8888).sync(); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception{ if (future.isSuccess()){ System.out.println("监听端口成功..."); }else{ System.out.println("失败..."); } } });

    异步处理线程不会阻塞

    Netty核心组件

    Bootstrap与ServerBootstrap

    一个Netty程序通常由一个Bootstrap开始,主要作用是配置整个Netty程序,串联各个组件

    Bootstrap类是客户端的启动引导类,ServerBootstrap是服务端的启动引导类

    // 创建服务器端的启动对象 ServerBootstrap(配置参数) ServerBootstrap bootstrap = new ServerBootstrap(); // 设置两个线程组 bootstrap.group(bossGroup, workerGroup) // 选择服务器通道实现 .channel(NioServerSocketChannel.class) // 设置线程队列得到连接个数 .option(ChannelOption.SO_BACKLOG, 128) // 设置保持活动连接状态 .childOption(ChannelOption.SO_KEEPALIVE, true) // 创建一个通道测试对象 .childHandler(new ChannelInitializer<SocketChannel>() { // 给pipeline设置处理器 protected void initChannel(SocketChannel socketChannel) throws Exception { // 添加自己的Handler socketChannel.pipeline().addLast(new NettyServerHandler()); } });

    handler和childHandler分别给bossGroup、workerGroup添加监听器

    常用方法:

    方法说明group(bossGroup, workerGroup)服务端绑定2个EventLoopgroup(workerGroup)客户端绑定1个EventLoopchannel()设置通道实现option()ServerChannel设置OptionchildOption()接收到的Channel设置Optionhandler()bossGroup设置handlerchildHandler()workerGroup设置handlerbind(port)服务端设置端口connect(ip, port)客户端连接服务器

    Future与ChannelFuture

    Netty中所有的IO操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等他执行完成或者直接注册一个监听,具体的实现就是通过Future和ChannelFuture来实现

    注册一个监听任务,当操作执行成功或者失败时候会触发注册的监听事件。

    常用方法:

    方法说明channel()返回正在进行的I/O通道sync()将异步操作转换为同步操作

    Channel

    Channel是Netty网络通信的组件,可以用于执行网络IO操作

    使用Channel可以获得当前网络连接的通道的状态

    可以获得网络连接的配置参数

    提供异步的网络I/O操作,调用后立即返回一个ChannelFuture实例

    不同协议不同的阻塞类型的连接都有不同的Channel与之对应

    NioSocketChannel,异步的客户端TCP Socket连接NioServerSocketChannel,异步的服务器端TCP Socket连接NioDatagramChanne,异步的UDP连接NioSctpChannel,异步的客户端Sctp连接NioSctpServerChannel,异步的Sctp服务端连接

    Selector

    使用Selector对象实现I/O多路复用,通过一个Selector线程可以监听多个连接的Channel事件

    ChannelHandler

    处理I/O事件或拦截I/O操作的handler,并将其转发到其ChannelPipline中的下一个处理程序

    可以继承ChannelHandler的实现类来实现自定义的处理功能:

    Pipline和ChannelPipline

    ChannelPipline是一个Handler集合,负责处理和拦截Inbound或者outbound的事件和操作

    ChannelPipline实现了一种高级形式的拦截过滤器模式,使得用户可以完全控制事件的处理方式,以及Channel中各个的ChannelHandler如何相互交互

    ChannelHandlerContext

    保存Channel相关的上下文信息,真实类型是DefaultChannelHandlerContext

    可以通过ctx获取pipline、channel、handler等信息

    常用方法

    ChannelOption

    Netty在创建Channel实例后,一般都需要设置ChannelOption参数

    ChannelOption.SO_BACKLOG:对应TCP/IP协议中listen函数中的backlog参数,用来初始化服务器可连接队列大小ChannelOption.SO_KEEPALIVE:一直保持连接活动状态

    EventLoopGroup

    EventLoopGroup是一组EventLoop的抽象对象,Netty为了更好的利用多核CPU资源,一般会有多个EventLoop同时工作,每个EventLoop维护着一个Selector实例

    Unpooled

    Netty提供一个专门用来操作缓冲区ByteBuf的工具类

    public class NettyByteBuf { public static void main(String[] args) { // 创建一个大小为10字节的buffer ByteBuf byteBuf = Unpooled.buffer(10); for (int i = 0; i < 10; i++) { byteBuf.writeByte(i); } // 不需要 翻转 // 输出 for (int i = 0; i < byteBuf.capacity(); i++) { System.out.println(byteBuf.readByte()); } } }

    这个buf不需要翻转,因为其底层维护了readerIndex和writerIndex两个值

    当使用writeByte方法,writerIndex加1,同理当使用readByte方法,readerIndex加1

    从字符串创建ByteBuf

    public class NettyByteBuf { public static void main(String[] args) { ByteBuf byteBuf = Unpooled.copiedBuffer("你好!", CharsetUtil.UTF_8); if (byteBuf.hasArray()){ byte[] array = byteBuf.array(); System.out.println(new String(array, CharsetUtil.UTF_8)); System.out.println("byteBuf: " + byteBuf); System.out.println(byteBuf.arrayOffset()); System.out.println(byteBuf.readerIndex()); System.out.println(byteBuf.writerIndex()); System.out.println(byteBuf.capacity()); System.out.println(byteBuf.readByte()); System.out.println(byteBuf.readerIndex()); } } }

    Netty群聊实例

    服务端及其处理器 public class NettyChatServer { private int PORT; public NettyChatServer(int PORT) { this.PORT = PORT; } public void run(){ // 创建bossGroup和workerGroup // 这两个都是无限循环 NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 创建服务器端的启动对象 ServerBootstrap(配置参数) ServerBootstrap bootstrap = new ServerBootstrap(); // 设置两个线程组 bootstrap.group(bossGroup, workerGroup) // 选择服务器通道实现 .channel(NioServerSocketChannel.class) // 设置线程队列得到连接个数 .option(ChannelOption.SO_BACKLOG, 128) // 设置保持活动连接状态 .childOption(ChannelOption.SO_KEEPALIVE, true) // 创建一个通道测试对象 .childHandler(new ChannelInitializer<SocketChannel>() { // 给pipeline设置处理器 protected void initChannel(SocketChannel socketChannel) throws Exception { // 添加自己的Handler socketChannel.pipeline().addLast(new NettyChatServerHandler()) .addLast("decoder", new StringDecoder()) .addLast("encoder", new StringEncoder()); } }); System.out.println("服务端就绪"); // 绑定一个端口并且同步 ChannelFuture future = bootstrap.bind(PORT); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (future.isSuccess()){ System.out.println("监听端口" +PORT+ "成功..."); }else{ System.out.println("监听端口失败..."); } } }); // 对关闭通道进行监听 future.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { // 异常处理结束关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) { NettyChatServer server = new NettyChatServer(8888); server.run(); } } public class NettyChatServerHandler extends ChannelInboundHandlerAdapter { // 全局事件执行器,是一个单例对象 private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); /** * 当连接建立后的处理器 */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); // 将该channel加入聊天的信息推送给其他客户端 channelGroup.writeAndFlush(sdf.format(new Date()) + "[客户端]:" + channel.remoteAddress() + "加入了聊天"); // 将当前的channel加入到channelGroup里面 channelGroup.add(channel); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush(sdf.format(new Date()) + "[客户端]:" + channel.remoteAddress() + "退出了聊天"); } /** * 转发消息 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Channel channel = ctx.channel(); ByteBuf buf = (ByteBuf) msg; // 遍历channelGroup处理消息 channelGroup.forEach(ch -> { if (channel != ch){ ch.writeAndFlush( sdf.format(new Date()) + "[" + channel.remoteAddress() + "]说:" + buf.toString(CharsetUtil.UTF_8)); }else { ch.writeAndFlush(sdf.format(new Date()) + "[自己]说:" + buf.toString(CharsetUtil.UTF_8)); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } 客户端及其处理器 public class NettyChatClient { public static void main(String[] args) { // 客户端只需要一个事件循环组 NioEventLoopGroup group = new NioEventLoopGroup(); try { // 创建客户端的启动对象 Bootstrap bootstrap = new Bootstrap(); // 设置相关参数 bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { // 加入客户端事件处理器 socketChannel.pipeline().addLast(new NettyChatClientHandler()) .addLast("decoder", new StringDecoder()) .addLast("encoder", new StringEncoder()); } }); System.out.println("客户端就绪"); // 启动客户端去连接服务器端 ChannelFuture future = bootstrap.connect("localhost", 8888).sync(); Channel channel = future.channel(); Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()){ String s = scanner.nextLine(); channel.writeAndFlush(s); } future.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { group.shutdownGracefully(); System.out.println("客户端关闭"); } } } public class NettyChatClientHandler extends ChannelDuplexHandler { /** * 当通道建立就会触发该方法 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("连接服务器成功!"); } /** * 当通道有读取事件时触发 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println(buf.toString(CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

    Netty心跳机制

    心跳机制是服务端用来检测客户端的活动状态的手段,每隔一段时间,服务端会向客户端发送心跳包来确定客户端是否存活,存活的状态有三种:读空闲,写空闲,读写空闲

    下面是Netty实现心跳检测机制的实例:

    new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS); // 添加心跳检测处理器 /** * 心跳检测服务端 */ public class HeartBeatServer { private int PORT; public HeartBeatServer(int PORT) { this.PORT = PORT; } public void run(){ NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 创建服务器端的启动对象 ServerBootstrap(配置参数) ServerBootstrap bootstrap = new ServerBootstrap(); // 设置两个线程组 bootstrap.group(bossGroup, workerGroup) // 选择服务器通道实现 .channel(NioServerSocketChannel.class) // 设置线程队列得到连接个数 .option(ChannelOption.SO_BACKLOG, 128) // 设置保持活动连接状态 .childOption(ChannelOption.SO_KEEPALIVE, true) // 创建一个通道测试对象 .childHandler(new ChannelInitializer<SocketChannel>() { // 给pipeline设置处理器 protected void initChannel(SocketChannel socketChannel) throws Exception { // 添加自己的Handler socketChannel.pipeline() .addLast("decoder", new StringDecoder()) .addLast("encoder", new StringEncoder()) .addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS)) // 添加心跳检测处理器 .addLast(new HeartBeatHandler()); } }); System.out.println("服务端就绪"); // 绑定一个端口并且同步 ChannelFuture future = bootstrap.bind(PORT); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (future.isSuccess()){ System.out.println("监听端口" +PORT+ "成功..."); }else{ System.out.println("监听端口失败..."); } } }); // 对关闭通道进行监听 future.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { // 异常处理结束关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) { HeartBeatServer heartBeatServer = new HeartBeatServer(8888); heartBeatServer.run(); } }

    其中readerIdleTime、writerIdleTime、allIdleTime分别为3s、5s、7s

    下面是心跳处理handler的实现:

    /** * 心跳处理器 */ public class HeartBeatHandler extends ChannelInboundHandlerAdapter { /** * 处理心跳事件 */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent) evt; String msg = ""; switch (event.state()){ case READER_IDLE: msg = "读空闲"; break; case WRITER_IDLE: msg = "写空闲"; break; case ALL_IDLE: msg = "读写空闲"; break; } System.out.println(ctx.channel().remoteAddress() + " 发生了:" + msg); } } }

    效果如下:

    需要注意的是,IdleStateHandler获取到的事件信息会传递到下一个handler的userEventTriggered方法进行处理,因此自定义的心跳时间处理器需要添加在IdleStateHandler之后

    Netty长连接

    使用websocket实现长连接,这个技术经常用来做在线聊天。。

    首先得有一个网页,上面是输入框,并且可以发送websocket请求

    <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>在线聊天</title> </head> <body> <label for="chat-msg">消息框:</label><textarea rows="10" readonly id="chat-msg" style="width: 200px;"></textarea> <button type="button" onclick="document.getElementById('chat-msg').value = ''">清空消息</button> <br> <label for="chat-input">输入框:</label><textarea rows="10" id="chat-input" style="width: 200px;"></textarea> <button type="button" onclick="send(document.getElementById('chat-input').value)">发送消息</button> </body> </html>

    这个网页使用js发送websocket请求到服务端即可,以下是js代码:

    var socket; if (!window.WebSocket){ alert("浏览器不支持WebSocket!") }else{ socket = new WebSocket("ws://localhost:8888/chat"); // 这里绑定/chat // 接收到消息 socket.onmessage = function (message) { var element = document.getElementById("chat-msg"); element.value = element.value + "\n" + message.data; } socket.onopen = function () { var element = document.getElementById("chat-msg"); element.value = "连接已建立..." } socket.onclose = function () { var element = document.getElementById("chat-msg"); element.value = "连接关闭..." } socket.onerror = function () { var element = document.getElementById("chat-msg"); element.value = "连接异常..." } } function send(message) { if (!window.socket){ alert("websocket服务尚未启动!"); return; } if (socket.readyState === socket.OPEN){ // 连接已经建立,发送数据 socket.send(message); }else{ alert("websocket尚未连接!"); } }

    然后编写服务端:

    /** * 使用websocket的长连接server */ public class WebSocketServer { private int PORT; public WebSocketServer(int PORT) { this.PORT = PORT; } public void run(){ NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 创建服务器端的启动对象 ServerBootstrap(配置参数) ServerBootstrap bootstrap = new ServerBootstrap(); // 设置两个线程组 bootstrap.group(bossGroup, workerGroup) // 选择服务器通道实现 .channel(NioServerSocketChannel.class) // 设置线程队列得到连接个数 .option(ChannelOption.SO_BACKLOG, 128) // 设置保持活动连接状态 .childOption(ChannelOption.SO_KEEPALIVE, true) // 创建一个通道测试对象 .childHandler(new ChannelInitializer<SocketChannel>() { // 给pipeline设置处理器 protected void initChannel(SocketChannel socketChannel) throws Exception { // 添加自己的Handler socketChannel.pipeline() // Http解码器 .addLast(new HttpServerCodec()) // 块方式写处理器 .addLast(new ChunkedWriteHandler()) // 分段数据合并器 .addLast(new HttpObjectAggregator(8192)) // 定义websocket请求的uri,将http协议转换为ws协议,保持长连接 .addLast(new WebSocketServerProtocolHandler("/chat")) // 自定义处理器 .addLast(new WebSocketServerHandler()); } }); System.out.println("服务端就绪"); // 绑定一个端口并且同步 ChannelFuture future = bootstrap.bind(PORT); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (future.isSuccess()){ System.out.println("监听端口" +PORT+ "成功..."); }else{ System.out.println("监听端口失败..."); } } }); // 对关闭通道进行监听 future.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { // 异常处理结束关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) { WebSocketServer webSocketServer = new WebSocketServer(8888); webSocketServer.run(); } }

    在服务端代码里面需要加入自定义的handler,下面是自定义websocket handler代码:

    /** * 自定义websocket请求处理器 * 泛型类TextWebSocketFrame表示这个websocket传递的帧frame类型是text */ public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception { System.out.println("服务端收到消息:" + textWebSocketFrame.text()); // 转发消息 // 转发回去的类型是frame类型,因为websocket是以帧为单位进行传输的 channelHandlerContext.channel().writeAndFlush(new TextWebSocketFrame(LocalDateTime.now() + textWebSocketFrame.text())); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("建立了新的连接:" + ctx.channel().id().asLongText()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("断开了连接:" + ctx.channel().id().asLongText()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("发生了异常"); ctx.close(); } }

    需要注意的是,返回的消息都是以Frame的形式包装返回,因为WebSocket是基于帧为单位

    接下来,启动服务端,打开网页:

    以上就是Netty实现长连接的实例,如果想要发送给多人,就跟上面的群聊实例一样维护一个channelGroup然后再处理消息的分发即可

    Processed: 0.011, SQL: 8