天天看點

HBase學習之路(八):HBase協處理器+Hadoop的表操作行為日志監控案例實戰

内容簡介

    • 一、協處理的基本概念
    • 二、基于協處理器的行為表操作行為監控
        • 0.實戰内容與思路
        • 1.在HBase中建立兩張表用于測試
        • 2. 建立普通的Java工程引入Maven支援
        • 3.建立MyRegionObserver類,繼承BaseRegionObserver
        • 4.将代碼打包并送出到HBase叢集
        • 5.配置Hbase的配置檔案并重新開機HBase
        • 6.測試并檢視行為監控
    • 三、總結

一、協處理的基本概念

  • 使用用戶端的API,配合過濾器可以對資料進行限制,使得傳回用戶端的資料更加精确。如果更近一步,将資料的某些處理流程直接放到服務端執行,然後僅僅傳回一個小的處理結果集,類似于在服務端開啟一個小型的MapReduce作業來分布式處理資料,這就是HBase的協處理器機制。
  • 協處理器允許使用者在region伺服器執行自己的代碼,即在允許使用者執行region級别的操作。協處理器分為兩大類:observer與endpoint。observer類似于關系型資料庫中的觸發器,endpoint則類似于關系型資料庫中的存儲過程,簡單了解就是observer是一個觀察者,當某些行為比如插入資料發生或者結束時,可以對應做出何種行為,屬于被動調用,而endpoint則類似于調用一個函數,或者自定義的代碼,屬于主動調用。
  • observer分為三類:

    (1).RegionObserver:使用者可以利用這種協處理器處理資料修改事件,與表的region緊密相連,是針對表資料做出修改時的協處理器。

    (2).MasterObserver:可以被用于管理或DDL類型的操作,比如建立、修改表屬性等,這是針對叢集事件的協處理器。

    (3).WALObserver:供控制WAL的鈎子函數。

    不同的observer提供了針對本observer的回調函數,供不同的事件進行回調。

  • endpoint:除了事件處理之外,有時候還需要将使用者的自定義操作添加到服務端,endpoint通過遠端過程調用來擴充RPC協定,與關系型資料庫的存儲過程功能相似。

二、基于協處理器的行為表操作行為監控

0.實戰内容與思路

  • 本次案例HBase版本是hbase-1.2.0-cdh5.7.0,Hadoop版本是hadoop-2.6.0-cdh5.7.0 開發工具是IDEA2018。
  • 實戰内容:在操作HBase的表的過程中,可以對某些操作,比如put,get操作進行監控,一旦發生了此類的行為,就把操作該行為的時間、操作的表名以及操作行為所涉及的RowKey記錄到一個HDFS上的檔案上。
  • 自定義一個類來繼承BaseRegionObserver類,此類已經實作了所有的有關Region級别的所有方法,如果直接實作RegionObserver接口則要實作接口裡面的所有方法,非常多,預設情況下繼承BaseRegionObserver類沒有任何的功能,使用者則可以找對應要使用的方法進行重寫就可以了。然後将代碼打包并送出到HBase的叢集中,最後配置HBase并重新開機即可。

1.在HBase中建立兩張表用于測試

  • 在Linux終端輸入指令

    hbase shell

    進入HBase的終端。執行指令

    create 'table1','info'

    來建立一張表名為table1,列族為info的表,再執行指令

    create 'table2','info'

    來建立一張表名為table1,列族為info的表:
    HBase學習之路(八):HBase協處理器+Hadoop的表操作行為日志監控案例實戰

2. 建立普通的Java工程引入Maven支援

  • 完整Maven依賴如下:
    <properties>
    	    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    	    <maven.compiler.source>1.7</maven.compiler.source>
    	    <maven.compiler.target>1.7</maven.compiler.target>
    	    <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
    	    <hbase.version>1.2.0-cdh5.7.0</hbase.version>
      </properties>
    
      <repositories>
    	    <repository>
    	      <id>cloudera</id>
    	      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
    	    </repository>
    	  </repositories>
    	
    	  <dependencies>
    	    <dependency>
    	      <groupId>org.apache.hbase</groupId>
    	      <artifactId>hbase-client</artifactId>
    	      <version>${hbase.version}</version>
    	    </dependency>
    	
    	    <dependency>
    	      <groupId>org.apache.hbase</groupId>
    	      <artifactId>hbase-server</artifactId>
    	      <version>${hbase.version}</version>
    	    </dependency>
    	
    	    <dependency>
    	      <groupId>org.apache.hadoop</groupId>
    	      <artifactId>hadoop-client</artifactId>
    	      <version>${hadoop.version}</version>
    	    </dependency>
    	
    	    <dependency>
    	      <groupId>junit</groupId>
    	      <artifactId>junit</artifactId>
    	      <version>4.11</version>
    	    </dependency>
      </dependencies>
    
      <build>
        <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
          <plugins>
            <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
            <plugin>
              <artifactId>maven-clean-plugin</artifactId>
              <version>3.1.0</version>
            </plugin>
            <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
            <plugin>
              <artifactId>maven-resources-plugin</artifactId>
              <version>3.0.2</version>
            </plugin>
            <plugin>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.8.0</version>
            </plugin>
            <plugin>
              <artifactId>maven-surefire-plugin</artifactId>
              <version>2.22.1</version>
            </plugin>
            <plugin>
              <artifactId>maven-jar-plugin</artifactId>
              <version>3.0.2</version>
            </plugin>
            <plugin>
              <artifactId>maven-install-plugin</artifactId>
              <version>2.5.2</version>
            </plugin>
            <plugin>
              <artifactId>maven-deploy-plugin</artifactId>
              <version>2.8.2</version>
            </plugin>
            <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
            <plugin>
              <artifactId>maven-site-plugin</artifactId>
              <version>3.7.1</version>
            </plugin>
            <plugin>
              <artifactId>maven-project-info-reports-plugin</artifactId>
              <version>3.0.0</version>
            </plugin>
          </plugins>
        </pluginManagement>
      </build>
    </project>
               

3.建立MyRegionObserver類,繼承BaseRegionObserver

  • 在MyRegionObserver類中編寫HDFS檔案寫入方法,用于将協處理器生成的日志資訊寫入HDFS中。有一點需要注意,日志是不斷産生的,是以需要追加寫入,但是HDFS的API不支援追加寫入,是以會判斷日志檔案是否存在,若存在則會先建立一個輸入流,然後将舊的檔案内容複制到新檔案上,然後追加新日志。
    private void outWrite(String str) {
        try {
            // 判斷檔案是否存在标志
            boolean isExist = false;
            // 建立HDFS的輸入流
            Configuration configuration = new Configuration();
            // 初始化HDFS檔案系統寫入路徑,該路徑為你Hadoop叢集的URL位址
            FileSystem fs = FileSystem.get(new URI("hdfs://hadoop000:8020"),configuration);
            FSDataInputStream inputStream = null;
            // 如果存在該檔案則先讀取
            // 該路徑為你存放日志檔案的HDFS目錄
            if(fs.exists(new Path("/data/MyRegionObserver.txt"))){
                 inputStream = fs.open(new Path("/data/MyRegionObserver.txt"));
                 isExist = true;
            }
            FSDataOutputStream outputStream = fs.create(new Path("/data/MyRegionObserver.txt"),true);
            // 将舊的行為日志複制到新檔案中
            if(isExist) {
                IOUtils.copyBytes(inputStream, outputStream, 1024);
            }
            // 将新的行為資訊追加寫入HDFS
            outputStream.write((str + "\r\n").getBytes());
            outputStream.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
               
  • MyRegionObserver類中編寫日期格式方法,用于格式化表操作行為發生時的時間戳。
    private String dateFormat(Date date){
        String formatDate = "YYYY/MM/dd HH:mm:ss";
        SimpleDateFormat dateFormat = new SimpleDateFormat();
        dateFormat.applyPattern(formatDate);
        return dateFormat.format(date);
    }
               
  • MyRegionObserver類完整代碼如下:
    /**
     * 自定義協處理器,區域觀察者
     * 對表的行為進行日志監控
     */
    public class MyRegionObserver extends BaseRegionObserver {
    
        private void outWrite(String str) {
            try {
                // 判斷檔案是否存在标志
                boolean isExist = false;
                // 建立HDFS的輸入流
                Configuration configuration = new Configuration();
                // 初始化HDFS檔案系統寫入路徑,該路徑為你Hadoop叢集的URL位址
                FileSystem fs = FileSystem.get(new URI("hdfs://hadoop000:8020"),configuration);
                FSDataInputStream inputStream = null;
                // 如果存在該檔案則先讀取
                // 該路徑為你存放日志檔案的HDFS目錄
                if(fs.exists(new Path("/data/MyRegionObserver.txt"))){
                     inputStream = fs.open(new Path("/data/MyRegionObserver.txt"));
                     isExist = true;
                }
                FSDataOutputStream outputStream = fs.create(new Path("/data/MyRegionObserver.txt"),true);
                // 将舊的行為日志複制到新檔案中
                if(isExist) {
                    IOUtils.copyBytes(inputStream, outputStream, 1024);
                }
                // 将新的行為資訊追加寫入HDFS
                outputStream.write((str + "\r\n").getBytes());
                outputStream.close();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    
        // 時間格式化工具類
        private String dateFormat(Date date){
            String formatDate = "YYYY/MM/dd HH:mm:ss";
            SimpleDateFormat dateFormat = new SimpleDateFormat();
            dateFormat.applyPattern(formatDate);
            return dateFormat.format(date);
        }
    
        @Override
        public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) throws IOException {
            super.preGetOp(e, get, results);
            String tableName = e.getEnvironment().getRegion().getTableDesc().getNameAsString();
            outWrite("[" + dateFormat(new Date()) + "] " + "表" + tableName + "開始執行Get操作,RowKey:" +
                    new String(get.getRow()));
        }
    
        @Override
        public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) throws IOException {
            super.postGetOp(e, get, results);
            String tableName = e.getEnvironment().getRegion().getTableDesc().getNameAsString();
            outWrite("[" + dateFormat(new Date()) + "] " + "表" + tableName + "結束執行Get操作,RowKey:" +
                    new String(get.getRow()));
        }
    
        @Override
        public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
            super.prePut(e, put, edit, durability);
            String tableName = e.getEnvironment().getRegion().getTableDesc().getNameAsString();
            outWrite("[" + dateFormat(new Date()) + "] " + "表" + tableName + "開始執行Put操作,RowKey:" +
                    new String(put.getRow()));
        }
    
        @Override
        public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
            super.postPut(e, put, edit, durability);
            String tableName = e.getEnvironment().getRegion().getTableDesc().getNameAsString();
            outWrite("[" + dateFormat(new Date()) + "] " + "表" + tableName + "結束執行Put操作,RowKey:" +
                    new String(put.getRow()));
        }
    
    }
               

4.将代碼打包并送出到HBase叢集

  • 使用IDEA的Maven自帶的打包工具,或者直接使用Maven來打包也可以,以下是使用IDEA的Maven自帶的打包工具打包:
    HBase學習之路(八):HBase協處理器+Hadoop的表操作行為日志監控案例實戰
  • 将打包好的jar包放到HBase所有節點的

    $HBASE_HOME/lib

    路徑下。

5.配置Hbase的配置檔案并重新開機HBase

  • 進入所有HBase節點的

    $HBASE_HOME/conf

    目錄下,編輯hbase-site.xml檔案,加入如下配置:
    <property>
            <name>hbase.coprocessor.region.classes</name>
            <value>com.train.hbase.MyRegionObserver</value>
    </property>
               
    value标簽的值為你編寫MyRegionObserver類的全類名。
  • 確定jar包已經分發到所有HBase節點指定的目錄下,且配置檔案正确配置并分發到所有節點後,重新啟動HBase服務。

6.測試并檢視行為監控

  • 進入HBase的終端,首先在表table1中插入一條資料:

    put 'table1','xiao-bang-zhu','info:column','12345'

    然後讀取該資料:

    get 'table1','xiao-bang-zhu'

    再在表table2中插入一條資料:

    put 'table2','xiao-bang-zhu','info:column','wewdd'

    然後讀取該資料:

    get 'table2','xiao-bang-zhu'

    HBase學習之路(八):HBase協處理器+Hadoop的表操作行為日志監控案例實戰
  • 因為日志檔案在HDFS上,是以可以在終端直接輸入指令:

    hdfs dfs -cat 日志檔案的HDFS目錄

    檢視日志檔案:
    HBase學習之路(八):HBase協處理器+Hadoop的表操作行為日志監控案例實戰
    可以看到我們剛才的對table1和table2表的操作已經被詳細記錄到日志中,包括操作的具體時間和RowKey等。除此之外還有許多HBase内部的操作也會被記錄進去,比如我們執行get操作時會在meta表中尋找get操作RowKey所對應的region的位置,而meta表本身也在某個region中,是以也會觸發協處理器進行日志的記錄。

三、總結

  • 本文簡單闡述了HBase中協處理器的概念,其核心思想可以概括為:将某些資料的處理工作直接交由服務端去完成,并将處理結果以小資料結果集的形式傳回給用戶端。事實上,協處理器的内容非常多,其中有更為細緻的知識,礙于篇幅,這裡沒有給出。過濾器、計數器、協處理器為HBase的三大進階特性,如果配合來使用可以完成很多複雜的業務。感謝你的閱讀,如有錯誤請不吝賜教!
  • 更多内容請檢視 蕭邦主的技術部落格導航