天天看点

Flink 的分布式缓存使用步骤详解

作者:散文随风想
Flink提供了一个类似于Hadoop的分布式缓存,让并行运行实例的函数可以在本地访问。这个功能可以被使用来分享外部静态的数据,例如:机器学习的逻辑回归模型等!

缓存的使用流程:

使用ExecutionEnvironment实例对本地的或者远程的文件(例如:HDFS上的文件),为缓存文件指定一个名字注册该缓存文件!当程序执行时候,Flink会自动将复制文件或者目录到所有worker节点的本地文件系统中,函数可以根据名字去该节点的本地文件系统中检索该文件!

和广播变量的区别:

-广播变量广播的是程序中的变量(DataSet)数据,分布式缓存广播的是文件

-广播变量将数据广播到各个TaskManager的内存中,分布式缓存广播到各个TaskManager的本地文件系统

用法

使用Flink运行时环境的 registerCachedFile 注册一个分布式缓存

在操作中,使用 getRuntimeContext.getDistributedCache.getFile ( 文件名 )获取分布式缓存

示例

创建一个 成绩 数据集

List( (1, "语文", 50),(2, "数学", 70), (3, "英文", 86))           

请通过分布式缓存获取到学生姓名,将数据转换为

List( ("张三", "语文", 50),("李四", "数学", 70), ("王五", "英文", 86))           

注:资料\测试数据源\distribute_cache_student 文件保存了学生ID以及学生姓名

操作步骤

1. 将 distribute_cache_student 文件上传到HDFS / 目录下

2. 获取批处理运行环境

3. 创建成绩数据集

4. 对 成绩 数据集进行map转换,将(学生ID, 学科, 分数)转换为(学生姓名,学科,分数)

  • RichMapFunction 的 open 方法中,获取分布式缓存数据
  • 在 map 方法中进行转换

5. 实现 open 方法

  • 使用 getRuntimeContext.getDistributedCache.getFile 获取分布式缓存文件
  • 使用 Scala.fromFile 读取文件,并获取行
  • 将文本转换为元组(学生ID,学生姓名),再转换为List

6. 实现 map 方法

  • 从分布式缓存中根据学生ID过滤出来学生
  • 获取学生姓名
  • 构建最终结果元组

7. 打印测试

参考代码

/**
 * 分布式缓存
 */
public class DistributedCacheDemo {
    public static void main(String[] args) throws Exception {
        // Env
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 准备分数信息数据集
        DataSource<Tuple3<Integer, String, Integer>> scoreInfoDataSet = env.fromElements(
                Tuple3.of(1, "数据结构", 99),
                Tuple3.of(2, "英语", 100),
                Tuple3.of(3, "C++", 96),
                Tuple3.of(5, "Java", 97),
                Tuple3.of(3, "Scala", 100)
        );

        /*
        分布式缓存和广播变量的使用步骤基本差不多,有一点不同
        1. 设置它, 使用env.registerCachedFile来注册分布式缓存.
        2. 使用它, 在算子内部调用getRuntimeContext.getDistributedCache.getFile(File)来获取分布式缓存的文件
         */

        // 1. 注册分布式缓存
        env.registerCachedFile("data/input/distributed_student.txt", "student");

        // 通过map方法来组合数据和获取分布式缓存文件内容
        MapOperator<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>> result = scoreInfoDataSet.map(new RichMapFunction<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>>() {
            // 定义一个map变量来接收学生信息对象
            final Map<Integer, String> map = new HashMap<Integer, String>();

            // 通过open方法来获得分布式缓存的文件, 并将数据放入map中
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                File distributedFile = getRuntimeContext().getDistributedCache().getFile("student");
                BufferedReader bufferedReader = new BufferedReader(new FileReader(distributedFile));
                // Lambda方式
//                bufferedReader.lines().forEach((String line) -> {
//                    String[] elements = line.split(",");
//                    map.put(Integer.parseInt(elements[0]), elements[1]);
//                });

                // 普通方式
                String line = null;
                while ((line = bufferedReader.readLine()) != null) {
                    String[] elements = line.split(",");
                    map.put(Integer.parseInt(elements[0]), elements[1]);
                }
            }

            @Override
            public Tuple3<String, String, Integer> map(Tuple3<Integer, String, Integer> value) throws Exception {
                return Tuple3.of(map.getOrDefault(value.f0, "未知学生姓名"), value.f1, value.f2);
            }
        });

        result.print();
    }
}           

继续阅读