天天看點

Flink 的分布式緩存使用步驟詳解

作者:散文随風想
Flink提供了一個類似于Hadoop的分布式緩存,讓并行運作執行個體的函數可以在本地通路。這個功能可以被使用來分享外部靜态的資料,例如:機器學習的邏輯回歸模型等!

緩存的使用流程:

使用ExecutionEnvironment執行個體對本地的或者遠端的檔案(例如:HDFS上的檔案),為緩存檔案指定一個名字注冊該緩存檔案!當程式執行時候,Flink會自動将複制檔案或者目錄到所有worker節點的本地檔案系統中,函數可以根據名字去該節點的本地檔案系統中檢索該檔案!

和廣播變量的差別:

-廣播變量廣播的是程式中的變量(DataSet)資料,分布式緩存廣播的是檔案

-廣播變量将資料廣播到各個TaskManager的記憶體中,分布式緩存廣播到各個TaskManager的本地檔案系統

用法

使用Flink運作時環境的 registerCachedFile 注冊一個分布式緩存

在操作中,使用 getRuntimeContext.getDistributedCache.getFile ( 檔案名 )擷取分布式緩存

示例

建立一個 成績 資料集

List( (1, "國文", 50),(2, "數學", 70), (3, "英文", 86))           

請通過分布式緩存擷取到學生姓名,将資料轉換為

List( ("張三", "國文", 50),("李四", "數學", 70), ("王五", "英文", 86))           

注:資料\測試資料源\distribute_cache_student 檔案儲存了學生ID以及學生姓名

操作步驟

1. 将 distribute_cache_student 檔案上傳到HDFS / 目錄下

2. 擷取批處理運作環境

3. 建立成績資料集

4. 對 成績 資料集進行map轉換,将(學生ID, 學科, 分數)轉換為(學生姓名,學科,分數)

  • RichMapFunction 的 open 方法中,擷取分布式緩存資料
  • 在 map 方法中進行轉換

5. 實作 open 方法

  • 使用 getRuntimeContext.getDistributedCache.getFile 擷取分布式緩存檔案
  • 使用 Scala.fromFile 讀取檔案,并擷取行
  • 将文本轉換為元組(學生ID,學生姓名),再轉換為List

6. 實作 map 方法

  • 從分布式緩存中根據學生ID過濾出來學生
  • 擷取學生姓名
  • 建構最終結果元組

7. 列印測試

參考代碼

/**
 * 分布式緩存
 */
public class DistributedCacheDemo {
    public static void main(String[] args) throws Exception {
        // Env
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 準備分數資訊資料集
        DataSource<Tuple3<Integer, String, Integer>> scoreInfoDataSet = env.fromElements(
                Tuple3.of(1, "資料結構", 99),
                Tuple3.of(2, "英語", 100),
                Tuple3.of(3, "C++", 96),
                Tuple3.of(5, "Java", 97),
                Tuple3.of(3, "Scala", 100)
        );

        /*
        分布式緩存和廣播變量的使用步驟基本差不多,有一點不同
        1. 設定它, 使用env.registerCachedFile來注冊分布式緩存.
        2. 使用它, 在算子内部調用getRuntimeContext.getDistributedCache.getFile(File)來擷取分布式緩存的檔案
         */

        // 1. 注冊分布式緩存
        env.registerCachedFile("data/input/distributed_student.txt", "student");

        // 通過map方法來組合資料和擷取分布式緩存檔案内容
        MapOperator<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>> result = scoreInfoDataSet.map(new RichMapFunction<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>>() {
            // 定義一個map變量來接收學生資訊對象
            final Map<Integer, String> map = new HashMap<Integer, String>();

            // 通過open方法來獲得分布式緩存的檔案, 并将資料放入map中
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                File distributedFile = getRuntimeContext().getDistributedCache().getFile("student");
                BufferedReader bufferedReader = new BufferedReader(new FileReader(distributedFile));
                // Lambda方式
//                bufferedReader.lines().forEach((String line) -> {
//                    String[] elements = line.split(",");
//                    map.put(Integer.parseInt(elements[0]), elements[1]);
//                });

                // 普通方式
                String line = null;
                while ((line = bufferedReader.readLine()) != null) {
                    String[] elements = line.split(",");
                    map.put(Integer.parseInt(elements[0]), elements[1]);
                }
            }

            @Override
            public Tuple3<String, String, Integer> map(Tuple3<Integer, String, Integer> value) throws Exception {
                return Tuple3.of(map.getOrDefault(value.f0, "未知學生姓名"), value.f1, value.f2);
            }
        });

        result.print();
    }
}           

繼續閱讀