Storm的DRPC模式的作用是實作從遠端調用storm叢集的計算資源,而不需要連接配接到叢集的某一個節點。OK。那麼Storm實作DRPC主要是使用LinearDRPCTopologyBuilder這個類。下面就先來看看一個簡單的例子,它的源碼在github上。
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class BasicDRPCTopology {
public static class ExclaimBolt extends BaseBasicBolt { //主要需要覆寫execute方法和declareoutputfields方法
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String input = tuple.getString(1);
collector.emit(new Values(tuple.getValue(0), input + "!"));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "result"));
}
}
public static void main(String[] args) throws Exception {
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");//實作DRPC模式
builder.addBolt(new ExclaimBolt(), 3);
Config conf = new Config();
if (args == null || args.length == 0) {
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
for (String word : new String[]{ "hello", "goodbye" }) {
System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));
}
cluster.shutdown();
drpc.shutdown();
}
else {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
}
}
}
這段代碼主要實作的功能是給接收到的每一個輸入後面添加一個感歎号。ok,這樣就可以編譯送出了。
不過在這之前需要先配置storm叢集的drpc server的ip。如圖。主要是下面的server的ip需要配置好。并且叢集的每一個節點的配置檔案都需要配置這項參數!

然後即可使用storm drpc &指令啟動drpc模式。(這裡的分工是172.17.150.6為用戶端,其餘的172.17.150.7(.8,.11)為叢集的三個節點,.11是nimbus節點。)
OK,那接下來就使用用戶端向叢集送出Topology。如圖。使用用戶端向叢集送出名為exclaim的Topology。裡面設定的worker數為3。
從下圖可以看到兩個supervisor分别有一個是運作兩個worker,有一個是運作一個worker。
ok,下面是用戶端調用遠端資源進行計算的程式。主要是聲明DRPCClient的ip以及端口,以及指定執行的方法名和傳入的參數(client.execute("exclamation?imageView2/2/w/1620",word))。
運作結果如下。
OK,整個DRPC的過程就是這樣。
Storm程序通信機制分析 http://www.linuxidc.com/Linux/2014-12/110158.htm
Apache Storm 的曆史及經驗教訓 http://www.linuxidc.com/Linux/2014-10/108544.htm