天天看点

Camel之AsyncProcessor

   Camel支持一种更复杂的异步的处理模型,异步处理器实现一个继承自Processor接口的AsyncProcessor接口,使用异步Processor的优点:

a.异步Processor不会因等待阻塞调用而耗尽线程,这样在处理同样工作量的情况下,通过减少线程的数量可以增加系统的伸缩性

b.使用异步Processor,可以将路由分阶段处理,不同的线程池处理其相应的路由阶段,这就意味着路由可以并行处理。

缺点:实现异步的Processor要比同步的Processor复杂得多。

异步Processor与同步Processor的区别:

a.必须提供一个AsyncCallback对象,该对象在exchange处理完成后被通知

b.在异步Processor处理exchange的时候不能抛出任何异常,而应该将异常存储在exchange的Exception属性中

c.异步Processor必须知道它将以什么方式完成处理,异步或同步,如果process方法返回true,则是同步完成,如果process方法返回false,则是异步完成。

d.当处理器处理完exchange时,它必须调用callback.done(boolean sync)方法,sync参数必须与process方法的返回值一致。

对于一个路由来说,完全使用异步模式可以降低线程的使用量,这要求从Consumer开始就必须使用异步的处理API(即调用异步的

process方法),如果Consumer调用的是同步process()方法,那么消费者线程在处理Exchange时将被强制阻塞。

有一点必须注意的是当你调用了异步的API,这并不意味着处理过程就是异步的,这仅仅是为不捆绑在调用者线程提供了可能。

至于是否是进行异步处理依赖于Camel路由的配置.

以上是Camel官方对异步Processor的解释,下面是本人用于测试的一个例子:

public static void main(String[] args) throws Exception {
	RouteBuilder builder = new RouteBuilder() {
		
		@Override
		public void configure() throws Exception {
			RouteDefinition definition1 = this.from("file:H:/temp/in");
			RouteDefinition definition2 = definition1.process(new Processor() {
				
				@Override
				public void process(Exchange exchange) throws Exception {
					System.out.println(Thread.currentThread().getName());
					System.out.println("process1");
				}
			}).process(new AsyncProcessor() {
				
				@Override
				public void process(Exchange exchange) throws Exception {
					System.out.println("process");						
				}
				@Override
				public boolean process(Exchange exchange, AsyncCallback callback) {
					System.out.println(Thread.currentThread().getName());
					System.out.println("async process");
					try {
						Thread.sleep(10 * 1000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					callback.done(false);
					return false;
				}
			}).process(new Processor() {
				
				@Override
				public void process(Exchange exchange) throws Exception {
					System.out.println(Thread.currentThread().getName());
					System.out.println("process2");
				}
			});
			definition2.to("file:H:/temp/out");
		}
	};
	
	DefaultCamelContext camelContext = new DefaultCamelContext();
	camelContext.addRoutes(builder);
	camelContext.start();
	
	Object object = new Object();
	synchronized (object) {
		object.wait();
	}
	
}
           

当我看到异步两个字时,直觉就是使用异步Processor时会启用新的线程进行处理,但在上面的例子中,三个线程名称是一样的,

并且在阻塞了10秒后process2才打印出来,这说明上面的三个processor是在同一个线程中执行的,这也是阻塞10秒的原因。

我个人认为是对Camel异步Processor的"异步"两字理解出现了偏差,这里的异步只为processor的processor方法,提供一个

回调函数,而不是另启线程。而且我们自己写Processor处理器对这个异步的使用也很有限,因为我们写的处理器是被调用者,AsyncCallback是由上层提供的,我们只是能调用其done方法通知上层本次处理完成,而我们更多的需求应该是自己去注册回调函数,并且我们能够控制这个回调函数的回调时机,而现在我们无法提供回调函数的注册。那我们不禁要问,这个AsyncCallback对象那到底是谁提供的呢?AsyncCallback对象的源头当然是在消费者类提供的,对上面的例子来说是在FileConsumer类中,如下是GenericFileConsumer的processExchange方法的一个片段(FileConsumer继承自GenericFileConsumer)

getAsyncProcessor().process(exchange, new AsyncCallback() {
	public void done(boolean doneSync) {
		// noop
		if (log.isTraceEnabled()) {
			log.trace("Done processing file: {} {}", target, doneSync ? "synchronously" : "asynchronously");
		}
	}
});
           

这时创建的AsyncCallback对象就是源始的回调对象,当然在路由执行的后续过程中,该回调对象可以被包装,其中CamelInternalProcessor的process(Exchange exchange, AsyncCallback callback)方法就是一个例子:

callback = new InternalCallback(states, exchange, callback);

这里我们不禁又会问,既然CamelInternalProcessor能够对源始AsyncCallback对象进行包装加入自己的回调逻辑,为什么我们自己不行呢,其原来还是我们写的Processor是被调用者,是被包装者,具体过程可参看Camel路由启动过程。

如果非要添加自己的回调逻辑也不是不可能,就只能自己写消费者,自己写消费者就能控制源AsyncCallback对象,其后续只是对

源AsyncCallback对象的一个包装的过程,只要保证最外层的AsyncCallback对象被调用,那么源AsyncCallback对象也一定会被调用。所以在上例中,如果在第二个Processor中如果不执行callback.done(false);的话路由过程将永远不会结束,因为上层一直认为下层处理还未结束。当然如果我们不写异常Processor,路由过程还是会正常结束的,Camel内部会自行处理,但是如果我们写了异步Processor就一定要调用callback.done方法。

所以这么一通下来,并没有感受到官方提及的不阻塞调用、降价线程使用、路由分阶段处理等,个人的感觉就是多了一个回调方法,而且这个回调功能还很有限,当然这也有可能是自己什么地方理解错了,如若如此,尽请指正......

继续阅读