Requirements
Develop a Trident state backed by HBase as external storage;
Support the non-transactional, transactional, and opaque-transactional semantics;
Goal: practice Trident development end to end; by implementing the core State concept by hand, you can understand in depth how each state semantic is encapsulated and, with it, the Trident processing flow.
The code below is added on top of the official storm-starter-master example project.
First add the following dependencies to pom.xml and save:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.3.3</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>0.92.1</version>
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>org.apache.mahout.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>0.20.1</version>
</dependency>
(1) TupleTableConfig: configuration class describing the target HBase table (table name, rowkey field, timestamp field, and column families).
package hbase.state;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
// Implements Serializable so the config can be shipped with the topology
public class TupleTableConfig implements Serializable{
private static final long serialVersionUID = 1L;
// HBase table name
private String tableName;
// tuple field used as the rowkey
protected String tupleRowkeyField;
// tuple field used as the cell timestamp
protected String tupleTimestampField;
// column family -> set of column qualifiers
protected Map<String, Set<String>> columnFamilies;
// Constructor
public TupleTableConfig(final String tableName, final String tupleRowkeyField) {
super();
this.tableName = tableName;
this.tupleRowkeyField = tupleRowkeyField;
this.tupleTimestampField = "";
this.columnFamilies = new HashMap<String, Set<String>>();
}
// Constructor (with an explicit timestamp field)
public TupleTableConfig(final String tableName, final String tupleRowkeyField, final String tupleTimestampField) {
super();
this.tableName = tableName;
this.tupleRowkeyField = tupleRowkeyField;
this.tupleTimestampField = tupleTimestampField;
this.columnFamilies = new HashMap<String, Set<String>>();
}
public String getTableName() {
return tableName;
}
public void setTableName(String tableName) {
this.tableName = tableName;
}
public String getTupleRowkeyField() {
return tupleRowkeyField;
}
public void setTupleRowkeyField(String tupleRowkeyField) {
this.tupleRowkeyField = tupleRowkeyField;
}
public String getTupleTimestampField() {
return tupleTimestampField;
}
public void setTupleTimestampField(String tupleTimestampField) {
this.tupleTimestampField = tupleTimestampField;
}
public Map<String, Set<String>> getColumnFamilies() {
return columnFamilies;
}
public void setColumnFamilies(Map<String, Set<String>> columnFamilies) {
this.columnFamilies = columnFamilies;
}
}
(2) TridentConfig: extends TupleTableConfig with a state cache size and the default Serializer for each of the three state semantics.
package hbase.state;
import java.util.HashMap;
import java.util.Map;
import storm.trident.state.JSONNonTransactionalSerializer;
import storm.trident.state.JSONOpaqueSerializer;
import storm.trident.state.JSONTransactionalSerializer;
import storm.trident.state.Serializer;
import storm.trident.state.StateType;
public class TridentConfig<T> extends TupleTableConfig{
private static final long serialVersionUID = 1L;
// number of entries kept in the in-memory cache in front of HBase
private int stateCacheSize = 1000;
// serializer used to (de)serialize state values stored in HBase
private Serializer stateSerializer;
// Default serializers for the three supported semantics
public static final Map<StateType, Serializer> DEFAULT_SERIALIZERS
= new HashMap<StateType, Serializer>()
{
{
put(StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer());
put(StateType.TRANSACTIONAL, new JSONTransactionalSerializer());
put(StateType.OPAQUE, new JSONOpaqueSerializer());
}
};
// Constructor
public TridentConfig(String tableName, String tupleRowkeyField) {
super(tableName, tupleRowkeyField);
}
// Constructor (with an explicit timestamp field)
public TridentConfig(String tableName, String tupleRowkeyField, String tupleTimestampField) {
super(tableName, tupleRowkeyField, tupleTimestampField);
}
public int getStateCacheSize() {
return stateCacheSize;
}
public void setStateCacheSize(int stateCacheSize) {
this.stateCacheSize = stateCacheSize;
}
public Serializer getStateSerializer() {
return stateSerializer;
}
public void setStateSerializer(Serializer stateSerializer) {
this.stateSerializer = stateSerializer;
}
}
(3) TridentSplit. Each incoming log line is split on the given pattern (here "\t"); the four emitted fields are "date", "cf", "pv_count", and "session_id".
package hbase.state;
import Util.DateFmt;
import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
public class TridentSplit extends BaseFunction{
private static final long serialVersionUID = 1L;
String pattern = null;
public TridentSplit(String pattern) {
this.pattern = pattern;
}
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
// Process each incoming log line: split it on the pattern, here "\t"
String log = tuple.getString(0);
String logArr[] = log.split(pattern);
if (3 == logArr.length) {
// The four emitted fields are "date", "cf", "pv_count", "session_id"
collector.emit(new Values(DateFmt.getCountDate(logArr[2], DateFmt.DATE_SHORT), "cf", "pv_count", logArr[1]));
}
}
}
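TridentSplit relies on a DateFmt helper (import Util.DateFmt) that is not shown in this post. A minimal sketch, assuming its only job here is to reduce the full log timestamp (e.g. "2019-09-14 08:01:36") to the short date used as the rowkey, could look like the following; the class body below is a reconstruction, not the original:
package Util;
import java.text.ParseException;
import java.text.SimpleDateFormat;
public class DateFmt {
    // Short date pattern used for the rowkey, e.g. "2019-09-14"
    public static final String DATE_SHORT = "yyyy-MM-dd";
    // Full timestamp pattern as it appears in the log lines
    public static final String DATE_LONG = "yyyy-MM-dd HH:mm:ss";
    // Reformat a full timestamp ("2019-09-14 08:01:36") into the given pattern
    public static String getCountDate(String timestamp, String pattern) {
        try {
            SimpleDateFormat in = new SimpleDateFormat(DATE_LONG);
            SimpleDateFormat out = new SimpleDateFormat(pattern);
            return out.format(in.parse(timestamp));
        } catch (ParseException e) {
            throw new RuntimeException("Unparseable log time: " + timestamp, e);
        }
    }
}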
(4) HTableConnector: connects to HBase and opens the target table (the table itself is created in the HBase shell in step (8)).
package hbase.state;
import java.io.IOException;
import java.io.Serializable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
public class HTableConnector implements Serializable{
private static final long serialVersionUID = 1L;
// HBase client configuration
private Configuration configuration;
// HBase table handle
protected HTable table;
// HBase table name
private String tableName;
public HTableConnector(TupleTableConfig conf) throws Exception
{
this.tableName = conf.getTableName();
// Build the HBase client config; hbase-site.xml (step (9)) supplies the ZooKeeper quorum
this.configuration = HBaseConfiguration.create();
String filePathString = "hbase-site.xml";
Path path = new Path(filePathString);
this.configuration.addResource(path);
// Open the target table (created beforehand in the HBase shell, step (8))
this.table = new HTable(configuration, tableName);
}
public Configuration getConfiguration() {
return configuration;
}
public void setConfiguration(Configuration configuration) {
this.configuration = configuration;
}
public HTable getTable() {
return table;
}
public void setTable(HTable table) {
this.table = table;
}
public String getTableName() {
return tableName;
}
public void setTableName(String tableName) {
this.tableName = tableName;
}
// Close the table
public void close () {
if (null != this.table) {
try {
this.table.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
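Before wiring the state into a topology, the connection settings can be sanity-checked with a small throwaway class (hypothetical, not part of the original project); it assumes the hbase-site.xml from step (9) is readable and the hbase_state table from step (8) already exists:
package hbase.state;
import org.apache.hadoop.hbase.util.Bytes;
public class ConnectorCheck {
    public static void main(String[] args) throws Exception {
        TupleTableConfig conf = new TupleTableConfig("hbase_state", "rowkey");
        HTableConnector connector = new HTableConnector(conf);
        // If this prints without an exception, the client found ZooKeeper and opened the table
        System.out.println("Connected to table: " + Bytes.toString(connector.getTable().getTableName()));
        connector.close();
    }
}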
(5) HBaseAggregateState. Implements IBackingMap with multiGet and multiPut; for every batch, Trident calls multiGet before multiPut. Each key is the list of groupBy field values, e.g. [date, cf, pv_count], which is why positions 0, 1 and 2 are used as rowkey, column family and column qualifier.
package hbase.state;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import storm.trident.state.OpaqueValue;
import storm.trident.state.TransactionalValue;
import storm.trident.state.Serializer;
import storm.trident.state.StateType;
import storm.trident.state.map.IBackingMap;
@SuppressWarnings({"rawtypes", "unchecked"}) // suppress raw-type and unchecked-generics warnings
public class HBaseAggregateState<T> implements IBackingMap<T>{
private HTableConnector connector;
private Serializer<T> serializer;
public HBaseAggregateState(TridentConfig config) {
this.serializer = config.getStateSerializer();
try {
this.connector = new HTableConnector(config);
} catch (Exception e) {
e.printStackTrace();
}
}
// Opaque
public static HBaseAggregateFactory opaque(TridentConfig<OpaqueValue> config) {
return new HBaseAggregateFactory(config, StateType.OPAQUE);
}
// Transactional
public static HBaseAggregateFactory transactional(TridentConfig<TransactionalValue> config) {
return new HBaseAggregateFactory(config, StateType.TRANSACTIONAL);
}
// Non-transactional
public static HBaseAggregateFactory nonTransactional(TridentConfig<Object> config) {
return new HBaseAggregateFactory(config, StateType.NON_TRANSACTIONAL);
}
@Override
public List<T> multiGet(List<List<Object>> keys) {
System.err.println("multiGet begin");
for (List<Object> k : keys) {
System.err.println(k.toString());
}
List<Get> gets = new ArrayList<Get>(keys.size());
byte[] rowkey; // rowkey
byte[] columnFamilies; // column family
byte[] columnQualifier; // column qualifier
for (List<Object> k : keys) {
rowkey = Bytes.toBytes(k.get(0).toString());
columnFamilies = Bytes.toBytes(k.get(1).toString());
columnQualifier = Bytes.toBytes(k.get(2).toString());
Get get = new Get(rowkey);
get.addColumn(columnFamilies, columnQualifier);
gets.add(get);
}
Result[] results = null;
try {
results = connector.getTable().get(gets);
} catch (IOException e) {
e.printStackTrace();
}
List<T> rtn = new ArrayList<T>(keys.size());
for (int i = 0; i < keys.size(); i++) {
columnFamilies = Bytes.toBytes((String)keys.get(i).get(1));
columnQualifier = Bytes.toBytes((String)keys.get(i).get(2));
Result result = results[i];
if (result.isEmpty()) {
rtn.add(null);
} else {
// Deserialize the stored value
rtn.add((T)serializer.deserialize(result.getValue(columnFamilies, columnQualifier)));
}
}
System.err.println("value = ");
for (T t : rtn) {
if (null != t) {
System.err.println(t.toString());
}
}
System.err.println("multiGet end");
return rtn;
}
// For each batch, multiGet runs before multiPut
@Override
public void multiPut(List<List<Object>> keys, List<T> vals) {
System.err.println("multiPut begin");
for (int i = 0; i < keys.size(); i++) {
System.err.println(keys.get(i).toString() + "------- value = " + vals.get(i).toString());
}
List<Put> puts = new ArrayList<Put>();
for (int i = 0; i < keys.size(); i++) {
byte[] rowkey = Bytes.toBytes((String)keys.get(i).get(0));
byte[] columnFamilies = Bytes.toBytes((String)keys.get(i).get(1));
byte[] columnQualifier = Bytes.toBytes((String)keys.get(i).get(2));
byte[] columnValue = serializer.serialize(vals.get(i)); // serialize the value
Put put = new Put(rowkey);
put.add(columnFamilies, columnQualifier, columnValue);
puts.add(put);
}
try {
connector.getTable().put(puts);
connector.getTable().flushCommits();
} catch (Exception e) {
e.printStackTrace();
}
System.err.println("multiPut end");
}
}
(6) HBaseAggregateFactory. A StateFactory that builds the state, wrapping the backing map in the map implementation that matches the chosen semantics.
package hbase.state;
import java.util.Map;
import backtype.storm.task.IMetricsContext;
import backtype.storm.tuple.Values;
import storm.trident.state.State;
import storm.trident.state.StateFactory;
import storm.trident.state.StateType;
import storm.trident.state.map.CachedMap;
import storm.trident.state.map.MapState;
import storm.trident.state.map.NonTransactionalMap;
import storm.trident.state.map.OpaqueMap;
import storm.trident.state.map.SnapshottableMap;
import storm.trident.state.map.TransactionalMap;
@SuppressWarnings({"rawtypes", "unchecked"}) // suppress raw-type and unchecked-generics warnings
public class HBaseAggregateFactory implements StateFactory{
private static final long serialVersionUID = 1L;
private StateType type;
private TridentConfig config;
public HBaseAggregateFactory(TridentConfig config, StateType type) {
super();
this.type = type;
this.config = config;
if (null == config.getStateSerializer()) {
config.setStateSerializer(TridentConfig.DEFAULT_SERIALIZERS.get(type));
}
}
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
// Print the partition index
System.err.println("[HBaseAggregateFactory, makeState] partitionIndex = " + partitionIndex);
HBaseAggregateState state = new HBaseAggregateState(config);
// Wrap the backing map in an in-memory cache to cut down on round trips to HBase
CachedMap c = new CachedMap(state, config.getStateCacheSize());
MapState ms = null;
if (type == StateType.NON_TRANSACTIONAL) {
ms = NonTransactionalMap.build(c);
} else if (type == StateType.OPAQUE) {
ms = OpaqueMap.build(c);
} else if (type == StateType.TRANSACTIONAL) {
ms = TransactionalMap.build(c);
}
return new SnapshottableMap(ms, new Values("$GLOBALS"));
}
}
(7) TridentPVTopo. The main topology class; a FixedBatchSpout acting as the data source emits 8 tuples, with at most 3 tuples per batch.
package hbase.state;
import java.util.Random;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.testing.FixedBatchSpout;
public class TridentPVTopo {
public static StormTopology buildTopology(LocalDRPC drpc) {
Random random = new Random();
// The site being visited is taobao
String hosts = "www.taobao.com";
// Session id of each visit
String[] sessionId = { "5GFBAT3D3100A7A7255027A70", "5X16BCA8823AC4BD9CD196A5D", "5CFBA5BD76BACF436ACA9DCC8",
"5D16C3E0209C16DEAA28L1824", "5I16CB309251CCF6CE6223BA1", "5C16BC4MB91B85661FE22F413",
"5D16C1F5191CF9371Y32B58CF", "5D16C7A886E2P2AE3EA29FC3E", "5C3FBA728FD7D264B80769B23",
"5B16C0F7215109AG43528BA2D", "5N16C2FE51E5619C2A1244215", "5D16C1EB1C7A751AE03201C3F" };
// Visit times
String[] times = { "2019-09-14 08:01:36", "2019-09-14 08:11:37", "2019-09-14 08:31:38", "2019-09-14 09:23:07",
"2019-09-14 10:51:27", "2019-09-14 10:51:56", "2019-09-14 11:01:07", "2019-09-14 11:01:20",
"2019-09-16 11:45:30", "2019-09-16 12:31:49", "2019-09-16 12:41:51", "2019-09-16 12:51:37",
"2019-09-16 13:11:27", "2019-09-16 13:20:40", "2019-09-16 13:31:38"};
// Spout: the data source is a fixed set of tuples, each tuple one log line, at most 3 tuples per batch
FixedBatchSpout spout = new FixedBatchSpout(new Fields("eachLog"), 3,
new Values(hosts + "\t" + sessionId[random.nextInt(12)] + "\t" + times[random.nextInt(15)]),
new Values(hosts + "\t" + sessionId[random.nextInt(12)] + "\t" + times[random.nextInt(15)]),
new Values(hosts + "\t" + sessionId[random.nextInt(12)] + "\t" + times[random.nextInt(15)]),
new Values(hosts + "\t" + sessionId[random.nextInt(12)] + "\t" + times[random.nextInt(15)]),
new Values(hosts + "\t" + sessionId[random.nextInt(12)] + "\t" + times[random.nextInt(15)]),
new Values(hosts + "\t" + sessionId[random.nextInt(12)] + "\t" + times[random.nextInt(15)]),
new Values(hosts + "\t" + sessionId[random.nextInt(12)] + "\t" + times[random.nextInt(15)]),
new Values(hosts + "\t" + sessionId[random.nextInt(12)] + "\t" + times[random.nextInt(15)]));
// If set to true, the spout keeps cycling through the data
spout.setCycle(false);
TridentConfig config = new TridentConfig("hbase_state", "rowkey");
HBaseAggregateFactory state = HBaseAggregateState.transactional(config);
TridentTopology topology = new TridentTopology();
// Use the spout as the data source
topology.newStream("spout1", spout)
.each(new Fields("eachLog"), new TridentSplit("\t"), new Fields("date", "cf", "pv_count", "session_id")) // the date becomes the rowkey
.project(new Fields("date", "cf", "pv_count")) // projection: only the first 3 fields are needed
.groupBy(new Fields("date", "cf", "pv_count")) // group by
// Persist to HBase; the PV count is the value
.persistentAggregate(state, new Count(), new Fields("PV"));
return topology.build();
}
public static void main(String[] args) throws Exception {
Config conf = new Config();
// conf.setDebug(true);
conf.setMaxSpoutPending(20);
// With no program arguments, submit in local mode
if (args.length == 0) {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("PVCounter", conf, buildTopology(null));
} else { // otherwise submit to the cluster
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
}
}
}
(8) On the hadoop-senior host, start Hadoop, ZooKeeper and HBase in that order; then create the table "hbase_state" with column family "cf" in the HBase shell:
create 'hbase_state', 'cf'
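After the topology has run, the accumulated counts shown in step (10) below can be listed from the same HBase shell with a plain scan:
scan 'hbase_state'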
(9) Create hbase-site.xml under the src directory; it holds the configuration needed to connect to HBase:
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://hadoop-senior.ibeifeng.com:8020/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop-senior.ibeifeng.com</value>
</property>
</configuration>
(10) Run the TridentPVTopo class twice; the console output and the contents of the hbase_state table are shown below.
As you can see, HBase persists the state across runs: the second run's results accumulate on top of the first run's.
For each batch, multiGet runs before multiPut: the value stored under rowkey + column family + qualifier is read first, the new counts are added to it, and multiPut writes the result back.
hbase_state stores a transaction id together with the running total. The transaction id changes: it is the txid of the last batch of the current run that updated the row, so the txid written by the first run is overwritten by the second run. The total is cumulative, so the second run's total builds on the first run's.
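To make the stored format concrete, here is a small sketch (SerializerDemo is a hypothetical helper; the serializer and value classes are the standard Storm Trident ones already imported in TridentConfig) of what each of the three default serializers writes into an HBase cell for the same count; the transactional case produces exactly the [txid, total] arrays that appear in the table dumps below:
import storm.trident.state.JSONNonTransactionalSerializer;
import storm.trident.state.JSONOpaqueSerializer;
import storm.trident.state.JSONTransactionalSerializer;
import storm.trident.state.OpaqueValue;
import storm.trident.state.TransactionalValue;
public class SerializerDemo {
    public static void main(String[] args) {
        // Non-transactional: only the value is stored
        byte[] plain = new JSONNonTransactionalSerializer().serialize(4L);
        // Transactional: [txid, value], e.g. the value=[3,4] cell in the first run's dump
        byte[] tx = new JSONTransactionalSerializer().serialize(new TransactionalValue<Long>(3L, 4L));
        // Opaque: [txid, currentValue, previousValue]
        byte[] opaque = new JSONOpaqueSerializer().serialize(new OpaqueValue<Long>(3L, 4L, 2L));
        System.out.println(new String(plain));  // 4
        System.out.println(new String(tx));     // [3,4]
        System.out.println(new String(opaque)); // [3,4,2]
    }
}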
First run, console output:
multiGet begin
[2019-09-16, cf, pv_count]
[2019-09-15, cf, pv_count]
value =
multiGet end
multiPut begin
[2019-09-16, cf, pv_count]------- value = [val=1,txid=1]
[2019-09-15, cf, pv_count]------- value = [val=2,txid=1]
multiPut end
multiGet begin
value =
multiGet end
multiPut begin
[2019-09-16, cf, pv_count]------- value = [val=4,txid=2]
multiPut end
multiGet begin
value =
multiGet end
multiPut begin
[2019-09-15, cf, pv_count]------- value = [val=4,txid=3]
multiPut end
First run, contents of hbase_state; in each value the first number is the transaction id (the txid of the last batch that updated the row in this run), the second is the total:
ROW COLUMN+CELL
2019-09-15 column=cf:pv_count, timestamp=1569100965061, value=[3,4]
2019-09-16 column=cf:pv_count, timestamp=1569100965056, value=[2,4]
=======================================================================================================================================
Second run, console output:
multiGet begin
[2019-09-16, cf, pv_count]
[2019-09-15, cf, pv_count]
value =
[val=4,txid=2]
[val=4,txid=3]
multiGet end
multiPut begin
[2019-09-16, cf, pv_count]------- value = [val=6,txid=1]
[2019-09-15, cf, pv_count]------- value = [val=5,txid=1]
multiPut end
multiGet begin
value =
multiGet end
multiPut begin
[2019-09-16, cf, pv_count]------- value = [val=8,txid=2]
[2019-09-15, cf, pv_count]------- value = [val=6,txid=2]
multiPut end
multiGet begin
value =
multiGet end
multiPut begin
[2019-09-15, cf, pv_count]------- value = [val=8,txid=3]
multiPut end
Second run, contents of hbase_state; in each value the first number is the transaction id (the txid of the last batch that updated the row in this run), the second is the total:
ROW COLUMN+CELL
2019-09-15 column=cf:pv_count, timestamp=1569102732980, value=[3,8]
2019-09-16 column=cf:pv_count, timestamp=1569102732976, value=[2,8]