天天看點

解析Tomcat處理請求的類Connector<一>

       Connector類的相關配置在Tomcat安裝目錄conf下的server.xml檔案裡。我這次主要解析採用NIO方式處理請求的情況,在server.xml中的配置如下:

<!-- Connector on port 8080 whose protocol handler class is Http11NioProtocol,
     i.e. the NIO request-processing path analysed in this article. -->
<Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol"
               connectionTimeout="20000"
               redirectPort="8443" />      

在Tomcat啟動的時候,會調用Connector類的start()方法。根據以上配置,Connector的start()方法裡會調用Http11NioProtocol類的start()方法,如下:

try {
            // Delegate start-up to the configured protocol handler.
            protocolHandler.start();
        } catch (Exception e) {
            // Prefix the message with the owning service's name, when one is
            // attached, so the failing connector can be identified.
            String errPrefix = "";
            if(this.service != null) {
                errPrefix += "service.getName(): \"" + this.service.getName() + "\"; ";
            }

            // Pass e as the cause too: previously it was only used as a
            // message-format argument, so the original stack trace was lost.
            throw new LifecycleException
                (errPrefix + " " + sm.getString
                 ("coyoteConnector.protocolHandlerStartFailed", e), e);
        }
           

   Http11NioProtocol類的start()方法又會調用NioEndpoint類的start()方法,如下:

// Start the endpoint held in ep; any failure is logged here and then
// rethrown unchanged so the caller sees the same exception.
try {
            ep.start();
        } catch (Exception ex) {
            // Log with the localized "endpoint start error" message before propagating.
            log.error(sm.getString("http11protocol.endpoint.starterror"), ex);
            throw ex;
        }
           

    NioEndpoint類的start()方法如下:

/**
 * Starts the endpoint: initialises it if needed, prepares the component that
 * will execute socket work (an executor or the internal worker stack), and
 * launches the poller and acceptor threads. Calling it while the endpoint is
 * already running does nothing beyond the initialisation check.
 *
 * @throws Exception declared by the signature; this method has no explicit
 *         throw of its own (presumably propagated from init() - confirm)
 */
public void start()
        throws Exception {
        // Initialise the socket side first if that has not happened yet.
        if (!initialized) {
            init();
        }
        // Already running: nothing further to start.
        if (running) {
            return;
        }
        running = true;
        paused = false;

        // Prepare exactly one work-execution mechanism. When an executor is
        // already present neither branch runs, so two pools are never created.
        final boolean executorRequested = getUseExecutor();
        if (executor == null) {
            if (executorRequested) {
                TaskQueue queue = new TaskQueue();
                TaskThreadFactory threadFactory = new TaskThreadFactory(getName() + "-exec-");
                executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS, queue, threadFactory);
                queue.setParent((ThreadPoolExecutor) executor, this);
            } else {
                workers = new WorkerStack(maxThreads);
            }
        }

        // Start the poller threads, always as daemons. The original note says
        // the count defaults to the number of CPUs - confirm against
        // getPollerThreadCount().
        pollers = new Poller[getPollerThreadCount()];
        for (int idx = 0; idx < pollers.length; idx++) {
            Poller poller = new Poller();
            pollers[idx] = poller;
            Thread thread = new Thread(poller, getName() + "-ClientPoller-" + idx);
            thread.setPriority(threadPriority);
            thread.setDaemon(true);
            thread.start();
        }

        // Start the acceptor threads; their daemon status follows the
        // endpoint's daemon setting.
        for (int idx = 0; idx < acceptorThreadCount; idx++) {
            Thread thread = new Thread(new Acceptor(), getName() + "-Acceptor-" + idx);
            thread.setPriority(threadPriority);
            thread.setDaemon(daemon);
            thread.start();
        }
    }
           

    該方法初始化了負責接受Socket的線程Acceptor、輪詢Socket的線程Poller,以及真正處理Socket的線程池executor或workers。Acceptor類的run()方法如下:

/**
 * Accept loop: while the endpoint is running, accepts connections from the
 * server socket and passes each one to setSocketOptions(). A socket that
 * setSocketOptions() rejects is closed here. Errors are logged and the loop
 * continues; it ends only when running becomes false.
 */
public void run() {
            // Loop until we receive a shutdown command
            while (running) {
                // Loop if endpoint is paused
                while (paused) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Ignore: the paused flag is simply re-checked on the next pass
                    }
                }
                try {
                    // Accept the next incoming connection from the server socket
                    SocketChannel socket = serverSock.accept();
                    // Hand this socket off to an appropriate processor
                    //TODO FIXME - this is currently a blocking call, meaning we will be blocking
                    //further accepts until there is a thread available.
                    // NOTE(review): when this condition is false but socket is non-null
                    // (endpoint paused/stopped right after accept), the socket is not
                    // closed in this method - confirm whether that is intended.
                    if ( running && (!paused) && socket != null ) {
                        //processSocket(socket);
                        if (!setSocketOptions(socket)) {//hand the socket over to one of the pollers; false means close it
                            try {
                                socket.socket().close();
                                socket.close();
                            } catch (IOException ix) {
                                // Close failure is only reported at debug level
                                if (log.isDebugEnabled())
                                    log.debug("", ix);
                            }
                        } 
                    }
                }catch (SocketTimeoutException sx) {
                    //normal condition
                }catch ( IOException x ) {
                    // Only report accept failures while the endpoint is still running
                    if ( running ) log.error(sm.getString("endpoint.accept.fail"), x);
                } catch (OutOfMemoryError oom) {
                    try {
                        // Drop the parachute reference and release caches before logging;
                        // presumably this frees enough heap to keep going - confirm
                        oomParachuteData = null;
                        releaseCaches();
                        log.error("", oom);
                    }catch ( Throwable oomt ) {
                        // Last resort: write to stderr; any further failure is swallowed
                        try {
                            try {
                                System.err.println(oomParachuteMsg);
                                oomt.printStackTrace();
                            }catch (Throwable letsHopeWeDontGetHere){}
                        }catch (Throwable letsHopeWeDontGetHere){}
                    }
                } catch (Throwable t) {
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }
            }//while
        }//run
           

      setSocketOptions(SocketChannel socket)方法如下:

/**
     * Prepares a newly accepted connection and registers it with a poller.
     * The channel is switched to non-blocking mode, the endpoint's socket
     * properties are applied, and the channel is wrapped in a NioChannel
     * (a recycled one from nioChannels when available, otherwise a new plain
     * or SSL channel depending on whether sslContext is set).
     *
     * @param socket the accepted channel
     * @return true once the channel has been registered with a poller;
     *         false if anything was thrown on the way, which tells the
     *         caller to close the socket
     */
    protected boolean setSocketOptions(SocketChannel socket) {
        try {
            // Non-blocking mode, APR style: the pollers will be selecting on it.
            socket.configureBlocking(false);
            socketProperties.setProperties(socket.socket());

            NioChannel channel = nioChannels.poll();
            if (channel != null) {
                // Recycled wrapper: point it at the new socket and reset its state.
                channel.setIOChannel(socket);
                if (channel instanceof SecureNioChannel) {
                    SSLEngine sslEngine = createSSLEngine();
                    ((SecureNioChannel) channel).reset(sslEngine);
                } else {
                    channel.reset();
                }
            } else if (sslContext != null) {
                // New SSL channel: buffers are at least as large as the SSL
                // session's application buffer size.
                SSLEngine sslEngine = createSSLEngine();
                int sslAppBufSize = sslEngine.getSession().getApplicationBufferSize();
                int readSize = Math.max(sslAppBufSize, socketProperties.getAppReadBufSize());
                int writeSize = Math.max(sslAppBufSize, socketProperties.getAppWriteBufSize());
                NioBufferHandler handler =
                        new NioBufferHandler(readSize, writeSize, socketProperties.getDirectBuffer());
                channel = new SecureNioChannel(socket, sslEngine, handler, selectorPool);
            } else {
                // New plain TCP channel.
                int readSize = socketProperties.getAppReadBufSize();
                int writeSize = socketProperties.getAppWriteBufSize();
                NioBufferHandler handler =
                        new NioBufferHandler(readSize, writeSize, socketProperties.getDirectBuffer());
                channel = new NioChannel(socket, handler);
            }

            // Queue the channel for registration with the next poller.
            getPoller0().register(channel);
            return true;
        } catch (Throwable failure) {
            try {
                log.error("", failure);
            } catch (Throwable ignored) {
                // Logging itself failed; nothing more can be done here.
            }
            // Tell the caller to close the socket.
            return false;
        }
    }
           

 getPoller0()方法如下:

/**
 * Returns the next poller, cycling through the pollers array using the
 * pollerRotater counter.
 *
 * @return the poller selected for this call
 */
public Poller getPoller0() {
        // Clear the sign bit rather than calling Math.abs(): Math.abs(Integer.MIN_VALUE)
        // is still negative, so after the counter wrapped around the old expression
        // could yield a negative index and throw ArrayIndexOutOfBoundsException.
        int idx = (pollerRotater.incrementAndGet() & Integer.MAX_VALUE) % pollers.length;
        return pollers[idx];
    }
           

 Poller的register()方法如下:

/**
 * Associates the channel with this poller and queues an OP_REGISTER event
 * for it. Attachment and event objects are taken from keyCache / eventCache
 * when available and created otherwise.
 *
 * @param socket the channel to be registered
 */
public void register(final NioChannel socket)
        {
            socket.setPoller(this);

            // Take a cached attachment if there is one, else build a new one.
            KeyAttachment attachment = keyCache.poll();
            if (attachment == null) {
                attachment = new KeyAttachment();
            }
            attachment.reset(this, socket, getSocketProperties().getSoTimeout());

            PollerEvent event = eventCache.poll();
            // OP_READ is what OP_REGISTER turns into.
            attachment.interestOps(SelectionKey.OP_READ);
            if (event != null) {
                event.reset(socket, attachment, OP_REGISTER);
            } else {
                event = new PollerEvent(socket, attachment, OP_REGISTER);
            }

            // Put the registration request on this poller's event queue.
            addEvent(event);
        }
           

  addEvent()方法如下:

/**
 * Adds an event to this poller's queue and wakes the selector when the
 * wakeup counter reaches zero after the increment.
 *
 * @param event the event to enqueue
 */
public void addEvent(Runnable event) {
            events.offer(event);
            // The poller's run loop sets the counter to -1 just before it
            // selects, so an increment that lands on 0 is the first event
            // added since then and the selector must be woken.
            final int pending = wakeupCounter.incrementAndGet();
            if (pending == 0) {
                selector.wakeup();
            }
        }
           

  Poller的run()方法如下:

/**
 * Poller loop: while the endpoint is running, processes queued events,
 * selects on the selector, dispatches every ready key to processKey(), and
 * then runs timeout handling. The loop also exits when close is set. On
 * exit it notifies waiters on this object and counts down stopLatch.
 */
public void run() {
            // Loop until we receive a shutdown command
            while (running) {
                try {
                    // Loop if endpoint is paused
                    while (paused && (!close) ) {
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            // Ignore
                        }
                    }
                    boolean hasEvents = false;

                    hasEvents = (hasEvents | events());//process queued events; per the original note this registers socket events with the Selector
                    // Time to terminate?
                    if (close) {
                        timeout(0, false);
                        break;
                    }
                    int keyCount = 0;
                    try {
                        if ( !close ) {
                            // Setting the counter to -1 lets addEvent() detect (counter
                            // reaching 0) that it must wake the selector.
                            if (wakeupCounter.getAndSet(-1) > 0) {
                                //if we are here, means we have other stuff to do
                                //do a non blocking select
                                keyCount = selector.selectNow();
                            } else {
                                keyCount = selector.select(selectorTimeout);
                            }
                            wakeupCounter.set(0);
                        }
                        if (close) {
                            timeout(0, false);
                            selector.close(); 
                            break; 
                        }
                    } catch ( NullPointerException x ) {
                        //sun bug 5076772 on windows JDK 1.5
                        if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
                        // Rethrow only if one of our own fields is actually null
                        if ( wakeupCounter == null || selector == null ) throw x;
                        continue;
                    } catch ( CancelledKeyException x ) {
                        //sun bug 5076772 on windows JDK 1.5
                        if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
                        if ( wakeupCounter == null || selector == null ) throw x;
                        continue;
                    } catch (Throwable x) {
                        log.error("",x);
                        continue;
                    }
                    //either we timed out or we woke up, process events first
                    if ( keyCount == 0 ) hasEvents = (hasEvents | events());

                    Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
                    // Walk through the collection of ready keys and dispatch
                    // any active event.
                    while (iterator != null && iterator.hasNext()) {
                        SelectionKey sk = (SelectionKey) iterator.next();
                        KeyAttachment attachment = (KeyAttachment)sk.attachment();
                        // Attachment may be null if another thread has called
                        // cancelledKey()
                        if (attachment == null) {
                            iterator.remove();
                        } else {
                            attachment.access();
                            iterator.remove();
                            processKey(sk, attachment);//per the original note, hands the socket to the executor pool or the workers
                        }
                    }//while

                    //process timeouts
                    timeout(keyCount,hasEvents);
                    if ( oomParachute > 0 && oomParachuteData == null ) checkParachute();
                } catch (OutOfMemoryError oom) {
                    try {
                        // Drop the parachute reference and release caches before logging
                        oomParachuteData = null;
                        releaseCaches();
                        log.error("", oom);
                    }catch ( Throwable oomt ) {
                        // Last resort: write to stderr; any further failure is swallowed
                        try {
                            System.err.println(oomParachuteMsg);
                            oomt.printStackTrace();
                        }catch (Throwable letsHopeWeDontGetHere){}
                    }
                }
            }//while
            // Loop finished: wake anything waiting on this poller and signal the stop latch
            synchronized (this) {
                this.notifyAll();
            }
            stopLatch.countDown();

        }
           

       後續就是處理Socket請求,並將處理結果傳回到瀏覽器端。