socket长连接即服务端不断开客户端channel的连接,客户端需要定时向服务端进行心跳检测,服务端需要将过期未进行心跳检测的socket关闭。
服务端关闭过期的channel连接: Netty提供了ScheduledFuture,可以通过ChannelHandlerContext.executor().schedule()创建,支持延时提交,也支持取消任务,为自动关闭提供了一个很好的实现方案。
消息定义
/**
 * Message exchanged between client and server.
 *
 * <p>A plain mutable bean holding a type tag, a length and a text content.
 */
public class Msg {

    /** Message type: 1 = heartbeat message, 2 = normal message. */
    private byte type;

    /** Message length. */
    private int length;

    /** Message content. */
    private String content;

    /** @return the message type tag */
    public byte getType() {
        return this.type;
    }

    /** @param type the message type tag */
    public void setType(byte type) {
        this.type = type;
    }

    /** @return the message length */
    public int getLength() {
        return this.length;
    }

    /** @param length the message length */
    public void setLength(int length) {
        this.length = length;
    }

    /** @return the message content, possibly {@code null} */
    public String getContent() {
        return this.content;
    }

    /** @param content the message content */
    public void setContent(String content) {
        this.content = content;
    }

    /** Renders all three fields, e.g. {@code Msg{type=2, length=5, content='hello'}}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Msg{");
        text.append("type=").append(type);
        text.append(", length=").append(length);
        text.append(", content='").append(content).append('\'');
        text.append('}');
        return text.toString();
    }
}

// Next section: message encoding (消息编码)
public class MsgEncoder extends MessageToByteEncoder<Msg> { @Override protected void encode(ChannelHandlerContext ctx, Msg msg, ByteBuf byteBuf) throws Exception { byteBuf.writeByte(msg.getType()); byteBuf.writeInt(msg.getLength()); if (!StringUtil.isNullOrEmpty(msg.getContent())) { byteBuf.writeBytes(msg.getContent().getBytes()); } } }消息解码
public class MsgDecoder extends ReplayingDecoder<MsgDecoder.MsgState> { /** * 状态类型通常是一个Enum ; 使用Void如果状态管理是未使用 * TYPE: 消息类型 * LENGTH: 消息长度 * CONTENT: 消息内容 */ public enum MsgState { TYPE, LENGTH, CONTENT } public MsgDecoder() { super(MsgState.TYPE); } private Msg msg; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception { MsgState state = state(); switch (state) { case TYPE: msg = new Msg(); byte type = byteBuf.readByte(); msg.setType(type); checkpoint(MsgState.LENGTH); break; case LENGTH: int length = byteBuf.readInt(); msg.setLength(length); if (length > 0) { checkpoint(MsgState.CONTENT); } else { out.add(msg); checkpoint(MsgState.TYPE); } break; case CONTENT: byte[] bytes = new byte[msg.getLength()]; byteBuf.readBytes(bytes); String content = new String(bytes); msg.setContent(content); out.add(msg); checkpoint(MsgState.TYPE); break; default: throw new IllegalStateException("invalid state:" + state); } } }消息处理
@ChannelHandler.Sharable public class MsgHandler extends SimpleChannelInboundHandler<Msg> { private static Map<Integer, ChannelCache> channelCache = new HashMap<>(); @Override protected void channelRead0(ChannelHandlerContext ctx, Msg msg) throws Exception { System.out.println("收到消息,消息内容" + msg); Channel channel = ctx.channel(); final int hashCode = channel.hashCode(); //判断channel在缓存中 if (!channelCache.containsKey(hashCode)) { //添加通道关闭的监听器,当通道关闭时将channel从缓存中移除 channel.closeFuture().addListener(future -> { channelCache.remove(hashCode); }); //创建并执行定时任务 10秒后服务端主动将channel关闭 ScheduledFuture scheduledFuture = ctx.executor().schedule( () -> { channel.close(); }, 10, TimeUnit.SECONDS); //将渠道信息放入缓存 channelCache.put(hashCode, new ChannelCache(channel, scheduledFuture)); } switch (msg.getType()) { //心跳检测 case 1: { //创建一个新的定时器 ScheduledFuture scheduledFuture = ctx.executor().schedule( () -> channel.close(), 5, TimeUnit.SECONDS); //重新设置channel过期定时器并将老的定时器取消 ChannelCache cache = channelCache.get(hashCode); cache.getScheduledFuture().cancel(true); cache.setScheduledFuture(scheduledFuture); ctx.channel().writeAndFlush(msg); break; } //普通消息 case 2: { channelCache.entrySet().stream().forEach(entry -> { Channel otherChannel = entry.getValue().getChannel(); otherChannel.writeAndFlush(msg); }); break; } } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { super.channelReadComplete(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (null != cause) { cause.printStackTrace(); } if (null != ctx) { ctx.close(); } } }channel缓存
public class ChannelCache { private Channel channel; private ScheduledFuture scheduledFuture; public ChannelCache(Channel channel, ScheduledFuture scheduledFuture) { this.channel = channel; this.scheduledFuture = scheduledFuture; } 。。。。 }服务端
/** * 基于netty的服务端 * 思路: * socket长连接即服务端不断开客户端channel的连接,客户端需要定时向服务端进行心跳检测,服务端需要将过期未进行心跳检测的socket关闭。 * 服务端关闭过期的channel连接: * Netty提供了ScheduledFuture,可以通过ChannelHandlerContext.executor().schedule()创建,支持延时提交,也支持取消任务, * 为自动关闭提供了一个很好的实现方案。 */ public class LongConnServer { private static final int port = 9999; public static void main(String[] args) throws Exception { LongConnServer server = new LongConnServer(); server.start(); } public void start() throws Exception { ServerBootstrap b = new ServerBootstrap(); NioEventLoopGroup group = new NioEventLoopGroup(); b.group(group) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast("decoder", new MsgDecoder()) .addLast("encoder", new MsgEncoder()) .addLast("handler", new MsgHandler()); } }) // determining the number of connections queued .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE); b.bind(port).sync(); } }客户端
/**
 * Socket client for the long-connection server.
 *
 * <p>Frames are one type byte, one int length, then the content bytes. The client sends
 * a heartbeat (type 1) every 3 seconds and a normal message (type 2) every second, and
 * prints every frame it receives.
 *
 * @author houkai
 */
public class LongConnClient {

    /** Pause between two heartbeats. */
    private static final long HEARTBEAT_INTERVAL_MILLIS = 3000;

    /** Pause between two normal messages. */
    private static final long MESSAGE_INTERVAL_MILLIS = 1000;

    /** Largest content length accepted from the server, in bytes. */
    private static final int MAX_CONTENT_LENGTH = 1024 * 1024;

    String host = "127.0.0.1";
    int port = 9999;

    /** Serializes writes so frames from the two sending threads never interleave. */
    private final Object writeLock = new Object();

    public static void main(String[] args) throws Exception {
        new LongConnClient().testLongConn();
    }

    /**
     * Connects, starts the reader and heartbeat threads, then sends one message per
     * second until the connection fails.
     *
     * @throws Exception when connecting or sending fails, or the thread is interrupted
     */
    public void testLongConn() throws Exception {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port));

            // Separate thread that receives the server's responses.
            Thread reader = new Thread(() -> {
                readResponse(socket);
                // The stream ended: close the socket so the sending loops stop too.
                try {
                    socket.close();
                } catch (IOException ignored) {
                    // Nothing left to release.
                }
            }, "long-conn-reader");
            reader.setDaemon(true);
            reader.start();

            // Heartbeat every 3 seconds.
            Thread heartbeat = new Thread(() -> heartbeatLoop(socket), "long-conn-heartbeat");
            heartbeat.setDaemon(true);
            heartbeat.start();

            // Send one message to the server every second.
            while (true) {
                byte[] content = ("hello, I'm " + hashCode())
                        .getBytes(java.nio.charset.StandardCharsets.UTF_8);
                send(socket, (byte) 2, content);
                Thread.sleep(MESSAGE_INTERVAL_MILLIS);
            }
        }
    }

    /** Sends heartbeats until the socket is closed, a write fails, or the thread is interrupted. */
    private void heartbeatLoop(Socket socket) {
        try {
            while (!socket.isClosed()) {
                heartCheck(socket);
                Thread.sleep(HEARTBEAT_INTERVAL_MILLIS);
            }
        } catch (IOException e) {
            // A failed write means the connection is gone; retrying would only spin.
            if (!socket.isClosed()) {
                e.printStackTrace();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Heartbeat: sends a type-1 frame without content.
     */
    private void heartCheck(Socket socket) throws IOException {
        send(socket, (byte) 1, new byte[0]);
    }

    /** Writes one complete frame (type, length, content) to the socket. */
    private void send(Socket socket, byte type, byte[] content) throws IOException {
        ByteBuffer frame = ByteBuffer.allocate(content.length + 5);
        frame.put(type);
        frame.putInt(content.length);
        frame.put(content);
        synchronized (writeLock) {
            socket.getOutputStream().write(frame.array());
        }
    }

    /**
     * Reads response frames and prints them until the stream ends or fails.
     */
    private void readResponse(final Socket socket) {
        try {
            java.io.DataInputStream in = new java.io.DataInputStream(socket.getInputStream());
            while (true) {
                byte type = in.readByte();
                int length = in.readInt();
                if (length > MAX_CONTENT_LENGTH) {
                    throw new IOException("response content length " + length
                            + " exceeds limit " + MAX_CONTENT_LENGTH);
                }
                // A non-positive length means the frame carries no content.
                byte[] content = new byte[Math.max(length, 0)];
                in.readFully(content);
                System.out.println("type=" + type + ", length=" + length + ", content="
                        + new String(content, java.nio.charset.StandardCharsets.UTF_8));
            }
        } catch (java.io.EOFException e) {
            // The server closed the connection; nothing more to read.
        } catch (IOException e) {
            // A read error after a local close is expected and not worth reporting.
            if (!socket.isClosed()) {
                e.printStackTrace();
            }
        }
    }
}