aio编程例子

    科技2022-07-16  122

    AioClient 客户端主程序

    import static cn.enjoyedu.ch01.Ch01Const.DEFAULT_PORT; import static cn.enjoyedu.ch01.Ch01Const.DEFAULT_SERVER_IP; /** * @author yun * 类说明:aio的客户端主程序 */ public class AioClient { //IO通信处理器 private static AioClientHandler clientHandle; public static void start(){ if(clientHandle!=null) return; clientHandle = new AioClientHandler(DEFAULT_SERVER_IP,DEFAULT_PORT); //负责网络通讯的线程 new Thread(clientHandle,"Client").start(); } //向服务器发送消息 public static boolean sendMsg(String msg) throws Exception{ if(msg.equals("q")) return false; clientHandle.sendMessag(msg); return true; } public static void main(String[] args) throws Exception{ AioClient.start(); System.out.println("请输入请求消息:"); Scanner scanner = new Scanner(System.in); while(AioClient.sendMsg(scanner.nextLine())); } }

    Ch01Const常量类

    /** * @author yun * 类说明: */ public class Ch01Const { //服务器端口号 public static int DEFAULT_PORT = 12345; public static String DEFAULT_SERVER_IP = "127.0.0.1"; //返回给客户端的应答 public static String response(String msg){ return "Hello,"+msg+",Now is "+new java.util.Date( System.currentTimeMillis()).toString() ; } }

    AioClientHandler通信处理器,负责连接服务器,对外暴露对服务端发送数据的API

    /** * @author yun * 类说明:IO通信处理器,负责连接服务器,对外暴露对服务端发送数据的API */ public class AioClientHandler implements CompletionHandler<Void,AioClientHandler>,Runnable { private AsynchronousSocketChannel clientChannel; private String host; private int port; private CountDownLatch latch;//防止线程退出 public AioClientHandler(String host, int port) { this.host = host; this.port = port; try { //创建一个实际异步的客户端通道 clientChannel = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { //创建CountDownLatch,因为是异步调用,下面的connect不会阻塞, // 那么整个run方法会迅速结束,那么负责网络通讯的线程也会迅速结束 latch = new CountDownLatch(1); //发起异步连接操作,回调参数就是这个实例本身, // 如果连接成功会回调这个实例的completed方法 clientChannel.connect(new InetSocketAddress(host,port), null,this); try { latch.await(); clientChannel.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } //连接成功,这个方法会被系统调用 @Override public void completed(Void result, AioClientHandler attachment) { System.out.println("已经连接到服务端。"); } //连接失败,这个方法会被系统调用 @Override public void failed(Throwable exc, AioClientHandler attachment) { System.err.println("连接失败。"); exc.printStackTrace(); latch.countDown(); try { clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } //对外暴露对服务端发送数据的API public void sendMessag(String msg){ /*为了把msg变成可以在网络传输的格式*/ byte[] bytes = msg.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); /*进行异步写,同样的这个方法会迅速返回, 需要提供一个接口让系统在一次网络写操作完成后通知我们的应用程序。 所以我们传入一个实现了CompletionHandler的AioClientWriteHandler 第1个writeBuffer,表示我们要发送给服务器的数据; 第2个writeBuffer,考虑到网络写有可能无法一次性将数据写完,需要进行多次网络写, 所以将writeBuffer作为附件传递给AioClientWriteHandler。 */ clientChannel.write(writeBuffer,writeBuffer, new AioClientWriteHandler(clientChannel,latch)); } }

    AioClientWriteHandler网络写的处理器

    /** * @author yun * 类说明:网络写的处理器,CompletionHandler<Integer, ByteBuffer>中 * Integer:本次网络写操作完成实际写入的字节数, * ByteBuffer:写操作的附件,存储了写操作需要写入的数据 */ public class AioClientWriteHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public AioClientWriteHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result, ByteBuffer buffer) { //有可能无法一次性将数据写完,需要检查缓冲区中是否还有数据需要继续进行网络写 if(buffer.hasRemaining()){ clientChannel.write(buffer,buffer,this); }else{ //写操作已经完成,为读取服务端传回的数据建立缓冲区 ByteBuffer readBuffer = ByteBuffer.allocate(1024); /*这个方法会迅速返回,需要提供一个接口让 系统在读操作完成后通知我们的应用程序。*/ clientChannel.read(readBuffer,readBuffer, new AioClientReadHandler(clientChannel,latch)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.err.println("数据发送失败..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }

    网络读的处理器

    /** * @author yun * 类说明:网络读的处理器 * CompletionHandler<Integer, ByteBuffer>中 * Integer:本次网络读操作实际读取的字节数, * ByteBuffer:读操作的附件,存储了读操作读到的数据 * */ public class AioClientReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public AioClientReadHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result,ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String msg; try { msg = new String(bytes,"UTF-8"); System.out.println("accept message:"+msg); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc,ByteBuffer attachment) { System.err.println("数据读取失败..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }

    AioServer服务器主程序

    /** * @author yun * 类说明:服务器主程序 */ public class AioServer { private static AioServerHandler serverHandle; //统计客户端个数 public volatile static long clientCount = 0; public static void start(){ if(serverHandle!=null) return; serverHandle = new AioServerHandler(DEFAULT_PORT); new Thread(serverHandle,"Server").start(); } public static void main(String[] args){ AioServer.start(); } }

    AioServerHandler响应网络操作的处理器

    /** * @authoryun * 类说明:响应网络操作的处理器 */ public class AioServerHandler implements Runnable { public CountDownLatch latch; /*进行异步通信的通道*/ public AsynchronousServerSocketChannel channel; public AioServerHandler(int port) { try { //创建服务端通道 channel = AsynchronousServerSocketChannel.open(); //绑定端口 channel.bind(new InetSocketAddress(port)); System.out.println("Server is start,port:"+port); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { latch = new CountDownLatch(1); //用于接收客户端的连接,异步操作, // 需要实现了CompletionHandler接口的处理器处理和客户端的连接操作 channel.accept(this,new AioAcceptHandler()); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } }

    AioAcceptHandler处理用户连接的处理器

    /** * @author yun * 类说明:处理用户连接的处理器 */ public class AioAcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AioServerHandler> { @Override public void completed(AsynchronousSocketChannel channel, AioServerHandler serverHandler) { AioServer.clientCount++; System.out.println("连接的客户端数:" + AioServer.clientCount); //重新注册监听,让别的客户端也可以连接 serverHandler.channel.accept(serverHandler,this); ByteBuffer readBuffer = ByteBuffer.allocate(1024); //1)ByteBuffer dst:接收缓冲区,用于从异步Channel中读取数据包; //2) A attachment:异步Channel携带的附件,通知回调的时候作为入参使用; //3) CompletionHandler<Integer,? super A>:系统回调的业务handler,进行读操作 channel.read(readBuffer,readBuffer, new AioReadHandler(channel)); } @Override public void failed(Throwable exc, AioServerHandler serverHandler) { exc.printStackTrace(); serverHandler.latch.countDown(); } }

    AioReadHandler读数据的处理器

    /** * @author yun * 类说明:读数据的处理器 */ public class AioReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel channel; public AioReadHandler(AsynchronousSocketChannel channel) { this.channel = channel; } //读取到消息后的处理 @Override public void completed(Integer result, ByteBuffer attachment) { //如果条件成立,说明客户端主动终止了TCP套接字,这时服务端终止就可以了 if(result == -1) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } return; } //flip操作 attachment.flip(); byte[] message = new byte[attachment.remaining()]; attachment.get(message); try { System.out.println(result); String msg = new String(message,"UTF-8"); System.out.println("server accept message:"+msg); String responseStr = response(msg); //向客户端发送消息 doWrite(responseStr); } catch (Exception e) { e.printStackTrace(); } } //发送消息 private void doWrite(String result) { byte[] bytes = result.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); //异步写数据 channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer attachment) { if(attachment.hasRemaining()){ channel.write(attachment,attachment,this); }else{ //读取客户端传回的数据 ByteBuffer readBuffer = ByteBuffer.allocate(1024); //异步读数据 channel.read(readBuffer,readBuffer, new AioReadHandler(channel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); } } }
    Processed: 0.070, SQL: 8