打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
黄金档 ? Java NIO

java NIO的实现中,有不少细节点非常有学习意义的,就好比下面的三个点:
1) Selector的 wakeup原理是什么?是如何实现的?
2) Channel的close会做哪些事?
3) 会什么代码中经常出现begin()和end()这一对儿?

本文虽然针对这几个点做了点分析,不能算是非常深刻,要想达到通透的地步,看来还得经过实战的洗练。

1、 wakeup()

准确来说,应该是Selector的wakeup(),即Selector的唤醒,为什么要有这个唤醒操作呢?那还得从Selector的选择方式来说明,前文已经总结过Selector的选择方式有三种:select()、select(timeout)、selectNow()。
selectNow的选择过程是非阻塞的,与wakeup没有太大关系。
select(timeout)和select()的选择过程是阻塞的,其他线程如果想终止这个过程,就可以调用wakeup来唤醒。

wakeup的原理

既然Selector阻塞式选择因为找到感兴趣事件ready才会返回(排除超时、中断),就给它构造一个感兴趣事件ready的场景即可。下图可以比较形象的形容wakeup原理:

Selector管辖的FD(文件描述符,linux即为fd,对应一个文件,windows下对应一个句柄;每个可选择Channel在创建的时候,就生成了与其对应的FD,Channel与FD的联系见另一篇)中包含某一个FD A, A对数据可读事件感兴趣,当往图中漏斗端放入(写入)数据,数据会流进A,于是A有感兴趣事件ready,最终,select得到结果而返回。

wakeup在Selector中的定义如下:
public abstract Selector wakeup();

下面结合上图来追寻wakeup的实现:
linux下Selector默认实现为PollSelectorImpl,当内核版本大于2.6时,实现为EPollSelectorImpl,仅看这两者的wakeup方法,代码似乎完全一样:

1
2
3
4
5
6
7
8
9
public Selector wakeup() {
    synchronized (interruptLock) {
        if (!interruptTriggered) {
            pollWrapper.interrupt();
            interruptTriggered = true;
        }
    }
    return this;
}

window下Selector的实现为WindowsSelectorImpl,其wakeup实现如下:

1
2
3
4
5
6
7
8
9
public Selector wakeup() {
    synchronized (interruptLock) {
        if (!interruptTriggered) {
            setWakeupSocket();
            interruptTriggered = true;
        }
    }       
    return this;
}

其中interruptTriggered为中断已触发标志,当pollWrapper.interrupt()之后,该标志即为true了;得益于这个标志,连续两次wakeup,只会有一次效果。

对比上图及上述代码,其实pollWrapper.interrupt()及setWakeupSocket()就是图中的往漏斗中倒水的过程,不管windows也好,linux也好,它们wakeup的思想是完全一致的,不同的地方就在于实现的细节了,例如上图中漏斗与通道的链接部分,linux下是采用管道pipe来实现的,而windows下是采用两个socket之间的通讯来实现的,它们都有这样的特性:1)都有两个端,一个是read端,一个是write端,windows中两个socket也是一个扮演read的角色,一个扮演write的角色;2)当往write端写入数据,则read端即可以收到数据;从它们的特性可以看出,它们是能够胜任这份工作的。

如果只想理解wakeup的原理,看到这里应该差不多了,不过,下面,想继续深入一下,满足更多人的好奇心。
先看看linux下PollSelector的具体wakeup实现,分阶段来介绍:

1) 准备阶段

PollSelector在构造的时候,就将管道pipe,及wakeup专用FD给准备好,可以看一下它的实现:

1
2
3
4
5
6
7
8
9
10
PollSelectorImpl(SelectorProvider sp) {
    super(sp, 1, 1);
    int[] fdes = new int[2];
    IOUtil.initPipe(fdes, false);
    fd0 = fdes[0];
    fd1 = fdes[1];
    pollWrapper = new PollArrayWrapper(INIT_CAP);
    pollWrapper.initInterrupt(fd0, fd1);
    channelArray = new SelectionKeyImpl[INIT_CAP];
}

IOUtil.initPipe,采用系统调用pipe(int fd[2])来创建管道,fd[0]即为ready端,fd[1]即为write端。
另一个需要关注的点就是pollWrapper.initInterrupt(fd0, fd1),先看一下它的实现:

1
2
3
4
5
6
void initInterrupt(int fd0, int fd1) {
    interruptFD = fd1;
    putDescriptor(0, fd0);
    putEventOps(0, POLLIN);
    putReventOps(0, 0);
}

可以看到,initInterrupt在准备wakeup专用FD,因为fd0是read端fd,fd1是write端fd:
interruptFD被初始化为write端fd;
putDescriptor(0, fd0)初始化pollfd数组中的第一个pollfd,即指PollSelector关注的第一个fd,即为fd0;
putEventOps(0, POLLIN)初始化fd0对应pollfd中的events为POLLIN,即指fd0对可读事件感兴趣;
putReventOps(0, 0)只是初始化一下fd0对应的pollfd中的revents;

2) 执行阶段

有了前面的准备工作,就看PollArrayWrapper中的interrupt()实现:

1
2
3
public void interrupt() {
    interrupt(interruptFD);
}

interrupt是native方法,它的入参interruptFD即为准备阶段管道的write端fd,对应于上图,其实就是漏斗端,因此,就是不看其实现,也知道它肯定扮演着倒水的这个动作,看其实现:

1
2
3
4
5
6
7
8
9
10
JNIEXPORT void JNICALL
Java_sun_nio_ch_PollArrayWrapper_interrupt(JNIEnv *env, jobject this, jint fd)
{
    int fakebuf[1];
    fakebuf[0] = 1;
    if (write(fd, fakebuf, 1) < 0) {
         JNU_ThrowIOExceptionWithLastError(env,
                                          "Write to interrupt fd failed");
    }
}

可以看出,interrupt(interruptFD)是往管道的write端fd1中写入一个字节(write(fd, fakebuf, 1))。
是的,只需要往fd1中写入一个字节,fd0即满足了可读事件ready,则Selector自然会因为有事件ready而中止阻塞返回。

EPollSelector与PollSelector相比,其wakeup实现就只有initInterrupt不同,它的实现如下:

1
2
3
4
5
void initInterrupt(int fd0, int fd1) {
    outgoingInterruptFD = fd1;
    incomingInterruptFD = fd0;
    epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}

epfd之前的篇章里已经讲过,它是通过epoll_create创建出来的epoll文件fd,epollCtl调用内核epoll_ctl实现了往epfd上添加fd0,且其感兴趣事件为可读(EPOLLIN),
因此可以断定,EPollSelector与PollSelector的wakeup实现是一致的。
因为之前一直专注与分析linux下的Java NIO实现,忽略了windows下的选择过程等,这里突然讲解其wakeup实现似乎很突兀,所以打算后面专门起一篇来介绍windows下的NIO实现,这里我们只需要理解wakeup原理,甚至自己去看看其wakeup实现,应该也没什么难度。

关于wakeup,这里还有两个疑问:
为什么wakeup方法返回Selector?
windows下也是有pipe的,为什么使用socket而不是使用pipe来实现wakeup的?

2、 close()

close()操作限于通道,而且还是实现了InterruptibleChannel接口的通道,例如FileChannel就没有close操作。
在分析close()具体实现之前,我们先得理解为什么要有close()这个操作:
一个可选择的通道,在创建之初会生成一个FileDescriptor,linux下即为fd,windows下即为句柄,这些都是系统资源,不能无限占用,当在不使用的时候,就应该将其释放,close即是完成这个工作的。
抽象类AbstractInterruptibleChannel实现了InterruptibleChannel接口,而SelectableChannel继承自AbstractInterruptibleChannel,因此,可选择的通道同时也是可以close的。
AbstractInterruptibleChannel的close实现如下:

1
2
3
4
5
6
7
8
public final void close() throws IOException {
synchronized (closeLock) {
    if (!open)
    return;
    open = false;
    implCloseChannel();
}
}

看来具体关闭逻辑就在implCloseChannel()中了,于是再看AbstractSelectableChannel:

1
2
3
4
5
6
7
8
9
10
11
protected final void implCloseChannel() throws IOException {
implCloseSelectableChannel();
synchronized (keyLock) {
        int count = (keys == null) ? 0 : keys.length;
    for (int i = 0; i < count; i++) {
    SelectionKey k = keys[i];
    if (k != null)
        k.cancel();
    }
}
}

先看synchronized同步块,它将当前通道保存的SelectionKey全部cancel,意思就是说,当前通关闭了,与它相关的所有SelectionKey都没有意义了,所以要全部取消掉,之前讲解cancel过程已经说明了,cancel操作只是将SelectionKey加入对应选择器的cancelKeys集合中,在下次正式选择开始的时候再一一清除;
这么看来,还是应该追究一下implCloseSelectableChannel()的实现了,下面分别从ServerSocketChannel和SocketChannel实现出发:
先看ServerSocketChannelImpl,

1
2
3
4
5
6
7
8
9
10
protected void implCloseSelectableChannel() throws IOException {
synchronized (stateLock) {
    nd.preClose(fd);
    long th = thread;
    if (th != 0)
    NativeThread.signal(th);
    if (!isRegistered())
    kill();
}
}

出现了两个很奇怪的东西,看来要完全弄懂这段代码,是得好好分析一下它们了,它们是:NativeDispatcher nd和NativeThread;
如果已经对linux信号机制非常熟悉,应该很容易猜测到NativeThread.signal(th)在做什么,
是的,它在唤醒阻塞的线程th,下面我们来看看它是如何做到的:
NativeThread类非常简单,几乎全是native方法:

1
2
3
4
5
6
7
8
9
class NativeThread {
    static native long current();
    static native void signal(long nt);
    static native void init();
    static {
        Util.load();
        init();
    }
}

在看其本地实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
//自定义中断信号,kill –l
#define INTERRUPT_SIGNAL (__SIGRTMAX - 2)
//自定义的信号处理函数,当前函数什么都不做
static void
nullHandler(int sig)
{
}
#endif
//NativeThread.init()的本地实现,可以看到它用到了sigaction
//sigaction用来install一个信号
JNIEXPORT void JNICALL
Java_sun_nio_ch_NativeThread_init(JNIEnv *env, jclass cl)
{
#ifdef __linux__
sigset_t ss;
// 以下这段代码是常见的信号安装过程
// 讲解这段代码的目的只是为了让大家理解NativeThread.signal
// 的工作原理,故很多细节就简单带过了
struct sigaction sa, osa;
// sa用于定制信号INTERRUPT_SIGNAL的处理方式的
// 如sa_handler = nullHandler即用来指定信号处理函数的
// 即线程收到信号时,为执行这个函数,nullHandler是个空壳
// 函数,所以它什么都不做
// 不用理解sa_flags各个标识代表什么
// sigemptyset顾名思义,它是初始化sigaction的sa_mask位
// sigaction(INTERRUPT_SIGNAL, &sa, &osa)执行后
// 如果成功,则表示INTERRUPT_SIGNAL这个信号安装成功了
// 为什么要有这个init呢,其实不用这不操作也许不会有问题
// 但因为不能确保INTERRUPT_SIGNAL没有被其他线程install
// 过,如果sa_handler对应函数不是空操作,则在使用这个信号
// 时会对当前线程有影响
    sa.sa_handler = nullHandler;
    sa.sa_flags = 0;
    sigemptyset(&sa.sa_mask);
    if (sigaction(INTERRUPT_SIGNAL, &sa, &osa) < 0)
    JNU_ThrowIOExceptionWithLastError(env, "sigaction");
#endif
}
 
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_NativeThread_current(JNIEnv *env, jclass cl)
{
#ifdef __linux__
// pthread_self()即是获取当前线程ID,它与getpid()是不同的
// 具体细节没有研究
    return (long)pthread_self();
#else
    return -1;
#endif
}
 
JNIEXPORT void JNICALL
Java_sun_nio_ch_NativeThread_signal(JNIEnv *env, jclass cl, jlong thread)
{
#ifdef __linux__
//这个就是最关键的signal实现了,可以看到,它调用了pthread库的pthread_kill
//像thread线程发送一个INTERRUPT_SIGNAL信号,这个信号就是在init中install
//的,对应的处理函数是空函数,也就是说,往thread线程发送一个信号,如果该线程处于
//阻塞状态,则会因为受到信号而终止阻塞,而如果处于非阻塞,则无影响
    if (pthread_kill((pthread_t)thread, INTERRUPT_SIGNAL))
    JNU_ThrowIOExceptionWithLastError(env, "Thread signal failed");
#endif
}

Java的NativeThread做静态初始化时已经执行了init,也就是说INTERRUPT_SIGNAL信号已经被安装,而ServerSocketChannelImpl中的thread有两种可能值,见代码段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
try {
          begin();
          if (!isOpen())
            return null;
          thread = NativeThread.current();
          for (;;) {
              n = accept0(this.fd, newfd, isaa);
              if ((n == IOStatus.INTERRUPTED) && isOpen())
              continue;
              break;
          }
          } finally {
            thread = 0;
            end(n > 0);
            assert IOStatus.check(n);
        }

try的内部,for循环之前,thread被复制为NativeThread.current()即为当前线程id;finally时thread又被修改回0,因此在implCloseSelectableChannel才有这样一段:

1
2
if (th != 0)
        NativeThread.signal(th);

NativeThread.signal(th)通过像当前线程发送INTERRUPT_SIGNAL信号而确保th线程没有被阻塞,即如果阻塞就停止阻塞。

为了让大家更好的理解信号的安装和使用,下面写了一个小程序来说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#include <pthread.h>
#include <stdio.h>
#include <sys/signal.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
 
#define NUMTHREADS 3
#define INTERRUPT_SIGNAL (SIGRTMAX - 2)
 
void *threadfunc(void *parm)
{
    pthread_t             self = pthread_self(); 
    int                   rc;
    printf("Thread 0x%.8x %.8x entered\n", self);
    errno = 0;
    rc = sleep(30);
    if (rc != 0 && errno == EINTR) {
        printf("Thread 0x%.8x %.8x got a signal delivered to it\n",
                self);
        return NULL;
    }
    printf("Thread 0x%.8x %.8x did not get expected results! rc=%d, errno=%d\n",
            self, rc, errno);
    return NULL;
}
 
void sigroutine(int dunno) {
    printf("\nI''m doing nothing here : %d\n", dunno);
    return;
}
 
int main () {
    int                     i;
    int                     rc;
    struct sigaction        actions;
    pthread_t               threads[NUMTHREADS];
    memset(&actions, 0, sizeof(actions));
    sigemptyset(&actions.sa_mask);
    actions.sa_flags = 0;
    actions.sa_handler = sigroutine;
     
    rc = sigaction(INTERRUPT_SIGNAL,&actions,NULL);
    if(rc){
        printf("sigaction error!\n");
        exit(-1);
    }
     
    for(i = 0; i < NUMTHREADS; ++i) {
        rc = pthread_create(&threads[i], NULL, threadfunc,(void*)i);
        if(rc){
            printf("pthread_create error!\n");
            exit(-1);
        }
    }
 
    sleep(3);
    rc = pthread_kill(threads[0], INTERRUPT_SIGNAL);
    if(rc){
        printf("pthread_kill error!\n");
        exit(-1);
    }
    for(;;);
    return 1;
}

编译命令:gcc -lpthread –o signal_test.out
输出样本:
Thread 0xb77bcb70 00016088 entered
Thread 0xb6fbbb70 00000000 entered
Thread 0xb67bab70 00000000 entered

I’m doing nothing here : 62
Thread 0xb77bcb70 00016088 got a signal delivered to it
Thread 0xb6fbbb70 00000000 did not get expected results! rc=0, errno=0
Thread 0xb67bab70 00000000 did not get expected results! rc=0, errno=0

其实该小程序的意图很简单:创建了3个线程,每个线程内部会sleep 30秒,安装了一个信号INTERRUPT_SIGNAL,然后往第一个线程发送INTERRUPT_SIGNAL信号;可想而知,第一个线程会因为收到信号而终止sleep,后面两个线程就只能等30秒了。

现在理解了NativeThread了,我们再看NativeDispatcher
首先我们得知道在ServerSocketChannelImpl中,nd被初始化为SocketDispatcher,见:

1
2
3
4
5
static {
    Util.load();
    initIDs();
    nd = new SocketDispatcher();
}

又因为linux下一切皆文件的思想(现实虽然不绝对),SocketDispatcher其实就是用FileDispatcher实现的,最终FileDispatcher也只是封装了一大堆native方法,一波三折,
关于FileDispatcher,这里先不详细讲解了,先针对nd.preClose(fd)和kill将implCloseSelectableChannel的过程说明白吧:
首先,我们要明白这样一个道理:在多线程环境下,总是很难知道什么时候可安全的关闭或释放资源(如fd),当一个线程A使用fd来读写,而另一个线程B关闭或释放了fd,则A线程就会读写一个错误的文件或socket;为了防止这种情况出现,于是NIO就采用了经典的two-step处理方案:
第一步:创建一个socket pair,假设FDs为sp[2],先close掉sp[1],这样,该socket pair就成为了一个半关闭的链接;复制(dup2)sp[0]到fd(即为我们想关闭或释放的fd),这个时候,其他线程如果正在读写立即会获得EOF或者Pipe Error,read或write方法里会检测这些状态做相应处理;
第二步:最后一个会使用到fd的线程负责释放
nd.preClose(fd)即为两步曲中的第一步,我们先来看其实现,最终定位到FileDispatcher.c,相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static int preCloseFD = -1;
 
JNIEXPORT void JNICALL
Java_sun_nio_ch_FileDispatcher_init(JNIEnv *env, jclass cl)
{
    int sp[2];
    if (socketpair(PF_UNIX, SOCK_STREAM, 0, sp) < 0) {
    JNU_ThrowIOExceptionWithLastError(env, "socketpair failed");
        return;
    }
    preCloseFD = sp[0];
    close(sp[1]);
}
JNIEXPORT void JNICALL
Java_sun_nio_ch_FileDispatcher_preClose0(JNIEnv *env, jclass clazz, jobject fdo)
{
    jint fd = fdval(env, fdo);
    if (preCloseFD >= 0) {
    if (dup2(preCloseFD, fd) < 0)
        JNU_ThrowIOExceptionWithLastError(env, "dup2 failed");
    }
}

从上面两个函数实现,我们可以看到,在init函数中,创建了一个半关闭的socket pair,preCloseFD即为未关闭的一端,init在静态初始化时就会被执行;再来看关键的preClose0,它的确是采用dup2来复制preCloseFD,这样一来,fd就被替换成了preCloseFD,这正是socket pair中未被关闭的一端。
既然nd.preClose(fd)只是预关闭,则真正执行关闭的逻辑肯定在这个kill中了,从代码逻辑上还是比较好懂的,if (!isRegistered())即表示该通道没有被注册,表示所有Selector都没有意愿关心这个通道了,则自然可以放心的关闭fd,通道与fd的联系请看另一篇。
果断猜测kill中有nd.close(fd)这样的代码,不信请看:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void kill() throws IOException {
synchronized (stateLock) {
    if (state == ST_KILLED)
    return;
    if (state == ST_UNINITIALIZED) {
            state = ST_KILLED;
    return;
        }
    assert !isOpen() && !isRegistered();
    nd.close(fd);
    state = ST_KILLED;
}
}

果然如此,这样一来,关闭二步曲就能够较安全的释放我们的fd资源了,至于nd.close(fd)的本地实现,这里就不讲了,肯定是采用了close(fd)的系统调用。
总的来说,通道的close就是为了断开它与内核fd的那点联系。

3、 begin() & end()

begin()和end()总是配对使用的,Channel和Selector均有自己的实现,所完成的功能也是有所区别的:
Selector的begin()和end()是这样使用的:

1
2
3
4
5
6
try {
    begin();
    pollWrapper.poll(timeout);
} finally {
    end();
}

我们先试想这样一个场景,poll阻塞过程中,Selector所在线程被中断了,会发生什么事?具体发生什么事这里就不深究了,至少,我们要通知一下辛苦poll的内核吧,不管是发信号也好,还是其他方式。
Selector不是有个天然的wakeup吗?似乎还挺优雅,为何不直接使用呢?是的,它们的确使用了,请看AbstractSelector中的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected final void begin() {
if (interruptor == null) {
    interruptor = new Interruptible() {
        public void interrupt() {
        AbstractSelector.this.wakeup();
        }};
}
AbstractInterruptibleChannel.blockedOn(interruptor);
if (Thread.currentThread().isInterrupted())
    interruptor.interrupt();
}
protected final void end() {
AbstractInterruptibleChannel.blockedOn(null);
}

我们看到,begin中出现了wakeup(),不过要理解begin和end,似乎我们先得弄明白AbstractInterruptibleChannel.blockedOn究竟在干什么:
AbstractInterruptibleChannel是这样写的:

1
2
3
4
static void blockedOn(Interruptible intr) {           
sun.misc.SharedSecrets.getJavaLangAccess()
.blockedOn(Thread.currentThread(),intr);
    }

其中JavaLangAccess接口在java.lang.System中被实例化,它是这样写的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static void setJavaLangAccess() {
    // Allow privileged classes outside of java.lang
    sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){
        public sun.reflect.ConstantPool getConstantPool(Class klass) {
            return klass.getConstantPool();
        }
        public void setAnnotationType(Class klass, AnnotationType type) {
            klass.setAnnotationType(type);
        }
        public AnnotationType getAnnotationType(Class klass) {
            return klass.getAnnotationType();
        }
        public <E extends Enum<E>>
        E[] getEnumConstantsShared(Class<E> klass) {
            return klass.getEnumConstantsShared();
        }
        public void blockedOn(Thread t, Interruptible b) {
            t.blockedOn(b);
        }
        public void registerShutdownHook(int slot, Runnable r) {
            Shutdown.add(slot, r);
        }
    });
}

现在我们发现,JavaLangAccess的blockedOn实现,居然只有这么一句t.blockedOn(b),那么AbstractInterruptibleChannel.blockedOn实现就可以翻译成这样:
Thread.currentThread().blockedOn(intr),只因为该方法是包级私有的,并且Interruptible也是对我们不可见的,我们无法直接调用。
最后只用看java.lang.Thread中blockedOn的实现了:

1
2
3
4
5
6
private volatile Interruptible blocker;
void blockedOn(Interruptible b) {
    synchronized (blockerLock) {
        blocker = b;
    }
}

原来Thread类中包含Interruptible的私有成员blocker,blockedOn只是给它赋值而已。
到这里,要理解blockedOn究竟要做什么,就剩下理解这个blocker究竟有什么用,其实找到我们最常用的方法interrupt():

1
2
3
4
5
6
7
8
9
10
11
12
13
public void interrupt() {
if (this != Thread.currentThread())
    checkAccess();
    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0();       // Just to set the interrupt flag
            b.interrupt();
            return;
        }
   }
   interrupt0();
}

看到了吧,b.interrupt(),java线程执行interrupt时,如果blocker有被赋值,则会执行它的interrupt。
最终回归到begin()和end(),豁然开朗:
begin()中的Interruptible实现的interrupt中就调用了wakeup(),这样一来,当内核poll阻塞中,java线程执行interrupt(),就会触发wakeup(),从而使得内核优雅的终止阻塞;
至于end(),就更好理解了,poll()结束后,就没有必要再wakeup了,所以就blockOn(null)了。
blockOn我们可以理解为,如果线程被中断,就附带执行我的这个interrupt方法吧。

以上讲解了Selector对begin()、end()的运用,下面就来看Channel是如何运用它们,实现在AbstractInterruptibleChannel(blockOn的提供者)中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
    protected final void begin() {
        if (interruptor == null) {
            interruptor = new Interruptible() {
                public void interrupt() {
                  synchronized (closeLock) {
                    if (!open)
                    return;
                    interrupted = true;
                    open = false;
                    try {
                       AbstractInterruptibleChannel.this.implCloseChannel();
                    } catch (IOException x) { }
                 }
            }};
        }
        blockedOn(interruptor);
        if (Thread.currentThread().isInterrupted())
            interruptor.interrupt();
}
 
protected final void end(boolean completed)
    throws AsynchronousCloseException
    {
        blockedOn(null);
        if (completed) {
            interrupted = false;
            return;
        }
        if (interrupted) throw new ClosedByInterruptException();
        if (!open) throw new AsynchronousCloseException();
    }

理解了Selector的begin()、end()实现,再来看这个,基本没什么难度,其实也可以猜想到,Selector既然在begin()和end()作用域内挂上wakeup,则Channel肯定会在begin()和end()作用域内挂上close之类的。
的确如此,它在begin()中挂上的是implCloseChannel()来实现关闭Channel。
Channel的使用地方非常多,在涉及到与内核交互(体现在那些native方法上)时,都会在头尾加上这个begin()、end()。
另外,似乎Channel的end有所不同,它还包含一个参数completed,用于表示begin()和end()之间的操作是否完成,意图也很明显,begin()的interrupt()中已经设置,如果线程中断时,interrupted会被更改为true,这样在end()被执行的时候,如果未完成,则会跑出ClosedByInterruptException异常,当然,如果操作确实没有被打断,则会将其平反。
见ServerSocketChannel#accept实现的代码端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
try {
begin();
if (!isOpen())
    return null;
thread = NativeThread.current();
for (;;) {
    n = accept0(this.fd, newfd, isaa);
    if ((n == IOStatus.INTERRUPTED) && isOpen())
    continue;
    break;
}
} finally {
thread = 0;
end(n > 0);
assert IOStatus.check(n);
}

n为accept0 native方法的返回值,当且仅当n>0时属于正常返回,所以才有了这个end(n > 0),从上述代码我们可以看到,当前这个begin()和end()就是防止在accept0时被中断所做的措施。

Channel & FileDescriptor

0
1年前
fp1203发布

之前在分析Java NIO的实现时,一直有一个点没有分析清楚,那就是Channel与FD(文件描述符)之间的关系,这里讨论的通道当然是指可选择的Channel。
Selector是服务于Channel的,Selector在linux下的底层实现其实就是系统调用poll或epoll,而poll及epoll是服务于FD的,这两个部分都有分析过,现在就剩Channel与FD是如何对应起来的。
猜想,应该是在通道创建的时候会生成这么个FD与之对应吧,现在我们查看一下ServerSocketChannel及SocketChannel的构造实现,发现下面两行代码:
this.fd = Net.serverSocket(true);
this.fd = Net.socket(true);
Net.java中相关代码如下:

1
2
3
4
5
6
static FileDescriptor socket(boolean stream) {
    return IOUtil.newFD(socket0(stream, false));
}
static FileDescriptor serverSocket(boolean stream) {
    return IOUtil.newFD(socket0(stream, true));
}

IOUtil.java中相关代码如下:

1
2
3
4
5
static FileDescriptor newFD(int i) {
    FileDescriptor fd = new FileDescriptor();
    setfdVal(fd, i);
    return fd;
}

再看IOUtil.c中的本地实现:

1
2
3
4
5
6
7
8
9
10
11
JNIEXPORT void JNICALL
Java_sun_nio_ch_IOUtil_initIDs(JNIEnv *env, jclass clazz)
{
    clazz = (*env)->FindClass(env, "java/io/FileDescriptor");
    fd_fdID = (*env)->GetFieldID(env, clazz, "fd", "I");
}
JNIEXPORT void JNICALL
Java_sun_nio_ch_IOUtil_setfdVal(JNIEnv *env, jclass clazz, jobject fdo, jint val)
{
    (*env)->SetIntField(env, fdo, fd_fdID, val);
}

initIDs在IOUtil.java静态初始化时被触发,fd_fdID因此指向了FileDescriptor中的fd,于是在调用setfdVal时,会设置fd_fdID的指向域的值为val,setfdVal最终等效于设置FileDescriptor对象的fd值。
理解了setfdVal,再来看newFD的实现,就很清楚了,newFD(int i)指创建一个fd值为i的FileDescriptor,因此,fd就和FileDescriptor等效了,这也是为什么之前我一直没有区分它们两个的原因。
看到这里,我们仍然不算找到了答案,因为fd的来源还需要跟进,翻阅前面的代码,很快定位到这个socket0,是一个native方法,实现在Net.c中,具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
JNIEXPORT int JNICALL
Java_sun_nio_ch_Net_socket0(JNIEnv *env, jclass cl, jboolean stream,
                            jboolean reuse)
{
    int fd;
 
#ifdef AF_INET6
    if (ipv6_available())
    fd = socket(AF_INET6, (stream ? SOCK_STREAM : SOCK_DGRAM), 0);
    else
#endif /* AF_INET6 */
    fd = socket(AF_INET, (stream ? SOCK_STREAM : SOCK_DGRAM), 0);
 
    if (fd < 0) {
    return handleSocketError(env, errno);
    }
    if (reuse) {
    int arg = 1;
        if (NET_SetSockOpt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg,
                           sizeof(arg)) < 0) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "sun.nio.ch.Net.setIntOption");
        }
    }
    return fd;
}

有两个地方需要我们关注:
1) fd = socket(AF_INET, (stream ? SOCK_STREAM : SOCK_DGRAM), 0):
这就是fd产生的原因,如果是ipv6则采用AF_INET6协议簇,默认即为ipv4的AF_INET,当stream为true是,则表示socket类型为SOCK_STREAM(TCP),否则为SOCK_DGRAM(UDP),指定了详细的type,则protocol为0即可;现在再来看Net.serverSocket(true)、Net.socket(true)则表明均选择的是TCP,至于serverSocket与socket的区别就在于serverSocket的reuse为true。
2) NET_SetSockOpt,当且仅当reuse为true时才会经历这个:
也就是定义Net.serverSocket(true)时,会执行该函数,该函数底层就是采用setsockopt来实现,setsockopt有很多种场景,具体这里就不一一来讲了,就只说明一下我们所关注的场景,SO_REUSEADDR:
closesocket(一般不会立即关闭而经历TIME_WAIT的过程)后想继续重用该socket:

1
2
BOOL bReuseaddr=TRUE;
setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(const char*)&bReuseaddr,sizeof(BOOL));

这就是地址重用的应用场景,其实可以理解,server端的端口都是固定的,client端的端口一般是随机的,故client端很难实现地址重用,因此只针对ServerSocket提供地址重用,本地实现中的arg = 1,其实就相当于bReuseaddr=TRUE。

结合这两点,以及上文描述的内容,其实就很清楚了,SocketChannel、ServerSocketChannel这两个可选择Channel在创建的时候,会创建一个socket,这样Channel就可socket对应起来,也因为如此,Channel也肩负着socket的使命,甚至在用户看来,SocketChannel等同于Socket,而ServerSocketChannel等同于ServerSocket。至于ServerSocketChannel与SocketChannel的区别,除了沿袭Socket与ServerSocket的职责差别外,从底层创建socket上比较,ServerSocketChannel对应的socket多了开启了地址重用的功能而已。

Java NIO 选择器(Selector) 知识预备 (linux epoll)

4
1年前
fp1203发布

最近花些功夫在研究Java NIO的JDK源码,发现Selector的实现,除了在唤醒机制上做了手脚,主要依赖操作系统的实现,为了无负担的弄懂Selector,有必要研究一下操作系统是如何实现选择的。本文主要参考linux-2.6.10内核epoll的实现(poll见上一篇)。

本文可能会表现得很肤浅,高手们请直接略过,另外,本文所出现的“政府”字样,乃比喻性质的,或者就认为它是“清政府”好了,请相关人员不要曲解。

上回冒充大侠poll府上走了一遭,感觉还不过瘾,于是计划再到它表哥epoll家去闯闯,可是man了一下之后,我有点退却了,丫的,还以为它表哥是一个人,原来是仨儿:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
 
typedef union epoll_data {
    void        *ptr;
    int          fd;
    __uint32_t   u32;
    __uint64_t   u64;
} epoll_data_t;
 
struct epoll_event {
    __uint32_t   events;      /* Epoll events */
    epoll_data_t data;        /* User data variable */
};

首先看看epoll_event是啥玩意儿,应该和pollfd类似吧?
还记得pollfd的定义吗?

1
2
3
4
5
struct pollfd {
    int   fd;         /* file descriptor */
    short events;     /* requested events */
    short revents;    /* returned events */
};

对比一下,发现区别不大,epoll_data_t是一个共用体,至少我们可以认为它可以是一个fd,所以较大的不同点就在于epoll_event没有revents了,上次探索poll的时候不是发现,这是poll一个很关键的地方吗?最终事件是否发生就看它的值了。决心带着这个疑问去探一探。
这次,我不打算带大家一步一步的串门,因为要理解epoll,最关键的就是它的结构设计,所以这里先从epoll的结构出发,请看下面一副简化的结构及联系图:

先介绍一下图中涉及到的各种结构体:

  • 先说明一下epitem结构体是什么,顾名思义,即为epollitem,epoll的基本单元,下面分别介绍一下几个主要的变量的含义:
  • struct list_head rdllink,或者取名为ready_list_link你会更容易理解,当epitem对应的fd的存在已经ready的I/O事件,则ep_poll_callback回调函数会将该结点链接到eventpoll中的rdllist循环链表中去,这样就将ready的epitem都串连起来了
  • struct epoll_filefd ffd,ffd中只包含一个fd及fd对应的file的指针
  • struct eventpoll *ep,eventpoll的指针,每个epitem都有这样一个指针,它指向对应的eventpoll变量,其实它的作用很简单,我们只要拿到了epitem,就可以根据它拿到eventpoll
  • struct event_poll event,还记得epoll_ctl的参数类型吗?其中就有一个event_poll指针,而该event即用来存放总用户空间拷贝的event_poll
  • 然后说明一下eventpoll结构体中主要的变量的含义:
    • struct list_head rdllist,ready_link_list,表示这是一个链表,事实上它就是一个循环链表,链表中的每个结点即为epitem中的rdllink,rdllist中链接的所有rdllink对应的epitem有事件ready
    • struct rb_root rbr,红黑树的根结点,其实每一个epitem中的第一个变量即为struct rb_node rbn;即表示红黑树的一个结点,所以rbr即是这样一颗红黑树,它的结点都为epitem变量,即相当于一个Set,将所有epitem管理起来,通过它可以很方便的增删改查epitem
  • 再看eppoll_entry结构体,它主要有这样几个变量:
    • void * base,base指向其对应的epitem
    • wait_queue_t wait,等待队列的项,wait中有一个唤醒回调函数指针,且该指针被初始化为ep_poll_callback,wait会被挂在到设备的等待队列中,等待设备的唤醒,当设备因状态改变,唤醒wait时,会执行ep_poll_callback,而ep_poll_callback会做这样一件事:list_add_tail(&epi->rdllink,&ep->rdllist),其中epi即为epitem变量,通过wait偏移拿到eppoll_entry,然后可以拿到base指针,即拿到了对应的epitem,而ep即为eventpoll变量,通过epitem的ep指针即可拿到,list_add_tail将epi的rdllink链到ep的rdllist中

下面结合这幅图大致讲解一下epoll_create、epoll_ctl、epoll_wait都在做些什么:

  • 首先,epoll_create会创建一个epoll的文件(epfd),同时创建并初始化一个struct eventpoll,其中file的private_data指针即指向了eventpoll变量,因此,知道epfd就可以拿到file,即拿到了eventpoll变量,这就是epoll_create所做的工作
  • epoll_ctl又做了什么事呢?首先大家看到了eventpoll中的rb_root红黑树吗?epoll_ctl其实就是在操作这颗红黑树,epoll_ctl有三种操作:
  • EPOLL_CTL_ADD:往红黑树中创建并添加一个epitem,对应处理函数为ep_insert
    在添加epitem时,也就是在ep_insert过程中,会创建一个eppoll_entry,并将wait_queue挂载到设备的等待队列上,其中该wait_queue的唤醒回调函数为ep_poll_callback,当设备有事件ready而唤醒wait_queue时,就会执行ep_poll_callback将当前epitem链接到eventpoll中的rdllist中去,另外,如果在挂载wait_queue时就发现设备有事件ready了,同样会将epitem链接到rdllist中去
  • EPOLL_CTL_MOD:修改对应的epitem,对应处理函数为ep_modify
    在ep_modify过程中,处理会修改epitem对应的event值,同样会先查看一下对应设备的当前状态,如果有ready事件发生,则会将当前epitem链接到rdllist中去
  • EPOLL_CTL_DEL:从红黑树中删除对应的epitem,对应处理函数为ep_remove
    释放钩子、链接、资源空间等,如epitem所占的空间
  • 其实epoll_ctl已经将绝大部分事情都做了,epoll_wait有只需要收集结果就行了,它的目标也很单一,就看rdllist中是否有元素即可,当然,它还需要控制timeout,及结果转移,因为对于rdllist链接的epitem,只能说明其对应的fd有事件ready,但是哪些事件是不知道的,因此epoll_ctl再收集结果时,会亲自查看一下对应file的ready状态,来写回events

在给大家大致讲解了epoll涉及到的结构及epoll三兄弟大概在做些什么之后,开始我们的探索之旅吧:

epoll_create

先看sys_epoll_create系统调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
asmlinkage long sys_epoll_create(int size)
{
    int error, fd;
    struct inode *inode;
    struct file *file;
 
    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_create(%d)\n",
             current, size));
 
    /* Sanity check on the size parameter */
    error = -EINVAL;
    if (size         goto eexit_1;
 
    /*
     * Creates all the items needed to setup an eventpoll file. That is,
     * a file structure, and inode and a free file descriptor.
     */
    error = ep_getfd(&fd, &inode, &file);
    if (error)
        goto eexit_1;
 
    /* Setup the file internal data structure ( "struct eventpoll" ) */
    error = ep_file_init(file);
    if (error)
        goto eexit_2;
 
    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_create(%d) = %d\n",
             current, size, fd));
 
    return fd;
 
eexit_2:
    sys_close(fd);
eexit_1:
    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_create(%d) = %d\n",
             current, size, error));
    return error;
}

我们只需要注意到两个函数:ep_getfd和ep_file_init

  • ep_getfd其实就是在创建文件,我们这里不讲文件是如何创建的,大家只需要知道,调用了这个函数之后,除非出错,否则epoll文件就会被创建出来
  • ep_file_init我们到可以讲一下:
    顾名思义,它就是初始化刚才创建的文件的,下面看看它究竟初始化了哪些内容:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    static int ep_file_init(struct file *file)
    {
        struct eventpoll *ep;
     
        if (!(ep = kmalloc(sizeof(struct eventpoll), GFP_KERNEL)))
            return -ENOMEM;
     
        memset(ep, 0, sizeof(*ep));
        rwlock_init(&ep->lock);
        init_rwsem(&ep->sem);
        init_waitqueue_head(&ep->wq);
        init_waitqueue_head(&ep->poll_wait);
        INIT_LIST_HEAD(&ep->rdllist);
        ep->rbr = RB_ROOT;
     
        file->private_data = ep;
     
        DNPRINTK(3, (KERN_INFO "[%p] eventpoll: ep_file_init() ep=%p\n",
                 current, ep));
        return 0;
    }

    从这几行代码可以看出,ep_file_init就做了两件事:

    • 创建并初始化一个eventpoll结构体变量
    • 指定file的private_data指针指向刚创建的eventpoll变量,这样,只要根据epoll文件描述符epfd就可以拿到file进而就拿到了eventpoll变量了,该eventpoll就是epoll_ctl和epoll_wait工作的场所

    对外看来,epoll_create就做了一件事,那就是创建一个epoll文件,事实上,更关键的是,它创建了一个eventpoll结构体变量,该变量为epoll_ctl和epoll_wait的工作打下了基础。

epoll_ctl

展示一下epoll_ctl系统调用先:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
asmlinkage long
sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user *event)
{
    int error;
    struct file *file, *tfile;
    struct eventpoll *ep;
    struct epitem *epi;
    struct epoll_event epds;
    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_ctl(%d, %d, %d, %p)\n",
             current, epfd, op, fd, event));
    error = -EFAULT;
    if (EP_OP_HASH_EVENT(op) &&
        copy_from_user(&epds, event, sizeof(struct epoll_event)))
        goto eexit_1;
    /* Get the "struct file *" for the eventpoll file */
    error = -EBADF;
    file = fget(epfd);
    if (!file)
        goto eexit_1;
    /* Get the "struct file *" for the target file */
    tfile = fget(fd);
    if (!tfile)
        goto eexit_2;
    /* The target file descriptor must support poll */
    error = -EPERM;
    if (!tfile->f_op || !tfile->f_op->poll)
        goto eexit_3;
    /*
     * We have to check that the file structure underneath the file descriptor
     * the user passed to us _is_ an eventpoll file. And also we do not permit
     * adding an epoll file descriptor inside itself.
     */
    error = -EINVAL;
    if (file == tfile || !IS_FILE_EPOLL(file))
        goto eexit_3;
    /*
     * At this point it is safe to assume that the "private_data" contains
     * our own data structure.
     */
    ep = file->private_data;
    down_write(&ep->sem);
    /* Try to lookup the file inside our hash table */
    epi = ep_find(ep, tfile, fd);
    error = -EINVAL;
    switch (op) {
    case EPOLL_CTL_ADD:
        if (!epi) {
            epds.events |= POLLERR | POLLHUP;
            error = ep_insert(ep, &epds, tfile, fd);
        } else
            error = -EEXIST;
        break;
    case EPOLL_CTL_DEL:
        if (epi)
            error = ep_remove(ep, epi);
        else
            error = -ENOENT;
        break;
    case EPOLL_CTL_MOD:
        if (epi) {
            epds.events |= POLLERR | POLLHUP;
            error = ep_modify(ep, epi, &epds);
        } else
            error = -ENOENT;
        break;
    }
    /*
     * The function ep_find() increments the usage count of the structure
     * so, if this is not NULL, we need to release it.
     */
    if (epi)
        ep_release_epitem(epi);
    up_write(&ep->sem);
eexit_3:
    fput(tfile);
eexit_2:
    fput(file);
eexit_1:
    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_ctl(%d, %d, %d, %p) = %d\n",
             current, epfd, op, fd, event, error));
    return error;
}

记得前文提到过eventpoll结构体包含一个变量struct rb_root rbr,这就是一颗红黑树的根结点,epoll_ctl的ADD、DEL、MOD操作,就是在操作这颗红黑树。先分析一下代码流程:

  • 首先copy_from_user将用户传入的event_poll拷贝到epds中,以供自己使用
  • file = fget(epfd),根据epoll文件的描述符拿到对应的文件file
  • tfile = fget(fd),同理,根据fd拿到目标文件tfile
  • ep = file->private_data,即拿到了epoll_create创建的eventpoll结构体变量,准备开始工作
  • epi = ep_find(ep, tfile, fd),这里不详细讲解ep_find源码,只需要说明一下即可,ep_find即从ep中的红黑树中根据tfile和fd来查找epitem,还记得epitem结构体吗,这是epoll的基本单元,每个被epoll_ctl添加过的fd都会保存在一个epitem变量中,每个epitem变量都是红黑树的结点,如果不理解红黑树也不要紧,就简单把它看做一个Map,其Key为tfile+fd,Value即为epitem的指针,因此能够根据ep_find查找到tfile+fd对应的epitem,当然,如果找到的epi==NULL,自然表明不存在了
  • 接着根据op的三种类型分别操作:
    • EPOLL_CTL_ADD
      首先epds.events |= POLLERR | POLLHUP确保“出错、连接挂起”被当做感兴趣事件,因为底层有义务将出错信息返回给应用;然后调用ep_insert生成一个epitem并插入到ep对应的红黑树中;这里详细看一下ep_insert的实现:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
      97
      98
      99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      122
      123
      124
      125
      126
      127
      128
      129
      130
      131
      132
      133
      134
      135
      136
      137
      138
      139
      140
      141
      142
      143
      144
      145
      146
      147
      148
      149
      150
      151
      152
      153
      154
      155
      156
      157
      158
      159
      160
      161
      162
      163
      164
      165
      166
      167
      168
      169
      170
      171
      172
      173
      174
      175
      176
      177
      178
      179
      180
      181
      182
      183
      184
      185
      186
      187
      188
      189
      190
      191
      192
      193
      194
      195
      196
      197
      198
      199
      200
      201
      202
      struct ep_pqueue {
          // poll_table结构体在讲解poll实现那篇有说明,内部只包含一个回调函数指针
          poll_table pt;
          // epitem指针
          struct epitem *epi;
      };
      static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
                   struct file *tfile, int fd)
      {
          int error, revents, pwake = 0;
          unsigned long flags;
          // 待创建的epitem变量的指针
          struct epitem *epi;
          // ep_pqueue结构体变量
          struct ep_pqueue epq;
       
          error = -ENOMEM;
          // 为epi分配空间
          if (!(epi = EPI_MEM_ALLOC()))
              goto eexit_1;
       
          /* Item initialization follow here ... */
          EP_RB_INITNODE(&epi->rbn); // 初始化红黑树结点
          // 初始化各种链表
          INIT_LIST_HEAD(&epi->rdllink);
          INIT_LIST_HEAD(&epi->fllink);
          INIT_LIST_HEAD(&epi->txlink);
          INIT_LIST_HEAD(&epi->pwqlist);
          // 初始化epi的ep指针指向ep,这样一来,只要拿到epi就可以拿到ep了
          epi->ep = ep;
          // 初始化epi的ffd变量,该变量包含一个文件指针+文件描述符
          EP_SET_FFD(&epi->ffd, tfile, fd);
          // 初始化event
          epi->event = *event;
          atomic_set(&epi->usecnt, 1);
          epi->nwait = 0;
       
          /* Initialize the poll table using the queue callback */
          // 初始化ep_pqueue变量的epi指针为刚创建的epq
          epq.epi = epi;
          // 初始化epq中回调函数指针指向ep_ptable_queue_proc
          // ep_ptable_queue_proc见下文
          init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
       
          /*
           * Attach the item to the poll hooks and get current event bits.
           * We can safely use the file* here because its usage count has
           * been increased by the caller of this function.
           */
          // 这里再说明一下为什么会有ep_pqueue这个结构体,它很像一个
          // 中转变量,我可以看到tfile->f_op->poll(tfile, &epq.pt)
          // 好像只用到了回调函数指针而已,那有必要用epq吗?这其实就是
          // 内核的一种惯用手法,在回调函数中,通过poll_table指针偏移
          // 即可拿到ep_pqueue,进而拿到对应的epitem指针
          // tfile对应poll方法中,epq.pt方法会被调用,即ep_ptable_queue_proc
          // 会被执行,从而将等待队列项(见ep_ptable_queue_proc)挂载到
          // 设备的等待队列上,当设备唤醒等待队列项时,ep_poll_callback将会被执行
          revents = tfile->f_op->poll(tfile, &epq.pt);
       
          /*
           * We have to check if something went wrong during the poll wait queue
           * install process. Namely an allocation for a wait queue failed due
           * high memory pressure.
           */
          if (epi->nwait < 0)        
              goto eexit_2;    
       
          spin_lock(&tfile->f_ep_lock);
          // 将epi的fllink链接到tfile的f_ep_links上
          list_add_tail(&epi->fllink, &tfile->f_ep_links);
          spin_unlock(&tfile->f_ep_lock);
       
          /* We have to drop the new item inside our item list to keep track of it */
          write_lock_irqsave(&ep->lock, flags);
       
          /* Add the current item to the rb-tree */
          // 将创建并初始化好的epitem插入到eventpoll的红黑树中
          ep_rbtree_insert(ep, epi);
       
          /* If the file is already "ready" we drop it inside the ready list */
          // 因为刚才的file->f_op->poll执行之后,有可能对应file已经是ready状态了
          // 如果发现的确是感兴趣的事件发生,并且当前epitem没有链接(即没有被收集到ep的
          // rdllist中,简单说,不需要重复收集),则就将其链接到ep的rdllist上
          if ((revents & event->events) && !EP_IS_LINKED(&epi->rdllink)) {
              // 将epi的rdllink结点链接到ep的rdllist头结点上
              list_add_tail(&epi->rdllink, &ep->rdllist);
       
              /* Notify waiting tasks that events are available */
              // 将ep从等待队列中唤醒,或者这样理解,这里已经找到满意结果了,不用在等待了
              if (waitqueue_active(&ep->wq))
                  wake_up(&ep->wq);
              if (waitqueue_active(&ep->poll_wait))
                  pwake++;
          }
       
          write_unlock_irqrestore(&ep->lock, flags);
       
          /* We have to call this outside the lock */
          if (pwake)
              ep_poll_safewake(&psw, &ep->poll_wait);
       
          DNPRINTK(3, (KERN_INFO "[%p] eventpoll: ep_insert(%p, %p, %d)\n",
                   current, ep, tfile, fd));
       
          return 0;
       
      eexit_2:
          ep_unregister_pollwait(ep, epi);
       
          /*
           * We need to do this because an event could have been arrived on some
           * allocated wait queue.
           */
          write_lock_irqsave(&ep->lock, flags);
          if (EP_IS_LINKED(&epi->rdllink))
              EP_LIST_DEL(&epi->rdllink);
          write_unlock_irqrestore(&ep->lock, flags);
       
          EPI_MEM_FREE(epi);
      eexit_1:
          return error;
      }
       
      // 回调函数
      static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
                       poll_table *pt)
      {
          // 通过pt拿到epitem指针
          struct epitem *epi = EP_ITEM_FROM_EPQUEUE(pt);
          // 待创建的eppoll_entry结构体变量指针
          struct eppoll_entry *pwq;
       
          if (epi->nwait >= 0 && (pwq = PWQ_MEM_ALLOC())) {
              // 这个为epoll的一个关键的地方,给pwq中的等待队列项初始化唤醒
              // 回调函数,这里初始化为ep_poll_callback
              init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
              // 等待队列的头指针,例如,当执行file->f_op->poll,则whead即
              // 为file对应设备的等待队列头指针
              pwq->whead = whead;
              // pwq的base指针指向epi,这样只要拿到eppoll_entry就能拿到epitem了
              pwq->base = epi;
              // 挂载pwq中等待队列项,当设备唤醒该项时,wait中回调函数会被调用
              add_wait_queue(whead, &pwq->wait);
              // 将pwq的llink链接到epi的pwqlist上
              list_add_tail(&pwq->llink, &epi->pwqlist);
              epi->nwait++;
          } else {
              /* We have to signal that an error occurred */
              epi->nwait = -1;
          }
      }
      static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
      {
          int pwake = 0;
          unsigned long flags;
          // 通过wait拿到eppoll_entry中的base,即拿到了epitem
          struct epitem *epi = EP_ITEM_FROM_WAIT(wait);
          // 通过epi的ep指针即拿到了eventpoll
          struct eventpoll *ep = epi->ep;
       
          DNPRINTK(3, (KERN_INFO "[%p] eventpoll: poll_callback(%p) epi=%p ep=%p\n",
                   current, epi->file, epi, ep));
       
          write_lock_irqsave(&ep->lock, flags);
       
          /*
           * If the event mask does not contain any poll(2) event, we consider the
           * descriptor to be disabled. This condition is likely the effect of the
           * EPOLLONESHOT bit that disables the descriptor when an event is received,
           * until the next EPOLL_CTL_MOD will be issued.
           */
          if (!(epi->event.events & ~EP_PRIVATE_BITS))
              goto is_disabled;
       
          /* If this file is already in the ready list we exit soon */
          if (EP_IS_LINKED(&epi->rdllink))
              goto is_linked;
       
          // 因为被设备唤醒,则说明当前epi对应的fd有事件ready
          // 则将其链接到ep的rdllist上
          list_add_tail(&epi->rdllink, &ep->rdllist);
       
      is_linked:
          /*
           * Wake up ( if active ) both the eventpoll wait list and the ->poll()
           * wait list.
           */
          // 已经找到结果了,不需要等待了,就notify一下吧,告诉大家不用再等了
          if (waitqueue_active(&ep->wq))
              wake_up(&ep->wq);
          if (waitqueue_active(&ep->poll_wait))
              pwake++;
       
      is_disabled:
          write_unlock_irqrestore(&ep->lock, flags);
       
          /* We have to call this outside the lock */
          if (pwake)
              ep_poll_safewake(&psw, &ep->poll_wait);
       
          return 1;
      }

      ep_insert相关函数是整个epoll的核心,结合上面的图再来看,ep_pqueue结构体及ep_ptable_queue_proc方法其实就是起着桥梁的作用,通过它们,有着ep_poll_callback等待队列项被挂在到设备的等待队列上,当设备唤醒该等待队列项时,自然就将当前epitem链接到eventpoll的rdllist链表上。

    • EPOLL_CTL_DEL
      调用的函数为epoll_remove,这里不打算详细讲解,其实很容易理解,就是将epitem从eventpoll的红黑树中移除,起到取消注册的作用。
    • EPOLL_CTL_MOD
      调用的函数为epoll_modify,这里先看看它的实现:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      static int ep_modify(struct eventpoll *ep, struct epitem *epi, struct epoll_event *event)
      {
          int pwake = 0;
          unsigned int revents;
          unsigned long flags;
       
          /*
           * Set the new event interest mask before calling f_op->poll(), otherwise
           * a potential race might occur. In fact if we do this operation inside
           * the lock, an event might happen between the f_op->poll() call and the
           * new event set registering.
           */
          // 这个就是modify需要修改的地方,即修改对应的events
          epi->event.events = event->events;
       
          /*
           * Get current event bits. We can safely use the file* here because
           * its usage count has been increased by the caller of this function.
           */
          // 这个地方不要感到奇怪,说明几点后大家应该就容易理解了:
          // 1、既然是modify则说明之前已经被add过,不需要重复挂等待队列,因此回调函数为NULL
          // 2、同时因为NULL参数,即说明不需要回调,也不会有挂等待队列的操作
          // 该调用其实就是去file那里收集一下事件而已
          revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL);
       
          write_lock_irqsave(&ep->lock, flags);
       
          /* Copy the data member from inside the lock */
          // 这个就是modify需要修改的地方,即修改对应的data
          epi->event.data = event->data;
       
          /*
           * If the item is not linked to the hash it means that it''s on its
           * way toward the removal. Do nothing in this case.
           */
          // 这个if不准备详细讲,其实很简单,前面不是已经问过file得到revents吗?
          // 如果当前epi已经被链接的话,就看是否是感兴趣事件发生,如果是,则同样将其
          // 添加到eventpoll的rdllist链表中,并notify
          if (EP_RB_LINKED(&epi->rbn)) {
              /*
               * If the item is "hot" and it is not registered inside the ready
               * list, push it inside. If the item is not "hot" and it is currently
               * registered inside the ready list, unlink it.
               */
              if (revents & event->events) {
                  if (!EP_IS_LINKED(&epi->rdllink)) {
                      list_add_tail(&epi->rdllink, &ep->rdllist);
       
                      /* Notify waiting tasks that events are available */
                      if (waitqueue_active(&ep->wq))
                          wake_up(&ep->wq);
                      if (waitqueue_active(&ep->poll_wait))
                          pwake++;
                  }
              }
          }
       
          write_unlock_irqrestore(&ep->lock, flags);
       
          /* We have to call this outside the lock */
          if (pwake)
              ep_poll_safewake(&psw, &ep->poll_wait);
       
          return 0;
      }

      可以看到,修改操作其实就修改红黑树中对应的epitem的event值,有个细节点需要注意,也就是内核不放弃任何一次机会,修改过程中也不忘问一下file的事件状态,如果有事件ready则同样将其链接到rdllist链表中。

epoll_wait

在讲解了epoll_ctl的过程之后,epoll_wait的确没什么内容了,也不想贴一大堆源码什么的,这里分几个点将其描述一下:

  • 前文已经多次出现一个链表rdllist,该链表位于eventpoll结构体变量中,当ep_poll_callback回调函数被调用时,肯定会将当前epitem链接进来,或者在ep_insert、ep_modify过程中,如果发现file有事件ready也会将当前epitem链接到rdllist上,因此,我们可以猜测得到epoll_wait在做什么,看下面关键部分代码:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    // 如果rdllist中还没有epitem时,就开始等待了
    if (list_empty(&ep->rdllist)) {
            /*
             * We don''t have any available event to return to the caller.
             * We need to sleep here, and we will be wake up by
             * ep_poll_callback() when events will become available.
             */
            // 初始化等待队列,等待队列项对应的线程即为当前线程
            init_waitqueue_entry(&wait, current);
            // 不用多说,先将当前线程挂到等待队列上,之后在调用schedule_timeout
            // 时,就开始了超时等待了
            add_wait_queue(&ep->wq, &wait);
     
            for (;;) {
                /*
                 * We don''t want to sleep if the ep_poll_callback() sends us
                 * a wakeup in between. That''s why we set the task state
                 * to TASK_INTERRUPTIBLE before doing the checks.
                 */
                // 这块内容比较熟悉,在poll讲解过程中也有说明,它与schedule_timeout配合
                // 因为会被阻塞,这里先设置线程状态为可中断
                set_current_state(TASK_INTERRUPTIBLE);
                // 整个循环的核心,其实就在看rdllist中是否有数据,或者等待超时
                // 应征了前面的说明,epoll_wait只需要等着收集数据即可
                if (!list_empty(&ep->rdllist) || !jtimeout)
                    break;
                // 如果被中断。。。后面部分比较简单,可以参照poll那篇
                if (signal_pending(current)) {
                    res = -EINTR;
                    break;
                }
     
                write_unlock_irqrestore(&ep->lock, flags);
                jtimeout = schedule_timeout(jtimeout);
                write_lock_irqsave(&ep->lock, flags);
            }
            remove_wait_queue(&ep->wq, &wait);
     
            set_current_state(TASK_RUNNING);
        }
  • 其实还有一点需要说明,大家可能也会想到,rdllist中的epitem只能表示对应fd有事件ready,可是自始至终都没看到有地方回写revents,我们怎么知道到底是哪些事件ready了呢?
    在ep_send_events函数中有这么一段代码:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    list_for_each(lnk, txlist) {
        epi = list_entry(lnk, struct epitem, txlink);
     
        /*
         * Get the ready file event set. We can safely use the file
         * because we are holding the "sem" in read and this will
         * guarantee that both the file and the item will not vanish.
         */
        revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL);
     
        /*
         * Set the return event set for the current file descriptor.
         * Note that only the task task was successfully able to link
         * the item to its "txlist" will write this field.
         */
        epi->revents = revents & epi->event.events;

    看到这段代码,应该很清楚了,只需要遍历链表,再去拿一次就好了,见关键代码:
    revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL);
    这一句在ep_modify中也有出现。

  • 还有一点我这里故意隐瞒,其实不是我特别想说明的点,对理解epoll影响也不大,那就是收集结果不是直接从rdllist中进行的,这中间还有一个转移的过程,在epoll_wait的最后进行,关键代码如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    static int ep_collect_ready_items(struct eventpoll *ep, struct list_head *txlist, int maxevents)
    {
        int nepi;
        unsigned long flags;
        // rdllist里存放的就是当前ready的epitem链表,且至少存在一个epitem
        struct list_head *lsthead = &ep->rdllist, *lnk;
        struct epitem *epi;
     
        write_lock_irqsave(&ep->lock, flags);
     
        // 遍历rdllist链表
        for (nepi = 0, lnk = lsthead->next; lnk != lsthead && nepi < maxevents;) {       
            // 先拿到epitem
            epi =   list_entry(lnk, struct epitem, rdllink);      
            lnk = lnk->next;
     
            /* If this file is already in the ready list we exit soon */
            // 确保不会被重复链接到txlink上
            if (!EP_IS_LINKED(&epi->txlink)) {
                /*
                 * This is initialized in this way so that the default
                 * behaviour of the reinjecting code will be to push back
                 * the item inside the ready list.
                 */
                epi->revents = epi->event.events;
     
                /* Link the ready item into the transfer list */
                // 将epi的txlink链接到ep的txlist上,简单的说
                // 将对应的epitem链接到txlist链表上
                list_add(&epi->txlink, txlist);
                nepi++;
     
                /*
                 * Unlink the item from the ready list.
                 */
                // 因为已经被转移了,所以从rdllist链表中清除
                EP_LIST_DEL(&epi->rdllink);
            }
        }
     
        write_unlock_irqrestore(&ep->lock, flags);
     
        return nepi;
    }

    经过这一步,rdllist中当前的结果已经被转移到txlist中,之后如果有新加入到rdllist的话,本次epoll_wait不会再关心,不过可以留到下次再收集。
    还记得写回events的过程吗?最后的工作,当前是遍历txlist链表,并将结果写回到用户空间中了。

总结

后面详细讲解epoll_create、epoll_ctl、epoll_wait只是为了让大家强化理解前面的那副图,这里讲解epoll并不涉及到内存映射等优化点,只是为了让大家理解,epoll到底在干什么,到最后,留给大家的,也只是这幅图,或者更简单的一个点:原来回调函数是epoll比poll高明的地方啊。至于为什么要创建一个文件来承载eventpoll,甚至采用红黑树来保存数据,都只是空间换时间而已。

Java NIO 选择器(Selector)与通道(Channel)_5(选择器的选择过程)

0
1年前
fp1203发布

前面已经分析了选择器的创建和处理注册和取消注册的过程,这里主要讲解选择器的选择过程。

本文同样以linux内核为基础,讲解PollSelectorImpl和EPollSelectorImpl的选择过程:

其实在前面一篇已经把poll和epoll的准备工作介绍得七七八八了,既然PollSelectorImpl
和EPollSelectorImpl最终是依赖内核的实现,似乎只需要直接调用就好,那不是没多少内容了? 也不尽然,还记得上一篇介
绍SelectionKey的时候,提到过readyOps,就允诺在这一篇中来说明它是如何被改变的,当然,附带有其他的内容需要一一道来。

选择器的选择过程

在Selector抽象类中,放出了三个抽象方法:
java.nio.channels.Selector

1
2
3
public abstract int selectNow() throws IOException;
public abstract int select(long timeout) throws IOException;
public abstract int select() throws IOException;

三个方法大致描述如下:

  • selectNow()是非阻塞的,返回值即为ready I/O的通道数量,无ready则返回0
  • select(long timeout)是阻塞的,有三种条件可以停止阻塞:
    1)至少存在一条通道是ready I/O的;
    2)等待超时;
    3)被唤醒,如被调用wakeup。返回值即为ready I/O的通道数量。
  • select()是阻塞的,有两种条件可以停止阻塞:
    1)至少存在一条通道是ready I/O的;
    2)被唤醒,如被调用wakeup。返回值即为ready I/O的通道数量。

select方法有这么几种,下面跟随源码的脚步,去看看它的实现吧,首先定位到SelectorImpl:
sun.nio.ch.SelectorImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
protected abstract int doSelect(long timeout) throws IOException;
 
private int lockAndDoSelect(long timeout) throws IOException {
synchronized (this) {
    if (!isOpen())
    throw new ClosedSelectorException();
    synchronized (publicKeys) {
    synchronized (publicSelectedKeys) {
        return doSelect(timeout);
    }
    }
    }
}
 
public int select(long timeout)
    throws IOException
{
    if (timeout < 0)
        throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect((timeout == 0) ? -1 : timeout);
}
 
public int select() throws IOException {
    return select(0);
}
 
public int selectNow() throws IOException {
return lockAndDoSelect(0);
}

可以发现:

  • selectNow() -> lockAndDoSelect(0) -> doSelect(0)
  • select(timeout), timeout > 0 -> lockAndDoSelect(timeout) -> doSelect(timeout)
  • select() == select(timeout), timeout == 0 -> lockAndDoSelect(-1) -> doSelect(-1)

三种select最终都归结到doSelect,不禁让我想起了poll及epoll_wait的timeout参数,这里再啰嗦一下:

  • poll
    • timeout == 0, 则poll操作为非阻塞的,轮询一遍,无论是否存在ready事件,均会返回;
    • timeout < 0, 则poll操作是阻塞的,无限轮询,当且仅当存在ready事件,才会返回;
    • timeout > 0, 则poll将会在timeout时间内无限轮询,当且仅当存在ready事件,才会返回;
  • epoll_wait
    • timeout == 0, 则epoll操作为非阻塞的,轮询一遍,无论是否存在ready事件,均会返回;
    • timeout == -1, 则epoll操作是阻塞的,无限轮询,当且仅当存在ready事件,才会返回;
    • timeout > 0, 则epoll将会在timeout时间内无限轮询,当且仅当存在ready事件,才会返回;

可以看到,poll与epoll_wait唯一不同点就是第二种情况,一个是 < 0,一个是 == -1,因此,只需要timeout == -1这两
种情况均能满足了,所以在SelectorImpl层面就可以把所有select归结到doSelect了,可想而知,最终要么是调用内核的poll
要么是调用epoll来完成选择工作。
PS:还记得最开始详细讲过的内核poll的过程吗?当timeout == 0时,poll只会遍历一遍,无需等待即退出
循环;当timeout 0时(当然,timeout数值也不能无限大,具体多大限制没有研究),即在timeout = schedule_timeout(timeout)
时,不断的消磨timeout,当其为0时,会在遍历结束时退出;更多细节还请大家看看讲内核poll的那篇吧。

现在,就需要拿doSelect开涮了,老规矩,PollSelectorImpl和EPollSelectorImpl分别介绍:

PollSelectorImpl的doSelect

先上源码:
sun.nio.ch.PollSelectorImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected int doSelect(long timeout)
    throws IOException
{
    if (channelArray == null)
        throw new ClosedSelectorException();
    processDeregisterQueue();
    try {
        begin();
        pollWrapper.poll(totalChannels, 0, timeout);
    } finally {
        end();
    }
    processDeregisterQueue();
    int numKeysUpdated = updateSelectedKeys();
    if (pollWrapper.getReventOps(0) != 0) {
        // Clear the wakeup pipe
        pollWrapper.putReventOps(0, 0);
        synchronized (interruptLock) {
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
    return numKeysUpdated;
}
1. 贯彻一贯的风格,先从变量入手:
  • processDeregisterQueue()方法
    在讲解反注册的那篇中,已经讲过该方法,反注册有两个阶段,第一个阶段就是SelectionKey的cancel,它只会将SelectionKey
    放入cancelledKeys集合中,第二个阶段才是真正取消注册的过程,即是processDeregisterQueue的过程,它保证在进入内核poll
    之前,pollfd数组中的fd都是真正需要被轮询的。
  • pollWrapper.poll
    其实这就是“大名鼎鼎“的内核poll调用了,它的参数需求已经在注册过程中PollArrayWrapper组装好了,而且经过processDeregisterQueue
    的预处理,将所有需要取消的pollfd剔除。
  • begin() & end()
    这里先不详细讲解它们在做什么,可以先告诉你们的是,begin和end相互配合:begin为当前线程挂上中断处理函数,意思就是,如
    果当前线程被中断,就执行我指定的那个函数,至于那个神秘的函数,其实就是wakeup,用于唤醒内核阻塞中的线程;end就是扫尾
    工作,就是取消挂载的中断函数,因为wakeup只有在内核poll的过程中才会有意义,也只应该在这个过程中。
  • updateSelectedKeys()
    从函数名称大概可以猜到,这个就是用来更新已选择的SelectionKey的吧,先看看AbstractPollSelectorImpl中它的实现:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    protected int updateSelectedKeys() {
        int numKeysUpdated = 0;
        // Skip zeroth entry; it is for interrupts only
        for (int i=channelOffset; i            int rOps = pollWrapper.getReventOps(i);
            if (rOps != 0) {
                SelectionKeyImpl sk = channelArray[i];
                pollWrapper.putReventOps(i, 0);
                if (selectedKeys.contains(sk)) {
                    if (sk.channel.translateAndSetReadyOps(rOps, sk)) {
                        numKeysUpdated++;
                    }
                } else {
                    sk.channel.translateAndSetReadyOps(rOps, sk);
                    if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
                        selectedKeys.add(sk);
                        numKeysUpdated++;
                    }
                }
            }
        }
        return numKeysUpdated;
    }

    该函数中,就只有一个调用我们没见过,那就是sk.channel.translateAndSetReadyOps,我的爷,又是与具体通道有关,
    那就先拎一个典型来讲解,具体内容就在讲解通道时再详细展开,请看ServerSocketChannelImpl:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    public boolean translateReadyOps(int ops, int initialOps,
                                     SelectionKeyImpl sk) {
        // intOps为注册SelectionKey时,设定的感兴趣事件的值
        int intOps = sk.nioInterestOps();
        // oldOps为当前未更改前的SelectionKey的readyOps值
        int oldOps = sk.nioReadyOps();
        // newOps为临时的readyOps值,用于位操作生成最终readyOps值
        // translateAndSetReadyOps时,该值初始值为0,即准备全新设置
        // translateAndUpdateReadyOps时,该值初始值为通道当前的readyOps,即准备累加更新
        int newOps = initialOps;
        // 如果通道未打开,则无需继续,直接返回
        if ((ops & PollArrayWrapper.POLLNVAL) != 0) {
        // This should only happen if this channel is pre-closed while a
        // selection operation is in progress
        // ## Throw an error if this channel has not been pre-closed
            return false;
        }
        /* 如果读写出错或连接中断时,JDK的做法看似很奇怪,不过可以理解:
           即不隐瞒IO异常,所以才在这种情形下直接将readyOps设置为interestOps,
           意思是你对什么感兴趣我就把它设置成什么,如是应用以为真的ready而去操作时,
           即可以看到IOException
        */
        if ((ops & (PollArrayWrapper.POLLERR
                    | PollArrayWrapper.POLLHUP)) != 0) {
            newOps = intOps;
            sk.nioReadyOps(newOps);
            return (newOps & ~oldOps) != 0;
        }
        // 如果pollfd的revents包含POLLIN,并且感兴趣事件中
        // 包含OP_ACCEPT时,newOps添加OP_ACCEPT
        if (((ops & PollArrayWrapper.POLLIN) != 0) &&
            ((intOps & SelectionKey.OP_ACCEPT) != 0))
                newOps |= SelectionKey.OP_ACCEPT;
        // 设置readyOps为newOps
        sk.nioReadyOps(newOps);
        // 判断readyOps中有没有被更改过,如果有,说明新的感兴趣事件ready
        return (newOps & ~oldOps) != 0;
    }
    public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
        return translateReadyOps(ops, 0, sk);
    }

    可以看出,translateAndSetReadyOps即是用来设置readyOps的,我们再来看updateSelectedKeys的逻辑:

    • 拿到SelectionKey sk
    • 清除对应pollfd的revents值
    • 如果sk已经存在于selectedKeys中,即sk从未被处理或者未被remove,则更新readyOps并判断是否有新的感兴趣事件ready,
      如果有,则只需要增加numKeysUpdated即可
    • 如果sk不存在于selectedKeys中,则先更新readyOps,判断readyOps中是否有新的感兴趣事件ready,
      如果有则将sk收录到selectedKeys中,并增加numKeysUpdated,最终numKeysUpdated即为本次select有事件
      ready的通道的数量

    在看translateAndSetReadyOps那段代码时,一直卡壳在对POLLERR及POLLHUP的处理上,不明白为什么出错了,还把readyOps
    设置成interestOps,这样updateSelectedKeys过程中,如果某个Channel遇到了POLLERR或POLLHUP时,肯定会被放入selectedKeys
    中,为什么要这么做呢?一开始很不理解,查看了OP_READ或OP_WRITE等Javadoc注释,也发现OP_READ不是完全代表ready可读,也可
    能是连接被挂起、已经到达流末端或者出错;不过还是不知道为什么要这么做,最终查看了一篇oracle的关于nio的一个BugReport
    中发现了这样的字眼:“When the interest set is non-0 then the POLLHUP/POLLERR will need to be translated to OP_READ
    or OP_WRITE events in the ready set to allow the application a chance to see the IOException.“,于是有点理解了JDK
    为什么要这么做了,也的确如此,如果应用对某通道的一些事件感兴趣,则在通道IO出错时,有必要知会它,而如果直接抛出异常则肯定
    会影响其他通道,故干脆就”欺骗“应用说,你感兴趣的事件都发生了,于是在应用处理时,就能及时发现IOException了。

    本文只是在讲解ServerSocketChannel中translateReadyOps的实现,SocketChannel实现更为复杂,不过关于在IO出错的处理上,
    是一致的。

  • IOUtil.drain(fd0)
    drain中文译“排干“,用到fd0上非常贴切,fd0作为管道的读端,drain即将其排干,简单说就是清空,其C源码中,其实就是不停
    的read,知道无数据为止。这里不理解为什么要这么做不要紧,后续在讲解wakeup机制的时候会详细说明。
2. 现在再来看代码流程:
  • 预处理cancel掉的SelectionKey,将同时清理pollfd数组中的对应pollfd、已注册和已选择的SelectionKey的集合,
    还有Selector及Channel持有的SelectionKey引用
  • 挂载中断处理函数,后续过程如果被中断,则会调用wakeup优雅的唤醒
  • 开始内核poll,因为之前注册过程已经将所有参数准备好了,轮询返回时,如果有感兴趣事件ready,
    则pollfd的revents将被更新
  • 这里再一次清理,主要是针对在poll过程中有取消的SelectionKey,则没必要返回结果,故在统计结果之前将其清除
  • 然后调用updateSelectedKeys来更新readyOps并统计ready的通道数
  • 清理wakeup管道及还原中断触发标志,一遍下次select使用
3. 小结

PollSelectorImpl选择器的doSelect过程分三个阶段,第一个阶段是清除取消SelectionKey,第二阶段就是最重要的阶段:
调用内核poll,轮询收集ready通道信息,最后一个阶段就是结果处理,将poll返回的结果转移到Java容器中。

EPollSelectorImpl的doSelect

一样先上源码:
sun.nio.ch.EPollSelectorImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
protected int doSelect(long timeout)
    throws IOException
{
    if (closed)
        throw new ClosedSelectorException();
    processDeregisterQueue();
    try {
        begin();
        pollWrapper.poll(timeout);
    } finally {
        end();
    }
    processDeregisterQueue();
    int numKeysUpdated = updateSelectedKeys();
    if (pollWrapper.interrupted()) {
        // Clear the wakeup pipe
        pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
        synchronized (interruptLock) {
            pollWrapper.clearInterrupted();
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
    return numKeysUpdated;
}
1. 关键变量和调用

单从doSelect代码上看,除了做管道清理时有些许不同,似乎与PollSelectorImpl一摸一样,不过,还是有几个重要的调用需
要讲解一下:

  • processDeregisterQueue前面已经详细讲解过,就是处理取消的SelectionKey逻辑;
  • 有点不一样的就是poll过程,与PollSelectorImpl不太一样,PollArrayWrapper的poll其实就是内核的poll
    调用,这里的pollWrapper即是EPollArrayWrapper,其poll实现如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    int poll(long timeout) throws IOException {
        updateRegistrations();
        updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
        for (int i=0; i            if (getDescriptor(i) == incomingInterruptFD) {
                interruptedIndex = i;
                interrupted = true;
                break;
            }
        }
        return updated;
    }
    • 还记得updateRegistrations吗?前面已经在讲解注册过程的时候说明过,与PollSelector很不同,它不是在注册过程就将
      pollfd全部装填好,而是在执行内核epoll_wait之前,统一由updateRegistrations处理,updateRegistractions的过程也
      比较简单, 利用epoll_ctl一一处理注册信息。
    • 因为epoll内核调用分三步,第一步是epoll_create创建epoll文件,在EPollArrayWrapper构造函数中调用;
      第二步就在updateRegistractions中调用,用来注册所有的通道fd及感兴趣事件;
      最后就只需要用epoll_wait来等待收集最后的结果了
    • 至于最后的for循环,是与wakeup有关,如果是因为wakeup而结束epollWait,则在这里会找到对应event_poll的下标,
      并更新中断标志interrupted
  • 再看看updateSelectedKeys过程
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    private int updateSelectedKeys() {
        int entries = pollWrapper.updated;
        int numKeysUpdated = 0;
        for (int i=0; i            int nextFD = pollWrapper.getDescriptor(i);
            SelectionKeyImpl ski = (SelectionKeyImpl) fdToKey.get(
                new Integer(nextFD));
            // ski is null in the case of an interrupt
            if (ski != null) {
                int rOps = pollWrapper.getEventOps(i);
                if (selectedKeys.contains(ski)) {
                    if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                        numKeysUpdated++;
                    }
                } else {
                    ski.channel.translateAndSetReadyOps(rOps, ski);
                    if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                        selectedKeys.add(ski);
                        numKeysUpdated++;
                    }
                }
            }
        }
        return numKeysUpdated;
    }

    先解释一下几个变量或函数:

    • pollWrapper.updated为EPollArrayWrapper调用epoll_wait返回的值,其实就是有ready更新的通道数
    • pollWrapper.getDescriptor(i),即为获取第i个epoll_event中的fd
    • 还记得fdToKey吗?它就是一个fd到SelectionKey的HashMap,PollSelectorImpl是采用channelArray数组来
      保存SelectionKey的,EPollSelectorImpl就是采用fdToKey来保存
    • pollWrapper.getEventOps(i),即为获取第i个epoll_event中的events,注意epoll_wait返回的epoll_event
      中的events并不是指感兴趣的事件集合,而是值ready的事件集合,可能你会有点混淆,在采用epoll_ctl注册时,
      events用来表示感兴趣的事件集合,而在epoll_wait返回结果时,这里的events又是用来保存ready事件集合的
    • translateAndSetReadyOps即为设置readyOps的,因为虽然epoll_wait将结果返回,但应用时看不到这个结果的,
      所以通过更新SelectionKey中的readyOps,可以使应用看到通道的状态

    现在再来看看整个过程,其实与PollSelectorImpl的updateSelectedKeys非常类似:遍历所有结果,如果是selectedKeys中
    已经存在的Key,并且有新的感兴趣事件ready,则只需要增加numKeysUpdated即可;如果selectedKeys中不存在,并且有新的
    感兴趣事件ready,则需要增加numKeysUpdated,并添加到selectedKeys中。

2. 小结

虽然PollSelectorImpl依赖内核的poll系统调用,而EPollSelectorImpl依赖的是内核的epoll_create、epoll_ctl及epoll_wait
三个系统调用,但从doSelect这个层面上去看,它们的过程是一致的,均是为系统调用铺路,比较有点特色的还是它们的wakeup
机制,EPollSelectorImpl与PollSelectorImpl采用的是同一套wakeup机制,具体内容就看下一篇的分析了。


反注册由SelectionKey的cancel开始:
Selector一直都很被动,它的创建需要工厂SelectorProvider来完成,处理register也是因为通道在自行注册时触发,而处理cancel与register非常类似,不过它是由SelectionKey的cancel来触发,唯一值得欣慰的是它能够有自己的select过程,这个过程的分析后续再给出,我们先看看AbstractSelectionKey中的cancel实现:
java.nio.channels.spi.AbstractSelectionKey

1
2
3
4
5
6
7
8
9
10
11
public final void cancel() {
    // Synchronizing "this" to prevent this key from getting canceled
    // multiple times by different threads, which might cause race
    // condition between selector''s select() and channel''s close().
    synchronized (this) {
    if (valid) {
            valid = false;
            ((AbstractSelector)selector()).cancel(this);
    }
   }
}

可见,最终取消还是交给选择器自己去完成,它自己只是更新了valid状态标志

先看Abstractor中cancel的实现吧:
java.nio.channels.spi.AbstractSelector

1
2
3
4
5
6
private final Set cancelledKeys = new HashSet();
void cancel(SelectionKey k) {          
    synchronized (cancelledKeys) {
        cancelledKeys.add(k);
    }
}

AbstractSelector中定义了集合cancelledKeys,用于存储所有需要取消的SelectionKey

咋一看,这么做就结束了吗?这样就算取消注册了吗?大家难免会有疑问,我也不卖关子了,要知道,前期所有的种种准备,最终都是会poll或epoll做准备的,我们先从这个角度出发,知道PollSelectorImpl和EPollSelectorImpl在doSelect实现代码中都有这样的代码processDeregisterQueue(),猜测这个就是实现取消注册逻辑的,于是找到其实现代码如下:
sun.nio.ch.SelectorImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void processDeregisterQueue() throws IOException {
    Set cks = cancelledKeys();
    synchronized (cks) {
        Iterator i = cks.iterator();
        while (i.hasNext()) {
            SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
            try {
                implDereg(ski);
            } catch (SocketException se) {
                IOException ioe = new IOException(
                    "Error deregistering key");
                ioe.initCause(se);
                throw ioe;
            } finally {
                i.remove();
            }
        }
    }
}

我们首先看到了一个cancelledKeys(),这个即是获得AbstractSelector中定义的cancelledKeys集合的,该方法就是把所有cancelledKeys中的SelectionKey取出来一个一个处理掉,调用的方法是implDereg。

在分析processDeregisterQueue之前,就能大胆猜测,具体取消注册逻辑,肯定要深入到各个具体的Selector实现类,因为不同Selector在存储SelectionKey的方式上往往不同,如PollSelectorImpl采用数组来存储,而EPollSelectorImpl采用HashMap来存储,下面分别分析两种实现:

PollSelectorImpl的implDereg

PollSelectorImpl的implDereg与implRegister一样,由AbstractPollSelectorImpl代劳:
sun.nio.ch.AbstractSelectorImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
protected void implDereg(SelectionKeyImpl ski) throws IOException {
    // Algorithm: Copy the sc from the end of the list and put it into
    // the location of the sc to be removed (since order doesn''t
    // matter). Decrement the sc count. Update the index of the sc
    // that is moved.
    int i = ski.getIndex();
    assert (i >= 0);
    if (i != totalChannels - 1) {
        // Copy end one over it
        SelectionKeyImpl endChannel = channelArray[totalChannels-1];
        channelArray[i] = endChannel;
        endChannel.setIndex(i);
        pollWrapper.release(i);
        PollArrayWrapper.replaceEntry(pollWrapper, totalChannels - 1,
                                      pollWrapper, i);
    } else {
        pollWrapper.release(i);
    }
    // Destroy the last one
    channelArray[totalChannels-1] = null;
    totalChannels--;
    pollWrapper.totalChannels--;
    ski.setIndex(-1);
    // Remove the key from keys and selectedKeys
    keys.remove(ski);
    selectedKeys.remove(ski);
    deregister((AbstractSelectionKey)ski);
    SelectableChannel selch = ski.channel();
    if (!selch.isOpen() && !selch.isRegistered())
        ((SelChImpl)selch).kill();
}
1. 有必要先把几个盲点拎出来给大家说明一下:

如果大家影响还不够深刻的话,先把几个主要的变量给大家复习一下:

  • channelArray作为内部使用的用来存储SelectionKey引用的数组
  • SelectionKeyImpl作为SelectionKey的实现,为PollSelectorImpl准备一个变量index,
    该index即为SelectionKey在channelArray中的下标
  • pollWrapper有点像pollfd数组的内存装配器,即能够装填pollfd,又能够释放其空间,当然也能替换pollfd了,
    这里出现的release即为释放相关的工作,而replaceEntry即为替换pollfd了
  • totalChannels为channelArray的总体通道数,同时也是第一个可以存放新的SelectionKey的数组下标
  • keys是一个Set,是选择器用来存储所有已注册的SelectionKey的集合
  • selectedKeys同样是一个Set,不过它是用来存储所有已经ready的SelectionKey的集合,选择器在select过程之后,
    已经ready而被选择的SelectionKey即保存在其中
  • 这里还涉及到两个方法,一个是deregister,另一个就是kill:
    • deregister逻辑比较简单,见AbstractSelector中的实现,
      protected final void deregister(AbstractSelectionKey key) {
              ((AbstractSelectableChannel)key.channel()).removeKey(key);
      }
      • 作为Selector,它保留了所有注册到它之上的所有SelectionKey引用,而通道也这么做,它也将所有自己注册过的所有
        SelectionKey引用保留下来
      • 再通道自行注册时,会把注册工作交给选择器去完成,完成之后,就将SelectionKey保留在自己的集合中
      • 在取消注册中,一定要在通道的SelectionKey集合中移除key,所以就有了deregister的过程
    • 至于kill,这里就先不讲了,后面会专门起一章来说明这些如wakeup,interrupt之类的内容,我们只需有这样的认识,
      就是把该通道消除掉就对了,其实可以看看kill的条件:如果该通道未打开,并且没有注册过,那么就是名存实亡了,
      当然要kill掉了
2. 现在再来看整个过程
  • 先拿到通道在channelArray中的有效下标i
  • 如果i指示不是最后一个SelectionKey,我们就要开工了:channelArray中用最后一个SelectionKey覆盖i指向的
    SelectionKey,并更新自己的下标index,然后用pollWrapper同步pollfd数组
  • channelArray只需要将最后一个SelectionKey清空,并更新总通道数即可
  • pollWrapper也只需要更新总通道数,即总pollfd数
  • 然后就是一些清理工作,如ski设置自己的index为无效;keys中将ski移除;通道保留的keys中同步移除ski,
    最后判断该通道是否名存实亡,如果是则kill掉
3. 小结

PollSelector的取消注册过程要分两个阶段,第一个阶段就是将需要取消的SelectionKey放入cancelledKeys中,这个过程不会真正取消注册,第二个阶段,就是在PollSelectorImpl执行doSelect的时候,在预处理processDeregisterQueue过程中会遍历cancelledKeys,执行具体的取消注册逻辑:从已注册和已选择的集合中剔除该SelectionKey,同时需要在选择器和通道保留的SelectionKey引用容器中剔除,并且需要同步到pollfd数组。分析代码可以发现,取消注册过程主要消耗在doSelect过程中。

EPollSelectorImpl的implDereg

分析了PollSelectorImpl的取消过程之后,已经决定没太大必要讲解EPollSelectorImpl,大体上肯定是差不多的,不信大家可以结合EPollSelectorImpl特有的数据类型,看一下implDereg的实现:
sun.nio.ch.EPollSelectorImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected void implDereg(SelectionKeyImpl ski) throws IOException {
    assert (ski.getIndex() >= 0);
    SelChImpl ch = ski.channel;
    int fd = ch.getFDVal();
    fdToKey.remove(new Integer(fd));
    pollWrapper.release(ch);
    ski.setIndex(-1);
    keys.remove(ski);
    selectedKeys.remove(ski);
    deregister((AbstractSelectionKey)ski);
    SelectableChannel selch = ski.channel();
    if (!selch.isOpen() && !selch.isRegistered())
        ((SelChImpl)selch).kill();
}


本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
epoll()实现分析
epoll源码实现分析[整理]
epoll源码分析
理解网络IO模型(四)
select,poll,epoll总结
深入浅出NIO Socket
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服