咖啡汪日志————Netty 4.1.45.Final新手引导之Echo服务的简单演示

    科技2026-01-03  13

    作为不是在逗比,都是在逗比路上的二哈 节假日也不能闲 从今天开始 本汪要进入Netty的拆家行当了,哈哈 解析源码,分析底层参数配置,对性能的影响,熟悉netty的各种使用场景,顺便了解下装配器模式

    Echo服务的各组件名称和作用 1)EventLoop和EventLoopGroup 线程和线程组 NIOEventLoopGroup就是一个线程池实现 2)Bootstrapt启动引导类 数据的预加载,参数配置,不同使用端启动类不同 3)Channel 生命周期,状态变化 4)ChannelHandler和ChannelPipline ChannelHandler对Channel的处理类,日志打点,读取连接中的数据,返回数据 ChannelInboundHandler等类的基类,其中方法和接口: 处理者的添加,移除,异常捕获

    void handlerAdded(ChannelHandlerContext var1) throws Exception; void handlerRemoved(ChannelHandlerContext var1) throws Exception; void exceptionCaught(ChannelHandlerContext var1, Throwable var2) public @interface Sharable{}

    ChannelPipeline —— ChannelHandler的装配器,内部为双向链表,用于维护通道管理者ChannelHandler。

    一、服务器端:

    1.EchoServerHandler

    import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; /** * 服务端Handler——管理者 * ** * * @author: Yuezejian Created in 2020/10/8 上午10:46 * @modified By: */ public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; System.out.println("服务端接收数据:" + data.toString(CharsetUtil.UTF_8)); //TODO:write将数据写入本地缓存通道中,flush刷新推送 ctx.writeAndFlush(data); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("EchoServerHandle channelReadComplete"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //TODO:出问题时,应该进行异常源信息的日志记录 cause.printStackTrace(); ctx.close(); } }

    2.EchoServer 服务端启动类

    import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * 回显服务 服务端 * ** * * @author: Yuezejian Created in 2020/10/8 上午10:46 * @modified By: */ public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public void run() throws InterruptedException { /** *创建两个线程组bossGroup和workGroup,bossGroup负责处理请求连接,workGroup负责数据的处理 *两个都是无线循环 *调用可构造方法,默认的子线程数NioEventLoopGroup是实际cpu核数*2 */ //TODO:EventLoopGroup(事件环组) 包含一组 EventLoop,Channel 通过注册到 EventLoop 中执行操作 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); try { //创建启动器 ServerBootstrap serverBootstrap = new ServerBootstrap(); //添加父线程组和子线程组 serverBootstrap.group(bossGroup, workGroup) //使用NioServerSocketChannel作为服务器的通道实现 .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道测试对象 //给pipeline设置处理器 @Override protected void initChannel(SocketChannel sc) throws Exception { //调用处理器 sc.pipeline().addLast(new EchoServerHandler()); } }); System.out.println("Echo 服务器启动"); //启动服务器并绑定端口,绑定端口并同步,创建一个ChannelFuture对象 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); channelFuture.channel().closeFuture().sync(); } finally { //释放线程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException{ int port = 8080; if (args != null && args.length > 0){ port = Integer.valueOf(args[0]); } new EchoServer(port).run(); }

    启动简单的回显服务,进行连接测试:

    二、客户端

    1.EchoClientHandler

    import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; /** * 客户端Handler * ** * * @author: Yuezejian Created in 2020/10/8 下午5:14 * @modified By: */ public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf msg) throws Exception { System.out.println("Client received :" + msg.toString(CharsetUtil.UTF_8)); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("Active"); //TODO:写入本地缓存通道中,并刷新推送 ctx.writeAndFlush(Unpooled.copiedBuffer("HuskyYue Netty Project",CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("Echo client channelReadComplete"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

    2.回显服务客户端 EchoClient

    import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.net.InetSocketAddress; /** * 回显服务 客户端 * ** * * @author: Yuezejian Created in 2020/10/8 下午4:55 * @modified By: */ public class EchoClient { private String host; private int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .remoteAddress( new InetSocketAddress(host,port)) .handler( new ChannelInitializer<SocketChannel>() { @Override protected void initChannel( SocketChannel sc) throws Exception { // Netty它预设置了很多好用的 ChannelHandler, // 其中就有一个 StringEncoder():将 String 转换为可以传输的字节类型。 // 相对应的 StringDecoder():将字节流转换为 String 对象。 sc.pipeline().addLast(new EchoClientHandler()) .addLast(new StringEncoder()) .addLast(new StringDecoder()); } }); //TODO:连接到服务器,connect是异步连接,再调用同步等待sync,等待连接成功 ChannelFuture channelFuture = bootstrap.connect().sync(); //TODO:阻塞住直到客户端通道关闭 channelFuture.channel().closeFuture().sync(); } finally { //释放NIO线程 group.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new EchoClient("127.0.0.1",8080).start(); } }

    使用客户端进行连接,进行简单的测试:

    Processed: 0.021, SQL: 9