Netty做一个简单的群聊服务端
一个服务端管理多个客户端进行通信。 例如: 1、 服务端监听 客户端上线,下线提醒。 2、 客户端加入群聊提醒。 3、 客户端发送消息,其他客户端都可以接收。
演示:
开始写代码
1.引入依赖
maven引入Netty依赖
<dependency
>
<groupId
>io
.netty
</groupId
>
<artifactId
>netty
-all
</artifactId
>
<version
>4.1.34.Final
</version
>
</dependency
>
2.服务端启动代码
package com
.liuqi
.chat
;
import io
.netty
.bootstrap
.ServerBootstrap
;
import io
.netty
.channel
.ChannelFuture
;
import io
.netty
.channel
.ChannelInitializer
;
import io
.netty
.channel
.nio
.NioEventLoopGroup
;
import io
.netty
.channel
.socket
.SocketChannel
;
import io
.netty
.channel
.socket
.nio
.NioServerSocketChannel
;
import io
.netty
.handler
.codec
.string
.StringDecoder
;
import io
.netty
.handler
.codec
.string
.StringEncoder
;
public class ChatServer
{
public final
static int PORT
= 8088;
public
static void main(String
[] args
) {
NioEventLoopGroup bossGroup
= new
NioEventLoopGroup(1);
NioEventLoopGroup workGroup
= new
NioEventLoopGroup();
try
{
ServerBootstrap serverBootstrap
= new
ServerBootstrap();
serverBootstrap
.group(bossGroup
, workGroup
)
.channel(NioServerSocketChannel
.class
)
.childHandler(new ChannelInitializer
<SocketChannel
>() {
protected
void initChannel(SocketChannel ch
) throws Exception
{
ch
.pipeline()
.addLast(new
StringDecoder())
.addLast(new
StringEncoder())
.addLast(new
ChatServerHandler());
}
});
ChannelFuture cf
= serverBootstrap
.bind(PORT
).sync();
System
.out
.println("服务启动成功");
cf
.channel().closeFuture().sync();
} catch
(Exception e
) {
System
.out
.println("启动失败 " + e
.getMessage());
e
.printStackTrace();
} finally
{
bossGroup
.shutdownGracefully();
workGroup
.shutdownGracefully();
}
}
}
3.对自定义 ChatServerHandler 处理器的编写
服务器接收客户端连接事件,断开连接事件,接收客户端消息,发送消息给客户端的事件处理。
channel_group 管理所有的客户端通道,由这个集合群发消息给客户端。
package com
.liuqi
.chat
;
import io
.netty
.channel
.Channel
;
import io
.netty
.channel
.ChannelHandlerContext
;
import io
.netty
.channel
.SimpleChannelInboundHandler
;
import io
.netty
.channel
.group
.ChannelGroup
;
import io
.netty
.channel
.group
.DefaultChannelGroup
;
import io
.netty
.util
.concurrent
.GlobalEventExecutor
;
import java
.text
.SimpleDateFormat
;
import java
.util
.Date
;
public class ChatServerHandler extends SimpleChannelInboundHandler
<String
> {
private final
static ChannelGroup channel_group
= new
DefaultChannelGroup(GlobalEventExecutor
.INSTANCE
);
private final
static SimpleDateFormat sdf
= new
SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
public
void handlerAdded(ChannelHandlerContext ctx
) throws Exception
{
Channel channel
= ctx
.channel();
channel_group
.writeAndFlush(sdf
.format(new
Date()) + " [客户端]" + channel
.remoteAddress() + " 加入聊天");
channel_group
.add(channel
);
}
@Override
public
void handlerRemoved(ChannelHandlerContext ctx
) throws Exception
{
Channel channel
= ctx
.channel();
channel_group
.writeAndFlush(sdf
.format(new
Date()) + "[客户端]" + channel
.remoteAddress() + " 离开了");
System
.out
.println("channelGroup size " + channel_group
.size());
}
@Override
public
void channelActive(ChannelHandlerContext ctx
) throws Exception
{
System
.out
.println(sdf
.format(new
Date()) + "[客户端]" + ctx
.channel().remoteAddress() + " 上线了~");
}
@Override
public
void channelInactive(ChannelHandlerContext ctx
) throws Exception
{
System
.out
.println(sdf
.format(new
Date()) + "[客户端]" + ctx
.channel().remoteAddress() + " 离线了~");
}
protected
void channelRead0(final ChannelHandlerContext ctx
, String msg
) throws Exception
{
channel_group
.forEach(channel
-> {
if (channel
!= ctx
.channel()) {
channel
.writeAndFlush(sdf
.format(new
Date()) + " " + channel
.remoteAddress() + " 说: " + msg
);
} else {
channel
.writeAndFlush(sdf
.format(new
Date()) + " [我发送了] " + msg
);
}
});
}
@Override
public
void exceptionCaught(ChannelHandlerContext ctx
, Throwable cause
) throws Exception
{
System
.out
.println("连接异常 " + cause
.getMessage());
ctx
.close();
}
}
4.客户端启动代码
启动客户端,连接到服务端成功后,进行阻塞,监听键盘输入。每输入一行就发送消息给服务端,
服务端在将消息群发给其他客户端。
注意:连接服务端成功后需要阻塞,不然会断开连接.
可以使用 channelFuture.channel().closeFuture().sync()进行阻塞.
Channel channel = channelFuture.channel() 得到的通道可以保存起来,进行发送消息。
package com
.liuqi
.chat
.client
;
import com
.liuqi
.chat
.ChatServer
;
import io
.netty
.bootstrap
.Bootstrap
;
import io
.netty
.channel
.Channel
;
import io
.netty
.channel
.ChannelFuture
;
import io
.netty
.channel
.ChannelInitializer
;
import io
.netty
.channel
.nio
.NioEventLoopGroup
;
import io
.netty
.channel
.socket
.SocketChannel
;
import io
.netty
.channel
.socket
.nio
.NioSocketChannel
;
import io
.netty
.handler
.codec
.string
.StringDecoder
;
import io
.netty
.handler
.codec
.string
.StringEncoder
;
import java
.util
.Scanner
;
public class ChatClient
{
public
static void main(String
[] args
) {
NioEventLoopGroup eventExecutors
= new
NioEventLoopGroup();
Bootstrap bootstrap
= new
Bootstrap();
try
{
bootstrap
.group(eventExecutors
)
.channel(NioSocketChannel
.class
)
.handler(new ChannelInitializer
<SocketChannel
>() {
protected
void initChannel(SocketChannel ch
) throws Exception
{
ch
.pipeline()
.addLast(new
StringDecoder())
.addLast(new
StringEncoder())
.addLast(new
ChatClientHandler());
}
});
ChannelFuture channelFuture
= bootstrap
.connect("127.0.0.1", ChatServer
.PORT
).sync();
System
.out
.println("建立连接成功");
Channel channel
= channelFuture
.channel();
Scanner scanner
= new
Scanner(System
.in
);
while (scanner
.hasNextLine()) {
String msg
= scanner
.nextLine();
channel
.writeAndFlush(msg
);
}
} catch
(Exception e
) {
System
.out
.println("启动客户端失败 " + e
.getMessage());
e
.printStackTrace();
} finally
{
eventExecutors
.shutdownGracefully();
}
}
}
5 ChatClientHandler 客户端的消息处理器
这个客户端处理器很简单,就接收到服务器发送的消息,输出在控制台。
package com
.liuqi
.chat
.client
;
import io
.netty
.channel
.ChannelHandlerContext
;
import io
.netty
.channel
.SimpleChannelInboundHandler
;
public class ChatClientHandler extends SimpleChannelInboundHandler
<String
> {
protected
void channelRead0(ChannelHandlerContext ctx
, String msg
) throws Exception
{
System
.out
.println(msg
);
}
@Override
public
void exceptionCaught(ChannelHandlerContext ctx
, Throwable cause
) throws Exception
{
System
.out
.println("连接异常 " + cause
.getMessage());
ctx
.close();
}
}
总结
到此这个简单的群聊服务就做好了 可以多复制几个客户端启动进行调试。