At the nginx layer, when an access request comes in, report the request traffic to Kafka,
so that Storm can consume the real-time access logs from Kafka and compute the hot-data statistics for the cache.
To do this, create a Kafka producer directly from the Lua script and send the data to Kafka. First install the lua-resty-kafka library:
wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip
yum install -y unzip
unzip lua-resty-kafka-master.zip
cp -rf /usr/local/lua-resty-kafka-master/lib/resty /usr/hello/lualib
nginx -s reload
The Lua script:
local cjson = require("cjson")
local producer = require("resty.kafka.producer")
local broker_list = {
{ host = "192.168.31.187", port = 9092 },
{ host = "192.168.31.19", port = 9092 },
{ host = "192.168.31.227", port = 9092 }
}
local log_json = {}
log_json["headers"] = ngx.req.get_headers()
log_json["uri_args"] = ngx.req.get_uri_args()
log_json["http_version"] = ngx.req.http_version()
log_json["method"] = ngx.req.get_method()
log_json["raw_header"] = ngx.req.raw_header()
-- ngx.req.read_body() returns nothing; it just has to be called before get_body_data() can return the body
ngx.req.read_body()
log_json["body_data"] = ngx.req.get_body_data()
local message = cjson.encode(log_json)
local productId = ngx.req.get_uri_args()["productId"]
local async_producer = producer:new(broker_list, { producer_type = "async" })
local ok, err = async_producer:send("access-log", productId, message)
if not ok then
ngx.log(ngx.ERR, "kafka send err:", err)
return
end
Do the same on both nginx machines so that both report their traffic to Kafka. Then create the topic and verify it with a console consumer:
bin/kafka-topics.sh --zookeeper 192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181 --topic access-log --replication-factor 1 --partitions 1 --create
bin/kafka-console-consumer.sh --zookeeper 192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181 --topic access-log --from-beginning
A few issues hit along the way and their fixes:
(1) The Kafka broker on 187 had died, probably a VM issue; kill the process and restart it:
nohup bin/kafka-server-start.sh config/server.properties &
(2) Add resolver 8.8.8.8; to the http block of nginx.conf so the Lua Kafka client can resolve hostnames (see the config sketch after this list).
(3) Add advertised.host.name=192.168.31.187 to the Kafka config and restart all three Kafka processes (see the config sketch after this list).
(4) Start the eshop-cache service again, because the local cache data in nginx may no longer be there.
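A minimal sketch of where the settings from (2) and (3) go, assuming the stock nginx.conf and Kafka config/server.properties layouts (use each broker's own IP on the other two machines):

http {
    # lets the lua-resty-kafka client resolve hostnames from inside nginx
    resolver 8.8.8.8;
    ...
}

# config/server.properties on 192.168.31.187
advertised.host.name=192.168.31.187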
Developing the Storm + Kafka topology for real-time product access count statistics:
Overall approach:
1. Kafka consumer spout
Consume in a separate thread and write the messages into a queue;
nextTuple() just checks whether the queue has data and, if so, takes a message and emits it; it must never block.
2. Log parsing bolt
3. Product access count bolt
4. Use an LRUMap for the counting
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
import com.roncoo.eshop.storm.bolt.LogParseBolt;
import com.roncoo.eshop.storm.bolt.ProductCountBolt;
import com.roncoo.eshop.storm.spout.AccessLogKafkaSpout;
/**
 * Hot data statistics topology
* @author Administrator
*
*/
public class HotProductTopology {
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("AccessLogKafkaSpout", new AccessLogKafkaSpout(), 1);
builder.setBolt("LogParseBolt", new LogParseBolt(), 5)
.setNumTasks(5)
.shuffleGrouping("AccessLogKafkaSpout");
builder.setBolt("ProductCountBolt", new ProductCountBolt(), 5)
.setNumTasks(10)
.fieldsGrouping("LogParseBolt", new Fields("productId"));
Config config = new Config();
if(args != null && args.length > 0) { // a topology name was passed in, so submit to the cluster
config.setNumWorkers(3);
try {
StormSubmitter.submitTopology(args[0], config, builder.createTopology());
} catch (Exception e) {
e.printStackTrace();
}
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("HotProductTopology", config, builder.createTopology());
Utils.sleep(30000);
cluster.shutdown();
}
}
}
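To run the topology on a real Storm cluster instead of the embedded LocalCluster, package the project and submit it with the storm CLI; a sketch, where the jar name and the fully qualified class name are assumptions based on this project's package layout:

storm jar eshop-storm.jar com.roncoo.eshop.storm.HotProductTopology HotProductTopology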
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import com.alibaba.fastjson.JSONObject;
/**
 * Log parsing bolt
* @author Administrator
*
*/
public class LogParseBolt extends BaseRichBolt {
private static final long serialVersionUID = -8017609899644290359L;
private OutputCollector collector;
@SuppressWarnings("rawtypes")
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
public void execute(Tuple tuple) {
String message = tuple.getStringByField("message");
JSONObject messageJSON = JSONObject.parseObject(message);
JSONObject uriArgsJSON = messageJSON.getJSONObject("uri_args");
Long productId = uriArgsJSON.getLong("productId");
if(productId != null) {
collector.emit(new Values(productId));
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("productId"));
}
}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.storm.shade.org.json.simple.JSONArray;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.trident.util.LRUMap;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.Utils;
import com.roncoo.eshop.storm.zk.ZooKeeperSession;
/**
 * Product access count bolt
* @author Administrator
*
*/
public class ProductCountBolt extends BaseRichBolt {
private static final long serialVersionUID = -8761807561458126413L;
private LRUMap<Long, Long> productCountMap = new LRUMap<Long, Long>(1000);
private ZooKeeperSession zkSession;
private int taskid;
@SuppressWarnings("rawtypes")
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.zkSession = ZooKeeperSession.getInstance();
this.taskid = context.getThisTaskId();
new Thread(new ProductCountThread()).start();
// 1. Write this task's taskid into a ZooKeeper node, building up a list of taskids
// 2. Then periodically write this task's hot product list into the ZooKeeper node for its own taskid
// 3. That way the parallel prewarm program can learn from step 1 which taskids exist
// 4. The parallel prewarm program then acquires a lock per taskid and reads the hot product list from the corresponding znode
initTaskId(context.getThisTaskId());
}
private void initTaskId(int taskid) {
// when every task of ProductCountBolt starts up, it appends its own taskid to the value of the same node,
// so the value becomes a comma-separated list, e.g.
// 111,211,355
zkSession.acquireDistributedLock();
String taskidList = zkSession.getNodeData();
if(!"".equals(taskidList)) {
taskidList += "," + taskid;
} else {
taskidList += taskid;
}
zkSession.setNodeData("/taskid-list", taskidList);
zkSession.releaseDistributedLock();
}
private class ProductCountThread implements Runnable {
public void run() {
List<Map.Entry<Long, Long>> topnProductList = new ArrayList<Map.Entry<Long, Long>>();
while(true) {
topnProductList.clear();
int topn = 3;
if(productCountMap.size() == 0) {
Utils.sleep(100);
continue;
}
// NOTE: LRUMap is not thread-safe; this iteration can race with execute() updating the map,
// which is tolerated here to keep the example simple
for(Map.Entry<Long, Long> productCountEntry : productCountMap.entrySet()) {
if(topnProductList.size() == 0) {
topnProductList.add(productCountEntry);
} else {
// There are many ways to compute a top-n ranking, but to keep things simple here
// no extra data structures or algorithms are introduced: the new entry is
// inserted at the first position in the current top-n list whose count it beats
boolean bigger = false;
for(int i = 0; i < topnProductList.size(); i++){
Map.Entry<Long, Long> topnProductCountEntry = topnProductList.get(i);
if(productCountEntry.getValue() > topnProductCountEntry.getValue()) {
// insert at position i and, if the list is already full, drop the entry that falls off the end
topnProductList.add(i, productCountEntry);
if(topnProductList.size() > topn) {
topnProductList.remove(topnProductList.size() - 1);
}
bigger = true;
break;
}
}
if(!bigger) {
if(topnProductList.size() < topn) {
topnProductList.add(productCountEntry);
}
}
}
}
// we now have the top-n list; write out only the productIds, since the prewarm
// program parses this node as a plain JSON array of ids (e.g. [1,5,9])
List<Long> topnProductidList = new ArrayList<Long>();
for(Map.Entry<Long, Long> topnProductCountEntry : topnProductList) {
topnProductidList.add(topnProductCountEntry.getKey());
}
String topnProductListJSON = JSONArray.toJSONString(topnProductidList);
zkSession.setNodeData("/task-hot-product-list-" + taskid, topnProductListJSON);
Utils.sleep(5000);
}
}
}
public void execute(Tuple tuple) {
Long productId = tuple.getLongByField("productId");
Long count = productCountMap.get(productId);
if(count == null) {
count = 0L;
}
count++;
productCountMap.put(productId, count);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
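Taken together, ProductCountBolt maintains two kinds of znodes for the prewarm program to read: /taskid-list holds the comma-separated ids of all ProductCountBolt tasks (e.g. 111,211,355), and each /task-hot-product-list-<taskid> holds that task's current top-n hot product ids as a JSON array such as [1,5,9] (the concrete ids here are purely illustrative).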
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
/**
 * Spout that consumes data from Kafka
*/
public class AccessLogKafkaSpout extends BaseRichSpout {
private static final long serialVersionUID = 8698470299234327074L;
private ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(1000);
private SpoutOutputCollector collector;
@SuppressWarnings("rawtypes")
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
startKafkaConsumer();
}
@SuppressWarnings("rawtypes")
private void startKafkaConsumer() {
Properties props = new Properties();
props.put("zookeeper.connect", "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181");
props.put("group.id", "eshop-cache-group");
props.put("zookeeper.session.timeout.ms", "40000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.
createJavaConsumerConnector(consumerConfig);
String topic = "access-log";
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumerConnector.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
for (KafkaStream stream : streams) {
new Thread(new KafkaMessageProcessor(stream)).start();
}
}
private class KafkaMessageProcessor implements Runnable {
@SuppressWarnings("rawtypes")
private KafkaStream kafkaStream;
@SuppressWarnings("rawtypes")
public KafkaMessageProcessor(KafkaStream kafkaStream) {
this.kafkaStream = kafkaStream;
}
@SuppressWarnings("unchecked")
public void run() {
ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
while (it.hasNext()) {
String message = new String(it.next().message());
try {
queue.put(message);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public void nextTuple() {
if(queue.size() > 0) {
try {
String message = queue.take();
collector.emit(new Values(message));
} catch (Exception e) {
e.printStackTrace();
}
} else {
Utils.sleep(100);
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("message"));
}
}
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
* ZooKeeperSession
* @author Administrator
*
*/
public class ZooKeeperSession {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private ZooKeeper zookeeper;
public ZooKeeperSession() {
// connecting to the ZooKeeper server and establishing the session happens asynchronously,
// so a watcher is passed in to tell us when the connection to the zk server has really completed
try {
this.zookeeper = new ZooKeeper(
"192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181",
50000,
new ZooKeeperWatcher());
// at this point getState() reports CONNECTING (still connecting)
System.out.println(zookeeper.getState());
try {
// CountDownLatch is a JDK utility for synchronizing concurrent threads:
// it is constructed with a count, e.g. 1, 2 or 3;
// await() blocks as long as the count is not yet 0,
// other threads call countDown() to decrement it by 1,
// and once the count reaches 0 all threads blocked in await()
// are released and continue running
connectedSemaphore.await();
} catch(InterruptedException e) {
e.printStackTrace();
}
System.out.println("ZooKeeper session established......");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
 * Acquire the distributed lock for the taskid list
 */
public void acquireDistributedLock() {
String path = "/taskid-list-lock";
try {
zookeeper.create(path, "".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success to acquire lock for taskid-list-lock");
} catch (Exception e) {
// if the lock node already exists, someone else has already acquired the lock,
// and the create() call above fails with a NodeExistsException
int count = 0;
while(true) {
try {
Thread.sleep(1000);
zookeeper.create(path, "".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (Exception e2) {
count++;
System.out.println("the " + count + " times try to acquire lock for taskid-list-lock......");
continue;
}
System.out.println("success to acquire lock for taskid-list-lock after " + count + " times try......");
break;
}
}
}
/**
 * Release the distributed lock for the taskid list
 */
public void releaseDistributedLock() {
String path = "/taskid-list-lock";
try {
zookeeper.delete(path, -1);
System.out.println("release the lock for taskid-list-lock......");
} catch (Exception e) {
e.printStackTrace();
}
}
public String getNodeData() {
try {
return new String(zookeeper.getData("/taskid-list", false, new Stat()));
} catch (Exception e) {
e.printStackTrace();
}
return "";
}
public void setNodeData(String path, String data) {
try {
// the target node may not exist yet (e.g. /taskid-list on first startup),
// so create it on demand instead of letting setData() fail
if(zookeeper.exists(path, false) == null) {
zookeeper.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} else {
zookeeper.setData(path, data.getBytes(), -1);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
 * Watcher that signals when the zk session is established
* @author Administrator
*
*/
private class ZooKeeperWatcher implements Watcher {
public void process(WatchedEvent event) {
System.out.println("Receive watched event: " + event.getState());
if(KeeperState.SyncConnected == event.getState()) {
connectedSemaphore.countDown();
}
}
}
/**
 * Static inner class holding the singleton
* @author Administrator
*
*/
private static class Singleton {
private static ZooKeeperSession instance;
static {
instance = new ZooKeeperSession();
}
public static ZooKeeperSession getInstance() {
return instance;
}
}
/**
 * Get the singleton instance
* @return
*/
public static ZooKeeperSession getInstance() {
return Singleton.getInstance();
}
/**
 * Convenience method for initializing the singleton
*/
public static void init() {
getInstance();
}
}
Distributed parallel cache prewarming based on double ZooKeeper distributed locks:
1. When a service instance starts up, run the cache prewarming (see the startup sketch after this list)
2. Read the taskid list from zk
3. Go through the taskids one by one and try to acquire the distributed lock for each; if it cannot be acquired, fail fast instead of waiting, since that means another service instance is already prewarming it
4. In that case simply move on and try the next taskid's distributed lock
5. Even after acquiring a lock, check the prewarm status of that taskid; if it has already been prewarmed, skip it
6. Otherwise run the prewarm: go through the productId list, load the data, and write it into ehcache and redis
7. When the prewarm is finished, set the prewarm status for that taskid
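A minimal sketch of step 1, i.e. kicking the prewarm off when the cache service instance starts. The ServletContextListener hook and the com.roncoo.eshop.cache.prewarm package are assumptions about how this project bootstraps; CachePrewarmThread itself is the class listed further below:

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import com.roncoo.eshop.cache.prewarm.CachePrewarmThread; // hypothetical package for the prewarm thread
import com.roncoo.eshop.cache.zk.ZooKeeperSession;

public class InitListener implements ServletContextListener {
    public void contextInitialized(ServletContextEvent sce) {
        // make sure the zk session singleton is connected before the prewarm thread uses it
        ZooKeeperSession.init();
        // prewarm in the background so service startup is not blocked
        new CachePrewarmThread().start();
    }
    public void contextDestroyed(ServletContextEvent sce) {
    }
}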
ZooKeeperSession gets path-based overloads of the lock and node-data methods:
/**
 * Try to acquire a distributed lock, failing fast if it is already held
 * @param path
 */
public boolean acquireFastFailedDistributedLock(String path) {
try {
zookeeper.create(path, "".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success to acquire lock for " + path);
return true;
} catch (Exception e) {
System.out.println("fail to acquire lock for " + path);
}
return false;
}
/**
 * Release a distributed lock
 * @param path
 */
public void releaseDistributedLock(String path) {
try {
zookeeper.delete(path, -1);
System.out.println("release the lock for " + path + "......");
} catch (Exception e) {
e.printStackTrace();
}
}
public String getNodeData(String path) {
try {
return new String(zookeeper.getData(path, false, new Stat()));
} catch (Exception e) {
e.printStackTrace();
}
return "";
}
public void setNodeData(String path, String data) {
try {
// the target node may not exist yet (e.g. /taskid-status-<taskid> on the first prewarm),
// so create it on demand instead of letting setData() fail
if(zookeeper.exists(path, false) == null) {
zookeeper.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} else {
zookeeper.setData(path, data.getBytes(), -1);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
 * Acquire a distributed lock, retrying until it is obtained
*/
public void acquireDistributedLock(String path) {
try {
zookeeper.create(path, "".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success to acquire lock for " + path);
} catch (Exception e) {
// if the lock node already exists, someone else has already acquired the lock,
// and the create() call above fails with a NodeExistsException
int count = 0;
while(true) {
try {
Thread.sleep(1000);
zookeeper.create(path, "".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (Exception e2) {
count++;
System.out.println("the " + count + " times try to acquire lock for " + path + "......");
continue;
}
System.out.println("success to acquire lock for " + path + " after " + count + " times try......");
break;
}
}
}
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.roncoo.eshop.cache.model.ProductInfo;
import com.roncoo.eshop.cache.service.CacheService;
import com.roncoo.eshop.cache.spring.SpringContext;
import com.roncoo.eshop.cache.zk.ZooKeeperSession;
/**
 * Cache prewarm thread
*/
public class CachePrewarmThread extends Thread {
@Override
public void run() {
CacheService cacheService = (CacheService) SpringContext.
getApplicationContext().getBean("cacheService");
ZooKeeperSession zkSession = ZooKeeperSession.getInstance();
// fetch the list of storm taskids from zk
String taskidList = zkSession.getNodeData("/taskid-list");
if(taskidList != null && !"".equals(taskidList)) {
String[] taskidListSplited = taskidList.split(",");
for(String taskid : taskidListSplited) {
String taskidLockPath = "/taskid-lock-" + taskid;
boolean result = zkSession.acquireFastFailedDistributedLock(taskidLockPath);
if(!result) {
continue;
}
String taskidStatusLockPath = "/taskid-status-lock-" + taskid;
zkSession.acquireDistributedLock(taskidStatusLockPath);
// check whether this taskid has already been prewarmed
String taskidStatus = zkSession.getNodeData("/taskid-status-" + taskid);
if("".equals(taskidStatus)) {
String productidList = zkSession.getNodeData("/task-hot-product-list-" + taskid);
JSONArray productidJSONArray = JSONArray.parseArray(productidList);
for(int i = 0; i < productidJSONArray.size(); i++) {
Long productId = productidJSONArray.getLong(i);
String productInfoJSON = "{\"id\": " + productId + ", \"name\": \"iphone7手機\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的規格\", \"service\": \"iphone7的售後服務\", \"color\": \"紅色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1, \"modifiedTime\": \"2017-01-01 12:00:00\"}";
ProductInfo productInfo = JSONObject.parseObject(productInfoJSON, ProductInfo.class);
cacheService.saveProductInfo2LocalCache(productInfo);
cacheService.saveProductInfo2ReidsCache(productInfo);
}
// mark the prewarm status node (not the lock node) so later runs skip this taskid
zkSession.setNodeData("/taskid-status-" + taskid, "success");
}
zkSession.releaseDistributedLock(taskidStatusLockPath);
zkSession.releaseDistributedLock(taskidLockPath);
}
}
}
}