今天看啥  ›  专栏  ›  小圣996

【Netty】Netty的启动过程二

小圣996  · 简书  ·  · 2020-01-11 16:45

在上篇文章《 Netty的启动过程一 》中,我们讲述了Netty服务端boss线程的启动过程,但是worker线程是如何启动的还是未知的。我们知道了boss线程是在ServerBootstrap的bind方法中启动的,再回到上篇文章中Netty的启动代码段,在NioEventLoopGroup的初始化方法和ServerBootstrap的bind方法中间还隔了很多代码,这些源码都还没看的,我们现在来看看这些源码。

继NioEventLoopGroup初始化后,服务端便创建了一个ServerBootstrap实例,这个类是服务端Netty特有的启动类,客户端的为Bootstrap;接下来便把boss线程组和worker线程组分别赋给了ServerBootstrap的group和childGroup变量,注意 worker线程组是赋给了childGroup ;接下来便是设置一些参数,比如channel,option,childOption,handler,childHandler,注意带child的和没带child的区别:带child的基本是设置 ServerChannel 的子 channel 的选项,即没带child的基本都是对boss线程而言的,而带child的基本都是对worker线程而言的。
这里需要注意channel(NioServerSocketChannel.class)一句,它是指设置boss线程channel类型。

接下来要了解下Netty的ChannelPipeline和ChannelHandler的关系了,这里引用《 游戏之网络进阶 》的一幅图:

数据在ChannelPipeline中流程.png

pipeline 是一个负责处理网络事件的职责链,负责管理和执行 ChannelHandler,即负责消息入站和出站的流程。

在上篇文章中,我们知道了在启动boss线程后,虽然boss线程在for循环中无限循环,但是是没有进入到后面的if分支的SelectionKey.OP_ACCEPT中的,只有先进了这里,才会启动服务端的worker线程:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        try {
            int readyOps = k.readyOps();
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    return;
                }
            }
            ...
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
}

因此,我们再把断点打在threadFactory.newThread(command).start()中,然后启动客户端去连接服务端,看下它的调用堆栈是怎样的:

客户端连接启动worker线程.png

从上篇可知,当每次有客户端连接时,此时readyOps=16,继而启动worker线程;每次读取客户端数据时,此时readyOps=1,继而worker线程读取数据;很明显,Netty是以readyOps的值区分连接和读写数据的, 那么readyOps又是如何设置的呢 ?看代码,readyOps取自于SelectionKey,而SelectionKey取自于SelectionKey[]数组,而SelectionKey[]

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

来自selectedKeys.flip(),flip()实现如下:

    SelectionKey[] flip() {
        if (isA) {
            isA = false;
            keysA[keysASize] = null;
            keysBSize = 0;
            return keysA;
        } else {
            isA = true;
            keysB[keysBSize] = null;
            keysASize = 0;
            return keysB;
        }
    }

即SelectionKey[]来自keysA或keysB地址,而上述processSelectedKeys方法处于NioEventLoop的无限循环中,即boss线程(worker线程)的无限循环中:

    @Override
    protected void run() {
        for (;;) {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        ...
                }
                processSelectedKeys();
        }
    }

也就是说,boss线程在无限循环SelectionKey[]即keysA或keysB的值,当读到SelectionKey不为空时,也就读到了readyOps值,根据readyOps值,就知道客户端是什么操作了 ,证据如下:

    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            selectedKeys[i] = null;
            final Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            }
        }
    }

现在知道了worker线程启动和读写数据跟这个readyOps值有关,那这个值又是如何设置进去的呢?我们已知SelectionKey[]来自于keysA或keysB,那么我们全局搜索这两个变量看怎么用的,就知道它是如何设置值的了。

keysA全局引用.png

可见,keysA或keysB唯一设置值的地方是在add方法中,因此我们在add方法中打上断点,启动客户端去连接,就应该知道SelectionKey[]值是如何设置的了。

SelectionKey[]设置SelectionKey的readyOps为16.png

果然,当客户端请求连接服务端时, 在boss线程中 ,进入了此断点,而且SelectionKey的readyOps设置成了16,后续在processSelectedKey方法中,boss线程就是根据此readyOps值再启动worker线程的。而且由调用堆栈可知,它正是在boss无限循环的run()方法中进入了select(wakenUp.getAndSet(false))方法,查询是否有就绪的IO事件(读写,连接等),有即设置keysA或keysB的SelectionKey值。 而这些SelectionKey值是Netty监听到了这些IO事件,封装进SelectionKey的。根据操作系统的不同而封装过程不同。

Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件。
当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询(Select) 这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel 。
摘自:《 新手入门:目前为止最透彻的的Netty高性能原理和框架架构解析

同理,当客户端发数据给服务端时,也进入了此断点,而且SelectionKey的readyOps设置成了1, 只是此时是在worker线程中了。

SelectionKey[]设置SelectionKey的readyOps为1.png

现在知道了worker线程启动的原因,但是过程是怎样的呢?

我们仍在threadFactory.newThread(command).start()处打上断点,由上篇可知,第一次进入此断点,Netty启动了boss线程,第二次进入此断点即启动了worker线程,现在我们来看下第二次进入此断点的情况(请查看上图-> 客户端连接启动worker线程.png ):

由图片堆栈打印所知,在boss线程中,首先由readyOps=16,进入了NioMessageUnsafe.read()方法,如下:

        @Override
        public void read() {
            ...
            do {
                //读取SocketChannel消息/事件,封装进readBuf,实际上是封装worker线程channel,供后续worker线程注册此channel
                int localRead = doReadMessages(readBuf);
                ...
                allocHandle.incMessagesRead(localRead);
            } while (allocHandle.continueReading());
                
            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                //处理readBuf事件,实际上是为worker线程添加新channel,初始化childHandler,pipeline及参数等信息
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
        }

这里有两个重要方法,一为doReadMessages(readBuf),主要是封装NioSocketChannel,以供worker线程添加channel和监听SelectionKey.OP_READ事件用:

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = javaChannel().accept();

        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
        return 0;
    }

再看boss线程中NioSocketChannel继承关系:

    public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }

NioSocketChannel继承自AbstractNioByteChannel,注意在这里先定义了SelectionKey.OP_READ操作,以供worker线程监听此事件:

    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);//为后续worker线程监听SelectionKey.OP_READ事件
    }

另一为pipeline.fireChannelRead(readBuf.get(i))方法,在经历NioServerSocketChannel的pipeline中首尾handler的read方法,最终来到了ServerBootstrapAcceptor的

        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;
            child.pipeline().addLast(childHandler);
            for (Entry<ChannelOption<?>, Object> e: childOptions) {
                if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                    logger.warn("Unknown channel option: " + e);
                }
            }
            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        }

由此,在childGroup.register(child)中,注册了此channel(NioSocketChannel),并设置了pipeline,参数等其他信息。

boss线程中的childGroup.png

此后,在后续的register方法中, 由eventLoop.execute方法,启动了worker线程,也是由MultithreadEventLoopGroup中的register方法,以next()限制了worker线程数量。

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            AbstractChannel.this.eventLoop = eventLoop;//将channel和eventLoop关联起来,即将channel和worker线程关联起来

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            }
        }

并在register0方法中,将netty的niochannel绑定到java原生的selectkey参数上,并告知worker线程pipeline各handler channel的注册和激活事件。

        private void register0(ChannelPromise promise) {
            try {
                boolean firstRegistration = neverRegistered;
                doRegister();//将netty的niochannel绑定到java原生的selectkey参数上
                neverRegistered = false;
                registered = true;

                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();//告知pipeline中各handler有channel注册

                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();//告知pipeline中各handler有channel激活
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
            }
        }

看doRegister()方法,在AbstractNioChannel下内部抽象类AbstractNioUnsafe的doRegister()方法中:

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                //如果触发了读事件的SelectKey,netty通过调用 SelectKey的attachment()方法就可以获取channel了
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
            }
        }
    }

将netty的channel绑定到java原生的selectkey参数上,如果触发了读事件的SelectKey,netty通过调用 SelectKey的attachment()方法就可以获取channel了(见processSelectedKeysOptimized方法k.attachment())。
现在,worker线程如何启动的也知道了, 那么worker线程是如何读取数据的呢?

这次,我们把断点打在if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)一句,然后启动客户端连接服务端并给服务端发数据,这时堆栈为:

服务端读取客户端数据时

把该图与上面“客户端连接启动worker线程.png”对比, 启动worker线程前,readyOps=16,此时是在boss线程中,实际用的unsafe是NioMessageUnsafe.read();读取客户端数据时,readyOps=1,此时是在worker线程中,实际用的是NioByteUnsafe.read()。 此后,经历worker线程的pipeline,将数据发至用户自定义的handler,这便完成了对客户端数据的读取。

那NioMessageUnsafe是如何来的呢?
其实NioMessageUnsafe来自ServerBootstrap的bind方法,跟下去,在AbstractBootstrap的initAndRegister()方法中,调用channelFactory.newChannel()方法用反射实例化了boss线程的NioServerSocketChannel。

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            ...
        }

        ChannelFuture regFuture = config().group().register(channel);
        ...
        return regFuture;
    }

证据如下,在初始化ServerBootstrap时,有这样一句bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class),它是指设置boss线程channel类型。

    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

在上面设置了ServerBootstrap的channelFactory,反射类为NioServerSocketChannel,再以newChannel()方法实例化了NioServerSocketChannel,最终会来到这里:

    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

在这里设置了boss线程将监听SelectionKey.OP_ACCEPT事件,再看它的super方法,NioServerSocketChannel继承自AbstractNioMessageChannel,而AbstractNioMessageChannel也继承自AbstractNioChannel,AbstractNioChannel又继承自AbstractChannel,最终也会来到这里:

      protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

在这里unsafe = newUnsafe(),调用本身抽象方法newUnsafe()实例化了本身Unsafe属性,从以上的继承关系链中有个AbstractNioMessageChannel类,因此此处实际调用的是AbstractNioMessageChannel的newUnsafe() 方法,该方法中new了一个内部类NioMessageUnsafe实例,该内部类继承了AbstractNioUnsafe。NioMessageUnsafe即来自于此。

    @Override
    protected AbstractNioUnsafe newUnsafe() {
        return new NioMessageUnsafe();
    }

    private final class NioMessageUnsafe extends AbstractNioUnsafe {
        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            ...
            do {
                //读取SocketChannel消息/事件,封装进readBuf,实际上是封装worker线程channel,供后续worker线程注册此channel
                int localRead = doReadMessages(readBuf);
                ...
                allocHandle.incMessagesRead(localRead);
            } while (allocHandle.continueReading());
                
            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                //处理readBuf事件,实际上是为worker线程添加新channel,初始化childHandler,pipeline及参数等信息
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
        }
    }

NioByteUnsafe又是如何来的呢?
其实NioByteUnsafe来自于NioMessageUnsafe.read()方法,该方法中有两个重要方法之一doReadMessages(readBuf),作用主要是封装NioSocketChannel,以供worker线程添加channel和监听SelectionKey.OP_READ事件用,我们在前面将它跟踪至了AbstractNioByteChannel,继续跟下去会发现AbstractNioByteChannel又继承自AbstractNioChannel:

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        ch.configureBlocking(false);
    }

AbstractNioChannel继承自AbstractChannel:

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

在这里unsafe = newUnsafe(),调用本身抽象方法newUnsafe()实例化了本身Unsafe属性,从以上的继承关系链中有个NioSocketChannel类,因此此处实际调用的是NioSocketChannel的newUnsafe() 方法,该方法中new了一个内部类NioSocketChannelUnsafe实例,该内部类继承了NioByteUnsafe。NioByteUnsafe即来自于此。

    @Override
    protected AbstractNioUnsafe newUnsafe() {
        return new NioSocketChannelUnsafe();
    }

    private final class NioSocketChannelUnsafe extends NioByteUnsafe {
    }

这样,worker线程的启动过程也讲完了。

包括上篇文章《 Netty的启动过程一 》,大致讲解了Netty服务端是如何启动boss线程和worker线程的,如何读取数据的,但也仅是主要的枝干代码,细节之处还有很多没讲全,还有很多重要组件,它们的功能及实现都没讲的。这两篇文章的主要目的,是以一个Netty新手的角度讲解如何看Netty源码,那就是大胆去猜,去验证,去查资料,去看别人思路,还有就是多打断点去调试,不要想着一次全搞懂,而是多看多查多验证去弥补以前没看到的,没看懂的,并不断纠正以前错误认识的,所谓Netty之大,一锅炖不下,其余的只能在后续文章慢慢讲解了,这里先弄懂个大概即可。




原文地址:访问原文地址
快照地址: 访问文章快照