
Apache Camel: Split and Aggregate

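When using Camel, you may need to split and aggregate messages. For example, when sending a very large file to a message queue, file-size limits or efficiency concerns may require you to break the file into several packets, send them separately, and merge them back into one complete file once all packets have been received.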

Splitting breaks one message into several messages, each of which can then be processed on its own, as shown in the figure below:

[Figure: Apache Camel split and aggregate]

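To implement splitting, add a SplitDefinition to the route definition, that is, call the ProcessorDefinition.split method. The split method mainly takes an Expression object. org.apache.camel.Expression is an interface with a single evaluate method: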

package org.apache.camel;

public interface Expression {

    <T> T evaluate(Exchange exchange, Class<T> type);
}
           

When split is called, the object returned by evaluate must follow certain rules. The source code below makes those rules clear at a glance:

public static Iterator<Object> createIterator(Object value, String delimiter, final boolean allowEmptyValues) {

	// if its a message than we want to iterate its body
	if (value instanceof Message) {
		value = ((Message) value).getBody();
	}

	if (value == null) {
		return Collections.emptyList().iterator();
	} else if (value instanceof Iterator) {
		return (Iterator<Object>)value;
	} else if (value instanceof Iterable) {
		return ((Iterable<Object>)value).iterator();
	} else if (value.getClass().isArray()) {
		if (isPrimitiveArrayType(value.getClass())) {
			final Object array = value;
			return new Iterator<Object>() {
				int idx = -1;

				public boolean hasNext() {
					return (idx + 1) < Array.getLength(array);
				}

				public Object next() {
					idx++;
					return Array.get(array, idx);
				}

				public void remove() {
					throw new UnsupportedOperationException();
				}

			};
		} else {
			List<Object> list = Arrays.asList((Object[]) value);
			return list.iterator();
		}
	} else if (value instanceof NodeList) {
		// lets iterate through DOM results after performing XPaths
		final NodeList nodeList = (NodeList) value;
		return new Iterator<Object>() {
			int idx = -1;

			public boolean hasNext() {
				return (idx + 1) < nodeList.getLength();
			}

			public Object next() {
				idx++;
				return nodeList.item(idx);
			}

			public void remove() {
				throw new UnsupportedOperationException();
			}
		};
	} else if (value instanceof String) {
		final String s = (String) value;

		// this code is optimized to only use a Scanner if needed, eg there is a delimiter

		if (delimiter != null && s.contains(delimiter)) {
			// use a scanner if it contains the delimiter
			Scanner scanner = new Scanner((String)value);

			if (DEFAULT_DELIMITER.equals(delimiter)) {
				// we use the default delimiter which is a comma, then cater for bean expressions with OGNL
				// which may have balanced parentheses pairs as well.
				// if the value contains parentheses we need to balance those, to avoid iterating
				// in the middle of parentheses pair, so use this regular expression (a bit hard to read)
				// the regexp will split by comma, but honor parentheses pair that may include commas
				// as well, eg if value = "bean=foo?method=killer(a,b),bean=bar?method=great(a,b)"
				// then the regexp will split that into two:
				// -> bean=foo?method=killer(a,b)
				// -> bean=bar?method=great(a,b)
				// http://stackoverflow.com/questions/1516090/splitting-a-title-into-separate-parts
				delimiter = ",(?!(?:[^\\(,]|[^\\)],[^\\)])+\\))";
			}

			scanner.useDelimiter(delimiter);
			return CastUtils.cast(scanner);
		} else {
			// use a plain iterator that returns the value as is as there are only a single value
			return new Iterator<Object>() {
				int idx = -1;

				public boolean hasNext() {
					return idx + 1 == 0 && (allowEmptyValues || ObjectHelper.isNotEmpty(s));
				}

				public Object next() {
					idx++;
					return s;
				}

				public void remove() {
					throw new UnsupportedOperationException();
				}
			};
		}
	} else {
		return Collections.singletonList(value).iterator();
	}
}
           

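This method is defined in the org.apache.camel.util.ObjectHelper class. As the code shows, many input types are accepted, but all of them are eventually converted to a java.util.Iterator. Camel uses this iterator to walk through the elements, creates an Exchange for each element, and sets the element as the body of that Exchange's Message. In this way one message is split into several.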
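The type dispatch described above can be tried out without Camel on the classpath. The following is a minimal sketch in plain Java, not Camel's implementation: the names SplitSketch, toIterator, and split are hypothetical, only some of the branches are reproduced (primitive arrays, NodeList, and custom delimiters are left out), and a plain List stands in for the Exchanges that Camel would create.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Scanner;

// Hypothetical helper for illustration only; it is not part of Camel.
final class SplitSketch {

    // Comma delimiter that does not split inside a pair of parentheses
    // (the same pattern that appears in the createIterator source above).
    static final String PAREN_AWARE_COMMA = ",(?!(?:[^\\(,]|[^\\)],[^\\)])+\\))";

    private SplitSketch() {
    }

    // Reduced version of the dispatch: every supported input becomes an Iterator.
    @SuppressWarnings("unchecked")
    static Iterator<Object> toIterator(Object value) {
        if (value == null) {
            return Collections.emptyIterator();
        } else if (value instanceof Iterator) {
            return (Iterator<Object>) value;
        } else if (value instanceof Iterable) {
            return ((Iterable<Object>) value).iterator();
        } else if (value instanceof Object[]) {
            return Arrays.asList((Object[]) value).iterator();
        } else if (value instanceof String && ((String) value).contains(",")) {
            // Scanner implements Iterator<String>, so it can be handed back directly.
            Scanner scanner = new Scanner((String) value).useDelimiter(PAREN_AWARE_COMMA);
            return (Iterator<Object>) (Iterator<?>) scanner;
        } else {
            // Anything else is treated as a single element.
            return Collections.singletonList(value).iterator();
        }
    }

    // One list entry per element; in Camel each element would become
    // the body of a new Exchange instead.
    static List<Object> split(Object value) {
        List<Object> pieces = new ArrayList<>();
        Iterator<Object> it = toIterator(value);
        while (it.hasNext()) {
            pieces.add(it.next());
        }
        return pieces;
    }
}
```

For instance, SplitSketch.split("bean=foo?method=killer(a,b),bean=bar?method=great(a,b)") should yield two pieces, matching the comment in the source above, while a null input yields no pieces at all.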

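Aggregation is exactly the reverse of splitting: multiple messages routed according to the route definition are merged into one message, as shown in the figure below: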

[Figure: Apache Camel split and aggregate]

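The main questions aggregation has to answer are which messages should be aggregated together and how the aggregation is carried out. To implement aggregation, add an AggregateDefinition to the route definition by calling the ProcessorDefinition.aggregate method. This method mainly takes an Expression and an AggregationStrategy: the former determines which messages are aggregated together, and the latter determines how the aggregation itself is performed.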
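The division of labour between the two arguments can be illustrated without Camel. The following is a rough sketch in plain Java under several assumptions: AggregateSketch, aggregate, and JOIN_LINES are hypothetical names, a Function plays the role of the correlation Expression, a BinaryOperator plays the role of the AggregationStrategy, and a group is considered complete once the input runs out, whereas Camel relies on completion conditions such as a timeout.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical helper for illustration only; it is not part of Camel.
final class AggregateSketch {

    // Strategy stand-in: the old value is null for the first message of a group,
    // mirroring the oldExchange == null check in a Camel AggregationStrategy.
    static final BinaryOperator<String> JOIN_LINES =
            (oldBody, newBody) -> oldBody == null ? newBody : oldBody + "\n" + newBody;

    private AggregateSketch() {
    }

    // Groups messages by correlation key and merges each group with the strategy.
    static <M, K> Map<K, M> aggregate(List<M> messages,
                                      Function<M, K> correlationKey,
                                      BinaryOperator<M> strategy) {
        Map<K, M> groups = new LinkedHashMap<>();
        for (M message : messages) {
            K key = correlationKey.apply(message);
            M old = groups.get(key); // null when this key is seen for the first time
            groups.put(key, strategy.apply(old, message));
        }
        return groups;
    }
}
```

With the made-up inputs "A:1", "B:2", "A:3" and the text before the colon as the correlation key, the two "A" messages are merged into one body and "B:2" stays on its own.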

Below is a concrete example of splitting and aggregation. It splits a file line by line and then aggregates the resulting pieces:

package com.xtayfjpk.camel;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.Scanner;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.support.ExpressionAdapter;

public class Test {

	/**
	 * @param args
	 */
	public static void main(String[] args) throws Exception {
		CamelContext camelContext = new DefaultCamelContext();
		camelContext.addRoutes(new RouteBuilder() {
			
			@Override
			public void configure() throws Exception {
				//Poll the specified directory
				this.from("file:H:/temp/in?noop=true")
					//Add a SplitDefinition, passing in a custom Expression object
					.split(new ExpressionAdapter() {
					
					@Override
					public Object evaluate(Exchange exchange) {
						//Return an object that implements Iterator; an Exchange is created for each element it yields
						File file = exchange.getIn().getBody(File.class);
						System.out.println(file);
						Scanner scanner = null;
						if(file!=null) {
							try {
								scanner = new Scanner(file);
								//Split line by line; the optional \r keeps Windows line endings out of the pieces
								scanner.useDelimiter("\\r?\\n");
							} catch (FileNotFoundException e) {
								e.printStackTrace();
							}
						}
						return scanner;
					}
				}).process(new Processor() {
					private int count = 0;
					
					public void process(Exchange exchange) throws Exception {
						//Set the correlation key value on the message; it is used during aggregation
						//Messages with the same correlation key value are aggregated together
						exchange.getIn().setHeader("test_correlation_key", (++count)%2);
						System.out.println("body:" + exchange.getIn().getBody());
					}
				}).aggregate(header("test_correlation_key"), new AggregationStrategy() {
					
					public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
						//If oldExchange is null, this is the first piece of the group
						if (oldExchange == null) {
				            return newExchange;
				        }
				 
				        String oldBody = oldExchange.getIn().getBody(String.class);
				        System.out.println("old body:" + oldBody);
				        String newBody = newExchange.getIn().getBody(String.class);
				        System.out.println("new body:" + newBody);
						//Merge the old and new pieces, then set the result as the Message body
				        oldExchange.getIn().setBody(oldBody + "\n" + newBody);
				        return oldExchange;
					}
				}).completionTimeout(5000).process(new Processor() {
					
					public void process(Exchange exchange) throws Exception {
						//Example follow-up processing: print the aggregated body
						System.out.println("body:" + exchange.getIn().getBody());
						
					}
				});
				
			}
		});
		
		camelContext.start();
		
		Object object = new Object();
		synchronized (object) {
			object.wait();
		}
	}
}
           

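During splitting and aggregation, related data is set as properties on the Exchange, such as the split index, whether the split is complete, and the aggregation sequence. For details, see the official Camel documentation: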

http://camel.apache.org/splitter.html

http://camel.apache.org/aggregator.html
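As a rough illustration of the split-related properties, the sketch below tags each piece with an index and a completion flag while walking an iterator. It is plain Java under stated assumptions: SplitPropertiesSketch and annotate are hypothetical names, a Map stands in for the Exchange, and only the keys "CamelSplitIndex" and "CamelSplitComplete" are taken from Camel (they are the values behind Exchange.SPLIT_INDEX and Exchange.SPLIT_COMPLETE).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; it is not part of Camel.
final class SplitPropertiesSketch {

    static final String SPLIT_INDEX = "CamelSplitIndex";       // zero-based counter
    static final String SPLIT_COMPLETE = "CamelSplitComplete"; // true on the last piece

    private SplitPropertiesSketch() {
    }

    // Builds one property map per piece; a Map stands in for the Exchange here.
    static List<Map<String, Object>> annotate(Iterator<?> parts) {
        List<Map<String, Object>> result = new ArrayList<>();
        int index = 0;
        while (parts.hasNext()) {
            Object body = parts.next();
            Map<String, Object> piece = new LinkedHashMap<>();
            piece.put("body", body);
            piece.put(SPLIT_INDEX, index++);
            // The piece is the last one when the iterator has nothing left.
            piece.put(SPLIT_COMPLETE, !parts.hasNext());
            result.add(piece);
        }
        return result;
    }
}
```

In a real route these values would be read from the Exchange itself, for example with exchange.getProperty(Exchange.SPLIT_INDEX, Integer.class).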
