Netty聊天室
项目结构
服务端
Server
/**
 * Entry point of the group chat server.
 *
 * <p>Configures a Netty {@code ServerBootstrap} with String codecs and a
 * {@code GroupChatServerHandler} per accepted connection, binds it to the
 * configured port and blocks until the listening channel is closed.
 */
public class GroupChatServer {

    /** TCP port the server binds to. */
    private int port;

    /**
     * Creates a server that will listen on the given port once {@link #run()} is called.
     *
     * @param port TCP port to bind
     */
    public GroupChatServer(int port) {
        this.port = port;
    }

    /**
     * Starts the server and blocks the calling thread until the server channel closes.
     * Both event loop groups are shut down gracefully on the way out, whether the
     * method ends normally or with an exception.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the
     *                              bind to complete or for the server channel to close
     */
    public void run() throws InterruptedException {
        // One thread for the group passed first to group(...), default sizing for the second.
        EventLoopGroup acceptors = new NioEventLoopGroup(1);
        EventLoopGroup workers = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptors, workers);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.option(ChannelOption.SO_BACKLOG, 128);
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.childHandler(newPipelineInitializer());

            System.out.println("netty 服务器启动");

            // Bind, wait for the bind to finish, then park until the listening channel closes.
            bootstrap.bind(port).sync().channel().closeFuture().sync();
        } finally {
            acceptors.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }

    /**
     * Builds the initializer that installs the String decoder, the String encoder and
     * the chat handler (in that order) on every accepted channel.
     */
    private static ChannelInitializer<SocketChannel> newPipelineInitializer() {
        return new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        .addLast("decoder", new StringDecoder())
                        .addLast("encoder", new StringEncoder())
                        .addLast(new GroupChatServerHandler());
            }
        };
    }

    /**
     * Launches the chat server on port 6666.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while the server runs
     */
    public static void main(String[] args) throws InterruptedException {
        GroupChatServer server = new GroupChatServer(6666);
        server.run();
    }
}
ServerHandler
/**
 * Server-side handler of the group chat.
 *
 * <p>Every connected channel is registered in one shared {@code ChannelGroup}. Join and
 * leave events are announced to the group, and each inbound text message is relayed to
 * every member, with a different prefix for the sender's own copy.
 */
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     * All channels currently taking part in the chat, shared by every handler instance.
     * Declared {@code final} because the group is never meant to be replaced.
     */
    private static final ChannelGroup channelGroup =
            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * Announces the newcomer to the channels already in the group, then adds it.
     * The broadcast happens before the add, so the newcomer does not receive its own
     * join notice.
     *
     * @param ctx context of the channel whose pipeline just received this handler
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + "加入聊天\n");
        channelGroup.add(channel);
    }

    /**
     * Tells the group that the client left and prints the current group size.
     * No explicit removal is done here.
     * NOTE(review): this relies on the group dropping closed channels by itself, as the
     * Netty ChannelGroup documentation describes - confirm for the Netty version in use.
     *
     * @param ctx context of the channel whose pipeline lost this handler
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + "离开了\n");
        System.out.println("channelGroup size" + channelGroup.size());
    }

    /**
     * Prints a console line saying the client came online.
     *
     * @param ctx context of the channel that became active
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + "上线了~");
    }

    /**
     * Prints a console line saying the client went offline.
     *
     * @param ctx context of the channel that became inactive
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + "离线了~");
    }

    /**
     * Relays one inbound message to every channel in the group. Other members see the
     * sender's remote address; the sender gets an echo marked as its own message.
     *
     * @param ctx context of the sending channel
     * @param msg decoded text received from the sender
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.forEach(ch -> {
            // Reference comparison: the group holds the very same Channel object.
            if (channel != ch) {
                ch.writeAndFlush("[用户]" + channel.remoteAddress() + " 发送了消息: " + msg + "\n");
            } else {
                ch.writeAndFlush("[自己]发送了消息: " + msg + "\n");
            }
        });
    }

    /**
     * Reports the failure and closes the connection it occurred on.
     * The cause is written to {@code System.err} first, so that pipeline errors are no
     * longer discarded without a trace.
     *
     * @param ctx   context of the channel on which the error was raised
     * @param cause the error that reached this handler
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.err.println(ctx.channel().remoteAddress() + " 发生异常, 关闭连接: " + cause);
        ctx.close();
    }
}
客户端
Client
/**
 * Console client of the group chat.
 *
 * <p>Connects to the chat server, then forwards every line typed on standard input to
 * the server. Messages coming back are handled by {@code GroupChatClientHandler}.
 */
public class GroupChatClient {

    /** Host name or address of the chat server. */
    private final String host;

    /** TCP port of the chat server. */
    private final int port;

    /**
     * Creates a client for the given server endpoint. No connection is opened until
     * {@link #run()} is called.
     *
     * @param host server host name or address
     * @param port server TCP port
     */
    public GroupChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the server and sends each line read from standard input, terminated
     * with {@code "\r\n"}. The loop ends when standard input reaches end-of-file or when
     * the connection is found to be no longer active; the event loop group is shut down
     * gracefully in either case.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the
     *                              connection attempt to complete
     */
    public void run() throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            pipeline.addLast(new GroupChatClientHandler());
                        }
                    });

            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            Channel channel = channelFuture.channel();
            System.out.println("----------" + channel.localAddress() + "----------");

            // The scanner is intentionally not closed: closing it would also close
            // System.in for the rest of the process.
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                // Once the connection is gone every write fails asynchronously and
                // nothing reports it, so tell the user and stop instead.
                if (!channel.isActive()) {
                    System.out.println("与服务器的连接已断开");
                    break;
                }
                channel.writeAndFlush(msg + "\r\n");
            }
        } finally {
            group.shutdownGracefully();
        }
    }

    /**
     * Starts a client against {@code localhost:6666}.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while connecting
     */
    public static void main(String[] args) throws InterruptedException {
        new GroupChatClient("localhost", 6666).run();
    }
}
ClientHandler
/**
 * Client-side inbound handler: shows each text message received from the server on
 * the console.
 */
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Prints the received message to standard output after stripping leading and
     * trailing whitespace (including the line terminator appended by the server).
     *
     * @param ctx context of the channel the message arrived on (unused)
     * @param msg decoded text sent by the server
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        final String text = msg.trim();
        System.out.println(text);
    }
}