在使用Camel时,你可能会使用到分解与聚合,例如当你向消息队列发送一个很大的文件的时候,你可能出于文件大小限制或效率的考量,需要将一个文件分解为若干文件包分别发送,在接收到接收到所有文件包后再合并为一个完整的文件。
分解即将一个消息分解为若干份(消息),然后可以对其进行单独处理,如下图:
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIyVGduV2QvwVe0lmdhJ3ZvwFM38CXlZHbvN3cpR2Lc1TPB10QGtWUCpEMJ9CXsxWam9CXwADNvwVZ6l2c052bm9CXUJDT1wkNhVzLcRnbvZ2LcZXUYpVd1kmYr50MZV3YyI2cKJDT29GRjBjUIF2LcRHelR3LcJzLctmch1mclRXY39TOwYDOyEjM3ETMzcDM1EDMy8CX0Vmbu4GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
要实现分解功能,则需要在路由定义中添加SplitDefinition,也就是要调用ProcessorDefinition.split方法,split方法主要是接收一个Expression对象,org.apache.camel.Expression是一个接口,其中只有一个evaluate方法:
package org.apache.camel;
public interface Expression {
<T> T evaluate(Exchange exchange, Class<T> type);
}
在调用split方法时,evaluate方法需要返回一个的对象类型有一定的规则(要求),具体的规则是什么,看下面的源码则一目录了然:
public static Iterator<Object> createIterator(Object value, String delimiter, final boolean allowEmptyValues) {
// if its a message than we want to iterate its body
if (value instanceof Message) {
value = ((Message) value).getBody();
}
if (value == null) {
return Collections.emptyList().iterator();
} else if (value instanceof Iterator) {
return (Iterator<Object>)value;
} else if (value instanceof Iterable) {
return ((Iterable<Object>)value).iterator();
} else if (value.getClass().isArray()) {
if (isPrimitiveArrayType(value.getClass())) {
final Object array = value;
return new Iterator<Object>() {
int idx = -1;
public boolean hasNext() {
return (idx + 1) < Array.getLength(array);
}
public Object next() {
idx++;
return Array.get(array, idx);
}
public void remove() {
throw new UnsupportedOperationException();
}
};
} else {
List<Object> list = Arrays.asList((Object[]) value);
return list.iterator();
}
} else if (value instanceof NodeList) {
// lets iterate through DOM results after performing XPaths
final NodeList nodeList = (NodeList) value;
return new Iterator<Object>() {
int idx = -1;
public boolean hasNext() {
return (idx + 1) < nodeList.getLength();
}
public Object next() {
idx++;
return nodeList.item(idx);
}
public void remove() {
throw new UnsupportedOperationException();
}
};
} else if (value instanceof String) {
final String s = (String) value;
// this code is optimized to only use a Scanner if needed, eg there is a delimiter
if (delimiter != null && s.contains(delimiter)) {
// use a scanner if it contains the delimiter
Scanner scanner = new Scanner((String)value);
if (DEFAULT_DELIMITER.equals(delimiter)) {
// we use the default delimiter which is a comma, then cater for bean expressions with OGNL
// which may have balanced parentheses pairs as well.
// if the value contains parentheses we need to balance those, to avoid iterating
// in the middle of parentheses pair, so use this regular expression (a bit hard to read)
// the regexp will split by comma, but honor parentheses pair that may include commas
// as well, eg if value = "bean=foo?method=killer(a,b),bean=bar?method=great(a,b)"
// then the regexp will split that into two:
// -> bean=foo?method=killer(a,b)
// -> bean=bar?method=great(a,b)
// http://stackoverflow.com/questions/1516090/splitting-a-title-into-separate-parts
delimiter = ",(?!(?:[^\\(,]|[^\\)],[^\\)])+\\))";
}
scanner.useDelimiter(delimiter);
return CastUtils.cast(scanner);
} else {
// use a plain iterator that returns the value as is as there are only a single value
return new Iterator<Object>() {
int idx = -1;
public boolean hasNext() {
return idx + 1 == 0 && (allowEmptyValues || ObjectHelper.isNotEmpty(s));
}
public Object next() {
idx++;
return s;
}
public void remove() {
throw new UnsupportedOperationException();
}
};
}
} else {
return Collections.singletonList(value).iterator();
}
}
该方法定义在org.apache.camel.util.ObjectHelper类,由上可知允许的类型有很多,但最终于转换为了一个java.uti.Iterator对象,这样Camel就能通过该迭代器遍历出各个元素对象,然后针对每一个元素对象创建一个Exchange对象,再把元素对象设置到Message的body中,这样就把一个消息分解为了多份。
聚合,刚好是分解的逆过程,即将根据路由定义路由的多个消息合并为一个消息,如下图:
聚合主要要解决的问题是如何确定哪些消息是要进行聚合的,聚合的过程是怎样的。要实现聚合功能,则需要向路由定义中添加AggregateDefinition,调用ProcessorDefinition.aggregate方法,该方法主要是要提供一个Expression与AggregationStrategy对象,前者用于确定哪些消息需要被聚合,后者用于确定具体的聚合过程如何进行。
下面提供一个分解与聚合具体的例子,实现一个示例功能,将一个文件根据每一行进行分解,被分解后再进行聚合:
package com.xtayfjpk.camel;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.Scanner;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.support.ExpressionAdapter;
public class Test {
/**
* @param args
*/
public static void main(String[] args) throws Exception {
CamelContext camelContext = new DefaultCamelContext();
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
//轮询指定目录
this.from("file:H:/temp/in?noop=true")
//添加SplitDefinition,传入一个自定义的Expression对象
.split(new ExpressionAdapter() {
@Override
public Object evaluate(Exchange exchange) {
//返回一个实现了Iterator接口的类,每一个迭代出来的元素对象创建一个Exchange
File file = exchange.getIn().getBody(File.class);
System.out.println(file);
Scanner scanner = null;
if(file!=null) {
try {
scanner = new Scanner(file);
//根据行进行分解
scanner.useDelimiter("\n");
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
return scanner;
}
}).process(new Processor() {
private int count = 0;
public void process(Exchange exchange) throws Exception {
//往消息中设置关联key的值,在聚合的时候将会使用到
//如果消息的关联key值是相同的则表示需要进行聚合
exchange.getIn().setHeader("test_correlation_key", (++count)%2);
System.out.println("body:" + exchange.getIn().getBody());
}
}).aggregate(header("test_correlation_key"), new AggregationStrategy() {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
//如果oldExchange为null,则说明是第一个分解包
if (oldExchange == null) {
return newExchange;
}
String oldBody = oldExchange.getIn().getBody(String.class);
System.out.println("old body:" + oldBody);
String newBody = newExchange.getIn().getBody(String.class);
System.out.println("new body:" + newBody);
//将新与旧包进行合并,再设置进Message的body中
oldExchange.getIn().setBody(oldBody + "\n" + newBody);
return oldExchange;
}
}).completionTimeout(5000).process(new Processor() {
public void process(Exchange exchange) throws Exception {
//示例后续处理,进行输出
System.out.println("body:" + exchange.getIn().getBody());
}
});
}
});
camelContext.start();
Object object = new Object();
synchronized (object) {
object.wait();
}
}
}
在分解与聚合过程中,与此相关的一些数据会做为Exchange的属性进行设置,如分解序号,分解是否完成,聚合序列等,详细内容可参看Camel官方文档:
http://camel.apache.org/splitter.html
http://camel.apache.org/aggregator.html