一文打穿JavaNIO

    科技2024-08-16  31

    努力写更多优质文章,欢迎关注CC_且听风吟~

    文章目录

    Java NIOBuffer简单使用实例NIO和BIO的比较NIO核心组件关系BufferBuffer常用方法Buffer使用注意事项使用多个Buffer ChannelFileChannel写入文件FileChannel读取文件两个Channel共用BufferFileChannel拷贝文件 SelectorSelector类的方法 NIO网络通信流程NIO通信流程实例NIO群聊应用实例

    Java NIO

    Java NIO(Java non-blocking IO),指的是JDK提供的新API,从JDK1.4开始提供的一系列改进的输入输出特性,被称为NIO,是同步非阻塞的

    相关包为java.nio包,并且对原来的java.io包中的很多类进行了改写

    NIO的三个核心部分:Selector、Channel、Buffer,一张图:

    NIO是面向缓冲区,或者面向块编程的。数据读取到一个它稍后处理的缓冲区,需要时可以在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络。

    JavaNIO的非阻塞模式,使一个线程从某Channel发送请求或者读取数据,但是它仅能得到目前可用的数据;如果目前没有数据可用,就什么都不会获取,而不是阻塞等待(不会阻塞)。

    所以,直到有了新的数据之前,这个线程可以处理其他内容,这是非阻塞读取read

    非阻塞写也是如此,一个线程请求写入一些数据到某通道,但是不需要等待他完全写入,这个线程可以处理其他内容。

    NIO可以做到使用一个线程来处理多个操作。假设有10000个请求,可以分配50个线程来处理即可,而不是像BIO一样需要10000个线程

    HTTP2.0使用了多路复用技术,实现了一个连接并发处理多个请求。

    Buffer简单使用实例

    import java.nio.IntBuffer; public class BufferExample { public static void main(String[] args) { // 创建一个Buffer,大小为5,可以存放5个int IntBuffer intBuffer = IntBuffer.allocate(5); // 向buffer中存放数据 for (int i = 0; i < intBuffer.capacity(); i++) { intBuffer.put(i); } // 将buffer读写切换 intBuffer.flip(); // 从buffer中读取数据 while (intBuffer.hasRemaining()){ System.out.println(intBuffer.get()); } } }

    程序输出内容如下:

    0 1 2 3 4

    NIO和BIO的比较

    NIO以流的方式处理数据,而NIO以块的方式处理数据,NIO效率高BIO是阻塞的,NIO是非阻塞的BIO基于字节流和字符流进行操作,而NIO是基于Channel和Buffer进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Select用于监听多个通道的事件(连接请求、数据到达等等),因此单个线程就可以监听多个客户端通道。

    NIO核心组件关系

    每个Channel都会对应一个Buffer每个Selector对应一个线程,一个线程的对应多个Channel每个Channel会注册到Selector程序切换到哪个Channel,是由事件Event决定的Selector会根据不同的事件,在各个通道上切换Buffer就是一个内存块,底层有一个数组数据的读取或者写入是通过Buffer,Buffer可以使用filp方法切换输入输出,BIO不能双向Channel是双向的,可以返回底层操作系统的情况,如Linux底层通道就是双向的

    Buffer

    缓冲区(Buffer)

    底层就是一个数组,提供了操作该数组的方法,可以更轻松地使用内存块;缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。

    数据的读取或者写入都必须经过Buffer

    以下代码摘取自java.nio包下的Buffer类:

    public abstract class Buffer { // Invariants: mark <= position <= limit <= capacity private int mark = -1; private int position = 0; private int limit; private int capacity; // Creates a new buffer with the given mark, position, limit, and capacity, // after checking invariants. Buffer(int mark, int pos, int lim, int cap) { // package-private if (cap < 0) throw new IllegalArgumentException("Negative capacity: " + cap); this.capacity = cap; limit(lim); position(pos); if (mark >= 0) { if (mark > pos) throw new IllegalArgumentException("mark > position: (" + mark + " > " + pos + ")"); this.mark = mark; } } }

    Buffer抽象类中有上面四个主要的属性:mark、position、limit、capacity

    position:可以理解为内置数组的当前下标limit:position不能超过的最大位置,可以比capacity小capacity:内置数组的最大容量mark:标记位置,reset时需要

    例如Buffer的子类ByteBuffer:

    public abstract class ByteBuffer extends Buffer implements Comparable<ByteBuffer> { final byte[] hb; // Non-null only for heap buffers final int offset; boolean isReadOnly; // Valid only for heap buffers // Creates a new buffer with the given mark, position, limit, capacity, // backing array, and array offset ByteBuffer(int mark, int pos, int lim, int cap, // package-private byte[] hb, int offset) { super(mark, pos, lim, cap); // 调用了父类方法 this.hb = hb; // byte内部数组 this.offset = offset; } }

    Buffer常用方法

    方法作用allocate( )分配Buffer大小put( )添加数据get( )获取数据flip( )翻转输入/输出mark( )在此缓冲区的位置设置标记reset( )将缓冲区的位置重置为以前标记地位置rewind( )重置mark并将position设置到起始位置0clear( )重置Buffer的mark、position、limitarray( )获取Buffer底层数组对象(JDK1.6)

    网络编程中常用ByteBuffer

    Buffer使用注意事项

    ByteBuffer支持类型化的put和get方法,放入的是什么数据类型,就应该使用相应的数据类型并且按照相同的顺序来取出,否则结果错误或者报错BufferUnderflowException,buffer溢出

    import java.nio.ByteBuffer; import java.nio.IntBuffer; public class BufferExample1 { public static void main(String[] args) { ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.putInt(2333); buffer.putChar('c'); buffer.putDouble(0.1234567); buffer.putShort((short) 4); // 取出 buffer.flip(); System.out.println(buffer.getInt()); System.out.println(buffer.getDouble()); System.out.println(buffer.getShort()); System.out.println(buffer.getChar()); } }

    上面这个例子里面没有按照相同的数据类型顺序取出,结果是错误的内容(和内存占用划分相似):

    2333 8.56609708773623E-307 -9614 []

    可以将一个普通Buffer转换成只读Buffer

    import java.nio.ByteBuffer; import java.nio.IntBuffer; public class BufferExample1 { public static void main(String[] args) { ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.putInt(2333); buffer.putChar('c'); buffer.putDouble(0.1234567); buffer.putShort((short) 4); // 得到只读buffer ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); System.out.println(readOnlyBuffer.getInt()); System.out.println(readOnlyBuffer.getChar()); System.out.println(readOnlyBuffer.getDouble()); System.out.println(readOnlyBuffer.getShort()); readOnlyBuffer.putInt(111); } }

    Exception in thread “main” java.nio.ReadOnlyBufferException

    使用MappedByteBuffer可以让文件直接在内存(堆外内存)中进行修改,操作系统不需要额外拷贝一次进入堆内存:

    import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.IntBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; public class BufferExample2 { public static void main(String[] args) throws IOException { RandomAccessFile randomAccessFile = new RandomAccessFile("./nio/1.txt", "rw"); FileChannel channel = randomAccessFile.getChannel(); /* FileChannel.MapMode.READ_WRITE 使用读写模式 0 可以直接修改的起始位置 5 文件的多少个字节映射到内存,可以直接修改的范围是[0, 0+5) 实际类型是DirectByteBuffer */ MappedByteBuffer mappedBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5); mappedBuffer.put(0, (byte) 'A'); mappedBuffer.put(3, (byte) 'B'); mappedBuffer.put(4, (byte) 'C'); mappedBuffer.put(5, (byte) 'D'); // 越界 IndexOutOfBoundsException randomAccessFile.close(); } }

    上面例子修改过后的文件是直接在内存中进行修改的,因此IDE中无法观察变化,在磁盘上重新打开文件可以观察文件内容的变化

    使用多个Buffer

    缓冲区可以使用Buffer数组来充当缓冲区

    import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.IntBuffer; import java.nio.channels.FileChannel; import java.util.Arrays; public class BufferExample3 { public static void main(String[] args) throws IOException { // 创建一个Buffer数组 ByteBuffer[] byteBuffers = new ByteBuffer[2]; byteBuffers[0] = ByteBuffer.allocate(5); byteBuffers[1] = ByteBuffer.allocate(5); FileInputStream fin = new FileInputStream("./nio/2.txt"); FileChannel channel = fin.getChannel(); long len = channel.read(byteBuffers); System.out.println("总长度:" + len); Arrays.stream(byteBuffers) .map(buffer -> "已经占用: "+buffer.position() + ", buffer总计: "+ buffer.limit()) .forEach(System.out::println); } }

    输出结果如下:

    总长度:8 已经占用: 5, buffer总计: 5 已经占用: 3, buffer总计: 5

    Channel

    NIO的Channel通道类似于流,但是有一些区别:

    Channel可以同时进行读写,而流只能读或者写;

    Channel可以实现异步读写数据

    Channel可以从缓冲区中读取数据,也可以写入数据到缓冲区

    Channel是NIO中的一个接口,有很多实现的子类接口:

    常用的抽象实现类有:

    类名说明FileChannel本地文件的读写DatagramChannelUDP数据读写ServerSocketChannel服务端TCP数据读写SocketChannel客户端TCP数据读写

    FileChannel写入文件

    import sun.nio.ch.FileChannelImpl; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; public class FileChannelExample { public static void main(String[] args) throws IOException { String ss = "hello"; FileOutputStream fout = new FileOutputStream("./nio/1.txt"); // 通过流获取对应的FileChannel FileChannel channel = fout.getChannel(); // 创建一个缓冲区Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); // 将字符串放入Buffer,然后进行翻转准备写出 buffer.put(ss.getBytes()); buffer.flip(); // 将Buffer中的数据写入到Channel channel.write(buffer); fout.close(); } }

    FileChannel读取文件

    import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; public class FileChannelExample2 { public static void main(String[] args) throws IOException { // 创建文件输入流 File file = new File("./nio/1.txt"); FileInputStream fin = new FileInputStream(file); // 通过流获取对应的FileChannel FileChannel channel = fin.getChannel(); // 创建一个缓冲区Buffer ByteBuffer buffer = ByteBuffer.allocate((int) file.length()); // 将数据读取到Buffer channel.read(buffer); System.out.println(new String(buffer.array())); fin.close(); } }

    两个Channel共用Buffer

    import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; // 使用一个Buffer实现读取和写入 public class FileChannelExample3 { public static void main(String[] args) throws IOException { FileInputStream fin = new FileInputStream("./nio/1.txt"); FileChannel channel_read = fin.getChannel(); FileOutputStream fout = new FileOutputStream("./nio/2.txt"); FileChannel channel_write = fout.getChannel(); ByteBuffer buffer = ByteBuffer.allocate(2); int len = 0; while((len = channel_read.read(buffer))!=-1){ // 读入之后需要翻转buffer buffer.flip(); channel_write.write(buffer); // 在一次写入完成之后需要清空Buffer buffer.clear(); } } }

    上面这个例子需要注意的是在一次读取完成之后的clear操作,不然由于position和limit没有复位,下次再进行read操作的时候就会一直得到read=0的结果

    FileChannel拷贝文件

    这是对上面的例子,有更好的简化:使用transfeFrom或者transferTo函数

    import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; // 拷贝文件 public class FileChannelExample4 { public static void main(String[] args) throws IOException { FileInputStream fin = new FileInputStream("./nio/1.txt"); FileChannel channel_read = fin.getChannel(); FileOutputStream fout = new FileOutputStream("./nio/4.txt"); FileChannel channel_write = fout.getChannel(); // 使用函数来进行channel之间的拷贝操作 channel_write.transferFrom(channel_read, 0, channel_read.size()); channel_read.transferTo(0, channel_read.size(), channel_write); channel_read.close(); channel_write.close(); } }

    Selector

    Java NIO使用的非阻塞IO方式,可以使用Selector(选择器)来实现一个线程处理多个客户端请求

    Selector会监测多个注册的通道Channel上是否有时间发生(多个Channel以事件的方式可以注册到同一个Selector)。如果有事件发生,就获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道的请求。

    只有在通道真正有读写事件OP_READ、OP_WRITE发生的时候,才会进行读写,大大地减少了系统开销,并且不必为每个连接都创建一个线程,避免了多线程之间的上下文切换导致的开销。

    当线程从某个客户端Socket通道进行读写数据时,如果没有数据可用时,该线程可以进行其他任务。

    线程通常将非阻塞I/O的空闲时间用于在其他通道上执行IO操作,这样执行效率很高

    由于读写操作都是非阻塞的,这样就可以充分提升I/O线程的运行效率,避免由于频繁I/O阻塞造成的线程挂起

    Selector类的方法

    方法说明open( )获取selectorselect( )阻塞select(1000)阻塞1000ms,1000ms之后返回wakeup( )唤醒selectorselectNow( )不阻塞,立马返回

    Selector就是通过select方法来循环遍历channels获取事件,如果有新的事件产生就加入selectedKeys集合,如果没有就根据要求返回或者阻塞

    NIO网络通信流程

    首先启动服务端,创建ServerSocketChannel,并且注册到一个Selector,服务端关注的事件为OP_ACCEPTSelector调用select方法,无限循环监听事件,返回有事件发生的通道个数当客户端连接时,会通过serverSocketChannel获取socketChannel会话通道将这个socketChannel注册到Selector,其关注的事件为OP_READ或者OP_WRITE给socketChannel分配一个Buffer用于输入输出操作当某个socketChannel产生了OP_READ或者OP_WRITE事件之后,使用==selectionKey.channel( )方法反向获取该channel,并且使用selectionKey.attachment( )==方法获取该channel的Buffer使用channel进行输入输出操作

    NIO通信流程实例

    根据上面的简易版的NIO网络通信流程,下面是代码实现

    服务器端 NIOServer:

    import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Set; public class NIOServer { public static void main(String[] args) throws IOException { // 创建ServerSocketChannel ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 获取Selector对象 Selector selector = Selector.open(); // 绑定端口,在服务器端进行监听 serverSocketChannel.socket().bind(new InetSocketAddress(8888)); // 设置为非阻塞 serverSocketChannel.configureBlocking(false); // 将serverSocketChannel注册到selector,其关注的事件为Accept serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 循环等待客户端连接 while (true){ // 等待1s,如果没有连接事件发生,就返回 (阻塞1s) int select = selector.select(1000); if (select == 0){ System.out.println("服务器等待了1s...无连接"); continue; } // 如果返回的不是0,获取到相关的selectionKey集合 Set<SelectionKey> selectionKeys = selector.selectedKeys(); System.out.println("活动中:" + selectionKeys.size()); // 使用selectionKeys获取Channel for (SelectionKey key : selectionKeys) { // 如果发生了连接OP_ACCEPT事件 if (key.isAcceptable()){ // 这里已经确定了accept,所以使用accept()方法不会阻塞,会直接执行 SocketChannel socketChannel = serverSocketChannel.accept(); // 客户端也是非阻塞的 socketChannel.configureBlocking(false); // 将socketChannel注册到selector,同时关联一个Buffer socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024)); System.out.println("客户端连接成功:" + socketChannel.hashCode()); System.out.println("总计注册:" + selector.keys().size()); } // 如果发生了读取OP_READ事件 if (key.isReadable()){ // 使用key反向获取socketChannel通道 SocketChannel channel = (SocketChannel) key.channel(); // 使用attachment()方法获取该channel关联的buffer ByteBuffer buffer = (ByteBuffer) key.attachment(); int len = channel.read(buffer); System.out.println("From Client: " + new String(buffer.array(), 0, len)); } // 在处理完这个key的事件之后,将这个key从活动中的selectionKeys中移除 selectionKeys.remove(key); } } } }

    客户端 NIOClient:

    import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; public class NIOClient { public static void main(String[] args) throws IOException { // 得到一个网络通道 SocketChannel socketChannel = SocketChannel.open(); // 设置非阻塞 socketChannel.configureBlocking(false); // 提供服务器端的ip和端口 InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", 8888); // 连接服务器 if (!socketChannel.connect(inetSocketAddress)){ while (!socketChannel.finishConnect()){ System.out.println("未连接,此时不会阻塞,可以进行其他活动"); } } // 连接成功,设置buffer ByteBuffer buffer = ByteBuffer.wrap("hello dzzhyk".getBytes()); // 写入数据 socketChannel.write(buffer); // 防止客户端断开 System.in.read(); } }

    首先启动服务器端,然后启动客户端输出结果如下:

    服务器等待了1s...无连接 服务器等待了1s...无连接 服务器等待了1s...无连接 服务器等待了1s...无连接 服务器等待了1s...无连接 客户端连接成功:1338668845 From Client: hello dzzhyk 服务器等待了1s...无连接 服务器等待了1s...无连接 服务器等待了1s...无连接

    可以看到,服务器在处理完成accept和read事件之后没有一直阻塞等待,而是非阻塞的

    NIOServer中的select方法也可以直接被selectNow方法替代

    NIO群聊应用实例

    服务端:

    import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Set; public class Server { private static String HOST = "127.0.0.1"; private static int PORT = 8888; private static ServerSocketChannel serverSocketChannel; private static Selector selector; public Server() { try { serverSocketChannel = ServerSocketChannel.open(); selector = Selector.open(); serverSocketChannel.socket().bind(new InetSocketAddress(HOST, PORT)); serverSocketChannel.configureBlocking(false); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); }catch (IOException e){ e.printStackTrace(); } } public void listen() throws IOException { while (true){ if (selector.select(1000) == 0){ continue; } Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { if (key.isAcceptable()){ SocketChannel socketChannel = serverSocketChannel.accept(); // 注册channel socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); // 获取ip SocketAddress address = socketChannel.getRemoteAddress(); String msg = "[" + address + "] login.."; System.out.println(msg); broadcast(msg, socketChannel); } if (key.isReadable()){ handleRead(key); } keys.remove(key); } } } private void handleRead(SelectionKey key){ SocketChannel channel = null; String msg; try { channel = (SocketChannel) key.channel(); channel.configureBlocking(false); // 获取ip SocketAddress address = channel.getRemoteAddress(); ByteBuffer buffer = ByteBuffer.allocate(1024); msg = "[" + address + "] "; // 内容 int len = channel.read(buffer); msg += new String(buffer.array(), 0, len); // 将此对应的channel设置为准备下一次接受数据 key.interestOps(SelectionKey.OP_READ); System.out.println(msg); broadcast(msg, channel); }catch (Exception e){ try { // 出错或者客户端断开 assert channel != null; msg = "[" + channel.getRemoteAddress() + "] leaved..."; broadcast(msg, channel); System.out.println(msg); key.cancel(); channel.close(); } catch (IOException ioException) { ioException.printStackTrace(); } } } /** * 转发消息 */ private void broadcast(String msg, SocketChannel except) throws IOException { Set<SelectionKey> keys = selector.keys(); for (SelectionKey key : keys) { Channel target = key.channel(); if (target instanceof SocketChannel && target != except){ SocketChannel dst = (SocketChannel) target; dst.write(ByteBuffer.wrap(msg.getBytes())); } } } public static void main(String[] args) throws IOException { new Server().listen(); } }

    客户端Client:

    import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Scanner; import java.util.Set; public class Client { private static String HOST = "127.0.0.1"; private static int PORT = 8888; private Selector selector; private SocketChannel socketChannel; private InetSocketAddress address; private Scanner scanner; public Client(){ try { socketChannel = SocketChannel.open(); selector = Selector.open(); address = new InetSocketAddress(HOST, PORT); scanner = new Scanner(System.in); } catch (IOException e) { e.printStackTrace(); } } public void start() throws IOException { socketChannel.connect(address); if (socketChannel.isConnectionPending()){ socketChannel.finishConnect(); } socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); System.out.println("Login to:" + socketChannel.getRemoteAddress()); new Thread(()->{ try { while (true){ if (selector.select(1000) == 0){ continue; } Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { if (key.isReadable()){ read(key); } keys.remove(key); } } }catch (IOException e){ e.printStackTrace(); } }).start(); while (true){ if (scanner.hasNextLine()) { String s = scanner.nextLine(); if ("".equals(s.trim())) continue; write(s); } } } private void write(String msg) throws IOException { socketChannel.write(ByteBuffer.wrap(msg.getBytes())); } private void read(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int len = channel.read(buffer); System.out.println(new String(buffer.array(), 0, len)); } public static void main(String[] args) { try { new Client().start(); }catch (IOException e){ e.printStackTrace(); } } }

    运行效果如下:

    服务端输出

    [/127.0.0.1:52109] login.. [/127.0.0.1:52122] login.. [/127.0.0.1:52122] hello [/127.0.0.1:52109] ?? [/127.0.0.1:52122] 在吗? [/127.0.0.1:52109] 在的
    Processed: 0.013, SQL: 8