您好,欢迎来到爱玩科技网。
搜索
您的当前位置:首页Netty4源码再次分析

Netty4源码再次分析

来源:爱玩科技网

先上个demo,好顺着往里跟代码

public class Netty4Hello {

    /**
     * 服务端监听的端口地址
     */
    private static final int portNumber = 7878;

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup);
            b.channel(NioServerSocketChannel.class);
            b.childHandler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    // 以("\n")为结尾分割的 解码器
                    pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
                    // 字符串解码 和 编码
                    pipeline.addLast("decoder", new StringDecoder());
                    pipeline.addLast("encoder", new StringEncoder());

                    // 自己的逻辑Handler
                    pipeline.addLast("handler", new HelloServerHandler());
                }
            });

            // 服务器绑定端口监听
            ChannelFuture f = b.bind(portNumber).sync();
            // 监听服务器关闭监听
            f.channel().closeFuture().sync();

            // 可以简写为
            /* b.bind(portNumber).sync().channel().closeFuture().sync(); */
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

NioSocketChannel 的初始化

在 Netty 中, Channel 是一个 Socket 的抽象, 它为用户提供了关于 Socket 状态(是否是连接还是断开) 以及对 Socket 的读写等操作. 每当 Netty 建立了一个连接后, 都会有一个对应的 Channel 实例.

io.netty.bootstrap.AbstractBootstrap#channel
	>io.netty.bootstrap.AbstractBootstrap#channelFactory(io.netty.channel.ChannelFactory<? extends C>)
		>构造io.netty.channel.ReflectiveChannelFactory

Channel 实例化

Channel 是通过工厂方法 ChannelFactory.newChannel() 来实例化的, 那么 ChannelFactory.newChannel() 方法在哪里调用呢?
跟踪调用

io.netty.bootstrap.Bootstrap#connect(java.net.SocketAddress, java.net.SocketAddress)
	>io.netty.bootstrap.Bootstrap#doResolveAndConnect
		>io.netty.bootstrap.AbstractBootstrap#initAndRegister
			>io.netty.bootstrap.ChannelFactory#newChannel

在 newChannel 中, 通过类对象的 newInstance 来获取一个新 Channel 实例, 因而会调用NioSocketChannel 的默认构造器.

NioServerSocketChannel

io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel()
	>io.netty.channel.socket.nio.NioServerSocketChannel#newSocket
		>java.nio.channels.spi.SelectorProvider#openServerSocketChannel

NioEventLoopGroup构造

类结构图

NioEventLoop 有几个重载的构造器, 不过内容都没有什么大的区别, 最终都是调用的父类MultithreadEventLoopGroup构造器:

io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)
	>io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...)
		>io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)
			>executor netty实现的线程池管理 io.netty.util.concurrent.ThreadPerTaskExecutor
			>选择策略io.netty.util.concurrent.DefaultEventExecutorChooserFactory
			>拒绝策略io.netty.util.concurrent.RejectedExecutionHandlers
			// 大小为 nThreads NioEventLoop
			>children = new EventExecutor[nThreads];
				>children[i] = newChild(executor, args)
					>io.netty.channel.nio.NioEventLoopGroup#newChild
					
NioEventLoop解析

类图

NioEventLoop 继承于 SingleThreadEventLoop, 而 SingleThreadEventLoop 又继承于 SingleThreadEventExecutor. SingleThreadEventExecutor 是 Netty 中对本地线程的抽象, 它内部有一个 Thread thread 属性, 存储了一个本地 Java 线程. 因此我们可以认为, 一个 NioEventLoop 其实和一个特定的线程绑定, 并且在其生命周期内, 绑定的线程都不会再改变.

  • 在 AbstractScheduledEventExecutor 中, Netty 实现了 NioEventLoop 的 schedule 功能, 即我们可以通过调用一个 NioEventLoop 实例的 schedule 方法来运行一些定时任务.
  • 在 SingleThreadEventLoop 中, 又实现了任务队列的功能, 通过它, 我们可以调用一个 NioEventLoop 实例的 execute 方法来向任务队列中添加一个 task, 并由 NioEventLoop 进行调度执行.
    在此方法的一开始调用的 fetchFromScheduledTaskQueue() 其实就是将 scheduledTaskQueue 中已经可以执行的(即定时时间已到的 schedule 任务) 拿出来并添加到 taskQueue 中, 作为可执行的 task 等待被调度执行.

NioEventLoop 肩负着两种任务,

  • 第一个是作为 IO 线程, 执行与 Channel 相关的 IO 操作, 包括 调用 select 等待就绪的 IO
    事件、读写数据与数据的处理等;
  • 第二个任务是作为任务队列, 执行 taskQueue 中的任务, 例如用户调用 eventLoop.schedule
    提交的定时任务
    也是这个线程执行的.

构造方法

io.netty.channel.nio.NioEventLoop#NioEventLoop
	>io.netty.channel.SingleThreadEventLoop#SingleThreadEventLoop(io.netty.channel.EventLoopGroup, java.util.concurrent.Executor, boolean, int, io.netty.util.concurrent.RejectedExecutionHandler)
		>io.netty.util.concurrent.SingleThreadEventExecutor#SingleThreadEventExecutor(io.netty.util.concurrent.EventExecutorGroup, java.util.concurrent.Executor, boolean, int, io.netty.util.concurrent.RejectedExecutionHandler)

EventLoop 与 Channel 的关联

Netty 中, 每个 Channel 都有且仅有一个 EventLoop 与之关联,

从上图中我们可以看到, 当调用了 AbstractChannel#AbstractUnsafe.register 后, 就完成了 Channel 和 EventLoop 的关联. register 实现如下:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // 删除条件检查.
    ...
    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new OneTimeTask() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            ...
        }
    }
}

在 AbstractChannel#AbstractUnsafe.register 中, 会将一个 EventLoop 赋值给 AbstractChannel 内部的 eventLoop 字段, 到这里就完成了 EventLoop 与 Channel 的关联过程.

什么时候启动

EventLoop 与 Channel 的关联完成后,就会调用eventLoop.execute

// 向任务队列中添加一个 task
io.netty.util.concurrent.SingleThreadEventExecutor#execute
	>io.netty.util.concurrent.SingleThreadEventExecutor#startThread
		>io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
			>java.util.concurrent.Executor#execute 实现类io.netty.util.concurrent.ThreadPerTaskExecutor

NioEventLoop run方法

上面了解到,当 EventLoop.execute 第一次被调用时, 就会触发 startThread() 的调用, 进而导致了 EventLoop 所对应的 Java 线程的启动.
run 方法可以说是十分简单, 主要就是调用了 SingleThreadEventExecutor.this.run() 方法. 而 SingleThreadEventExecutor.run() 是一个抽象方法, 它的实现在 NioEventLoop 中.

@Override
    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                    	// 关键1:select事件
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                // 有意思点:ioRatio 默认是 50, 则表示 IO 操作和执行 task 的所占用的线程执行时间比是 1 : 1. 
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                    	// 迭代 selectedKeys 获取就绪的 IO 事件
                        processSelectedKeys();
                    } finally {
                        // 处理IO的时间,按比例处理任务
                        final long ioTime = System.nanoTime() - ioStartTime;
                        // 有限时间内执行taskQueue任务
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
io.netty.channel.nio.NioEventLoop#run
	>io.netty.channel.nio.NioEventLoop#select
		>

分析select方法
2个关键步骤,见下代码

private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
                // Selector#wakeup. So we need to check task queue again before executing select operation.
                // If we don't, the task might be pended until select operation was timed out.
                // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
                // 关键步骤1:有任务则执行selectNow(不阻塞),将wakenUp CAS设置为true
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }
				
				// 关键步骤2:没有任务,则执行select (阻塞超时时间)
                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;

                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - Selected something,
                    // - waken up by user, or
                    // - the task queue has a pending task.
                    // - a scheduled task is ready for processing
                    break;
                }
                if (Thread.interrupted()) {
                    // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                    // As this is most likely a bug in the handler of the user or it's client library we will
                    // also log it.
                    //
                    // See https://github.com/netty/netty/issues/2426
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }

                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The selector returned prematurely many times in a row.
                    // Rebuild the selector to work around the problem.
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                            selectCnt, selector);

                    rebuildSelector();
                    selector = this.selector;

                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
            // Harmless exception - log anyway
        }
    }

最后总结下调用流程

io.netty.bootstrap.AbstractBootstrap#bind(int)
	>io.netty.bootstrap.AbstractBootstrap#bind(int)
		>io.netty.bootstrap.AbstractBootstrap#doBind
			>io.netty.bootstrap.AbstractBootstrap#initAndRegister
				
				>io.netty.bootstrap.ChannelFactory#newChannel 调用NioServerSocketChannel构造器实例化
					>NioServerSocketChannel构造,并注册SelectionKey.OP_ACCEPT事件
					io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel(java.nio.channels.ServerSocketChannel)
			
				> io.netty.bootstrap.AbstractBootstrap#init
					>io.netty.bootstrap.ServerBootstrap#init
						>io.netty.channel.ChannelPipeline初始化
							>添加Acceptor Handler io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor
								
>接收OP_ACCEPT到事件后分发给WorkerGroup处理
		>(WorkerGroup)childGroup io.netty.channel.EventLoopGroup#register(io.netty.channel.Channel)
			>io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)
				>io.netty.channel.AbstractChannel.AbstractUnsafe#register
						>NioEventLoop java.util.concurrent.Executor#execute
							>io.netty.util.concurrent.SingleThreadEventExecutor#execute
								>io.netty.util.concurrent.SingleThreadEventExecutor#addTask
								>io.netty.util.concurrent.SingleThreadEventExecutor#startThread

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- aiwanbo.com 版权所有 赣ICP备2024042808号-3

违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务