自從函數式程式設計和響應式程式設計逐漸進入到程式員的生活之後,map函數作為其中一個重要算子也為大家所熟知,無論是前端web開發,手機開發還是後端伺服器開發,都很難逃過它的手心。而在大資料領域中又往往可以見到另外一個算子mapPartition的身影。在性能調優中,經常會被建議盡量用 mappartition 操作去替代 map 操作。本文将從Flink源碼和示例入手,為大家解析為什麼mapPartition比map更高效。
[源碼解析]為什麼mapPartition比map更高效
目錄
-
- 0x00 摘要
- 0x01 map vs mapPartition
- 1.1 map
- 1.2 mapPartition
- 1.3 異同
- 0x02 代碼
- 0x03 Flink的傳輸機制
- 3.1 傳輸機制概述
- 3.2 遠端通信
- 3.3 TaskManager程序内傳輸
- 3.4 源碼分析
- 0x04 runtime
- 4.1 Driver
- 4.2 MapDriver
- 4.3 MapPartitionDriver
- 4.4 效率差別
- 0x05 優化和ChainedMapDriver
- 0x06 總結
- 0x07 參考
Map的作用是将資料流上每個元素轉換為另外的元素,比如
data.map { x => x.toInt }
。它把
數組流
中的每一個值,使用所提供的函數執行一遍,一一對應。得到與元素個數相同的
數組流
。然後傳回這個新資料流。
MapPartition的作用是單個函數調用并行分區,比如
data.mapPartition { in => in map { (_, 1) } }
。該函數将分區作為“疊代器”,可以産生任意數量的結果。每個分區中的元素數量取決于并行度和以前的operations。
其實,兩者完成的業務操作是一樣的,本質上都是将資料流上每個元素轉換為另外的元素。
差別主要在兩點。
從邏輯實作來講,
- map邏輯實作簡單,就是在函數中簡單一一轉換,map函數的輸入和輸入都是單個元素。
- mapPartition相對複雜,函數的輸入有兩個,一般格式為
。其中values是需要映射轉換的所有記錄,out是用來發送結果的collector。具體傳回什麼,如何操作out來傳回結果,則完全依賴于業務邏輯。void mapPartition(Iterable<T> values, Collector<O> out)
從調用次數來說,
- 資料有多少個元素,map就會被調用多少次。
- 資料有多少分區,mapPartition就會被調用多少次。
為什麼MapPartition有這麼高效呢,下面我們将具體論證。
首先我們給出示例代碼,從下文中我們可以看出,map就是簡單的轉換,而mapPartition則不但要做轉換,程式員還需要手動操作如何傳回結果:
public class IteratePi {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
//疊代次數
int iterativeNum=10;
DataSet<Integer> wordList = env.fromElements(1, 2, 3);
IterativeDataSet<Integer> iterativeDataSet=wordList.iterate(iterativeNum);
DataSet<Integer> mapResult=iterativeDataSet
.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
value += 1;
return value;
}
});
//疊代結束的條件
DataSet<Integer> result=iterativeDataSet.closeWith(mapResult);
result.print();
MapPartitionOperator<Integer, Integer> mapPartitionResult = iterativeDataSet
.mapPartition(new MapPartitionFunction<Integer, Integer>() {
@Override
public void mapPartition(Iterable<Integer> values, Collector<Integer> out) {
for (Integer value : values) {
// 這裡需要程式員自行決定如何傳回,即調用collect操作。
out.collect(value + 2);
}
} }
);
//疊代結束的條件
DataSet<Integer> partitionResult=iterativeDataSet.closeWith(mapPartitionResult);
partitionResult.print();
}
}
世界上很少有沒有來由的愛,也少見免費的午餐。mapPartition之是以高效,其所依賴的基礎就是Flink的傳輸機制。是以我們下面就講解下為什麼。
大家都知道,Spark是用微批處理來模拟流處理,就是說,spark還是一批一批的傳輸和處理資料,是以我們就能了解mapPartition的機制就是基于這一批資料做統一處理。這樣确實可以高效。
但是Flink号稱是純流,即Flink是每來一個輸入record,就進行一次業務處理,然後傳回給下遊算子。
有的兄弟就會産生疑問:每次都隻是處理單個記錄,怎麼能夠讓mapPartition做到批次處理呢。其實這就是Flink的微妙之處:即Flink确實是每次都處理一個輸入record,但是在上下遊傳輸時候,Flink還是把records累積起來做批量傳輸的。也可以這麼了解:從傳輸的角度講,Flink是微批處理的。
Flink 的網絡棧是組成 flink-runtime 子產品的核心元件之一,也是 Flink 作業的核心部分。所有來自 TaskManager 的工作單元(子任務)都通過它來互相連接配接。流式傳輸資料流都要經過網絡棧,是以它對 Flink 作業的性能表現(包括吞吐量和延遲名額)至關重要。與通過 Akka 使用 RPC 的 TaskManager 和 JobManager 之間的協調通道相比,TaskManager 之間的網絡棧依賴的是更底層的,基于 Netty 的 API。
一個運作的application的tasks在持續交換資料。TaskManager負責做資料傳輸。不同任務之間的每個(遠端)網絡連接配接将在 Flink 的網絡棧中獲得自己的 TCP 通道。但是如果同一任務的不同子任務被安排到了同一個 TaskManager,則它們與同一個 TaskManager 的網絡連接配接将被多路複用,并共享一個 TCP 信道以減少資源占用。
每個TaskManager有一組網絡緩沖池(預設每個buffer是32KB),用于發送與接受資料。如發送端和接收端位于不同的TaskManager程序中,則它們需要通過作業系統的網絡棧進行交流。流應用需要以管道的模式進行資料交換,也就是說,每對TaskManager會維持一個永久的TCP連接配接用于做資料交換。在shuffle連接配接模式下(多個sender與多個receiver),每個sender task需要向每個receiver task發送資料,此時TaskManager需要為每個receiver task都配置設定一個緩沖區。
一個記錄被建立并傳遞之後(例如通過 Collector.collect()),它會被遞交到RecordWriter,其将來自 Java 對象的記錄序列化為一個位元組序列,後者最終成為網絡緩存。RecordWriter 首先使用SpanningRecordSerializer将記錄序列化為一個靈活的堆上位元組數組。然後它嘗試将這些位元組寫入目标網絡通道的關聯網絡緩存。
因為如果逐個發送會降低每個記錄的開銷并帶來更高的吞吐量,是以為了取得高吞吐量,TaskManager的網絡元件首先從緩沖buffer中收集records,然後再發送。也就是說,records并不是一個接一個的發送,而是先放入緩沖,然後再以batch的形式發送。這個技術可以高效使用網絡資源,并達到高吞吐。類似于網絡或磁盤 I/O 協定中使用的緩沖技術。
接收方網絡棧(netty)将接收到的緩存寫入适當的輸入通道。最後(流式)任務的線程從這些隊列中讀取并嘗試在RecordReader的幫助下,通過Deserializer将積累的資料反序列化為 Java 對象。
若sender與receiver任務都運作在同一個TaskManager程序,則sender任務會将發送的條目做序列化,并存入一個位元組緩沖。然後将緩沖放入一個隊列,直到隊列被填滿。
Receiver任務從隊列中擷取緩沖,并反序列化輸入的條目。是以,在同一個TaskManager内,任務之間的資料傳輸并不經過網絡互動。
即在同一個TaskManager程序内,也是批量傳輸。
我們基于Flink優化的結果進行分析驗證,看看Flink是不是把記錄寫入到buffer中,這種情況下運作的是CountingCollector和ChainedMapDriver。
copyFromSerializerToTargetChannel:153, RecordWriter (org.apache.flink.runtime.io.network.api.writer)
emit:116, RecordWriter (org.apache.flink.runtime.io.network.api.writer)
emit:60, ChannelSelectorRecordWriter (org.apache.flink.runtime.io.network.api.writer)
collect:65, OutputCollector (org.apache.flink.runtime.operators.shipping)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
collect:79, ChainedMapDriver (org.apache.flink.runtime.operators.chaining)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
invoke:196, DataSourceTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)
當執行完使用者定義的map函數之後,系統運作在 ChainedMapDriver.collect 函數。
public void collect(IT record) {
this.outputCollector.collect(this.mapper.map(record));// mapper就是使用者代碼
}
然後調用到了CountingCollector.collect
public void collect(OUT record) {
this.collector.collect(record);// record就是使用者轉換後的記錄
}
OutputCollector.collect函數會把記錄發送給所有的writers。
this.delegate.setInstance(record);// 先把record設定到SerializationDelegate中
for (RecordWriter<SerializationDelegate<T>> writer : writers) { // 所有的writer
writer.emit(this.delegate); // 發送record
}
RecordWriter
負責把資料序列化,然後寫入到緩存中。它有兩個實作類:
-
: 維護了多個下遊channel,發送資料到下遊所有的channel中。BroadcastRecordWriter
-
: 通過ChannelSelectorRecordWriter
對象判斷資料需要發往下遊的哪個channel。我們用的正是這個channelSelector
。RecordWriter
這裡我們分析下
ChannelSelectorRecordWriter
的
emit
方法:
public void emit(T record) throws IOException, InterruptedException {
emit(record, channelSelector.selectChannel(record));
}
這裡使用了
channelSelector.selectChannel
方法。該方法為record尋找到對應下遊channel id。
public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T>> {
public final int selectChannel(SerializationDelegate<T> record) {
switch (strategy) {
case FORWARD:
return forward(); // 我們代碼用到了這種情況。這裡 return 0;
......
}
}
}
接下來我們又回到了父類
RecordWriter.emit
protected void emit(T record, int targetChannel) throws IOException, InterruptedException {
serializer.serializeRecord(record);
// Make sure we don't hold onto the large intermediate serialization buffer for too long
if (copyFromSerializerToTargetChannel(targetChannel)) {
serializer.prune();
}
}
關鍵的邏輯在于
copyFromSerializerToTargetChannel
。此方法從序列化器中複制資料到目标channel,我們可以看出來,每條記錄都是寫入到buffer中。
protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {
// We should reset the initial position of the intermediate serialization buffer before
// copying, so the serialization results can be copied to multiple target buffers.
// 此處Serializer為SpanningRecordSerializer
// reset方法将serializer内部的databuffer position重置為0
serializer.reset();
boolean pruneTriggered = false;
// 擷取目标channel的bufferBuilder
// bufferBuilder内維護了MemorySegment,即記憶體片段
// Flink的記憶體管理依賴MemorySegment,可實作堆内堆外記憶體的管理
// RecordWriter内有一個bufferBuilder數組,長度和下遊channel數目相同
// 該數組以channel ID為下标,存儲和channel對應的bufferBuilder
// 如果對應channel的bufferBuilder尚未建立,調用requestNewBufferBuilder申請一個新的bufferBuilder
BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
// 複制serializer的資料到bufferBuilder中
SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
// 循環直到result完全被寫入到buffer
// 一條資料可能會被寫入到多個緩存中
// 如果緩存不夠用,會申請新的緩存
// 資料完全寫入完畢之時,目前正在操作的緩存是沒有寫滿的
// 是以傳回true,表明需要壓縮該buffer的空間
while (result.isFullBuffer()) {
finishBufferBuilder(bufferBuilder);
// If this was a full record, we are done. Not breaking out of the loop at this point
// will lead to another buffer request before breaking out (that would not be a
// problem per se, but it can lead to stalls in the pipeline).
if (result.isFullRecord()) {
pruneTriggered = true;
emptyCurrentBufferBuilder(targetChannel);
break;
}
bufferBuilder = requestNewBufferBuilder(targetChannel);
result = serializer.copyToBufferBuilder(bufferBuilder);
}
checkState(!serializer.hasSerializedData(), "All data should be written at once");
// 如果buffer逾時時間為0,需要flush目标channel的資料
if (flushAlways) {
flushTargetPartition(targetChannel);
}
return pruneTriggered;
}
Driver是Flink runtime的一個重要概念,是在一個task中運作的使用者業務邏輯元件,具體實作了批量操作代碼。其内部API包括初始化,清除,運作,取消等邏輯。
public interface Driver<S extends Function, OT> {
......
void setup(TaskContext<S, OT> context);
void run() throws Exception;
void cleanup() throws Exception;
void cancel() throws Exception;
}
具體在 org.apache.flink.runtime.operators 目錄下,我們能夠看到各種Driver的實作,基本的算子都有自己的Driver。
......
CoGroupDriver.java
FlatMapDriver.java
FullOuterJoinDriver.java
GroupReduceCombineDriver.java
GroupReduceDriver.java
JoinDriver.java
LeftOuterJoinDriver.java
MapDriver.java
MapPartitionDriver.java
......
map算子對應的就是MapDriver。
結合上節我們知道,上遊資料是通過batch方式批量傳入的。是以,在run函數會周遊輸入,每次取出一個record,然後調用使用者自定義函數function.map對這個record做map操作。
public class MapDriver<IT, OT> implements Driver<MapFunction<IT, OT>, OT> {
@Override
public void run() throws Exception {
final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
.....
else {
IT record = null;
// runtime主動進行循環,這樣導緻大量函數調用
while (this.running && ((record = input.next()) != null)) {
numRecordsIn.inc();
output.collect(function.map(record)); // function是使用者函數
}
}
}
}
MapPartitionDriver是mapPartition的具體元件。系統會把得到的批量資料inIter一次性的都傳給使用者自定義函數,由使用者代碼來進行周遊操作。
public class MapPartitionDriver<IT, OT> implements Driver<MapPartitionFunction<IT, OT>, OT> {
@Override
public void run() throws Exception {
final MutableObjectIterator<IT> input = new CountingMutableObjectIterator<>(this.taskContext.<IT>getInput(0), numRecordsIn);
......
} else {
final NonReusingMutableToRegularIteratorWrapper<IT> inIter = new NonReusingMutableToRegularIteratorWrapper<IT>(input, this.taskContext.<IT>getInputSerializer(0).getSerializer());
// runtime不參與循環,這樣可以減少函數調用
function.mapPartition(inIter, output);
}
}
}
我們能夠看到map和mapPartition的input都是MutableObjectIterator input類型,說明兩者的輸入一緻。隻不過map是在Driver代碼中進行循環,mapPartition在使用者代碼中進行循環。具體mapPartition的 效率提高展現在如下方面 :
- 假設一共有60個資料需要轉換,map會在runtime中調用使用者函數60次。
- runtime把資料分成6個partition操作,則mapPartition在runtime中會調用使用者函數6次,在每個使用者函數中分别循環10次。對于runtime來說,map操作會多出54次使用者函數調用。
- 如果使用者業務中需要頻繁建立額外的對象或者外部資源操作,mapPartition的優勢更可以展現。 例如将資料寫入Mysql, 那麼map需要為每個元素建立一個資料庫連接配接,而mapPartition為每個partition建立一個連結。
假設有上億個資料需要map,這資源占用和運作速度效率差别會相當大。
之前提到了優化,這裡我們再詳細深入下如何優化map算子。
Flink有一個關鍵的優化技術稱為任務鍊,用于(在某些情況下)減少本地通信的過載。為了滿足任務鍊的條件,至少兩個以上的operator必須配置為同一并行度,并且使用本地向前的(local forwad)方式連接配接。任務鍊可以被認為是一種管道。
當管道以任務鍊的方式執行時候,Operators的函數被融合成單個任務,并由一個單獨的線程執行。一個function産生的records,通過使用一個簡單的方法調用,被遞交給下一個function。是以這裡在方法之間的records傳遞中,基本沒有序列化以及通信消耗。
針對優化後的Operator Chain,runtime對應的Driver則是ChainedMapDriver。這是通過
MAP(MapDriver.class, ChainedMapDriver.class, PIPELINED, 0)
, 映射得到的。
我們可以看到,因為是任務鍊,是以每個record是直接在管道中流淌 ,ChainedMapDriver連循環都省略了,直接map轉換後丢給下遊去也。
public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
private MapFunction<IT, OT> mapper; // 使用者函數
@Override
public void collect(IT record) {
try {
this.numRecordsIn.inc();
this.outputCollector.collect(this.mapper.map(record));
} catch (Exception ex) {
throw new ExceptionInChainedStubException(this.taskName, ex);
}
}
}
// 這時的調用棧如下
map:23, UserFunc$1 (com.alibaba.alink)
collect:79, ChainedMapDriver (org.apache.flink.runtime.operators.chaining)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
invoke:196, DataSourceTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)
map和mapPartition實作的基礎是Flink的資料傳輸機制 :Flink确實是每次都處理一個輸入record,但是在上下遊之間傳輸時候,Flink還是把records累積起來做批量傳輸。即可以認為從資料傳輸模型角度講,Flink是微批次的。
對于資料流轉換,因為是批量傳輸,是以對于積累的records,map是在runtime Driver代碼中進行循環,mapPartition在使用者代碼中進行循環。
map的函數調用次數要遠高于mapPartition。如果在使用者函數中涉及到頻繁建立額外的對象或者外部資源操作,則mapPartition性能遠遠高出。
如果沒有connection之類的操作,則通常性能差别并不大,通常不會成為瓶頸,也沒有想象的那麼嚴重。
深入了解 Flink 網絡棧 ——A Deep-Dive into Flink's Network Stack
Flink架構(二)- Flink中的資料傳輸
Flink 源碼之節點間通信