第8章 HBase實戰之谷粒微網誌
8.1 需求分析
- 微網誌内容的浏覽,資料庫表設計
- 使用者社交展現:關注使用者,取關使用者
- 拉取關注的人的微網誌内容
8.2 代碼實作
8.2.1 代碼設計總覽:
- 建立命名空間以及表名的定義
- 建立微網誌内容表
- 建立使用者關系表
- 建立使用者微網誌内容接收郵件表
- 釋出微網誌内容
- 添加關注使用者
- 移除(取關)使用者
- 擷取關注的人的微網誌内容
- 測試
8.2.2 建立命名空間以及表名的定義
//擷取配置conf
private Configuration conf = HBaseConfiguration.create();
//微網誌内容表的表名
private static final byte[] TABLE_CONTENT = Bytes.toBytes("weibo:content");
//使用者關系表的表名
private static final byte[] TABLE_RELATIONS = Bytes.toBytes("weibo:relations");
//微網誌收件箱表的表名
private static final byte[] TABLE_RECEIVE_CONTENT_EMAIL = Bytes.toBytes("weibo:receive_content_email");
public void initNamespace(){
HBaseAdmin admin = null;
try {
admin = new HBaseAdmin(conf);
//命名空間類似于關系型資料庫中的schema,可以想象成檔案夾
NamespaceDescriptor weibo = NamespaceDescriptor
.create("weibo")
.addConfiguration("creator", "Jinji")
.addConfiguration("create_time", System.currentTimeMillis() + "")
.build();
admin.createNamespace(weibo);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != admin){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
8.2.3 建立微網誌内容表
表結構:
public void createTableContent(){
HBaseAdmin admin = null;
try {
admin = new HBaseAdmin(conf);
//建立表表述
HTableDescriptor content = new HTableDescriptor(TableName.valueOf(TABLE_CONTENT));
//建立列族描述
HColumnDescriptor info = new HColumnDescriptor(Bytes.toBytes("info"));
//設定塊緩存
info.setBlockCacheEnabled(true);
//設定塊緩存大小
info.setBlocksize(2097152);
//設定壓縮方式
// info.setCompressionType(Algorithm.SNAPPY);
//設定版本确界
info.setMaxVersions(1);
info.setMinVersions(1);
content.addFamily(info);
admin.createTable(content);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != admin){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
8.2.4 建立使用者關系表
public void createTableRelations(){
HBaseAdmin admin = null;
try {
admin = new HBaseAdmin(conf);
HTableDescriptor relations = new HTableDescriptor(TableName.valueOf(TABLE_RELATIONS));
//關注的人的列族
HColumnDescriptor attends = new HColumnDescriptor(Bytes.toBytes("attends"));
//設定塊緩存
attends.setBlockCacheEnabled(true);
//設定塊緩存大小
attends.setBlocksize(2097152);
//設定壓縮方式
// info.setCompressionType(Algorithm.SNAPPY);
//設定版本确界
attends.setMaxVersions(1);
attends.setMinVersions(1);
//粉絲列族
HColumnDescriptor fans = new HColumnDescriptor(Bytes.toBytes("fans"));
fans.setBlockCacheEnabled(true);
fans.setBlocksize(2097152);
fans.setMaxVersions(1);
fans.setMinVersions(1);
relations.addFamily(attends);
relations.addFamily(fans);
admin.createTable(relations);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != admin){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
8.2.5 建立微網誌收件箱表
表結構
代碼示例
public void createTableReceiveContentEmail(){
HBaseAdmin admin = null;
try {
admin = new HBaseAdmin(conf);
HTableDescriptor receive_content_email = new HTableDescriptor(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
HColumnDescriptor info = new HColumnDescriptor(Bytes.toBytes("info"));
info.setBlockCacheEnabled(true);
info.setBlocksize(2097152);
info.setMaxVersions(1000);
info.setMinVersions(1000);
receive_content_email.addFamily(info);;
admin.createTable(receive_content_email);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != admin){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
8.2.6 釋出微網誌内容
a、微網誌内容表中添加1條資料
b、微網誌收件箱表對所有粉絲使用者添加資料
代碼:Message.java
public class Message {
private String uid;
private String timestamp;
private String content;
public String getUid() {
return uid;
}
public void setUid(String uid) {
this.uid = uid;
}
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
@Override
public String toString() {
return "Message [uid=" + uid + ", timestamp=" + timestamp + ", content=" + content + "]";
}
}
代碼
public void publishContent(String uid, String content){
HConnection connection = null;
try {
connection = HConnectionManager.createConnection(conf);
//a、微網誌内容表中添加1條資料,首先擷取微網誌内容表描述
HTableInterface contentTBL = connection.getTable(TableName.valueOf(TABLE_CONTENT));
//組裝Rowkey
long timestamp = System.currentTimeMillis();
String rowKey = uid + "_" + timestamp;
Put put = new Put(Bytes.toBytes(rowKey));
put.add(Bytes.toBytes("info"), Bytes.toBytes("content"), timestamp, Bytes.toBytes(content));
contentTBL.put(put);
//b、向微網誌收件箱表中加入釋出的Rowkey
//b.1、查詢使用者關系表,得到目前使用者有哪些粉絲
HTableInterface relationsTBL = connection.getTable(TableName.valueOf(TABLE_RELATIONS));
//b.2、取出目标資料
Get get = new Get(Bytes.toBytes(uid));
get.addFamily(Bytes.toBytes("fans"));
Result result = relationsTBL.get(get);
List<byte[]> fans = new ArrayList<byte[]>();
//周遊取出目前釋出微網誌的使用者的所有粉絲資料
for(Cell cell : result.rawCells()){
fans.add(CellUtil.cloneQualifier(cell));
}
//如果該使用者沒有粉絲,則直接return
if(fans.size() <= 0) return;
//開始操作收件箱表
HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
List<Put> puts = new ArrayList<Put>();
for(byte[] fan : fans){
Put fanPut = new Put(fan);
fanPut.add(Bytes.toBytes("info"), Bytes.toBytes(uid), timestamp, Bytes.toBytes(rowKey));
puts.add(fanPut);
}
recTBL.put(puts);
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != connection){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
8.2.7 添加關注使用者
a、在微網誌使用者關系表中,對目前主動操作的使用者添加新關注的好友
b、在微網誌使用者關系表中,對被關注的使用者添加新的粉絲
c、微網誌收件箱表中添加所關注的使用者釋出的微網誌
代碼實作:public void addAttends(String uid, String… attends)
/**
* 關注使用者邏輯
* a、在微網誌使用者關系表中,對目前主動操作的使用者添加新的關注的好友
* b、在微網誌使用者關系表中,對被關注的使用者添加粉絲(目前操作的使用者)
* c、目前操作使用者的微網誌收件箱添加所關注的使用者釋出的微網誌rowkey
*/
public void addAttends(String uid, String... attends){
//參數過濾
if(attends == null || attends.length <= 0 || uid == null || uid.length() <= 0){
return;
}
HConnection connection = null;
try {
connection = HConnectionManager.createConnection(conf);
//使用者關系表操作對象(連接配接到使用者關系表)
HTableInterface relationsTBL = connection.getTable(TableName.valueOf(TABLE_RELATIONS));
List<Put> puts = new ArrayList<Put>();
//a、在微網誌使用者關系表中,添加新關注的好友
Put attendPut = new Put(Bytes.toBytes(uid));
for(String attend : attends){
//為目前使用者添加關注的人
attendPut.add(Bytes.toBytes("attends"), Bytes.toBytes(attend), Bytes.toBytes(attend));
//b、為被關注的人,添加粉絲
Put fansPut = new Put(Bytes.toBytes(attend));
fansPut.add(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(uid));
//将所有關注的人一個一個的添加到puts(List)集合中
puts.add(fansPut);
}
puts.add(attendPut);
relationsTBL.put(puts);
//c.1、微網誌收件箱添加關注的使用者釋出的微網誌内容(content)的rowkey
HTableInterface contentTBL = connection.getTable(TableName.valueOf(TABLE_CONTENT));
Scan scan = new Scan();
//用于存放取出來的關注的人所釋出的微網誌的rowkey
List<byte[]> rowkeys = new ArrayList<byte[]>();
for(String attend : attends){
//過濾掃描rowkey,即:前置位比對被關注的人的uid_
RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(attend + "_"));
//為掃描對象指定過濾規則
scan.setFilter(filter);
//通過掃描對象得到scanner
ResultScanner result = contentTBL.getScanner(scan);
//疊代器周遊掃描出來的結果集
Iterator<Result> iterator = result.iterator();
while(iterator.hasNext()){
//取出每一個符合掃描結果的那一行資料
Result r = iterator.next();
for(Cell cell : r.rawCells()){
//将得到的rowkey放置于集合容器中
rowkeys.add(CellUtil.cloneRow(cell));
}
}
}
//c.2、将取出的微網誌rowkey放置于目前操作使用者的收件箱中
if(rowkeys.size() <= 0) return;
//得到微網誌收件箱表的操作對象
HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
//用于存放多個關注的使用者的釋出的多條微網誌rowkey資訊
List<Put> recPuts = new ArrayList<Put>();
for(byte[] rk : rowkeys){
Put put = new Put(Bytes.toBytes(uid));
//uid_timestamp
String rowKey = Bytes.toString(rk);
//借取uid
String attendUID = rowKey.substring(0, rowKey.indexOf("_"));
long timestamp = Long.parseLong(rowKey.substring(rowKey.indexOf("_") + 1));
//将微網誌rowkey添加到指定單元格中
put.add(Bytes.toBytes("info"), Bytes.toBytes(attendUID), timestamp, rk);
recPuts.add(put);
}
recTBL.put(recPuts);
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != connection){
try {
connection.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
8.2.8 移除(取關)使用者
a、在微網誌使用者關系表中,對目前主動操作的使用者移除取關的好友(attends)
b、在微網誌使用者關系表中,對被取關的使用者移除粉絲
c、微網誌收件箱中删除取關的使用者釋出的微網誌
代碼:public void removeAttends(String uid, String… attends)
/**
* 取消關注(remove)
* a、在微網誌使用者關系表中,對目前主動操作的使用者删除對應取關的好友
* b、在微網誌使用者關系表中,對被取消關注的人删除粉絲(目前操作人)
* c、從收件箱中,删除取關的人的微網誌的rowkey
*/
public void removeAttends(String uid, String... attends){
//過濾資料
if(uid == null || uid.length() <= 0 || attends == null || attends.length <= 0) return;
HConnection connection = null;
try {
connection = HConnectionManager.createConnection(conf);
//a、在微網誌使用者關系表中,删除已關注的好友
HTableInterface relationsTBL = connection.getTable(TableName.valueOf(TABLE_RELATIONS));
//待删除的使用者關系表中的所有資料
List<Delete> deletes = new ArrayList<Delete>();
//目前取關操作者的uid對應的Delete對象
Delete attendDelete = new Delete(Bytes.toBytes(uid));
//周遊取關,同時每次取關都要将被取關的人的粉絲-1
for(String attend : attends){
attendDelete.deleteColumn(Bytes.toBytes("attends"), Bytes.toBytes(attend));
//b
Delete fansDelete = new Delete(Bytes.toBytes(attend));
fansDelete.deleteColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid));
deletes.add(fansDelete);
}
deletes.add(attendDelete);
relationsTBL.delete(deletes);
//c、删除取關的人的微網誌rowkey 從 收件箱表中
HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
Delete recDelete = new Delete(Bytes.toBytes(uid));
for(String attend : attends){
recDelete.deleteColumn(Bytes.toBytes("info"), Bytes.toBytes(attend));
}
recTBL.delete(recDelete);
} catch (IOException e) {
e.printStackTrace();
}
}
8.2.9 擷取關注的人的微網誌内容
a、從微網誌收件箱中擷取所關注的使用者的微網誌RowKey
b、根據擷取的RowKey,得到微網誌内容
代碼實作:
public List<Message> getAttendsContent(String uid)
/**
* 擷取微網誌實際内容
* a、從微網誌收件箱中擷取所有關注的人的釋出的微網誌的rowkey
* b、根據得到的rowkey去微網誌内容表中得到資料
* c、将得到的資料封裝到Message對象中
*/
public List<Message> getAttendsContent(String uid){
HConnection connection = null;
try {
connection = HConnectionManager.createConnection(conf);
HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
//a、從收件箱中取得微網誌rowKey
Get get = new Get(Bytes.toBytes(uid));
//設定最大版本号
get.setMaxVersions(5);
List<byte[]> rowkeys = new ArrayList<byte[]>();
Result result = recTBL.get(get);
for(Cell cell : result.rawCells()){
rowkeys.add(CellUtil.cloneValue(cell));
}
//b、根據取出的所有rowkey去微網誌内容表中檢索資料
HTableInterface contentTBL = connection.getTable(TableName.valueOf(TABLE_CONTENT));
List<Get> gets = new ArrayList<Get>();
//根據rowkey取出對應微網誌的具體内容
for(byte[] rk : rowkeys){
Get g = new Get(rk);
gets.add(g);
}
//得到所有的微網誌内容的result對象
Result[] results = contentTBL.get(gets);
List<Message> messages = new ArrayList<Message>();
for(Result res : results){
for(Cell cell : res.rawCells()){
Message message = new Message();
String rowKey = Bytes.toString(CellUtil.cloneRow(cell));
String userid = rowKey.substring(0, rowKey.indexOf("_"));
String timestamp = rowKey.substring(rowKey.indexOf("_") + 1);
String content = Bytes.toString(CellUtil.cloneValue(cell));
message.setContent(content);
message.setTimestamp(timestamp);
message.setUid(userid);
messages.add(message);
}
}
return messages;
} catch (IOException e) {
e.printStackTrace();
}finally{
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return null;
}
8.2.10 測試
– 測試釋出微網誌内容
public void testPublishContent(WeiBo wb)
– 測試添加關注
public void testAddAttend(WeiBo wb)
public void testRemoveAttend(WeiBo wb)
public void testShowMessage(WeiBo wb)
/**
* 釋出微網誌内容
* 添加關注
* 取消關注
* 展示内容
*/
public void testPublishContent(WeiBo wb){
wb.publishContent("0001", "今天買了一包空氣,送了點薯片,非常開心!!");
wb.publishContent("0001", "今天天氣不錯。");
}
public void testAddAttend(WeiBo wb){
wb.publishContent("0008", "準備下課!");
wb.publishContent("0009", "準備關機!");
wb.addAttends("0001", "0008", "0009");
}
public void testRemoveAttend(WeiBo wb){
wb.removeAttends("0001", "0008");
}
public void testShowMessage(WeiBo wb){
List<Message> messages = wb.getAttendsContent("0001");
for(Message message : messages){
System.out.println(message);
}
}
public static void main(String[] args) {
WeiBo weibo = new WeiBo();
weibo.initTable();
weibo.testPublishContent(weibo);
weibo.testAddAttend(weibo);
weibo.testShowMessage(weibo);
weibo.testRemoveAttend(weibo);
weibo.testShowMessage(weibo);
}