天天看点

Druid源码分析之Query -- Sequence与Yielder

Druid设计了一种可以迭代的序列叫Sequence,其实现在java-util包中。它是对Iterator操作的高级封装。但是它与普通的Iterator不同,它不会为您提供从中提取值的方式,而是提供了一个累加器(Accumulator)并定义数据如何操作。

这种控制反转(IoC)的方式是为了更好的让Sequence进行资源管理。当执行结束时,它可以强制调用close()方法来清除资源。如果没有这种反转,在操作时会很容易引起资源泄漏。

Sequence上还暴露了Yielder对象。它可以允许你在Sequence上遍历操作时进行中断,它会保存执行的状态,下次执行是在从终端处开始。它不会提供类似于Sequence的资源管理功能,需要显示的调用close方法。

首先来看一下Sequence接口的定义:

public interface Sequence<T>
{
  public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator);
  public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator);
}
           

它只提供了两个方法接口:

1. accumulate方法的功能是通过控制反转的方式完成聚合运算。该方法的第一个参数为传入的初始值,Accumulator累加器是封装回调函数的接口,把原来在迭代过程中进行聚合运算的逻辑抽取到accumulate方法中回调执行。它的接口定义为:

public interface Accumulator<AccumulatedType, InType>
{
  public AccumulatedType accumulate(AccumulatedType accumulated, InType in);
}
           

以上的方法中的两个参数通过范型的方式设定:

第一个参数保存聚合的结果,在调用时作为参数传入,计算完成后将该参数作为结果返回,并在下一次迭代时作为参数传入,循环执行,知道迭代结束。其使用方法举例如下(取自BaseSequence类的makeYeilder方法):

OutType retVal = initValue;
    while (!accumulator.yielded() && iter.hasNext()) {
      retVal = accumulator.accumulate(retVal, iter.next());
    }
           

第二个参数in是迭代器的下一个元素的值。举一个例子说明:假设AccumulatorType是Integer, in也是Integer。下面验证以下一个整数的Sequence是不是递增的,其实现方法如下:

public Integer accumulate(Integer accumulated, Integer in) {
		Assert.assertEquals(accumulated, in);
		return Integer.valueOf(accumulated.intValue() + 1);
	}
           

再一个完整的例子,取一个值为0 - 9的List的所有值的和:

public static void main(String[] args) {
		
		List<Integer> intList = new ArrayList<>();
		
		for(int i = 0; i < 10; i ++) {
			intList.add(i);
		}
		
		Sequence<Integer> intSequence = Sequences.simple(intList);
		
		int x = intSequence.accumulate(0, new Accumulator<Integer, Integer>() {

			@Override
			public Integer accumulate(Integer accumulated, Integer in) {
				return accumulated + in;
			}
			
		});
		
		System.out.println(x);

	}
           

2. 第二个方法为toYielder。它的功能是将Sequence转换成一个Yielder。Yielder对象可以看作是一个无法回溯的连标。调用Yielder的get()方法可以用来获取当前元素的值。通过调用next方法获取下一个Yielder对象。

在toYielder方法中需要传入一个YieldingAccumulator,它和Yielder协同工作实现Java语言中的中断/延续执行。YieldingAccumulator的接口实现如下所示:

public abstract class YieldingAccumulator<AccumulatedType, InType>
{
  private boolean yielded = false;

  public void yield()
  {
    yielded = true;
  }

  public boolean yielded()
  {
    return yielded;
  }

  public void reset()
  {
    yielded = false;
  }

  public abstract AccumulatedType accumulate(AccumulatedType accumulated, InType in);
}
           

从以上定义中可以看出,YieldingAccumulator添加了yield标志。yield标志的初始值为false,调用yield方法以后将该标志设置为true。yield标志的作用是退出当前的遍历迭代过程,并将Accumulator的值赋值给当前的Yielder。举个例子如下:

public static void main(String[] args) throws IOException {
		
		List<Integer> intList = new ArrayList<>();
		
		for(int i = 0; i < 10; i ++) {
			intList.add(i);
		}
		
		Sequence<Integer> intSequence = Sequences.simple(intList);
		
		Yielder<Integer> yielder = intSequence.toYielder(0, new YieldingAccumulator<Integer, Integer>() {

			@Override
			public Integer accumulate(Integer accumulated, Integer in) {
				yield();
				return accumulated + in;
			}
			
		});
		
		while(!yielder.isDone()) {
			int x = yielder.get();
			System.out.println(x);
			Yielder<Integer> oldYielder = yielder;
			yielder = oldYielder.next(x);
			oldYielder.close();
		}
		yielder.close();
	}
           

Sequence是怎么构造的呢?它是由工具类Sequences来创建的。其实现类如下:

public class Sequences
{

  private static final EmptySequence EMPTY_SEQUENCE = new EmptySequence();

  public static <T> Sequence<T> simple(final Iterable<T> iterable)
  {
    return BaseSequence.simple(iterable);
  }

  @SuppressWarnings("unchecked")
  public static <T> Sequence<T> empty()
  {
    return (Sequence<T>) EMPTY_SEQUENCE;
  }

  public static <T> Sequence<T> concat(Sequence<T>... sequences)
  {
    return concat(Arrays.asList(sequences));
  }

  public static <T> Sequence<T> concat(Iterable<Sequence<T>> sequences)
  {
    return concat(Sequences.simple(sequences));
  }

  public static <T> Sequence<T> concat(Sequence<Sequence<T>> sequences)
  {
    return new ConcatSequence<>(sequences);
  }

  public static <From, To> Sequence<To> map(Sequence<From> sequence, Function<From, To> fn)
  {
    return new MappedSequence<>(sequence, fn);
  }

  public static <T> Sequence<T> filter(Sequence<T> sequence, Predicate<T> pred)
  {
    return new FilteredSequence<>(sequence, pred);
  }

  public static <T> Sequence<T> limit(final Sequence<T> sequence, final int limit)
  {
    return new LimitedSequence<>(sequence, limit);
  }

  public static <T> Sequence<T> withBaggage(final Sequence<T> seq, Closeable baggage)
  {
    return new ResourceClosingSequence<>(seq, baggage);
  }

  public static <T> Sequence<T> withEffect(final Sequence <T> seq, final Runnable effect, final Executor exec)
  {
    return new Sequence<T>()
    {
      @Override
      public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator)
      {
        final OutType out = seq.accumulate(initValue, accumulator);
        exec.execute(effect);
        return out;
      }

      @Override
      public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
      {
        return new ExecuteWhenDoneYielder<>(seq.toYielder(initValue, accumulator), effect, exec);
      }
    };
  }

  // This will materialize the entire sequence in memory. Use at your own risk.
  public static <T> Sequence<T> sort(final Sequence<T> sequence, final Comparator<T> comparator)
  {
    List<T> seqList = Sequences.toList(sequence, Lists.<T>newArrayList());
    Collections.sort(seqList, comparator);
    return BaseSequence.simple(seqList);
  }

  public static <T, ListType extends List<T>> ListType toList(Sequence<T> seq, ListType list)
  {
    return seq.accumulate(list, Accumulators.<ListType, T>list());
  }

  private static class EmptySequence implements Sequence<Object>
  {
    @Override
    public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, Object> accumulator)
    {
      return initValue;
    }

    @Override
    public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, Object> accumulator)
    {
      return Yielders.done(initValue, null);
    }
  }
}
           

下面看一下这个工具类里的一些常用的方法:

  • simple:传入一个iterable接口的对象。返回一个BaseSequence对象。
  • concat:把多个Sequence合并成一个。为了减少内存的使用,并不会把多个Sequence中的元素复制到一个新的Sequence,而是在执行accumulate方法时将多个Sequence的积累结果合并在一起。
  • map:类似于函数式编程中的map函数,在执行accumulate方法时,在调用转换函数以后再进行聚合操作。举例说明:
public static void main(String[] args) throws IOException {
		
		List<Integer> intList = new ArrayList<>();
		
		for(int i = 0; i < 10; i ++) {
			intList.add(i);
		}
		
		Sequence<Integer> intSequence = Sequences.simple(intList);
		
		Function<Integer, Integer> fn = new Function<Integer, Integer>() {

			@Override
			public Integer apply(Integer input) {
				return input + 5;
			}
			
		};
		
		Sequence<Integer> toSeq = Sequences.map(intSequence, fn);
		toSeq.accumulate(0, new Accumulator<Integer, Integer>() {

			@Override
			public Integer accumulate(Integer accumulated, Integer in) {
				System.out.println(in);
				return in;
			}
			
		});
		
	}
}
           
  • filter:其功能是在执行accumulated方法时根据传入的Predicate过滤,如果Predicate返回true,则进行累加,否则放弃。GroupBy查询的Having就是使用该方法实现的。
  • withEffect:在执行accumulate方法时异步执行某些逻辑,例如在CachingQueryRunner中异步的将Sequence中的元素收集到一个List中,待accumulate方法执行完成在进行缓存操作。

继续阅读