在使用Camel時,你可能會使用到分解與聚合,例如當你向消息隊列發送一個很大的檔案的時候,你可能出于檔案大小限制或效率的考量,需要将一個檔案分解為若幹檔案包分别發送,在接收到接收到所有檔案包後再合并為一個完整的檔案。
分解即将一個消息分解為若幹份(消息),然後可以對其進行單獨處理,如下圖:
要實作分解功能,則需要在路由定義中添加SplitDefinition,也就是要調用ProcessorDefinition.split方法,split方法主要是接收一個Expression對象,org.apache.camel.Expression是一個接口,其中隻有一個evaluate方法:
package org.apache.camel;
public interface Expression {
<T> T evaluate(Exchange exchange, Class<T> type);
}
在調用split方法時,evaluate方法需要傳回一個的對象類型有一定的規則(要求),具體的規則是什麼,看下面的源碼則一目錄了然:
public static Iterator<Object> createIterator(Object value, String delimiter, final boolean allowEmptyValues) {
// if its a message than we want to iterate its body
if (value instanceof Message) {
value = ((Message) value).getBody();
}
if (value == null) {
return Collections.emptyList().iterator();
} else if (value instanceof Iterator) {
return (Iterator<Object>)value;
} else if (value instanceof Iterable) {
return ((Iterable<Object>)value).iterator();
} else if (value.getClass().isArray()) {
if (isPrimitiveArrayType(value.getClass())) {
final Object array = value;
return new Iterator<Object>() {
int idx = -1;
public boolean hasNext() {
return (idx + 1) < Array.getLength(array);
}
public Object next() {
idx++;
return Array.get(array, idx);
}
public void remove() {
throw new UnsupportedOperationException();
}
};
} else {
List<Object> list = Arrays.asList((Object[]) value);
return list.iterator();
}
} else if (value instanceof NodeList) {
// lets iterate through DOM results after performing XPaths
final NodeList nodeList = (NodeList) value;
return new Iterator<Object>() {
int idx = -1;
public boolean hasNext() {
return (idx + 1) < nodeList.getLength();
}
public Object next() {
idx++;
return nodeList.item(idx);
}
public void remove() {
throw new UnsupportedOperationException();
}
};
} else if (value instanceof String) {
final String s = (String) value;
// this code is optimized to only use a Scanner if needed, eg there is a delimiter
if (delimiter != null && s.contains(delimiter)) {
// use a scanner if it contains the delimiter
Scanner scanner = new Scanner((String)value);
if (DEFAULT_DELIMITER.equals(delimiter)) {
// we use the default delimiter which is a comma, then cater for bean expressions with OGNL
// which may have balanced parentheses pairs as well.
// if the value contains parentheses we need to balance those, to avoid iterating
// in the middle of parentheses pair, so use this regular expression (a bit hard to read)
// the regexp will split by comma, but honor parentheses pair that may include commas
// as well, eg if value = "bean=foo?method=killer(a,b),bean=bar?method=great(a,b)"
// then the regexp will split that into two:
// -> bean=foo?method=killer(a,b)
// -> bean=bar?method=great(a,b)
// http://stackoverflow.com/questions/1516090/splitting-a-title-into-separate-parts
delimiter = ",(?!(?:[^\\(,]|[^\\)],[^\\)])+\\))";
}
scanner.useDelimiter(delimiter);
return CastUtils.cast(scanner);
} else {
// use a plain iterator that returns the value as is as there are only a single value
return new Iterator<Object>() {
int idx = -1;
public boolean hasNext() {
return idx + 1 == 0 && (allowEmptyValues || ObjectHelper.isNotEmpty(s));
}
public Object next() {
idx++;
return s;
}
public void remove() {
throw new UnsupportedOperationException();
}
};
}
} else {
return Collections.singletonList(value).iterator();
}
}
該方法定義在org.apache.camel.util.ObjectHelper類,由上可知允許的類型有很多,但最終于轉換為了一個java.uti.Iterator對象,這樣Camel就能通過該疊代器周遊出各個元素對象,然後針對每一個元素對象建立一個Exchange對象,再把元素對象設定到Message的body中,這樣就把一個消息分解為了多份。
聚合,剛好是分解的逆過程,即将根據路由定義路由的多個消息合并為一個消息,如下圖:
聚合主要要解決的問題是如何确定哪些消息是要進行聚合的,聚合的過程是怎樣的。要實作聚合功能,則需要向路由定義中添加AggregateDefinition,調用ProcessorDefinition.aggregate方法,該方法主要是要提供一個Expression與AggregationStrategy對象,前者用于确定哪些消息需要被聚合,後者用于确定具體的聚合過程如何進行。
下面提供一個分解與聚合具體的例子,實作一個示例功能,将一個檔案根據每一行進行分解,被分解後再進行聚合:
package com.xtayfjpk.camel;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.Scanner;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.support.ExpressionAdapter;
public class Test {
/**
* @param args
*/
public static void main(String[] args) throws Exception {
CamelContext camelContext = new DefaultCamelContext();
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
//輪詢指定目錄
this.from("file:H:/temp/in?noop=true")
//添加SplitDefinition,傳入一個自定義的Expression對象
.split(new ExpressionAdapter() {
@Override
public Object evaluate(Exchange exchange) {
//傳回一個實作了Iterator接口的類,每一個疊代出來的元素對象建立一個Exchange
File file = exchange.getIn().getBody(File.class);
System.out.println(file);
Scanner scanner = null;
if(file!=null) {
try {
scanner = new Scanner(file);
//根據行進行分解
scanner.useDelimiter("\n");
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
return scanner;
}
}).process(new Processor() {
private int count = 0;
public void process(Exchange exchange) throws Exception {
//往消息中設定關聯key的值,在聚合的時候将會使用到
//如果消息的關聯key值是相同的則表示需要進行聚合
exchange.getIn().setHeader("test_correlation_key", (++count)%2);
System.out.println("body:" + exchange.getIn().getBody());
}
}).aggregate(header("test_correlation_key"), new AggregationStrategy() {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
//如果oldExchange為null,則說明是第一個分解包
if (oldExchange == null) {
return newExchange;
}
String oldBody = oldExchange.getIn().getBody(String.class);
System.out.println("old body:" + oldBody);
String newBody = newExchange.getIn().getBody(String.class);
System.out.println("new body:" + newBody);
//将新與舊包進行合并,再設定進Message的body中
oldExchange.getIn().setBody(oldBody + "\n" + newBody);
return oldExchange;
}
}).completionTimeout(5000).process(new Processor() {
public void process(Exchange exchange) throws Exception {
//示例後續處理,進行輸出
System.out.println("body:" + exchange.getIn().getBody());
}
});
}
});
camelContext.start();
Object object = new Object();
synchronized (object) {
object.wait();
}
}
}
在分解與聚合過程中,與此相關的一些資料會做為Exchange的屬性進行設定,如分解序号,分解是否完成,聚合序列等,詳細内容可參看Camel官方文檔:
http://camel.apache.org/splitter.html
http://camel.apache.org/aggregator.html