天天看點

HDFS FileSystem使用的坑

在初次使用HDFS用戶端下載下傳檔案時,很容易寫出下面的代碼

FileSystem fileSystem = FileSystem.get(uri, conf, "hadoopuser");
// 使用fileSystem做操作
try (BufferedReader br =
                     new BufferedReader(
                         new InputStreamReader(fileSystem.open(new Path("/data.txt"))))) {
	// 讀取檔案
}
           

看起來還使用了

try-with-resource

, 以為最後fileSystem會被關閉,實際上

fileSystem.open()

傳回了

FSDataInputStream

後就不管了。

fileSystem

本身還是存在的,并沒有關閉。

如果

fileSystem

沒有關閉會出現什麼情況?

稍微跟一下代碼就會知道,

FileSystem

裡面有一個靜态的緩存Map,

/** FileSystem cache */
  static final Cache CACHE = new Cache();
           

如果采用預設配置:

fs.hadoopuser.impl.disable.cache

是true的,也就是開啟緩存,每次會從CACHE裡擷取。

public static FileSystem get(URI uri, Configuration conf) throws IOException {
    ...
    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
    if (conf.getBoolean(disableCacheName, false)) {
      return createFileSystem(uri, conf);
    }

    return CACHE.get(uri, conf);
  }
           

而這個緩存裡的key構造如下:

FileSystem get(URI uri, Configuration conf) throws IOException{
      Key key = new Key(uri, conf);
      return getInternal(uri, conf, key);
    }
    
    static class Key {
      final String scheme;
      final String authority;
      final UserGroupInformation ugi;
      final long unique;   // an artificial way to make a key unique

      Key(URI uri, Configuration conf, long unique) throws IOException {
        scheme = uri.getScheme()==null ?
            "" : StringUtils.toLowerCase(uri.getScheme());
        authority = uri.getAuthority()==null ?
            "" : StringUtils.toLowerCase(uri.getAuthority());
        this.unique = unique;
        
        this.ugi = UserGroupInformation.getCurrentUser();
      }
      @Override
      public int hashCode() {
        return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
      }
           

我們都知道map是根據hashcode來判斷key是否相同的,來前面的

scheme, authority,unique

都是一樣的,而

this.ugi = UserGroupInformation.getCurrentUser();

跟到最後發現,這個 ugi裡每次都new了一個新對象,是以,緩存CACHE會無限增加,最終OOM。

為什麼會出現這種情況?

其實,如果你隻是在一個方法裡完成上述過程,即便 FileSystem沒有被關閉,也會在方法結束後被GC給回收了。

問題就在于

FileSystem.get(uri, conf, "hadoopuser")

比較耗時,是以,程式裡一般會複用一個

FileSystem

, 我們以為每次擷取的是一個執行個體,誰知道是不同的,因為是靜态資源,是以得不到回收,這就導緻執行個體在記憶體堆積。

如何解決?

其實解決方法也很簡單:

  1. 每次使用後就關閉
  2. 修改代碼能夠利用緩存

方法1 就不說了。

方法2,肯定不是讓我們去修改 HDFS Client的代碼,而是修改應用代碼:

private FileSystem getFileSystem() throws IOException, URISyntaxException {
        Configuration conf = HadoopUtils.getHadoopConfiguration(hdfsConfig.getConfigDir());
        String hdfsPath = hdfsConfig.getPath();
        System.setProperty("HADOOP_USER_NAME", hadoopUser);
        return FileSystem.get(new URI(hdfsPath), conf);
    }
           

重點就在

System.setProperty("HADOOP_USER_NAME", hadoopUser);

, 之前我們将使用者傳進去,現在我們通過環境變量設定

HADOOP_USER_NAME

的方式傳使用者,問題就解決了。

為什麼?

實際上,我們的應用一般不會和Hadoop叢集部署在一起,是以一般是沒有這個環境變量的,而這個環境變量是Hadoop根據Java安全政策構造

ugi

的選擇,有了這個使用者,每次擷取到的

ugi

就是一樣的了。