netty~构建一个基于netty的客户端和服务端

    科技2022-08-14  116

    描述:本案例将构建一个基于netty的客户端和服务端

    项目maven依赖:

    <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.52.Final</version> </dependency>

    1.服务端消息处理

    /** * @describe 服务端消息读取处理 * @Sharable 注解标注可以被多个Channel安全的共享 * */ @ChannelHandler.Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { /** * 所有传入的消息都会调用 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; System.out.println("服务器收到消息"+in.toString(CharsetUtil.UTF_8)); //将接收到的消息写入channel中 ctx.write(in); } /** * 通知ChannelInboundHandler 最后一次对channelRead() 的调用时当前批量读取的最后一条消息 */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //将未决消息发送至远程节点,并关闭channel ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } /** * 在读取操作期间如果有异常抛出时则会调用 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); //关闭channel ctx.close(); } }

    2.服务端构建

    /** * @describe: 服务端 * @author: houkai */ public class EchoServer { private static final int port = 9999; public static void main(String[] args) throws InterruptedException { EchoServer server = new EchoServer(); server.start(); } private void start() throws InterruptedException { final EchoServerHandler serverHandler = new EchoServerHandler(); //创建EventLoopGroup EventLoopGroup group = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(group) //指定所使用的nio传输channel .channel(NioServerSocketChannel.class) //使用指定端口设置套接字地址 .localAddress(port) //添加一个EchoServerHandler到子的Channel的ChannelPipeline .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(serverHandler); } }); try { // 对sync() 方法的调用将导致当前的线程阻塞,一直到绑定操作完成为止 ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); group.shutdownGracefully().sync(); } } }

    3.客户端消息处理

    /** * @describe: 客户端消息处理类 * @author: houkai */ @ChannelHandler.Sharable public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { /** * 在到服务端的连接被建立之后被调用 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //当被通知channel是活跃的时候发送一条消息 ctx.writeAndFlush(Unpooled.copiedBuffer("netty rocks!", CharsetUtil.UTF_8)); } /** * 服务端接收到一条消息时被调用 */ @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("客户端接口到消息:"+ msg.toString(CharsetUtil.UTF_8)); } /** * 处理过程发生异常被调用 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

    4.客户端构建

    /** * @describe: 客户端 * @author: houkai */ public class EchoClient { private static final String host = "127.0.0.1"; private static final int port = 9999; public static void main(String[] args) throws InterruptedException { EchoClient echoClient = new EchoClient(); echoClient.start(); } private void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(host, port)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoClientHandler()); } }); try { //连接到远程节点,阻塞等待直到连接完成 ChannelFuture f = b.connect().sync(); //阻塞一直到channel关闭 f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); group.shutdownGracefully().sync(); } } }

     

    Processed: 0.010, SQL: 8