你将從本章學習到:如何在一個 Storm topology 的不同元件間傳遞 tuple,以及如何将 topology 部署到一個 Storm 叢集。
3.1 流分組
當設計一個 topology 的時候,你需要做的最重要的一件事是定義如何在不同元件間交換資料,換言之就是 bolt 間如何消費資料流。流分組明确了哪些流被每個 bolt 消費以及被如何消費。
一個節點可以發射不止一個資料流。流分組允許我們選擇接收哪個流。
正如第二章所示,當 topology 定義時設定流分組:
<span class="line-number" style="margin:0px; padding:0px">1</span>
<span class="line-number" style="margin:0px; padding:0px">2</span>
<span class="line-number" style="margin:0px; padding:0px">3</span>
| ...
builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
...
|
上面的代碼塊展示的是,在 topology 建構中設定一個 bolt,使用随機分組分發資料。一個流分組通常将源元件ID作為一個參數,以及另一個可選的取決于流分組類型的參數。
也許在 InputDeclarer
中有超過一個資料源,每個源可以使用不同的流分組方式。
3.1.1 随機分組
随機分組(Shuffle Grouping)是使用最普遍的分組方式。它隻需一個參數(源元件),随機派發資料源裡面的 tuple,保證每個 bolt 接收到的 tuple 數目大緻相同。
随機分組在進行比如數學計算等原子操作中非常有用。然而,如果操作符不能被随機分發,比如第二章中需要統計單詞頻率的例子,你需要考慮使用其他分組方式。
3.1.2 按字段分組
按字段分組(Field Grouping)允許你按照一個或多個 tuple 中的字段來控制 tuple 如何被發送到 bolt。它保證同樣的數值集合或字段組合被發送到同一個 bolt。回到單詞統計的例子,如果你按照
word
字段分組,那麼
word-normalizer
bolt 将發送 指定單詞的tuple 到相同的
word-counter
bolt 執行個體。
<span class="line-number" style="margin:0px; padding:0px">1</span>
<span class="line-number" style="margin:0px; padding:0px">2</span>
<span class="line-number" style="margin:0px; padding:0px">3</span>
<span class="line-number" style="margin:0px; padding:0px">4</span>
| ...
builder.setBolt("word-counter", new WordCounter(),2)
.fieldsGrouping("word-normalizer", new Fields("word"));
...
|
按字段分組中的所有字段必須存在源字段聲明中。
3.1.3 廣播分組
廣播分組(All Grouping)給所有用來接收的 bolt 發送每一個tuple的拷貝。這種分組用來向 bolt 發送信号。比如,如果需要重新整理緩存,你可以發送一個
refresh cache
信号給所有的 bolt。在單詞統計的例子中,你可以使用廣播分組來增加清楚統計緩存的功能。
<span class="line-number" style="margin:0px; padding:0px">1</span>
<span class="line-number" style="margin:0px; padding:0px">2</span>
<span class="line-number" style="margin:0px; padding:0px">3</span>
<span class="line-number" style="margin:0px; padding:0px">4</span>
<span class="line-number" style="margin:0px; padding:0px">5</span>
<span class="line-number" style="margin:0px; padding:0px">6</span>
<span class="line-number" style="margin:0px; padding:0px">7</span>
<span class="line-number" style="margin:0px; padding:0px">8</span>
<span class="line-number" style="margin:0px; padding:0px">9</span>
<span class="line-number" style="margin:0px; padding:0px">10</span>
<span class="line-number" style="margin:0px; padding:0px">11</span>
<span class="line-number" style="margin:0px; padding:0px">12</span>
<span class="line-number" style="margin:0px; padding:0px">13</span>
| public void execute(Tuple input) {
String str = null;
try{
if(input.getSourceStreamId().equals("signals")){
str = input.getStringByField("action");
if("refreshCache".equals(str))
counters.clear();
}
}catch (IllegalArgumentException e) {
//Do nothing
}
...
}
|
添加一個
if
檢查源資料流。Storm具有命名流的能力(如果你不發送tuple到一個已命名的流,流是
default
);這是個極好的辨別源 tuple 的途徑,比如我們想用來辨別信号。
在 topology 定義中,添加另一個流到
word-counter
bolt 中,這樣使得
signals-spout
能夠發送 tuple 到前者的每一個 bolt 執行個體。
<span class="line-number" style="margin:0px; padding:0px">1</span>
<span class="line-number" style="margin:0px; padding:0px">2</span>
<span class="line-number" style="margin:0px; padding:0px">3</span>
| builder.setBolt("word-counter", new WordCounter(),2)
.fieldsGrouping("word-normalizer", new Fields("word"))
.allGrouping("signals-spout","signals");
|
signals-spout
的實作代碼可以從Github中找到。
3.1.4 自定義分組
你可以建立自己定義的分組方式,隻需實作
back type.storm.grouping.CustomStreamGrouping
接口。這給了你決定哪個 bolt 接收某個 tuple 的能力。
讓我們修改單詞統計的例子,将 tuple 按照單詞的首個字母相同的方式進行分組,這樣首字母相同的單詞将被同一個 bolt 接收。
<span class="line-number" style="margin:0px; padding:0px">1</span>
<span class="line-number" style="margin:0px; padding:0px">2</span>
<span class="line-number" style="margin:0px; padding:0px">3</span>
<span class="line-number" style="margin:0px; padding:0px">4</span>
<span class="line-number" style="margin:0px; padding:0px">5</span>
<span class="line-number" style="margin:0px; padding:0px">6</span>
<span class="line-number" style="margin:0px; padding:0px">7</span>
<span class="line-number" style="margin:0px; padding:0px">8</span>
<span class="line-number" style="margin:0px; padding:0px">9</span>
<span class="line-number" style="margin:0px; padding:0px">10</span>
<span class="line-number" style="margin:0px; padding:0px">11</span>
<span class="line-number" style="margin:0px; padding:0px">12</span>
<span class="line-number" style="margin:0px; padding:0px">13</span>
<span class="line-number" style="margin:0px; padding:0px">14</span>
<span class="line-number" style="margin:0px; padding:0px">15</span>
<span class="line-number" style="margin:0px; padding:0px">16</span>
<span class="line-number" style="margin:0px; padding:0px">17</span>
<span class="line-number" style="margin:0px; padding:0px">18</span>
<span class="line-number" style="margin:0px; padding:0px">19</span>
<span class="line-number" style="margin:0px; padding:0px">20</span>
| public class ModuleGrouping implements CustomStreamGrouping, Serializable{
int numTasks = 0;
@Override
public List<Integer> chooseTasks(List<Object> values) {
List<Integer> boltIds = new ArrayList();
if(values.size()>0){
String str = values.get(0).toString();
if(str.isEmpty())
boltIds.add(0);
}else{
boltIds.add(str.charAt(0) % numTasks);
}
return boltIds;
}
@Override
public void prepare(TopologyContext context, Fields outFields,
List<Integer> targetTasks) {
numTasks = targetTasks.size();
}
}
|
上述是
CustomStreamGrouping
的一個簡單實作。我們用單詞首字母的數值與任務節點數目來取模,來決定哪個 bolt 接收 tuple。
為了在示例中使用該分組,隻需改變一下
word-normalizer
的分組方式:
<span class="line-number" style="margin:0px; padding:0px">1</span>
<span class="line-number" style="margin:0px; padding:0px">2</span>
| builder.setBolt("word-normalizer", new WordNormalizer())
.customGrouping("word-reader", new ModuleGrouping());
|
3.1.5 直接分組
直接分組(Direct Grouping)是一種特殊的分組方式,用這種分組意味着消息的發送者決定哪個元件用來接收tuple。如前所示,我們使用直接分組來實作根據單詞首字母來決定接收者是誰。為了使用直接分組,在
WordNormalizer
bolt 中使用
emitDirect
方法來代替
emit
。
<span class="line-number" style="margin:0px; padding:0px">1</span>
<span class="line-number" style="margin:0px; padding:0px">2</span>
<span class="line-number" style="margin:0px; padding:0px">3</span>
<span class="line-number" style="margin:0px; padding:0px">4</span>
<span class="line-number" style="margin:0px; padding:0px">5</span>
<span class="line-number" style="margin:0px; padding:0px">6</span>
<span class="line-number" style="margin:0px; padding:0px">7</span>
<span class="line-number" style="margin:0px; padding:0px">8</span>
<span class="line-number" style="margin:0px; padding:0px">9</span>
<span class="line-number" style="margin:0px; padding:0px">10</span>
<span class="line-number" style="margin:0px; padding:0px">11</span>
<span class="line-number" style="margin:0px; padding:0px">12</span>
<span class="line-number" style="margin:0px; padding:0px">13</span>
<span class="line-number" style="margin:0px; padding:0px">14</span>
<span class="line-number" style="margin:0px; padding:0px">15</span>
<span class="line-number" style="margin:0px; padding:0px">16</span>
<span class="line-number" style="margin:0px; padding:0px">17</span>
<span class="line-number" style="margin:0px; padding:0px">18</span>
<span class="line-number" style="margin:0px; padding:0px">19</span>
| public void execute(Tuple input) {
...
for(String word : words){
if(!word.isEmpty()){
...
collector.emitDirect(getWordCountIndex(word),new Values(word));
}
}
// Acknowledge the tuple
collector.ack(input);
}
public Integer getWordCountIndex(String word) {
word = word.trim().toUpperCase();
if(word.isEmpty())
return 0;
else
return word.charAt(0) % numCounterTasks;
}
|
在
prepare
方法中擷取目标任務的數目:
<span class="line-number" style="margin:0px; padding:0px">1</span>
<span class="line-number" style="margin:0px; padding:0px">2</span>
<span class="line-number" style="margin:0px; padding:0px">3</span>
<span class="line-number" style="margin:0px; padding:0px">4</span>
| public void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {
this.collector = collector;
this.numCounterTasks = context.getComponentTasks("word-counter");
}
|
然後,在 topology 定義中明确資料流将被直接分組:
<span class="line-number" style="margin:0px; padding:0px">1</span>
<span class="line-number" style="margin:0px; padding:0px">2</span>
| builder.setBolt("word-counter", new WordCounter(),2)
.directGrouping("word-normalizer");
|
3.1.6 全局分組
全局分組(Global Grouping)将所有源執行個體産生的 tuple 發送到一個目标執行個體中(更具體點,配置設定給ID值最低的那個任務)。
3.1.7 不分組
在寫作本書的時候(Storm 版本是 0.7.1),不分組(None Grouping)與使用随機分組具有一樣的效果。換句話說,使用這種分組方式,不用關心資料流是如何分組的。
譯者注:有一點不同的是 Storm 會把這個 bolt 放到這個 bolt 的訂閱者同一個線程裡面去執行。
3.2 LocalCluster vs. StormSubmitter
到目前為止,我們使用
LocalCluster
工具類在本機運作 topology。在本機運作 Storm 元件,允許你可以很容易地運作和調試不同的 topology。但是,如果你想将 topology 送出到一個運作中的 Storm 叢集呢? Storm 一個有趣的特性就是可以很容易地發送 topology 到真實的叢集中運作。你需要将
LocalCluster
改成
StormSubmitter
,并實作
submitTopology
方法來負責發送 topology 到叢集中。
如下是改變的代碼:
<span class="line-number" style="margin:0px; padding:0px">1</span>
<span class="line-number" style="margin:0px; padding:0px">2</span>
<span class="line-number" style="margin:0px; padding:0px">3</span>
<span class="line-number" style="margin:0px; padding:0px">4</span>
<span class="line-number" style="margin:0px; padding:0px">5</span>
<span class="line-number" style="margin:0px; padding:0px">6</span>
<span class="line-number" style="margin:0px; padding:0px">7</span>
| /LocalCluster cluster = new LocalCluster();
//cluster.submitTopology("Count-Word-Topology-With-Refresh-Cache", conf,
builder.createTopology());
StormSubmitter.submitTopology("Count-Word-Topology-With-Refresh-Cache", conf,
builder.createTopology());
//Thread.sleep(1000);
//cluster.shutdown();
|
當你使用 StormSubmitter
,你無法像在 LocalCluster
中一樣通過代碼控制叢集。
接下來,打包代碼到Jar包中,它将在運作 Storm 用戶端指令時送出 topology。由于使用了Maven,你所需要做的僅僅是去源碼目錄下運作如下指令:
<span class="line-number" style="margin:0px; padding:0px">1</span>
| mvn package
|
一旦生成了Jar包,使用
storm jar
指令送出 topology。文法是:
storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3
。
在本例中,在源碼的工程目錄下運作:
<span class="line-number" style="margin:0px; padding:0px">1</span>
<span class="line-number" style="margin:0px; padding:0px">2</span>
| storm jar target/Topologies-0.0.1-SNAPSHOT.jar countword.TopologyMain src/main/
resources/words.txt
|
使用上述指令,你已經将 topology 釋出到叢集中了。
停止或殺掉它,使用:
<span class="line-number" style="margin:0px; padding:0px">1</span>
| storm kill Count-Word-Topology-With-Refresh-Cache
|
topology的名字必須是唯一的。
3.3 DRPC Topologies
有一個特别的 topology,稱為 分布式遠端方法調用(DRPC),是Storm 分布式執行的遠端方法調用(RPC)[如圖3.1所示]。Storm 提供了一些工具來使用 DRPC。第一個是 DRCP 伺服器,它在用戶端和 Strom topology 之間建立連接配接,當作 topology spout 的源。它接收執行的功能和參數。在功能運作的每一個資料片上,配置設定一個請求 ID 在topology結構中來辨別 RPC 請求。當 topology 執行到最後一個 bolt,它發射 RPC 的請求 ID 和關聯的結構,這樣幫助DRPC伺服器将結果傳回給正确的用戶端。
【Storm入門指南】第三章 Topologies 3.1 流分組 3.2 LocalCluster vs. StormSubmitter 3.3 DRPC Topologies 一台DRPC伺服器能執行很多功能。每個功能用唯一的名字來辨別。
Storm 提供的第二個工具是
LinearDRPCTopologyBuilder
,它是一個用來幫助建立 DRPC topology 的抽象類。生成的 topology 建立
DRPCSpouts
——用來連接配接到DRPC伺服器并發射資料到 topology 中的其他部分,包裝最後一個 bolt 傳回的結果。所有 bolt 被添加到
LinearDRPCTopologyBuilder
依次序執行。
建立一個加法處理過程來展示這種類型的 topology 的用法。這是一個簡單的例子,但是涉及到的概念可被擴充來完成複雜的分布式數學操作。
bolt 代碼中的輸出聲明如下:
<span class="line-number" style="margin:0px; padding:0px">1</span>
<span class="line-number" style="margin:0px; padding:0px">2</span>
<span class="line-number" style="margin:0px; padding:0px">3</span>
| public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id","result"));
}
|
由于 topology 中僅有一個 bolt,它必須 發射 RPC ID 和結果。
execute
方法負責執行加法操作:
<span class="line-number" style="margin:0px; padding:0px">1</span>
<span class="line-number" style="margin:0px; padding:0px">2</span>
<span class="line-number" style="margin:0px; padding:0px">3</span>
<span class="line-number" style="margin:0px; padding:0px">4</span>
<span class="line-number" style="margin:0px; padding:0px">5</span>
<span class="line-number" style="margin:0px; padding:0px">6</span>
<span class="line-number" style="margin:0px; padding:0px">7</span>
<span class="line-number" style="margin:0px; padding:0px">8</span>
<span class="line-number" style="margin:0px; padding:0px">9</span>
<span class="line-number" style="margin:0px; padding:0px">10</span>
<span class="line-number" style="margin:0px; padding:0px">11</span>
| public void execute(Tuple input) {
String[] numbers = input.getString(1).split("\\+");
Integer added = 0;
if(numbers.length<2){
throw new InvalidParameterException("Should be at least 2 numbers");
}
for(String num : numbers){
added += Integer.parseInt(num);
}
collector.emit(new Values(input.getValue(0),added));
}
|
将 added bolt 加入 topology 的定義中:
<span class="line-number" style="margin:0px; padding:0px">1</span>
<span class="line-number" style="margin:0px; padding:0px">2</span>
<span class="line-number" style="margin:0px; padding:0px">3</span>
<span class="line-number" style="margin:0px; padding:0px">4</span>
<span class="line-number" style="margin:0px; padding:0px">5</span>
<span class="line-number" style="margin:0px; padding:0px">6</span>
<span class="line-number" style="margin:0px; padding:0px">7</span>
<span class="line-number" style="margin:0px; padding:0px">8</span>
<span class="line-number" style="margin:0px; padding:0px">9</span>
<span class="line-number" style="margin:0px; padding:0px">10</span>
<span class="line-number" style="margin:0px; padding:0px">11</span>
<span class="line-number" style="margin:0px; padding:0px">12</span>
<span class="line-number" style="margin:0px; padding:0px">13</span>
<span class="line-number" style="margin:0px; padding:0px">14</span>
<span class="line-number" style="margin:0px; padding:0px">15</span>
| public static void main(String[] args) {
LocalDRPC drpc = new LocalDRPC();
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("add");
builder.addBolt(new AdderBolt(),2);
Config conf = new Config();
conf.setDebug(true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("drpc-adder-topology", conf, builder.createLocalTopology(drpc));
String result = drpc.execute("add", "1+-1");
checkResult(result,0);
result = drpc.execute("add", "1+1+5+10");
checkResult(result,17);
cluster.shutdown();
drpc.shutdown();
}
|
建立一個
LocalDRPC
對象在本地運作DRPC伺服器。接下來,建立一個 topology builder 然後将 bolt 添加到這個 topology。執行DRPC對象上的
execute
方法來測試這個 topology。
使用 DRPCClient
類連接配接到一個遠端DRPC伺服器。DRPC伺服器暴露了 Thrift API,可采用不同語言調用。本地或遠端運作DRPC伺服器是同樣的API。 将 createRemoteTopology
方法替代 createLocalTopology
,使用 Storm 配置中的 DRPC 配置資訊來送出 topology 到 Storm 叢集中。
原始位址: http://JavanLu.github.io/blog/2013/10/15/getting-started-with-storm-chapter-3/
written by JavanLu posted at http://JavanLu.github.io