Yarn源代碼分析之旅—ResourceManager—使用者互動之ResourceManagerAdministrationProtocol與AdminService
AdminService的作用
AdminService為管理者提供了一套獨立的服務接口,以防止大量的普通使用者的請求使得管理者發送的管理指令餓死。管理者可以通過這些接口指令管理叢集,比如動态更新節點清單,更新ACL清單,更新隊列資訊等等
AdminService的服務初始化
我們在ResourceManager.java下面可以看到,當ResourceManager啟動後,AdminService會初始化并加入到ResourceManager得serverList清單。
adminService = createAdminService();
addService(adminService);
rmContext.setRMAdminService(adminService);
具體的AdminService初始化和啟動會做哪些工作,可以參考AdminService.java的源代碼實作。
@Override
public void serviceInit(Configuration conf) throws Exception {
if (rmContext.isHAEnabled()) {
autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
if (autoFailoverEnabled) {
if (HAUtil.isAutomaticFailoverEmbedded(conf)) {
embeddedElector = createEmbeddedElectorService();
addIfService(embeddedElector);
}
}
}
// ResourceManager的8033端口
// ResourceManager 對管理者暴露的通路位址。管理者通過該位址向RM發送管理指令等
// ResourceManagerAdministrationProtocol協定
masterServiceBindAddress = conf.getSocketAddr(
YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
adminAcl = new AccessControlList(conf.get(
YarnConfiguration.YARN_ADMIN_ACL,
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
rmId = conf.get(YarnConfiguration.RM_HA_ID);
super.serviceInit(conf);
}
主要的,就是建構了一個端口位址綁定,綁定了ResourceManager的8033端口(具體的可以在配置檔案設定)
AdminService的服務啟動
AdminService啟動後,會建構一個RCP的Server端,監聽來自8033接口的請求
this.server = (Server) rpc.getServer(
ResourceManagerAdministrationProtocol.class, this, masterServiceBindAddress,
conf, null,
conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
.....
this.server.start();
Server端與Client端之間傳遞的協定就是ResourceManagerAdministrationProtocol
AdminService的用戶端調用–RMAdminCLI
打開yarn.sh,我們可以清楚的看到如下代碼:
elif [ "$COMMAND" = "rmadmin" ] ; then
CLASS='org.apache.hadoop.yarn.client.cli.RMAdminCLI'
YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"
也就是說,如果是管理者指令的話,RMAdminCLI會做處理,我們找到這個類的源檔案。
啟動後,會執行run方法,這個方法裡面會根據輸入的參數,找到并執行相應的方法,比如refreshQueues,refreshNodes等等。
ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
RefreshQueuesRequest request =
recordFactory.newRecordInstance(RefreshQueuesRequest.class);
adminProtocol.refreshQueues(request);
實作也比較簡單,首先建立協定的代理,然後生成一個請求對象,然後調用Server端的遠端方法。