Flink提供了一个类似于Hadoop的分布式缓存,让并行运行实例的函数可以在本地访问。这个功能可以被使用来分享外部静态的数据,例如:机器学习的逻辑回归模型等!
缓存的使用流程:
使用ExecutionEnvironment实例对本地的或者远程的文件(例如:HDFS上的文件),为缓存文件指定一个名字注册该缓存文件!当程序执行时候,Flink会自动将复制文件或者目录到所有worker节点的本地文件系统中,函数可以根据名字去该节点的本地文件系统中检索该文件!
和广播变量的区别:
-广播变量广播的是程序中的变量(DataSet)数据,分布式缓存广播的是文件
-广播变量将数据广播到各个TaskManager的内存中,分布式缓存广播到各个TaskManager的本地文件系统
用法
使用Flink运行时环境的 registerCachedFile 注册一个分布式缓存
在操作中,使用 getRuntimeContext.getDistributedCache.getFile ( 文件名 )获取分布式缓存
示例
创建一个 成绩 数据集
List( (1, "语文", 50),(2, "数学", 70), (3, "英文", 86))
请通过分布式缓存获取到学生姓名,将数据转换为
List( ("张三", "语文", 50),("李四", "数学", 70), ("王五", "英文", 86))
注:资料\测试数据源\distribute_cache_student 文件保存了学生ID以及学生姓名
操作步骤
1. 将 distribute_cache_student 文件上传到HDFS / 目录下
2. 获取批处理运行环境
3. 创建成绩数据集
4. 对 成绩 数据集进行map转换,将(学生ID, 学科, 分数)转换为(学生姓名,学科,分数)
- RichMapFunction 的 open 方法中,获取分布式缓存数据
- 在 map 方法中进行转换
5. 实现 open 方法
- 使用 getRuntimeContext.getDistributedCache.getFile 获取分布式缓存文件
- 使用 Scala.fromFile 读取文件,并获取行
- 将文本转换为元组(学生ID,学生姓名),再转换为List
6. 实现 map 方法
- 从分布式缓存中根据学生ID过滤出来学生
- 获取学生姓名
- 构建最终结果元组
7. 打印测试
参考代码
/**
* 分布式缓存
*/
public class DistributedCacheDemo {
public static void main(String[] args) throws Exception {
// Env
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 准备分数信息数据集
DataSource<Tuple3<Integer, String, Integer>> scoreInfoDataSet = env.fromElements(
Tuple3.of(1, "数据结构", 99),
Tuple3.of(2, "英语", 100),
Tuple3.of(3, "C++", 96),
Tuple3.of(5, "Java", 97),
Tuple3.of(3, "Scala", 100)
);
/*
分布式缓存和广播变量的使用步骤基本差不多,有一点不同
1. 设置它, 使用env.registerCachedFile来注册分布式缓存.
2. 使用它, 在算子内部调用getRuntimeContext.getDistributedCache.getFile(File)来获取分布式缓存的文件
*/
// 1. 注册分布式缓存
env.registerCachedFile("data/input/distributed_student.txt", "student");
// 通过map方法来组合数据和获取分布式缓存文件内容
MapOperator<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>> result = scoreInfoDataSet.map(new RichMapFunction<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>>() {
// 定义一个map变量来接收学生信息对象
final Map<Integer, String> map = new HashMap<Integer, String>();
// 通过open方法来获得分布式缓存的文件, 并将数据放入map中
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
File distributedFile = getRuntimeContext().getDistributedCache().getFile("student");
BufferedReader bufferedReader = new BufferedReader(new FileReader(distributedFile));
// Lambda方式
// bufferedReader.lines().forEach((String line) -> {
// String[] elements = line.split(",");
// map.put(Integer.parseInt(elements[0]), elements[1]);
// });
// 普通方式
String line = null;
while ((line = bufferedReader.readLine()) != null) {
String[] elements = line.split(",");
map.put(Integer.parseInt(elements[0]), elements[1]);
}
}
@Override
public Tuple3<String, String, Integer> map(Tuple3<Integer, String, Integer> value) throws Exception {
return Tuple3.of(map.getOrDefault(value.f0, "未知学生姓名"), value.f1, value.f2);
}
});
result.print();
}
}