天天看點

nginx+lua通路流量實時上報kafka

在nginx這一層,接收到通路請求的時候,就把請求的流量上報發送給kafka

storm才能去消費kafka中的實時的通路日志,然後去進行緩存熱資料的統計

從lua腳本直接建立一個kafka producer,發送資料到kafka

wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip

yum install -y unzip

unzip master.zip

cp -rf /usr/local/lua-resty-kafka-master/lib/resty /usr/hello/lualib
nginx -s reload      

  lua腳本:

-- Reports every request seen by nginx to the Kafka topic "access-log" so
-- that a downstream consumer can count product accesses.
local cjson = require("cjson")
local producer = require("resty.kafka.producer")

-- Kafka brokers used by the producer.
local broker_list = {
    { host = "192.168.31.187", port = 9092 },
    { host = "192.168.31.19", port = 9092 },
    { host = "192.168.31.227", port = 9092 }
}

-- ngx.req.read_body() returns nothing; it is called only for its side
-- effect of making the body available to ngx.req.get_body_data().
-- (The original stored its nil result under "body", which never appeared
-- in the encoded JSON.)
ngx.req.read_body()

-- Parse the query string once and reuse it for the log and the Kafka key.
local uri_args = ngx.req.get_uri_args()

local log_json = {
    headers = ngx.req.get_headers(),
    uri_args = uri_args,
    http_version = ngx.req.http_version(),
    method = ngx.req.get_method(),
    raw_reader = ngx.req.raw_header(),
    body_data = ngx.req.get_body_data()
}

-- cjson.encode raises on values it cannot serialize; never let logging
-- break the request.
local encoded, message = pcall(cjson.encode, log_json)
if not encoded then
    ngx.log(ngx.ERR, "kafka message encode err:", message)
    return
end

-- A repeated argument arrives as a table and a valueless one as `true`;
-- only a plain string is usable as the partition key.
local product_id = uri_args["productId"]
if type(product_id) == "table" then
    product_id = product_id[1]
end
if type(product_id) ~= "string" then
    product_id = nil
end

local async_producer = producer:new(broker_list, { producer_type = "async" })
local ok, err = async_producer:send("access-log", product_id, message)

if not ok then
    ngx.log(ngx.ERR, "kafka send err:", err)
    return
end

  

兩台機器上都這樣做,才能統一上報流量到kafka

bin/kafka-topics.sh --zookeeper 192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181 --topic access-log --replication-factor 1 --partitions 1 --create

bin/kafka-console-consumer.sh --zookeeper 192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181 --topic access-log --from-beginning

(1)kafka在187上的節點死掉了,可能是虛拟機的問題,殺掉程序,重新啟動一下

nohup bin/kafka-server-start.sh config/server.properties &

(2)需要在nginx.conf中,http部分,加入resolver 8.8.8.8;

(3)需要在每個kafka節點的config/server.properties中加入advertised.host.name = 本機IP(例如187這台就是192.168.31.187),然後重啟三個kafka程序

(4)需要啟動eshop-cache緩存服務,因為nginx中的本地緩存可能不在了

基于storm+kafka完成商品通路次數實時統計拓撲的開發:

總結思路:

1、kafka consumer spout

單獨的線程消費,寫入隊列

nextTuple,每次都是判斷隊列有沒有資料,有的話再去擷取并發射出去,不能阻塞

2、日志解析bolt

3、商品通路次數統計bolt

4、基于LRUMap完成統計

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;

import com.roncoo.eshop.storm.bolt.LogParseBolt;
import com.roncoo.eshop.storm.bolt.ProductCountBolt;
import com.roncoo.eshop.storm.spout.AccessLogKafkaSpout;

/**
 * Hot-data statistics topology.
 *
 * <p>Wires AccessLogKafkaSpout -> LogParseBolt -> ProductCountBolt and
 * submits the result either through StormSubmitter or to an in-process
 * LocalCluster.
 *
 * @author Administrator
 */
public class HotProductTopology {

  /**
   * Builds and submits the topology.
   *
   * @param args if at least one argument is given, args[0] is used as the
   *             topology name and the topology is submitted through
   *             StormSubmitter; otherwise it runs in a LocalCluster for
   *             30 seconds and is then shut down
   */
  public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("AccessLogKafkaSpout", new AccessLogKafkaSpout(), 1);
    builder.setBolt("LogParseBolt", new LogParseBolt(), 5)
        .setNumTasks(5)
        .shuffleGrouping("AccessLogKafkaSpout");
    // Group by productId so every tuple of one product reaches the same task.
    builder.setBolt("ProductCountBolt", new ProductCountBolt(), 5)
        .setNumTasks(10)
        .fieldsGrouping("LogParseBolt", new Fields("productId"));

    Config config = new Config();

    // Only args[0] is consumed, so a single argument is enough to select
    // cluster mode (the previous "> 1" check silently fell back to local
    // mode when just the topology name was passed).
    if(args != null && args.length > 0) {
      config.setNumWorkers(3);
      try {
        StormSubmitter.submitTopology(args[0], config, builder.createTopology());
      } catch (Exception e) {
        e.printStackTrace();
      }
    } else {
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("HotProductTopology", config, builder.createTopology());
      Utils.sleep(30000);
      cluster.shutdown();
    }
  }

}

  

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import com.alibaba.fastjson.JSONObject;

/**
 * Log-parsing bolt.
 *
 * <p>Reads the JSON access-log text from the "message" field, extracts
 * uri_args.productId and emits it as the single field "productId".
 * Records without a usable productId are dropped.
 *
 * @author Administrator
 */
public class LogParseBolt extends BaseRichBolt {

  private static final long serialVersionUID = -8017609899644290359L;

  private OutputCollector collector;

  /** Stores the collector used by {@link #execute(Tuple)}. */
  @SuppressWarnings("rawtypes")
  public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
  }

  /**
   * Parses one access-log record and emits its productId, if present.
   *
   * @param tuple tuple carrying the raw JSON text in field "message"
   */
  public void execute(Tuple tuple) {
    String message = tuple.getStringByField("message");
    Long productId = null;

    try {
      JSONObject messageJSON = JSONObject.parseObject(message);
      // Requests without a query string may carry no usable uri_args.
      JSONObject uriArgsJSON = messageJSON == null
          ? null : messageJSON.getJSONObject("uri_args");
      if(uriArgsJSON != null) {
        productId = uriArgsJSON.getLong("productId");
      }
    } catch (RuntimeException e) {
      // A malformed record (bad JSON, non-numeric productId) is skipped
      // instead of being thrown out of execute().
      e.printStackTrace();
    }

    if(productId != null) {
      collector.emit(new Values(productId));
    }
  }

  /** Declares the single output field "productId". */
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("productId"));
  }

}

  

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.storm.shade.org.json.simple.JSONArray;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.trident.util.LRUMap;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.Utils;

import com.roncoo.eshop.storm.zk.ZooKeeperSession;

/**
 * Product access-count bolt.
 *
 * <p>Counts accesses per productId in an LRUMap and, from a background
 * thread, periodically writes the ids of the most-accessed products to the
 * znode "/task-hot-product-list-&lt;taskid&gt;" as a JSON array.
 *
 * @author Administrator
 */
public class ProductCountBolt extends BaseRichBolt {

  private static final long serialVersionUID = -8761807561458126413L;

  // Shared between execute() and ProductCountThread; every access is
  // guarded by synchronized(productCountMap).
  private LRUMap<Long, Long> productCountMap = new LRUMap<Long, Long>(1000);
  private ZooKeeperSession zkSession;
  private int taskid;

  /**
   * Obtains the ZooKeeper session, starts the top-n reporting thread and
   * registers this task's id in the shared task id list.
   */
  @SuppressWarnings("rawtypes")
  public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    this.zkSession = ZooKeeperSession.getInstance();
    this.taskid = context.getThisTaskId();

    new Thread(new ProductCountThread()).start();

    // 1. Each task appends its own taskid to one shared znode, forming a list.
    // 2. Each task keeps writing its hot-product list to a znode named
    //    after its taskid.
    // 3. The prewarm program reads the list from step 1 to learn which
    //    taskids exist.
    // 4. For each taskid it takes a lock and reads that task's hot-product
    //    list.
    initTaskId(context.getThisTaskId());
  }

  /**
   * Appends the given task id to the comma-separated list stored in
   * "/taskid-list" (e.g. "111,211,355") while holding the distributed lock.
   *
   * @param taskid id of this task
   */
  private void initTaskId(int taskid) {
    zkSession.acquireDistributedLock();
    try {
      String taskidList = zkSession.getNodeData();
      if(!"".equals(taskidList)) {
        taskidList += "," + taskid;
      } else {
        taskidList += taskid;
      }

      zkSession.setNodeData("/taskid-list", taskidList);
    } finally {
      // Release even if the read/write above throws, so that other tasks
      // are not blocked forever.
      zkSession.releaseDistributedLock();
    }
  }

  /** Background job that publishes the current top-n product ids. */
  private class ProductCountThread implements Runnable {

    public void run() {
      List<Map.Entry<Long, Long>> topnProductList = new ArrayList<Map.Entry<Long, Long>>();
      List<Long> productidList = new ArrayList<Long>();

      while(true) {
        topnProductList.clear();
        productidList.clear();

        int topn = 3;

        // execute() mutates the map concurrently, so iterate under the
        // same monitor and copy the ids out before releasing it.
        synchronized(productCountMap) {
          for(Map.Entry<Long, Long> productCountEntry : productCountMap.entrySet()) {
            offer(topnProductList, productCountEntry, topn);
          }
          for(Map.Entry<Long, Long> topnEntry : topnProductList) {
            productidList.add(topnEntry.getKey());
          }
        }

        if(productidList.isEmpty()) {
          Utils.sleep(100);
          continue;
        }

        // The prewarm side reads this node as a JSON array of product ids.
        String topnProductListJSON = JSONArray.toJSONString(productidList);
        zkSession.setNodeData("/task-hot-product-list-" + taskid, topnProductListJSON);

        Utils.sleep(5000);
      }
    }

    /**
     * Inserts candidate into topnList, which is kept sorted by count in
     * descending order and never longer than topn.
     */
    private void offer(List<Map.Entry<Long, Long>> topnList,
        Map.Entry<Long, Long> candidate, int topn) {
      int position = topnList.size();
      for(int i = 0; i < topnList.size(); i++) {
        if(candidate.getValue() > topnList.get(i).getValue()) {
          position = i;
          break;
        }
      }

      if(position >= topn) {
        return;
      }

      // add(index, e) shifts the tail right; the previous set(j + 1, ...)
      // shifting threw IndexOutOfBoundsException while the list was not
      // yet full.
      topnList.add(position, candidate);
      if(topnList.size() > topn) {
        topnList.remove(topnList.size() - 1);
      }
    }

  }

  /**
   * Increments the access count of the tuple's productId.
   *
   * @param tuple tuple carrying the Long field "productId"
   */
  public void execute(Tuple tuple) {
    Long productId = tuple.getLongByField("productId");

    synchronized(productCountMap) {
      Long count = productCountMap.get(productId);
      if(count == null) {
        count = 0L;
      }
      count++;

      productCountMap.put(productId, count);
    }
  }

  /** This bolt emits nothing, so no output fields are declared. */
  public void declareOutputFields(OutputFieldsDeclarer declarer) {

  }

}

  

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
 * Spout that consumes the Kafka topic "access-log".
 *
 * <p>A dedicated thread per Kafka stream pushes messages into a bounded
 * in-memory queue; {@link #nextTuple()} drains that queue without blocking
 * on Kafka.
 */
public class AccessLogKafkaSpout extends BaseRichSpout {

  private static final long serialVersionUID = 8698470299234327074L;

  // Hand-off buffer between the Kafka consumer thread(s) and nextTuple().
  private ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(1000);

  private SpoutOutputCollector collector;

  /** Stores the collector and starts the Kafka consumer thread(s). */
  @SuppressWarnings("rawtypes")
  public void open(Map conf, TopologyContext context,
      SpoutOutputCollector collector) {
    this.collector = collector;
    startKafkaConsumer();
  }

  /**
   * Creates the consumer connector for topic "access-log" and starts one
   * thread per returned stream.
   */
  private void startKafkaConsumer() {
    Properties props = new Properties();
    props.put("zookeeper.connect", "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181");
    props.put("group.id", "eshop-cache-group");
    props.put("zookeeper.session.timeout.ms", "40000");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    ConsumerConfig consumerConfig = new ConsumerConfig(props);

    ConsumerConnector consumerConnector = Consumer.
        createJavaConsumerConnector(consumerConfig);
    String topic = "access-log";

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1);

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
        consumerConnector.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    for (KafkaStream<byte[], byte[]> stream : streams) {
      new Thread(new KafkaMessageProcessor(stream)).start();
    }
  }

  /** Copies every message of one Kafka stream into the queue. */
  private class KafkaMessageProcessor implements Runnable {

    private KafkaStream<byte[], byte[]> kafkaStream;

    public KafkaMessageProcessor(KafkaStream<byte[], byte[]> kafkaStream) {
      this.kafkaStream = kafkaStream;
    }

    public void run() {
      ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
      while (it.hasNext()) {
        // Decode explicitly instead of relying on the JVM default charset.
        // NOTE(review): assumes the producer sends UTF-8 JSON text - confirm.
        String message = new String(it.next().message(),
            java.nio.charset.StandardCharsets.UTF_8);
        try {
          // Blocks while the queue is full.
          queue.put(message);
        } catch (InterruptedException e) {
          // Preserve the interrupt and stop consuming.
          Thread.currentThread().interrupt();
          e.printStackTrace();
          return;
        }
      }
    }

  }

  /**
   * Emits at most one queued message; sleeps briefly when the queue is
   * empty.
   */
  public void nextTuple() {
    // poll() never blocks, unlike the previous size() check plus take().
    String message = queue.poll();
    if(message == null) {
      Utils.sleep(100);
      return;
    }

    try {
      collector.emit(new Values(message));
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  /** Declares the single output field "message". */
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("message"));
  }

}

  

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
 * ZooKeeperSession
 * @author Administrator
 *
 */
public class ZooKeeperSession {
  
  private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
  
  private ZooKeeper zookeeper;

  public ZooKeeperSession() {
    // 去連接配接zookeeper server,建立會話的時候,是異步去進行的
    // 是以要給一個監聽器,說告訴我們什麼時候才是真正完成了跟zk server的連接配接
    try {
      this.zookeeper = new ZooKeeper(
          "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181", 
          50000, 
          new ZooKeeperWatcher());
      // 給一個狀态CONNECTING,連接配接中
      System.out.println(zookeeper.getState());
      
      try {
        // CountDownLatch
        // java多線程并發同步的一個工具類
        // 會傳遞進去一些數字,比如說1,2 ,3 都可以
        // 然後await(),如果數字不是0,那麼久卡住,等待
        
        // 其他的線程可以調用coutnDown(),減1
        // 如果數字減到0,那麼之前所有在await的線程,都會逃出阻塞的狀态
        // 繼續向下運作
        
        connectedSemaphore.await();
      } catch(InterruptedException e) {
        e.printStackTrace();
      }

      System.out.println("ZooKeeper session established......");
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
  
  /**
   * 擷取分布式鎖
   * @param productId
   */
  public void acquireDistributedLock() {
    String path = "/taskid-list-lock";
  
    try {
      zookeeper.create(path, "".getBytes(), 
          Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
      System.out.println("success to acquire lock for taskid-list-lock");  
    } catch (Exception e) {
      // 如果那個商品對應的鎖的node,已經存在了,就是已經被别人加鎖了,那麼就這裡就會報錯
      // NodeExistsException
      int count = 0;
      while(true) {
        try {
          Thread.sleep(1000); 
          zookeeper.create(path, "".getBytes(), 
              Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } catch (Exception e2) {
          count++;
          System.out.println("the " + count + " times try to acquire lock for taskid-list-lock......");
          continue;
        }
        System.out.println("success to acquire lock for taskid-list-lock after " + count + " times try......");
        break;
      }
    }
  }
  
  /**
   * 釋放掉一個分布式鎖
   * @param productId
   */
  public void releaseDistributedLock() {
    String path = "/taskid-list-lock";
    try {
      zookeeper.delete(path, -1); 
      System.out.println("release the lock for taskid-list-lock......");  
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
  
  public String getNodeData() {
    try {
      return new String(zookeeper.getData("/taskid-list", false, new Stat()));  
    } catch (Exception e) {
      e.printStackTrace();
    }
    return "";
  }
  
  public void setNodeData(String path, String data) {
    try {
      zookeeper.setData(path, data.getBytes(), -1);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
  
  /**
   * 建立zk session的watcher
   * @author Administrator
   *
   */
  private class ZooKeeperWatcher implements Watcher {

    public void process(WatchedEvent event) {
      System.out.println("Receive watched event: " + event.getState());
      if(KeeperState.SyncConnected == event.getState()) {
        connectedSemaphore.countDown();
      } 
    }
    
  }
  
  /**
   * 封裝單例的靜态内部類
   * @author Administrator
   *
   */
  private static class Singleton {
    
    private static ZooKeeperSession instance;
    
    static {
      instance = new ZooKeeperSession();
    }
    
    public static ZooKeeperSession getInstance() {
      return instance;
    }
    
  }
  
  /**
   * 擷取單例
   * @return
   */
  public static ZooKeeperSession getInstance() {
    return Singleton.getInstance();
  }
  
  /**
   * 初始化單例的便捷方法
   */
  public static void init() {
    getInstance();
  }
  
}      

  基于雙重zookeeper分布式鎖完成分布式并行緩存預熱:

1、服務啟動的時候,進行緩存預熱

2、從zk中讀取taskid清單

3、依次周遊每個taskid,嘗試擷取分布式鎖,如果擷取不到,快速報錯,不要等待,因為說明已經有其他服務執行個體在預熱了

4、直接嘗試擷取下一個taskid的分布式鎖

5、即使擷取到了分布式鎖,也要檢查一下這個taskid的預熱狀态,如果已經被預熱過了,就不再預熱了

6、執行預熱操作,周遊productid清單,查詢資料,然後寫ehcache和redis

7、預熱完成後,設定taskid對應的預熱狀态

ZooKeeperSession中新增/重載以下幾個帶path參數的方法:

/**
   * Makes a single, non-waiting attempt to take the distributed lock
   * represented by an ephemeral znode.
   *
   * @param path znode path used as the lock
   * @return true when the znode was created, false when the attempt threw
   */
  public boolean acquireFastFailedDistributedLock(String path) {
    boolean acquired;
    try {
      byte[] emptyPayload = "".getBytes();
      zookeeper.create(path, emptyPayload, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
      System.out.println("success to acquire lock for " + path);
      acquired = true;
    } catch (Exception ex) {
      // Any failure counts as "lock not obtained".
      System.out.println("fail to acquire lock for " + path);
      acquired = false;
    }
    return acquired;
  }

/**
   * Releases a distributed lock by deleting its znode.
   *
   * @param path znode path of the lock to release
   */
  public void releaseDistributedLock(String path) {
    // -1 is passed as the version argument of delete().
    final int version = -1;
    try {
      zookeeper.delete(path, version);
      String notice = "release the lock for " + path + "......";
      System.out.println(notice);
    } catch (Exception ex) {
      ex.printStackTrace();
    }
  }
/**
   * Reads the data stored at a znode.
   *
   * @param path znode path to read
   * @return the node data as a string, or "" when the read throws
   */
public String getNodeData(String path) {
    String content = "";
    try {
      Stat stat = new Stat();
      byte[] raw = zookeeper.getData(path, false, stat);
      content = new String(raw);
    } catch (Exception ex) {
      ex.printStackTrace();
    }
    return content;
  }
  
  /**
   * Writes data to the znode at path, creating it as a persistent node
   * when it does not exist yet.
   *
   * @param path znode path
   * @param data new node content
   */
  public void setNodeData(String path, String data) {
    try {
      // setData fails on a missing node, so create it on first write.
      if(zookeeper.exists(path, false) == null) {
        try {
          zookeeper.create(path, data.getBytes(),
              Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
          return;
        } catch (org.apache.zookeeper.KeeperException.NodeExistsException e) {
          // Another client created it in between; fall through to setData.
        }
      }
      zookeeper.setData(path, data.getBytes(), -1);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  

/**
   * Acquires the distributed lock at the given path by creating an
   * ephemeral znode, retrying once per second until the create succeeds.
   *
   * @param path znode path used as the lock
   */
  public void acquireDistributedLock(String path) {
    int failedRetries = 0;
    // false for the immediate first attempt, true for the delayed retries
    boolean retrying = false;

    while(true) {
      try {
        if(retrying) {
          Thread.sleep(1000);
        }
        zookeeper.create(path, "".getBytes(),
            Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        if(!retrying) {
          System.out.println("success to acquire lock for " + path);
        }
      } catch (Exception ex) {
        // The create fails (e.g. NodeExistsException) while the lock node
        // already exists. The first failure is silent; later ones are
        // counted and reported.
        if(retrying) {
          failedRetries++;
          System.out.println("the " + failedRetries + " times try to acquire lock for " + path + "......");
        }
        retrying = true;
        continue;
      }

      if(retrying) {
        System.out.println("success to acquire lock for " + path + " after " + failedRetries + " times try......");
      }
      break;
    }
  }

  

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.roncoo.eshop.cache.model.ProductInfo;
import com.roncoo.eshop.cache.service.CacheService;
import com.roncoo.eshop.cache.spring.SpringContext;
import com.roncoo.eshop.cache.zk.ZooKeeperSession;

/**
 * Cache prewarm thread.
 *
 * <p>Reads the task id list from "/taskid-list" and, for every task whose
 * lock it can take without waiting, loads that task's hot products into
 * the local cache and redis unless the task is already marked as prewarmed.
 */
public class CachePrewarmThread extends Thread {

  @Override
  public void run() {
    CacheService cacheService = (CacheService) SpringContext.
        getApplicationContext().getBean("cacheService");
    ZooKeeperSession zkSession = ZooKeeperSession.getInstance();

    // Fetch the storm taskid list.
    String taskidList = zkSession.getNodeData("/taskid-list");
    if(taskidList == null || "".equals(taskidList)) {
      return;
    }

    for(String taskid : taskidList.split(",")) {
      String taskidLockPath = "/taskid-lock-" + taskid;

      // A failed fast lock attempt skips the task without waiting.
      if(!zkSession.acquireFastFailedDistributedLock(taskidLockPath)) {
        continue;
      }

      try {
        prewarmTask(cacheService, zkSession, taskid);
      } finally {
        // Always give the task lock back, even if prewarming throws.
        zkSession.releaseDistributedLock(taskidLockPath);
      }
    }
  }

  /**
   * Prewarms the hot products of one task while holding its status lock.
   *
   * @param cacheService service used to write the local and redis caches
   * @param zkSession    ZooKeeper session
   * @param taskid       id of the storm task to prewarm
   */
  private void prewarmTask(CacheService cacheService, ZooKeeperSession zkSession, String taskid) {
    String taskidStatusLockPath = "/taskid-status-lock-" + taskid;
    String taskidStatusPath = "/taskid-status-" + taskid;

    zkSession.acquireDistributedLock(taskidStatusLockPath);
    try {
      // Check the prewarm status; anything but "" means already done.
      String taskidStatus = zkSession.getNodeData(taskidStatusPath);
      if(!"".equals(taskidStatus)) {
        return;
      }

      String productidList = zkSession.getNodeData("/task-hot-product-list-" + taskid);
      if(productidList == null || "".equals(productidList)) {
        // Nothing published for this task yet; leave it unmarked so that
        // a later run can prewarm it.
        return;
      }

      JSONArray productidJSONArray = JSONArray.parseArray(productidList);
      if(productidJSONArray == null) {
        return;
      }

      for(int i = 0; i < productidJSONArray.size(); i++) {
        Long productId = productidJSONArray.getLong(i);
        String productInfoJSON = "{\"id\": " + productId + ", \"name\": \"iphone7手機\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的規格\", \"service\": \"iphone7的售後服務\", \"color\": \"紅色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1, \"modifiedTime\": \"2017-01-01 12:00:00\"}";
        ProductInfo productInfo = JSONObject.parseObject(productInfoJSON, ProductInfo.class);
        cacheService.saveProductInfo2LocalCache(productInfo);
        cacheService.saveProductInfo2ReidsCache(productInfo);
      }

      // Record the status on the node that is read above. It was
      // previously written to the lock node, which is deleted right after.
      zkSession.setNodeData(taskidStatusPath, "success");
    } finally {
      zkSession.releaseDistributedLock(taskidStatusLockPath);
    }
  }

}

  

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.roncoo.eshop.cache.model.ProductInfo;
import com.roncoo.eshop.cache.service.CacheService;
import com.roncoo.eshop.cache.spring.SpringContext;
import com.roncoo.eshop.cache.zk.ZooKeeperSession;

/**
 * Cache prewarm thread.
 *
 * <p>Reads the task id list from "/taskid-list" and, for every task whose
 * lock it can take without waiting, loads that task's hot products into
 * the local cache and redis unless the task is already marked as prewarmed.
 */
public class CachePrewarmThread extends Thread {

  @Override
  public void run() {
    CacheService cacheService = (CacheService) SpringContext.
        getApplicationContext().getBean("cacheService");
    ZooKeeperSession zkSession = ZooKeeperSession.getInstance();

    // Fetch the storm taskid list.
    String taskidList = zkSession.getNodeData("/taskid-list");
    if(taskidList == null || "".equals(taskidList)) {
      return;
    }

    for(String taskid : taskidList.split(",")) {
      String taskidLockPath = "/taskid-lock-" + taskid;

      // A failed fast lock attempt skips the task without waiting.
      if(!zkSession.acquireFastFailedDistributedLock(taskidLockPath)) {
        continue;
      }

      try {
        prewarmTask(cacheService, zkSession, taskid);
      } finally {
        // Always give the task lock back, even if prewarming throws.
        zkSession.releaseDistributedLock(taskidLockPath);
      }
    }
  }

  /**
   * Prewarms the hot products of one task while holding its status lock.
   *
   * @param cacheService service used to write the local and redis caches
   * @param zkSession    ZooKeeper session
   * @param taskid       id of the storm task to prewarm
   */
  private void prewarmTask(CacheService cacheService, ZooKeeperSession zkSession, String taskid) {
    String taskidStatusLockPath = "/taskid-status-lock-" + taskid;
    String taskidStatusPath = "/taskid-status-" + taskid;

    zkSession.acquireDistributedLock(taskidStatusLockPath);
    try {
      // Check the prewarm status; anything but "" means already done.
      String taskidStatus = zkSession.getNodeData(taskidStatusPath);
      if(!"".equals(taskidStatus)) {
        return;
      }

      String productidList = zkSession.getNodeData("/task-hot-product-list-" + taskid);
      if(productidList == null || "".equals(productidList)) {
        // Nothing published for this task yet; leave it unmarked so that
        // a later run can prewarm it.
        return;
      }

      JSONArray productidJSONArray = JSONArray.parseArray(productidList);
      if(productidJSONArray == null) {
        return;
      }

      for(int i = 0; i < productidJSONArray.size(); i++) {
        Long productId = productidJSONArray.getLong(i);
        String productInfoJSON = "{\"id\": " + productId + ", \"name\": \"iphone7手機\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的規格\", \"service\": \"iphone7的售後服務\", \"color\": \"紅色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1, \"modifiedTime\": \"2017-01-01 12:00:00\"}";
        ProductInfo productInfo = JSONObject.parseObject(productInfoJSON, ProductInfo.class);
        cacheService.saveProductInfo2LocalCache(productInfo);
        cacheService.saveProductInfo2ReidsCache(productInfo);
      }

      // Record the status on the node that is read above. It was
      // previously written to the lock node, which is deleted right after.
      zkSession.setNodeData(taskidStatusPath, "success");
    } finally {
      zkSession.releaseDistributedLock(taskidStatusLockPath);
    }
  }

}