Linux環境下Apache Storm的使用:爬取指定網站資訊并将資訊存進redis和kafka,再将兩者讀出的資料作為spout,分别對兩者的資料詞頻統計、行數統計、字數統計,并将所得結果存入redis記憶體資料庫,并觀察這兩個情況的時延、吞吐量。注意:兩個資料源兩個拓撲。
目錄
- 1.環境搭建
-
- 1.1安裝JDK
- 1.2安裝eclipse
- 1.3 安裝、打開zookeeper服務
- 1.4安裝、打開Kafka服務
- 1.5安裝、打開Redis服務
- 1.6 安裝、打開Storm服務
- 1.7 添加maven依賴
- 2.程式使用說明
- 3.運作截圖
- 4.總體設計
- 5.詳細設計
-
- 5.1 C00Main.java
- 5.2 C10Crawler.java
- 5.3 C20KafkaProducer.java
- 5.4 C21SaveToRedis.java
- 5.5 C30Topology.java
- 5.6 C40KafkaSpout.java
- 5.7 C41RedisSpout.java
- 5.8 C50SplitBolt.java
- 5.9 C51WordCountBolt.java
- 5.10 C52RowCountBolt.java
- 5.11 C60WordFrequencyBolt.java
- 5.12 C70ReportBolt.java
- 6.存在問題
- 7. 源代碼
-
- 7.1 C00Main.java
- 7.2 C10Crawler.java
- 7.3 C20KafkaProducer.java
- 7.4 C21SaveToRedis.java
- 7.5 C30Topology.java
- 7.6 C40KafkaSpout.java
- 7.7 C41RedisSpout.java
- 7.8 C50SplitBolt.java
- 7.9 C51WordCountBolt.java
- 7.10 C52RowCountBolt.java
- 7.11 C60WordFrequencyBolt.java
- 7.12 C70ReportBolt.java
- 8. 項目位址
1.環境搭建
1.1安裝JDK
- 從官網下載下傳合适的安裝包,這裡使用的是安裝包是jdk-8u271-linux-x64.tar.gz,注意一定要使用此版本,否則無法正常使用storm
- 解壓tar -zxvf jdk-8u271-linux-x64.tar.gz
- 設定環境變量:打開檔案vim /etc/profile并在最前面添加(其中第一行的路徑為jdk檔案解壓後的路徑)
export JAVA_HOME=/usr/lib/jvm/jdk
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
- 使用source /etc/profile使修改後的檔案生效(最好重新開機電腦)
1.2安裝eclipse
由于使用jdk8,導緻原來使用的9月份版本的eclipse無法使用,是以從官網上下載下傳之前釋出的版本,解壓後直接打開裡面的執行檔案就可以了,這裡使用的安裝包是eclipse-java-2020-03-R-linux-gtk-x86_64.tar.gz。需要注意的是若是重新裝過jdk,可能eclipse項目使用的版本可能是之前的,需要在eclipse中修改過來。
1.3 安裝、打開zookeeper服務
- 官網下載下傳安裝包,這裡使用的是apache-zookeeper-3.6.1-bin.tar.gz并解壓
- 進入到解壓後的目錄,cd apache-zookeeper-3.6.1-bin
- mkdir data,建立一個檔案夾
- cp conf/zoo_sample.cfg conf/zoo.cfg,複制檔案,名稱一定要是zoo.cfg
- vim conf/zoo.cfg,打開該檔案,并将裡面的dataDir的路徑修改為建立的data目錄
- bin/zkServer.sh Start,啟動服務,若顯示STARTED則表示啟動成功
1.4安裝、打開Kafka服務
- 從官網下載下傳安裝包,這裡使用的安裝包是kafka_2.13-2.6.0.tgz
- 解壓下載下傳的安裝包tar -zxf kafka_2.13-2.6.0.tgz
- 切換到解壓後的檔案的目錄,cd kafka_2.13-2.6.0
- 最後再通過指令bin/kafka-server-start.sh config/server.properties啟動Kafka服務(在啟動kafka之前需要保證zookeeper服務已啟動)
1.5安裝、打開Redis服務
- 官網下載下傳安裝包,這裡使用的是redis-6.0.9.tar.gz并解壓
- 切換到解壓後的目錄
- make
- 完成後使用./src/redis-server ./redis.conf啟動Redis服務即可開始使用
1.6 安裝、打開Storm服務
- 官網下載下傳安裝包,本次下載下傳的是apache-storm-2.2.0.tar.gz并解壓
- cd apache-storm-2.2.0,進入解壓後的檔案
- mkdi data,建立一個目錄
- vim ./conf/storm.yaml,打開并修改配置檔案,如下圖,其中圖中的路徑為2.6.3建立的data檔案的路徑,注意各個字段中的空格
Storm/Redis/Kafka的使用1.環境搭建2.程式使用說明3.運作截圖4.總體設計5.詳細設計6.存在問題7. 源代碼8. 項目位址 - bin/storm nimbus,啟動nimbus(在此之前必須確定zookeeper已經啟動)
- bin/storm supervisor,啟動supervisor
- bin/storm ui,可以啟動ui(可選),浏覽器輸入localhost:8888即可檢視相關資訊
- jps,檢視是否啟動成功,若成功則會顯示Nimbus, Supervisor, QuorumPeerMain(zookeeper的背景程序)
1.7 添加maven依賴
在maven項目的pom.xml檔案中添加以下導入kafka、storm、redis、ansj相關jar包
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>org.ansj</groupId>
<artifactId>ansj_seg</artifactId>
<version>5.1.6</version>
</dependency>
2.程式使用說明
在程式運作之前需要通過指令行啟動zookeeper/kafka/redis/storm服務,也就是實驗環境搭建部分的服務啟動,然後直接在eclipse中運作即可。而爬蟲獲得的資料在kafka中儲存在名為my_topic的主題、在redis記憶體資料庫中儲存的鍵值為my_key,而運作程式的結果則儲存在redis中鍵值名為my_count。需要注意的是當爬取不同的網站時,需要根據網站資訊的多少來設定休眠時長,設定過長會浪費時間;太短則會導緻服務一直重新開機,也會浪費時間,而且時間過短可能會使程式出錯。當沒有實時輸出統計資訊時就代表運作結束,但是不可直接強制關閉程式,否則會無法輸出最後的統計資訊,等到指定時間程式會自動關閉并在控制台輸出、儲存最後的統計結果。
3.運作截圖
圖 1 kafkaSpout控制台部分輸出1
圖2 kafkaSpout控制台部分輸出2
圖 3 redisSpout控制台部分輸出1
圖4 redisSpout控制台部分輸出2
圖 5 redis資料庫部分輸出1
圖6 redis資料庫部分輸出2
圖7 redis資料庫部分輸出3
4.總體設計
C00Main.java類為程式入口,首先先将爬取指定網站的資訊分别存儲進kafka、redis,分别對應C10Crawler.java、C20KafkaProducer.java、C21SavaToRedis.java。C30Topology用于定義storm拓撲結構,C40KafkaSpout.java、C41RedisSpout.java分别利用kafka、redis将爬取的資訊讀取出來并作為兩個獨立的spout分别發往詞頻統計(C50SplitBolt.java用于将發射過來的資料分割成一個一個單詞并将分割後的單詞發射給C60WordFrequencyBolt.java用于最後的統計)、行數統計(C52RowCountBolt.java)、字數統計(C51WordCountBolt.java)的bolt,也就是2個資料流對應6個spout、2個拓撲,其中spout的資料的逐行發射。再将分别将每個拓撲的3個spout彙聚到一個C70ReportBolt.java,用于輸出、儲存統計的資訊。其中統計資訊的輸出、儲存是在資料統計完成、休眠時間到了在釋放拓撲資源時才會輸出。
5.詳細設計
5.1 C00Main.java
該類為程式入口。首先定義兩個常量,分别是爬取的網站的網址以及SLEEP_TIME,也就是經過這麼多毫秒後關閉程式,需要根據爬取的網站資訊多少來設定時長,否則過長會浪費時間;過短則會影響程式正常運作。利用C10Crawler.java爬取并傳回網站的資訊,并将此資訊利用C20KafkaProducer.java、C21SaveToRedis.java分别往kafka隊列發送資料、redis儲存資料,再開始通過C30Topology定義storm拓撲并開始統計、輸出資訊,最後再利用exit退出程式。
5.2 C10Crawler.java
該類爬取指定的網站資訊并傳回爬取的資訊。通過start入口方法傳入參數url(爬取網站網址),由于爬取的網站資訊讀取是逐行讀取的,對字元串的連接配接較多,是以這裡使用字元串生成器StringBuilder更為節省時間,然後将爬取的網站網址添加在首行。通過網址建立URL對象,利用其方法openConnection打開連接配接并傳回URLConnection對象,再通過該對象的getInputStream方法連接配接取得網頁傳回的資料,這裡傳回InputStream對象。将該對象轉化為BufferedReader對象,并指定GBK編碼,利用while進行按行讀取并利用字元串生成器append方法逐行添加。資料讀取完成後依次關閉BufferedStream, InputStream。以上代碼放在try-catch語句塊中用于處理異常資訊,然後以字元串的形式傳回讀取到的資訊。
5.3 C20KafkaProducer.java
該類将通過入口方法start的形參傳進來的資訊然後将其儲存到Kafka隊列的名為my_topic的主題中。在入口方法start處,建立一個Properties配置檔案對象設定bootstrap.s
ervers為localhost:9092用于建立初始連接配接到kafka叢集的"主機/端口對"配置清單(必須);acks 為all,表示Producer在确認一個請求發送完成之前需要收到的回報資訊的數量,0表示不會收到,認為已經完成發送;retries為0,若設定大于0的值,則用戶端會将發送失敗的記錄重新發送(可選);batch.size為16384,控制發送資料批次的大小(可選);linger.ms為1,發送時間間隔,producer會将兩個請求發送時間間隔内到達的記錄合并到一個單獨的批處理請求中(可選);buffer.memory為33554432,Producer用來緩沖等待被發送到伺服器的記錄的總位元組數(可選);key.serializer為org.apache.kafka.common.serialization.StringSeria
lizer,關鍵字的序列化類(必須);value.serializer為org.apache.kafka.common.serialization.St
ringSerializer,值的序列化類(必須)。
然後利用以上的配置建立一個Kafka生産者,并利用send方法将資料往Kafka隊列指定的主題的發送,最後close關閉即可。
5.4 C21SaveToRedis.java
該類将傳過來的資訊儲存到Redis資料庫,鍵值為my_key。該類的入口方法包含要儲存的資料。首先建立Jedis對象,連接配接本地的Redis服務,利用該對象的set方法存放資料,然後利用close關閉Redis服務即可。
5.5 C30Topology.java
該類定義storm的拓撲,組織spout和bolt之間的邏輯關系。分别定義kafka、redis資料源的資料field字段、spoutID、分割單詞boltID、單詞統計boltID、行數統計boltID、詞頻統計boltID、輸出運算結果boltID、拓撲名稱這些常量,通過構造函數傳入關閉拓撲的時間,根據資料的長度設定。由于以kafka、redis為資料源的拓撲結構一緻,是以這裡隻說明kafka拓撲的定義。
在start方法中建立一個kafka的TopologyBuilder執行個體,再利用其方法setSpout設定資料源并設定1個Executeor(線程),預設1個,如果設定兩個線程,那麼将會讀取資料兩次,以此類推。分别調用三次setBolt方法設定資料源的流向,分别流向C50SplitBolt.java、C51WordCountBolt、C52RowCountBolt,這裡同時還指定執行的線程個數以及tast個數,而shuffleGrouping表示均勻配置設定任務。而C50SplitBolt.java再将處理過的資料發往C60WordFrequencyBolt.java,而fieldsGrouping表示字段相同的tuple會被路由到同一執行個體中。再次利用setBolt方法将C51WordCountBolt/C52RowCountBolt/C60WordFrequencyBolt三個bolt的資料發射到C70ReportBolt.java,globalGrouping表示把Bolt發送的的所有tuple路由到唯一的Bolt。同樣的方法再定義redis資料源的流向。
利用建立的LocalCluster執行個體、kafka/redis拓撲的配置對象Config,利用前者的submitTopology方法本地送出定義的kafka/redis兩個拓撲,利用sleep方法休眠等待運作結束,之後利用killTopology殺掉拓撲同時并利用close方法釋放占用的資源。
5.6 C40KafkaSpout.java
該類從Kafka隊列中讀取資訊并将該資訊作為資料源Spout。繼承自BaseRichSpout,它是ISpout接口和IComponent接口的簡單實作,接口對用不到的方法提供了預設的實作。首先定義一些變量,包括從kafka隊列獲得并分割後的資料、分割後的資料條數、讀取kafka隊列的主題名、用于往下一個bolt發射資料流的SpoutOutputCollector對象、資料偏移量、kafka資料field字段ID。利用構造函數傳入kafka資料的field字段ID作為輸出field字段ID,同時利用方法kafkaConsumer從隊列中拉取資料并傳回,對傳回的資料根據換行符進行分割,并将分割後的儲存的數組長度作為資料條數。
對于kafkaConsumer方法,首先建立一個配置檔案對象,bootstrap.servers字段設定為localhost:9092,用于建立初始連接配接到kafka叢集的"主機/端口對"配置清單(必須);group.id為my_group,消費者組id(必須);auto.offset.reset為earliest,自動将偏移量重置為最早的偏移量(可選);enable.auto.commit為true,消費者的偏移量将在背景定期送出(可選);auto.commit.interval.ms為1000,如果将enable.auto.commit設定為true,則消費者偏移量自動送出給Kafka的頻率(以毫秒為機關)(可選);key.deserializer為org.apache.kafka.commo
n.serialization.StringDeserializer,關鍵字的序列化類(必須);value.deserializer 為org.apach
e.kafka.common.serialization.StringDeserializer,值的序列化類(必須)。
利用上述配置建立一個Kafka消費者,利用方法subscribe訂閱指定名稱的主題。再利用poll方法擷取隊列中的資料集,再利用for結構輸出即可,同時close消費者。若資料為空,則直接退出程式,否則傳回獲得的資料。
覆寫BaseRichSpout的open方法,該方法在Spout元件初始化時被調用,這裡儲存SpoutOutputCollector對象,用于發送tuple。覆寫BaseRichSpout的nextTuple方法,storm調用這個方法,向輸出的collector發出tuple。這裡隻發送dataNum次資料,也就是利用emit方法将資料一行一行發射出去。覆寫BaseRichSpout的declareOutputFields方法,所有Storm的元件(spout和bolt)都必須實作這個接口 用于告訴Storm流元件将會發出那些資料流,每個流的tuple将包含的字段,這裡為KAFKA_DATA_FIELD。
5.7 C41RedisSpout.java
該方法從Redis中讀出資料并發射給訂閱的Bolt。大部分代碼、思路同C40KafkaSpout.java基本一樣,唯一不同的就是此資料源利用的是getDataFromRedis方法從redis中擷取資料,首先建立Jedis對象:連接配接本地的Redis服務,通過該對象的get方法擷取資料若資料為空則退出程式,否則傳回讀取的資料。
5.8 C50SplitBolt.java
該類将獲得的資料分割單詞,并一個單詞一單詞發射給下一個bolt,繼承自BaseRichBolt,類似于前面BaseRichSpout,隻不過前者是bolt的,後者是spout的。通過構造函數獲得要訂閱的fieldID,prepare方法同前面所說的open類似,用于bolt的初始化。接下來是本人定義的方法getWords,通過形參将傳進來的字元串分割一個一個單詞并并以ArrayList的形式傳回。這裡主要是利用的ansj庫的ToAnalysis.parse方法區分出單詞/詞語出來。再利用split方法根據逗号分割字元串,同時過濾掉空格、空白字元以及标點符号,将符合條件的單詞添加到ArrayList當中。
覆寫接口當中的execute方法,該方法在運作期間不斷執行,這裡利用方法input.getStr
ingByField接收發射過來的資料,然後調用getWords方法分割單詞并逐個往下一個bolt發射資料。覆寫declareOutputFields方法,聲明fieldID為word。
5.9 C51WordCountBolt.java
該類訂閱spout發射的tuple流,實作字數統計。和上面說過的相似的就不再說了,顯得備援。主要說一下不同的地方。execute方法通過getStringByField、filedID擷取發射過來的資料,每次有資料過來,則通過getWordCount方法統計字數并和此前統計的字數相加獲得總字數并發往下一個bolt。對于方法getWordCount,利用Pattern.compile初始化對象Pattern,再利用該對象的matcher方法傳回Matcher對象,然後當matcher.find()==true,也就找到比對的,每次找到則加一。利用同樣的方法統計非中文字元并相加,得到的總字數傳回。聲明此bolt的fieldID為wordCount。
5.10 C52RowCountBolt.java
該類訂閱spout發射的tuple流,實作行數統計。在execute方法獲得發射過來的資料,由于資料是一行一行發送過來的,是以如果不為空,則統計行數的變量加一,再通過emit方法将統計的資料發往下一個bolt。聲明此bolt的fieldID為rowCount。
5.11 C60WordFrequencyBolt.java
該類訂閱C50SplitBolt的輸出流,實作單詞計數,并發送目前計數給C70ReportBolt。execute方法中獲得發射過來的單詞後通過單詞作為HashMap的key獲得統計的單詞個數,如果為空,代表之前沒有統計過此單詞,則将它額個數加1并利用put方法存進HashMap中,再往C70發射單詞以及對應的統計個數。聲明此bolt的fieldID為word和count,也就是發射一對資料。
5.12 C70ReportBolt.java
該方法通過C51/C52/C60發射過來的資料處理之後螢幕輸出顯示統計的最終結果并儲存至redis,鍵值為my_count。通過構造函數傳入單詞統計boltID、詞頻統計boltID。Execute方法中,通過Tuple對象的getSourceComponent可以獲得此資料的boltID進而判斷是詞頻統計還是行數統計還是字數統計的資訊。如果當中包含KAFKA,則源頭資料spout為kafka,并将目前spout,也就是currentSpout設定為KAFKA,否則為REDIS,用于區分資料源spout。
覆寫方法cleanup,Storm在終止一個bolt之前會調用這個方法,本程式在topology關閉時輸出、儲存最終的計數結果,也就是會顯示、儲存在execute中變量的最後的值,其中還通過定義匿名内部類并覆寫重寫方法compare,使得詞頻的統計可以根據value值由大到小進行排序輸出、儲存。由于這是資料流的終點,是以就沒必要聲明發射的fieldID,但是方法declareOutputFields還是要寫出來,prepare也是一個道理。
6.存在問題
每次重新啟動各項相關服務後運作的第1次運作都會出現從Kafka隊列中讀取的資料是空的,之後的運作就完全沒問題了。雖然知道是因為kafka consumer的問題,但是該bug目前仍然沒有辦法解決。
7. 源代碼
7.1 C00Main.java
import org.apache.kafka.common.utils.Exit;
/**
* 題目:
* 該程式爬取指定網站的資訊并存進redis和kafka,再從兩者讀出資料并利用
* storm分别對兩者的資料進行詞頻統計、行數統計、字數統計,并将所的結果存入redis
*
* 程式結構:
* C10爬取并傳回資料,通過C20、C21分别儲存到kafka和redis,C30為storm的拓撲,C40、C41
* 分别從kafka和redis中獲得資料并作為spout資料源都分别發往C50、C51、C52bolt,對應功能
* 單詞分割、字數統計、行數統計,C50将分割的單詞發往C60進行詞頻統計,然後将以kafka和redis
* 為資料源所統計的資料,也就是兩者的C60、C51、C52都發往C70輸出統計結果,同時儲存到redis,
* 一個spout對應的是一個拓撲
*
* @author zhz
*/
public class C00Main {
private final static String URL = "https://new.qq.com/omn/20201229/20201229A06GUL00.html";//爬取的網站網址
private final static long SLEEP_TIME = 15000L;// 經過SLEEP_TIME毫秒後關閉程式,根據資料長度來設定
public static void main(String[] args) {
String dataToProcess = new C10Crawler().start(URL);//爬取并傳回網站資訊
new C20KafkaProducer().start(dataToProcess);//往kafka隊列發送資料
new C21SaveToRedis().start(dataToProcess);//往redis儲存資料
new C30Topology(SLEEP_TIME).start();//運作storm拓撲
Exit.exit(0);// 退出程式
}
}
7.2 C10Crawler.java
import java.io.*;
import java.net.*;
/**
* 爬取并傳回的網站資訊
*
* @author zhz
*/
public class C10Crawler {
public String start(String url) {
StringBuilder dataSB = new StringBuilder(url + '\n');// 網址+傳回的資料(使用字元串生成器,由于需要頻繁附加字元串)
try {
URL urlObject = new URL(url);// 建立URL對象
URLConnection conn = urlObject.openConnection();// 通過URL對象打開連接配接
InputStream is = conn.getInputStream();// 通過連接配接取得網頁傳回的資料
String dataLine;// 爬取的每一行的資料
BufferedReader br = new BufferedReader(new InputStreamReader(is, "GBK"));// 一般按行讀取網頁資料,是以用BufferedReader和InputStreamReader把位元組流轉化為字元流的緩沖流
while ((dataLine = br.readLine()) != null) {// 按行讀取
dataSB.append(dataLine + '\n');
}
br.close();// 關閉BufferedReader
is.close();// 關閉InputStream
}
catch (Exception e) {
e.printStackTrace();
}
return dataSB.toString();
}
}
7.3 C20KafkaProducer.java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* 将發送過來的資訊儲存到Kafka隊列
*
* @author zhz
*/
public class C20KafkaProducer {
public void start(String data) {
try {
Properties props = new Properties();// 建立一個配置檔案對象
props.put("bootstrap.servers", "localhost:9092");// 用于建立初始連接配接到kafka叢集的"主機/端口對"配置清單(必須)
props.put("acks", "all");// Producer在确認一個請求發送完成之前需要收到的回報資訊的數量,0表示不會收到,認為已經完成發送(可選)
props.put("retries", 0);// 若設定大于0的值,則用戶端會将發送失敗的記錄重新發送(可選)
props.put("batch.size", 16384);// 控制發送資料批次的大小(可選)
props.put("linger.ms", 1);// 發送時間間隔,producer會将兩個請求發送時間間隔内到達的記錄合并到一個單獨的批處理請求中(可選)
props.put("buffer.memory", 33554432);// Producer用來緩沖等待被發送到伺服器的記錄的總位元組數(可選)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 關鍵字的序列化類(必須)
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 值的序列化類(必須)
Producer<String, String> producer = new KafkaProducer<>(props);// 建立一個以上述定義的配置的生産者
producer.send(new ProducerRecord<String, String>("my_topic", data));// 将資料傳到Kafka隊列的url主題
producer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
7.4 C21SaveToRedis.java
import redis.clients.jedis.Jedis;
/**
* 将傳過來的資訊儲存到Redis
*
* @author zhz
*/
public class C21SaveToRedis {
public void start(String data) {
try {
Jedis jedis = new Jedis("localhost");// 建立Jedis對象:連接配接本地的Redis服務
jedis.set("my_key", data);// 存放資料 key value
jedis.close();// 關閉Redis服務
} catch (Exception e) {
e.printStackTrace();
}
}
}
7.5 C30Topology.java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
/**
* storm拓撲
*
* @author zhz
*/
public class C30Topology {
private static final String KAFKA_DATA_FIELD = "KAFKA_DATA";// kafka資料field字段ID
private static final String KAFKA_SPOUT_ID = "KAFKA_SPOUT";// kafka資料源SpoutID
private static final String KAFKA_SPLIT_WORD_BOLT_ID = "KAFKA_SPLIT_WORD_BOLT";// kafka分割單詞boltID
private static final String KAFKA_WORD_COUNT_BOLT_ID = "KAFKA_WORD_COUNT_BOLT";// kafka單詞統計boltID
private static final String KAFKA_ROW_COUNT_BOLT_ID = "KAFKA_ROW_COUNT_BOLT";// kafka行數統計boltID
private static final String KAFKA_FREQUENCY_COUNT_BOLT_ID = "KAFKA_FREQUENCY_COUNT_BOLT";// kafka詞頻統計boltID
private static final String KAFKA_REPORT_BOLT_ID = "KAFKA_REPORT_BOLT";// kafka輸出運算結果boltID
private static final String KAFKA_TOPOLOGY_NAME = "KAFKA_TOPOLOGY";// kafka拓撲名稱
private static final String REDIS_DATA_FIELD = "REDIS_DATA";// redis資料field字段ID
private static final String REDIS_SPOUT_ID = "REDIS_SPOUT";// redis資料源SpoutID
private static final String REDIS_SPLIT_WORD_BOLT_ID = "REDIS_SPLIT_WORD_BOLT";// redis分割單詞boltID
private static final String REDIS_WORD_COUNT_BOLT_ID = "REDIS_WORD_COUNT_BOLT";// redis單詞統計boltID
private static final String REDIS_ROW_COUNT_BOLT_ID = "REDIS_ROW_COUNT_BOLT";// redis行數統計boltID
private static final String REDIS_FREQUENCY_COUNT_BOLT_ID = "REDIS_FREQUENCY_COUNT_BOLT";// redis詞頻統計boltID
private static final String REDIS_REPORT_BOLT_ID = "REDIS_REPORT_BOLT";// redis輸出運算結果boltID
private static final String REDIS_TOPOLOGY_NAME = "REDIS_TOPOLOGY";// redis拓撲名稱
private long sleepTime;// 經過sleepTime毫秒後關閉程式,根據資料長度來設定
public C30Topology(long sleepTime) {
this.sleepTime = sleepTime;
}
public void start() {
TopologyBuilder kafkaBuilder = new TopologyBuilder();// 建立了一個kafka的TopologyBuilder執行個體
TopologyBuilder redisBuilder = new TopologyBuilder();// 建立了一個redis的TopologyBuilder執行個體
kafkaBuilder.setSpout(KAFKA_SPOUT_ID, new C40KafkaSpout(KAFKA_DATA_FIELD), 1);// 設定資料源并設定1個Executeor(線程),預設1個
kafkaBuilder.setBolt(KAFKA_SPLIT_WORD_BOLT_ID, new C50SplitBolt(KAFKA_DATA_FIELD), 2).setNumTasks(2)
.shuffleGrouping(KAFKA_SPOUT_ID);// 設定2個Executeor(線程)和2個Task,shuffleGrouping表示均勻配置設定任務,C40KafkaSpout -->
// C50SplitBolt
kafkaBuilder.setBolt(KAFKA_WORD_COUNT_BOLT_ID, new C51WordCountBolt(KAFKA_DATA_FIELD), 1).setNumTasks(1)
.shuffleGrouping(KAFKA_SPOUT_ID);// C40KafkaSpout --> C51WordCountBolt
kafkaBuilder.setBolt(KAFKA_ROW_COUNT_BOLT_ID, new C52RowCountBolt(KAFKA_DATA_FIELD), 1).setNumTasks(1)
.shuffleGrouping(KAFKA_SPOUT_ID);// C40KafkaSpout --> C52RowCountBolt
kafkaBuilder.setBolt(KAFKA_FREQUENCY_COUNT_BOLT_ID, new C60WordFrequencyBolt(), 1).setNumTasks(2)
.fieldsGrouping(KAFKA_SPLIT_WORD_BOLT_ID, new Fields("word"));// fieldsGrouping表示字段相同的tuple會被路由到同一執行個體中,C50SplitBolt
// --> C60WordFrequencyBolt
kafkaBuilder// globalGrouping表示把Bolt發送的的所有tuple路由到唯一的Bolt,C51WordCountBolt/C52RowCountBolt/C60WordFrequencyBolt
// --> C70ReportBolt
.setBolt(KAFKA_REPORT_BOLT_ID,
new C70ReportBolt(KAFKA_WORD_COUNT_BOLT_ID, KAFKA_FREQUENCY_COUNT_BOLT_ID))
.globalGrouping(KAFKA_WORD_COUNT_BOLT_ID).globalGrouping(KAFKA_ROW_COUNT_BOLT_ID)
.globalGrouping(KAFKA_FREQUENCY_COUNT_BOLT_ID);
redisBuilder.setSpout(REDIS_SPOUT_ID, new C41RedisSpout(REDIS_DATA_FIELD));// 設定資料源
redisBuilder.setBolt(REDIS_SPLIT_WORD_BOLT_ID, new C50SplitBolt(REDIS_DATA_FIELD), 2).setNumTasks(2)
.shuffleGrouping(REDIS_SPOUT_ID);// C41RedisSpout --> C50SplitBolt
redisBuilder.setBolt(REDIS_WORD_COUNT_BOLT_ID, new C51WordCountBolt(REDIS_DATA_FIELD), 1).setNumTasks(1)
.shuffleGrouping(REDIS_SPOUT_ID);// C41RedisSpout --> C51WordCountBolt
redisBuilder.setBolt(REDIS_ROW_COUNT_BOLT_ID, new C52RowCountBolt(REDIS_DATA_FIELD), 1).setNumTasks(1)
.shuffleGrouping(REDIS_SPOUT_ID);// C41RedisSpout --> C52RowCountBolt
redisBuilder.setBolt(REDIS_FREQUENCY_COUNT_BOLT_ID, new C60WordFrequencyBolt(), 1).setNumTasks(2)
.fieldsGrouping(REDIS_SPLIT_WORD_BOLT_ID, new Fields("word"));// C50SplitBolt --> C60WordFrequencyBolt
redisBuilder// C51WordCountBolt/C52RowCountBolt/C60WordFrequencyBolt --> C70ReportBolt
.setBolt(REDIS_REPORT_BOLT_ID,
new C70ReportBolt(REDIS_WORD_COUNT_BOLT_ID, REDIS_FREQUENCY_COUNT_BOLT_ID))
.globalGrouping(REDIS_WORD_COUNT_BOLT_ID).globalGrouping(REDIS_ROW_COUNT_BOLT_ID)
.globalGrouping(REDIS_FREQUENCY_COUNT_BOLT_ID);
try {
LocalCluster cluster = new LocalCluster();// 建立LocalCluster執行個體
Config kafkaConfig = new Config();// 用于配置kafka拓撲
Config redisConfig = new Config();// 用于配置redis拓撲
cluster.submitTopology(KAFKA_TOPOLOGY_NAME, kafkaConfig, kafkaBuilder.createTopology());// 本地送出運作kafka拓撲
cluster.submitTopology(REDIS_TOPOLOGY_NAME, redisConfig, redisBuilder.createTopology());// 本地送出運作redis拓撲
Utils.sleep(sleepTime);// 休眠等待運作結束
cluster.killTopology(KAFKA_TOPOLOGY_NAME);// 殺掉拓撲
cluster.killTopology(REDIS_TOPOLOGY_NAME);
cluster.shutdown();// 關閉cluster
cluster.close();// 釋放資源
} catch (Exception e) {
e.printStackTrace();
}
}
}
7.6 C40KafkaSpout.java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.utils.Exit;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
/**
* 從Kafka隊列中讀取資訊并将該資訊作為資料源Spout發送到C50、C51、C52Bolt
*
* @author zhz
*/
public class C40KafkaSpout extends BaseRichSpout {// BaseRichSpout是ISpout接口和IComponent接口的簡單實作,接口對用不到的方法提供了預設的實作
private static final long serialVersionUID = 1L;// 序列化号
private String[] dataLine;// 從kafka隊列獲得并分割後的資料
private int dataNum = 0;// 資料條數
private String topic = "my_topic";// kafka主題名
private SpoutOutputCollector collector;// 用于發射資料流
private int offset = 0;// 資料偏移量
private String KAFKA_DATA_FIELD;// kafka資料field字段ID
public C40KafkaSpout(String KAFKA_DATA_FIELD) {
this.KAFKA_DATA_FIELD = KAFKA_DATA_FIELD;
dataLine = kafkaConsumer().split("\n");// 從kafka隊列中獲得資料并根據換行符分割
dataNum = dataLine.length;
}
/**
* kafka消費者,用于從kafka隊列中獲得資料
*/
public String kafkaConsumer() {
Properties props = new Properties();// 建立一個配置檔案對象
props.put("bootstrap.servers", "localhost:9092");// 用于建立初始連接配接到kafka叢集的"主機/端口對"配置清單(必須)
props.put("group.id", "my_group");// 消費者組id(必須)
props.put("auto.offset.reset", "earliest");// 自動将偏移量重置為最早的偏移量(可選)
props.put("enable.auto.commit", "true");// 消費者的偏移量将在背景定期送出(可選)
props.put("auto.commit.interval.ms", "1000");// 如果将enable.auto.commit設定為true,則消費者偏移量自動送出給Kafka的頻率(以毫秒為機關)(可選)
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 關鍵字的序列化類(必須)
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 值的序列化類(必須)
String data = null;
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 建立一個以上述定義的配置的消費者
consumer.subscribe(Collections.singletonList(topic));// 訂閱消息主題
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
data = record.value();
}
consumer.close();
if (data == null)// 如果獲得的資料為空,則退出程式
Exit.exit(0);
return data;
}
/**
* 在Spout元件初始化時被調用,這裡儲存SpoutOutputCollector對象,用于發送tuple
*/
@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
/**
* storm調用這個方法,向輸出的collector發出tuple,這裡隻發送dataNum次資料
*/
@Override
public void nextTuple() {
if (offset < dataNum) {
collector.emit(new Values(dataLine[offset]));
offset++;
}
}
/**
* 所有Storm的元件(spout和bolt)都必須實作這個接口 用于告訴Storm流元件将會發出那些資料流,每個流的tuple将包含的字段
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(KAFKA_DATA_FIELD));
}
}
7.7 C41RedisSpout.java
import redis.clients.jedis.Jedis;
import java.util.Map;
import org.apache.kafka.common.utils.Exit;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
/**
* 從Redis中讀出資料并發往C50、C51、C52Bolt
*
* @author zhz
*/
public class C41RedisSpout extends BaseRichSpout {
private static final long serialVersionUID = 1L;
private String key = "my_key";// redis的key值
private String ip = "localhost";// redis的IP位址
private String[] dataLine;// 資料
private int dataNum = 0;// 資料條數
private int offset = 0;// 資料偏移量
private SpoutOutputCollector collector;// 用于發射資料流
private String REDIS_DATA_FIELD;// redis輸出字段field
public C41RedisSpout(String REDIS_DATA_FIELD) {
this.REDIS_DATA_FIELD = REDIS_DATA_FIELD;
dataLine = getDataFromRedis().split("\n");// 從redis獲得資料
dataNum = dataLine.length;
}
/**
* 從redis獲得資料
*/
public String getDataFromRedis() {
String data = null;
try {
Jedis jedis = new Jedis(ip);// 建立Jedis對象:連接配接本地的Redis服務
data = jedis.get(key);// 存放資料 key value
jedis.close();// 關閉Redis服務
if (data == null)// 如果獲得的資料為空,則退出程式
Exit.exit(0);
} catch (Exception e) {
e.printStackTrace();
}
return data;
}
@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
@Override
public void nextTuple() {
if (offset < dataNum) {
collector.emit(new Values(dataLine[offset]));
offset++;
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(REDIS_DATA_FIELD));
}
}
7.8 C50SplitBolt.java
import java.util.ArrayList;
import java.util.Map;
import org.ansj.splitWord.analysis.ToAnalysis;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
* 訂閱spout發射的tuple流,實作分割單詞,并發往C60
*
* @author zhz
*/
public class C50SplitBolt extends BaseRichBolt {
private static final long serialVersionUID = 1L;
private OutputCollector collector;
private String inputField;// 要讀取的tuple字段名
public C50SplitBolt(String inputField) {
this.inputField = inputField;
}
@Override
public void prepare(Map<String, Object> stormConf, TopologyContext context, OutputCollector collector) {
this.collector=collector;
}
/**
* 根據字元串分割出所有單詞并以ArrayList傳回
*/
public ArrayList<String> getWords(String context) {
ArrayList<String> list = new ArrayList<String>();
String result = ToAnalysis.parse(context).toStringWithOutNature();
String[] words = result.split(",");
for(String word: words){
String str = word.trim();//過濾空格
if (str.equals(""))// 過濾空白字元
continue;
else if(str.matches("\\p{Punct}"))// 過濾标點符号
continue;
list.add(word);
}
return list;
}
/**
* 單詞一個一個發往C60
*/
@Override
public void execute(Tuple input) {
String data = input.getStringByField(inputField);
for (String word : getWords(data)) {
collector.emit(new Values(word));//向下一個bolt發射資料
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
7.9 C51WordCountBolt.java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
* 訂閱C40/C41發射的tuple流,實作字數統計
*
* @author zhz
*/
public class C51WordCountBolt extends BaseRichBolt {
private static final long serialVersionUID = 1L;
private OutputCollector collector;
private String inputField;// 要讀取的tuple字段名
private long wordCount = 0L;// 字數
public C51WordCountBolt(String inputField) {
this.inputField = inputField;
}
@Override
public void prepare(Map<String, Object> stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
/**
* 字數統計
*/
public long getWordsCount(String context) {
long wordCount = 0L;
Pattern pattern = Pattern.compile("[^(\\u4e00-\\u9fa5,。《》?;’‘:“”【】、)(……¥!·)]");
Matcher matcher = pattern.matcher(context);
while (matcher.find()) {// 統計中文字元
wordCount++;
}
pattern = Pattern.compile("[^(a-zA-Z0-9`\\-=\';.,/[email protected]#$%^&*()_+|}{\":><?\\[\\])]");
matcher = pattern.matcher(context);
while (matcher.find()) {// 統計非中文字元
wordCount++;
}
return wordCount;
}
@Override
public void execute(Tuple input) {
String dataLine = input.getStringByField(inputField);
wordCount += getWordsCount(dataLine);
collector.emit(new Values(wordCount));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("wordCount"));
}
}
7.10 C52RowCountBolt.java
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
* 訂閱C40/C50發射的tuple流,實作行數統計
*
* @author zhz
*/
public class C52RowCountBolt extends BaseRichBolt {
private static final long serialVersionUID = 1L;
private OutputCollector collector;
private String inputField;// 要讀取的tuple字段名
private long rowCount = 0L;// 行數
public C52RowCountBolt(String inputField) {
this.inputField = inputField;
}
@Override
public void prepare(Map<String, Object> stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
/**
* 統計行數并發往C70
*/
@Override
public void execute(Tuple input) {
String dataLine = input.getStringByField(inputField);
if (dataLine != null)
rowCount++;
collector.emit(new Values(rowCount));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("rowCount"));
}
}
7.11 C60WordFrequencyBolt.java
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
* 訂閱C50 bolt的輸出流,實作單詞計數,并發送目前計數給下一個C70 bolt
*
* @author zhz
*/
public class C60WordFrequencyBolt extends BaseRichBolt {
private static final long serialVersionUID = 1L;
private OutputCollector collector;
private HashMap<String, Long> counts = new HashMap<String, Long>();// 存放統計的詞頻
@Override
public void prepare(Map<String, Object> stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
/**
* 對每次發送過來的單詞進行統計并發往C70
*/
@Override
public void execute(Tuple input) {
String word = input.getStringByField("word");
Long count = counts.get(word);// 通過單詞作為key擷取個數
if (count == null) {
count = 0L;//如果不存在,初始化為0
}
count++;//增加計數
counts.put(word, count);//存儲計數
collector.emit(new Values(word,count));// 往C70發射
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","count"));
}
}
7.12 C70ReportBolt.java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;
/**
* 螢幕顯示統計結果儲存至redis
*
* @author zhz
*/
public class C70ReportBolt extends BaseRichBolt {
private static final long serialVersionUID = 1L;
private HashMap<String, Long> counts = new HashMap<String, Long>();// 儲存資料的單詞和對應的計數
private String redisKey = "my_count";// 存放統計結果的redis key值
private long wordCount;// 資料的字數
private long rowCount;// 資料的行數
private String WORD_COUNT_BOLT_ID;// 單詞統計boltID
private String FREQUENCY_COUNT_BOLT_ID;// 詞頻統計boltID
private String currentSpout;// 目前spout(kafka/redis)
private static int index = 0;
public C70ReportBolt(String WORD_COUNT_BOLT_ID, String FREQUENCY_COUNT_BOLT_ID) {
this.WORD_COUNT_BOLT_ID = WORD_COUNT_BOLT_ID;
this.FREQUENCY_COUNT_BOLT_ID = FREQUENCY_COUNT_BOLT_ID;
}
/**
* 獲得C51、C51、C60發射過來的資料
*/
@Override
public void execute(Tuple input) {
if (FREQUENCY_COUNT_BOLT_ID.equals(input.getSourceComponent())) {// 資料的詞頻統計
if (input.getSourceComponent().contains("KAFKA"))
currentSpout = "KAFKA";
else
currentSpout = "REDIS";
String word = input.getStringByField("word");
Long count = input.getLongByField("count");
counts.put(word, count);
System.out.println(counts);// 實時輸出統計結果
} else if (WORD_COUNT_BOLT_ID.equals(input.getSourceComponent()))// 資料的字數統計
wordCount = input.getLongByField("wordCount");
else// 資料的行數統計
rowCount = input.getLongByField("rowCount");
}
/**
* Storm在終止一個bolt之前會調用這個方法,本程式在topology關閉時輸出最終的計數結果
*/
@Override
public void cleanup() {
index++;
Jedis jedis = new Jedis("localhost");
System.out.println("---------- " + currentSpout + " FINAL COUNTS -----------");// kafka資料源統計的結果
System.out.println("詞頻統計:");
if (index == 1)
jedis.set(redisKey, "---------- " + currentSpout + " FINAL COUNTS -----------\n");
else
jedis.append(redisKey, "---------- " + currentSpout + " FINAL COUNTS -----------\n");
jedis.append(redisKey, "詞頻統計:\n");
ArrayList<HashMap.Entry<String, Long>> list = new ArrayList<HashMap.Entry<String, Long>>(counts.entrySet());// 轉換為list
list.sort(new Comparator<HashMap.Entry<String, Long>>() {// 定義匿名内部類,覆寫compare方法,根據值進行排序
@Override
public int compare(HashMap.Entry<String, Long> o1, HashMap.Entry<String, Long> o2) {
return o2.getValue().compareTo(o1.getValue());
}
});
for (Map.Entry<String, Long> map : list) {// redis逐個儲存并輸出統計并排序後的結果
jedis.append(redisKey, map.getKey() + ": " + map.getValue() + "\n");
System.out.println(map.getKey() + ": " + map.getValue());
}
jedis.append(redisKey, "\n字數:" + wordCount);
jedis.append(redisKey, "\n行數:" + rowCount + "\n\n");
System.out.println("\n字數:" + wordCount);
System.out.println("行數:" + rowCount + "\n\n");
System.out.println("----------------------------");
jedis.append(redisKey, "\n----------------------------\n");
jedis.close();
}
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
8. 項目位址
https://github.com/zhz000/the-use-of-storm-kafka-redis
https://gitee.com/zhz000/the-use-of-storm-kafka-redis