天天看點

死磕Tomcat系列(2)——EndPoint源碼解析死磕Tomcat系列(2)——EndPoint源碼解析

死磕Tomcat系列(2)——EndPoint源碼解析

在上一節中我們描述了Tomcat的整體架構,我們知道了Tomcat分為兩個大元件,一個連接配接器和一個容器。而我們這次要講的

EndPoint

的元件就是屬于連接配接器裡面的。它是一個通信的端點,就是負責對外實作TCP/IP協定。

EndPoint

是個接口,它的具體實作類就是

AbstractEndpoint

,而

AbstractEndpoint

具體的實作類就有

AprEndpoint

Nio2Endpoint

NioEndpoint

  • AprEndpoint

    :對應的是APR模式,簡單了解就是從作業系統級别解決異步IO的問題,大幅度提高伺服器的處理和響應性能。但是啟用這種模式需要安裝一些其他的依賴庫。
  • Nio2Endpoint

    :利用代碼來實作異步IO
  • NioEndpoint

    :利用了JAVA的NIO實作了非阻塞IO,Tomcat預設啟動是以這個來啟動的,而這個也是我們的講述重點。

NioEndpoint中重要的元件

我們知道

NioEndpoint

的原理還是對于Linux的多路複用器的使用,而在多路複用器中簡單來說就兩個步驟。

  1. 建立一個Selector,在它身上注冊各種Channel,然後調用select方法,等待通道中有感興趣的事件發生。
  2. 如果有感興趣的事情發生了,例如是讀事件,那麼就将資訊從通道中讀取出來。

NioEndpoint

為了實作上面這兩步,用了五個元件來。這五個元件是

LimitLatch

Acceptor

Poller

SocketProcessor

Executor

/**
 * Threads used to accept new connections and pass them to worker threads.
 */
protected List<Acceptor<U>> acceptors;

/**
 * counter for nr of connections handled by an endpoint
 */
private volatile LimitLatch connectionLimitLatch = null;
/**
 * The socket pollers. 
 */
private Poller[] pollers = null;

内部類

SocketProcessor

/**
 * External Executor based thread pool.
 */
private Executor executor = null;
           

我們可以看到在代碼中定義的這五個元件。具體這五個元件是幹嘛的呢?

  • LimitLatch

    :連接配接控制器,負責控制最大的連接配接數
  • Acceptor

    :負責接收新的連接配接,然後傳回一個

    Channel

    對象給

    Poller

  • Poller

    :可以将其看成是NIO中

    Selector

    ,負責監控

    Channel

    的狀态
  • SocketProcessor

    :可以看成是一個被封裝的任務類
  • Executor

    :Tomcat自己擴充的線程池,用來執行任務類

用圖簡單表示就是以下的關系

接下來我們就來分别的看一下每個元件裡面關鍵的代碼

LimitLatch

我們上面說了

LimitLatch

主要是用來控制Tomcat所能接收的最大數量連接配接,如果超過了此連接配接,那麼Tomcat就會将此連接配接線程阻塞等待,等裡面有其他連接配接釋放了再消費此連接配接。那麼

LimitLatch

是如何做到呢?我們可以看

LimitLatch

這個類

public class LimitLatch {

    private static final Log log = LogFactory.getLog(LimitLatch.class);

    private class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        public Sync() {
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            long newCount = count.incrementAndGet();
            if (!released && newCount > limit) {
                // Limit exceeded
                count.decrementAndGet();
                return -1;
            } else {
                return 1;
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            count.decrementAndGet();
            return true;
        }
    }

    private final Sync sync;
    //目前連接配接數
    private final AtomicLong count;
    //最大連接配接數
    private volatile long limit;
    private volatile boolean released = false;
}
           

我們可以看到它内部實作了

AbstractQueuedSynchronizer

,AQS其實就是一個架構,實作它的類可以自定義控制線程什麼時候挂起什麼時候釋放。

limit

參數就是控制的最大連接配接數。我們可以看到

AbstractEndpoint

調用

LimitLatch

countUpOrAwait

方法來判斷是否能擷取連接配接。

public void countUpOrAwait() throws InterruptedException {
        if (log.isDebugEnabled()) {
            log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount());
        }
        sync.acquireSharedInterruptibly(1);
    }           

AQS是如何知道什麼時候阻塞線程呢?即不能擷取連接配接呢?這些就靠使用者自己實作

AbstractQueuedSynchronizer

自己來定義什麼時候擷取連接配接,什麼時候釋放連接配接了。可以看到Sync類重寫了

tryAcquireShared

tryReleaseShared

方法。在

tryAcquireShared

方法中定義了一旦目前連接配接數大于了設定的最大連接配接數,那麼就會傳回

-1

表示将此線程放入AQS隊列中等待。

Acceptor

Acceptor

是接收連接配接的,我們可以看到

Acceptor

實作了

Runnable

接口,那麼在哪會新開啟線程來執行

Acceptor

的run方法呢?在

AbstractEndpoint

startAcceptorThreads

方法中。

protected void startAcceptorThreads() {
    int count = getAcceptorThreadCount();
    acceptors = new ArrayList<>(count);

    for (int i = 0; i < count; i++) {
        Acceptor<U> acceptor = new Acceptor<>(this);
        String threadName = getName() + "-Acceptor-" + i;
        acceptor.setThreadName(threadName);
        acceptors.add(acceptor);
        Thread t = new Thread(acceptor, threadName);
        t.setPriority(getAcceptorThreadPriority());
        t.setDaemon(getDaemon());
        t.start();
    }
}
           

可以看到這裡可以設定開啟幾個

Acceptor

,預設是一個。而一個端口隻能對應一個

ServerSocketChannel

,那麼這個

ServerSocketChannel

在哪初始化呢?我們可以看到在

Acceptor<U> acceptor = new Acceptor<>(this);

這句話中傳入了this進去,那麼應該是由

Endpoint

元件初始化的連接配接。在

NioEndpoint

initServerSocket

方法中初始化了連接配接。

// Separated out to make it easier for folks that extend NioEndpoint to
// implement custom [server]sockets
protected void initServerSocket() throws Exception {
    if (!getUseInheritedChannel()) {
        serverSock = ServerSocketChannel.open();
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
        serverSock.socket().bind(addr,getAcceptCount());
    } else {
        // Retrieve the channel provided by the OS
        Channel ic = System.inheritedChannel();
        if (ic instanceof ServerSocketChannel) {
            serverSock = (ServerSocketChannel) ic;
        }
        if (serverSock == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
        }
    }
    serverSock.configureBlocking(true); //mimic APR behavior
}
           

這裡面我們能夠看到兩點

  1. 在bind方法中的第二個參數表示作業系統的等待隊列長度,即Tomcat不再接受連接配接時(達到了設定的最大連接配接數),但是在作業系統層面還是能夠接受連接配接的,此時就将此連接配接資訊放入等待隊列,那麼這個隊列的大小就是此參數設定的。
  2. ServerSocketChannel

    被設定成了阻塞的模式,也就是說是以阻塞方式接受連接配接的。或許會有疑問。在平時的NIO程式設計中Channel不是都要設定成非阻塞模式嗎?這裡解釋一下,如果是設定成非阻塞模式那麼就必須設定一個

    Selector

    不斷的輪詢,但是接受連接配接隻需要阻塞一個通道即可。

這裡需要注意一點,每個

Acceptor

在生成

PollerEvent

對象放入

Poller

隊列中時都是随機取出

Poller

對象的,具體代碼可以看如下,是以

Poller

中的

Queue

對象設定成了

SynchronizedQueue<PollerEvent>

,因為可能有多個

Acceptor

同時向此

Poller

的隊列中放入

PollerEvent

對象。

public Poller getPoller0() {
    if (pollerThreadCount == 1) {
        return pollers[0];
    } else {
        int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
        return pollers[idx];
    }
}
           
什麼是作業系統級别的連接配接呢?在TCP的三次握手中,系統通常會每一個LISTEN狀态的Socket維護兩個隊列,一個是半連接配接隊列(SYN):這些連接配接已經收到用戶端SYN;另一個是全連接配接隊列(ACCEPT):這些連結已經收到用戶端的ACK,完成了三次握手,等待被應用調用accept方法取走使用。

所有的

Acceptor

共用這一個連接配接,在

Acceptor

run

方法中,放一些重要的代碼。

@Override
    public void run() {
        // Loop until we receive a shutdown command
        while (endpoint.isRunning()) {
            try {
                //如果到了最大連接配接數,線程等待
                endpoint.countUpOrAwaitConnection();
                U socket = null;
                try {
                    //調用accept方法獲得一個連接配接
                    socket = endpoint.serverSocketAccept();
                } catch (Exception ioe) {
                    // 出異常以後目前連接配接數減掉1
                    endpoint.countDownConnection();
                }
                // 配置Socket
                if (endpoint.isRunning() && !endpoint.isPaused()) {
                    // setSocketOptions() will hand the socket off to
                    // an appropriate processor if successful
                    if (!endpoint.setSocketOptions(socket)) {
                        endpoint.closeSocket(socket);
                    }
                } else {
                    endpoint.destroySocket(socket);
                }
    }
           

裡面我們可以得到兩點

  1. 運作時會先判斷是否到達了最大連接配接數,如果到達了那麼就阻塞線程等待,裡面調用的就是

    LimitLatch

    元件判斷的。
  2. 最重要的就是配置socket這一步了,是

    endpoint.setSocketOptions(socket)

    這段代碼
protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
            // 設定Socket為非阻塞模式,供Poller調用
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);

            NioChannel channel = null;
            if (nioChannels != null) {
                channel = nioChannels.pop();
            }
            if (channel == null) {
                SocketBufferHandler bufhandler = new SocketBufferHandler(
                        socketProperties.getAppReadBufSize(),
                        socketProperties.getAppWriteBufSize(),
                        socketProperties.getDirectBuffer());
                if (isSSLEnabled()) {
                    channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
                } else {
                    channel = new NioChannel(socket, bufhandler);
                }
            } else {
                channel.setIOChannel(socket);
                channel.reset();
            }
            //注冊ChannelEvent,其實是将ChannelEvent放入到隊列中,然後Poller從隊列中取
            getPoller0().register(channel);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            try {
                log.error(sm.getString("endpoint.socketOptionsError"), t);
            } catch (Throwable tt) {
                ExceptionUtils.handleThrowable(tt);
            }
            // Tell to close the socket
            return false;
        }
        return true;
    }
           

其實裡面重要的就是将

Acceptor

與一個

Poller

綁定起來,然後兩個元件通過隊列通信,每個Poller都維護着一個

SynchronizedQueue

隊列,

ChannelEvent

放入到隊列中,然後

Poller

從隊列中取出事件進行消費。

Poller

我們可以看到

Poller

NioEndpoint

的内部類,而它也是實作了

Runnable

接口,可以看到在其類中維護了一個Quene和Selector,定義如下。是以本質上

Poller

就是

Selector

private Selector selector;
private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();           

重點在其run方法中,這裡删減了一些代碼,隻展示重要的。

@Override
        public void run() {
            // Loop until destroy() is called
            while (true) {
                boolean hasEvents = false;
                try {
                    if (!close) {
                        //檢視是否有連接配接進來,如果有就将Channel注冊進Selector中
                        hasEvents = events();
                    }
                    if (close) {
                        events();
                        timeout(0, false);
                        try {
                            selector.close();
                        } catch (IOException ioe) {
                            log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                        }
                        break;
                    }
                } catch (Throwable x) {
                    ExceptionUtils.handleThrowable(x);
                    log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
                    continue;
                }
                if (keyCount == 0) {
                    hasEvents = (hasEvents | events());
                }
                Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
                // Walk through the collection of ready keys and dispatch
                // any active event.
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
                    // Attachment may be null if another thread has called
                    // cancelledKey()
                    if (socketWrapper == null) {
                        iterator.remove();
                    } else {
                        iterator.remove();
                        processKey(sk, socketWrapper);
                    }
                }

                // Process timeouts
                timeout(keyCount,hasEvents);
            }

            getStopLatch().countDown();
        }
           

其中主要的就是調用了

events()

方法,就是不斷的檢視隊列中是否有

Pollerevent

事件,如果有的話就将其取出然後把裡面的

Channel

取出來注冊到該

Selector

中,然後不斷輪詢所有注冊過的

Channel

檢視是否有事件發生。

SocketProcessor

Poller

在輪詢

Channel

有事件發生時,就會調用将此事件封裝起來,然後交給線程池去執行。那麼這個包裝類就是

SocketProcessor

。而我們打開此類,能夠看到它也實作了

Runnable

接口,用來定義線程池

Executor

中線程所執行的任務。那麼這裡是如何将

Channel

中的位元組流轉換為Tomcat需要的

ServletRequest

對象呢?其實就是調用了

Http11Processor

來進行位元組流與對象的轉換的。

Executor

Executor

其實是Tomcat定制版的線程池。我們可以看它的類的定義,可以發現它其實是擴充了Java的線程池。

public interface Executor extends java.util.concurrent.Executor, Lifecycle
           

線上程池中最重要的兩個參數就是核心線程數和最大線程數,正常的Java線程池的執行流程是這樣的。

  1. 如果目前線程小于核心線程數,那麼來一個任務就建立一個線程。
  2. 如果目前線程大于核心線程數,那麼就再來任務就将任務放入到任務隊列中。所有線程搶任務。
  3. 如果隊列滿了,那麼就開始建立臨時線程。
  4. 如果總線程數到了最大的線程數并且隊列也滿了,那麼就抛出異常。

但是在Tomcat自定義的線程池中是不一樣的,通過重寫了

execute

方法實作了自己的任務處理邏輯。

  1. 如果總線程數到了最大的線程數,再次獲得任務隊列,再嘗試一次将任務加入隊列中。
  2. 如果此時還是滿的,就抛異常。

差别就在于第四步的差别,原生線程池的處理政策是隻要目前線程數大于最大線程數,那麼就抛異常,而Tomcat的則是如果目前線程數大于最大線程數,就再嘗試一次,如果還是滿的才會抛異常。下面是定制化線程池

execute

的執行邏輯。

public void execute(Runnable command, long timeout, TimeUnit unit) {
    submittedCount.incrementAndGet();
    try {
        super.execute(command);
    } catch (RejectedExecutionException rx) {
        if (super.getQueue() instanceof TaskQueue) {
            //獲得任務隊列
            final TaskQueue queue = (TaskQueue)super.getQueue();
            try {
                if (!queue.force(command, timeout, unit)) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
                }
            } catch (InterruptedException x) {
                submittedCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } else {
            submittedCount.decrementAndGet();
            throw rx;
        }

    }
}           

在代碼中,我們可以看到有這麼一句

submittedCount.incrementAndGet();

,為什麼會有這句呢?我們可以看看這個參數的定義。簡單來說這個參數就是定義了任務已經送出到了線程池中,但是還沒有執行的任務個數。

/**
 * The number of tasks submitted but not yet finished. This includes tasks
 * in the queue and tasks that have been handed to a worker thread but the
 * latter did not start executing the task yet.
 * This number is always greater or equal to {@link #getActiveCount()}.
 */
private final AtomicInteger submittedCount = new AtomicInteger(0);
           

為什麼會有這麼一個參數呢?我們知道定制的隊列是繼承了

LinkedBlockingQueue

LinkedBlockingQueue

隊列預設是沒有邊界的。于是我們就傳入了一個參數,

maxQueueSize

給構造的隊列。但是在Tomcat的任務隊列預設情況下是無限制的,那麼這樣就會出一個問題,如果目前線程達到了核心線程數,則開始向隊列中添加任務,那麼就會一直是添加成功的。那麼就不會再建立新的線程。那麼在什麼情況下要建立線程呢?

線程池中建立新線程會有兩個地方,一個是小于核心線程時,來一個任務建立一個線程。另一個是超過核心線程并且任務隊列已滿,則會建立臨時線程。

那麼如何規定任務隊列是否已滿呢?如果設定了隊列的最大長度當然好了,但是Tomcat預設情況下是沒有設定,是以預設是無限的。是以Tomcat的

TaskQueue

繼承了

LinkedBlockingQueue

,重寫了

offer

方法,在裡面定義了什麼時候傳回false。

@Override
public boolean offer(Runnable o) {
    if (parent==null) return super.offer(o);
    //如果目前線程數等于最大線程數,此時不能建立新線程,隻能添加進任務隊列中
    if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
    //如果已送出但是未完成的任務數小于等于目前線程數,說明能處理過來,就放入隊列中
    if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
    //到這一步說明,已送出但是未完成的任務數大于目前線程數,如果目前線程數小于最大線程數,就傳回false建立線程
    if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
    return super.offer(o);
}
           

這就是

submittedCount

的意義,目的就是為了在任務隊列長度無限的情況下,讓線程池有機會建立新的線程。

總結

上面的知識有部分是看着李号雙老師的深入拆解Tomcat總結的,又結合着源碼深入了解了一下,當時剛看文章的時候覺得自己都懂了,但是再深入源碼的時候又會發現自己不懂。是以知識如果隻是看了而不運用,那麼知識永遠都不會是自己的。通過Tomcat連接配接器這一小塊的源碼學習,除了一些常用知識的實際運用,例如AQS、鎖的應用、自定義線程池需要考慮的點、NIO的應用等等。還有總體上的設計思維的學習,子產品化設計,和如今的微服務感覺很相似,将一個功能點内部分為多種子產品,這樣無論是在以後替換或者是更新時都能遊刃有餘。

往期文章

如何斷點調試Tomcat源碼

死磕Tomcat系列(1)——整體架構

一次奇怪的StackOverflowError問題查找之旅

徒手撸一個簡單的RPC架構

徒手撸一個簡單的RPC架構(2)——項目改造

徒手撸一個簡單的IOC

繼續閱讀