努力写更多优质文章,欢迎关注CC_且听风吟~
Java NIO(Java non-blocking IO),指的是JDK提供的新API,从JDK1.4开始提供的一系列改进的输入输出特性,被称为NIO,是同步非阻塞的
相关包为java.nio包,并且对原来的java.io包中的很多类进行了改写
NIO的三个核心部分:Selector、Channel、Buffer,一张图:
NIO是面向缓冲区,或者面向块编程的。数据读取到一个它稍后处理的缓冲区,需要时可以在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络。
JavaNIO的非阻塞模式,使一个线程从某Channel发送请求或者读取数据,但是它仅能得到目前可用的数据;如果目前没有数据可用,就什么都不会获取,而不是阻塞等待(不会阻塞)。
所以,直到有了新的数据之前,这个线程可以处理其他内容,这是非阻塞读取read
非阻塞写也是如此,一个线程请求写入一些数据到某通道,但是不需要等待他完全写入,这个线程可以处理其他内容。
NIO可以做到使用一个线程来处理多个操作。假设有10000个请求,可以分配50个线程来处理即可,而不是像BIO一样需要10000个线程
HTTP2.0使用了多路复用技术,实现了一个连接并发处理多个请求。
程序输出内容如下:
0 1 2 3 4缓冲区(Buffer)
底层就是一个数组,提供了操作该数组的方法,可以更轻松地使用内存块;缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。
数据的读取或者写入都必须经过Buffer
以下代码摘取自java.nio包下的Buffer类:
public abstract class Buffer { // Invariants: mark <= position <= limit <= capacity private int mark = -1; private int position = 0; private int limit; private int capacity; // Creates a new buffer with the given mark, position, limit, and capacity, // after checking invariants. Buffer(int mark, int pos, int lim, int cap) { // package-private if (cap < 0) throw new IllegalArgumentException("Negative capacity: " + cap); this.capacity = cap; limit(lim); position(pos); if (mark >= 0) { if (mark > pos) throw new IllegalArgumentException("mark > position: (" + mark + " > " + pos + ")"); this.mark = mark; } } }Buffer抽象类中有上面四个主要的属性:mark、position、limit、capacity
position:可以理解为内置数组的当前下标limit:position不能超过的最大位置,可以比capacity小capacity:内置数组的最大容量mark:标记位置,reset时需要例如Buffer的子类ByteBuffer:
public abstract class ByteBuffer extends Buffer implements Comparable<ByteBuffer> { final byte[] hb; // Non-null only for heap buffers final int offset; boolean isReadOnly; // Valid only for heap buffers // Creates a new buffer with the given mark, position, limit, capacity, // backing array, and array offset ByteBuffer(int mark, int pos, int lim, int cap, // package-private byte[] hb, int offset) { super(mark, pos, lim, cap); // 调用了父类方法 this.hb = hb; // byte内部数组 this.offset = offset; } }网络编程中常用ByteBuffer
ByteBuffer支持类型化的put和get方法,放入的是什么数据类型,就应该使用相应的数据类型并且按照相同的顺序来取出,否则结果错误或者报错BufferUnderflowException,buffer溢出
import java.nio.ByteBuffer; import java.nio.IntBuffer; public class BufferExample1 { public static void main(String[] args) { ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.putInt(2333); buffer.putChar('c'); buffer.putDouble(0.1234567); buffer.putShort((short) 4); // 取出 buffer.flip(); System.out.println(buffer.getInt()); System.out.println(buffer.getDouble()); System.out.println(buffer.getShort()); System.out.println(buffer.getChar()); } }上面这个例子里面没有按照相同的数据类型顺序取出,结果是错误的内容(和内存占用划分相似):
2333 8.56609708773623E-307 -9614 []可以将一个普通Buffer转换成只读Buffer
import java.nio.ByteBuffer; import java.nio.IntBuffer; public class BufferExample1 { public static void main(String[] args) { ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.putInt(2333); buffer.putChar('c'); buffer.putDouble(0.1234567); buffer.putShort((short) 4); // 得到只读buffer ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); System.out.println(readOnlyBuffer.getInt()); System.out.println(readOnlyBuffer.getChar()); System.out.println(readOnlyBuffer.getDouble()); System.out.println(readOnlyBuffer.getShort()); readOnlyBuffer.putInt(111); } }Exception in thread “main” java.nio.ReadOnlyBufferException
使用MappedByteBuffer可以让文件直接在内存(堆外内存)中进行修改,操作系统不需要额外拷贝一次进入堆内存:
import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.IntBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; public class BufferExample2 { public static void main(String[] args) throws IOException { RandomAccessFile randomAccessFile = new RandomAccessFile("./nio/1.txt", "rw"); FileChannel channel = randomAccessFile.getChannel(); /* FileChannel.MapMode.READ_WRITE 使用读写模式 0 可以直接修改的起始位置 5 文件的多少个字节映射到内存,可以直接修改的范围是[0, 0+5) 实际类型是DirectByteBuffer */ MappedByteBuffer mappedBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5); mappedBuffer.put(0, (byte) 'A'); mappedBuffer.put(3, (byte) 'B'); mappedBuffer.put(4, (byte) 'C'); mappedBuffer.put(5, (byte) 'D'); // 越界 IndexOutOfBoundsException randomAccessFile.close(); } }上面例子修改过后的文件是直接在内存中进行修改的,因此IDE中无法观察变化,在磁盘上重新打开文件可以观察文件内容的变化
缓冲区可以使用Buffer数组来充当缓冲区
import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.IntBuffer; import java.nio.channels.FileChannel; import java.util.Arrays; public class BufferExample3 { public static void main(String[] args) throws IOException { // 创建一个Buffer数组 ByteBuffer[] byteBuffers = new ByteBuffer[2]; byteBuffers[0] = ByteBuffer.allocate(5); byteBuffers[1] = ByteBuffer.allocate(5); FileInputStream fin = new FileInputStream("./nio/2.txt"); FileChannel channel = fin.getChannel(); long len = channel.read(byteBuffers); System.out.println("总长度:" + len); Arrays.stream(byteBuffers) .map(buffer -> "已经占用: "+buffer.position() + ", buffer总计: "+ buffer.limit()) .forEach(System.out::println); } }输出结果如下:
总长度:8 已经占用: 5, buffer总计: 5 已经占用: 3, buffer总计: 5NIO的Channel通道类似于流,但是有一些区别:
Channel可以同时进行读写,而流只能读或者写;
Channel可以实现异步读写数据
Channel可以从缓冲区中读取数据,也可以写入数据到缓冲区
Channel是NIO中的一个接口,有很多实现的子类接口:
常用的抽象实现类有:
类名说明FileChannel本地文件的读写DatagramChannelUDP数据读写ServerSocketChannel服务端TCP数据读写SocketChannel客户端TCP数据读写上面这个例子需要注意的是在一次读取完成之后的clear操作,不然由于position和limit没有复位,下次再进行read操作的时候就会一直得到read=0的结果
这是对上面的例子,有更好的简化:使用transfeFrom或者transferTo函数
import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; // 拷贝文件 public class FileChannelExample4 { public static void main(String[] args) throws IOException { FileInputStream fin = new FileInputStream("./nio/1.txt"); FileChannel channel_read = fin.getChannel(); FileOutputStream fout = new FileOutputStream("./nio/4.txt"); FileChannel channel_write = fout.getChannel(); // 使用函数来进行channel之间的拷贝操作 channel_write.transferFrom(channel_read, 0, channel_read.size()); channel_read.transferTo(0, channel_read.size(), channel_write); channel_read.close(); channel_write.close(); } }Java NIO使用的非阻塞IO方式,可以使用Selector(选择器)来实现一个线程处理多个客户端请求
Selector会监测多个注册的通道Channel上是否有时间发生(多个Channel以事件的方式可以注册到同一个Selector)。如果有事件发生,就获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道的请求。
只有在通道真正有读写事件OP_READ、OP_WRITE发生的时候,才会进行读写,大大地减少了系统开销,并且不必为每个连接都创建一个线程,避免了多线程之间的上下文切换导致的开销。
当线程从某个客户端Socket通道进行读写数据时,如果没有数据可用时,该线程可以进行其他任务。
线程通常将非阻塞I/O的空闲时间用于在其他通道上执行IO操作,这样执行效率很高
由于读写操作都是非阻塞的,这样就可以充分提升I/O线程的运行效率,避免由于频繁I/O阻塞造成的线程挂起Selector就是通过select方法来循环遍历channels获取事件,如果有新的事件产生就加入selectedKeys集合,如果没有就根据要求返回或者阻塞
根据上面的简易版的NIO网络通信流程,下面是代码实现
服务器端 NIOServer:
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Set; public class NIOServer { public static void main(String[] args) throws IOException { // 创建ServerSocketChannel ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 获取Selector对象 Selector selector = Selector.open(); // 绑定端口,在服务器端进行监听 serverSocketChannel.socket().bind(new InetSocketAddress(8888)); // 设置为非阻塞 serverSocketChannel.configureBlocking(false); // 将serverSocketChannel注册到selector,其关注的事件为Accept serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 循环等待客户端连接 while (true){ // 等待1s,如果没有连接事件发生,就返回 (阻塞1s) int select = selector.select(1000); if (select == 0){ System.out.println("服务器等待了1s...无连接"); continue; } // 如果返回的不是0,获取到相关的selectionKey集合 Set<SelectionKey> selectionKeys = selector.selectedKeys(); System.out.println("活动中:" + selectionKeys.size()); // 使用selectionKeys获取Channel for (SelectionKey key : selectionKeys) { // 如果发生了连接OP_ACCEPT事件 if (key.isAcceptable()){ // 这里已经确定了accept,所以使用accept()方法不会阻塞,会直接执行 SocketChannel socketChannel = serverSocketChannel.accept(); // 客户端也是非阻塞的 socketChannel.configureBlocking(false); // 将socketChannel注册到selector,同时关联一个Buffer socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024)); System.out.println("客户端连接成功:" + socketChannel.hashCode()); System.out.println("总计注册:" + selector.keys().size()); } // 如果发生了读取OP_READ事件 if (key.isReadable()){ // 使用key反向获取socketChannel通道 SocketChannel channel = (SocketChannel) key.channel(); // 使用attachment()方法获取该channel关联的buffer ByteBuffer buffer = (ByteBuffer) key.attachment(); int len = channel.read(buffer); System.out.println("From Client: " + new String(buffer.array(), 0, len)); } // 在处理完这个key的事件之后,将这个key从活动中的selectionKeys中移除 selectionKeys.remove(key); } } } }客户端 NIOClient:
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; public class NIOClient { public static void main(String[] args) throws IOException { // 得到一个网络通道 SocketChannel socketChannel = SocketChannel.open(); // 设置非阻塞 socketChannel.configureBlocking(false); // 提供服务器端的ip和端口 InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", 8888); // 连接服务器 if (!socketChannel.connect(inetSocketAddress)){ while (!socketChannel.finishConnect()){ System.out.println("未连接,此时不会阻塞,可以进行其他活动"); } } // 连接成功,设置buffer ByteBuffer buffer = ByteBuffer.wrap("hello dzzhyk".getBytes()); // 写入数据 socketChannel.write(buffer); // 防止客户端断开 System.in.read(); } }首先启动服务器端,然后启动客户端输出结果如下:
服务器等待了1s...无连接 服务器等待了1s...无连接 服务器等待了1s...无连接 服务器等待了1s...无连接 服务器等待了1s...无连接 客户端连接成功:1338668845 From Client: hello dzzhyk 服务器等待了1s...无连接 服务器等待了1s...无连接 服务器等待了1s...无连接可以看到,服务器在处理完成accept和read事件之后没有一直阻塞等待,而是非阻塞的
NIOServer中的select方法也可以直接被selectNow方法替代
服务端:
import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Set; public class Server { private static String HOST = "127.0.0.1"; private static int PORT = 8888; private static ServerSocketChannel serverSocketChannel; private static Selector selector; public Server() { try { serverSocketChannel = ServerSocketChannel.open(); selector = Selector.open(); serverSocketChannel.socket().bind(new InetSocketAddress(HOST, PORT)); serverSocketChannel.configureBlocking(false); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); }catch (IOException e){ e.printStackTrace(); } } public void listen() throws IOException { while (true){ if (selector.select(1000) == 0){ continue; } Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { if (key.isAcceptable()){ SocketChannel socketChannel = serverSocketChannel.accept(); // 注册channel socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); // 获取ip SocketAddress address = socketChannel.getRemoteAddress(); String msg = "[" + address + "] login.."; System.out.println(msg); broadcast(msg, socketChannel); } if (key.isReadable()){ handleRead(key); } keys.remove(key); } } } private void handleRead(SelectionKey key){ SocketChannel channel = null; String msg; try { channel = (SocketChannel) key.channel(); channel.configureBlocking(false); // 获取ip SocketAddress address = channel.getRemoteAddress(); ByteBuffer buffer = ByteBuffer.allocate(1024); msg = "[" + address + "] "; // 内容 int len = channel.read(buffer); msg += new String(buffer.array(), 0, len); // 将此对应的channel设置为准备下一次接受数据 key.interestOps(SelectionKey.OP_READ); System.out.println(msg); broadcast(msg, channel); }catch (Exception e){ try { // 出错或者客户端断开 assert channel != null; msg = "[" + channel.getRemoteAddress() + "] leaved..."; broadcast(msg, channel); System.out.println(msg); key.cancel(); channel.close(); } catch (IOException ioException) { ioException.printStackTrace(); } } } /** * 转发消息 */ private void broadcast(String msg, SocketChannel except) throws IOException { Set<SelectionKey> keys = selector.keys(); for (SelectionKey key : keys) { Channel target = key.channel(); if (target instanceof SocketChannel && target != except){ SocketChannel dst = (SocketChannel) target; dst.write(ByteBuffer.wrap(msg.getBytes())); } } } public static void main(String[] args) throws IOException { new Server().listen(); } }客户端Client:
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Scanner; import java.util.Set; public class Client { private static String HOST = "127.0.0.1"; private static int PORT = 8888; private Selector selector; private SocketChannel socketChannel; private InetSocketAddress address; private Scanner scanner; public Client(){ try { socketChannel = SocketChannel.open(); selector = Selector.open(); address = new InetSocketAddress(HOST, PORT); scanner = new Scanner(System.in); } catch (IOException e) { e.printStackTrace(); } } public void start() throws IOException { socketChannel.connect(address); if (socketChannel.isConnectionPending()){ socketChannel.finishConnect(); } socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); System.out.println("Login to:" + socketChannel.getRemoteAddress()); new Thread(()->{ try { while (true){ if (selector.select(1000) == 0){ continue; } Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { if (key.isReadable()){ read(key); } keys.remove(key); } } }catch (IOException e){ e.printStackTrace(); } }).start(); while (true){ if (scanner.hasNextLine()) { String s = scanner.nextLine(); if ("".equals(s.trim())) continue; write(s); } } } private void write(String msg) throws IOException { socketChannel.write(ByteBuffer.wrap(msg.getBytes())); } private void read(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int len = channel.read(buffer); System.out.println(new String(buffer.array(), 0, len)); } public static void main(String[] args) { try { new Client().start(); }catch (IOException e){ e.printStackTrace(); } } }运行效果如下:
服务端输出
[/127.0.0.1:52109] login.. [/127.0.0.1:52122] login.. [/127.0.0.1:52122] hello [/127.0.0.1:52109] ?? [/127.0.0.1:52122] 在吗? [/127.0.0.1:52109] 在的