描述
netty对http消息提供了 FullHttpRequest接口,此接口合并HttpRequest和FullHttpMessage ,因此请求是一个完整的HTTP请求。
服务端消息处理器
/**
* @describe: http服务端对消息的处理
* @author: houkai
*/
public class HttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
FullHttpResponse response = null;
if (fullHttpRequest.method() == HttpMethod.GET) {
System.out.println(getGetParamsFromChannel(fullHttpRequest));
String data = "GET method over";
ByteBuf buf = Unpooled.wrappedBuffer(data.getBytes());
response = responseOk(HttpResponseStatus.OK, buf);
} else if (fullHttpRequest.method() == HttpMethod.POST) {
System.out.println(getPostParamsFromChannel(fullHttpRequest));
String data = "POST method over";
ByteBuf buf = Unpooled.wrappedBuffer(data.getBytes());
response = responseOk(HttpResponseStatus.OK, buf);
} else {
response = responseOk(HttpResponseStatus.INTERNAL_SERVER_ERROR, null);
}
// 发送响应
channelHandlerContext.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
/**
* 获取post请求的数据
*/
private Map<String, Object> getPostParamsFromChannel(FullHttpRequest fullHttpRequest) {
Map<String, Object> params = new HashMap<>();
if (fullHttpRequest.method() == HttpMethod.POST) {
// 处理post 请求
String strContentType = fullHttpRequest.headers().get("Content-Type").trim();
if (StringUtil.isNullOrEmpty(strContentType)) {
return null;
}
if (strContentType.contains("x-www-form-urlencoded")) {
params = getFormParams(fullHttpRequest);
} else if (strContentType.contains("application/json")) {
params = getJSONParams(fullHttpRequest);
} else {
return null;
}
}
return params;
}
/**
* 获取post请求json格式的数据 并转为map
*/
private Map<String, Object> getJSONParams(FullHttpRequest fullHttpRequest) {
Map<String, Object> params = new HashMap<>();
ByteBuf content = fullHttpRequest.content();
byte[] reqContent = new byte[content.readableBytes()];
content.readBytes(reqContent);
String strContent = null;
try {
strContent = new String(reqContent, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
JSONObject jsonParams = JSONObject.parseObject(strContent);
for (Object key : jsonParams.keySet()) {
params.put(key.toString(), jsonParams.get(key));
}
return params;
}
/**
* 获取post请求 form表单格式的数据并转为map
*/
private Map<String, Object> getFormParams(FullHttpRequest fullHttpRequest) {
Map<String, Object> params = new HashMap<>();
HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(new DefaultHttpDataFactory(false), fullHttpRequest);
List<InterfaceHttpData> postData = decoder.getBodyHttpDatas();
for (InterfaceHttpData data : postData) {
if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
MemoryAttribute attribute = (MemoryAttribute) data;
params.put(attribute.getName(), attribute.getValue());
}
}
return params;
}
/**
* 构造成功的响应信息 响应头 + 响应体
*/
private FullHttpResponse responseOk(HttpResponseStatus status, ByteBuf buf) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, buf);
if (buf != null) {
response.headers().set("Content-Type", "text/plain;charset=UTF-8");
response.headers().set("Content-Length", response.content().readableBytes());
}
return response;
}
/**
* 获取get的请求参数并转为map
*/
private Map<String, Object> getGetParamsFromChannel(FullHttpRequest fullHttpRequest) {
Map<String, Object> params = new HashMap<>();
if (fullHttpRequest.method() == HttpMethod.GET) {
QueryStringDecoder decoder = new QueryStringDecoder(fullHttpRequest.uri());
Map<String, List<String>> paramList = decoder.parameters();
for (Map.Entry<String, List<String>> entry : paramList.entrySet()) {
params.put(entry.getKey(), entry.getValue().get(0));
}
return params;
}
return params;
}
}
服务器端启动器
/**
* @describe: http 服务端
* @author: houkai
*/
public class HttpServer {
private static final int port = 9999;
public static void main(String[] args) throws InterruptedException {
HttpServer server = new HttpServer();
server.start();
}
private void start() throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 请求解码器
socketChannel.pipeline().addLast("http-decoder", new HttpRequestDecoder());
// 将HTTP消息的多个部分合成一条完整的HTTP消息
socketChannel.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65535));
// 响应转码器
socketChannel.pipeline().addLast("http-encoder", new HttpResponseEncoder());
// 解决大码流的问题,ChunkedWriteHandler:向客户端发送HTML5文件
socketChannel.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
// 自定义处理handler
socketChannel.pipeline().addLast("http-server", new HttpHandler());
}
});
bootstrap.bind(port).sync();
}
}
客户端消息处理器
/**
* @describe:
* @author: houkai
*/
public class HttpClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 通断被激活时我们发送请求到指定路径
URI uri = new URI("http://localhost:9999");
FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString());
// 添加请求头,确定请求体的长度
request.headers().add(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes());
// 将请求实体写入出站处理器
ctx.writeAndFlush(request);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpResponse) {
// 收到了http服务器的响应
FullHttpResponse response = (FullHttpResponse) msg;
ByteBuf buf = response.content();
String result = buf.toString(CharsetUtil.UTF_8);
System.out.println("response -> " + result);
}
}
}
客户端启动器
/**
* @describe: http 客户端
* @author: houkai
*/
public class HttpClient {
public static void main(String[] args) {
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new HttpClientCodec());
channel.pipeline().addLast(new HttpObjectAggregator(65536)); // http消息聚合器
channel.pipeline().addLast(new HttpContentDecompressor()); // 数据解压处理器
channel.pipeline().addLast(new HttpClientHandler()); // 业务逻辑处理器
}
});
try {
// 异步等待连接上远程http服务器
ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync();
// 等待与远程http服务器断开连接
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//关闭客户端 一旦这个方法被调用, isShuttingDown()开始返回true ,并且执行准备关机
group.shutdownGracefully();
}
}
}