天天看點

Storm 與Kafka 整合一、wurstmeister/storm-kafka-0.8-plus二、kafka 生産者的建立三、建立消費 kafka 資料的Topology 四、建立資料輸出的Bolt

這裡的目标是kafka 負責生産資料,storm 消費資料并将結果輸出

一、wurstmeister/storm-kafka-0.8-plus

這裡用的是引進别人家寫的整合代碼,因為使用的人也比較多,下面是項目位址

https://github.com/wurstmeister/storm-kafka-0.8-plus

下載下傳、解壓以及将這個目錄下的代碼添加進項目

storm-kafka-0.8-plus-master\storm-kafka-0.8-plus-master\src\jvm

将kafka 和 storm 的JAR 添加進項目,作為依賴jar 包

然後添加com.netflix.curator 的相關包括client、framework和recipes

下載下傳位址:http://maven.outofmemory.cn/com.netflix.curator/

最新的所有com.google.common類,下載下傳位址

http://central.maven.org/maven2/com/google/guava/guava/18.0/guava-18.0.jar

這樣storm-kafka-0.8-plus項目應該就不會報錯了。

二、kafka 生産者的建立

在我的這篇文章裡3.6、Producer JAVA API,有生産者的例子,可以拿來直接用。

http://blog.csdn.net/looklook5/article/details/41248561

三、建立消費 kafka 資料的Topology

storm-kafka-0.8-plus 給我們寫了個測試代碼

位址是:

https://github.com/wurstmeister/storm-kafka-0.8-plus-test/blob/master/src/main/java/storm/kafka/KafkaSpoutTestTopology.java

代碼如下:

package storm.kafka;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;

public class KafkaSpoutTestTopology {
    public static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutTestTopology.class);

    public static class PrinterBolt extends BaseBasicBolt {
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            LOG.info(tuple.toString());
        }

    }

    private final BrokerHosts brokerHosts;

    public KafkaSpoutTestTopology(String kafkaZookeeper) {
        brokerHosts = new ZkHosts(kafkaZookeeper);
    }

    public StormTopology buildTopology() {
        SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "storm-sentence", "", "storm");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
        builder.setBolt("print", new PrinterBolt()).shuffleGrouping("words");
        return builder.createTopology();
    }

    public static void main(String[] args) throws Exception {

        String kafkaZk = args[0];
        KafkaSpoutTestTopology kafkaSpoutTestTopology = new KafkaSpoutTestTopology(kafkaZk);
        Config config = new Config();
        config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);

        StormTopology stormTopology = kafkaSpoutTestTopology.buildTopology();
        if (args != null && args.length > 1) {
            String name = args[1];
            String dockerIp = args[2];
            config.setNumWorkers(2);
            config.setMaxTaskParallelism(5);
            config.put(Config.NIMBUS_HOST, dockerIp);
            config.put(Config.NIMBUS_THRIFT_PORT, 6627);
            config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
            config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(dockerIp));
            StormSubmitter.submitTopology(name, config, stormTopology);
        } else {
            config.setNumWorkers(2);
            config.setMaxTaskParallelism(2);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafka", config, stormTopology);
        }
    }
}
           

這裡清晰的寫出了建立一個與kafka整合的storm Topology,觀察main 函數,從上往下看:

下面是關于zookeeper的設定以及spout和bolt 的設定

String kafkaZk = args[0];
KafkaSpoutTestTopology kafkaSpoutTestTopology = new KafkaSpoutTestTopology(kafkaZk);
StormTopology stormTopology = kafkaSpoutTestTopology.buildTopology();
           

下面的語句中,storm-sentence是話題,下面的語句是要求在zookeeper 伺服器中在根目錄建立檔案夾storm,用于kafka存放zookeeper相關資料

SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, " storm-sentence ", "", "storm");
builder.setSpout("words", new KafkaSpout(kafkaConfig), 10); 這裡是設定spout,負責從kafka消費資料,其中word 是spout 名稱,KafkaSpout 由storm-kafka-0.8-plus 提供,10為并發數。
builder.setBolt("print", new PrinterBolt()).shuffleGrouping("words"); 這個是設定spout 接下去的bolt, PrinterBolt看名稱應該負責列印bolt的資料的類。shuffleGrouping("words")表示資料是采用随機模式。後面接的資料來自與叫做words的spout
           

下面是設定Topology的相關設定

Config config = new Config(); 初始化一個storm設定
config.setNumWorkers(2);  這個代表配置設定2個Worker。
StormSubmitter.submitTopology(args[0], config, builder.createTopology()); 這個表示想Storm 伺服器送出Topology任務,其中第一個參數是Topology的name.
config.setMaxTaskParallelism(3); 一個work的最大并發數為3
LocalCluster cluster = new LocalCluster(); 開啟Storm本地模式 
cluster.submitTopology("special-topology", config, builder.createTopology());  在本地網模式下送出storm任務。	           	
cluster.shutdown(); 關閉Storm本地模式。
           

下面是我修改後的腳本

import com.google.common.collect.ImmutableList;
import com.ks.bolt.CounterBolt;
import com.ks.bolt.DateCutBolt;
import com.ks.bolt.InsertMysqlBolt;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
public class CountTopology {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		try{
			String kafkaZookeeper = "carl:2181,slave1:2181,slave2:2181";
			BrokerHosts brokerHosts = new ZkHosts(kafkaZookeeper);
			SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "test", "/storm", "stormid");
	        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
	        kafkaConfig.zkServers =  ImmutableList.of("carl","slave1","slave2");
	        kafkaConfig.zkPort = 2181;
			
	        //kafkaConfig.forceFromStart = true;
			
	        
	        TopologyBuilder builder = new TopologyBuilder();
	        builder.setSpout("spout", new KafkaSpout(kafkaConfig), 2);
	      //*************************下面是所有處理邏輯,隻關注這個*****************************
	        builder.setBolt("datecut", new CounterBolt(),1).shuffleGrouping("spout");
	        //*************************下面是所有處理邏輯,隻關注這個*****************************

	        Config config = new Config();
	        config.setDebug(true);
			
	        if(args!=null && args.length > 0) {
	            config.setNumWorkers(2);
	            
	            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
	        } else {        
	            config.setMaxTaskParallelism(3);
	    
	            LocalCluster cluster = new LocalCluster();
	            cluster.submitTopology("special-topology", config, builder.createTopology());
	            
	            Thread.sleep(500000);
	
	            cluster.shutdown();
	        }
		}catch (Exception e) {
			e.printStackTrace();
		}
	}

}
           

這裡在本地模式下讓他運作20秒鐘自動結束,因為這個比較耗資源。注意以下這句,

SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "test", "/storm", "stormid");
           

請記得在zookeeper 根目錄下面建立檔案夾storm,然後在storm 檔案夾下面繼續建立檔案夾stormid 用于存放kafka資訊資料

上面的Topology 設定了bolt 為CounterBolt,是以還要建一個CounterBolt的bolt 類。

這裡設定了,運作jar包敲參數為送出到storm伺服器,不敲參數則是運作storm本地模式。

四、建立資料輸出的Bolt

這裡實作一個十分簡單的bolt 類

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class CounterBolt extends BaseBasicBolt {

	/**
	 * 
	 */
	private static final long serialVersionUID = -5508421065181891596L;
	
	private static long counter = 0;
	
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		
		System.out.println("msg = "+tuple.getString(0)+" -------------counter = "+(counter++));

	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// TODO Auto-generated method stub

	}

}
           

這裡很簡單就是将bolt 擷取的資料進行簡單的輸出,并統計接收到的資料條目數。這裡繼續BaseBasicBolt 類,因為這樣開發會比較簡單。因為這個是唯一的bolt,沒有輸出,是以在declareOutputFields 方法中不需要聲明output。

System.out.println("msg = "+ tuple.getString(0)+"-------------counter = "+(counter++));

這裡tuple就是這個bolt 從上一個spout擷取的資料集合。

這裡是控制台輸出,是以請用本地模式進行調試。

打包上傳到伺服器,運作

Storm jar jarname CountTopology     回車,會看到他在等待資料傳入。

這個時候運作kafka消費者程式,将資料輸出,則會看到storm 會迅速輸出資料和統計數目。

這裡測試不寫了。