关闭
Hit
enter
to search or
ESC
to close
May I Suggest ?
#leanote #leanote blog #code #hello world
Okeeper's Blog
Home
Archives
Tags
DevOps
软件笔记
Spring
学习
JVM系列
关于我
深入理解NIO
无
458
0
0
zhangyue
最近看开开源框架的源码,发现涉及到网络传输的基本都是居于NIO实现的,尽管之前也或多或少接触过一点NIO的知识,毕竟还是在工作中使用接触的比较少,总觉得不够深入,特此写一遍博客来梳理下,自己对NIO的认识。 # 同步,异步,阻塞,非阻塞 ## 同步 与 异步 同步与异步关注的是消息通讯机制,所谓同步就是主动发出调用,在结果返回之前改调用不会结束,换句话说就是程序主动待定调用结果的返回。 而异步则不同,当程序调用请求发出之后就立即返回了,此时是没有得到调用的结果的,当真正有结果是通过回调、通知、状态等方式告知给调用者,调用者在处理这个结果,这个就叫做异步。典型的ajax请求,MQ消息消费。 ## 阻塞 与 非阻塞 阻塞与非阻塞关注的是程序在等待调用结果时的状态。 阻塞是指在调用结果未返回时,当前线程的挂起状态(await)。 非阻塞是指调用不会立马返回结果,当前线程继续往下走,不会阻塞等待。 # 什么是NIO 在jdk1.4之前,是通过BIO来实现IO通讯的,服务端需要为每一个客户端开启一个请求处理线程为其服务,而从jdk1.4之后增加了NIO(非阻塞IO),NIO的服务端通过一个线程就能接收多个客户端的请求,而真正处理请求的线程可以使用线程池的方式提高效率,实现了请求事件接收与具体处理请求的线程分开的分离,做到了IO复用,即称之为多路复用。 举个例子吧,如果做一个聊天服务器,按照之前的BIO模式,你必须为连接上来的每一个客户端都创建一个现场来处理,InputStream.read()是阻塞的。而NIO是使用Reactor反应堆模式,使用一个线程轮询各个客户端连接的事件,当有可处理事件时返回对应的SelectionKey集合,对对应的SelectionKey进程处理就可以了,而不需要傻傻的监听每一个客户端连接。 再深入一点,我们都知道所有的网络通讯都是基于socket通讯,它们都有个共同的几个步奏: - Read Request 接收请求 - Decode Request 解析请求 - Process Service 处理请求 - Encode Reply 包装响应 - Send Reply 发送响应 以下是BIO模式下的网络请求线程模型图:  由图可以看出,clien数量与server的handler数量是1:1的关系,每个handler都独自处理以上5个步奏,这是不是很浪费资源(CPU为了阻塞线程之前的来回切换消耗计算能力,是计算机整体CPU性能利用不高),能不能有一个人帮所有线程监听着有没有可处理的请求,我在站出来处理呢,所以NIO做了如下优化。  NIO将监听客户端请求(轮询实现)交给了一个叫Reactor反应堆的东西(Selector),当有可处理请求时在dispatch给具体的线程处理,而此时线程处理的是非阻塞的流程,Selector既然告诉你可以读取或者可以写了他就一定可以读或可以写,channle.read()或channel.write()方法时不用等待。那么这是不是大大减少了线程资源开销,提高了并发效率。我们可以进一步提高并发,我们开启多个线程(或者线程池)并发处理那些可处理的Chanel。 > 关于**Reactor反应堆模式** 它实际上有点像Observer模式,所有的IO通道注册到Reactor中,Reactor底层轮询算法检查操作系统IO是否就绪,当有IO事件处理是返回到指定的handler进行处理,这个角色就是Reactor # NIO组成部分 - Channel,管道,是一个可双向通讯的连接 - Selector,是管道的管理对象,负责接收Chanel的IO事件 - SelectionKey,IO事件的选择键,Selector当有IO事件就绪时,通过SelectionKey关联到对应的通道进行处理 # NIO源码分析 NIO怎么做到多路复用与多个客户端的监听呢,那么这里最重要的就是Selector的实现,下面我们来看下Selector的源码。 ## 源码分析 先来看下`Selector.open()` ``` public static Selector open() throws IOException { return SelectorProvider.provider().openSelector(); } //获取Selector的实现类 public static SelectorProvider provider() { synchronized (lock) { if (provider != null) return provider; return AccessController.doPrivileged( new PrivilegedAction<SelectorProvider>() { public SelectorProvider run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } } ``` 可以看到在widows JDK返回的provider实现是`WindowsSelectorProvider`,而在Linux系统中`DefaultSelectorProvider`会根据不同的内核版本选择select/poll还是epoll模式的实现。 下面是Linux JDK的`DefaultSelectorProvider`实现 ``` public static SelectorProvider create() { PrivilegedAction pa = new GetPropertyAction("os.name"); String osname = (String) AccessController.doPrivileged(pa); if ("SunOS".equals(osname)) { return new sun.nio.ch.DevPollSelectorProvider(); } // use EPollSelectorProvider for Linux kernels >= 2.6 if ("Linux".equals(osname)) { pa = new GetPropertyAction("os.version"); String osversion = (String) AccessController.doPrivileged(pa); String[] vers = osversion.split("\\.", 0); if (vers.length >= 2) { try { int major = Integer.parseInt(vers[0]); int minor = Integer.parseInt(vers[1]); if (major > 2 || (major == 2 && minor >= 6)) { return new sun.nio.ch.EPollSelectorProvider(); } } catch (NumberFormatException x) { // format not recognized } } } return new sun.nio.ch.PollSelectorProvider(); } sun.nio.ch.EPollSelectorProvider public AbstractSelector openSelector() throws IOException { return new EPollSelectorImpl(this); } sun.nio.ch.PollSelectorProvider public AbstractSelector openSelector() throws IOException { return new PollSelectorImpl(this); } ``` 可以看到,如果Linux版本大于2.6就返回epoll模式的实现 下面我们以Window的实现代码分析: ``` public class DefaultSelectorProvider { private DefaultSelectorProvider() { } public static SelectorProvider create() { return new WindowsSelectorProvider(); } } WindowsSelectorImpl(SelectorProvider sp) throws IOException { super(sp); pollWrapper = new PollArrayWrapper(INIT_CAP); wakeupPipe = Pipe.open(); wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal(); // Disable the Nagle algorithm so that the wakeup is more immediate SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink(); (sink.sc).socket().setTcpNoDelay(true); wakeupSinkFd = ((SelChImpl)sink).getFDVal(); pollWrapper.addWakeupSocket(wakeupSourceFd, 0); } ``` 其中Pipe.open()是关键,这个方法的调用过程是: ``` public static Pipe open() throws IOException { return SelectorProvider.provider().openPipe(); } SelectorProvider 中: public Pipe openPipe() throws IOException { return new PipeImpl(this); } PipeImpl(SelectorProvider sp) { long pipeFds = IOUtil.makePipe(true); int readFd = (int) (pipeFds >>> 32); int writeFd = (int) pipeFds; FileDescriptor sourcefd = new FileDescriptor(); IOUtil.setfdVal(sourcefd, readFd); source = new SourceChannelImpl(sp, sourcefd); FileDescriptor sinkfd = new FileDescriptor(); IOUtil.setfdVal(sinkfd, writeFd); sink = new SinkChannelImpl(sp, sinkfd); } ``` 其中IOUtil.makePipe(true)是个native方法: ``` /** * Returns two file descriptors for a pipe encoded in a long. * The read end of the pipe is returned in the high 32 bits, * while the write end is returned in the low 32 bits. */ static native long makePipe(boolean blocking); ``` 正如这段注释的意思:它返回包含两个文件描述的long类型数字,低位存放管道Pipe读read的描述文件,高位存放写write端的文件描述,所以PipeImpl构造方法中要对返回的pipeFds作位运算获取这两个值,并且实例化两个文件描述对象source和sink,然后回到WindowsSelectorImpl的构造方法: ``` WindowsSelectorImpl(SelectorProvider sp) throws IOException { super(sp); pollWrapper = new PollArrayWrapper(INIT_CAP); wakeupPipe = Pipe.open(); wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal(); // Disable the Nagle algorithm so that the wakeup is more immediate SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink(); (sink.sc).socket().setTcpNoDelay(true); wakeupSinkFd = ((SelChImpl)sink).getFDVal(); pollWrapper.addWakeupSocket(wakeupSourceFd, 0); } ``` `pollWrapper.addWakeupSocket(wakeupSourceFd, 0); `将源头读的一端加到pollWrapper中保存。到此Windows下`Selector.open()`执行完毕,总结一下就是:新建了一个可读可写的一个管道,并返回两端的描述文件。 下面我们来看`ServerSocketChannel.open()`的实现: ``` public static ServerSocketChannel open() throws IOException { return SelectorProvider.provider().openServerSocketChannel(); } SelectorProvider: public ServerSocketChannel openServerSocketChannel() throws IOException { return new ServerSocketChannelImpl(this); } ``` 可以看到这里也用到了`SelectorProvider.provider()`由于刚刚初始化过一遍,将直接返回刚刚初始化的对象。下面看`new ServerSocketChannelImpl(this)` ``` ServerSocketChannelImpl(SelectorProvider sp) throws IOException { super(sp); this.fd = Net.serverSocket(true); //打开一个socket,返回FD this.fdVal = IOUtil.fdVal(fd); this.state = ST_INUSE; } ``` 然后通过然后通过`serverChannel.register(selector, SelectionKey.OP_ACCEPT);`把selector和channel绑定在一起,也将刚才`ServerSocketChannelImpl`中的fd绑定在一起。 到此一个服务端的Selector和ServerSocketChannel初始化完成。生成了以下几个对象: WindowsSelectorProvider:单例 WindowsSelectorImpl中包含: pollWrapper:保存selector上注册的FD,包括pipe的write端FD和ServerSocketChannel所用的FD wakeupPipe:通道(其实就是两个FD,一个read,一个write) 下面我们再到`selector.select();`主要调用了WindowsSelectorImpl中的这个方法: ``` protected int doSelect(long timeout) throws IOException { if (channelArray == null) throw new ClosedSelectorException(); this.timeout = timeout; // set selector timeout processDeregisterQueue(); if (interruptTriggered) { resetWakeupSocket(); return 0; } // Calculate number of helper threads needed for poll. If necessary // threads are created here and start waiting on startLock adjustThreadsCount(); finishLock.reset(); // reset finishLock // Wakeup helper threads, waiting on startLock, so they start polling. // Redundant threads will exit here after wakeup. startLock.startThreads(); // do polling in the main thread. Main thread is responsible for // first MAX_SELECTABLE_FDS entries in pollArray. try { begin(); try { subSelector.poll(); } catch (IOException e) { finishLock.setException(e); // Save this exception } // Main thread is out of poll(). Wakeup others and wait for them if (threads.size() > 0) finishLock.waitForHelperThreads(); } finally { end(); } // Done with poll(). Set wakeupSocket to nonsignaled for the next run. finishLock.checkForException(); processDeregisterQueue(); int updated = updateSelectedKeys(); // Done with poll(). Set wakeupSocket to nonsignaled for the next run. resetWakeupSocket(); return updated; } ``` 这个方法的核心就是`subSelector.poll()`也是获取到pollWrapper中保存的`ServerChannelSocket`的socket的fd进行轮询,底层调用的就是操作系统的`poll`方法,阻塞直到有IO事件就绪时才返回,还记得刚刚初始化`Selector.open()`时候时候保存了Pipe的write端的文件描述fd,所以只要Pipe调用这个描述文件write一下,这个poll阻塞也会返回。所以如果这两者都不发生,将一直阻塞。 我们再来看下怎样通过Piped的write端FD唤醒poll的`WindowsSelectorImpl#wakeup()`怎么实现的: ``` public Selector wakeup() { synchronized (interruptLock) { if (!interruptTriggered) { setWakeupSocket(); interruptTriggered = true; } } return this; } // Sets Windows wakeup socket to a signaled state. private void setWakeupSocket() { setWakeupSocket0(wakeupSinkFd); } //这是一个native方法 private native void setWakeupSocket0(int wakeupSinkFd); //底层实现代码如下 JNIEXPORT void JNICALL Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this, jint scoutFd) { /* Write one byte into the pipe */ const char byte = 1; send(scoutFd, &byte, 1, 0); } ``` 可以看成它就是拿到sourceFd的write描述文件写入一个1,poll就唤醒了。所以可以调用`selector.wakeup()`唤醒正在阻塞的poll ## 总结 从上面代码分析可知: 1. 在操作系统中,所有的硬件设备的操作都可以看出是一个文件操作,文件描述就相当这个文件引用,通过它可以对文件进行操作,socket在操作系统中返回一个socketFd 1. Pipe.open()打开了一个管道,拿到了wakeupSourceFd和wakeupSinkFd两个文件描述符 2. 实例化了一个pollWrapper,用于存放注册serverSocket的描述文件fd和Pipe管道的write描述文件sourceFd(用来select.wakeup())唤醒。 3. 调用`select.doSelect()`,传入所有socket的FD给poll轮询所有的IO事件,直至有socket就绪 4. 可以使用`select.wakeup()`向sourceFd写入1个字节,实现poll唤醒 # NIO原理 ## IO模型 在Linux下,根据IO的同步、异步、阻塞和非阻塞,IO模型可以分为以下5个类。  ### 阻塞IO模型 最常见的阻塞IO模型,进程调用InputStream.read时底层调用recvfrom阻塞直至有读取到数据返回  ### 非阻塞IO模型 进程吧套接字接口设置成非阻塞模式,应用进程反复轮询是否可以读取数据,如果没有不阻塞并返回一个错误提示,程序继续轮询直至返回读取的数据库。  ### IO复用模型(多路复用模型) 在Linux系统下,提供了poll/select底层接口调用,进程只需要将对于的文件描述(linux下一切结尾文件)fd传给select或者poll,select或者poll就会监听扫描socket是否准备就绪,者两个接口fd的参数个数有限。select或者poll轮询到有IO就绪时返回到应用程序,应用程序在对socket进行io从内核空间拷贝到用户空间。  ### 信号驱动/事件驱动IO模型 先开启socket信号驱动IO功能,并调用系统sigaction设置一个回调处理函数。当有IO数据准备就绪时,就会生成一个SIGIO信号传递给回调处理函数,处理函数在通过recvfrom来读取拷贝数据到用户空间。在Linux下的实现就是epoll  ### 异步IO模型 发起IO读取操作时立即返回,操作系统内核将数据拷贝纸用户空间时,在回调通知用户进程直接对数据进行操作,不需要自己拷贝了。与信号驱动不同的是,由内核帮我们将需要读取的数据准备好,不需要用户进程自己读取。对应Linux系统实现就是AIO,对应Windows系统就是IOCP,异步IO模型叶城Proactor模型,与Reactor模型对应  ## 总结 前四种都是同步IO读取,需要由用户进程将内核数据拷贝纸用户空间,这个过程是阻塞的,; 而最后一种异步IO,是将IO读取的所有过程交给操作系统内核处理,读取完成之后回调返回读取结果即可。 > 关于epoll,select,poll 这些都是Linux操作系统提供的多路复用轮询机制的接口实现,可以同时监听多个socket文件描述,一旦有IO事件就绪,就通知用户进程进行相应的读写操作。 select 每次调用select都需要将监听的描述文件集合从用户空间传递给内核空间,当描述文件很多事开销很大;每次轮询都遍历所有的描述文件;select支持的文件描述数量有限,默认是1024 poll poll与select相似,只不过传递描述文件集合的方式不一样,poll使用pollfd而不是select的描述文件集合fd_set. epoll的效率更高,优化了select的轮询操作,使用信号量callback事件响应通知,epoll优化了描述文件的拷贝,减少拷贝次数,同时它监听的描述文件数量不受限制(系统最大文件打开数)。再有一点就是使用了mmap内存映射加速了内核与用户空间的数据拷贝,无论是select,poll还是epoll都需要内核把fd描述文件传递给用户进程操作,然后由用户进程对将内核数据拷贝到用户空间,怎样加快这个拷贝过程就显得很重要,在这一点上,epoll是用了mmap将用户进程与内核进程共享同一块内存,用户进程就可以不需要拷贝可以直接读取使用。 > **关于IOCP**,是widows环境下的异步IO处理模型,IOCP采用线程池+队列+重叠结构的内核机制完成。AIO是linux系统的底层实现。 # 参考资料 [《Reactor模式和NIO》](http://blog.csdn.net/it_man/article/details/38417761) [《关于同步,异步,阻塞,非阻塞,IOCP/epoll,select/poll,AIO ,NIO ,BIO的总结》](http://blog.csdn.net/chen8238065/article/details/48315085) [《Java NIO——Selector机制源码分析---转》](https://www.cnblogs.com/davidwang456/p/3831617.html)
觉得不错,点个赞?
Please enable JavaScript to view the
comments powered by Disqus.
comments powered by
Disqus
文章目录