一、使用系统分隔符
客户端
/**
 * Client side of the line-delimited echo demo. Connects to the configured host
 * on {@code LineBaseEchoServer.PORT} and blocks until the connection is closed.
 */
public class LineBaseEchoClient {

    /** Host name or address of the server to connect to. */
    private final String host;

    public LineBaseEchoClient(String host) {
        this.host = host;
    }

    /**
     * Connects to the server and waits for the channel to close. The event loop
     * group is shut down in the {@code finally} block, so it is released even
     * when connecting fails.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting on one of the {@code sync()} calls
     */
    public void start() throws InterruptedException {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            final Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.remoteAddress(new InetSocketAddress(host, LineBaseEchoServer.PORT));
            bootstrap.handler(new ChannelInitializerImp());

            ChannelFuture connected = bootstrap.connect().sync();
            System.out.println("已连接到服务器.....");
            connected.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully().sync();
        }
    }

    /** Installs the line-based frame decoder followed by the client handler. */
    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline()
                    .addLast(new LineBasedFrameDecoder(1024 * 100000))
                    .addLast(new LineBaseClientHandler());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new LineBaseEchoClient("127.0.0.1").start();
    }
}
客户端handler
/**
 * Client handler for the line-delimited echo demo. When the channel becomes
 * active it sends a fixed number of line-terminated requests; every frame
 * received afterwards is printed together with a running count.
 */
public class LineBaseClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /** Number of requests written when the channel becomes active. */
    private static final int REQUEST_COUNT = 100;

    /** Counts the frames received by this handler instance. */
    private final AtomicInteger counter = new AtomicInteger(0);

    /**
     * Prints each received frame, decoded as UTF-8, with the running frame count.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("client Accept[" + msg.toString(CharsetUtil.UTF_8)
                + "] and the counter is:" + counter.incrementAndGet());
    }

    /**
     * Sends {@code REQUEST_COUNT} copies of the request line as soon as the
     * connection is established.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String request = "Mark,Lison,Peter,James,Deer"
                + System.getProperty("line.separator");
        // Encode explicitly as UTF-8 so the bytes on the wire match the charset
        // used for decoding, instead of depending on the platform default.
        byte[] payload = request.getBytes(CharsetUtil.UTF_8);
        for (int i = 0; i < REQUEST_COUNT; i++) {
            // A fresh buffer per write: a buffer handed to writeAndFlush must
            // not be reused for a later write.
            ctx.writeAndFlush(Unpooled.copiedBuffer(payload));
        }
    }

    /** Prints the stack trace and closes the connection on any pipeline error. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
服务端
/**
 * Server side of the line-delimited echo demo. Binds to {@link #PORT} and
 * installs a line-based frame decoder plus the server handler on every
 * accepted connection.
 */
public class LineBaseEchoServer {

    /** TCP port the server listens on; also read by the client. */
    public static final int PORT = 9998;

    public static void main(String[] args) throws InterruptedException {
        LineBaseEchoServer lineBaseEchoServer = new LineBaseEchoServer();
        System.out.println("服务器即将启动");
        lineBaseEchoServer.start();
    }

    /**
     * Binds the server socket and blocks until the server channel is closed.
     * The event loop group is shut down in the {@code finally} block.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting on one of the {@code sync()} calls
     */
    public void start() throws InterruptedException {
        // LineBaseServerHandler is annotated @Sharable, so one instance (and
        // therefore one request counter) serves every connection.
        final LineBaseServerHandler serverHandler = new LineBaseServerHandler();
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(PORT))
                    .childHandler(new ChannelInitializerImp(serverHandler));
            ChannelFuture f = b.bind().sync();
            System.out.println("服务器启动完成,等待客户端的连接和数据.....");
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    /**
     * Configures the pipeline of each accepted channel: a new frame decoder
     * per channel, followed by the shared server handler.
     */
    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

        /** Handler instance shared by all accepted channels. */
        private final LineBaseServerHandler serverHandler;

        ChannelInitializerImp(LineBaseServerHandler serverHandler) {
            this.serverHandler = serverHandler;
        }

        @Override
        protected void initChannel(Channel ch) throws Exception {
            // The decoder is created per channel; only the handler is shared.
            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
            ch.pipeline().addLast(serverHandler);
        }
    }
}
服务端handler
/**
 * Server handler for the line-delimited echo demo. For every decoded frame it
 * prints the request with a running count and writes back a greeting line.
 */
@ChannelHandler.Sharable
public class LineBaseServerHandler extends ChannelInboundHandlerAdapter {

    /** Counts the frames handled by this handler instance. */
    private final AtomicInteger counter = new AtomicInteger(0);

    /**
     * Decodes the frame as UTF-8, logs it, and replies with a greeting
     * terminated by the system line separator.
     *
     * @param msg the inbound frame; cast to {@code ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        try {
            String request = in.toString(CharsetUtil.UTF_8);
            System.out.println("Server Accept[" + request
                    + "] and the counter is:" + counter.incrementAndGet());
            String resp = "Hello," + request
                    + ". Welcome to Netty World!"
                    + System.getProperty("line.separator");
            // Encode the reply as UTF-8 to match the charset used for decoding,
            // rather than relying on the platform default.
            ctx.writeAndFlush(Unpooled.copiedBuffer(resp, CharsetUtil.UTF_8));
        } finally {
            // The frame is consumed here and not passed further down the
            // pipeline, so this handler must release it to avoid a buffer leak.
            in.release();
        }
    }

    /** Prints the stack trace and closes the connection on any pipeline error. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
二、使用自定义分隔符
无论是客户端还是服务端只有ChannelInitializerImp和DelimiterClientHandler有区别,其它和第一种方法的代码完全一样
ChannelInitializerImp有改变的代码:
/**
 * Custom frame delimiter for the delimiter-based example. The other snippets
 * read it as DelimiterEchoServer.DELIMITER_SYMBOL: its bytes are handed to
 * DelimiterBasedFrameDecoder, and the string is appended to each client request.
 */
public static final String DELIMITER_SYMBOL
= "MarkJames";
/**
 * Pipeline setup for the custom-delimiter example: a delimiter-based frame
 * decoder (constructed with 1024 and the shared delimiter) followed by the
 * client handler.
 */
private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        byte[] delimiterBytes = DelimiterEchoServer.DELIMITER_SYMBOL.getBytes();
        ByteBuf frameDelimiter = Unpooled.copiedBuffer(delimiterBytes);
        ch.pipeline()
                .addLast(new DelimiterBasedFrameDecoder(1024, frameDelimiter))
                .addLast(new DelimiterClientHandler());
    }
}
DelimiterClientHandler有改变的代码:
/**
 * Sends ten requests once the channel is active, each terminated by the
 * custom delimiter string instead of a line separator.
 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    String request = "Mark,Lison,Peter,James,Deer" + DelimiterEchoServer.DELIMITER_SYMBOL;
    int remaining = 10;
    while (remaining-- > 0) {
        // A new buffer for every write, filled with the encoded request.
        ctx.writeAndFlush(Unpooled.buffer(request.length()).writeBytes(request.getBytes()));
    }
}
三、使用定长消息
无论是客户端还是服务端只有ChannelInitializerImp和FixedLengthClientHandler有区别,其它和第一种方法的代码完全一样
ChannelInitializerImp有改变的代码:
/**
 * Pipeline setup for the fixed-length example: a fixed-length frame decoder
 * sized to the length of FixedLengthEchoServer.RESPONSE, followed by the
 * client handler.
 */
private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        int frameLength = FixedLengthEchoServer.RESPONSE.length();
        ch.pipeline()
                .addLast(new FixedLengthFrameDecoder(frameLength))
                .addLast(new FixedLengthClientHandler());
    }
}
FixedLengthClientHandler有改变的代码:
/**
 * Sends FixedLengthEchoClient.REQUEST ten times once the channel is active;
 * no delimiter is appended in the fixed-length example.
 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    int remaining = 10;
    while (remaining-- > 0) {
        // A new buffer for every write, filled with the encoded request.
        ctx.writeAndFlush(
                Unpooled.buffer(FixedLengthEchoClient.REQUEST.length())
                        .writeBytes(FixedLengthEchoClient.REQUEST.getBytes()));
    }
}
四、使用发送数据和数据长度
略