netty~http客户端与服务端

    科技2024-08-01  31

    描述

       netty对http消息提供了 FullHttpRequest接口,此接口合并HttpRequest和FullHttpMessage ,因此请求是一个完整的HTTP请求。

    服务端消息处理器

    /** * @describe: http服务端对消息的处理 * @author: houkai */ public class HttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) { FullHttpResponse response = null; if (fullHttpRequest.method() == HttpMethod.GET) { System.out.println(getGetParamsFromChannel(fullHttpRequest)); String data = "GET method over"; ByteBuf buf = Unpooled.wrappedBuffer(data.getBytes()); response = responseOk(HttpResponseStatus.OK, buf); } else if (fullHttpRequest.method() == HttpMethod.POST) { System.out.println(getPostParamsFromChannel(fullHttpRequest)); String data = "POST method over"; ByteBuf buf = Unpooled.wrappedBuffer(data.getBytes()); response = responseOk(HttpResponseStatus.OK, buf); } else { response = responseOk(HttpResponseStatus.INTERNAL_SERVER_ERROR, null); } // 发送响应 channelHandlerContext.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } /** * 获取post请求的数据 */ private Map<String, Object> getPostParamsFromChannel(FullHttpRequest fullHttpRequest) { Map<String, Object> params = new HashMap<>(); if (fullHttpRequest.method() == HttpMethod.POST) { // 处理post 请求 String strContentType = fullHttpRequest.headers().get("Content-Type").trim(); if (StringUtil.isNullOrEmpty(strContentType)) { return null; } if (strContentType.contains("x-www-form-urlencoded")) { params = getFormParams(fullHttpRequest); } else if (strContentType.contains("application/json")) { params = getJSONParams(fullHttpRequest); } else { return null; } } return params; } /** * 获取post请求json格式的数据 并转为map */ private Map<String, Object> getJSONParams(FullHttpRequest fullHttpRequest) { Map<String, Object> params = new HashMap<>(); ByteBuf content = fullHttpRequest.content(); byte[] reqContent = new byte[content.readableBytes()]; content.readBytes(reqContent); String strContent = null; try { strContent = new String(reqContent, "UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } JSONObject jsonParams = JSONObject.parseObject(strContent); for (Object key : jsonParams.keySet()) { params.put(key.toString(), jsonParams.get(key)); } return params; } /** * 获取post请求 form表单格式的数据并转为map */ private Map<String, Object> getFormParams(FullHttpRequest fullHttpRequest) { Map<String, Object> params = new HashMap<>(); HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(new DefaultHttpDataFactory(false), fullHttpRequest); List<InterfaceHttpData> postData = decoder.getBodyHttpDatas(); for (InterfaceHttpData data : postData) { if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { MemoryAttribute attribute = (MemoryAttribute) data; params.put(attribute.getName(), attribute.getValue()); } } return params; } /** * 构造成功的响应信息 响应头 + 响应体 */ private FullHttpResponse responseOk(HttpResponseStatus status, ByteBuf buf) { FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, buf); if (buf != null) { response.headers().set("Content-Type", "text/plain;charset=UTF-8"); response.headers().set("Content-Length", response.content().readableBytes()); } return response; } /** * 获取get的请求参数并转为map */ private Map<String, Object> getGetParamsFromChannel(FullHttpRequest fullHttpRequest) { Map<String, Object> params = new HashMap<>(); if (fullHttpRequest.method() == HttpMethod.GET) { QueryStringDecoder decoder = new QueryStringDecoder(fullHttpRequest.uri()); Map<String, List<String>> paramList = decoder.parameters(); for (Map.Entry<String, List<String>> entry : paramList.entrySet()) { params.put(entry.getKey(), entry.getValue().get(0)); } return params; } return params; } }

    服务器端启动器

    /** * @describe: http 服务端 * @author: houkai */ public class HttpServer { private static final int port = 9999; public static void main(String[] args) throws InterruptedException { HttpServer server = new HttpServer(); server.start(); } private void start() throws InterruptedException { ServerBootstrap bootstrap = new ServerBootstrap(); EventLoopGroup group = new NioEventLoopGroup(); bootstrap.group(group) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 请求解码器 socketChannel.pipeline().addLast("http-decoder", new HttpRequestDecoder()); // 将HTTP消息的多个部分合成一条完整的HTTP消息 socketChannel.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65535)); // 响应转码器 socketChannel.pipeline().addLast("http-encoder", new HttpResponseEncoder()); // 解决大码流的问题,ChunkedWriteHandler:向客户端发送HTML5文件 socketChannel.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); // 自定义处理handler socketChannel.pipeline().addLast("http-server", new HttpHandler()); } }); bootstrap.bind(port).sync(); } }

    客户端消息处理器

    /** * @describe: * @author: houkai */ public class HttpClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 通断被激活时我们发送请求到指定路径 URI uri = new URI("http://localhost:9999"); FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString()); // 添加请求头,确定请求体的长度 request.headers().add(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes()); // 将请求实体写入出站处理器 ctx.writeAndFlush(request); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpResponse) { // 收到了http服务器的响应 FullHttpResponse response = (FullHttpResponse) msg; ByteBuf buf = response.content(); String result = buf.toString(CharsetUtil.UTF_8); System.out.println("response -> " + result); } } }

    客户端启动器

    /** * @describe: http 客户端 * @author: houkai */ public class HttpClient { public static void main(String[] args) { Bootstrap bootstrap = new Bootstrap(); EventLoopGroup group = new NioEventLoopGroup(); bootstrap.group(group); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { channel.pipeline().addLast(new HttpClientCodec()); channel.pipeline().addLast(new HttpObjectAggregator(65536)); // http消息聚合器 channel.pipeline().addLast(new HttpContentDecompressor()); // 数据解压处理器 channel.pipeline().addLast(new HttpClientHandler()); // 业务逻辑处理器 } }); try { // 异步等待连接上远程http服务器 ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync(); // 等待与远程http服务器断开连接 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { //关闭客户端 一旦这个方法被调用, isShuttingDown()开始返回true ,并且执行准备关机 group.shutdownGracefully(); } } }

     

    Processed: 0.009, SQL: 8