Requirements
Develop a Trident state backed by HBase, used as the external store.
Support all three semantics: non-transactional, transactional, and opaque transactional.
Goal: practice Trident development end to end. By implementing the core state concept ourselves, we gain a solid understanding of how each state semantic is encapsulated, and in turn of the Trident processing flow.
The code is added on top of the official storm-starter-master example project.
First, add the following dependencies to pom.xml and save the file:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.3.3</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>0.92.1</version>
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>org.apache.mahout.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>0.20.1</version>
</dependency>
(1) TupleTableConfig
package hbase.state;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
// implements Serializable so the config can be shipped with the topology
public class TupleTableConfig implements Serializable{
private static final long serialVersionUID = 1L;
// table name
private String tableName;
// rowkey field
protected String tupleRowkeyField;
// timestamp field
protected String tupleTimestampField;
// column families
protected Map<String, Set<String>> columnFamilies;
// constructor
public TupleTableConfig(final String tableName, final String tupleRowkeyField) {
super();
this.tableName = tableName;
this.tupleRowkeyField = tupleRowkeyField;
this.tupleTimestampField = "";
this.columnFamilies = new HashMap<String, Set<String>>();
}
// constructor
public TupleTableConfig(final String tableName, final String tupleRowkeyField, final String tupleTimestampField) {
super();
this.tableName = tableName;
this.tupleRowkeyField = tupleRowkeyField;
this.tupleTimestampField = tupleTimestampField;
this.columnFamilies = new HashMap<String, Set<String>>();
}
public String getTableName() {
return tableName;
}
public void setTableName(String tableName) {
this.tableName = tableName;
}
public String getTupleRowkeyField() {
return tupleRowkeyField;
}
public void setTupleRowkeyField(String tupleRowkeyField) {
this.tupleRowkeyField = tupleRowkeyField;
}
public String getTupleTimestampField() {
return tupleTimestampField;
}
public void setTupleTimestampField(String tupleTimestampField) {
this.tupleTimestampField = tupleTimestampField;
}
public Map<String, Set<String>> getColumnFamilies() {
return columnFamilies;
}
public void setColumnFamilies(Map<String, Set<String>> columnFamilies) {
this.columnFamilies = columnFamilies;
}
}
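The columnFamilies map is carried in the config but never populated in this walkthrough, because the column family and qualifier are passed along as tuple fields instead. Purely as an illustration (this snippet is not part of the project code), it could be filled in like this:
TupleTableConfig cfg = new TupleTableConfig("hbase_state", "date");
java.util.Set<String> qualifiers = new java.util.HashSet<String>();
qualifiers.add("pv_count"); // qualifier under the "cf" family
cfg.getColumnFamilies().put("cf", qualifiers); // family -> set of qualifiers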
(2) TridentConfig
package hbase.state;
import java.util.HashMap;
import java.util.Map;
import storm.trident.state.JSONNonTransactionalSerializer;
import storm.trident.state.JSONOpaqueSerializer;
import storm.trident.state.JSONTransactionalSerializer;
import storm.trident.state.Serializer;
import storm.trident.state.StateType;
public class TridentConfig<T> extends TupleTableConfig{
private static final long serialVersionUID = 1L;
private int stateCacheSize = 1000;
private Serializer stateSerializer;
// default serializers for the three supported semantics
public static final Map<StateType, Serializer> DEFAULT_SERIALIZERS
= new HashMap<StateType, Serializer>()
{
{
put(StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer());
put(StateType.TRANSACTIONAL, new JSONTransactionalSerializer());
put(StateType.OPAQUE, new JSONOpaqueSerializer());
}
};
// constructor
public TridentConfig(String tableName, String tupleRowkeyField) {
super(tableName, tupleRowkeyField);
}
// constructor
public TridentConfig(String tableName, String tupleRowkeyField, String tupleTimestampField) {
super(tableName, tupleRowkeyField, tupleTimestampField);
}
public int getStateCacheSize() {
return stateCacheSize;
}
public void setStateCacheSize(int stateCacheSize) {
this.stateCacheSize = stateCacheSize;
}
public Serializer getStateSerializer() {
return stateSerializer;
}
public void setStateSerializer(Serializer stateSerializer) {
this.stateSerializer = stateSerializer;
}
}
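The serializer configured here determines the byte layout of each HBase cell. For the transactional semantics used later in this example, JSONTransactionalSerializer encodes a TransactionalValue as a JSON array [txid, value], which is why the scans in step (10) show cell values such as [3,4]. A minimal sketch, assuming only storm-core's Trident classes on the classpath:
import storm.trident.state.JSONTransactionalSerializer;
import storm.trident.state.TransactionalValue;

public class SerializerDemo {
    public static void main(String[] args) {
        JSONTransactionalSerializer ser = new JSONTransactionalSerializer();
        // txid = 3, value = 4, i.e. the state of row 2019-09-15 after the first run
        byte[] bytes = ser.serialize(new TransactionalValue<Long>(3L, 4L));
        System.out.println(new String(bytes)); // prints something like [3,4]
    }
}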
(3) TridentSplit. Processes each incoming log line by splitting it on the given pattern (a tab, "\t", in this example) and emits 4 fields: "date", "cf", "pv_count", "session_id".
package hbase.state;
import Util.DateFmt;
import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
public class TridentSplit extends BaseFunction{
private static final long serialVersionUID = 1L;
String pattern = null;
public TridentSplit(String pattern) {
this.pattern = pattern;
}
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
// process each incoming log line by splitting it on the pattern (a tab, "\t", here)
String log = tuple.getString(0);
String logArr[] = log.split(pattern);
if (3 == logArr.length) {
// emit 4 fields: "date", "cf", "pv_count", "session_id"
collector.emit(new Values(DateFmt.getCountDate(logArr[2], DateFmt.DATE_SHORT), "cf", "pv_count", logArr[1]));
}
}
}
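TridentSplit relies on a small Util.DateFmt helper that is not listed in this section. A hypothetical sketch of what it needs to provide (a DATE_SHORT pattern plus a getCountDate method that reduces a timestamp string to a date string; the real class may differ):
package Util;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;

public class DateFmt {
    public static final String DATE_LONG = "yyyy-MM-dd HH:mm:ss";
    public static final String DATE_SHORT = "yyyy-MM-dd";

    // Re-format the incoming timestamp with the given pattern,
    // e.g. getCountDate("2019-09-14 08:01:36", DATE_SHORT) returns "2019-09-14".
    public static String getCountDate(String date, String pattern) {
        SimpleDateFormat sdf = new SimpleDateFormat(pattern);
        Calendar cal = Calendar.getInstance();
        if (date != null) {
            try {
                cal.setTime(sdf.parse(date));
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
        return sdf.format(cal.getTime());
    }
}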
(4) HTableConnector, used to connect to HBase and obtain the HTable handle (the table itself is created in step (8)).
package hbase.state;
import java.io.IOException;
import java.io.Serializable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
public class HTableConnector implements Serializable{
private static final long serialVersionUID = 1L;
// HBase configuration
private Configuration configuration;
// HBase table handle
protected HTable table;
// HBase table name
private String tableName;
public HTableConnector(TupleTableConfig conf) throws Exception
{
this.tableName = conf.getTableName();
this.configuration = HBaseConfiguration.create();
String filePathString = "hbase-site.xml";
Path path = new Path(filePathString);
this.configuration.addResource(path);
this.table = new HTable(configuration, tableName);
}
public Configuration getConfiguration() {
return configuration;
}
public void setConfiguration(Configuration configuration) {
this.configuration = configuration;
}
public HTable getTable() {
return table;
}
public void setTable(HTable table) {
this.table = table;
}
public String getTableName() {
return tableName;
}
public void setTableName(String tableName) {
this.tableName = tableName;
}
// close the table
public void close () {
if (null != this.table) {
try {
this.table.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
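The table itself is created from the HBase shell in step (8). If you preferred the connector to create it on startup instead, a hedged sketch against the HBase 0.92 client API declared in pom.xml could look like the following (the helper name and its placement are illustrative, not part of the original code):
// Hypothetical helper: create the table if it does not exist yet,
// using the column families registered in TupleTableConfig.
private void createTableIfMissing(TupleTableConfig conf) throws Exception {
    org.apache.hadoop.hbase.client.HBaseAdmin admin =
            new org.apache.hadoop.hbase.client.HBaseAdmin(this.configuration);
    if (!admin.tableExists(conf.getTableName())) {
        org.apache.hadoop.hbase.HTableDescriptor descriptor =
                new org.apache.hadoop.hbase.HTableDescriptor(conf.getTableName());
        for (String family : conf.getColumnFamilies().keySet()) {
            descriptor.addFamily(new org.apache.hadoop.hbase.HColumnDescriptor(family));
        }
        admin.createTable(descriptor);
    }
}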
(5) HBaseAggregateState. Implements multiGet and multiPut; for every batch, multiGet is executed before multiPut.
package hbase.state;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import storm.trident.state.OpaqueValue;
import storm.trident.state.Serializer;
import storm.trident.state.StateType;
import storm.trident.state.map.IBackingMap;
@SuppressWarnings({"rawtypes", "unchecked"}) // suppress raw-type and unchecked generics warnings
public class HBaseAggregateState<T> implements IBackingMap<T>{
private HTableConnector connector;
private Serializer<T> serializer;
public HBaseAggregateState(TridentConfig config) {
this.serializer = config.getStateSerializer();
try {
this.connector = new HTableConnector(config);
} catch (Exception e) {
e.printStackTrace();
}
}
// opaque transactional
public static HBaseAggregateFactory opaque(TridentConfig<OpaqueValue> config) {
return new HBaseAggregateFactory(config, StateType.OPAQUE);
}
// transactional
public static HBaseAggregateFactory transactional(TridentConfig<OpaqueValue> config) {
return new HBaseAggregateFactory(config, StateType.TRANSACTIONAL);
}
// non-transactional
public static HBaseAggregateFactory nonTransactional(TridentConfig<OpaqueValue> config) {
return new HBaseAggregateFactory(config, StateType.NON_TRANSACTIONAL);
}
@Override
public List<T> multiGet(List<List<Object>> keys) {
System.err.println("multiGet begin");
for (List<Object> k : keys) {
System.err.println(k.toString());
}
List<Get> gets = new ArrayList<Get>(keys.size());
byte[] rowkey; // rowkey
byte[] columnFamilies; // column family
byte[] columnQualifier; // column qualifier
for (List<Object> k : keys) {
rowkey = Bytes.toBytes(k.get(0).toString());
columnFamilies = Bytes.toBytes(k.get(1).toString());
columnQualifier = Bytes.toBytes(k.get(2).toString());
Get get = new Get(rowkey);
get.addColumn(columnFamilies, columnQualifier);
gets.add(get);
}
Result[] results = null;
try {
results = connector.getTable().get(gets);
} catch (IOException e) {
e.printStackTrace();
}
List<T> rtn = new ArrayList<T>(keys.size());
for (int i = 0; i < keys.size(); i++) {
columnFamilies = Bytes.toBytes((String)keys.get(i).get(1));
columnQualifier = Bytes.toBytes((String)keys.get(i).get(2));
Result result = results[i];
if (result.isEmpty()) {
rtn.add(null);
} else {
// deserialize the stored value
rtn.add((T)serializer.deserialize(result.getValue(columnFamilies, columnQualifier)));
}
}
System.err.println("value = ");
for (T t : rtn) {
if (null != t) {
System.err.println(t.toString());
}
}
System.err.println("multiGet end");
return rtn;
}
// multiGet has already been executed for the batch before multiPut is called
@Override
public void multiPut(List<List<Object>> keys, List<T> vals) {
System.err.println("multiPut begin");
for (int i = 0; i < keys.size(); i++) {
System.err.println(keys.get(i).toString() + "------- value = " + vals.get(i).toString());
}
List<Put> puts = new ArrayList<Put>();
for (int i = 0; i < keys.size(); i++) {
byte[] rowkey = Bytes.toBytes((String)keys.get(i).get(0));
byte[] columnFamilies = Bytes.toBytes((String)keys.get(i).get(1));
byte[] columnQualifier = Bytes.toBytes((String)keys.get(i).get(2));
byte[] columnValue = serializer.serialize(vals.get(i)); // serialize the value
Put put = new Put(rowkey);
put.add(columnFamilies, columnQualifier, columnValue);
puts.add(put);
}
try {
connector.getTable().put(puts);
connector.getTable().flushCommits();
} catch (Exception e) {
e.printStackTrace();
}
System.err.println("multiPut end");
}
}
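The three static factory methods above expose the three semantics listed in the requirements; switching between them is a one-line change. For example (illustrative only, using the classes defined in this section):
TridentConfig config = new TridentConfig("hbase_state", "rowkey");
HBaseAggregateFactory nonTx = HBaseAggregateState.nonTransactional(config); // non-transactional
HBaseAggregateFactory tx = HBaseAggregateState.transactional(config); // transactional (used by TridentPVTopo below)
HBaseAggregateFactory opaque = HBaseAggregateState.opaque(config); // opaque transactional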
(6) HBaseAggregateFactory. The StateFactory that Trident calls to build the state for each partition, wrapping the HBase-backed map according to the chosen semantics.
package hbase.state;
import java.util.Map;
import backtype.storm.task.IMetricsContext;
import backtype.storm.tuple.Values;
import storm.trident.state.State;
import storm.trident.state.StateFactory;
import storm.trident.state.StateType;
import storm.trident.state.map.CachedMap;
import storm.trident.state.map.MapState;
import storm.trident.state.map.NonTransactionalMap;
import storm.trident.state.map.OpaqueMap;
import storm.trident.state.map.SnapshottableMap;
import storm.trident.state.map.TransactionalMap;
@SuppressWarnings({"rawtypes", "unchecked"}) // suppress raw-type and unchecked generics warnings
public class HBaseAggregateFactory implements StateFactory{
private static final long serialVersionUID = 1L;
private StateType type;
private TridentConfig config;
public HBaseAggregateFactory(TridentConfig config, StateType type) {
super();
this.type = type;
this.config = config;
if (null == config.getStateSerializer()) {
config.setStateSerializer(TridentConfig.DEFAULT_SERIALIZERS.get(type));
}
}
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
// print the partition index
System.err.println("[HBaseAggregateFactory, makeState] partitionIndex = " + partitionIndex);
HBaseAggregateState state = new HBaseAggregateState(config);
CachedMap c = new CachedMap(state, config.getStateCacheSize());
MapState ms = null;
if (type == StateType.NON_TRANSACTIONAL) {
ms = NonTransactionalMap.build(c);
} else if (type == StateType.OPAQUE) {
ms = OpaqueMap.build(c);
} else if (type == StateType.TRANSACTIONAL) {
ms = TransactionalMap.build(c);
}
return new SnapshottableMap(ms, new Values("$GLOBALS"));
}
}
(7) TridentPVTopo. The main program class; a FixedBatchSpout acts as the data source, emitting 8 tuples in total, with at most 3 tuples per batch.
package hbase.state;
import java.util.Random;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.testing.FixedBatchSpout;
public class TridentPVTopo {
public static StormTopology buildTopology(LocalDRPC drpc) {
Random random = new Random();
// the visited site is www.taobao.com
String hosts = "www.taobao.com";
// session id of each visit
String[] sessionId = { "5GFBAT3D3100A7A7255027A70", "5X16BCA8823AC4BD9CD196A5D", "5CFBA5BD76BACF436ACA9DCC8",
"5D16C3E0209C16DEAA28L1824", "5I16CB309251CCF6CE6223BA1", "5C16BC4MB91B85661FE22F413",
"5D16C1F5191CF9371Y32B58CF", "5D16C7A886E2P2AE3EA29FC3E", "5C3FBA728FD7D264B80769B23",
"5B16C0F7215109AG43528BA2D", "5N16C2FE51E5619C2A1244215", "5D16C1EB1C7A751AE03201C3F" };
// visit timestamps
String[] times = { "2019-09-14 08:01:36", "2019-09-14 08:11:37", "2019-09-14 08:31:38", "2019-09-14 09:23:07",
"2019-09-14 10:51:27", "2019-09-14 10:51:56", "2019-09-14 11:01:07", "2019-09-14 11:01:20",
"2019-09-16 11:45:30", "2019-09-16 12:31:49", "2019-09-16 12:41:51", "2019-09-16 12:51:37",
"2019-09-16 13:11:27", "2019-09-16 13:20:40", "2019-09-16 13:31:38"};
// the spout is the data source: a fixed set of tuples, one log line each, with at most 3 tuples per batch
FixedBatchSpout spout = new FixedBatchSpout(new Fields("eachLog"), 3,
new Values(hosts + "\t" + sessionId[random.nextInt(12)] + "\t" + times[random.nextInt(15)]),
new Values(hosts + "\t" + sessionId[random.nextInt(12)] + "\t" + times[random.nextInt(15)]),
new Values(hosts + "\t" + sessionId[random.nextInt(12)] + "\t" + times[random.nextInt(15)]),
new Values(hosts + "\t" + sessionId[random.nextInt(12)] + "\t" + times[random.nextInt(15)]),
new Values(hosts + "\t" + sessionId[random.nextInt(12)] + "\t" + times[random.nextInt(15)]),
new Values(hosts + "\t" + sessionId[random.nextInt(12)] + "\t" + times[random.nextInt(15)]),
new Values(hosts + "\t" + sessionId[random.nextInt(12)] + "\t" + times[random.nextInt(15)]),
new Values(hosts + "\t" + sessionId[random.nextInt(12)] + "\t" + times[random.nextInt(15)]));
// if set to true, the spout keeps cycling through the data; false means each tuple is emitted once
spout.setCycle(false);
TridentConfig config = new TridentConfig("hbase_state", "rowkey");
HBaseAggregateFactory state = HBaseAggregateState.transactional(config);
TridentTopology topology = new TridentTopology();
// use the spout as the stream source
topology.newStream("spout1", spout)
.each(new Fields("eachLog"), new TridentSplit("\t"), new Fields("date", "cf", "pv_count", "session_id")) // the date serves as the rowkey
.project(new Fields("date", "cf", "pv_count")) // projection: keep only the first 3 fields
.groupBy(new Fields("date", "cf", "pv_count")) // group by
// persist to HBase; the aggregated PV count is the value
.persistentAggregate(state, new Count(), new Fields("PV"));
return topology.build();
}
public static void main(String[] args) throws Exception {
Config conf = new Config();
// conf.setDebug(true);
conf.setMaxSpoutPending(20);
// no command-line arguments: submit in local mode
if (args.length == 0) {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("PVCounter", conf, buildTopology(null));
} else { // otherwise submit to the cluster (distributed mode)
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
}
}
}
(8) On the hadoop-senior host, start Hadoop, ZooKeeper, and HBase in that order, then create the table "hbase_state" with column family "cf" in the HBase shell:
create 'hbase_state', 'cf'
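To inspect what the topology writes (the scans shown in step (10)), the table can be read back from the same HBase shell:
scan 'hbase_state'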
(9) Create hbase-site.xml under the src directory; it holds the configuration needed to connect to HBase:
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://hadoop-senior.ibeifeng.com:8020/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop-senior.ibeifeng.com</value>
</property>
</configuration>
(10) Run the TridentPVTopo class twice. The console output and the contents of the hbase_state table are shown below.
As the output shows, HBase preserves the state between runs: the second run's results accumulate on top of the first run's.
For every batch, multiGet runs before multiPut: the current value for each rowkey + column family + qualifier is read first, the batch's counts are added to it, and the result is written back by multiPut.
Each hbase_state cell stores a transaction id and a running total. The transaction id refers to the last (largest) txid of the current run, so the first run's txid is overwritten by the second run's; the total is cumulative, with the second run adding to the first run's total. In these particular runs the 8 tuples split evenly across the two dates (4 each), which is why both rows read 4 after the first run and 8 after the second.
Console output of the first run:
multiGet begin
[2019-09-16, cf, pv_count]
[2019-09-15, cf, pv_count]
value =
multiGet end
multiPut begin
[2019-09-16, cf, pv_count]------- value = TransactionalValue@...[val=1,txid=1]
[2019-09-15, cf, pv_count]------- value = TransactionalValue@...[val=2,txid=1]
multiPut end
multiGet begin
value =
multiGet end
multiPut begin
[2019-09-16, cf, pv_count]------- value = TransactionalValue@...[val=4,txid=2]
multiPut end
multiGet begin
value =
multiGet end
multiPut begin
[2019-09-15, cf, pv_count]------- value = TransactionalValue@...[val=4,txid=3]
multiPut end
Contents of hbase_state after the first run. In each cell value, the first number is the transaction id (the id of the last batch of this run that updated the row) and the second is the running total:
ROW COLUMN+CELL
2019-09-15 column=cf:pv_count, timestamp=1569100965061, value=[3,4]
2019-09-16 column=cf:pv_count, timestamp=1569100965056, value=[2,4]
=======================================================================================================================================
Console output of the second run:
multiGet begin
[2019-09-16, cf, pv_count]
[2019-09-15, cf, pv_count]
value =
TransactionalValue@...[val=4,txid=2]
TransactionalValue@...[val=4,txid=3]
multiGet end
multiPut begin
[2019-09-16, cf, pv_count]------- value = TransactionalValue@...[val=6,txid=1]
[2019-09-15, cf, pv_count]------- value = TransactionalValue@...[val=5,txid=1]
multiPut end
multiGet begin
value =
multiGet end
multiPut begin
[2019-09-16, cf, pv_count]------- value = TransactionalValue@...[val=8,txid=2]
[2019-09-15, cf, pv_count]------- value = TransactionalValue@...[val=6,txid=2]
multiPut end
multiGet begin
value =
multiGet end
multiPut begin
[2019-09-15, cf, pv_count]------- value = TransactionalValue@...[val=8,txid=3]
multiPut end
Contents of hbase_state after the second run. Again, the first number in each value is the transaction id of the last batch that updated the row, and the second is the cumulative total:
ROW COLUMN+CELL
2019-09-15 column=cf:pv_count, timestamp=1569102732980, value=[3,8]
2019-09-16 column=cf:pv_count, timestamp=1569102732976, value=[2,8]