
Apache Camel分解(split)与聚合(aggregate)



Apache Camel分解(split)与聚合(aggregate)


package org.apache.camel;

public interface Expression {

    <T> T evaluate(Exchange exchange, Class<T> type);


public static Iterator<Object> createIterator(Object value, String delimiter, final boolean allowEmptyValues) {

	// if its a message than we want to iterate its body
	if (value instanceof Message) {
		value = ((Message) value).getBody();

	if (value == null) {
		return Collections.emptyList().iterator();
	} else if (value instanceof Iterator) {
		return (Iterator<Object>)value;
	} else if (value instanceof Iterable) {
		return ((Iterable<Object>)value).iterator();
	} else if (value.getClass().isArray()) {
		if (isPrimitiveArrayType(value.getClass())) {
			final Object array = value;
			return new Iterator<Object>() {
				int idx = -1;

				public boolean hasNext() {
					return (idx + 1) < Array.getLength(array);

				public Object next() {
					return Array.get(array, idx);

				public void remove() {
					throw new UnsupportedOperationException();

		} else {
			List<Object> list = Arrays.asList((Object[]) value);
			return list.iterator();
	} else if (value instanceof NodeList) {
		// lets iterate through DOM results after performing XPaths
		final NodeList nodeList = (NodeList) value;
		return new Iterator<Object>() {
			int idx = -1;

			public boolean hasNext() {
				return (idx + 1) < nodeList.getLength();

			public Object next() {
				return nodeList.item(idx);

			public void remove() {
				throw new UnsupportedOperationException();
	} else if (value instanceof String) {
		final String s = (String) value;

		// this code is optimized to only use a Scanner if needed, eg there is a delimiter

		if (delimiter != null && s.contains(delimiter)) {
			// use a scanner if it contains the delimiter
			Scanner scanner = new Scanner((String)value);

			if (DEFAULT_DELIMITER.equals(delimiter)) {
				// we use the default delimiter which is a comma, then cater for bean expressions with OGNL
				// which may have balanced parentheses pairs as well.
				// if the value contains parentheses we need to balance those, to avoid iterating
				// in the middle of parentheses pair, so use this regular expression (a bit hard to read)
				// the regexp will split by comma, but honor parentheses pair that may include commas
				// as well, eg if value = "bean=foo?method=killer(a,b),bean=bar?method=great(a,b)"
				// then the regexp will split that into two:
				// -> bean=foo?method=killer(a,b)
				// -> bean=bar?method=great(a,b)
				// http://stackoverflow.com/questions/1516090/splitting-a-title-into-separate-parts
				delimiter = ",(?!(?:[^\\(,]|[^\\)],[^\\)])+\\))";

			return CastUtils.cast(scanner);
		} else {
			// use a plain iterator that returns the value as is as there are only a single value
			return new Iterator<Object>() {
				int idx = -1;

				public boolean hasNext() {
					return idx + 1 == 0 && (allowEmptyValues || ObjectHelper.isNotEmpty(s));

				public Object next() {
					return s;

				public void remove() {
					throw new UnsupportedOperationException();
	} else {
		return Collections.singletonList(value).iterator();



Apache Camel分解(split)与聚合(aggregate)



package com.xtayfjpk.camel;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.Scanner;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.support.ExpressionAdapter;

public class Test {

	 * @param args
	public static void main(String[] args) throws Exception {
		CamelContext camelContext = new DefaultCamelContext();
		camelContext.addRoutes(new RouteBuilder() {
			public void configure() throws Exception {
					.split(new ExpressionAdapter() {
					public Object evaluate(Exchange exchange) {
						File file = exchange.getIn().getBody(File.class);
						Scanner scanner = null;
						if(file!=null) {
							try {
								scanner = new Scanner(file);
							} catch (FileNotFoundException e) {
						return scanner;
				}).process(new Processor() {
					private int count = 0;
					public void process(Exchange exchange) throws Exception {
						exchange.getIn().setHeader("test_correlation_key", (++count)%2);
						System.out.println("body:" + exchange.getIn().getBody());
				}).aggregate(header("test_correlation_key"), new AggregationStrategy() {
					public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
						if (oldExchange == null) {
				            return newExchange;
				        String oldBody = oldExchange.getIn().getBody(String.class);
				        System.out.println("old body:" + oldBody);
				        String newBody = newExchange.getIn().getBody(String.class);
				        System.out.println("new body:" + newBody);
				        oldExchange.getIn().setBody(oldBody + "\n" + newBody);
				        return oldExchange;
				}).completionTimeout(5000).process(new Processor() {
					public void process(Exchange exchange) throws Exception {
						System.out.println("body:" + exchange.getIn().getBody());
		Object object = new Object();
		synchronized (object) {


