
Storm Source Code Reading (Part 2): The Client

Clojure

Clojure guide: http://java.ociweb.com/mark/clojure/article.html

Eclipse plugin: Counterclockwise

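For Eclipse there is a dedicated Clojure development plugin, Counterclockwise, which is particularly good at source editing, debugging, and REPL support. It suits developers who are already used to Eclipse.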

http://code.google.com/p/counterclockwise/wiki/Documentation#Install_Counterclockwise

Main directory layout of the Storm source:

.
|-- bin
|   |-- storm              the storm launcher script
|   `-- to_maven.sh
|-- conf                   Storm configuration
|   |-- defaults
|   `-- storm.yaml.example
|-- storm-core
|   |-- src
|   |   |-- clj            Clojure code
|   |   |-- dev            Python and Ruby test code
|   |   |-- jvm            Java code
|   |   |-- multilang      Storm interfaces for Python and Ruby
|   |   |-- py
|   |   |-- ui             shared JS and CSS

Storm uses Thrift for communication between the client and the server. The files generated by genthrift.sh are:

the jvm/backtype/storm/generated directory

the py directory

Client

Running a topology is simple. First, package all of your code and the jars it depends on into a single jar. Then run the storm script with a command like this:

        storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

backtype.storm.MyTopology is your main class (for example storm.starter.WordCountTopology), and arg1 and arg2 are its arguments. storm jar takes care of connecting to Nimbus and uploading the jar.

# Inside the storm script (Python). STORM_DIR, USER_CONF_DIR, CONFFILE,
# get_config_opts(), confvalue() and get_classpath() are defined elsewhere in the script.
import os

def jar(jarfile, klass, *args):
    """Syntax: [storm jar topology-jar-path class ...]"""
    exec_storm_class(
        klass,
        jvmtype="-client",
        extrajars=[jarfile, USER_CONF_DIR, STORM_DIR + "/bin"],
        args=args,
        jvmopts=["-Dstorm.jar=" + jarfile])

def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False):
    global CONFFILE
    all_args = [
        "java", jvmtype, get_config_opts(),
        "-Dstorm.home=" + STORM_DIR,
        "-Djava.library.path=" + confvalue("java.library.path", extrajars),
        "-Dstorm.conf.file=" + CONFFILE,
        "-cp", get_classpath(extrajars),
    ] + jvmopts + [klass] + list(args)
    print("Running: " + " ".join(all_args))
    if fork:
        os.spawnvp(os.P_WAIT, "java", all_args)
    else:
        os.execvp("java", all_args)  # replaces the current process and never returns

In other words, it runs a command like this:

java -client -Dstorm.home=xxx -Djava.library.path=xxx -Dstorm.conf.file=xxx -cp xxx jvmopts klass args
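The argument assembly can be reproduced as a minimal Python sketch. It is not the real storm script: build_storm_jar_command is a hypothetical helper, the values the script computes through get_config_opts(), confvalue() and get_classpath() are passed in as plain parameters, and the paths in the usage line are made up.

```python
def build_storm_jar_command(jarfile, klass, args, storm_dir, conf_file,
                            classpath, library_path, config_opts):
    """Hypothetical re-creation of the argv that `storm jar` hands to os.execvp."""
    # jar() always picks the client JVM and records the jar path as a system
    # property, which StormSubmitter later reads back to find the file to upload.
    jvmtype = "-client"
    jvmopts = ["-Dstorm.jar=" + jarfile]
    return [
        "java", jvmtype, config_opts,
        "-Dstorm.home=" + storm_dir,
        "-Djava.library.path=" + library_path,
        "-Dstorm.conf.file=" + conf_file,
        "-cp", classpath,
    ] + jvmopts + [klass] + list(args)


cmd = build_storm_jar_command(
    "all-my-code.jar", "backtype.storm.MyTopology", ["arg1", "arg2"],
    storm_dir="/opt/storm", conf_file="",
    classpath="/opt/storm/lib/storm-core.jar:all-my-code.jar",
    library_path="/usr/local/lib", config_opts="-Dexample.opt=1")
print(" ".join(cmd))
```

Because the jar path travels as -Dstorm.jar rather than as a program argument, the user's main class only sees arg1 and arg2.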

In the jar you submit, the main class usually ends by calling StormSubmitter.submitTopology. For example, storm-starter's WordCountTopology:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
// ... the bolts are declared here with builder.setBolt(...)
Config conf = new Config();
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

Using this call as the entry point into the Storm client, let's trace the source.

// https://github.com/nathanmarz/storm/wiki/Lifecycle-of-a-topology
// Starting a topology
// StormSubmitter.submitTopology
// Read the configuration: defaults.yaml, then storm.yaml (located via "storm.conf.file")
Map conf = Utils.readStormConfig();
conf.putAll(stormConf);
String serConf = JSONValue.toJSONString(stormConf); // serialize the topology config to JSON
if (localNimbus != null) {
    // Local mode: hand the topology straight to localNimbus.
    // localNimbus is implemented in LocalCluster.clj, which implements the ILocalCluster interface.
    localNimbus.submitTopology(name, null, serConf, topology);
} else {
    NimbusClient client = NimbusClient.getConfiguredClient(conf);
    // Check ClusterInfo for a running topology with the same name
    …
    // Upload the jar named on the command line (passed in as the JVM system property "storm.jar")
    submitJar(conf);
    try {
        LOG.info("Submitting topology " + name + " in distributed mode");
        if (opts != null) {
            client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
        } else {
            // this is for backwards compatibility
            client.getClient().submitTopology(name, submittedJar, serConf, topology);
        }
    } catch (InvalidTopologyException e) {
        …
    }
}
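The control flow of submitTopology can be summarized in a small Python model. Everything here is hypothetical: dictionaries stand in for the parsed YAML files, plain functions stand in for LocalCluster, submitJar and the Thrift client, and the duplicate-name check is modeled as a ValueError because the excerpt elides how it is actually reported.

```python
import json


def read_storm_config(defaults_yaml, storm_yaml):
    """Simplified model of Utils.readStormConfig(): storm.yaml overrides defaults.yaml."""
    conf = dict(defaults_yaml)
    conf.update(storm_yaml)
    return conf


def submit_topology(name, storm_conf, topology, defaults_yaml, storm_yaml,
                    local_nimbus=None, running_names=(), upload_jar=None, submit=None):
    """Hypothetical outline of StormSubmitter.submitTopology."""
    conf = read_storm_config(defaults_yaml, storm_yaml)
    conf.update(storm_conf)              # conf.putAll(stormConf)
    ser_conf = json.dumps(storm_conf)    # only the topology's own config is serialized
    if local_nimbus is not None:
        # Local mode: no jar upload, the in-process Nimbus gets the topology directly.
        return local_nimbus(name, None, ser_conf, topology)
    if name in running_names:            # stand-in for the ClusterInfo name check
        raise ValueError("topology %r is already running" % name)
    jar_location = upload_jar(conf)      # submitJar(conf) sees the merged config
    return submit(name, jar_location, ser_conf, topology)
```

Note the split: the merged conf decides where to connect and upload, while only stormConf is shipped to Nimbus as JSON.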

// NimbusClient.getConfiguredClient
// NimbusClient extends ThriftClient and builds a Nimbus.Client from the configured host and port
public static NimbusClient getConfiguredClient(Map conf) {
    try {
        String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
        int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
        return new NimbusClient(conf, nimbusHost, nimbusPort);
    } catch (…) {
        …
    }
}

// NimbusClient constructor
public NimbusClient(Map conf, String host, int port, Integer timeout) throws TTransportException {
    super(conf, host, port, timeout);
    _client = new Nimbus.Client(_protocol);
}

// ThriftClient, the parent class of NimbusClient: the boundary with Thrift
// super(conf, host, port, timeout)
    // security configuration: java.security.auth.login.config
    // construct a transport plugin
    ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(storm_conf, login_conf);
    // create a TSocket
    TSocket socket = new TSocket(host, port);
    _transport = transportPlugin.connect(socket, host);
    _protocol = new TBinaryProtocol(_transport);
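The connection step boils down to reading two config keys and opening a TCP socket. This Python sketch assumes the key strings behind Config.NIMBUS_HOST and Config.NIMBUS_THRIFT_PORT are "nimbus.host" and "nimbus.thrift.port"; socket.create_connection plays the role of TSocket, and the transport plugin and TBinaryProtocol layers are deliberately left out.

```python
import socket


def nimbus_address(conf):
    """Mirror of getConfiguredClient's lookup: host string plus integer port."""
    host = conf["nimbus.host"]                # Config.NIMBUS_HOST
    port = int(conf["nimbus.thrift.port"])    # Config.NIMBUS_THRIFT_PORT, roughly Utils.getInt
    return host, port


def connect_to_nimbus(conf, timeout=None):
    """Open the raw TCP connection that a TSocket would wrap (no Thrift layers here)."""
    host, port = nimbus_address(conf)
    return socket.create_connection((host, port), timeout=timeout)
```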

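// NimbusClient._client is a Nimbus$Client object
// Nimbus$Client extends TServiceClient and provides the calls, including submitting the topology
// TServiceClient only defines the send/receive primitives sendBase and receiveBase
// The two sides appear to communicate by methodName:
// both sendBase and receiveBase take a methodName
// Every method that Nimbus$Client provides ends up calling TServiceClient to send and receive
// For example: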

// submitJar
// Get an upload location, upload the file chunk by chunk, then finish the upload
NimbusClient client = NimbusClient.getConfiguredClient(conf);
try {
    String uploadLocation = client.getClient().beginFileUpload();
    BufferFileInputStream is = new BufferFileInputStream(localJar);
    while (true) {
        byte[] toSubmit = is.read();
        if (toSubmit.length == 0) break;
        client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
    }
    client.getClient().finishFileUpload(uploadLocation);
} catch (…) {
    …
}
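The three-call upload protocol is easy to see in isolation. In this Python sketch FakeNimbus is a hypothetical in-memory stand-in for the server, the upload location format is made up, and chunk_size is an arbitrary parameter (in the Java code the chunking is done by BufferFileInputStream).

```python
import io


class FakeNimbus:
    """Hypothetical in-memory server; method names mirror the Thrift calls."""

    def __init__(self):
        self.files = {}
        self.finished = set()

    def beginFileUpload(self):
        location = "/inbox/stormjar-%d.jar" % len(self.files)  # made-up naming scheme
        self.files[location] = bytearray()
        return location

    def uploadChunk(self, location, chunk):
        self.files[location] += chunk

    def finishFileUpload(self, location):
        self.finished.add(location)


def submit_jar(client, stream, chunk_size):
    """Same loop as submitJar: begin, send chunks until an empty read, finish."""
    location = client.beginFileUpload()
    while True:
        chunk = stream.read(chunk_size)
        if len(chunk) == 0:          # end of file, like toSubmit.length == 0
            break
        client.uploadChunk(location, chunk)
    client.finishFileUpload(location)
    return location


nimbus = FakeNimbus()
loc = submit_jar(nimbus, io.BytesIO(b"pretend this is a jar"), chunk_size=4)
```

The server has to keep per-upload state between calls, keyed by the location string; that is why beginFileUpload returns it and every later call passes it back.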

// Nimbus.Client (generated by Thrift)
public String beginFileUpload() throws org.apache.thrift7.TException {
    send_beginFileUpload();
    return recv_beginFileUpload();
}

public void send_beginFileUpload() throws org.apache.thrift7.TException {
    beginFileUpload_args args = new beginFileUpload_args();
    sendBase("beginFileUpload", args);
}

// TServiceClient (Thrift library), simplified
protected void sendBase(String methodName, TBase args) throws TException {
    oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));
    args.write(oprot_);
    oprot_.writeMessageEnd();
    oprot_.getTransport().flush();
}

protected void receiveBase(TBase result, String methodName) throws TException {
    TMessage msg = iprot_.readMessageBegin();
    if (msg.seqid != seqid_) {
        throw new TApplicationException("xxx");
    }
    result.read(iprot_);
    iprot_.readMessageEnd();
}
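What sendBase puts on the wire can be sketched in Python as well. This assumes Thrift's strict binary encoding: a 32-bit word holding the version marker 0x80010000 ORed with the message type, then the length-prefixed method name, then the sequence id. The transport plugin may add its own framing around this, which the sketch ignores, and SeqidClient is a hypothetical class that only reproduces the seqid bookkeeping.

```python
import struct

VERSION_1 = 0x80010000          # strict binary protocol version marker
VERSION_MASK = 0xFFFF0000
CALL, REPLY = 1, 2              # TMessageType.CALL / TMessageType.REPLY
T_STOP = 0                      # field-stop byte that ends every struct


def write_message_begin(name, msg_type, seqid):
    name_bytes = name.encode("utf-8")
    return (struct.pack(">I", VERSION_1 | msg_type)
            + struct.pack(">i", len(name_bytes)) + name_bytes
            + struct.pack(">i", seqid))


def read_message_begin(buf):
    """Return (name, type, seqid, offset of the payload)."""
    (word,) = struct.unpack_from(">I", buf, 0)
    if word & VERSION_MASK != VERSION_1:
        raise ValueError("not a strict binary protocol message")
    (size,) = struct.unpack_from(">i", buf, 4)
    name = buf[8:8 + size].decode("utf-8")
    (seqid,) = struct.unpack_from(">i", buf, 8 + size)
    return name, word & 0xFF, seqid, 12 + size


class SeqidClient:
    """Hypothetical client that mirrors sendBase/receiveBase's seqid handling."""

    def __init__(self):
        self.seqid = 0

    def send_base(self, method_name, args_bytes):
        self.seqid += 1                              # ++seqid_
        return write_message_begin(method_name, CALL, self.seqid) + args_bytes

    def receive_base(self, reply, method_name):
        _, _, seqid, offset = read_message_begin(reply)
        if seqid != self.seqid:
            raise RuntimeError(method_name + ": out-of-sequence response")
        return reply[offset:]                        # the result struct, still encoded


client = SeqidClient()
# beginFileUpload_args has no fields, so its encoding is just the stop byte.
request = client.send_base("beginFileUpload", bytes([T_STOP]))
```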

// The submitTopology call that follows works the same way
// send_submitTopology
submitTopology_args args = new submitTopology_args();
args.set_name(name);
args.set_uploadedJarLocation(uploadedJarLocation);
args.set_jsonConf(jsonConf);
args.set_topology(topology);
sendBase("submitTopology", args);
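Inside sendBase, args.write(oprot_) serializes each set field as a type byte, a 16-bit field id, and the value. The Python sketch below encodes only the three string arguments of submitTopology; the field ids 1 to 3 are an assumption based on the parameter order, field 4 (the StormTopology struct) is left out, and encode_submit_topology_args is a hypothetical helper.

```python
import struct

T_STOP, T_STRING = 0, 11     # Thrift type codes for "end of struct" and string


def write_string_field(field_id, value):
    data = value.encode("utf-8")
    # field header (type byte + i16 id), then an i32 length and the raw bytes
    return struct.pack(">bhi", T_STRING, field_id, len(data)) + data


def encode_submit_topology_args(name, uploaded_jar_location, json_conf):
    """Hypothetical partial encoding of submitTopology_args (no topology field)."""
    return (write_string_field(1, name)
            + write_string_field(2, uploaded_jar_location)
            + write_string_field(3, json_conf)
            + bytes([T_STOP]))


payload = encode_submit_topology_args("wc", "/j", "{}")
```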

So the client and the server talk to each other over Thrift, and every method has a corresponding methodName. To confirm this, let's look at backtype.storm.generated.Nimbus.

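The Nimbus client code in backtype.storm.generated.Nimbus is generated by Thrift. It contains two clients, Client and AsyncClient, which implement the Iface and AsyncIface interfaces respectively, and each method these interfaces define has its own argument and result classes. There is also a Processor class, which is used on the Nimbus server side.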

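Client's implementation of Iface calls send_xxx and recv_xxx for each method. These wrap the arguments and the result in the corresponding xxx_args and xxx_result classes (nested inside the generated Nimbus class) and then call down into sendBase/receiveBase, at which point Thrift (TServiceClient) takes over.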
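The generated-code pattern described here (each public client method calls shared send/receive base methods with its method name, and a Processor on the server dispatches on that name) can be mimicked in a few lines of Python. MiniClient, MiniProcessor and Handler are hypothetical names, the "transport" is an in-memory hand-off, and no real serialization happens.

```python
class MiniProcessor:
    """Server side: look the handler method up by name, like a generated Processor."""

    def __init__(self, handler):
        self.handler = handler

    def process(self, method_name, seqid, args):
        method = getattr(self.handler, method_name, None)
        if method is None:
            raise LookupError("unknown method " + method_name)
        return method_name, seqid, method(*args)


class MiniClient:
    """Client side: every RPC goes through the shared send_base/receive_base."""

    def __init__(self, processor):
        self.processor = processor
        self.seqid = 0
        self.reply = None

    def send_base(self, method_name, args):
        self.seqid += 1
        self.reply = self.processor.process(method_name, self.seqid, args)  # in-memory "wire"

    def receive_base(self, method_name):
        name, seqid, result = self.reply
        if name != method_name or seqid != self.seqid:
            raise RuntimeError(method_name + ": unexpected reply")
        return result

    def beginFileUpload(self):
        self.send_base("beginFileUpload", ())
        return self.receive_base("beginFileUpload")

    def uploadChunk(self, location, chunk):
        self.send_base("uploadChunk", (location, chunk))
        return self.receive_base("uploadChunk")


class Handler:
    """Toy server implementation of two of the Nimbus calls."""

    def __init__(self):
        self.chunks = []

    def beginFileUpload(self):
        return "/inbox/upload-1"

    def uploadChunk(self, location, chunk):
        self.chunks.append((location, chunk))


handler = Handler()
client = MiniClient(MiniProcessor(handler))
location = client.beginFileUpload()
client.uploadChunk(location, b"abc")
```

Keeping the method name on both sides is what lets one generic processor route every call; adding an RPC only means adding a client method and a handler method with the same name.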

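We can find the following services provided through Thrift:

getClusterInfo: gets the cluster's ClusterSummary, including topology and supervisor information

beginFileUpload: starts a file upload

uploadChunk: uploads one chunk of the file

finishFileUpload: finishes the file upload

submitTopologyWithOpts: submits a topology

submitTopology: …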
