netty作为应用框架,为什么需要做心跳呢?我们可以想象这样一个场景,服务器在某个时段连接了很多客户端,数量超过了服务器的带宽负载,就会导致一些客户端,特别是网络很差的客户端在底层的tcp连接中其实已经跟服务器断开了,但是服务器跟这些客户端的tcp的4次挥手迟迟没有完成,导致服务器认为这些客户端其实还活着,这种现象叫做链接的假死。如下图
对于netty来讲,一条链接对应着一个channel,一个channel中可能会创建很多handler,会占用服务器资源,一旦出现链接假死,就会造成实际上客户端已经断开,服务器的资源无法释放,造成服务器CPU以及内存资源浪费; 对于底层网络来讲,假死的tcp链接一直占用着IO资源,造成了IO资源的浪费; 这些大量资源被无故占用,直接后果是服务器资源被消耗殆尽,导致服务器性能下降,同时也会造成客户端体验变差,为了解决这个问题,netty提供了IdleStateHandler ,实现了服务器以及客户端的空闲检测以及心跳检测支持。
下面还是使用我们之前讲到过的自定义协议进行代码进行心跳演示
服务端:
NettyServer.java
@Slf4j public class NettyServer { public static void main(String[] args) { ServerBootstrap serverBootstrap = new ServerBootstrap(); NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { serverBootstrap.group(bossGroup, workerGroup) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_KEEPALIVE, true) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast("heartbeat", new NettyServerIdleStateHandler(10, 0, 0, TimeUnit.SECONDS)); ch.pipeline().addLast(new PacketDecoder()); ch.pipeline().addLast(new NettyServerHeartBeatHandler()); ch.pipeline().addLast(new PacketEncoder()); } }); ChannelFuture channelFuture = serverBootstrap.bind(8888).sync(); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { // 优雅关闭 workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }NettyServerIdleStateHandler.java
@Slf4j public class NettyServerIdleStateHandler extends IdleStateHandler { public NettyServerIdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) { super(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds); } public NettyServerIdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) { super(readerIdleTime, writerIdleTime, allIdleTime, unit); } @Override protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { if (evt.state() == IdleState.READER_IDLE) { log.info("1s内没有读请求"); } else if (evt.state() == IdleState.WRITER_IDLE) { log.info("1s内没有写请求"); } else if (evt.state() == IdleState.ALL_IDLE) { log.info("1s内没有读写请求"); } else { } log.info("客户端超时未响应,断开链接"); ctx.channel().close(); } }NettyServerHeartBeatHandler.java
@Slf4j public class NettyServerHeartBeatHandler extends SimpleChannelInboundHandler<HeartBeatRequestMessage> { @Override protected void channelRead0(ChannelHandlerContext ctx, HeartBeatRequestMessage msg) throws Exception { log.info("收到客户端[{}]心跳,返回回应消息", ctx.channel().remoteAddress()); ctx.channel().writeAndFlush(new HeartBeatResponseMessage()); } }客户端:
@Slf4j public class NettyClient { public static void main(String[] args) { Bootstrap bootstrap = new Bootstrap(); NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(); try { bootstrap.group(nioEventLoopGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new PacketDecoder()); ch.pipeline().addLast(new SimpleChannelInboundHandler<HeartBeatResponseMessage>() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.executor().scheduleWithFixedDelay(() -> { if (ctx.channel().isActive()) { log.info("发送心跳"); ctx.channel().writeAndFlush(new HeartBeatRequestMessage()); } else { log.info("客户端挂了"); } // }, 5, 5, TimeUnit.SECONDS); super.channelActive(ctx); } @Override protected void channelRead0(ChannelHandlerContext ctx, HeartBeatResponseMessage msg) throws Exception { log.info("收到服务端心跳回应, [{}]", msg); } }); ch.pipeline().addLast(new PacketEncoder()); } }).connect("127.0.0.1", 8888).addListener( future -> { if (future.isSuccess()) { log.info("链接服务器成功"); } else { log.info("链接服务器失败"); System.exit(0); } } ).channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { nioEventLoopGroup.shutdownGracefully(); } } }以上代码的实现流程是客户端在启动时就直接启动一个定时任务给服务端发送心跳,服务端收到心跳消息后,会给客户端一个心跳包回应,服务端对客户端进行空闲检测,如果客户端在规定时间内没有心跳或者其他消息发来,就主动断开该链接
源码地址
