内容簡介
-
- 一、協處理的基本概念
- 二、基于協處理器的行為表操作行為監控
-
-
- 0.實戰内容與思路
- 1.在HBase中建立兩張表用于測試
- 2. 建立普通的Java工程引入Maven支援
- 3.建立MyRegionObserver類,繼承BaseRegionObserver
- 4.将代碼打包并送出到HBase叢集
- 5.配置Hbase的配置檔案并重新開機HBase
- 6.測試并檢視行為監控
-
- 三、總結
一、協處理的基本概念
- 使用用戶端的API,配合過濾器可以對資料進行限制,使得傳回用戶端的資料更加精确。如果更近一步,将資料的某些處理流程直接放到服務端執行,然後僅僅傳回一個小的處理結果集,類似于在服務端開啟一個小型的MapReduce作業來分布式處理資料,這就是HBase的協處理器機制。
- 協處理器允許使用者在region伺服器執行自己的代碼,即在允許使用者執行region級别的操作。協處理器分為兩大類:observer與endpoint。observer類似于關系型資料庫中的觸發器,endpoint則類似于關系型資料庫中的存儲過程,簡單了解就是observer是一個觀察者,當某些行為比如插入資料發生或者結束時,可以對應做出何種行為,屬于被動調用,而endpoint則類似于調用一個函數,或者自定義的代碼,屬于主動調用。
-
observer分為三類:
(1).RegionObserver:使用者可以利用這種協處理器處理資料修改事件,與表的region緊密相連,是針對表資料做出修改時的協處理器。
(2).MasterObserver:可以被用于管理或DDL類型的操作,比如建立、修改表屬性等,這是針對叢集事件的協處理器。
(3).WALObserver:供控制WAL的鈎子函數。
不同的observer提供了針對本observer的回調函數,供不同的事件進行回調。
- endpoint:除了事件處理之外,有時候還需要将使用者的自定義操作添加到服務端,endpoint通過遠端過程調用來擴充RPC協定,與關系型資料庫的存儲過程功能相似。
二、基于協處理器的行為表操作行為監控
0.實戰内容與思路
- 本次案例HBase版本是hbase-1.2.0-cdh5.7.0,Hadoop版本是hadoop-2.6.0-cdh5.7.0 開發工具是IDEA2018。
- 實戰内容:在操作HBase的表的過程中,可以對某些操作,比如put,get操作進行監控,一旦發生了此類的行為,就把操作該行為的時間、操作的表名以及操作行為所涉及的RowKey記錄到一個HDFS上的檔案上。
- 自定義一個類來繼承BaseRegionObserver類,此類已經實作了所有的有關Region級别的所有方法,如果直接實作RegionObserver接口則要實作接口裡面的所有方法,非常多,預設情況下繼承BaseRegionObserver類沒有任何的功能,使用者則可以找對應要使用的方法進行重寫就可以了。然後将代碼打包并送出到HBase的叢集中,最後配置HBase并重新開機即可。
1.在HBase中建立兩張表用于測試
- 在Linux終端輸入指令
進入HBase的終端。執行指令hbase shell
來建立一張表名為table1,列族為info的表,再執行指令create 'table1','info'
來建立一張表名為table1,列族為info的表:create 'table2','info'
HBase學習之路(八):HBase協處理器+Hadoop的表操作行為日志監控案例實戰
2. 建立普通的Java工程引入Maven支援
- 完整Maven依賴如下:
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.7</maven.compiler.source> <maven.compiler.target>1.7</maven.compiler.target> <hadoop.version>2.6.0-cdh5.7.0</hadoop.version> <hbase.version>1.2.0-cdh5.7.0</hbase.version> </properties> <repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos</url> </repository> </repositories> <dependencies> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>${hbase.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>${hbase.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> </dependency> </dependencies> <build> <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) --> <plugins> <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle --> <plugin> <artifactId>maven-clean-plugin</artifactId> <version>3.1.0</version> </plugin> <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging --> <plugin> <artifactId>maven-resources-plugin</artifactId> <version>3.0.2</version> </plugin> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> </plugin> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.22.1</version> </plugin> <plugin> <artifactId>maven-jar-plugin</artifactId> <version>3.0.2</version> </plugin> <plugin> <artifactId>maven-install-plugin</artifactId> <version>2.5.2</version> </plugin> <plugin> <artifactId>maven-deploy-plugin</artifactId> <version>2.8.2</version> </plugin> <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle --> <plugin> <artifactId>maven-site-plugin</artifactId> <version>3.7.1</version> </plugin> <plugin> <artifactId>maven-project-info-reports-plugin</artifactId> <version>3.0.0</version> </plugin> </plugins> </pluginManagement> </build> </project>
3.建立MyRegionObserver類,繼承BaseRegionObserver
- 在MyRegionObserver類中編寫HDFS檔案寫入方法,用于将協處理器生成的日志資訊寫入HDFS中。有一點需要注意,日志是不斷産生的,是以需要追加寫入,但是HDFS的API不支援追加寫入,是以會判斷日志檔案是否存在,若存在則會先建立一個輸入流,然後将舊的檔案内容複制到新檔案上,然後追加新日志。
private void outWrite(String str) { try { // 判斷檔案是否存在标志 boolean isExist = false; // 建立HDFS的輸入流 Configuration configuration = new Configuration(); // 初始化HDFS檔案系統寫入路徑,該路徑為你Hadoop叢集的URL位址 FileSystem fs = FileSystem.get(new URI("hdfs://hadoop000:8020"),configuration); FSDataInputStream inputStream = null; // 如果存在該檔案則先讀取 // 該路徑為你存放日志檔案的HDFS目錄 if(fs.exists(new Path("/data/MyRegionObserver.txt"))){ inputStream = fs.open(new Path("/data/MyRegionObserver.txt")); isExist = true; } FSDataOutputStream outputStream = fs.create(new Path("/data/MyRegionObserver.txt"),true); // 将舊的行為日志複制到新檔案中 if(isExist) { IOUtils.copyBytes(inputStream, outputStream, 1024); } // 将新的行為資訊追加寫入HDFS outputStream.write((str + "\r\n").getBytes()); outputStream.close(); }catch (Exception e){ e.printStackTrace(); } }
- MyRegionObserver類中編寫日期格式方法,用于格式化表操作行為發生時的時間戳。
private String dateFormat(Date date){ String formatDate = "YYYY/MM/dd HH:mm:ss"; SimpleDateFormat dateFormat = new SimpleDateFormat(); dateFormat.applyPattern(formatDate); return dateFormat.format(date); }
- MyRegionObserver類完整代碼如下:
/** * 自定義協處理器,區域觀察者 * 對表的行為進行日志監控 */ public class MyRegionObserver extends BaseRegionObserver { private void outWrite(String str) { try { // 判斷檔案是否存在标志 boolean isExist = false; // 建立HDFS的輸入流 Configuration configuration = new Configuration(); // 初始化HDFS檔案系統寫入路徑,該路徑為你Hadoop叢集的URL位址 FileSystem fs = FileSystem.get(new URI("hdfs://hadoop000:8020"),configuration); FSDataInputStream inputStream = null; // 如果存在該檔案則先讀取 // 該路徑為你存放日志檔案的HDFS目錄 if(fs.exists(new Path("/data/MyRegionObserver.txt"))){ inputStream = fs.open(new Path("/data/MyRegionObserver.txt")); isExist = true; } FSDataOutputStream outputStream = fs.create(new Path("/data/MyRegionObserver.txt"),true); // 将舊的行為日志複制到新檔案中 if(isExist) { IOUtils.copyBytes(inputStream, outputStream, 1024); } // 将新的行為資訊追加寫入HDFS outputStream.write((str + "\r\n").getBytes()); outputStream.close(); }catch (Exception e){ e.printStackTrace(); } } // 時間格式化工具類 private String dateFormat(Date date){ String formatDate = "YYYY/MM/dd HH:mm:ss"; SimpleDateFormat dateFormat = new SimpleDateFormat(); dateFormat.applyPattern(formatDate); return dateFormat.format(date); } @Override public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) throws IOException { super.preGetOp(e, get, results); String tableName = e.getEnvironment().getRegion().getTableDesc().getNameAsString(); outWrite("[" + dateFormat(new Date()) + "] " + "表" + tableName + "開始執行Get操作,RowKey:" + new String(get.getRow())); } @Override public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) throws IOException { super.postGetOp(e, get, results); String tableName = e.getEnvironment().getRegion().getTableDesc().getNameAsString(); outWrite("[" + dateFormat(new Date()) + "] " + "表" + tableName + "結束執行Get操作,RowKey:" + new String(get.getRow())); } @Override public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { super.prePut(e, put, edit, durability); String tableName = e.getEnvironment().getRegion().getTableDesc().getNameAsString(); outWrite("[" + dateFormat(new Date()) + "] " + "表" + tableName + "開始執行Put操作,RowKey:" + new String(put.getRow())); } @Override public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { super.postPut(e, put, edit, durability); String tableName = e.getEnvironment().getRegion().getTableDesc().getNameAsString(); outWrite("[" + dateFormat(new Date()) + "] " + "表" + tableName + "結束執行Put操作,RowKey:" + new String(put.getRow())); } }
4.将代碼打包并送出到HBase叢集
- 使用IDEA的Maven自帶的打包工具,或者直接使用Maven來打包也可以,以下是使用IDEA的Maven自帶的打包工具打包:
HBase學習之路(八):HBase協處理器+Hadoop的表操作行為日志監控案例實戰 - 将打包好的jar包放到HBase所有節點的
路徑下。$HBASE_HOME/lib
5.配置Hbase的配置檔案并重新開機HBase
- 進入所有HBase節點的
目錄下,編輯hbase-site.xml檔案,加入如下配置:$HBASE_HOME/conf
value标簽的值為你編寫MyRegionObserver類的全類名。<property> <name>hbase.coprocessor.region.classes</name> <value>com.train.hbase.MyRegionObserver</value> </property>
- 確定jar包已經分發到所有HBase節點指定的目錄下,且配置檔案正确配置并分發到所有節點後,重新啟動HBase服務。
6.測試并檢視行為監控
- 進入HBase的終端,首先在表table1中插入一條資料:
然後讀取該資料:put 'table1','xiao-bang-zhu','info:column','12345'
再在表table2中插入一條資料:get 'table1','xiao-bang-zhu'
然後讀取該資料:put 'table2','xiao-bang-zhu','info:column','wewdd'
get 'table2','xiao-bang-zhu'
HBase學習之路(八):HBase協處理器+Hadoop的表操作行為日志監控案例實戰 - 因為日志檔案在HDFS上,是以可以在終端直接輸入指令:
檢視日志檔案:hdfs dfs -cat 日志檔案的HDFS目錄
可以看到我們剛才的對table1和table2表的操作已經被詳細記錄到日志中,包括操作的具體時間和RowKey等。除此之外還有許多HBase内部的操作也會被記錄進去,比如我們執行get操作時會在meta表中尋找get操作RowKey所對應的region的位置,而meta表本身也在某個region中,是以也會觸發協處理器進行日志的記錄。HBase學習之路(八):HBase協處理器+Hadoop的表操作行為日志監控案例實戰
三、總結
- 本文簡單闡述了HBase中協處理器的概念,其核心思想可以概括為:将某些資料的處理工作直接交由服務端去完成,并将處理結果以小資料結果集的形式傳回給用戶端。事實上,協處理器的内容非常多,其中有更為細緻的知識,礙于篇幅,這裡沒有給出。過濾器、計數器、協處理器為HBase的三大進階特性,如果配合來使用可以完成很多複雜的業務。感謝你的閱讀,如有錯誤請不吝賜教!
- 更多内容請檢視 蕭邦主的技術部落格導航