天天看点

使用AIO和SEDA模型来构建可伸缩的企业应用

备注:翻译自theserverside.com的一篇文章,原文地址请见http://www.theserverside.com/tt/articles/article.tss?l=IOandSEDAModel。英文能力一般,翻译质量不是特别理想,大家将就点看吧。如有错误请帮忙指正。

正文如下:

讨论

    这篇文章展示一个解决方案,用来解决企业应用中的可伸缩性问题,这些应用必须支持即要求快速响应而又长时间运行的业务程序,吞吐量或大或小。

    让我们定义一个简单的示例场景来模拟这种情况。我们有一个前端web应用程序,通过http接收请求,然后把请求发送给不同的web service后端。web service请求的后端平台中有一个响应很慢。结果导致我们将获得一个很低的吞吐量.因为这个响应很慢的服务使得web服务器的线程池中的一个工作线程始终保持繁忙,其他请求无法得到处理。

    这里有一个解决这种情况的方案,虽然现在还没有标准化,但已经被几乎所有servlet容器以这样或者那样的方法实现:Jetty, Apache Jakarta Tomcat, and Weblogic. 这就是异步IO(asynchronous IO,or AIO).

    上面提到的解决方案中使用到的关键架构组件如下:

        1. 在servlet容器中使用异步IO

        2. 阶段化事件驱动架构模型(SEDA)

    在servlet容器中使用异步IO

    servlet容器正成为在java nio库之上实现高可伸缩性应用的良好机会——nio类库给予从每连接一线程转变为每请求一线程的能力。

    当然这些还不够,在实现Reverse Ajax-based的应用时会发生问题。目前没有机制提供servlet API来容许异步发送数据给客户端。目前Reverse Ajax有三种实现方式:

    * polling

    * piggy back

    * Comet

    当前基于Commet的实现是保持和客户端的一个打开的通道,基于某些事件发回数据。但是这打破了每请求一线程模型,在服务器端至少需要分派一个工作线程。

    在servlet容器中目前有两种实现方式:

    1.异步IO(Apache Tomcat, Bea Weblogic)——容许servlet异步处理数据

    2.continuations (延续?)(Jetty)——在Jetty6介绍的非常有趣的特性,容许挂起当前请求并释放当前线程。

    所有这些实现都有优点和缺点,而最好的实现将是所有这些实现的组合。

    我的例子基于Apache Jakarta Tomcat的实现,称为CometProcessor。这种实现将请求和应答从工作线程中解耦,从而容许工作线程稍后再完成应答。

    Staged event-driven architecture (SEDA) model

    SEDA模型是伯克利大学的Matt Welsh, David Culler和Eric Brewer推荐的一个架构设计。SEDA将应用分解为由动态资源控制器分离的不同阶段,从而容许应用程序动态调整来改变负载。

    下面你将看到基于SEDA的HTTP服务器:

使用AIO和SEDA模型来构建可伸缩的企业应用

    图片2: SEDA HTTP服务器: 基于SEDA的HTTP服务器的架构表述。应用由被队列分离的多个阶段的集合组成。箭头表述了阶段之间的事件流程。每个阶段可以被独立管理,并且阶段可以按顺序依次运行或并发运行,或者是两者的组合。时间队列的使用容许每个阶段分别load-conditioned(负载调节?).例如,设置事件队列的阀值。

    有关这个架构的更多内容可以在这个页面找到:SEDA: An Architecture for Well-Conditioned, Scalable Internet Services.

    让我们一起来看,我们的简化场景是如何映射到这个SEDA架构的。

使用AIO和SEDA模型来构建可伸缩的企业应用

    基于SEDA的应用将由七个阶段组成。当一个特定类型的请求到达时,它将被路由到正确的队列中。对应的阶段将处理这个消息,然后将应答放到应答队列中。最后数据将被发送给客户端。通过这种方法我们可以解决当请求被路由到应答缓慢的服务时阻塞其他请求处理而带来的扩展性问题。

    让我们一起来看看怎么用Mule来实现这种架构。

    Mule是一种开源Enterprise Message Bus (ESB),它的模型概念是基于SEDA模型。Mule也支持其他信息模型,但默认是SEDA模型。在这种模式下,Mule将每个组件当成一个阶段,使用自己的线程和工作队列。

    在SEDA模型中的关键组件——Incoming Event Queue(输入事件队列), Admission Controller(许可控制器), Dynamically sized Thread Pool(动态线程池), Event Handler(事件处理器)和Resource Controller(资源控制器)——被映射到Mule的服务组件。

    在Mule中,Incoming Event Queue(输入事件队列)是作为一个inbound(内部?)的路由器或者终端提供,而Event Handler(事件处理器)自身就是作为一个组件。Thus we're short of an Admission Controller, Resource Controller and a Dynamically sized Thread Pool. (be short of ?怎么翻译,sorry)

    Admission Controller(许可控制器)作为SEDA阶段和Incoming Event Queue(输入事件队列)连接,用Mule的术语说是组件。实现这种方式的最直接的方法是作为一个Inbound路由器,用于控制被注册到通道上的组件接受的事件,哪些该被处理和该如何处理。

    我们场景的逻辑流程,将在下面的图中展示如何被映射到Mule模型。图中列举的步骤如下:

    1. 客户端通过http请求下一个订单

    2. 请求被http服务器处理,在我们的案例中是Apache Jakarta Tomcat。基于http请求提供的参数,前端应用程序组合一个请求对象。在我们的场景中,我们有两个对象类型,PriceOrderRequest和StockOrderRequest。每个请求会自动生成一个关联id,并被映射到关联这个请求的应答对象中。我们将在稍后看到这个关联id将被如何用于匹配从Mule容器到原始客户端请求的应答。从现在开始,请求对象将包含这个关联id,并将在前端应用程序的所有层之间传递,当然也会穿透Mule的组件。这个请求订单,不管是PriceOrderRequest还是StockOrderRequest,将被发送到access层。在access层将有一个准备好的JMS生产者用于将这个信息加入到请求队列。现在请求订单将被Mule组件处理。被web服务器分配用来服务于我们http请求的工作线程现在被释放可以用于服务其他请求,它不需要等待我们的业务处理结束。

使用AIO和SEDA模型来构建可伸缩的企业应用

    3. 我们的请求订单现在在jms的队列中,地址是jms://requestQueue。现在处理被转移到Mule中。

    4. 基于对象类型,订单将被路由到不同的队列。在我们的案例中,我们有一个PriceOrderRequest,所以信息被路由到jms://priceOrderQueue。

    5. 通过使用Apache CXF,一个SOAP请求被生成并发送到web service容器。应答将被发送到jms://responseQueue.

    6. 同样的类似步骤4的场景发生在StockOrderRequest的案例中。

    7. 类似步骤5.

    8. JMS的消费者池监听the jms://responseQueue. 这个队列包含业务请求的应答信息。这个消息包含在步骤2中生成的关联id元数据,这将容许我们识别请求的发起者。

    9. 一旦http应答对象被识别,我们可以发送应答给客户端。

    上面流程的Mule配置信息展示如下:

<!--<br /> <br /> Code highlighting produced by Actipro CodeHighlighter (freeware)<br /> http://www.CodeHighlighter.com/<br /> <br /> --> < jms:activemq-connector  name ="jmsConnector"  brokerURL ="tcp://localhost:61616" />

< model  name ="Sample" >

     < service  name ="Order Service"   >             

           < inbound >

              < jms:inbound-endpoint  queue ="requestQueue" />                    

           </ inbound >                

           < component  class ="org.mule.example.Logger" />

           < outbound >

            < filtering-router >

                < jms:outbound-endpoint  queue ="priceOrderQueue"   />

                    < payload-type-filter  expectedType ="org.mule.model.PriceOrderRequest" />

              </ filtering-router >                 

              < filtering-router >      

                   < jms:outbound-endpoint  queue ="stockOrderQueue"   />                          

                     < payload-type-filter  expectedType ="org.mule.model.StockOrderRequest"   />

                 </ filtering-router >

         </ outbound >         

     </ service >

     < service  name ="stockService" >

         < inbound >

             < jms:inbound-endpoint  queue ="stockOrderQueue"  transformer-refs ="JMSToObject 

                    StockOrderRequestToServiceRequest"   />

         </ inbound >                

         < outbound >

              < chaining-router >

                  < cxf:outbound-endpoint                             

                      address ="http://localhost:8080/axis2/services/getStock"

                 clientClass ="org.axis2.service.stock.GetStock_Service"   

                  wsdlPort ="getStockHttpSoap12Endpoint"  

                  wsdlLocation ="classpath:/Stock.wsdl"  

                  operation ="getStock"   />

                  < jms:outbound-endpoint  queue ="responseQueue"  

                         transformer-refs ="ServiceResponseToStockOrderResponse ObjectToJMS" />

             </ chaining-router >                 

          </ outbound >

         < default-service-exception-strategy >

            < jms:outbound-endpoint  queue ="responseQueue"  

                   transformer-refs ="ExceptionToResponse ObjectToJMS" />

         </ default-service-exception-strategy >

    </ service >     

    < service  name ="priceService" >

        < inbound >

            < jms:inbound-endpoint  queue ="priceOrderQueue"  

                   transformer-refs ="JMSToObject PriceOrderRequestToServiceRequest" />

        </ inbound >     

        < outbound >

            < chaining-router >

                < cxf:outbound-endpoint                     

                    address ="http://localhost:8080/axis2/services/getPrice"

                   clientClass ="org.axis2.service.price.GetPrice_Service"   

                   wsdlPort ="getPriceHttpSoap12Endpoint"  

                   wsdlLocation ="classpath:/Price.wsdl"  

                   operation ="getPrice"   />

                < jms:outbound-endpoint  queue ="responseQueue"  

                       transformer-refs ="ServiceResponseToPriceOrderResponse ObjectToJMS" />

            </ chaining-router >                 

           </ outbound >

           < default-service-exception-strategy >

    这个事件驱动的架构模型有一个挑战性的问题,如何将应答和请求关联?请求被生成,业务对象被创建,并被作为jsm对象信息的负载在Mule空间中通过多个jms队列传输。这个信息被从一个队列路由到另一个,通常被用来作为到web service请求的输入。

    容许我们持续追踪信息的关键信息是来自jms规范的关联id。可以通过使用message.setJMSCorrelationID()来设置。然而如果你在jms队列中发布设置了这个属性的信息,Mule似乎会覆盖这个信息并为消息创建一个将贯穿整个流程的新的关联id。幸好还有一个内部的名为MULE_CORRELATION_ID的Mule消息属性。如果Mule发现消息的这个属性被设置,它将被用于穿越流程中所有的组件,另外如果关联id没有被设置,MULE_CORRELATION_ID属性的值还将被作为关联id的值使用。

<!--<br /> <br /> Code highlighting produced by Actipro CodeHighlighter (freeware)<br /> http://www.CodeHighlighter.com/<br /> <br /> -->

conn = getConnection();

session = conn.createSession( false , Session.AUTO_ACKNOWLEDGE);

producer =  session.createProducer(getDestination(Constants.JMS_DESTINATION_REQUEST_QUEUE));

jmsMessage = session.createObjectMessage();

jmsMessage.setObject(request);

            jmsMessage.setStringProperty(Constants.PROPS_MULE_CORRELATION_ID, request.getCorrelationID());

producer.send(jmsMessage);

    所以每个请求必须在对应的业务对象被发送到Mule入口(一个jms对象)前生成一个唯一的关联id。

    一个可行的方法是生成一个UUID用做关联id,同样将UUID映射到CometProcessor接口中的事件方法提供的被包裹为CometEvent对象的HttpServletResponse对象。

<!--<br /> <br /> Code highlighting produced by Actipro CodeHighlighter (freeware)<br /> http://www.CodeHighlighter.com/<br /> <br /> -->

public   class  IdentityCreator  extends  MethodInterceptorAspect{

     protected   void  beforeInvoke(MethodInvocation method){        

        Object[] args = method.getArguments();

        HttpServletRequest httpRequest = ((CometEvent)args[ 0 ]).getHttpServletRequest();

        String uuid = UuidFactory.getUuid();

        httpRequest.setAttribute(Constants.PROPS_MULE_CORRELATION_ID, uuid);

        HttpResponseManager.getInstance().saveResponse(uuid, ((CometEvent)args[ 0 ]).getHttpServletResponse());

    }

     protected   void  afterInvoke(MethodInvocation method){

         return ;

    }

    @Override

     public   void  afterExceptionInvoke(MethodInvocation method)  throws  Throwable {        

        Object[] args = method.getArguments();

        HttpServletRequest httpRequest = ((CometEvent)args[ 0 ]).getHttpServletRequest();

        String uuid = (String)httpRequest.getAttribute(Constants.PROPS_MULE_CORRELATION_ID);

         if  (uuid != null ) HttpResponseManager.getInstance().removeResponse(uuid);         

    }

}

    当应答消息返回时,我们所需要做的只是从jms消息属性中获取关联对象的值,查找对象的HttpServletResponse对象,然后发送应答给客户端。

    测试

    一些测试可以提供我们这个架构优点的清晰见解。使用Apache JMeter,每个案例都执行一个测试,一个架构使用异步servlet和SEDA模型,另一个架构不使用这个模型。测试运行了1个小时,每秒10个线程,两种类型的请求交互使用。为了这些测试,我们分配了总共6个工作线程。在没有扩展性提升的案例中,所有6个线程都被Tomcat的线程池占用。

使用AIO和SEDA模型来构建可伸缩的企业应用
使用AIO和SEDA模型来构建可伸缩的企业应用

    可以非常清楚的看到,吞吐量(绿线)是如何下降到大概 23 请求每分钟的。

使用AIO和SEDA模型来构建可伸缩的企业应用

    现在让我们在我们的组件中分配这6个线程。每个组件分配一个单一线程。

    在Jakarta Tomcat中,server.xml配置文件中的下面这些行需要修改:

<!--<br /> <br /> Code highlighting produced by Actipro CodeHighlighter (freeware)<br /> http://www.CodeHighlighter.com/<br /> <br /> --> < Executor  name ="tomcatThreadPool"  namePrefix ="catalina-exec-"  

        maxThreads ="1"  minSpareThreads ="0" />

    在Mule的案例中,需要在Mule配置文件中为每个服务组件在service标签中增加以下行:

<!--<br /> <br /> Code highlighting produced by Actipro CodeHighlighter (freeware)<br /> http://www.CodeHighlighter.com/<br /> <br /> --> < component-threading-profile 

     maxThreadsActive ="1"  maxThreadsIdle ="0"  poolExhaustedAction ="RUN"  

    maxBufferSize ="20"  threadWaitTimeout ="300" />

    异步和SEDA模型架构的测试在下面可以看到。吞吐量在23请求每分钟保持不变。

    如果我们运行性能测试超过1小时,第一个案例的吞吐量还将继续下降,但是第二个案例依然将保持同样的值。

使用AIO和SEDA模型来构建可伸缩的企业应用
使用AIO和SEDA模型来构建可伸缩的企业应用