基于Netty做一个简单的群聊服务端

    科技2023-09-16  122

    Netty做一个简单的群聊服务端

    一个服务端管理多个客户端进行通信。 例如: 1、 服务端监听 客户端上线,下线提醒。 2、 客户端加入群聊提醒。 3、 客户端发送消息,其他客户端都可以接收。


    演示:


    开始写代码

    1.引入依赖

    maven引入Netty依赖

    <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.34.Final</version> </dependency>

    2.服务端启动代码

    package com.liuqi.chat; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * 群聊服务器 * * <br> * create by liuqi 2020/10/6 **/ public class ChatServer { public final static int PORT = 8088; public static void main(String[] args) { /* 处理客户端连接的线程组 */ NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); /* 处理读写的线程组 */ NioEventLoopGroup workGroup = new NioEventLoopGroup(); try { /* 创建服务端的启动对象 */ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workGroup) // 设置 bossGroup workGroup这两个线程组 .channel(NioServerSocketChannel.class) // 使用 NioServerSocketChannel 作为服务器通道实现 .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new StringDecoder()) // 字符串解码处理器 .addLast(new StringEncoder()) // 字符串编码处理器 .addLast(new ChatServerHandler()); // 自定义的处理器 } }); /* 绑定一个端口并且同步 */ ChannelFuture cf = serverBootstrap.bind(PORT).sync(); System.out.println("服务启动成功"); /* 阻塞当前线程 对通道关闭进行监听*/ cf.channel().closeFuture().sync(); } catch (Exception e) { System.out.println("启动失败 " + e.getMessage()); e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }

    3.对自定义 ChatServerHandler 处理器的编写

    服务器接收客户端连接事件,断开连接事件,接收客户端消息,发送消息给客户端的事件处理。 channel_group 管理所有的客户端通道,由这个集合群发消息给客户端。 package com.liuqi.chat; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import java.text.SimpleDateFormat; import java.util.Date; /** * * 聊天服务器 处理器 * 因为指定了 StringDecoder 解码处理器,SimpleChannelInboundHandler<T> 的泛型直接指定String类型 * 重写 channelRead0()方法接收到客户端发送的消息会解析成 String * * <br> * create by liuqi 2020/10/6 **/ public class ChatServerHandler extends SimpleChannelInboundHandler<String> { /** * 定义一个 ChannelGroup 管理所有的 Channel(客户端连接) * GlobalEventExecutor.INSTANCE 是全局的事件执行器,是一个单例 */ private final static ChannelGroup channel_group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); /** * 成功建立连接 * 第一次执行 */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); /** * 会自动遍历 为每一个通道发送消息 */ channel_group.writeAndFlush(sdf.format(new Date()) + " [客户端]" + channel.remoteAddress() + " 加入聊天"); /* 添加当前通道 */ channel_group.add(channel); } /** * 关闭连接 (调用 close(), 或终止连接 ) */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channel_group.writeAndFlush(sdf.format(new Date()) + "[客户端]" + channel.remoteAddress() + " 离开了"); System.out.println("channelGroup size " + channel_group.size()); // 会自动移除 // channel_group.remove(channel); } /** * 通道处于活跃状态 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(sdf.format(new Date()) + "[客户端]" + ctx.channel().remoteAddress() + " 上线了~"); } /** * 通道处于 不活跃状态 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println(sdf.format(new Date()) + "[客户端]" + ctx.channel().remoteAddress() + " 离线了~"); } /** * 消息解码后读取 (也就是接收到的消息处理过了) */ protected void channelRead0(final ChannelHandlerContext ctx, String msg) throws Exception { channel_group.forEach(channel -> { if (channel != ctx.channel()) { channel.writeAndFlush(sdf.format(new Date()) + " " + channel.remoteAddress() + " 说: " + msg); } else { channel.writeAndFlush(sdf.format(new Date()) + " [我发送了] " + msg); } }); } /** * 通道异常处理 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("连接异常 " + cause.getMessage()); /* 关闭通道 */ ctx.close(); } }

    4.客户端启动代码

    启动客户端,连接到服务端成功后,进行阻塞,监听键盘输入。每输入一行就发送消息给服务端, 服务端在将消息群发给其他客户端。 注意:连接服务端成功后需要阻塞,不然会断开连接. 可以使用 channelFuture.channel().closeFuture().sync()进行阻塞. Channel channel = channelFuture.channel() 得到的通道可以保存起来,进行发送消息。 package com.liuqi.chat.client; import com.liuqi.chat.ChatServer; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.util.Scanner; /** * <br> * create by liuqi 2020/10/6 **/ public class ChatClient { public static void main(String[] args) { /* 客户端需要一个读写事件循环组 */ NioEventLoopGroup eventExecutors = new NioEventLoopGroup(); /* 创建客户端启动对象 */ Bootstrap bootstrap = new Bootstrap(); try { bootstrap.group(eventExecutors) // 设置处理读写的线程组 .channel(NioSocketChannel.class) // 通道处理器 .handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new StringDecoder()) // 字符串解码处理器 .addLast(new StringEncoder()) // 字符串编码处理器 .addLast(new ChatClientHandler()); // 自定义的处理器 } }); /* 绑定一个端口并且同步 */ ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", ChatServer.PORT).sync(); System.out.println("建立连接成功"); /* 得到连接的通道 */ Channel channel = channelFuture.channel(); /** * 阻塞 监听键盘输入, * 方便调试发送消息 */ Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String msg = scanner.nextLine(); /* 通过channel 发送到服务器端 */ channel.writeAndFlush(msg); } } catch (Exception e) { System.out.println("启动客户端失败 " + e.getMessage()); e.printStackTrace(); } finally { eventExecutors.shutdownGracefully(); } } }

    5 ChatClientHandler 客户端的消息处理器

    这个客户端处理器很简单,就接收到服务器发送的消息,输出在控制台。 package com.liuqi.chat.client; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * 聊天客户端处理 * * <br> * create by liuqi 2020/10/6 **/ public class ChatClientHandler extends SimpleChannelInboundHandler<String> { /** * 解码后读取数据 * @param ctx * @param msg * @throws Exception */ protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg); } /** * 连接异常 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("连接异常 " + cause.getMessage()); ctx.close(); } }

    总结

    到此这个简单的群聊服务就做好了 可以多复制几个客户端启动进行调试。

    Processed: 0.018, SQL: 8