天天看點

Storm1.2.2整合Kafka2.1.1程式設計

前言

Storm到1版本後發送了很大的變化,很多api都有很多改變。然而網絡上的多數教程都是舊版本的api。

導緻了這一部分的程式設計花了我非常非常多的時間閱讀官方文檔的github上的程式。

而且某些錯誤是真的難debug。

本文隻寫一個簡單的讀取kafka消息的demo

因為程式設計是簡單的,麻煩的是程式設計之外的細節。

至于程式設計的問題可以參考官方在github上的example

在storm目錄裡面也能找到example

環境

Ubuntu18.04

Storm1.2.2

Kafka2.1.1

Zookeeper3.4.13

運作架構

開啟zookeeper

開啟kafka的broker

開啟storm nimbus

開啟storm supervisor

開啟storm ui

導包

網絡環境好可以用maven

但其實所要的jar包基本都放在lib目錄下了。

另外需要載下strom-kafka-client.jar和strom-starter.jar

可以去maven官網下

程式設計

首先聲明一個TopologyBuilder

TopologyBuilder tp = new TopologyBuilder();
           

設定spout和bolt

topologybuilder.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:9092", "test").build()), 1);
topologybuilder.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout");
           

如果向更加詳細的設定spout

寫一個方法生成KafkaSpoutConfig

private static KafkaSpoutConfig<String,String> newKafkaSpoutConfig(String topic) {
		ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
	            (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
	            new Fields("topic", "partition", "offset", "key", "value"), "stream1");
		//bootstrapServer 以及topic
        return KafkaSpoutConfig.builder(IPUtil.getUbuntu1()+":9092", topic)
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup_" + System.nanoTime())
                .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200)
                .setRecordTranslator(trans)
                .setRetry(getRetryService())
                .setOffsetCommitPeriodMs(10_000)
                .setFirstPollOffsetStrategy(EARLIEST)
                .setMaxUncommittedOffsets(250)
                .build();
    }
    protected static KafkaSpoutRetryService getRetryService() {
        return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
            TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
    }
           

topologybuilder這樣設定

topologybuilder.setSpout("kafka_spout", new KafkaSpout<String,String>(newKafkaSpoutConfig("test")));
           

獲得storm設定

上面設定的是topology,現在設定的是storm配置

Config stormConf=new Config();
stormConf.setNumWorkers(1);
stormConf.setDebug(true);
           

其實還有很多設定,topology也是。可以去官網看看,因為太多就不例舉了。

上傳topology到storm叢集

這一步是最最重要的!

首先要告訴叢集上傳的topology的jar包叫什麼名字。

System.setProperty("storm.jar", "storm-test.jar");
           

然後才上傳

StormSubmitter.submitTopology("testtopology", stormConf, topologybuilder.createTopology());
           

上傳有很多方式,你可以StormSubmitter的方法,裡面還有很多其他參數的上傳方式,你可以通過看源碼知道其運作方式。

上傳寫是寫完了,但還不行。還有需要注意的地方。繼續看。

本文最重要的地方

如果隻是像上文那樣寫完了是不行的。雖然這以及是最終的代碼。

因為你還要做兩個操作

1.将整個項目export成jar包,

export->JAR file -> Export all output folders for checked projects -> next -> next -> generate the manifest file->seal the jar -> finsh。

2.将需要的外部檔案放到storm目錄下的exlib目錄裡,這一步非常重要。

因為我們用kafka,是以記得把kafka的jar包放到exlib裡面。

原因:本地可以讀取本地檔案系統及java項目中的檔案,但是送出叢集後就不能讀取了,storm隻是将topology送出到了叢集,是以隻能在main方法中将需要讀取的配置在送出topology之前讀出來,然後再解析,而不是在spout和bolt中讀取配置檔案;參考-》storm本地可以運作叢集出錯遇到的問題

打包成jar包後放在項目根目錄。

配合System.setProperty(“storm.jar”, “storm-test.jar”);這個語句就可以遠端發送topology。

然後就成功了。

整份代碼參考

KafkaTopoDemo

package stormtest;

import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import datacool.hadoop.common.IPUtil;

public class KafkaTopoDemo {
	public static void main(String[] args) {
		final TopologyBuilder topologybuilder = new TopologyBuilder();
		topologybuilder.setSpout("kafka_spout", new KafkaSpout<String,String>(newKafkaSpoutConfig("test")));
		topologybuilder.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout");
		
		Config stormConf=new Config();
		stormConf.setNumWorkers(1);
		stormConf.setDebug(true);
		try {
			System.setProperty("storm.jar", "storm-test.jar");
			StormSubmitter.submitTopologyAs("testtopology", stormConf, topologybuilder.createTopology(), null, null, "czq");
			StormSubmitter.
			System.out.println("送出完成");
		} catch (AlreadyAliveException e) {
			// TODO Auto-generated catch block
			System.out.println("出現AlreadyAliveException");
			e.printStackTrace();
		} catch (InvalidTopologyException e) {
			// TODO Auto-generated catch block
			System.out.println("出現InvalidTopologyException");
			e.printStackTrace();
		} catch (AuthorizationException e) {
			// TODO Auto-generated catch block
			System.out.println("出現AuthorizationException");
			e.printStackTrace();
		}
    }
	
	private static KafkaSpoutConfig<String,String> newKafkaSpoutConfig(String topic) {
		ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
	            (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
	            new Fields("topic", "partition", "offset", "key", "value"), "stream1");
		//bootstrapServer 以及topic
        return KafkaSpoutConfig.builder(IPUtil.getUbuntu1()+":9092", topic)
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup_" + System.nanoTime())
                .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200)
                .setRecordTranslator(trans)
                .setRetry(getRetryService())
                .setOffsetCommitPeriodMs(10_000)
                .setFirstPollOffsetStrategy(EARLIEST)
                .setMaxUncommittedOffsets(250)
                .build();
    }
	
	protected static KafkaSpoutRetryService getRetryService() {
        return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
            TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
    }
}

           

Bolt

package stormtest;

import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

public class myBolt implements IRichBolt {
	private FileWriter fileWriter;
	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		// TODO Auto-generated method stub
		try {
			System.out.println("準備filewriter");
			fileWriter = new FileWriter("/home/czq/tmp/please.txt");
			System.out.println("filewriter準備成功");
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	@Override
	public void execute(Tuple input) {
		// TODO Auto-generated method stub
		System.out.println(input);
		try {
			System.out.println("準備輸出");
			fileWriter.write(input.getStringByField("value")+"\n");
			fileWriter.flush();
			System.out.println("輸出成功");
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
	}

	@Override
	public void cleanup() {
		// TODO Auto-generated method stub

	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// TODO Auto-generated method stub

	}

	@Override
	public Map<String, Object> getComponentConfiguration() {
		// TODO Auto-generated method stub
		return null;
	}

}

           

繼續閱讀