Flink提供了一個類似于Hadoop的分布式緩存,讓并行運作執行個體的函數可以在本地通路。這個功能可以被使用來分享外部靜态的資料,例如:機器學習的邏輯回歸模型等!
緩存的使用流程:
使用ExecutionEnvironment執行個體對本地的或者遠端的檔案(例如:HDFS上的檔案),為緩存檔案指定一個名字注冊該緩存檔案!當程式執行時候,Flink會自動将複制檔案或者目錄到所有worker節點的本地檔案系統中,函數可以根據名字去該節點的本地檔案系統中檢索該檔案!
和廣播變量的差別:
-廣播變量廣播的是程式中的變量(DataSet)資料,分布式緩存廣播的是檔案
-廣播變量将資料廣播到各個TaskManager的記憶體中,分布式緩存廣播到各個TaskManager的本地檔案系統
用法
使用Flink運作時環境的 registerCachedFile 注冊一個分布式緩存
在操作中,使用 getRuntimeContext.getDistributedCache.getFile ( 檔案名 )擷取分布式緩存
示例
建立一個 成績 資料集
List( (1, "國文", 50),(2, "數學", 70), (3, "英文", 86))
請通過分布式緩存擷取到學生姓名,将資料轉換為
List( ("張三", "國文", 50),("李四", "數學", 70), ("王五", "英文", 86))
注:資料\測試資料源\distribute_cache_student 檔案儲存了學生ID以及學生姓名
操作步驟
1. 将 distribute_cache_student 檔案上傳到HDFS / 目錄下
2. 擷取批處理運作環境
3. 建立成績資料集
4. 對 成績 資料集進行map轉換,将(學生ID, 學科, 分數)轉換為(學生姓名,學科,分數)
- RichMapFunction 的 open 方法中,擷取分布式緩存資料
- 在 map 方法中進行轉換
5. 實作 open 方法
- 使用 getRuntimeContext.getDistributedCache.getFile 擷取分布式緩存檔案
- 使用 Scala.fromFile 讀取檔案,并擷取行
- 将文本轉換為元組(學生ID,學生姓名),再轉換為List
6. 實作 map 方法
- 從分布式緩存中根據學生ID過濾出來學生
- 擷取學生姓名
- 建構最終結果元組
7. 列印測試
參考代碼
/**
* 分布式緩存
*/
public class DistributedCacheDemo {
public static void main(String[] args) throws Exception {
// Env
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 準備分數資訊資料集
DataSource<Tuple3<Integer, String, Integer>> scoreInfoDataSet = env.fromElements(
Tuple3.of(1, "資料結構", 99),
Tuple3.of(2, "英語", 100),
Tuple3.of(3, "C++", 96),
Tuple3.of(5, "Java", 97),
Tuple3.of(3, "Scala", 100)
);
/*
分布式緩存和廣播變量的使用步驟基本差不多,有一點不同
1. 設定它, 使用env.registerCachedFile來注冊分布式緩存.
2. 使用它, 在算子内部調用getRuntimeContext.getDistributedCache.getFile(File)來擷取分布式緩存的檔案
*/
// 1. 注冊分布式緩存
env.registerCachedFile("data/input/distributed_student.txt", "student");
// 通過map方法來組合資料和擷取分布式緩存檔案内容
MapOperator<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>> result = scoreInfoDataSet.map(new RichMapFunction<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>>() {
// 定義一個map變量來接收學生資訊對象
final Map<Integer, String> map = new HashMap<Integer, String>();
// 通過open方法來獲得分布式緩存的檔案, 并将資料放入map中
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
File distributedFile = getRuntimeContext().getDistributedCache().getFile("student");
BufferedReader bufferedReader = new BufferedReader(new FileReader(distributedFile));
// Lambda方式
// bufferedReader.lines().forEach((String line) -> {
// String[] elements = line.split(",");
// map.put(Integer.parseInt(elements[0]), elements[1]);
// });
// 普通方式
String line = null;
while ((line = bufferedReader.readLine()) != null) {
String[] elements = line.split(",");
map.put(Integer.parseInt(elements[0]), elements[1]);
}
}
@Override
public Tuple3<String, String, Integer> map(Tuple3<Integer, String, Integer> value) throws Exception {
return Tuple3.of(map.getOrDefault(value.f0, "未知學生姓名"), value.f1, value.f2);
}
});
result.print();
}
}