水一篇 flink 1.14 上傳檔案的源碼流程
為了友善查找,用 ⭐️⭐️⭐️ 标注了。
上傳檔案核心源碼就是 io 流的讀寫。
1.10 和 1.14 大緻相同,隻有細微細節不同。
從網上借個圖。

// YarnClusterDescriptor 類裡面,有啟動 startAppMaster 方法。
private ApplicationReport startAppMaster(
Configuration configuration,
String applicationName,
String yarnClusterEntrypoint,
JobGraph jobGraph,
YarnClient yarnClient,
YarnClientApplication yarnApplication,
ClusterSpecification clusterSpecification)
throws Exception {
...
fileUploader.registerSingleLocalResource(
jobGraphFilename,
new Path(tmpJobGraphFile.toURI()),
"",
LocalResourceType.FILE,
true,
false);
classPathBuilder.append(jobGraphFilename).append(File.pathSeparator); // ⭐️⭐️⭐️
...
}
YarnLocalResourceDescriptor registerSingleLocalResource(
final String key,
final Path resourcePath,
final String relativeDstPath,
final LocalResourceType resourceType,
final boolean whetherToAddToRemotePaths,
final boolean whetherToAddToEnvShipResourceList)
throws IOException {
addToRemotePaths(whetherToAddToRemotePaths, resourcePath);
if (Utils.isRemotePath(resourcePath.toString())) {
final FileStatus fileStatus = fileSystem.getFileStatus(resourcePath);
LOG.debug("Using remote file {} to register local resource", fileStatus.getPath());
final YarnLocalResourceDescriptor descriptor =
YarnLocalResourceDescriptor.fromFileStatus(
key, fileStatus, LocalResourceVisibility.APPLICATION, resourceType);
addToEnvShipResourceList(whetherToAddToEnvShipResourceList, descriptor);
localResources.put(key, descriptor.toLocalResource());
return descriptor;
}
final File localFile = new File(resourcePath.toUri().getPath());
final Tuple2<Path, Long> remoteFileInfo =
uploadLocalFileToRemote(resourcePath, relativeDstPath); // ⭐️⭐️⭐️
final YarnLocalResourceDescriptor descriptor =
new YarnLocalResourceDescriptor(
key,
remoteFileInfo.f0,
localFile.length(),
remoteFileInfo.f1,
LocalResourceVisibility.APPLICATION,
resourceType);
addToEnvShipResourceList(whetherToAddToEnvShipResourceList, descriptor);
localResources.put(key, descriptor.toLocalResource());
return descriptor;
}
private Path copyToRemoteApplicationDir(
final Path localSrcPath, final String relativeDstPath, final int replicationFactor)
throws IOException {
final Path applicationDir = getApplicationDirPath(homeDir, applicationId);
final String suffix =
(relativeDstPath.isEmpty() ? "" : relativeDstPath + "/") + localSrcPath.getName();
final Path dst = new Path(applicationDir, suffix);
LOG.debug(
"Copying from {} to {} with replication factor {}",
localSrcPath,
dst,
replicationFactor);
fileSystem.copyFromLocalFile(false, true, localSrcPath, dst); // ⭐️⭐️⭐️
fileSystem.setReplication(dst, (short) replicationFactor);
return dst;
}
public void copyFromLocalFile(boolean delSrc, boolean overwrite,
Path src, Path dst)
throws IOException {
Configuration conf = getConf();
FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
}
public static boolean copy(FileSystem srcFS, Path src,
FileSystem dstFS, Path dst,
boolean deleteSource,
boolean overwrite,
Configuration conf) throws IOException {
FileStatus fileStatus = srcFS.getFileStatus(src);
return copy(srcFS, fileStatus, dstFS, dst, deleteSource, overwrite, conf);
}
public static boolean copy(FileSystem srcFS, FileStatus srcStatus,
FileSystem dstFS, Path dst,
boolean deleteSource,
boolean overwrite,
Configuration conf) throws IOException {
Path src = srcStatus.getPath();
dst = checkDest(src.getName(), dstFS, dst, overwrite);
if (srcStatus.isDirectory()) {
checkDependencies(srcFS, src, dstFS, dst);
if (!dstFS.mkdirs(dst)) {
return false;
}
FileStatus contents[] = srcFS.listStatus(src);
for (int i = 0; i < contents.length; i++) {
copy(srcFS, contents[i], dstFS,
new Path(dst, contents[i].getPath().getName()),
deleteSource, overwrite, conf);
}
} else {
InputStream in=null;
OutputStream out = null;
try {
in = srcFS.open(src);
out = dstFS.create(dst, overwrite);
IOUtils.copyBytes(in, out, conf, true); // ⭐️⭐️⭐️
} catch (IOException e) {
IOUtils.closeStream(out);
IOUtils.closeStream(in);
throw e;
}
}
if (deleteSource) {
return srcFS.delete(src, true);
} else {
return true;
}
}
public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)
throws IOException {
copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), close);
}
public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close)
throws IOException {
try {
copyBytes(in, out, buffSize); // ⭐️⭐️⭐️
if(close) {
out.close();
out = null;
in.close();
in = null;
}
} finally {
if(close) {
closeStream(out);
closeStream(in);
}
}
}
public static void copyBytes(InputStream in, OutputStream out, int buffSize) throws IOException {
PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
byte buf[] = new byte[buffSize];
int bytesRead = in.read(buf);
while (bytesRead >= 0) {
out.write(buf, 0, bytesRead);
if ((ps != null) && ps.checkError()) {
throw new IOException("Unable to write to output stream.");
}
bytesRead = in.read(buf);
}
}