一站式学习Java网络编程 全面理解BIO

    科技2022-07-11  111

    目录

    1. AIO模型分析2. 聊天室分析2.1 服务器端2.1.1 字段2.1.2 主方法2.1.3 AcceptHandler2.1.4 ClientHandler(处理读写请求)2.1.5 添加和删除用户2.1.6 接收和转发方法 2.2 客户端2.2.1 主方法2.2.2 发送消息2.2.3 用户的输入线程 3. 测试结果4.完整代码4.1 服务器端4.2 客户端


    1. AIO模型分析

    AsynchronousServerSocket:它属于一个 AsynchronousChannelGroup,这个通道组,其实是被多个异步通道共享的资源群组,这里边我们之前提到过,有一个非常重要的资源:线程池,系统会利用线程池中的线程,来处理一些handler请求。系统利用这个资源组还为我们做了很多的事情,包括它能在数据准备好的时候通知我们和利用handler做一些异步的操作。当我们在创建AsynchronousServerSocket时(open()),我们可以自定义一个通道组,当然我们不传参的时候,系统会默认给我们一个群组。

    当客户端请求与服务器建立连接时,系统会异步的调用AcceptHandler来处理连接请求,成功建立连接后,会返回一个AsynchronousSocketChannel对象,每个对象还会有一个ClientHandler来处理读写请求,在请求处理的过程中,并不是在主线程中完成的,而是通道组利用线程池资源,在不同的线程中完成异步处理。

    2. 聊天室分析

    2.1 服务器端

    2.1.1 字段

    2.1.2 主方法

    2.1.3 AcceptHandler

    2.1.4 ClientHandler(处理读写请求)

    2.1.5 添加和删除用户

    2.1.6 接收和转发方法

    2.2 客户端

    客户端中使用的Future来处理异步请求,非常简单

    2.2.1 主方法

    2.2.2 发送消息

    2.2.3 用户的输入线程

    3. 测试结果

    4.完整代码

    4.1 服务器端

    package server; import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ChatServer { private static final String LOCALHOST = "localhost"; private static final int DEFAULT_PORT = 8888; private static final String QUIT = "quit"; private static final int BUFFER = 1024; private AsynchronousServerSocketChannel serverChannel; private AsynchronousChannelGroup asynchronousChannelGroup; private List<ClientHandler> connectedClients; private Charset charset = StandardCharsets.UTF_8; private int port; public ChatServer(int port) { this.port = port; connectedClients = new ArrayList<>(); } public ChatServer() { this(DEFAULT_PORT); } public void start(){ try { //自定义ChannelGroup ExecutorService executorService = Executors.newFixedThreadPool(10); asynchronousChannelGroup = AsynchronousChannelGroup.withThreadPool(executorService); serverChannel = AsynchronousServerSocketChannel.open(asynchronousChannelGroup); serverChannel.bind(new InetSocketAddress(LOCALHOST,port)); System.out.println("服务器已经启动成功,随时等待客户端连接..."); while (true){ serverChannel.accept(null,new AcceptHandler()); System.in.read(); } } catch (IOException e) { e.printStackTrace(); }finally { close(serverChannel); } } private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,Object> { @Override public void completed(AsynchronousSocketChannel clientChannel, Object attachment) { if(serverChannel.isOpen()) serverChannel.accept(null,this); if(clientChannel != null && clientChannel.isOpen()){ ClientHandler clientHandler = new ClientHandler(clientChannel); ByteBuffer buffer = ByteBuffer.allocate(BUFFER); addClient(clientHandler); clientChannel.read(buffer,buffer,clientHandler); } } @Override public void failed(Throwable exc, Object attachment) { System.out.println("连接失败:" + exc.getMessage()); } } private class ClientHandler implements CompletionHandler<Integer,ByteBuffer>{ private AsynchronousSocketChannel clientChannel; public ClientHandler(AsynchronousSocketChannel clientChannel) { this.clientChannel = clientChannel; } public AsynchronousSocketChannel getClientChannel() { return clientChannel; } @Override public void completed(Integer result, ByteBuffer buffer) { if(buffer != null){ //buffer不为空的时候,这要执行的是read之后的回调方法 if(result <= 0){ //客户端异常,将客户端从连接列表中移除 removeClient(this); }else{ buffer.flip(); String fwdMsg = receive(buffer); System.out.println(getClientName(clientChannel) + fwdMsg); forwardMsg(clientChannel,fwdMsg); buffer.clear(); if(readyToQuit(fwdMsg)){ removeClient(this); }else { clientChannel.read(buffer,buffer,this); } } } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.out.println("读写操作失败:" + exc.getMessage()); } } private synchronized void addClient(ClientHandler clientHandler) { connectedClients.add(clientHandler); System.out.println(getClientName(clientHandler.getClientChannel()) + "已经连接"); } private synchronized void removeClient(ClientHandler clientHandler) { AsynchronousSocketChannel clientChannel = clientHandler.getClientChannel(); connectedClients.remove(clientHandler); System.out.println(getClientName(clientChannel) + "已经断开连接"); close(clientChannel); } private void close(Closeable closeable){ try { closeable.close(); } catch (IOException e) { e.printStackTrace(); } } private boolean readyToQuit(String msg){ return QUIT.equals(msg); } private synchronized String receive(ByteBuffer buffer) { return String.valueOf(charset.decode(buffer)); } private synchronized void forwardMsg(AsynchronousSocketChannel clientChannel,String fwdMsg) { for (ClientHandler connectedHandler : connectedClients) { AsynchronousSocketChannel client = connectedHandler.getClientChannel(); if(!client.equals(clientChannel)){ //注意这个try,catch是自己加的 try { //将消息存入缓存区中 ByteBuffer buffer = charset.encode(getClientName(client) + fwdMsg); //写给每个客户端 client.write(buffer,null,connectedHandler); } catch (Exception e) { e.printStackTrace(); } } } } private String getClientName(AsynchronousSocketChannel clientChannel) { int port = -1; try { InetSocketAddress remoteAddress = (InetSocketAddress) clientChannel.getRemoteAddress(); port = remoteAddress.getPort(); } catch (IOException e) { e.printStackTrace(); } return "客户端[" + port + "]:"; } public static void main(String[] args) { ChatServer chatServer = new ChatServer(); chatServer.start(); } }

    4.2 客户端

    package client; import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class ChatClient { private static final String LOCALHOST = "localhost"; private static final int DEFAULT_PORT = 8888; private final String QUIT = "quit"; private final int BUFFER = 1024; private String host; private int port; private AsynchronousSocketChannel clientChannel; private Charset charset = StandardCharsets.UTF_8; public ChatClient() { this(LOCALHOST,DEFAULT_PORT); } public ChatClient(String host,int port){ this.host = host; this.port = port; } public void start(){ try { clientChannel = AsynchronousSocketChannel.open(); Future<Void> connect = clientChannel.connect(new InetSocketAddress(host, port)); connect.get(); System.out.println("与服务已成功建立连接"); new Thread(new UserInputHandler(this)).start(); ByteBuffer buffer = ByteBuffer.allocate(BUFFER); while (clientChannel.isOpen()){ Future<Integer> read = clientChannel.read(buffer); int result = read.get(); if(result <= 0){ //这里是,当我们输入quit时,在服务器端会自动将我们移除 //所以这里关闭就好了 close(clientChannel); }else { buffer.flip(); String msg = String.valueOf(charset.decode(buffer)); System.out.println(msg); buffer.clear(); } } } catch (IOException | InterruptedException | ExecutionException e) { e.printStackTrace(); }finally { close(clientChannel); } } public void sendMsg(String msg){ if(msg.isEmpty()){ return; }else { ByteBuffer buffer = charset.encode(msg); Future<Integer> write = clientChannel.write(buffer); try { write.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } } private void close(Closeable closeable){ try { closeable.close(); } catch (IOException e) { e.printStackTrace(); } } public boolean readyToQuit(String msg){ return QUIT.equals(msg); } public static void main(String[] args) { ChatClient chatClient = new ChatClient(); chatClient.start(); } } package client; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; public class UserInputHandler implements Runnable{ private ChatClient client; public UserInputHandler(ChatClient client) { this.client = client; } @Override public void run() { BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in)); while (true){ try { String msg = consoleReader.readLine(); client.sendMsg(msg); if(client.readyToQuit(msg)){ System.out.println("成功退出聊天室"); break; } } catch (IOException e) { e.printStackTrace(); } } } }
    Processed: 0.009, SQL: 8