Netty学习——源码篇5 EventLoop 备份

 1 Reactor线程模型

        Reactor线程模型 中对Reactor的三种线程模型——单线程模型、多线程模型、主从多线程模型做了介绍,这里具体分析Reactor在Netty中的应用。

1.1单线程模型

单线程模型处理流程如下图:

        单线程模型,即Accept的处理和Handler的处理都在同一个线程中。这个模型的弊端是:当其中某个Handler阻塞时,会导致其他所有的Client的Handler都无法执行,并且更严重是,Handler的阻塞也会导致整个服务不能接收新的Client请求(因为Accept也被阻塞了)。 因为这个缺陷,所以单线程Reactor模型在Netty中的应用场景比较少。

1.2 多线程模型

        Netty中Reactor多线程模型的应用如如下图:

        1、设计一个专门的线程Accept,用于监听客户端的TCP连接请求。

        2、客户端的I/O操作都是由一个特定的NIO线程池负责。每个客户端连接都与一个特定的NIO线程绑定,因此在这个客户端连接中的所有I/O操作都是在同一个线程中完成的。

        3、客户端连接有很多,但是NIO线程数是比较少的,因此一个NIO线程可以同时绑定到多个客户端连接中。

1.3 主从多线程模型

        主从Reactor多线程模型在Netty中的应用,如下图

             一般情况下,Reactor的多线程模型已经适用于大部分业务场景。但如果服务端需要同时处理大量的客户端连接请求,或者需要再客户端连接时增加一些诸如权限的校验等操作,那么单个Accept就很有可能处理不过来,将会造成大量的客户端连接超时。主从Reactor多线程模型将服务端接收客户端的连接请求专门设计为一个独立的连接池。主从Reactor多线程模型和Reactor多线程模型很类似,只是在主动Reactor多线程模型的Accept线程池中获取数据,通过认证鉴权后进行派遣,再分配给Reactor线程池来处理客户端请求。

2 EventLoopGroup与Reactor关联

        不同的设置NioEventLoopGroup的方式对应了不同的Reactor线程模型。

        1、单线程模型在Netty中的应用代码如下:

        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup);

            首先实例化一个NioEventLoopGroup,接着调用bootstrap.group(bossGroup)设置服务端的EventLoopGroup。这里有个疑惑:在启动服务端的Netty程序时,需要设置bossGroup和workerGroup,为什么这里只设置了1个bossGroup?原因很简单,ServerBootstrap重写了group方法,代码如下:

@Override
    public ServerBootstrap group(EventLoopGroup group) {
        return group(group, group);
    }

        因此,当传入一个group时,bossGroup和workerGroup就是同一个NioEventLoopGrouop,并且这个NioEventLoopGroup线程池数量只设置了1个线程,也就是说Netty中的Acceptor和后续的所有客户端连接的I/O操作都是在一个线程中处理的。那么对应到Reactor的线程模型中,这样设置NioEventLoopGroup,就相当于Reactor的单线程模式。

        2、多线程在Netty中的应用代码如下:

        NioEventLoopGroup bossGroup = new NioEventLoopGroup(16);
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup);

        从代码中可以看出,只需要将NioEventLoopGroup的参数设置大于1,就是Reactor多线程模型。

        3、主从Reactor模型在Netty中的应用代码如下:

        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        ServerBootstrap server = new ServerBootstrap();
        server.group(bossGroup,workGroup);

        bossGroup为主线程,而workerGroup中的线程数是CPU核数*2,因此对应到Reactor线程模型中,这样设置的NioevGroup就是主从Reactor多线程模型。

3 EventLoopGroup的实例化

        首先,来看一下EventLoopGroup的类结构图,如下图:

        然后,在通过时序图来了解一下EventLoopGroup初始化的基本过程,如下图:

 

          基本步骤如下:

        1、EventLoopGroup内部维护一个属性为EventExecutor的children的数组,其大小是nThreads,这样就初始化一个线程池。

        2、在实例化NioEventLoopGroup时,如果指定县城池大小,则nThreads就是指定的值,否则是CPU核数 * 2。

        3、在MultithreadEventExecutorGroup中调用newChild()抽象方法来初始化children数组。

        4、newChild()抽象方法实际上是在NioEventLoopGroup中实现的,由它返回一个NioEventLoop实例。

        5、初始化NioEventLoop对象并给属性赋值,具体赋值属性如下:

                (1)provider:就是在NioEventLoopGroup构造器中,调用SelectorProvider的provider()方法获取的SelectorProvider对象。

                (2)selector:就是在NioEventLoop构造器中,调用selector=provider.openSelector()方法获取的Selector对象。

4 执行任务者EventLoop

        NioEventLoop继承自SingleThreadEventLoop,而SingleThreadEventLoop又继承自SingleThreadEventExecutor。SingleThreadEventExecutor是Netty对本地线程的抽象,它内部有一个Thread属性,实际上就是存储了一个本地Java 线程。因此可以简单的认为,一个NioEventLoop对象其实就是一个和特定的线程进行绑定,并且在NioEventLoop声明周期内,其绑定的线程都不会再改变,NioEventLoop的类层次结构图如下:

        NioEventLoop的类层次结构比较复杂,只需要关注重点即可。首先看NioEventLoop的继承关系:NioEventLoop继承SingleThreadEventLoop, SingleThreadEventLoop继承SingleThreadEventExecutor,SingleThreadEventExecutor继承AbstractScheduledEventExecutor。

        在AbstractScheduledEventExecutor,Netty实现了NioEventLoop的Schedule功能,即通过调用一个NioEventLoop实例的schedule方法来运行一些定时任务。而在SingleThreadEventLoop中,又实现了任务队列的功能。通过它,可以调用一个NioEventLoop实例的execute()方法向任务队列中添加一个Task,并由NioEventLoop进行调度执行。

        通常来说,NioEventLoop负责执行两个任务:第一个任务是作为I/O线程,执行与Channel相关的I/O操作,包括调用Selector等待就绪的I/O事件、读写数据与数据处理等;第二个任务是作为任务队列,执行taskQueue中的人物,例如用户调用eventLoop.schedule提交的定时任务也是由这个线程执行的。

4.1 NioEventLoop的实例化过程

        先了解一下EventLoop实例化的运行时序图,如下:

        从上图可以看出,SingleThreadEventExecutor有一个名为thread的Thread类型属性,这个属性就是与SingleThreadEventExecutor关联的本地线程。来看thread是在哪里被赋值的,代码如下:

private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                "before run() implementation terminates.");
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                logger.warn(
                                        "An event executor terminated with " +
                                                "non-empty task queue (" + taskQueue.size() + ')');
                            }

                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }

         前面分析过,SingleThreadEventExecutor启动时会调用doStartThread方法,然后调用executor.execute方法,将当前线程赋值给thread。在这个线程中所做的事情主要就是调用SingleThreadEventExecutor.this.run()方法,因为NioEventLoop实现了这个方法,所以根据多态性,其实调用的是NioEventLoop.run方法。

4.2 EventLoop与Channel关联

        在Netty中,每个Channel都有且仅有一个EventLoop与之关联,它们的关联过程如下图:

               

        从上图可以看到,当调用AbstractChannel.register()方法后,就完成了Channel和EventLoop的关联,register方法的具体实现如下:

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

        在register方法中,会将一个EventLoop赋值给AbstractChannel内部的eventLoop属性,这句代码就完成了EventLoop与Channel的关联过程。

4.3 EventLoop 启动

        前面已经介绍NioEventLoop本身就是一个SingleThreadEventExecutor,因此NioEventLoop的启动,其实就是NioEventLoop所绑定的本地Java线程的启动。

        按照这个思路,只需要找到在哪里调用了SingleThreadExecutor中thread属性的start方法就可以知道在哪里启动这个线程了。前面分析过,其实thread.start()被封装在SingleThreadExecutor.startThread()方法中,代码如下:

    private void startThread() {
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                doStartThread();
            }
        }
    }

        STATE_UPDATER是SingleThreadExecutor内部维护的一个属性,它的作用是标识当前的Thread的状态。在初始化的时候,STATE_UPDATER == ST_NOT_STARTED,因此第一次调用startThread方法时,就会进入if语句内,进而调用thread.start方法。而这个关键的startThread方法又是在哪调用的呢?用方法调用关系反向查找功能,就恢复阿贤,startTahread方法是在SingleThreadEventExecutor的execute方法中调用的,代码如下:

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

        既然如此,现在只需要找到第一次调用SingleThreadEventExecutor的execute方法的位置即可。前面在提到注册Channel的过程中,会在AbstractChannel的register方法中调用eventLoop.execute方法,在EventLoop中进行Channel注册代码的执行,AbstractChannel的register方法的关键代码如下:

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
           //删除判断代码

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                   //删除异常处理代码
                }
            }
        }

        很明显,从Boostrap的bind方法一路跟踪到AbstractChannel的register方法,整个代码都是在主线程中运行的,因此上面的eventLoop.inEventLoop()返回值为false,于是进入else分支,在这个分支中调用eventLoop.execute方法,而NioEventLoop没有实现execute方法,因此调用的是SingleThreadEventExecutor的execute方法,关键代码如下:

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

       由于 inEventLoop ==false,因此直行道else分支就调用startThread方法来启动SingleThreadEventExecutor内部关联的Java本地线程。用一句话总结:当EventLoop的execute方法第一次被调用时,会触发startThread方法的调用,进而启动EventLoop所对应的Java本地线程。

        完整的EventLoop启动时序图如下: