天天看點

使用AIO和SEDA模型來建構可伸縮的企業應用

備注:翻譯自theserverside.com的一篇文章,原文位址請見http://www.theserverside.com/tt/articles/article.tss?l=IOandSEDAModel。英文能力一般,翻譯品質不是特别理想,大家将就點看吧。如有錯誤請幫忙指正。

正文如下:

讨論

    這篇文章展示一個解決方案,用來解決企業應用中的可伸縮性問題,這些應用必須支援即要求快速響應而又長時間運作的業務程式,吞吐量或大或小。

    讓我們定義一個簡單的示例場景來模拟這種情況。我們有一個前端web應用程式,通過http接收請求,然後把請求發送給不同的web service後端。web service請求的後端平台中有一個響應很慢。結果導緻我們将獲得一個很低的吞吐量.因為這個響應很慢的服務使得web伺服器的線程池中的一個工作線程始終保持繁忙,其他請求無法得到處理。

    這裡有一個解決這種情況的方案,雖然現在還沒有标準化,但已經被幾乎所有servlet容器以這樣或者那樣的方法實作:Jetty, Apache Jakarta Tomcat, and Weblogic. 這就是異步IO(asynchronous IO,or AIO).

    上面提到的解決方案中使用到的關鍵架構元件如下:

        1. 在servlet容器中使用異步IO

        2. 階段化事件驅動架構模型(SEDA)

    在servlet容器中使用異步IO

    servlet容器正成為在java nio庫之上實作高可伸縮性應用的良好機會——nio類庫給予從每連接配接一線程轉變為每請求一線程的能力。

    當然這些還不夠,在實作Reverse Ajax-based的應用時會發生問題。目前沒有機制提供servlet API來容許異步發送資料給用戶端。目前Reverse Ajax有三種實作方式:

    * polling

    * piggy back

    * Comet

    目前基于Commet的實作是保持和用戶端的一個打開的通道,基于某些事件發回資料。但是這打破了每請求一線程模型,在伺服器端至少需要分派一個工作線程。

    在servlet容器中目前有兩種實作方式:

    1.異步IO(Apache Tomcat, Bea Weblogic)——容許servlet異步處理資料

    2.continuations (延續?)(Jetty)——在Jetty6介紹的非常有趣的特性,容許挂起目前請求并釋放目前線程。

    所有這些實作都有優點和缺點,而最好的實作将是所有這些實作的組合。

    我的例子基于Apache Jakarta Tomcat的實作,稱為CometProcessor。這種實作将請求和應答從工作線程中解耦,進而容許工作線程稍後再完成應答。

    Staged event-driven architecture (SEDA) model

    SEDA模型是伯克利大學的Matt Welsh, David Culler和Eric Brewer推薦的一個架構設計。SEDA将應用分解為由動态資源控制器分離的不同階段,進而容許應用程式動态調整來改變負載。

    下面你将看到基于SEDA的HTTP伺服器:

使用AIO和SEDA模型來建構可伸縮的企業應用

    圖檔2: SEDA HTTP伺服器: 基于SEDA的HTTP伺服器的架構表述。應用由被隊列分離的多個階段的集合組成。箭頭表述了階段之間的事件流程。每個階段可以被獨立管理,并且階段可以按順序依次運作或并發運作,或者是兩者的組合。時間隊列的使用容許每個階段分别load-conditioned(負載調節?).例如,設定事件隊列的閥值。

    有關這個架構的更多内容可以在這個頁面找到:SEDA: An Architecture for Well-Conditioned, Scalable Internet Services.

    讓我們一起來看,我們的簡化場景是如何映射到這個SEDA架構的。

使用AIO和SEDA模型來建構可伸縮的企業應用

    基于SEDA的應用将由七個階段組成。當一個特定類型的請求到達時,它将被路由到正确的隊列中。對應的階段将處理這個消息,然後将應答放到應答隊列中。最後資料将被發送給用戶端。通過這種方法我們可以解決當請求被路由到應答緩慢的服務時阻塞其他請求處理而帶來的擴充性問題。

    讓我們一起來看看怎麼用Mule來實作這種架構。

    Mule是一種開源Enterprise Message Bus (ESB),它的模型概念是基于SEDA模型。Mule也支援其他資訊模型,但預設是SEDA模型。在這種模式下,Mule将每個元件當成一個階段,使用自己的線程和工作隊列。

    在SEDA模型中的關鍵元件——Incoming Event Queue(輸入事件隊列), Admission Controller(許可控制器), Dynamically sized Thread Pool(動态線程池), Event Handler(事件處理器)和Resource Controller(資源控制器)——被映射到Mule的服務元件。

    在Mule中,Incoming Event Queue(輸入事件隊列)是作為一個inbound(内部?)的路由器或者終端提供,而Event Handler(事件處理器)自身就是作為一個元件。Thus we're short of an Admission Controller, Resource Controller and a Dynamically sized Thread Pool. (be short of ?怎麼翻譯,sorry)

    Admission Controller(許可控制器)作為SEDA階段和Incoming Event Queue(輸入事件隊列)連接配接,用Mule的術語說是元件。實作這種方式的最直接的方法是作為一個Inbound路由器,用于控制被注冊到通道上的元件接受的事件,哪些該被處理和該如何處理。

    我們場景的邏輯流程,将在下面的圖中展示如何被映射到Mule模型。圖中列舉的步驟如下:

    1. 用戶端通過http請求下一個訂單

    2. 請求被http伺服器處理,在我們的案例中是Apache Jakarta Tomcat。基于http請求提供的參數,前端應用程式組合一個請求對象。在我們的場景中,我們有兩個對象類型,PriceOrderRequest和StockOrderRequest。每個請求會自動生成一個關聯id,并被映射到關聯這個請求的應答對象中。我們将在稍後看到這個關聯id将被如何用于比對從Mule容器到原始用戶端請求的應答。從現在開始,請求對象将包含這個關聯id,并将在前端應用程式的所有層之間傳遞,當然也會穿透Mule的元件。這個請求訂單,不管是PriceOrderRequest還是StockOrderRequest,将被發送到access層。在access層将有一個準備好的JMS生産者用于将這個資訊加入到請求隊列。現在請求訂單将被Mule元件處理。被web伺服器配置設定用來服務于我們http請求的工作線程現在被釋放可以用于服務其他請求,它不需要等待我們的業務處理結束。

使用AIO和SEDA模型來建構可伸縮的企業應用

    3. 我們的請求訂單現在在jms的隊列中,位址是jms://requestQueue。現在處理被轉移到Mule中。

    4. 基于對象類型,訂單将被路由到不同的隊列。在我們的案例中,我們有一個PriceOrderRequest,是以資訊被路由到jms://priceOrderQueue。

    5. 通過使用Apache CXF,一個SOAP請求被生成并發送到web service容器。應答将被發送到jms://responseQueue.

    6. 同樣的類似步驟4的場景發生在StockOrderRequest的案例中。

    7. 類似步驟5.

    8. JMS的消費者池監聽the jms://responseQueue. 這個隊列包含業務請求的應答資訊。這個消息包含在步驟2中生成的關聯id中繼資料,這将容許我們識别請求的發起者。

    9. 一旦http應答對象被識别,我們可以發送應答給用戶端。

    上面流程的Mule配置資訊展示如下:

<!--<br /> <br /> Code highlighting produced by Actipro CodeHighlighter (freeware)<br /> http://www.CodeHighlighter.com/<br /> <br /> --> < jms:activemq-connector  name ="jmsConnector"  brokerURL ="tcp://localhost:61616" />

< model  name ="Sample" >

     < service  name ="Order Service"   >             

           < inbound >

              < jms:inbound-endpoint  queue ="requestQueue" />                    

           </ inbound >                

           < component  class ="org.mule.example.Logger" />

           < outbound >

            < filtering-router >

                < jms:outbound-endpoint  queue ="priceOrderQueue"   />

                    < payload-type-filter  expectedType ="org.mule.model.PriceOrderRequest" />

              </ filtering-router >                 

              < filtering-router >      

                   < jms:outbound-endpoint  queue ="stockOrderQueue"   />                          

                     < payload-type-filter  expectedType ="org.mule.model.StockOrderRequest"   />

                 </ filtering-router >

         </ outbound >         

     </ service >

     < service  name ="stockService" >

         < inbound >

             < jms:inbound-endpoint  queue ="stockOrderQueue"  transformer-refs ="JMSToObject 

                    StockOrderRequestToServiceRequest"   />

         </ inbound >                

         < outbound >

              < chaining-router >

                  < cxf:outbound-endpoint                             

                      address ="http://localhost:8080/axis2/services/getStock"

                 clientClass ="org.axis2.service.stock.GetStock_Service"   

                  wsdlPort ="getStockHttpSoap12Endpoint"  

                  wsdlLocation ="classpath:/Stock.wsdl"  

                  operation ="getStock"   />

                  < jms:outbound-endpoint  queue ="responseQueue"  

                         transformer-refs ="ServiceResponseToStockOrderResponse ObjectToJMS" />

             </ chaining-router >                 

          </ outbound >

         < default-service-exception-strategy >

            < jms:outbound-endpoint  queue ="responseQueue"  

                   transformer-refs ="ExceptionToResponse ObjectToJMS" />

         </ default-service-exception-strategy >

    </ service >     

    < service  name ="priceService" >

        < inbound >

            < jms:inbound-endpoint  queue ="priceOrderQueue"  

                   transformer-refs ="JMSToObject PriceOrderRequestToServiceRequest" />

        </ inbound >     

        < outbound >

            < chaining-router >

                < cxf:outbound-endpoint                     

                    address ="http://localhost:8080/axis2/services/getPrice"

                   clientClass ="org.axis2.service.price.GetPrice_Service"   

                   wsdlPort ="getPriceHttpSoap12Endpoint"  

                   wsdlLocation ="classpath:/Price.wsdl"  

                   operation ="getPrice"   />

                < jms:outbound-endpoint  queue ="responseQueue"  

                       transformer-refs ="ServiceResponseToPriceOrderResponse ObjectToJMS" />

            </ chaining-router >                 

           </ outbound >

           < default-service-exception-strategy >

    這個事件驅動的架構模型有一個挑戰性的問題,如何将應答和請求關聯?請求被生成,業務對象被建立,并被作為jsm對象資訊的負載在Mule空間中通過多個jms隊列傳輸。這個資訊被從一個隊列路由到另一個,通常被用來作為到web service請求的輸入。

    容許我們持續追蹤資訊的關鍵資訊是來自jms規範的關聯id。可以通過使用message.setJMSCorrelationID()來設定。然而如果你在jms隊列中釋出設定了這個屬性的資訊,Mule似乎會覆寫這個資訊并為消息建立一個将貫穿整個流程的新的關聯id。幸好還有一個内部的名為MULE_CORRELATION_ID的Mule消息屬性。如果Mule發現消息的這個屬性被設定,它将被用于穿越流程中所有的元件,另外如果關聯id沒有被設定,MULE_CORRELATION_ID屬性的值還将被作為關聯id的值使用。

<!--<br /> <br /> Code highlighting produced by Actipro CodeHighlighter (freeware)<br /> http://www.CodeHighlighter.com/<br /> <br /> -->

conn = getConnection();

session = conn.createSession( false , Session.AUTO_ACKNOWLEDGE);

producer =  session.createProducer(getDestination(Constants.JMS_DESTINATION_REQUEST_QUEUE));

jmsMessage = session.createObjectMessage();

jmsMessage.setObject(request);

            jmsMessage.setStringProperty(Constants.PROPS_MULE_CORRELATION_ID, request.getCorrelationID());

producer.send(jmsMessage);

    是以每個請求必須在對應的業務對象被發送到Mule入口(一個jms對象)前生成一個唯一的關聯id。

    一個可行的方法是生成一個UUID用做關聯id,同樣将UUID映射到CometProcessor接口中的事件方法提供的被包裹為CometEvent對象的HttpServletResponse對象。

<!--<br /> <br /> Code highlighting produced by Actipro CodeHighlighter (freeware)<br /> http://www.CodeHighlighter.com/<br /> <br /> -->

public   class  IdentityCreator  extends  MethodInterceptorAspect{

     protected   void  beforeInvoke(MethodInvocation method){        

        Object[] args = method.getArguments();

        HttpServletRequest httpRequest = ((CometEvent)args[ 0 ]).getHttpServletRequest();

        String uuid = UuidFactory.getUuid();

        httpRequest.setAttribute(Constants.PROPS_MULE_CORRELATION_ID, uuid);

        HttpResponseManager.getInstance().saveResponse(uuid, ((CometEvent)args[ 0 ]).getHttpServletResponse());

    }

     protected   void  afterInvoke(MethodInvocation method){

         return ;

    }

    @Override

     public   void  afterExceptionInvoke(MethodInvocation method)  throws  Throwable {        

        Object[] args = method.getArguments();

        HttpServletRequest httpRequest = ((CometEvent)args[ 0 ]).getHttpServletRequest();

        String uuid = (String)httpRequest.getAttribute(Constants.PROPS_MULE_CORRELATION_ID);

         if  (uuid != null ) HttpResponseManager.getInstance().removeResponse(uuid);         

    }

}

    當應答消息傳回時,我們所需要做的隻是從jms消息屬性中擷取關聯對象的值,查找對象的HttpServletResponse對象,然後發送應答給用戶端。

    測試

    一些測試可以提供我們這個架構優點的清晰見解。使用Apache JMeter,每個案例都執行一個測試,一個架構使用異步servlet和SEDA模型,另一個架構不使用這個模型。測試運作了1個小時,每秒10個線程,兩種類型的請求互動使用。為了這些測試,我們配置設定了總共6個工作線程。在沒有擴充性提升的案例中,所有6個線程都被Tomcat的線程池占用。

使用AIO和SEDA模型來建構可伸縮的企業應用
使用AIO和SEDA模型來建構可伸縮的企業應用

    可以非常清楚的看到,吞吐量(綠線)是如何下降到大概 23 請求每分鐘的。

使用AIO和SEDA模型來建構可伸縮的企業應用

    現在讓我們在我們的元件中配置設定這6個線程。每個元件配置設定一個單一線程。

    在Jakarta Tomcat中,server.xml配置檔案中的下面這些行需要修改:

<!--<br /> <br /> Code highlighting produced by Actipro CodeHighlighter (freeware)<br /> http://www.CodeHighlighter.com/<br /> <br /> --> < Executor  name ="tomcatThreadPool"  namePrefix ="catalina-exec-"  

        maxThreads ="1"  minSpareThreads ="0" />

    在Mule的案例中,需要在Mule配置檔案中為每個服務元件在service标簽中增加以下行:

<!--<br /> <br /> Code highlighting produced by Actipro CodeHighlighter (freeware)<br /> http://www.CodeHighlighter.com/<br /> <br /> --> < component-threading-profile 

     maxThreadsActive ="1"  maxThreadsIdle ="0"  poolExhaustedAction ="RUN"  

    maxBufferSize ="20"  threadWaitTimeout ="300" />

    異步和SEDA模型架構的測試在下面可以看到。吞吐量在23請求每分鐘保持不變。

    如果我們運作性能測試超過1小時,第一個案例的吞吐量還将繼續下降,但是第二個案例依然将保持同樣的值。

使用AIO和SEDA模型來建構可伸縮的企業應用
使用AIO和SEDA模型來建構可伸縮的企業應用