第1章 HBase簡介
1.1 什麼是HBase
HBase的原型是Google的BigTable論文,受到了該論文思想的啟發,目前作為Hadoop的子項目來開發維護,用于支援結構化的資料存儲。
官方網站:http://hbase.apache.org
-- 2006年Google發表BigTable白皮書
-- 2006年開始開發HBase
-- 2008年北京成功開奧運會,程式員默默地将HBase弄成了Hadoop的子項目
-- 2010年HBase成為Apache頂級項目
-- 現在很多公司二次開發出了很多發行版本,你也開始使用了。
HBase是一個高可靠性、高性能、面向列、可伸縮的分布式存儲系統,利用HBASE技術可在廉價PC Server上搭建起大規模結構化存儲叢集。
HBase的目标是存儲并處理大型的資料,更具體來說是僅需使用普通的硬體配置,就能夠處理由成千上萬的行和列所組成的大型資料。
HBase是Google Bigtable的開源實作,但是也有很多不同之處。比如:Google Bigtable利用GFS作為其檔案存儲系統,HBase利用Hadoop HDFS作為其檔案存儲系統;Google運作MAPREDUCE來處理Bigtable中的海量資料,HBase同樣利用Hadoop MapReduce來處理HBase中的海量資料;Google Bigtable利用Chubby作為協同服務,HBase利用Zookeeper作為對應。
1.2 HBase特點
1)海量存儲
Hbase适合存儲PB級别的海量資料,在PB級别的資料以及采用廉價PC存儲的情況下,能在幾十到百毫秒内傳回資料。這與Hbase的極易擴充性息息相關。正式因為Hbase良好的擴充性,才為海量資料的存儲提供了便利。
2)列式存儲
這裡的列式存儲其實說的是列族存儲,Hbase是根據列族來存儲資料的。列族下面可以有非常多的列,列族在建立表的時候就必須指定。
3)極易擴充
Hbase的擴充性主要展現在兩個方面,一個是基于上層處理能力(RegionServer)的擴充,一個是基于存儲的擴充(HDFS)。
通過橫向添加RegionSever的機器,進行水準擴充,提升Hbase上層的處理能力,提升Hbsae服務更多Region的能力。
備注:RegionServer的作用是管理region、承接業務的通路,這個後面會詳細的介紹通過橫向添加Datanode的機器,進行存儲層擴容,提升Hbase的資料存儲能力和提升後端存儲的讀寫能力。
4)高并發
由于目前大部分使用Hbase的架構,都是采用的廉價PC,是以單個IO的延遲其實并不小,一般在幾十到上百ms之間。這裡說的高并發,主要是在并發的情況下,Hbase的單個IO延遲下降并不多。能獲得高并發、低延遲的服務。
5)稀疏
稀疏主要是針對Hbase列的靈活性,在列族中,你可以指定任意多的列,在列資料為空的情況下,是不會占用存儲空間的。
1.3 HBase架構
Hbase架構如圖1所示:
圖1 HBase架構圖
從圖中可以看出Hbase是由Client、Zookeeper、Master、HRegionServer、HDFS等幾個元件組成,下面來介紹一下幾個元件的相關功能:
1)Client
Client包含了通路Hbase的接口,另外Client還維護了對應的cache來加速Hbase的通路,比如cache的.META.中繼資料的資訊。
2)Zookeeper
HBase通過Zookeeper來做master的高可用、RegionServer的監控、中繼資料的入口以及叢集配置的維護等工作。具體工作如下:
通過Zoopkeeper來保證叢集中隻有1個master在運作,如果master異常,會通過競争機制産生新的master提供服務
通過Zoopkeeper來監控RegionServer的狀态,當RegionSevrer有異常的時候,通過回調的形式通知Master RegionServer上下線的資訊
通過Zoopkeeper存儲中繼資料的統一入口位址
3)Hmaster
master節點的主要職責如下:
為RegionServer配置設定Region
維護整個叢集的負載均衡
維護叢集的中繼資料資訊
發現失效的Region,并将失效的Region配置設定到正常的RegionServer上
當RegionSever失效的時候,協調對應Hlog的拆分
4)HregionServer
HregionServer直接對接使用者的讀寫請求,是真正的"幹活"的節點。它的功能概括如下:
管理master為其配置設定的Region
處理來自用戶端的讀寫請求
負責和底層HDFS的互動,存儲資料到HDFS
負責Region變大以後的拆分
負責Storefile的合并工作
5)HDFS
HDFS為Hbase提供最終的底層資料存儲服務,同時為HBase提供高可用(Hlog存儲在HDFS)的支援,具體功能概括如下:
提供中繼資料和表資料的底層分布式存儲服務
資料多副本,保證的高可靠和高可用性
1.3 HBase中的角色
1.3.1 HMaster
功能
1.監控RegionServer
2.處理RegionServer故障轉移
3.進行中繼資料的變更
4.處理region的配置設定或轉移
5.在空閑時間進行資料的負載均衡
6.通過Zookeeper釋出自己的位置給用戶端
1.3.2 RegionServer
功能
1.負責存儲HBase的實際資料
2.處理配置設定給它的Region
3.重新整理緩存到HDFS
4.維護Hlog
5.執行壓縮
6.負責處理Region分片
1.2.3 其他元件
1.Write-Ahead logs
HBase的修改記錄,當對HBase讀寫資料的時候,資料不是直接寫進磁盤,它會在記憶體中保留一段時間(時間以及資料量門檻值可以設定)。但把資料儲存在記憶體中可能有更高的機率引起資料丢失,為了解決這個問題,資料會先寫在一個叫做Write-Ahead logfile的檔案中,然後再寫入記憶體中。是以在系統出現故障的時候,資料可以通過這個日志檔案重建。
2.Region
Hbase表的分片,HBase表會根據RowKey值被切分成不同的region存儲在RegionServer中,在一個RegionServer中可以有多個不同的region。
3.Store
HFile存儲在Store中,一個Store對應HBase表中的一個列族。
4.MemStore
顧名思義,就是記憶體存儲,位于記憶體中,用來儲存目前的資料操作,是以當資料儲存在WAL中之後,RegsionServer會在記憶體中存儲鍵值對。
5.HFile
這是在磁盤上儲存原始資料的實際的實體檔案,是實際的存儲檔案。StoreFile是以Hfile的形式存儲在HDFS的。
第2章 HBase安裝
2.1 Zookeeper正常部署
首先保證Zookeeper叢集的正常部署,并啟動之:
[[email protected] zookeeper-3.4.10]$ bin/zkServer.sh start
[[email protected] zookeeper-3.4.10]$ bin/zkServer.sh start
[[email protected] zookeeper-3.4.10]$ bin/zkServer.sh start
2.2 Hadoop正常部署
Hadoop叢集的正常部署并啟動:
[[email protected] hadoop-2.7.2]$ sbin/start-dfs.sh
[[email protected] hadoop-2.7.2]$ sbin/start-yarn.sh
2.3 HBase的解壓
解壓HBase到指定目錄:
[[email protected] software]$ tar -zxvf hbase-1.3.1-bin.tar.gz -C /opt/module
2.4 HBase的配置檔案
修改HBase對應的配置檔案。
1)hbase-env.sh修改内容:
export JAVA_HOME=/opt/module/jdk1.8.0_144
export HBASE_MANAGES_ZK=false
2)hbase-site.xml修改内容:
<configuration> <property> <name>hbase.rootdir</name> <value>hdfs://hadoop102:9000/hbase</value> </property> <property> <name>hbase.cluster.distributed</name> <value>true</value> </property> <!-- 0.98後的新變動,之前版本沒有.port,預設端口為60000 --> <property> <name>hbase.master.port</name> <value>16000</value> </property> <property> <name>hbase.zookeeper.quorum</name> <value>hadoop102:2181,hadoop103:2181,hadoop104:2181</value> </property> <property> <name>hbase.zookeeper.property.dataDir</name> <value>/opt/module/zookeeper-3.4.10/zkData</value> </property> </configuration> |
3)regionservers:
hadoop102 hadoop103 hadoop104 |
4)軟連接配接hadoop配置檔案到hbase:
[[email protected] module]$ ln -s /opt/module/hadoop-2.7.2/etc/hadoop/core-site.xml
/opt/module/hbase/conf/core-site.xml
[[email protected] module]$ ln -s /opt/module/hadoop-2.7.2/etc/hadoop/hdfs-site.xml
/opt/module/hbase/conf/hdfs-site.xml
2.5 HBase遠端發送到其他叢集
[[email protected] module]$ xsync hbase/
2.6 HBase服務的啟動
1.啟動方式1
[[email protected] hbase]$ bin/hbase-daemon.sh start master
[[email protected] hbase]$ bin/hbase-daemon.sh start regionserver
提示:如果叢集之間的節點時間不同步,會導緻regionserver無法啟動,抛出ClockOutOfSyncException異常。
修複提示:
a、同步時間服務
請參看幫助文檔:《尚矽谷大資料技術之Hadoop入門》
b、屬性:hbase.master.maxclockskew設定更大的值
<property> <name>hbase.master.maxclockskew</name> <value>180000</value> <description>Time difference of regionserver from master</description> </property> |
2.啟動方式2
[[email protected] hbase]$ bin/start-hbase.sh
對應的停止服務:
[[email protected] hbase]$ bin/stop-hbase.sh
2.7 檢視HBase頁面
啟動成功後,可以通過"host:port"的方式來通路HBase管理頁面,例如:
http://hadoop102:16010
第3章 HBase Shell操作
3.1 基本操作
1.進入HBase用戶端指令行
[[email protected] hbase]$ bin/hbase shell
2.檢視幫助指令
hbase(main):001:0> help
3.檢視目前資料庫中有哪些表
hbase(main):002:0> list
3.2 表的操作
1.建立表
hbase(main):002:0> create 'student','info'
2.插入資料到表
hbase(main):003:0> put 'student','1001','info:sex','male'
hbase(main):004:0> put 'student','1001','info:age','18'
hbase(main):005:0> put 'student','1002','info:name','Janna'
hbase(main):006:0> put 'student','1002','info:sex','female'
hbase(main):007:0> put 'student','1002','info:age','20'
3.掃描檢視表資料
hbase(main):008:0> scan 'student'
hbase(main):009:0> scan 'student',{STARTROW => '1001', STOPROW => '1001'}
hbase(main):010:0> scan 'student',{STARTROW => '1001'}
4.檢視表結構
hbase(main):011:0> describe 'student'
5.更新指定字段的資料
hbase(main):012:0> put 'student','1001','info:name','Nick'
hbase(main):013:0> put 'student','1001','info:age','100'
6.檢視"指定行"或"指定列族:列"的資料
hbase(main):014:0> get 'student','1001'
hbase(main):015:0> get 'student','1001','info:name'
7.統計表資料行數
hbase(main):021:0> count 'student'
8.删除資料
删除某rowkey的全部資料:
hbase(main):016:0> deleteall 'student','1001'
删除某rowkey的某一列資料:
hbase(main):017:0> delete 'student','1002','info:sex'
9.清空表資料
hbase(main):018:0> truncate 'student'
提示:清空表的操作順序為先disable,然後再truncate。
10.删除表
首先需要先讓該表為disable狀态:
hbase(main):019:0> disable 'student'
然後才能drop這個表:
hbase(main):020:0> drop 'student'
提示:如果直接drop表,會報錯:ERROR: Table student is enabled. Disable it first.
11.變更表資訊
将info列族中的資料存放3個版本:
hbase(main):022:0> alter 'student',{NAME=>'info',VERSIONS=>3}
hbase(main):022:0> get 'student','1001',{COLUMN=>'info:name',VERSIONS=>3}
第4章 HBase資料結構
4.1 RowKey
與nosql資料庫們一樣,RowKey是用來檢索記錄的主鍵。通路HBASE table中的行,隻有三種方式:
1.通過單個RowKey通路
2.通過RowKey的range(正則)
3.全表掃描
RowKey行鍵 (RowKey)可以是任意字元串(最大長度是64KB,實際應用中長度一般為 10-100bytes),在HBASE内部,RowKey儲存為位元組數組。存儲時,資料按照RowKey的字典序(byte order)排序存儲。設計RowKey時,要充分排序存儲這個特性,将經常一起讀取的行存儲放到一起。(位置相關性)
4.2 Column Family
列族:HBASE表中的每個列,都歸屬于某個列族。列族是表的schema的一部 分(而列不是),必須在使用表之前定義。列名都以列族作為字首。例如 courses:history,courses:math都屬于courses 這個列族。
4.3 Cell
由{rowkey, column Family:columu, version} 唯一确定的單元。cell中的資料是沒有類型的,全部是位元組碼形式存貯。
關鍵字:無類型、位元組碼
4.4 Time Stamp
HBASE 中通過rowkey和columns确定的為一個存貯單元稱為cell。每個 cell都儲存 着同一份資料的多個版本。版本通過時間戳來索引。時間戳的類型是 64位整型。時間戳可以由HBASE(在資料寫入時自動 )指派,此時時間戳是精确到毫秒 的目前系統時間。時間戳也可以由客戶顯式指派。如果應用程式要避免資料版 本沖突,就必須自己生成具有唯一性的時間戳。每個 cell中,不同版本的資料按照時間倒序排序,即最新的資料排在最前面。
為了避免資料存在過多版本造成的的管理 (包括存貯和索引)負擔,HBASE提供 了兩種資料版本回收方式。一是儲存資料的最後n個版本,二是儲存最近一段 時間内的版本(比如最近七天)。使用者可以針對每個列族進行設定。
4.5 命名空間
命名空間的結構:
1) Table:表,所有的表都是命名空間的成員,即表必屬于某個命名空間,如果沒有指定,則在default預設的命名空間中。
2) RegionServer group:一個命名空間包含了預設的RegionServer Group。
3) Permission:權限,命名空間能夠讓我們來定義通路控制清單ACL(Access Control List)。例如,建立表,讀取表,删除,更新等等操作。
4) Quota:限額,可以強制一個命名空間可包含的region的數量。
第5章 HBase原理
5.1 讀流程
HBase讀資料流程如圖3所示
圖3所示 HBase讀資料流程
1)Client先通路zookeeper,從meta表讀取region的位置,然後讀取meta表中的資料。meta中又存儲了使用者表的region資訊;
2)根據namespace、表名和rowkey在meta表中找到對應的region資訊;
3)找到這個region對應的regionserver;
4)查找對應的region;
5)先從MemStore找資料,如果沒有,再到BlockCache裡面讀;
6)BlockCache還沒有,再到StoreFile上讀(為了讀取的效率);
7)如果是從StoreFile裡面讀取的資料,不是直接傳回給用戶端,而是先寫入BlockCache,再傳回給用戶端。
5.2 寫流程
Hbase寫流程如圖2所示
圖2 HBase寫資料流程
1)Client向HregionServer發送寫請求;
2)HregionServer将資料寫到HLog(write ahead log)。為了資料的持久化和恢複;
3)HregionServer将資料寫到記憶體(MemStore);
4)回報Client寫成功。
5.3 資料Flush過程
1)當MemStore資料達到門檻值(預設是128M,老版本是64M),将資料刷到硬碟,将記憶體中的資料删除,同時删除HLog中的曆史資料;
2)并将資料存儲到HDFS中;
3)在HLog中做标記點。
5.4 資料合并過程
1)當資料塊達到4塊,Hmaster觸發合并操作,Region将資料塊加載到本地,進行合并;
2)當合并的資料超過256M,進行拆分,将拆分後的Region配置設定給不同的HregionServer管理;
3)當HregionServer當機後,将HregionServer上的hlog拆分,然後配置設定給不同的HregionServer加載,修改.META.;
4)注意:HLog會同步到HDFS。
第6章 HBase API操作
6.1 環境準備
建立項目後在pom.xml中添加依賴:
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.3.1</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.3.1</version> </dependency> <dependency> <groupId>jdk.tools</groupId> <artifactId>jdk.tools</artifactId> <version>1.8</version> <scope>system</scope> <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath> </dependency> |
6.2 HBaseAPI
6.2.1 擷取Configuration對象
public static Configuration conf; static{ //使用HBaseConfiguration的單例方法執行個體化 conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "192.168.9.102"); conf.set("hbase.zookeeper.property.clientPort", "2181"); } |
6.2.2 判斷表是否存在
public static boolean isTableExist(String tableName) throws MasterNotRunningException, ZooKeeperConnectionException, IOException{ //在HBase中管理、通路表需要先建立HBaseAdmin對象 //Connection connection = ConnectionFactory.createConnection(conf); //HBaseAdmin admin = (HBaseAdmin) connection.getAdmin(); HBaseAdmin admin = new HBaseAdmin(conf); return admin.tableExists(tableName); } |
6.2.3 建立表
public static void createTable(String tableName, String... columnFamily) throws MasterNotRunningException, ZooKeeperConnectionException, IOException{ HBaseAdmin admin = new HBaseAdmin(conf); //判斷表是否存在 if(isTableExist(tableName)){ System.out.println("表" + tableName + "已存在"); //System.exit(0); }else{ //建立表屬性對象,表名需要轉位元組 HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName)); //建立多個列族 for(String cf : columnFamily){ descriptor.addFamily(new HColumnDescriptor(cf)); } //根據對表的配置,建立表 admin.createTable(descriptor); System.out.println("表" + tableName + "建立成功!"); } } |
6.2.4 删除表
public static void dropTable(String tableName) throws MasterNotRunningException, ZooKeeperConnectionException, IOException{ HBaseAdmin admin = new HBaseAdmin(conf); if(isTableExist(tableName)){ admin.disableTable(tableName); admin.deleteTable(tableName); System.out.println("表" + tableName + "删除成功!"); }else{ System.out.println("表" + tableName + "不存在!"); } } |
6.2.5 向表中插入資料
public static void addRowData(String tableName, String rowKey, String columnFamily, String column, String value) throws IOException{ //建立HTable對象 HTable hTable = new HTable(conf, tableName); //向表中插入資料 Put put = new Put(Bytes.toBytes(rowKey)); //向Put對象中組裝資料 put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value)); hTable.put(put); hTable.close(); System.out.println("插入資料成功"); } |
6.2.6 删除多行資料
public static void deleteMultiRow(String tableName, String... rows) throws IOException{ HTable hTable = new HTable(conf, tableName); List<Delete> deleteList = new ArrayList<Delete>(); for(String row : rows){ Delete delete = new Delete(Bytes.toBytes(row)); deleteList.add(delete); } hTable.delete(deleteList); hTable.close(); } |
6.2.7 擷取所有資料
public static void getAllRows(String tableName) throws IOException{ HTable hTable = new HTable(conf, tableName); //得到用于掃描region的對象 Scan scan = new Scan(); //使用HTable得到resultcanner實作類的對象 ResultScanner resultScanner = hTable.getScanner(scan); for(Result result : resultScanner){ Cell[] cells = result.rawCells(); for(Cell cell : cells){ //得到rowkey System.out.println("行鍵:" + Bytes.toString(CellUtil.cloneRow(cell))); //得到列族 System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell))); System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell))); System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell))); } } } |
6.2.8 擷取某一行資料
public static void getRow(String tableName, String rowKey) throws IOException{ HTable table = new HTable(conf, tableName); Get get = new Get(Bytes.toBytes(rowKey)); //get.setMaxVersions();顯示所有版本 //get.setTimeStamp();顯示指定時間戳的版本 Result result = table.get(get); for(Cell cell : result.rawCells()){ System.out.println("行鍵:" + Bytes.toString(result.getRow())); System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell))); System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell))); System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell))); System.out.println("時間戳:" + cell.getTimestamp()); } } |
6.2.9 擷取某一行指定"列族:列"的資料
public static void getRowQualifier(String tableName, String rowKey, String family, String qualifier) throws IOException{ HTable table = new HTable(conf, tableName); Get get = new Get(Bytes.toBytes(rowKey)); get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); Result result = table.get(get); for(Cell cell : result.rawCells()){ System.out.println("行鍵:" + Bytes.toString(result.getRow())); System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell))); System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell))); System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell))); } } |
6.3 MapReduce
通過HBase的相關JavaAPI,我們可以實作伴随HBase操作的MapReduce過程,比如使用MapReduce将資料從本地檔案系統導入到HBase的表中,比如我們從HBase中讀取一些原始資料後使用MapReduce做資料分析。
6.3.1 官方HBase-MapReduce
1.檢視HBase的MapReduce任務的執行
$ bin/hbase mapredcp
2.環境變量的導入
(1)執行環境變量的導入(臨時生效,在指令行執行下述操作)
$ export HBASE_HOME=/opt/module/hbase-1.3.1
$ export HADOOP_HOME=/opt/module/hadoop-2.7.2
$ export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
(2)永久生效:在/etc/profile配置
export HBASE_HOME=/opt/module/hbase-1.3.1
export HADOOP_HOME=/opt/module/hadoop-2.7.2
并在hadoop-env.sh中配置:(注意:在for循環之後配)
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib
public void createTableContent(){
HBaseAdmin admin = null;
try {
admin = new HBaseAdmin(conf);
//建立表表述
HTableDescriptor content = new HTableDescriptor(TableName.valueOf(TABLE_CONTENT));
//建立列族描述
HColumnDescriptor info = new HColumnDescriptor(Bytes.toBytes("info"));
//設定塊緩存
info.setBlockCacheEnabled(true);
//設定塊緩存大小
info.setBlocksize(2097152);
//設定壓縮方式
// info.setCompressionType(Algorithm.SNAPPY);
//設定版本确界
info.setMaxVersions(1);
info.setMinVersions(1);
content.addFamily(info);
admin.createTable(content);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != admin){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
8.2.4 建立使用者關系表
表結構:
方法名 | createTableRelations |
Table Name | weibo:relations |
RowKey | 使用者ID |
ColumnFamily | attends、fans |
ColumnLabel | 關注使用者ID,粉絲使用者ID |
ColumnValue | 使用者ID |
Version | 1個版本 |
代碼:
public void createTableRelations(){
HBaseAdmin admin = null;
try {
admin = new HBaseAdmin(conf);
HTableDescriptor relations = new HTableDescriptor(TableName.valueOf(TABLE_RELATIONS));
//關注的人的列族
HColumnDescriptor attends = new HColumnDescriptor(Bytes.toBytes("attends"));
//設定塊緩存
attends.setBlockCacheEnabled(true);
//設定塊緩存大小
attends.setBlocksize(2097152);
//設定壓縮方式
// info.setCompressionType(Algorithm.SNAPPY);
//設定版本确界
attends.setMaxVersions(1);
attends.setMinVersions(1);
//粉絲列族
HColumnDescriptor fans = new HColumnDescriptor(Bytes.toBytes("fans"));
fans.setBlockCacheEnabled(true);
fans.setBlocksize(2097152);
fans.setMaxVersions(1);
fans.setMinVersions(1);
relations.addFamily(attends);
relations.addFamily(fans);
admin.createTable(relations);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != admin){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
8.2.5 建立微網誌收件箱表
表結構:
方法名 | createTableReceiveContentEmails |
Table Name | weibo:receive_content_email |
RowKey | 使用者ID |
ColumnFamily | info |
ColumnLabel | 使用者ID |
ColumnValue | 取微網誌内容的RowKey |
Version | 1000 |
代碼:
public void createTableReceiveContentEmail(){
HBaseAdmin admin = null;
try {
admin = new HBaseAdmin(conf);
HTableDescriptor receive_content_email = new HTableDescriptor(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
HColumnDescriptor info = new HColumnDescriptor(Bytes.toBytes("info"));
info.setBlockCacheEnabled(true);
info.setBlocksize(2097152);
info.setMaxVersions(1000);
info.setMinVersions(1000);
receive_content_email.addFamily(info);;
admin.createTable(receive_content_email);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != admin){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
8.2.6 釋出微網誌内容
a、微網誌内容表中添加1條資料
b、微網誌收件箱表對所有粉絲使用者添加資料
代碼:Message.java
package com.atguigu.weibo;
public class Message {
private String uid;
private String timestamp;
private String content;
public String getUid() {
return uid;
}
public void setUid(String uid) {
this.uid = uid;
}
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
@Override
public String toString() {
return "Message [uid=" + uid + ", timestamp=" + timestamp + ", content=" + content + "]";
}
}
代碼:public void publishContent(String uid, String content)
public void publishContent(String uid, String content){
HConnection connection = null;
try {
connection = HConnectionManager.createConnection(conf);
//a、微網誌内容表中添加1條資料,首先擷取微網誌内容表描述
HTableInterface contentTBL = connection.getTable(TableName.valueOf(TABLE_CONTENT));
//組裝Rowkey
long timestamp = System.currentTimeMillis();
String rowKey = uid + "_" + timestamp;
Put put = new Put(Bytes.toBytes(rowKey));
put.add(Bytes.toBytes("info"), Bytes.toBytes("content"), timestamp, Bytes.toBytes(content));
contentTBL.put(put);
//b、向微網誌收件箱表中加入釋出的Rowkey
//b.1、查詢使用者關系表,得到目前使用者有哪些粉絲
HTableInterface relationsTBL = connection.getTable(TableName.valueOf(TABLE_RELATIONS));
//b.2、取出目标資料
Get get = new Get(Bytes.toBytes(uid));
get.addFamily(Bytes.toBytes("fans"));
Result result = relationsTBL.get(get);
List<byte[]> fans = new ArrayList<byte[]>();
//周遊取出目前釋出微網誌的使用者的所有粉絲資料
for(Cell cell : result.rawCells()){
fans.add(CellUtil.cloneQualifier(cell));
}
//如果該使用者沒有粉絲,則直接return
if(fans.size() <= 0) return;
//開始操作收件箱表
HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
List<Put> puts = new ArrayList<Put>();
for(byte[] fan : fans){
Put fanPut = new Put(fan);
fanPut.add(Bytes.toBytes("info"), Bytes.toBytes(uid), timestamp, Bytes.toBytes(rowKey));
puts.add(fanPut);
}
recTBL.put(puts);
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != connection){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
8.2.7 添加關注使用者
a、在微網誌使用者關系表中,對目前主動操作的使用者添加新關注的好友
b、在微網誌使用者關系表中,對被關注的使用者添加新的粉絲
c、微網誌收件箱表中添加所關注的使用者釋出的微網誌
代碼實作:public void addAttends(String uid, String... attends)
public void addAttends(String uid, String... attends){
//參數過濾
if(attends == null || attends.length <= 0 || uid == null || uid.length() <= 0){
return;
}
HConnection connection = null;
try {
connection = HConnectionManager.createConnection(conf);
//使用者關系表操作對象(連接配接到使用者關系表)
HTableInterface relationsTBL = connection.getTable(TableName.valueOf(TABLE_RELATIONS));
List<Put> puts = new ArrayList<Put>();
//a、在微網誌使用者關系表中,添加新關注的好友
Put attendPut = new Put(Bytes.toBytes(uid));
for(String attend : attends){
//為目前使用者添加關注的人
attendPut.add(Bytes.toBytes("attends"), Bytes.toBytes(attend), Bytes.toBytes(attend));
//b、為被關注的人,添加粉絲
Put fansPut = new Put(Bytes.toBytes(attend));
fansPut.add(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(uid));
//将所有關注的人一個一個的添加到puts(List)集合中
puts.add(fansPut);
}
puts.add(attendPut);
relationsTBL.put(puts);
//c.1、微網誌收件箱添加關注的使用者釋出的微網誌内容(content)的rowkey
HTableInterface contentTBL = connection.getTable(TableName.valueOf(TABLE_CONTENT));
Scan scan = new Scan();
//用于存放取出來的關注的人所釋出的微網誌的rowkey
List<byte[]> rowkeys = new ArrayList<byte[]>();
for(String attend : attends){
//過濾掃描rowkey,即:前置位比對被關注的人的uid_
RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(attend + "_"));
//為掃描對象指定過濾規則
scan.setFilter(filter);
//通過掃描對象得到scanner
ResultScanner result = contentTBL.getScanner(scan);
//疊代器周遊掃描出來的結果集
Iterator<Result> iterator = result.iterator();
while(iterator.hasNext()){
//取出每一個符合掃描結果的那一行資料
Result r = iterator.next();
for(Cell cell : r.rawCells()){
//将得到的rowkey放置于集合容器中
rowkeys.add(CellUtil.cloneRow(cell));
}
}
}
//c.2、将取出的微網誌rowkey放置于目前操作使用者的收件箱中
if(rowkeys.size() <= 0) return;
//得到微網誌收件箱表的操作對象
HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
//用于存放多個關注的使用者的釋出的多條微網誌rowkey資訊
List<Put> recPuts = new ArrayList<Put>();
for(byte[] rk : rowkeys){
Put put = new Put(Bytes.toBytes(uid));
//uid_timestamp
String rowKey = Bytes.toString(rk);
//借取uid
String attendUID = rowKey.substring(0, rowKey.indexOf("_"));
long timestamp = Long.parseLong(rowKey.substring(rowKey.indexOf("_") + 1));
//将微網誌rowkey添加到指定單元格中
put.add(Bytes.toBytes("info"), Bytes.toBytes(attendUID), timestamp, rk);
recPuts.add(put);
}
recTBL.put(recPuts);
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != connection){
try {
connection.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
8.2.8 移除(取關)使用者
a、在微網誌使用者關系表中,對目前主動操作的使用者移除取關的好友(attends)
b、在微網誌使用者關系表中,對被取關的使用者移除粉絲
c、微網誌收件箱中删除取關的使用者釋出的微網誌
代碼:public void removeAttends(String uid, String... attends)
public void removeAttends(String uid, String... attends){
//過濾資料
if(uid == null || uid.length() <= 0 || attends == null || attends.length <= 0) return;
HConnection connection = null;
try {
connection = HConnectionManager.createConnection(conf);
//a、在微網誌使用者關系表中,删除已關注的好友
HTableInterface relationsTBL = connection.getTable(TableName.valueOf(TABLE_RELATIONS));
//待删除的使用者關系表中的所有資料
List<Delete> deletes = new ArrayList<Delete>();
//目前取關操作者的uid對應的Delete對象
Delete attendDelete = new Delete(Bytes.toBytes(uid));
//周遊取關,同時每次取關都要将被取關的人的粉絲-1
for(String attend : attends){
attendDelete.deleteColumn(Bytes.toBytes("attends"), Bytes.toBytes(attend));
//b
Delete fansDelete = new Delete(Bytes.toBytes(attend));
fansDelete.deleteColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid));
deletes.add(fansDelete);
}
deletes.add(attendDelete);
relationsTBL.delete(deletes);
//c、删除取關的人的微網誌rowkey 從 收件箱表中
HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
Delete recDelete = new Delete(Bytes.toBytes(uid));
for(String attend : attends){
recDelete.deleteColumn(Bytes.toBytes("info"), Bytes.toBytes(attend));
}
recTBL.delete(recDelete);
} catch (IOException e) {
e.printStackTrace();
}
}
8.2.9 擷取關注的人的微網誌内容
a、從微網誌收件箱中擷取所關注的使用者的微網誌RowKey
b、根據擷取的RowKey,得到微網誌内容
代碼實作:public List<Message> getAttendsContent(String uid)
public List<Message> getAttendsContent(String uid){
HConnection connection = null;
try {
connection = HConnectionManager.createConnection(conf);
HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
//a、從收件箱中取得微網誌rowKey
Get get = new Get(Bytes.toBytes(uid));
//設定最大版本号
get.setMaxVersions(5);
List<byte[]> rowkeys = new ArrayList<byte[]>();
Result result = recTBL.get(get);
for(Cell cell : result.rawCells()){
rowkeys.add(CellUtil.cloneValue(cell));
}
//b、根據取出的所有rowkey去微網誌内容表中檢索資料
HTableInterface contentTBL = connection.getTable(TableName.valueOf(TABLE_CONTENT));
List<Get> gets = new ArrayList<Get>();
//根據rowkey取出對應微網誌的具體内容
for(byte[] rk : rowkeys){
Get g = new Get(rk);
gets.add(g);
}
//得到所有的微網誌内容的result對象
Result[] results = contentTBL.get(gets);
List<Message> messages = new ArrayList<Message>();
for(Result res : results){
for(Cell cell : res.rawCells()){
Message message = new Message();
String rowKey = Bytes.toString(CellUtil.cloneRow(cell));
String userid = rowKey.substring(0, rowKey.indexOf("_"));
String timestamp = rowKey.substring(rowKey.indexOf("_") + 1);
String content = Bytes.toString(CellUtil.cloneValue(cell));
message.setContent(content);
message.setTimestamp(timestamp);
message.setUid(userid);
messages.add(message);
}
}
return messages;
} catch (IOException e) {
e.printStackTrace();
}finally{
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return null;
}
8.2.10 測試
-- 測試釋出微網誌内容
public void testPublishContent(WeiBo wb)
-- 測試添加關注
public void testAddAttend(WeiBo wb)
-- 測試取消關注
public void testRemoveAttend(WeiBo wb)
-- 測試展示内容
public void testShowMessage(WeiBo wb)
代碼:
public void testPublishContent(WeiBo wb){
wb.publishContent("0001", "今天買了一包空氣,送了點薯片,非常開心!!");
wb.publishContent("0001", "今天天氣不錯。");
}
public void testAddAttend(WeiBo wb){
wb.publishContent("0008", "準備下課!");
wb.publishContent("0009", "準備關機!");
wb.addAttends("0001", "0008", "0009");
}
public void testRemoveAttend(WeiBo wb){
wb.removeAttends("0001", "0008");
}
public void testShowMessage(WeiBo wb){
List<Message> messages = wb.getAttendsContent("0001");
for(Message message : messages){
System.out.println(message);
}
}
public static void main(String[] args) {
WeiBo weibo = new WeiBo();
weibo.initTable();
weibo.testPublishContent(weibo);
weibo.testAddAttend(weibo);
weibo.testShowMessage(weibo);
weibo.testRemoveAttend(weibo);
weibo.testShowMessage(weibo);
}
第9章 擴充
9.1 HBase在商業項目中的能力
每天:
1) 消息量:發送和接收的消息數超過60億
2) 将近1000億條資料的讀寫
3) 高峰期每秒150萬左右操作
4) 整體讀取資料占有約55%,寫入占有45%
5) 超過2PB的資料,涉及備援共6PB資料
6) 資料每月大概增長300千兆位元組。
9.2 布隆過濾器
在日常生活中,包括在設計計算機軟體時,我們經常要判斷一個元素是否在一個集合中。比如在字處理軟體中,需要檢查一個英語單詞是否拼寫正确(也就是要判斷它是否在已知的字典中);在 FBI,一個嫌疑人的名字是否已經在嫌疑名單上;在網絡爬蟲裡,一個網址是否被通路過等等。最直接的方法就是将集合中全部的元素存在計算機中,遇到一個新元素時,将它和集合中的元素直接比較即可。一般來講,計算機中的集合是用哈希表(hash table)來存儲的。它的好處是快速準确,缺點是費存儲空間。當集合比較小時,這個問題不顯著,但是當集合巨大時,哈希表存儲效率低的問題就顯現出來了。比如說,一個像 Yahoo,Hotmail 和 Gmai 那樣的公衆電子郵件(email)提供商,總是需要過濾來自發送垃圾郵件的人(spamer)的垃圾郵件。一個辦法就是記錄下那些發垃圾郵件的 email 位址。由于那些發送者不停地在注冊新的位址,全世界少說也有幾十億個發垃圾郵件的位址,将他們都存起來則需要大量的網絡伺服器。如果用哈希表,每存儲一億個 email 位址, 就需要 1.6GB 的記憶體(用哈希表實作的具體辦法是将每一個 email 位址對應成一個八位元組的資訊指紋googlechinablog.com/2006/08/blog-post.html,然後将這些資訊指紋存入哈希表,由于哈希表的存儲效率一般隻有 50%,是以一個 email 位址需要占用十六個位元組。一億個位址大約要 1.6GB, 即十六億位元組的記憶體)。是以存貯幾十億個郵件位址可能需要上百 GB 的記憶體。除非是超級計算機,一般伺服器是無法存儲的。
布隆過濾器隻需要哈希表 1/8 到 1/4 的大小就能解決同樣的問題。
Bloom Filter是一種空間效率很高的随機資料結構,它利用位數組很簡潔地表示一個集合,并能判斷一個元素是否屬于這個集合。Bloom Filter的這種高效是有一定代價的:在判斷一個元素是否屬于某個集合時,有可能會把不屬于這個集合的元素誤認為屬于這個集合(false positive)。是以,Bloom Filter不适合那些"零錯誤"的應用場合。而在能容忍低錯誤率的應用場合下,Bloom Filter通過極少的錯誤換取了存儲空間的極大節省。
下面我們具體來看Bloom Filter是如何用位數組表示集合的。初始狀态時,Bloom Filter是一個包含m位的位數組,每一位都置為0,如圖9-5所示。
圖9-5
為了表達S={x1, x2,…,xn}這樣一個n個元素的集合,Bloom Filter使用k個互相獨立的哈希函數(Hash Function),它們分别将集合中的每個元素映射到{1,…,m}的範圍中。對任意一個元素x,第i個哈希函數映射的位置hi(x)就會被置為1(1≤i≤k)。注意,如果一個位置多次被置為1,那麼隻有第一次會起作用,後面幾次将沒有任何效果。如圖9-6所示,k=3,且有兩個哈希函數選中同一個位置(從左邊數第五位)。
圖9-6
在判斷y是否屬于這個集合時,我們對y應用k次哈希函數,如果所有hi(y)的位置都是1(1≤i≤k),那麼我們就認為y是集合中的元素,否則就認為y不是集合中的元素。如圖9-7所示y1就不是集合中的元素。y2或者屬于這個集合,或者剛好是一個false positive。
圖9-7
· 為了add一個元素,用k個hash function将它hash得到bloom filter中k個bit位,将這k個bit位置1。
· 為了query一個元素,即判斷它是否在集合中,用k個hash function将它hash得到k個bit位。若這k bits全為1,則此元素在集合中;若其中任一位不為1,則此元素比不在集合中(因為如果在,則在add時已經把對應的k個bits位置為1)。
· 不允許remove元素,因為那樣的話會把相應的k個bits位置為0,而其中很有可能有其他元素對應的位。是以remove會引入false negative,這是絕對不被允許的。
布隆過濾器決不會漏掉任何一個在黑名單中的可疑位址。但是,它有一條不足之處,也就是它有極小的可能将一個不在黑名單中的電子郵件位址判定為在黑名單中,因為有可能某個好的郵件位址正巧對應一個八個都被設定成一的二進制位。好在這種可能性很小,我們把它稱為誤識機率。
布隆過濾器的好處在于快速,省空間,但是有一定的誤識别率,常見的補救辦法是在建立一個小的白名單,存儲那些可能個别誤判的郵件位址。
布隆過濾器具體算法進階内容,如錯誤率估計,最優哈希函數個數計算,位數組大小計算,請參見http://blog.csdn.net/jiaomeng/article/details/1495500。
9.2 HBase2.0新特性
2017年8月22日淩晨2點左右,HBase釋出了2.0.0 alpha-2,相比于上一個版本,修複了500個更新檔,我們來了解一下2.0版本的HBase新特性。
最新文檔:
http://hbase.apache.org/book.html#ttl
官方釋出首頁:
http://mail-archives.apache.org/mod_mbox/www-announce/201708.mbox/<CA[email protected]
舉例:
1) region進行了多份備援
主region負責讀寫,從region維護在其他HregionServer中,負責讀以及同步主region中的資訊,如果同步不及時,是有可能出現client在從region中讀到了髒資料(主region還沒來得及把memstore中的變動的内容flush)。
2) 更多變動
https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12340859&styleName=&projectId=12310753&Create=Create&atl_token=A5KQ-2QAV-T4JA-FDED%7Ce6f233490acdf4785b697d4b457f7adb0a72b69f%7Clout