天天看點

Flink源碼解析——資料源讀入原理

Flink是分布式并行計算架構,是以Flink程式内在是分布和并行的,其并行的特性可在下述代碼片段展現:

val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.readTextFile(inputPath)
    val data = text.flatMap(_.split(" "))
    var count1 = 0
    val counter1 = data.map{
      t =>
        count1 += 1
        (t, count1)
    }
    counter1.print()
           

其中

inputPath

檔案中存儲的内容為以下格式:

a
b c
d
           

該段代碼輸出為:

(b,1)
(c,2)
(a,1)
(d,1)
           

由以上簡單示例可知,Flink程式在讀入文本時是并行讀入的,送出Flink Job後,每一行資料為DataSet中的一個資料單元,由某一個TaskManager中的某一個slot進行計算,是以正常的累加操作是針對一個slot中需要處理的資料,無法對整體的資料進行累加操作。但是在程式設計過程中,發現一件很奇怪的事:如果不使用

env.readTextFile

讀取資料,而使用

env.fromElements

讀取資料,程式可以正常進行計數,其輸出結果為:

(a,1)
(b,2)
(c,3)
(d,4)
           

于是筆者檢視了Flink的源碼,發現

fromElements

的具體實作是這樣的:

def fromElements[T: ClassTag : TypeInformation](data: T*): DataSet[T] = {
    require(data != null, "Data must not be null.")
    val typeInfo = implicitly[TypeInformation[T]]
    fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
  }
           

它調用了

fromCollection

建立的dataSource,而

fromCollection

的具體實作是這樣的:

def fromCollection[T: ClassTag : TypeInformation](
      data: Iterable[T]): DataSet[T] = {
    require(data != null, "Data must not be null.")

    val typeInfo = implicitly[TypeInformation[T]]
    CollectionInputFormat.checkCollection(data.asJavaCollection, typeInfo.getTypeClass)
    val dataSource = new DataSource[T](
      javaEnv,
      new CollectionInputFormat[T](data.asJavaCollection, typeInfo.createSerializer(getConfig)),
      typeInfo,
      getCallLocationName())
    wrap(dataSource)
  }
           

readTextFile

的具體實作是這樣的:

def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String] = {
    require(filePath != null, "The file path may not be null.")
    val format = new TextInputFormat(new Path(filePath))
    format.setCharsetName(charsetName)
    val source = new DataSource[String](javaEnv, format, BasicTypeInfo.STRING_TYPE_INFO,
      getCallLocationName())
    wrap(source)
  }
           

比較

fromCollection

方法和

readTextFile

方法的具體實作,可以看出其大緻過程其實基本一緻,無非就是

new

一個

DataSource

然後傳回,但是可以看出其構造

DataSource

的參數類型有些不同,具體哪個參數類型有問題,我們可以繼續觀察

DataSource

類的構造函數,如下:

public DataSource(ExecutionEnvironment context, InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> type, String dataSourceLocationName) {
		super(context, type);
		
		this.dataSourceLocationName = dataSourceLocationName;
		
		if (inputFormat == null) {
			throw new IllegalArgumentException("The input format may not be null.");
		}
		
		this.inputFormat = inputFormat;
		
		if (inputFormat instanceof NonParallelInput) {
			this.parallelism = 1;
		}
	}
           

OK,Flink源碼跟蹤到這基本要水落石出了,我們可以看出,構造函數中寫了一個

if

判斷,如果

inputFormat

NonParallelInput

接口的一個執行個體,則讀取資料的過程并行度設定為1。

fromElements

方法中的輸入類型參數為

CollectionInputFormat

,檢視該類實作了哪些接口,如下:

public class CollectionInputFormat<T> extends GenericInputFormat<T> implements NonParallelInput {}
           

由此可見,

fromElements

方法之是以能夠對整體進行計數,是由于其底層實作将該過程的并行度設定為1。

綜上,我們如果需要使用

readTextFile

方法對資料進行有序讀取、計數,則可以根據Flink源碼中

fromElements

方法的實作思路,将讀取資料操作的并行度設定為1。當資料量龐大時,這樣的做法會可能會導緻計算從資料源處開始癱瘓,是以最好不要采用該種方法,代碼測試可以考慮采用該種方法。

那麼還有什麼方法可以在并行環境下對整體資料進行計數呢?可以參照很多種語言中都有的Static靜态變量的思路,靜态變量可以在它的作用域内,被所有類執行個體共享。是以可以考慮将用于計數的

count

變量設定為被整個Flink程式共享的一個變量,保證在任意TaskManager的任意的Slot中都是對同一個

count

變量進行更新。一開始考慮使用廣播變量将用于計數的

count

變量廣播到每一個并行度中,但廣播變量必須是

DataSet[T]

類型的算子,并且每一個Slot隻能對廣播變量進行通路,暫沒有找到可以修改廣播變量的方法,是以這個處理的想法夭折了。目前可考慮的方法隻有設定并行度或者通過文本預處理達到計數目的,若有新的想法會在部落格更新,也歡迎讨論。

繼續閱讀