JAVA IO

    科技2022-07-10  109

    IO有内存IO、网络IO和磁盘IO三种,通常我们说的IO指的是后两者(网络IO把磁盘换做网卡即可)。

    Terminologies

    同步IO(Synchronous I/O):A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes.异步IO(Asynchronous I/O):An asynchronous I/O operation does not cause the requesting process to be blocked.阻塞IO(Blocking I/O):非阻塞IO(Non-Blocking I/O):

    一、IO模型

    对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的Process (or Thread),另一个就是系统内核(Kernel)。当一个read操作发生时,它会经历两个阶段(阶段一由操作系统决定,阶段二由用户程序决定???):

    1. 等待Kernel准备好数据 (Waiting for the data to be ready) 2. Process将数据从Kernel拷贝到Process中(Copying the data from the kernel to the process)

    1.1 阻塞IO(Blocking IO)

    去餐馆吃饭,点一个自己最爱吃的盖浇饭,然后在原地等着一直到盖浇饭做好,自己端到餐桌就餐。这就是典型的阻塞IO。当厨师给你做饭的时候,你需要一直在那里等着。

    Netty包的Socket类中定义的recvFrom方法:

    private static native DatagramSocketAddress recvFrom( int fd, ByteBuffer buf, int pos, int limit) throws IOException;

    阻塞IO模式下,阶段一和阶段二都是Blocked的。

    1.2 非阻塞IO(Non-Blocking IO)

    接着上面的例子,你每次点完饭就在那里等着,突然有一天你发现自己真傻。于是,你点完之后,就回桌子那里坐着,然后估计差不多了,就问老板饭好了没,如果好了就去端,没好的话就等一会再去问,依次循环直到饭做好。这就是非阻塞IO。

    这种方式在编程中对socket设置O_NONBLOCK即可。但此方式仅仅针对网络IO有效,对磁盘IO并没有作用。因为本地文件IO就没有被认为是阻塞的,我们所说的网络IO的阻塞是因为网路IO有无限阻塞的可能,而本地文件除非是被锁住,否则是不可能无限阻塞的,因此只有锁这种情况下,O_NONBLOCK才会有作用。而且,磁盘IO时要么数据在内核缓冲区中直接可以返回,要么需要调用物理设备去读取,这时候进程的其他工作都需要等待。因此,后续的IO复用和信号驱动IO对文件IO也是没有意义的。

    非阻塞IO模式下,阶段一是Non-Blocked,而阶段二是Blocked的。

    1.3 IO多路复用(IO Multiplexing,事件驱动IO)

    IO 复用的实现方式目前主要有select、poll和epoll。

    IO多路复用模式下,阶段一是Non-Blocked,而阶段二是Blocked的。

    1.4 信号驱动IO

    1.5 异步IO(Asynchronous I/O)

    Linux 上没有,Windows 上对应的是 IOCP。

    异步IO模式下,阶段一和阶段二都是Non-Blocked的。

    按照上面的定义,之前所述的Blocking IO,Non-Blocking IO,IO multiplexing都属于Synchronous IO。

    有人可能会说,Non-Blocking IO并没有被Blocked啊。这里有个非常“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操作,就是例子中的recvfrom这个System Call。Non-Blocking IO在执行recvfrom这个System Call的时候,如果Kernel的数据没有准备好,这时候不会Block进程。但是,当kernel中数据准备好的时候,recvfrom会将数据从Kernel拷贝到用户内存中,这个时候进程是被Blocked了,在这段时间内,进程是被Block的。而Asynchronous IO则不一样,当进程发起IO 操作之后,就直接返回再也不理睬了,直到Kernel发送一个信号,告诉进程说IO完成。在这整个过程中,进程完全没有被Block。

    二、网络编程模型

    上文讲述了 UNIX 环境的五种 IO 模型。基于这五种模型,在 Java 中,随着 NIO 和 NIO2.0(AIO) 的引入,一般具有以下几种网络编程模型:

    2.1 BIO

    BIO 是一个典型的网络编程模型,是通常我们实现一个服务端程序的过程,步骤如下:

    主线程accept请求阻塞;请求到达,创建新的线程来处理这个套接字,完成对客户端的响应;主线程继续accept下一个请求

    这种模型有一个很大的问题是:当客户端连接增多时,服务端创建的线程也会暴涨,系统性能会急剧下降。因此,在此模型的基础上,类似于 Tomcat 的bio connector,采用的是线程池来避免对于每一个客户端都创建一个线程。有些地方把这种方式叫做伪异步IO(把请求抛到线程池中异步等待处理)。

    2.2 NIO

    JDK1.4 开始引入了 NIO 类库,这里的 NIO 指的是 Non-blcok IO,主要是使用Selector多路复用器来实现。Selector在 Linux 等主流操作系统上是通过epoll实现的。

    2.3 AIO

    2.4 模型对比

    三、网络IO的设计模式

    3.1 Reactor 模式

    主动模式,所谓主动,是指应用程序不断去轮询,问操作系统,IO是否就绪。Linux 下的select/poll/epoll就属于主动模式,需要应用程序中有个循环,一直去 poll。在这种模式下,实际的IO操作还是应用程序做的。

    3.2 Preactor模式

    被动模式,你把read/write全部交给操作系统,实际的IO操作由操作系统完成,完成之后,再 Callback 你的应用程序。Windows 下的 IOCP 就属于这种模式,再比如 C++ Boost 中的 Asio 库,就是典型的 Proactor 模式。

    四、epoll 编程模型 - 3个阶段

    在 Linux 平台上,Java NIO 就是基于epoll来实现的。所有基于epoll的框架,都有3个阶段:

    注册事件(connect, accept, read, write)轮询IO是否就绪执行实际IO操作。

    下面的代码展示了在 Linux 下,用 C语言epoll编程的基本框架:

    // 阶段1: 调用epoll_ctl(xx) 注册事件 for( ; ; ) { nfds = epoll_wait(epfd,events,20,500); //阶段2:轮询所有的socket for(i=0;i<nfds;++i) // 处理轮询结果 { if(events[i].data.fd==listenfd) // accept事件就绪 { connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); // 阶段3:执行实际的IO操作,accept ev.data.fd=connfd; ev.events=EPOLLIN|EPOLLET; epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); // 回到阶段1:重新注册 } else if( events[i].events&EPOLLIN ) // 读就绪 { n = read(sockfd, line, MAXLINE)) < 0 // 阶段3:执行实际的io操作 ev.data.ptr = md; ev.events=EPOLLOUT|EPOLLET; epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); // 回到阶段1:重新注册事件 } else if(events[i].events&EPOLLOUT) // 写就绪 { struct myepoll_data* md = (myepoll_data*)events[i].data.ptr; sockfd = md->fd; send( sockfd, md->ptr, strlen((char*)md->ptr), 0 ); // 阶段3: 执行实际的io操作 ev.data.fd=sockfd; ev.events=EPOLLIN|EPOLLET; epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); // 回到阶段1,重新注册事件 } else { // 其他的处理 } } }

    同样, Java NIO 中的Selector同样有以下3个阶段,下面把Selector和epoll的使用做个对比:

    Java NIOepoll注册channel.register(selector, xxx) selectKey.interOps = xxxepoll_ctr(…)轮训selector.poll()epoll_wait(…)实际IO操作channel.accept channel.read channel.writeaccept read write

    可以看到,两者只是写法不同,同样的, 都有这3个阶段。

    下面的表格展示了connect, accept, read, write 这4种事件,分别在这3个阶段对应的函数:

    下面看一下Kafka client中Selector的核心实现:

    @Override public void poll(long timeout) throws IOException { clear(); // 清空各种状态 if (hasStagedReceives()) timeout = 0; long startSelect = time.nanoseconds(); int readyKeys = select(timeout); // 轮询 long endSelect = time.nanoseconds(); currentTimeNanos = endSelect; this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds()); if (readyKeys > 0) { Set<SelectionKey> keys = this.nioSelector.selectedKeys(); Iterator<SelectionKey> iter = keys.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); KafkaChannel channel = channel(key); // register all per-connection metrics at once sensors.maybeRegisterConnectionMetrics(channel.id()); lruConnections.put(channel.id(), currentTimeNanos); try { if (key.isConnectable()) { // 有连接事件 channel.finishConnect(); this.connected.add(channel.id()); this.sensors.connectionCreated.record(); } if (channel.isConnected() && !channel.ready()) channel.prepare(); // 这个只有需要安全检查的SSL需求,普通的不加密的channel,prepare()为空实现 if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { // 读就绪 NetworkReceive networkReceive; while ((networkReceive = channel.read()) != null) addToStagedReceives(channel, networkReceive); // 实际的读动作 } if (channel.ready() && key.isWritable()) { // 写就绪 Send send = channel.write(); // 实际的写动作 if (send != null) { this.completedSends.add(send); this.sensors.recordBytesSent(channel.id(), send.size()); } } /* cancel any defunct sockets */ if (!key.isValid()) { close(channel); this.disconnected.add(channel.id()); } } catch (Exception e) { String desc = channel.socketDescription(); if (e instanceof IOException) log.debug("Connection with {} disconnected", desc, e); else log.warn("Unexpected error from {}; closing connection", desc, e); close(channel); // 出现异常关闭 Channel this.disconnected.add(channel.id()); } } } addToCompletedReceives(); long endIo = time.nanoseconds(); this.sensors.ioTime.record(endIo - endSelect, time.milliseconds()); maybeCloseOldestConnection(); }

    五、LT & ET 模式

    我们知道,epoll里面有2种模式:

    LT(水平触发)ET(边缘触发)。

    水平触发,又叫条件触发;边缘触发,又叫状态触发。这2种到底有什么区别呢?

    在这里就要引入socket的“读/写缓冲区”的概念了:

    水平触发(条件触发):读缓冲区只要不为空,就一直会触发读事件;写缓冲区只要不满,就一直会触发写事件。这个比较符合编程习惯,也是epoll的缺省模式。

    边缘触发(状态触发):读缓冲区的状态,从空转为非空的时候,触发1次;写缓冲区的状态,从满转为非满的时候,触发1次。比如你发送一个大文件,把写缓存区塞满了,之后缓存区可以写了,就会发生一次从满到不满的切换。

    通过分析,我们可以看出: 对于LT模式,要避免“写的死循环”问题:写缓冲区为满的概率很小,也就是“写的条件“会一直满足,所以如果你注册了写事件,没有数据要写,但它会一直触发,所以在LT模式下,写完数据,一定要取消写事件;

    对应ET模式,要避免“short read”问题:比如你收到100个字节,它触发1次,但你只读到了50个字节,剩下的50个字节不读,它也不会再次触发,此时这个socket就废了。因此在ET模式,一定要把“读缓冲区”的数据读完。

    另外一个关于LT和ET的区别是:LT适用于阻塞和非阻塞IO, ET只适用于非阻塞IO。

    还有一个说法是ET的性能更高,但编程难度更大,容易出错。到底ET的性能,是不是一定比LT高,这个有待商榷,需要实际的测试数据来说话。

    上面说了,epoll缺省使用的LT模式,而 Java NIO 用的就是epoll的LT模式。下面就来分析一下 Java NIO 中connect/read/write事件的处理。

    connect 事件的注册:

    // Selector public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { if (this.channels.containsKey(id)) throw new IllegalStateException("There is already a connection for id " + id); SocketChannel socketChannel = SocketChannel.open(); try { socketChannel.connect(address); } catch (UnresolvedAddressException e) { socketChannel.close(); throw new IOException("Can't resolve address: " + address, e); } catch (IOException e) { socketChannel.close(); throw e; } SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT); // 构造channel的时候,注册connect事件 KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize); key.attach(channel); this.channels.put(id, channel); }

    connect 事件的取消

    // 在上面的poll函数中,connect 事件就绪,也就是指 connect 连接完成,连接建立 if (key.isConnectable()) { // 有连接事件 channel.finishConnect(); ... } //PlainTransportLayer public void finishConnect() throws IOException { socketChannel.finishConnect(); // 调用channel的finishConnect() key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ); // 取消connect事件,新加read事件组册 }

    read 事件的注册

    从上面也可以看出,read 事件的注册和 connect 事件的取消,是同时进行的。

    read 事件的取消

    因为 read 是要一直监听远程,是否有新数据到来,所以不会取消,一直监听。并且因为是 LT 模式,只要“读缓冲区”有数据,就会一直触发。

    write 事件的注册

    // Selector public void send(Send send) { KafkaChannel channel = channelOrFail(send.destination()); try { channel.setSend(send); } catch (CancelledKeyException e) { this.failedSends.add(send.destination()); close(channel); } } // KafkaChannel public void setSend(Send send) { if (this.send != null) throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress."); this.send = send; this.transportLayer.addInterestOps(SelectionKey.OP_WRITE); // 每调用一次 Send,注册一次 Write 事件 }

    write 事件的取消

    // 上面的 poll 函数里面 if (channel.ready() && key.isWritable()) { // write 事件就绪 Send send = channel.write(); // 在这个 write 里面,取消了 write 事件 if (send != null) { this.completedSends.add(send); this.sensors.recordBytesSent(channel.id(), send.size()); } } private boolean send(Send send) throws IOException { send.writeTo(transportLayer); if (send.completed()) transportLayer.removeInterestOps(SelectionKey.OP_WRITE); // 取消 write 事件 return send.completed(); }

    总结一下:

    (1)“事件就绪“这个概念,对于不同事件类型,还是有点歧义的

    read 事件就绪:这个最好理解,就是远程有新数据到来,需要去 read。这里因为是 LT 模式,只要读缓冲区有数据,会一直触发。

    write 事件就绪:这个指什么呢? 其实指本地的 socket 缓冲区有没有满。没有满的话,就会一直触发写事件。所以要避免”写的死循环“问题,写完,要取消写事件。

    connect 事件就绪: 指 connect 连接完成

    accept 事件就绪:有新的连接进来,调用 accept 处理

    (2)不同类型事件,处理方式是不一样的:

    connect 事件:注册1次,成功之后,就取消了。有且仅有1次

    read 事件:注册之后不取消,一直监听

    write 事件: 每调用一次 send,注册1次,send 成功,取消注册

    Processed: 0.051, SQL: 8