天天看點

Druid源碼分析之Query -- Sequence與Yielder

Druid設計了一種可以疊代的序列叫Sequence,其實作在java-util包中。它是對Iterator操作的進階封裝。但是它與普通的Iterator不同,它不會為您提供從中提取值的方式,而是提供了一個累加器(Accumulator)并定義資料如何操作。

這種控制反轉(IoC)的方式是為了更好的讓Sequence進行資源管理。當執行結束時,它可以強制調用close()方法來清除資源。如果沒有這種反轉,在操作時會很容易引起資源洩漏。

Sequence上還暴露了Yielder對象。它可以允許你在Sequence上周遊操作時進行中斷,它會儲存執行的狀态,下次執行是在從終端處開始。它不會提供類似于Sequence的資源管理功能,需要顯示的調用close方法。

首先來看一下Sequence接口的定義:

public interface Sequence<T>
{
  public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator);
  public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator);
}
           

它隻提供了兩個方法接口:

1. accumulate方法的功能是通過控制反轉的方式完成聚合運算。該方法的第一個參數為傳入的初始值,Accumulator累加器是封裝回調函數的接口,把原來在疊代過程中進行聚合運算的邏輯抽取到accumulate方法中回調執行。它的接口定義為:

public interface Accumulator<AccumulatedType, InType>
{
  public AccumulatedType accumulate(AccumulatedType accumulated, InType in);
}
           

以上的方法中的兩個參數通過範型的方式設定:

第一個參數儲存聚合的結果,在調用時作為參數傳入,計算完成後将該參數作為結果傳回,并在下一次疊代時作為參數傳入,循環執行,知道疊代結束。其使用方法舉例如下(取自BaseSequence類的makeYeilder方法):

OutType retVal = initValue;
    while (!accumulator.yielded() && iter.hasNext()) {
      retVal = accumulator.accumulate(retVal, iter.next());
    }
           

第二個參數in是疊代器的下一個元素的值。舉一個例子說明:假設AccumulatorType是Integer, in也是Integer。下面驗證以下一個整數的Sequence是不是遞增的,其實作方法如下:

public Integer accumulate(Integer accumulated, Integer in) {
		Assert.assertEquals(accumulated, in);
		return Integer.valueOf(accumulated.intValue() + 1);
	}
           

再一個完整的例子,取一個值為0 - 9的List的所有值的和:

public static void main(String[] args) {
		
		List<Integer> intList = new ArrayList<>();
		
		for(int i = 0; i < 10; i ++) {
			intList.add(i);
		}
		
		Sequence<Integer> intSequence = Sequences.simple(intList);
		
		int x = intSequence.accumulate(0, new Accumulator<Integer, Integer>() {

			@Override
			public Integer accumulate(Integer accumulated, Integer in) {
				return accumulated + in;
			}
			
		});
		
		System.out.println(x);

	}
           

2. 第二個方法為toYielder。它的功能是将Sequence轉換成一個Yielder。Yielder對象可以看作是一個無法回溯的連标。調用Yielder的get()方法可以用來擷取目前元素的值。通過調用next方法擷取下一個Yielder對象。

在toYielder方法中需要傳入一個YieldingAccumulator,它和Yielder協同工作實作Java語言中的中斷/延續執行。YieldingAccumulator的接口實作如下所示:

public abstract class YieldingAccumulator<AccumulatedType, InType>
{
  private boolean yielded = false;

  public void yield()
  {
    yielded = true;
  }

  public boolean yielded()
  {
    return yielded;
  }

  public void reset()
  {
    yielded = false;
  }

  public abstract AccumulatedType accumulate(AccumulatedType accumulated, InType in);
}
           

從以上定義中可以看出,YieldingAccumulator添加了yield标志。yield标志的初始值為false,調用yield方法以後将該标志設定為true。yield标志的作用是退出目前的周遊疊代過程,并将Accumulator的值指派給目前的Yielder。舉個例子如下:

public static void main(String[] args) throws IOException {
		
		List<Integer> intList = new ArrayList<>();
		
		for(int i = 0; i < 10; i ++) {
			intList.add(i);
		}
		
		Sequence<Integer> intSequence = Sequences.simple(intList);
		
		Yielder<Integer> yielder = intSequence.toYielder(0, new YieldingAccumulator<Integer, Integer>() {

			@Override
			public Integer accumulate(Integer accumulated, Integer in) {
				yield();
				return accumulated + in;
			}
			
		});
		
		while(!yielder.isDone()) {
			int x = yielder.get();
			System.out.println(x);
			Yielder<Integer> oldYielder = yielder;
			yielder = oldYielder.next(x);
			oldYielder.close();
		}
		yielder.close();
	}
           

Sequence是怎麼構造的呢?它是由工具類Sequences來建立的。其實作類如下:

public class Sequences
{

  private static final EmptySequence EMPTY_SEQUENCE = new EmptySequence();

  public static <T> Sequence<T> simple(final Iterable<T> iterable)
  {
    return BaseSequence.simple(iterable);
  }

  @SuppressWarnings("unchecked")
  public static <T> Sequence<T> empty()
  {
    return (Sequence<T>) EMPTY_SEQUENCE;
  }

  public static <T> Sequence<T> concat(Sequence<T>... sequences)
  {
    return concat(Arrays.asList(sequences));
  }

  public static <T> Sequence<T> concat(Iterable<Sequence<T>> sequences)
  {
    return concat(Sequences.simple(sequences));
  }

  public static <T> Sequence<T> concat(Sequence<Sequence<T>> sequences)
  {
    return new ConcatSequence<>(sequences);
  }

  public static <From, To> Sequence<To> map(Sequence<From> sequence, Function<From, To> fn)
  {
    return new MappedSequence<>(sequence, fn);
  }

  public static <T> Sequence<T> filter(Sequence<T> sequence, Predicate<T> pred)
  {
    return new FilteredSequence<>(sequence, pred);
  }

  public static <T> Sequence<T> limit(final Sequence<T> sequence, final int limit)
  {
    return new LimitedSequence<>(sequence, limit);
  }

  public static <T> Sequence<T> withBaggage(final Sequence<T> seq, Closeable baggage)
  {
    return new ResourceClosingSequence<>(seq, baggage);
  }

  public static <T> Sequence<T> withEffect(final Sequence <T> seq, final Runnable effect, final Executor exec)
  {
    return new Sequence<T>()
    {
      @Override
      public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator)
      {
        final OutType out = seq.accumulate(initValue, accumulator);
        exec.execute(effect);
        return out;
      }

      @Override
      public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
      {
        return new ExecuteWhenDoneYielder<>(seq.toYielder(initValue, accumulator), effect, exec);
      }
    };
  }

  // This will materialize the entire sequence in memory. Use at your own risk.
  public static <T> Sequence<T> sort(final Sequence<T> sequence, final Comparator<T> comparator)
  {
    List<T> seqList = Sequences.toList(sequence, Lists.<T>newArrayList());
    Collections.sort(seqList, comparator);
    return BaseSequence.simple(seqList);
  }

  public static <T, ListType extends List<T>> ListType toList(Sequence<T> seq, ListType list)
  {
    return seq.accumulate(list, Accumulators.<ListType, T>list());
  }

  private static class EmptySequence implements Sequence<Object>
  {
    @Override
    public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, Object> accumulator)
    {
      return initValue;
    }

    @Override
    public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, Object> accumulator)
    {
      return Yielders.done(initValue, null);
    }
  }
}
           

下面看一下這個工具類裡的一些常用的方法:

  • simple:傳入一個iterable接口的對象。傳回一個BaseSequence對象。
  • concat:把多個Sequence合并成一個。為了減少記憶體的使用,并不會把多個Sequence中的元素複制到一個新的Sequence,而是在執行accumulate方法時将多個Sequence的積累結果合并在一起。
  • map:類似于函數式程式設計中的map函數,在執行accumulate方法時,在調用轉換函數以後再進行聚合操作。舉例說明:
public static void main(String[] args) throws IOException {
		
		List<Integer> intList = new ArrayList<>();
		
		for(int i = 0; i < 10; i ++) {
			intList.add(i);
		}
		
		Sequence<Integer> intSequence = Sequences.simple(intList);
		
		Function<Integer, Integer> fn = new Function<Integer, Integer>() {

			@Override
			public Integer apply(Integer input) {
				return input + 5;
			}
			
		};
		
		Sequence<Integer> toSeq = Sequences.map(intSequence, fn);
		toSeq.accumulate(0, new Accumulator<Integer, Integer>() {

			@Override
			public Integer accumulate(Integer accumulated, Integer in) {
				System.out.println(in);
				return in;
			}
			
		});
		
	}
}
           
  • filter:其功能是在執行accumulated方法時根據傳入的Predicate過濾,如果Predicate傳回true,則進行累加,否則放棄。GroupBy查詢的Having就是使用該方法實作的。
  • withEffect:在執行accumulate方法時異步執行某些邏輯,例如在CachingQueryRunner中異步的将Sequence中的元素收集到一個List中,待accumulate方法執行完成在進行緩存操作。

繼續閱讀