天天看點

Spark1.6.0功能擴充——為HiveThriftServer2增加HA

版權聲明:本文為部落客原創文章,未經部落客允許不得轉載。 https://blog.csdn.net/beliefer/article/details/78549991

前言

HiveThriftServer2是Spark基于HiveServer2實作的多Session管理的Thrift服務,提供對Hive的集中式管理服務。HiveThriftServer2作為Yarn上的Application,目前隻支援yarn-client模式——即Driver運作在本地,ApplicationMaster運作在NodeManager所管理的Container中。yarn-client模式相較于yarn-cluster模式,在Driver和ApplicationMaster之間引入了額外的通信,因而服務的穩定性較低。

為了能夠提高HiveThriftServer2的可用性,打算部署兩個或者多個HiveThriftServer2執行個體,最終确定了選擇HA的解決方案。網上有關HiveThriftServer2的HA實作,主要借助了HAProxy、Nginx等提供的反向代理和負載均衡功能實作。這種方案有個問題,那就是使用者送出的執行SQL請求與HiveThriftServer2之間的連接配接一旦斷了,反向代理伺服器并不會主動将請求重定向到其他節點上,使用者必須再次送出請求,這時才會與其他HiveThriftServer2建立連接配接。這種方案,究其根本更像是負載均衡,無法保證SQL請求不丢失、重連、Master/Slave切換等機制。

為了解決以上問題,我選擇了第三種方案。

由于HiveThriftServer2本身繼承自HiveServer2,是以HiveServer2自帶的HA方案也能夠支援HiveThriftServer2。對于HiveServer2自帶的HA方案不熟悉的同學,可以百度一下,相關内容還是很多的。如果按照我的假設,就使用HiveServer2自帶的HA方案的話,你會發現我的假設是錯誤的——HiveThriftServer2居然不支援HA。這是為什麼呢?請讀者務必保持平常心,我們來一起研究研究。

注意:我這裡的Spark版本是1.6.0,Hive版本是1.2.1。

HiveServer2的HA分析

我從網上找到了一副能夠有效展示HiveServer2的HA原理的圖(具體來源無從考證)。

這幅圖檔很直覺的為我們介紹了HiveServer2的HA架構。整個架構實際上圍繞着ZooKeeper叢集,利用ZooKeeper提供的建立節點、檢索子節點等功能來實作。那麼ZooKeeper的HA是如何實作的呢?讓我們來進行源碼分析吧。

HiveServer2本身是由Java語言開發,熟悉Java應用(如Tomcat、Spark的Master和Worker、Yarn的ResourceManager和NodeManager等)的同學應該知道,任何的Java應用必須要有一個main class。HiveServer2這個Thrift服務的main class就是HiveServer2類。HiveServer2的main方法如代碼清單1所示。

代碼清單1 HiveServer2的main方法

public static void main(String[] args) {
    HiveConf.setLoadHiveServer2Config(true);
    try {
      ServerOptionsProcessor oproc = new ServerOptionsProcessor("hiveserver2");
      ServerOptionsProcessorResponse oprocResponse = oproc.parse(args);

      // 省略無關代碼

      // Call the executor which will execute the appropriate command based on the parsed options
      oprocResponse.getServerOptionsExecutor().execute();
    } catch (LogInitializationException e) {
      LOG.error("Error initializing log: " + e.getMessage(), e);
      System.exit(-1);
    }
  }           

上邊代碼中首先建立了ServerOptionsProcessor對象并對參數進行解析,parse方法解析完參數傳回了oprocResponse對象(類型為ServerOptionsProcessorResponse)。然後調用oprocResponse的getServerOptionsExecutor方法得到的對象實際為StartOptionExecutor。最後調用了StartOptionExecutor的execute方法。StartOptionExecutor的實作見代碼清單2。

代碼清單2 StartOptionExecutor的實作

static class StartOptionExecutor implements ServerOptionsExecutor {
    @Override
    public void execute() {
      try {
        startHiveServer2();
      } catch (Throwable t) {
        LOG.fatal("Error starting HiveServer2", t);
        System.exit(-1);
      }
    }
  }           

從代碼清單2看到,StartOptionExecutor的execute方法實際調用了startHiveServer2方法,startHiveServer2方法中與HA相關的代碼如下:

if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
          server.addServerInstanceToZooKeeper(hiveConf);
        }           

可以看到調用了HiveServer2的addServerInstanceToZooKeeper方法。這個addServerInstanceToZooKeeper的作用就是在指定的ZooKeeper叢集上建立持久化的父節點作為HA的命名空間,并建立持久化的節點将HiveServer2的執行個體資訊儲存到節點上(addServerInstanceToZooKeeper方法的實作細節留給感興趣的同學,自行閱讀)。ZooKeeper叢集如何指定?HA的命名空間又是什麼?大家先記着這兩個問題,最後在配置的時候,再告訴大家。

HiveThriftServer2為何不支援HiveServer2自帶的HA?

使用jps指令檢視HiveThriftServer2的程序資訊,如圖。

從程序資訊看到HiveThriftServer2是作為Driver送出的。那麼我們從HiveThriftServer2的main方法開始分析。

HiveThriftServer2的main方法(見代碼清單3)中建立了HiveServerServerOptionsProcessor對象,并調用了其process方法。

代碼清單3 HiveThriftServer2的main方法

def main(args: Array[String]) {
    val optionsProcessor = new HiveServerServerOptionsProcessor("HiveThriftServer2")
    if (!optionsProcessor.process(args)) {
      System.exit(-1)
    }

    // 省略無關代碼
      
  }           

HiveServerServerOptionsProcessor的實作見代碼清單4。

代碼清單4 HiveServerServerOptionsProcessor的實作

private[apache] class HiveServerServerOptionsProcessor(serverName: String)
    extends ServerOptionsProcessor(serverName) {

  def process(args: Array[String]): Boolean = {
    // A parse failure automatically triggers a system exit
    val response = super.parse(args)
    val executor = response.getServerOptionsExecutor()
    // return true if the parsed option was to start the service
    executor.isInstanceOf[StartOptionExecutor]
  }
}           

從代碼清單4看到,HiveServerServerOptionsProcessor繼承了我們前文所說的ServerOptionsProcessor,并增加了process方法。process方法中調用了父類ServerOptionsProcessor的parse方法解析參數,并得到類型為ServerOptionsProcessorResponse的response,之後調用了response的getServerOptionsExecutor方法得到對象executor(實際類型為StartOptionExecutor),最後隻是判斷executor的類型是否是StartOptionExecutor。

可以看到HiveServerServerOptionsProcessor的process方法,自始至終都沒有調用StartOptionExecutor的execute方法,進而也就無法完成向ZooKeeper叢集注冊服務,是以HiveThriftServer2沒能支援HA。

為HiveThriftServer2添加HA功能

由于HiveServer2的startHiveServer2方法是靜态私有方法,是以HiveThriftServer2不能夠直接調用。為了使得HiveThriftServer2能夠調用,隻能采用反射來實作。在HiveThriftServer2的main方法中添加反射調用addServerInstanceToZooKeeper方法的代碼見代碼清單5。

代碼清單5 反射調用addServerInstanceToZooKeeper方法

if (SparkSQLEnv.hiveContext.hiveconf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
        val method = server.getClass.getSuperclass.getDeclaredMethod("addServerInstanceToZooKeeper", classOf[org.apache.hadoop.hive.conf.HiveConf])
        method.setAccessible(true)
        method.invoke(server, SparkSQLEnv.hiveContext.hiveconf)
      }           

至此,我們的改造完成。

配置

既然通過修改源碼,HiveThriftServer2已經采用了HiveServer2的HA實作,是以就可以采用與HiveServer2相同的配置。在hive-site.xml檔案中添加以下配置:

<property>
  <name>hive.server2.support.dynamic.service.discovery</name>
  <value>true</value>
</property>
 
<property>
  <name>hive.server2.zookeeper.namespace</name>
  <value>hiveserver2_zk</value>
</property>
 
<property>
  <name>hive.zookeeper.quorum</name>
  <value>zkNode1:2181,zkNode2:2181,zkNode3:2181</value>
</property>
 
<property>
  <name>hive.zookeeper.client.port</name>
  <value>2181</value>
</property>           

以上配置中,各個配置項的含義為:

  • hive.server2.zookeeper.namespace:HiveServer2注冊到ZooKeeper叢集時,需要的命名空間。實際上,第一個有此配置的HiveServer2執行個體将在ZooKeeper叢集的根節點下建立以命名空間為名稱的持久化節點。
  • hive.server2.support.dynamic.service.discovery:是否開啟HiveServer2的動态服務發現。開啟此配置後,HiveServer2将向ZooKeeper叢集的命名空間節點下建立服務的資訊節點。
  • hive.zookeeper.quorum:ZooKeeper叢集的參與者清單。
  • hive.zookeeper.client.port:ZooKeeper叢集開放給用戶端使用的端口。

測試

我們啟動兩個HiveThriftServer2執行個體,然後打開ZooKeeper用戶端,就可以看到ZooKeeper叢集的根節點下名稱為hiveserver2_zk的持久化節點,如下圖所示。

我們檢視hiveserver2_zk節點下已經注冊的服務,如下圖所示。

使用beeline來測試,首先進入beeline,然後使用jdbc連接配接HiveThriftServer2,如下圖所示。

通過jdbc連接配接時使用的jdbc URL的格式為:jdbc:hive2://<zookeeper quorum>/<dbName>;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2_zk

在連接配接的過程中需要輸入使用者名、密碼等資訊。最終beeline會通過hive-jdbc從多個HiveThriftServer2執行個體中選擇一個連接配接。

使用Java語言時,通過java jdbc也可以使用此HiveThriftServer2執行個體,隻不過需要的jdbc driver為org.apache.hive.jdbc.HiveDriver。

功能擴充

通過ZooKeeper叢集的服務發現,我們實作的HA實際跟HAProxy、Nginx等提供的負載均衡功能沒有太多差別。如果發生網絡逾時、連接配接斷開、執行失敗等情況時,我們的用戶端程式也會失敗。為了在發生以上異常時能夠進行重連、重試、選擇其他服務進行重連,這都需要用戶端代碼去實作。由于實作方式多種多樣,是以這裡就不具體羅列,隻将我個人實作的HiveThriftHAHelper類的各個關鍵功能進行介紹:

  • init:從jdbc URL中解析出必要的參數,例如zookeeper quorum、serviceDiscoveryMode、zooKeeperNamespace等。
  • getServerHosts:從ZooKeeper叢集擷取各個HiveThriftServer2執行個體的資訊,并進行緩存。
  • selectHost:從HiveThriftServer2執行個體中按照随機、輪詢、Master/Slave等多種政策選擇服務。
  • execute:選擇服務、執行SQL、異常重試等。

這樣我們在網絡逾時時可以進行重試、連接配接斷開時選擇其他節點進行重試、使用者不必反複送出SQL、支援負載均衡、支援Master/Slave等。

關于《Spark核心設計的藝術 架構設計與實作》

經過近一年的準備,基于Spark2.1.0版本的《Spark核心設計的藝術 架構設計與實作》一書現已出版發行,圖書如圖:

紙質版售賣連結如下:

京東:

https://item.jd.com/12302500.html

電子版售賣連結如下:

https://e.jd.com/30389208.html