Druid設計了一種可以疊代的序列叫Sequence,其實作在java-util包中。它是對Iterator操作的進階封裝。但是它與普通的Iterator不同,它不會為您提供從中提取值的方式,而是提供了一個累加器(Accumulator)并定義資料如何操作。
這種控制反轉(IoC)的方式是為了更好的讓Sequence進行資源管理。當執行結束時,它可以強制調用close()方法來清除資源。如果沒有這種反轉,在操作時會很容易引起資源洩漏。
Sequence上還暴露了Yielder對象。它可以允許你在Sequence上周遊操作時進行中斷,它會儲存執行的狀态,下次執行是在從終端處開始。它不會提供類似于Sequence的資源管理功能,需要顯示的調用close方法。
首先來看一下Sequence接口的定義:
public interface Sequence<T>
{
public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator);
public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator);
}
它隻提供了兩個方法接口:
1. accumulate方法的功能是通過控制反轉的方式完成聚合運算。該方法的第一個參數為傳入的初始值,Accumulator累加器是封裝回調函數的接口,把原來在疊代過程中進行聚合運算的邏輯抽取到accumulate方法中回調執行。它的接口定義為:
public interface Accumulator<AccumulatedType, InType>
{
public AccumulatedType accumulate(AccumulatedType accumulated, InType in);
}
以上的方法中的兩個參數通過範型的方式設定:
第一個參數儲存聚合的結果,在調用時作為參數傳入,計算完成後将該參數作為結果傳回,并在下一次疊代時作為參數傳入,循環執行,知道疊代結束。其使用方法舉例如下(取自BaseSequence類的makeYeilder方法):
OutType retVal = initValue;
while (!accumulator.yielded() && iter.hasNext()) {
retVal = accumulator.accumulate(retVal, iter.next());
}
第二個參數in是疊代器的下一個元素的值。舉一個例子說明:假設AccumulatorType是Integer, in也是Integer。下面驗證以下一個整數的Sequence是不是遞增的,其實作方法如下:
public Integer accumulate(Integer accumulated, Integer in) {
Assert.assertEquals(accumulated, in);
return Integer.valueOf(accumulated.intValue() + 1);
}
再一個完整的例子,取一個值為0 - 9的List的所有值的和:
public static void main(String[] args) {
List<Integer> intList = new ArrayList<>();
for(int i = 0; i < 10; i ++) {
intList.add(i);
}
Sequence<Integer> intSequence = Sequences.simple(intList);
int x = intSequence.accumulate(0, new Accumulator<Integer, Integer>() {
@Override
public Integer accumulate(Integer accumulated, Integer in) {
return accumulated + in;
}
});
System.out.println(x);
}
2. 第二個方法為toYielder。它的功能是将Sequence轉換成一個Yielder。Yielder對象可以看作是一個無法回溯的連标。調用Yielder的get()方法可以用來擷取目前元素的值。通過調用next方法擷取下一個Yielder對象。
在toYielder方法中需要傳入一個YieldingAccumulator,它和Yielder協同工作實作Java語言中的中斷/延續執行。YieldingAccumulator的接口實作如下所示:
public abstract class YieldingAccumulator<AccumulatedType, InType>
{
private boolean yielded = false;
public void yield()
{
yielded = true;
}
public boolean yielded()
{
return yielded;
}
public void reset()
{
yielded = false;
}
public abstract AccumulatedType accumulate(AccumulatedType accumulated, InType in);
}
從以上定義中可以看出,YieldingAccumulator添加了yield标志。yield标志的初始值為false,調用yield方法以後将該标志設定為true。yield标志的作用是退出目前的周遊疊代過程,并将Accumulator的值指派給目前的Yielder。舉個例子如下:
public static void main(String[] args) throws IOException {
List<Integer> intList = new ArrayList<>();
for(int i = 0; i < 10; i ++) {
intList.add(i);
}
Sequence<Integer> intSequence = Sequences.simple(intList);
Yielder<Integer> yielder = intSequence.toYielder(0, new YieldingAccumulator<Integer, Integer>() {
@Override
public Integer accumulate(Integer accumulated, Integer in) {
yield();
return accumulated + in;
}
});
while(!yielder.isDone()) {
int x = yielder.get();
System.out.println(x);
Yielder<Integer> oldYielder = yielder;
yielder = oldYielder.next(x);
oldYielder.close();
}
yielder.close();
}
Sequence是怎麼構造的呢?它是由工具類Sequences來建立的。其實作類如下:
public class Sequences
{
private static final EmptySequence EMPTY_SEQUENCE = new EmptySequence();
public static <T> Sequence<T> simple(final Iterable<T> iterable)
{
return BaseSequence.simple(iterable);
}
@SuppressWarnings("unchecked")
public static <T> Sequence<T> empty()
{
return (Sequence<T>) EMPTY_SEQUENCE;
}
public static <T> Sequence<T> concat(Sequence<T>... sequences)
{
return concat(Arrays.asList(sequences));
}
public static <T> Sequence<T> concat(Iterable<Sequence<T>> sequences)
{
return concat(Sequences.simple(sequences));
}
public static <T> Sequence<T> concat(Sequence<Sequence<T>> sequences)
{
return new ConcatSequence<>(sequences);
}
public static <From, To> Sequence<To> map(Sequence<From> sequence, Function<From, To> fn)
{
return new MappedSequence<>(sequence, fn);
}
public static <T> Sequence<T> filter(Sequence<T> sequence, Predicate<T> pred)
{
return new FilteredSequence<>(sequence, pred);
}
public static <T> Sequence<T> limit(final Sequence<T> sequence, final int limit)
{
return new LimitedSequence<>(sequence, limit);
}
public static <T> Sequence<T> withBaggage(final Sequence<T> seq, Closeable baggage)
{
return new ResourceClosingSequence<>(seq, baggage);
}
public static <T> Sequence<T> withEffect(final Sequence <T> seq, final Runnable effect, final Executor exec)
{
return new Sequence<T>()
{
@Override
public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator)
{
final OutType out = seq.accumulate(initValue, accumulator);
exec.execute(effect);
return out;
}
@Override
public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
{
return new ExecuteWhenDoneYielder<>(seq.toYielder(initValue, accumulator), effect, exec);
}
};
}
// This will materialize the entire sequence in memory. Use at your own risk.
public static <T> Sequence<T> sort(final Sequence<T> sequence, final Comparator<T> comparator)
{
List<T> seqList = Sequences.toList(sequence, Lists.<T>newArrayList());
Collections.sort(seqList, comparator);
return BaseSequence.simple(seqList);
}
public static <T, ListType extends List<T>> ListType toList(Sequence<T> seq, ListType list)
{
return seq.accumulate(list, Accumulators.<ListType, T>list());
}
private static class EmptySequence implements Sequence<Object>
{
@Override
public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, Object> accumulator)
{
return initValue;
}
@Override
public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, Object> accumulator)
{
return Yielders.done(initValue, null);
}
}
}
下面看一下這個工具類裡的一些常用的方法:
- simple:傳入一個iterable接口的對象。傳回一個BaseSequence對象。
- concat:把多個Sequence合并成一個。為了減少記憶體的使用,并不會把多個Sequence中的元素複制到一個新的Sequence,而是在執行accumulate方法時将多個Sequence的積累結果合并在一起。
- map:類似于函數式程式設計中的map函數,在執行accumulate方法時,在調用轉換函數以後再進行聚合操作。舉例說明:
public static void main(String[] args) throws IOException {
List<Integer> intList = new ArrayList<>();
for(int i = 0; i < 10; i ++) {
intList.add(i);
}
Sequence<Integer> intSequence = Sequences.simple(intList);
Function<Integer, Integer> fn = new Function<Integer, Integer>() {
@Override
public Integer apply(Integer input) {
return input + 5;
}
};
Sequence<Integer> toSeq = Sequences.map(intSequence, fn);
toSeq.accumulate(0, new Accumulator<Integer, Integer>() {
@Override
public Integer accumulate(Integer accumulated, Integer in) {
System.out.println(in);
return in;
}
});
}
}
- filter:其功能是在執行accumulated方法時根據傳入的Predicate過濾,如果Predicate傳回true,則進行累加,否則放棄。GroupBy查詢的Having就是使用該方法實作的。
- withEffect:在執行accumulate方法時異步執行某些邏輯,例如在CachingQueryRunner中異步的将Sequence中的元素收集到一個List中,待accumulate方法執行完成在進行緩存操作。