天天看點

Storm的資料處理程式設計單元:Bolt 學習整理

  Bolt是Topology中的資料處理的單元,也是Storm針對處理過程的程式設計單元。Topology中所有的處理都是在這些Bolt中完成的,程式設計人員可以實作自定義的處理過程,例如,過濾、函數、聚集、連接配接等計算。如果是複雜的計算過程,往往需要多個步驟和使用多個Bolt。

  Bolt可以将資料項發送至多個資料流(Stream)。程式設計人員首先可以使用OutputFieldsDeclarer類的declareStream()方法來聲明多個流,指定資料将要發送到的流,然後使用SpoutOutputCollector的emit方法将資料發送。

  當聲明了一個Bolt的輸入流後,可以從其他的元件中接收這些指定的流。當接收某個元件的所有流時,需要在程式中逐個聲明接收的過程。InputDeclarer對象預設接收來自某元件預設的流。

//從名稱為"1"的元件中接收預設的流。
declarer.shuffleGrouping("1")      

IBolt 和 IComponent接口

IBolt接口:

//在元件的任務初被初始化時,由叢集中的工作程序(worker)調用,prepare()用于執行個體化Bolt的已給運作時任務,被叢集中的某一個程序調用,提供Bolt運作的環境。
//sormConf對象維護Storm中針對該Bolt的配置資訊。(來自Topology);context對象是一個上下文對象,用于擷取該元件運作時任務的資訊。(例如Topology中該Bolt所有任務的位置,包括任務的id、元件id和輸入輸出資訊等)
//collector對象用于從該Bolt發送資料項。資料項可以在任意時刻發送,包括調用open()和close()方法。

void prepare(java.util.Map stormConf,TopologyContext context,OutputCollector collector)

//接收一個資料項并處理
//該方法用來接收一個資料項(Tuple),并可以将處理的結果作為新的資料項發送(emit),是Bolt需要實作的最重要的方法。
//參數imput是一個資料項對象,包含了衆多的中繼資料(metadata),包括它來自的元件、流、任務等。資料項中的值,可以通過Tuple類的getValue()方法獲得。

void execute(Tuple input)

//在IBolt将關閉時調用  
void cleanup()      

Tuple類的方法,這個類的對象作為execute()方法的輸入。(方法舉例: int size() ; int fieldIndex(java.lang.String field) ; ......)

方法衆多,可以整理分為以下五類:

1、擷取屬性的方法。 (size()、fieldIndex()和contains()三個方法)

2、擷取中繼資料的方法。(getMessageId()、getSourceComponent()、getSourceTask()、getSourceStreamId()和getSourceGlobalStreamid()方法)

其中MessageId是在資料項被建立時,通過一定的規則指派的。

3、根據域擷取值的方法。(getValue()和多個get具體資料類型的方法)

4、根據域的名稱擷取值的方法。(這一類包括getFields()、getValues()和select()方法)

5、擷取Tuple的值或域清單的方法。(getFields()、getValues()和select()方法)

分别擷取該資料項的所有域清單、值清單和值清單子集。

簡單的案例:

class SplitSentence implements IRichBolt {
    private OutputCollector collector;

    public void prepare(Map conf,TopologyContext context,OutputCollector collector){
    this.collector = collector;
    }
    
    public void execute(Tuple tuple){
    String sentence = tuple.getString(0);
    for(String word : sentence.split(" "){
        collector.emit(new Values(word));
    }
}

public void cleanup(){
}

public void declareOutputFields(OutpuFieldsDeclarer declarer){
    declarer.declare(new Fields("word"));
    }
}      

這裡說下declareOutputFields()函數參數,聲明了輸出流的資料項的結構,也即Tuple的域。

結合上節給的Spout的示例,可以在Topology類的main函數中加入相關代碼,增加Bolt。

Topology builder builder = new TopologyBuilder();
Builder.SetSpout ("SentenceGenSpout ",new TestWord Spout(),1);
builder.setBolt("splitBoult",new SplitSentence(),2).shuffleGrouping("sentenceGenSpout");      
下一篇: HBASE解析

繼續閱讀