天天看點

快速學習-HBase實戰之谷粒微網誌

第8章 HBase實戰之谷粒微網誌

8.1 需求分析

  1. 微網誌内容的浏覽,資料庫表設計
  2. 使用者社交展現:關注使用者,取關使用者
  3. 拉取關注的人的微網誌内容

8.2 代碼實作

8.2.1 代碼設計總覽:

  1. 建立命名空間以及表名的定義
  2. 建立微網誌内容表
  3. 建立使用者關系表
  4. 建立使用者微網誌内容接收郵件表
  5. 釋出微網誌内容
  6. 添加關注使用者
  7. 移除(取關)使用者
  8. 擷取關注的人的微網誌内容
  9. 測試

8.2.2 建立命名空間以及表名的定義

//擷取配置conf
private Configuration conf = HBaseConfiguration.create();

//微網誌内容表的表名
private static final byte[] TABLE_CONTENT = Bytes.toBytes("weibo:content");
//使用者關系表的表名
private static final byte[] TABLE_RELATIONS = Bytes.toBytes("weibo:relations");
//微網誌收件箱表的表名
private static final byte[] TABLE_RECEIVE_CONTENT_EMAIL = Bytes.toBytes("weibo:receive_content_email");
public void initNamespace(){
	HBaseAdmin admin = null;
	try {
		admin = new HBaseAdmin(conf);
		//命名空間類似于關系型資料庫中的schema,可以想象成檔案夾
		NamespaceDescriptor weibo = NamespaceDescriptor
				.create("weibo")
				.addConfiguration("creator", "Jinji")
				.addConfiguration("create_time", System.currentTimeMillis() + "")
				.build();
		admin.createNamespace(weibo);
	} catch (MasterNotRunningException e) {
		e.printStackTrace();
	} catch (ZooKeeperConnectionException e) {
		e.printStackTrace();
	} catch (IOException e) {
		e.printStackTrace();
	}finally{
		if(null != admin){
			try {
				admin.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}
           

8.2.3 建立微網誌内容表

表結構:

快速學習-HBase實戰之谷粒微網誌
public void createTableContent(){
	HBaseAdmin admin = null;
	try {
		admin = new HBaseAdmin(conf);
		//建立表表述
		HTableDescriptor content = new HTableDescriptor(TableName.valueOf(TABLE_CONTENT));
		//建立列族描述
		HColumnDescriptor info = new HColumnDescriptor(Bytes.toBytes("info"));
		//設定塊緩存
		info.setBlockCacheEnabled(true);
		//設定塊緩存大小
		info.setBlocksize(2097152);
		//設定壓縮方式
//			info.setCompressionType(Algorithm.SNAPPY);
		//設定版本确界
		info.setMaxVersions(1);
		info.setMinVersions(1);
		
		content.addFamily(info);
		admin.createTable(content);
		
	} catch (MasterNotRunningException e) {
		e.printStackTrace();
	} catch (ZooKeeperConnectionException e) {
		e.printStackTrace();
	} catch (IOException e) {
		e.printStackTrace();
	}finally{
		if(null != admin){
			try {
				admin.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}
           

8.2.4 建立使用者關系表

快速學習-HBase實戰之谷粒微網誌
public void createTableRelations(){
	HBaseAdmin admin = null;
	try {
		admin = new HBaseAdmin(conf);
		HTableDescriptor relations = new HTableDescriptor(TableName.valueOf(TABLE_RELATIONS));
		
		//關注的人的列族
		HColumnDescriptor attends = new HColumnDescriptor(Bytes.toBytes("attends"));
		//設定塊緩存
		attends.setBlockCacheEnabled(true);
		//設定塊緩存大小
		attends.setBlocksize(2097152);
		//設定壓縮方式
//			info.setCompressionType(Algorithm.SNAPPY);
		//設定版本确界
		attends.setMaxVersions(1);
		attends.setMinVersions(1);
		
		//粉絲列族
		HColumnDescriptor fans = new HColumnDescriptor(Bytes.toBytes("fans"));
		fans.setBlockCacheEnabled(true);
		fans.setBlocksize(2097152);
		fans.setMaxVersions(1);
		fans.setMinVersions(1);
		
		
		relations.addFamily(attends);
		relations.addFamily(fans);
		admin.createTable(relations);
		
	} catch (MasterNotRunningException e) {
		e.printStackTrace();
	} catch (ZooKeeperConnectionException e) {
		e.printStackTrace();
	} catch (IOException e) {
		e.printStackTrace();
	}finally{
		if(null != admin){
			try {
				admin.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}
           

8.2.5 建立微網誌收件箱表

表結構

快速學習-HBase實戰之谷粒微網誌

代碼示例

public void createTableReceiveContentEmail(){
	HBaseAdmin admin = null;
	try {
		admin = new HBaseAdmin(conf);
		HTableDescriptor receive_content_email = new HTableDescriptor(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
		HColumnDescriptor info = new HColumnDescriptor(Bytes.toBytes("info"));
		
		info.setBlockCacheEnabled(true);
		info.setBlocksize(2097152);
		info.setMaxVersions(1000);
		info.setMinVersions(1000);
		
		receive_content_email.addFamily(info);;
		admin.createTable(receive_content_email);
	} catch (MasterNotRunningException e) {
		e.printStackTrace();
	} catch (ZooKeeperConnectionException e) {
		e.printStackTrace();
	} catch (IOException e) {
		e.printStackTrace();
	}finally{
		if(null != admin){
			try {
				admin.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}
           

8.2.6 釋出微網誌内容

a、微網誌内容表中添加1條資料

b、微網誌收件箱表對所有粉絲使用者添加資料

代碼:Message.java

public class Message {
	private String uid;
	private String timestamp;
	private String content;
	
	public String getUid() {
		return uid;
	}
	public void setUid(String uid) {
		this.uid = uid;
	}
	public String getTimestamp() {
		return timestamp;
	}
	public void setTimestamp(String timestamp) {
		this.timestamp = timestamp;
	}
	public String getContent() {
		return content;
	}
	public void setContent(String content) {
		this.content = content;
	}
	@Override
	public String toString() {
		return "Message [uid=" + uid + ", timestamp=" + timestamp + ", content=" + content + "]";
	}
}
           

代碼

public void publishContent(String uid, String content){
	HConnection connection = null;
	try {
		connection = HConnectionManager.createConnection(conf);
		//a、微網誌内容表中添加1條資料,首先擷取微網誌内容表描述
		HTableInterface contentTBL = connection.getTable(TableName.valueOf(TABLE_CONTENT));
		//組裝Rowkey
		long timestamp = System.currentTimeMillis();
		String rowKey = uid + "_" + timestamp;
		
		Put put = new Put(Bytes.toBytes(rowKey));
		put.add(Bytes.toBytes("info"), Bytes.toBytes("content"), timestamp, Bytes.toBytes(content));
		
		contentTBL.put(put);
		
		//b、向微網誌收件箱表中加入釋出的Rowkey
		//b.1、查詢使用者關系表,得到目前使用者有哪些粉絲
		HTableInterface relationsTBL = connection.getTable(TableName.valueOf(TABLE_RELATIONS));
		//b.2、取出目标資料
		Get get = new Get(Bytes.toBytes(uid));
		get.addFamily(Bytes.toBytes("fans"));
		
		Result result = relationsTBL.get(get);
		List<byte[]> fans = new ArrayList<byte[]>();
		
		//周遊取出目前釋出微網誌的使用者的所有粉絲資料
		for(Cell cell : result.rawCells()){
			fans.add(CellUtil.cloneQualifier(cell));
		}
		//如果該使用者沒有粉絲,則直接return
		if(fans.size() <= 0) return;
		//開始操作收件箱表
		HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
		List<Put> puts = new ArrayList<Put>();
		for(byte[] fan : fans){
			Put fanPut = new Put(fan);
			fanPut.add(Bytes.toBytes("info"), Bytes.toBytes(uid), timestamp, Bytes.toBytes(rowKey));
			puts.add(fanPut);
		}
		recTBL.put(puts);
	} catch (IOException e) {
		e.printStackTrace();
	}finally{
		if(null != connection){
			try {
				connection.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}
           

8.2.7 添加關注使用者

a、在微網誌使用者關系表中,對目前主動操作的使用者添加新關注的好友

b、在微網誌使用者關系表中,對被關注的使用者添加新的粉絲

c、微網誌收件箱表中添加所關注的使用者釋出的微網誌

代碼實作:public void addAttends(String uid, String… attends)

/**
 * 關注使用者邏輯
 * a、在微網誌使用者關系表中,對目前主動操作的使用者添加新的關注的好友
 * b、在微網誌使用者關系表中,對被關注的使用者添加粉絲(目前操作的使用者)
 * c、目前操作使用者的微網誌收件箱添加所關注的使用者釋出的微網誌rowkey
 */
public void addAttends(String uid, String... attends){
	//參數過濾
	if(attends == null || attends.length <= 0 || uid == null || uid.length() <= 0){
		return;
	}
	HConnection connection = null;
	try {
		connection = HConnectionManager.createConnection(conf);
		//使用者關系表操作對象(連接配接到使用者關系表)
		HTableInterface relationsTBL = connection.getTable(TableName.valueOf(TABLE_RELATIONS));
		List<Put> puts = new ArrayList<Put>();
		//a、在微網誌使用者關系表中,添加新關注的好友
		Put attendPut = new Put(Bytes.toBytes(uid));
		for(String attend : attends){
			//為目前使用者添加關注的人
			attendPut.add(Bytes.toBytes("attends"), Bytes.toBytes(attend), Bytes.toBytes(attend));
			//b、為被關注的人,添加粉絲
			Put fansPut = new Put(Bytes.toBytes(attend));
			fansPut.add(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(uid));
			//将所有關注的人一個一個的添加到puts(List)集合中
			puts.add(fansPut);
		}
		puts.add(attendPut);
		relationsTBL.put(puts);
		
		//c.1、微網誌收件箱添加關注的使用者釋出的微網誌内容(content)的rowkey
		HTableInterface contentTBL = connection.getTable(TableName.valueOf(TABLE_CONTENT));
		Scan scan = new Scan();
		//用于存放取出來的關注的人所釋出的微網誌的rowkey
		List<byte[]> rowkeys = new ArrayList<byte[]>();
		
		for(String attend : attends){
			//過濾掃描rowkey,即:前置位比對被關注的人的uid_
			RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(attend + "_"));
			//為掃描對象指定過濾規則
			scan.setFilter(filter);
			//通過掃描對象得到scanner
			ResultScanner result = contentTBL.getScanner(scan);
			//疊代器周遊掃描出來的結果集
			Iterator<Result> iterator = result.iterator();
			while(iterator.hasNext()){
				//取出每一個符合掃描結果的那一行資料
				Result r = iterator.next();
				for(Cell cell : r.rawCells()){
					//将得到的rowkey放置于集合容器中
					rowkeys.add(CellUtil.cloneRow(cell));
				}
				
			}
		}
		
		//c.2、将取出的微網誌rowkey放置于目前操作使用者的收件箱中
		if(rowkeys.size() <= 0) return;
		//得到微網誌收件箱表的操作對象
		HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
		//用于存放多個關注的使用者的釋出的多條微網誌rowkey資訊
		List<Put> recPuts = new ArrayList<Put>();
		for(byte[] rk : rowkeys){
			Put put = new Put(Bytes.toBytes(uid));
			//uid_timestamp
			String rowKey = Bytes.toString(rk);
			//借取uid
			String attendUID = rowKey.substring(0, rowKey.indexOf("_"));
			long timestamp = Long.parseLong(rowKey.substring(rowKey.indexOf("_") + 1));
			//将微網誌rowkey添加到指定單元格中
			put.add(Bytes.toBytes("info"), Bytes.toBytes(attendUID), timestamp, rk);
			recPuts.add(put);
		}
		
		recTBL.put(recPuts);
		
	} catch (IOException e) {
		e.printStackTrace();
	}finally{
		if(null != connection){
			try {
				connection.close();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
}
           

8.2.8 移除(取關)使用者

a、在微網誌使用者關系表中,對目前主動操作的使用者移除取關的好友(attends)

b、在微網誌使用者關系表中,對被取關的使用者移除粉絲

c、微網誌收件箱中删除取關的使用者釋出的微網誌

代碼:public void removeAttends(String uid, String… attends)

/**
 * 取消關注(remove)
 * a、在微網誌使用者關系表中,對目前主動操作的使用者删除對應取關的好友
 * b、在微網誌使用者關系表中,對被取消關注的人删除粉絲(目前操作人)
 * c、從收件箱中,删除取關的人的微網誌的rowkey
 */
public void removeAttends(String uid, String... attends){
	//過濾資料
	if(uid == null || uid.length() <= 0 || attends == null || attends.length <= 0) return;
	HConnection connection = null;
	
	try {
		connection = HConnectionManager.createConnection(conf);
		//a、在微網誌使用者關系表中,删除已關注的好友
		HTableInterface relationsTBL = connection.getTable(TableName.valueOf(TABLE_RELATIONS));
		
		//待删除的使用者關系表中的所有資料
		List<Delete> deletes = new ArrayList<Delete>();
		//目前取關操作者的uid對應的Delete對象
		Delete attendDelete = new Delete(Bytes.toBytes(uid));
		//周遊取關,同時每次取關都要将被取關的人的粉絲-1
		for(String attend : attends){
			attendDelete.deleteColumn(Bytes.toBytes("attends"), Bytes.toBytes(attend));
			//b
			Delete fansDelete = new Delete(Bytes.toBytes(attend));
			fansDelete.deleteColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid));
			deletes.add(fansDelete);
		}
		
		deletes.add(attendDelete);
		relationsTBL.delete(deletes);
		
		//c、删除取關的人的微網誌rowkey 從 收件箱表中
		HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
		
		Delete recDelete = new Delete(Bytes.toBytes(uid));
		for(String attend : attends){
			recDelete.deleteColumn(Bytes.toBytes("info"), Bytes.toBytes(attend));
		}
		recTBL.delete(recDelete);
	} catch (IOException e) {
		e.printStackTrace();
	}
}
           

8.2.9 擷取關注的人的微網誌内容

a、從微網誌收件箱中擷取所關注的使用者的微網誌RowKey

b、根據擷取的RowKey,得到微網誌内容

代碼實作:

public List<Message> getAttendsContent(String uid)

/**
 * 擷取微網誌實際内容
 * a、從微網誌收件箱中擷取所有關注的人的釋出的微網誌的rowkey
 * b、根據得到的rowkey去微網誌内容表中得到資料
 * c、将得到的資料封裝到Message對象中
 */
public List<Message> getAttendsContent(String uid){
	HConnection connection = null;
	try {
		connection = HConnectionManager.createConnection(conf);
		HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
		//a、從收件箱中取得微網誌rowKey
		Get get = new Get(Bytes.toBytes(uid));
		//設定最大版本号
		get.setMaxVersions(5);
		List<byte[]> rowkeys = new ArrayList<byte[]>();
		Result result = recTBL.get(get);
		for(Cell cell : result.rawCells()){
			rowkeys.add(CellUtil.cloneValue(cell));
		}
		//b、根據取出的所有rowkey去微網誌内容表中檢索資料
		HTableInterface contentTBL = connection.getTable(TableName.valueOf(TABLE_CONTENT));
		List<Get> gets = new ArrayList<Get>();
		//根據rowkey取出對應微網誌的具體内容
		for(byte[] rk : rowkeys){
			Get g = new Get(rk);
			gets.add(g);
		}
		//得到所有的微網誌内容的result對象
		Result[] results = contentTBL.get(gets);
		
		List<Message> messages = new ArrayList<Message>();
		for(Result res : results){
			for(Cell cell : res.rawCells()){
				Message message = new Message();
				
				String rowKey = Bytes.toString(CellUtil.cloneRow(cell));
				String userid = rowKey.substring(0, rowKey.indexOf("_"));
				String timestamp = rowKey.substring(rowKey.indexOf("_") + 1);
				String content = Bytes.toString(CellUtil.cloneValue(cell));
				
				message.setContent(content);
				message.setTimestamp(timestamp);
				message.setUid(userid);
				
				messages.add(message);
			}
		}
		return messages;
	} catch (IOException e) {
		e.printStackTrace();
	}finally{
		try {
			connection.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	return null;
}
           

8.2.10 測試

– 測試釋出微網誌内容

public void testPublishContent(WeiBo wb)
           

– 測試添加關注

public void testAddAttend(WeiBo wb)
           
public void testRemoveAttend(WeiBo wb)
           
public void testShowMessage(WeiBo wb)
           
/**
 * 釋出微網誌内容
 * 添加關注
 * 取消關注
 * 展示内容
 */
public void testPublishContent(WeiBo wb){
	wb.publishContent("0001", "今天買了一包空氣,送了點薯片,非常開心!!");
	wb.publishContent("0001", "今天天氣不錯。");
}

public void testAddAttend(WeiBo wb){
	wb.publishContent("0008", "準備下課!");
	wb.publishContent("0009", "準備關機!");
	wb.addAttends("0001", "0008", "0009");
}

public void testRemoveAttend(WeiBo wb){
	wb.removeAttends("0001", "0008");
}

public void testShowMessage(WeiBo wb){
	List<Message> messages = wb.getAttendsContent("0001");
	for(Message message : messages){
		System.out.println(message);
	}
}

public static void main(String[] args) {
	WeiBo weibo = new WeiBo();
	weibo.initTable();
	
	weibo.testPublishContent(weibo);
	weibo.testAddAttend(weibo);
	weibo.testShowMessage(weibo);
	weibo.testRemoveAttend(weibo);
	weibo.testShowMessage(weibo);
}
           
下一篇: 微網誌語