netty中的發動機--EventLoop及其實現類NioEventLoop的源碼分析

来源:https://www.cnblogs.com/zhuge134/archive/2019/06/27/11094766.html
-Advertisement-
Play Games

EventLoop 在之前介紹Bootstrap的初始化以及啟動過程時,我們多次接觸了NioEventLoopGroup這個類,關於這個類的理解,還需要瞭解netty的線程模型。NioEventLoopGroup可以理解為一組線程,這些線程每一個都可以獨立地處理多個channel產生的io事件。 N ...


EventLoop

在之前介紹Bootstrap的初始化以及啟動過程時,我們多次接觸了NioEventLoopGroup這個類,關於這個類的理解,還需要瞭解netty的線程模型。NioEventLoopGroup可以理解為一組線程,這些線程每一個都可以獨立地處理多個channel產生的io事件。

NioEventLoopGroup初始化

我們看其中一個參數比較多的構造方法,其他一些參數較少的構造方法使用了一些預設值,使用的預設參數如下:

  • SelectorProvider類型,用於創建socket通道,udp通道,創建Selector對象等,預設值是SelectorProvider.provider(),大部分情況下使用預設值就行,這個方法最終創建的是一個WindowsSelectorProvider對象
  • SelectStrategyFactory,Select策略類的工廠類,它的預設值是DefaultSelectStrategyFactory.INSTANCE,就是一個SelectStrategyFactory對象本身,而SelectStrategyFactory工廠產生的是DefaultSelectStrategy策略類。
  • RejectedExecutionHandler,拒絕任務的策略類,決定在任務隊列已滿時採取什麼樣的策略,類似於jdk線程池的RejectedExecutionHandler的作用

接下來,我們看一下其中的一個常用的構造方法,

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
    final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

可見,當前類中並沒有什麼初始化邏輯,直接調用了父類的構造方法,所以我們接著看父類MultithreadEventLoopGroup的構造方法:

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}

同樣,並未做任務處理,直接調用父類構造方法,所以我們接著看MultithreadEventExecutorGroup構造方法,初始化邏輯的實現在這個類中,

MultithreadEventExecutorGroup構造方法

通過上一小結的分析,我們知道NioEventLoopGroup的構造方法的主要邏輯的實現是在MultithreadEventExecutorGroup類中,並且在調用構造方法的過程中加上了一個參數的預設值,即EventExecutorChooserFactory類型參數的預設值DefaultEventExecutorChooserFactory.INSTANCE,這個類以輪詢(roundrobin)的方式從多個線程中依次選出線程用於註冊channel。
總結一下這段代碼的主要步驟:

  • 首先是一些變數的非空檢查和合法性檢查
  • 然後根據傳入的線程數量,創建若幹個子執行器,每個執行器對應一個線程
  • 最後以子執行器數組為參數,使用選擇器工廠類創建一個選擇器
  • 最後給每個子執行器添加一個監聽器,以監聽子執行器的終止,做一些簿記工作,使得在所有子執行器全部終止後將當前的執行器組終止

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
    EventExecutorChooserFactory chooserFactory, Object... args) {
    // 首先是變數的非空檢查以及合法性判斷,
    // nThreads在MultithreadEventLoopGroup的構造方法中已經經過一些預設值處理,
    if (nThreads <= 0) {
    throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

      // 這裡一般都會使用預設值,
      // ThreadPerTaskExecutor的作用即字面意思,一個任務一個線程
      if (executor == null) {
          executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
      }
    
      // 子執行器的數組,一個子執行器對應一個線程
      children = new EventExecutor[nThreads];
    
      // 根據傳入的線程數量創建多個自執行器
      // 註意,這裡子執行器創建好後並不會立即運行起來
      for (int i = 0; i < nThreads; i ++) {
          boolean success = false;
          try {
              children[i] = newChild(executor, args);
              success = true;
          } catch (Exception e) {
              // TODO: Think about if this is a good exception type
              throw new IllegalStateException("failed to create a child event loop", e);
          } finally {
              // 如果創建子執行器不成功,那麼需要將已經創建好的子執行器也全部銷毀
              if (!success) {
                  for (int j = 0; j < i; j ++) {
                      children[j].shutdownGracefully();
                  }
    
                  // 等待所以子執行器停止後在退出
                  for (int j = 0; j < i; j ++) {
                      EventExecutor e = children[j];
                      try {
                          while (!e.isTerminated()) {
                              e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                          }
                      } catch (InterruptedException interrupted) {
                          // Let the caller handle the interruption.
                          Thread.currentThread().interrupt();
                          break;
                      }
                  }
              }
          }
      }
    
      // 創建一個子執行器的選擇器,選擇器的作用是從子執行器中選出一個
      // 預設使用roundRobin的方式
      chooser = chooserFactory.newChooser(children);
    
      final FutureListener<Object> terminationListener = new FutureListener<Object>() {
          @Override
          public void operationComplete(Future<Object> future) throws Exception {
              if (terminatedChildren.incrementAndGet() == children.length) {
                  terminationFuture.setSuccess(null);
              }
          }
      };
    
      // 給每個子執行器添加監聽器,在子執行器終止的時候做一些工作
      // 每有一個子執行器終止時就將terminatedChildren變數加一
      // 當所有子執行器全部終止時,當前這個執行器組就終止了
      for (EventExecutor e: children) {
          e.terminationFuture().addListener(terminationListener);
      }
    
      // 包裝一個不可變的集合
      Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
      Collections.addAll(childrenSet, children);
      readonlyChildren = Collections.unmodifiableSet(childrenSet);

    }

NioEventLoopGroup.newChild

上面的方法中調用了newChild方法來創建一個子執行器,而這個方法是一個抽象方法,我們看NioEventLoopGroup類的實現:

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

可見僅僅是簡單地創建了一個NioEventLoop對象。

小結

到這裡,我們就把NioEventLoopGroup的初始化過程分析完了。我們不禁思考,既然NioEventLoopGroup是一個執行器組,說白了就是一組線程,那這些線程是什麼時候跑起來的呢?如果讀者還有印象,應該能記得我們在分析Bootstrap建立連接過程時,channel初始化之後需要註冊到EventLoopGroup中,其實是註冊到其中的一個EventLoop上,註冊邏輯最終是在AbstractChannel.AbstractUnsafe.register方法中實現的,其中有一段代碼:

      if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

首先調用eventLoop.inEventLoop()判斷執行器的線程與當前線程是否是同一個,如果是則直接執行註冊的代碼,如果不是就調用eventLoop.execute將註冊邏輯封裝成一個任務放到執行器的任務隊列中,接下里我們就以這個方法為切入點,探究一下子執行器線程的啟動過程。

AbstractEventExecutor.inEventLoop

首先,讓我們來看一下這個方法,這個方法的作用是判斷當前線程與執行器的線程是否同一個線程。

public boolean inEventLoop() {
    return inEventLoop(Thread.currentThread());
}

SingleThreadEventExecutor.inEventLoop

代碼很簡單,就不多說了。
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}

SingleThreadEventExecutor.execute

方法很簡單,核心邏輯在startThread方法中,

public void execute(Runnable task) {
    // 非空檢查
    if (task == null) {
        throw new NullPointerException("task");
    }

    // 執行到這裡一般都是外部調用者,
    boolean inEventLoop = inEventLoop();
    // 向任務隊列中添加一個任務
    addTask(task);
    // 如果當前線程不是執行器的線程,那麼需要檢查執行器線程是否已經運行,
    // 如果還沒在運行,就需要啟動線程
    if (!inEventLoop) {
        startThread();
        // 檢查線程是否被關閉
        if (isShutdown()) {
            boolean reject = false;
            try {
                // 將剛剛添加的任務移除
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    // addTaskWakesUp不知道這個變數意義是什麼,NioEventLoop傳進來的是false
    // 向任務隊列中添加一個空任務,這樣就能夠喚醒阻塞的執行器線程
    // 有些情況下執行器線程會阻塞在taskQueue上,
    // 所以向阻塞隊列中添加一個元素能夠喚醒哪些因為隊列空而被阻塞的線程
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

SingleThreadEventExecutor.startThread

這個方法的主要作用是維護內部的狀態量state,使用cas指令併發情況下對狀態量的修改是線程安全的,並且對於狀態量的判斷保證啟動邏輯只被執行一次

private void startThread() {
    // 狀態量的維護
    if (state == ST_NOT_STARTED) {
        // 這裡使用了jdk中的原子更新器AtomicIntegerFieldUpdater類,
        // 使用cpu的cas指令保證併發情況下能夠安全地維護狀態量
        // 保證只有一個線程能夠執行啟動邏輯,保證啟動邏輯只被執行一次
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                // 實際啟動線程的邏輯
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}

SingleThreadEventExecutor.doStartThread

這個方法我就不貼代碼了,說一下它的主要作用:

  • 使用內部的Executor對象(一般是一個ThreadPerTaskExecutor)啟動一個線程,並執行任務
  • 維護執行器的運行狀態,主要是通過內部的狀態量和cas指令來保證線程安全;此外維護內部的一些簿記量,例如線程本身的引用,線程啟動時間等
  • 線程結束時做一些收尾和清理工作,例如將剩餘的任務跑完,運行關閉鉤子,關閉底層的selector(這個是具體的子類的清理邏輯),同時更新狀態量

具體的業務邏輯仍然是在子類中實現的,也就是SingleThreadEventExecutor.run()方法的具體實現。

NioEventLoop.run

我們仍然以NioEventLoop為例,看一下它實現的run方法。還大概講一下它的主要邏輯:

  • 首選這個方法是一個迴圈,不斷地通過調用jdk底層的selector接收io事件,並對不同的io事件做處理,同時也會處理任務隊列中的任務,以及定時調度或延遲調度的任務
  • 調用jdk的api, selector接收io事件
  • 處理各種類型的io事件
  • 處理任務

這裡,我就不貼代碼了,其中比較重要的是對一些併發情況的考慮和處理,如selector的喚醒時機。接下來,主要看一下對於各種io事件的處理,至於任務隊列以及調度隊列中任務的處理比較簡單,就不展開了。

NioEventLoop.processSelectedKeysOptimized

這個方法會遍歷所有接受到的io事件對應的selectionKey,然後依次處理。

private void processSelectedKeysOptimized() {
    // 遍歷所有的io事件的SelectionKey
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys.keys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            // 處理事件
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        // 如果需要重新select,那麼把後面的selectionKey全部置0,然後再次調用selectNow方法
        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}

NioEventLoop.processSelectedKey

這個方法首先對SelectionKey無效的情況做了處理,分為兩種情況:channel本身無效了;channel仍然是正常的,只不過是被從當前的selector上註銷了,可能在其他的selector中仍然是正常運行的

  • 對於第一種情況,需要關閉channel,即關閉底層的連接
  • 對於第二種情況則不需要做任何處理。

接下來,我們著重分析一下對於四種事件的處理邏輯。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    // 如果selectionKey是無效的,那麼說明相應的channel是無效的,此時需要關閉這個channel
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See https://github.com/netty/netty/issues/5125
        // 只關閉註冊在當前EventLoop上的channel,
        // 理論上來說,一個channel是可以註冊到多個Eventloop上的,
        // SelectionKey無效可能是因為channel從當前EventLoop上註銷了,
        // 但是channel本身依然是正常的,並且註冊在其他的EventLoop中
        if (eventLoop != this || eventLoop == null) {
            return;
        }
        // close the channel if the key is not valid anymore
        // 到這裡說明channel已經無效了,關閉它
        unsafe.close(unsafe.voidPromise());
        return;
    }

    // 下麵處理正常情況
    try {
        // 準備好的io事件
        int readyOps = k.readyOps();
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        // 處理connect事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        // 處理write事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        // 處理read和accept事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

connect事件處理

從代碼中可以看出,connect事件的處理時通過調用NioUnsafe.finishConnect完成的,我們看一下AbstractNioUnsafe.finishConnect的實現:

public final void finishConnect() {
        // Note this method is invoked by the event loop only if the connection attempt was
        // neither cancelled nor timed out.

        assert eventLoop().inEventLoop();

        try {
            // 是否已經處於連接成功的狀態
            boolean wasActive = isActive();
            // 抽象方法,有子類實現
            doFinishConnect();
            // 處理future對象,將其標記為成功
            fulfillConnectPromise(connectPromise, wasActive);
        } catch (Throwable t) {
            fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
        } finally {
            // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
            // See https://github.com/netty/netty/issues/1770
            if (connectTimeoutFuture != null) {
                connectTimeoutFuture.cancel(false);
            }
            connectPromise = null;
        }
    }

可以看出,主要是通過調用doFinishConnect實現完成連接的邏輯,具體到子類中,NioSocketChannel.doFinishConnect的實現是:

protected void doFinishConnect() throws Exception {
    if (!javaChannel().finishConnect()) {
        throw new Error();
    }
}

write事件處理

對於的write事件的處理時通過調用NioUnsafe.forceFlush方法完成,最終的實現在AbstractChannel.AbstractUnsafe.flush0中:
大體上看,這個方法的邏輯比較簡單,但是實際上最複雜也是最核心的寫入邏輯在子類實現的doWrite方法中。由於本篇的重點在於把NioEventLoop的主幹邏輯梳理一下,所以這裡不再繼續展開,後面會單獨來分析這一塊的源碼,這裡涉及到netty中對緩衝區的封裝,其中涉及到一些比較複雜的邏輯。

  protected void flush0() {
        // 如果正在寫數據,直接返回
        if (inFlush0) {
            // Avoid re-entrance
            return;
        }

        // 輸出的緩衝區
        final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null || outboundBuffer.isEmpty()) {
            return;
        }

        inFlush0 = true;

        // Mark all pending write requests as failure if the channel is inactive.
        if (!isActive()) {
            try {
                if (isOpen()) {
                    outboundBuffer.failFlushed(new NotYetConnectedException(), true);
                } else {
                    // Do not trigger channelWritabilityChanged because the channel is closed already.
                    outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause), false);
                }
            } finally {
                inFlush0 = false;
            }
            return;
        }

        try {
            // 將緩衝區的數據寫入到channel中
            doWrite(outboundBuffer);
        } catch (Throwable t) {
            if (t instanceof IOException && config().isAutoClose()) {
                /**
                 * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
                 * failing all flushed messages and also ensure the actual close of the underlying transport
                 * will happen before the promises are notified.
                 *
                 * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
                 * may still return {@code true} even if the channel should be closed as result of the exception.
                 */
                initialCloseCause = t;
                close(voidPromise(), t, newClosedChannelException(t), false);
            } else {
                try {
                    shutdownOutput(voidPromise(), t);
                } catch (Throwable t2) {
                    initialCloseCause = t;
                    close(voidPromise(), t2, newClosedChannelException(t), false);
                }
            }
        } finally {
            inFlush0 = false;
        }
    }

read事件和accept事件處理

乍看會比較奇怪,為什麼這兩個事件要放到一起處理呢,他們明明是不同的事件。這裡主要還是考慮到編碼的統一,因為read事件只有NioSocketChannel才會有,而accept事件只有NioServerSocketChannel才會有,所以這裡通過抽象方法,讓不同的子類去實現各自的邏輯,是的代碼結構上更統一。我們這裡看一下NioScketChannel的實現,而對於NioServerSocketChannel的實現我會在後續分析netty服務端的啟動過程時在具體講到,即ServerBootstrap的啟動過程。

NioByteUnsafe.read

總結一下這個方法的主要邏輯:

  • 首先會獲取緩衝分配器和相應的處理器RecvByteBufAllocator.Handle對象
  • 迴圈讀取數據,每次分配一個一定大小(大小可配置)的緩衝,將channel中待讀取的數據讀取到緩衝中
  • 以裝載有數據的緩衝為消息體,向channel的處理流水線(即pipeline)中觸發一個讀取的事件,讓讀取到的數據在流水線中傳播,被各個處理器處理
  • 重覆此過程,知道channel中沒有可供讀取的數據
  • 最後向pipeline中觸發一個讀取完成的事件
  • 最後還要根據最後一次讀取到的數據量決定是否關閉通道,如果最後一次讀取到的數據量小於0,說明對端已經關閉了輸出,所以這裡需要將輸入關閉,即通道處於半關閉狀態。

      public final void read() {
              final ChannelConfig config = config();
              // 如果通道已經關閉,那麼就不需要再讀取數據,直接返回
              if (shouldBreakReadReady(config)) {
                  clearReadPending();
                  return;
              }
              final ChannelPipeline pipeline = pipeline();
              // 緩衝分配器
              final ByteBufAllocator allocator = config.getAllocator();
              // 緩衝分配的處理器,處理緩衝分配,讀取計數等
              final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
              allocHandle.reset(config);
    
              ByteBuf byteBuf = null;
              boolean close = false;
              try {
                  do {
                      // 分配一個緩衝
                      byteBuf = allocHandle.allocate(allocator);
                      // 將通道的數據讀取到緩衝中
                      allocHandle.lastBytesRead(doReadBytes(byteBuf));
                      // 如果沒有讀取到數據,說明通道中沒有待讀取的數據了,
                      if (allocHandle.lastBytesRead() <= 0) {
                          // nothing was read. release the buffer.
                          // 因為沒讀取到數據,所以應該釋放緩衝
                          byteBuf.release();
                          byteBuf = null;
                          // 如果讀取到的數據量是負數,說明通道已經關閉了
                          close = allocHandle.lastBytesRead() < 0;
                          if (close) {
                              // There is nothing left to read as we received an EOF.
                              readPending = false;
                          }
                          break;
                      }
    
                      // 更新Handle內部的簿記量
                      allocHandle.incMessagesRead(1);
                      readPending = false;
                      // 向channel的處理器流水線中觸發一個事件,
                      // 讓取到的數據能夠被流水線上的各個ChannelHandler處理
                      pipeline.fireChannelRead(byteBuf);
                      byteBuf = null;
                      // 這裡根據如下條件判斷是否繼續讀:
                      // 上一次讀取到的數據量大於0,並且讀取到的數據量等於分配的緩衝的最大容量,
                      // 此時說明通道中還有待讀取的數據
                  } while (allocHandle.continueReading());
    
                  // 讀取完成
                  allocHandle.readComplete();
                  // 觸發一個讀取完成的事件
                  pipeline.fireChannelReadComplete();
    
                  if (close) {
                      closeOnRead(pipeline);
                  }
              } catch (Throwable t) {
                  handleReadException(pipeline, byteBuf, t, close, allocHandle);
              } finally {
                  // Check if there is a readPending which was not processed yet.
                  // This could be for two reasons:
                  // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                  // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                  //
                  // See https://github.com/netty/netty/issues/2254
                  // 這裡isAutoRead預設是true, 所以正常情況下會繼續監聽read事件
                  if (!readPending && !config.isAutoRead()) {
                      removeReadOp();
                  }
              }
          }
      }

總結

本篇主要分析了EventLoop的事件監聽以及處理邏輯,此外處理處理io事件,也會處理添加進來的任務和定時調度任務和延遲調度任務。EventLoop就像是整個框架的發動機或者說是心臟,它通過jdk api進而簡介地調用系統調用,不斷地監聽各種io事件,同時對不同的io事件分門別類採用不同的處理方式,對於read事件則會將網路io數據讀取到緩衝中,並將讀取到的數據傳遞給用戶的處理器進行鏈式處理。Channelpipeline就像一個流水線一樣,對觸發的的各種事件進行處理。

遺留問題

  • NioSocketChannel.doWrite方法的寫入邏輯的,待進一步分析
  • ChannelPipeline的詳細分析,各種事件是怎麼在處理器之間傳播的,設計模式,代碼結構等
  • 緩衝分配器和緩衝處理器的分析,它們是怎麼對記憶體進行管理的,這也是netty高性能的原因之一。

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 組件是可復用的Vue實例,一個組件本質上是一個擁有預定義選項的一個Vue實例,組件和組件之間通過一些屬性進行聯繫。 組件有兩種註冊方式,分別是全局註冊和局部註冊,前者通過Vue.component()註冊,後者是在創建Vue實例的時候在components屬性里指定,例如: 渲染DOM為: 其中He ...
  • vue-cli輸入命令:npm run build 即可打包vue.js的項目 打包出來後項目中就會多了一個文件夾dist,下圖為我們打包過後的項目 我們直接運行打包後的文件夾中的index.html文件,會看到網頁一片空白,f12調試,全是css,js路徑引用錯誤的問題。 解決:到config文件 ...
  • // 利用遞歸實現數組的扁平化let ary = [1, [2, [3, [4, 5]]], 6]; let result = []; let fn = function(ary){ for(let i = 0 ; i < ary.length; i++){ let item = ary[i] if ...
  • Vue.js 的核心是一個響應的數據綁定系統,它允許我們在普通 HTML 模板中使用特殊的語法將 DOM “綁定”到底層數據。被綁定的DOM 將與數據保持同步,每當數據有改動,相應的DOM視圖也會更新。基於這種特性,通過vue.js動態綁定class就變得非常簡單。 ...
  • Vue.js 2.0引入Virtual DOM,比Vue.js 1.0的初始渲染速度提升了2-4倍,並大大降低了記憶體消耗。那麼,什麼是Virtual DOM?為什麼需要Virtual DOM?它是通過什麼方式去提升頁面渲染效率的呢?這是本文所要探討的問題。 ...
  • var 如果使用關鍵字 var 聲明一個變數,那麼這個變數就屬於當前的函數作用域,如果聲明是發生在任何函數外的頂層聲明,那麼這個變數就屬於全局作用域。 let 1、let 聲明的變數具有塊作用域的特征。 2、在同一個塊級作用域,不能重覆聲明變數。 function foo(){ let a = 1; ...
  • 201-206:都表示伺服器成功處理了請求的狀態代碼,說明網頁可以正常訪問。 200:(成功) 伺服器已成功處理了請求。通常,這表示伺服器提供了請求的網頁。 201:(已創建) 請求成功且伺服器已創建了新的資源。 202:(已接受) 伺服器已接受了請求,但尚未對其進行處理。 203:(非授權信息) ...
  • 1、安裝:在命令行執行 →npm install -g nrm ,進行全局安裝 2、查看可用的源:執行命令→nrm ls,如下所示, 帶*號的是當前使用的源 PS E:\code> nrm ls * npm https://registry.npmjs.org/ cnpm http://r.cnpm ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...