Clojure
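Clojure guide: http://java.ociweb.com/mark/clojure/article.html
Eclipse plugin: Counterclockwise
Counterclockwise is a dedicated Clojure development plugin for Eclipse. It offers good support for source editing, debugging, and a REPL, and suits developers who are already used to Eclipse.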
http://code.google.com/p/counterclockwise/wiki/Documentation#Install_Counterclockwise
Main directory structure of the Storm source:
.
|-- bin
|   |-- storm                  the storm launcher script
|   `-- to_maven.sh
|-- conf                       Storm configuration
|   |-- defaults
|   `-- storm.yaml.example
|-- storm-core
|   |-- src
|   |   |-- clj        : Clojure code
|   |   |-- dev        : Python and Ruby test code
|   |   |-- jvm        : Java code
|   |   |-- multilang  : Storm interfaces for Python and Ruby
|   |   |-- py         :
|   |   |-- ui         : shared JS and CSS
Storm uses Thrift for communication between the client and the server. The files generated by genthrift.sh go into:
the jvm/backtype/storm/generated directory
the py directory
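Client
Running a topology is simple. First, package all of your code and the jars it depends on into a single jar. Then run the storm script, in a form like the following:
storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
backtype.storm.MyTopology is your main class (for example storm.starter.WordCountTopology), and arg1 and arg2 are its arguments. storm jar takes care of connecting to Nimbus and uploading the jar.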
# Internal implementation of the storm script.
# Helpers such as get_config_opts, confvalue and get_classpath, and the
# globals USER_CONF_DIR, STORM_DIR and CONFFILE, are defined elsewhere in the script.
import os

def jar(jarfile, klass, *args):
    """Syntax: [storm jar topology-jar-path class ...]"""
    exec_storm_class(
        klass,
        jvmtype="-client",
        extrajars=[jarfile, USER_CONF_DIR, STORM_DIR + "/bin"],
        args=args,
        jvmopts=["-Dstorm.jar=" + jarfile])

def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False):
    global CONFFILE
    all_args = [
        "java", jvmtype, get_config_opts(),
        "-Dstorm.home=" + STORM_DIR,
        "-Djava.library.path=" + confvalue("java.library.path", extrajars),
        "-Dstorm.conf.file=" + CONFFILE,
        "-cp", get_classpath(extrajars),
    ] + jvmopts + [klass] + list(args)
    print("Running: " + " ".join(all_args))
    if fork:
        os.spawnvp(os.P_WAIT, "java", all_args)
    else:
        os.execvp("java", all_args)  # replaces the current process and never returns
In other words, it runs a command of the following form:
java -Dstorm.home=xxx -Djava.library.path=xxx -Dstorm.conf.file=xxx -cp xxx jvmopts klass args
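To make the shape of that command concrete, here is a minimal sketch of how the argument list is put together. It is not the storm script itself: build_storm_command is a hypothetical helper, the paths are made up, and the output of get_config_opts() is left out for brevity.

```python
def build_storm_command(klass, jarfile, storm_dir, conf_file,
                        classpath, library_path, args=()):
    """Assemble an argv list shaped like the one `storm jar` hands to the JVM.

    Hypothetical helper for illustration; the real script computes the
    classpath and library path from its own configuration.
    """
    return [
        "java", "-client",
        "-Dstorm.home=" + storm_dir,
        "-Djava.library.path=" + library_path,
        "-Dstorm.conf.file=" + conf_file,
        "-cp", classpath,
        "-Dstorm.jar=" + jarfile,  # JVM option, so it must come before the class name
        klass,
    ] + list(args)


# Example values are assumptions, not real installation paths.
cmd = build_storm_command(
    klass="backtype.storm.MyTopology",
    jarfile="all-my-code.jar",
    storm_dir="/opt/storm",
    conf_file="storm.yaml",
    classpath="/opt/storm/lib/*:all-my-code.jar",
    library_path="/usr/local/lib",
    args=("arg1", "arg2"),
)
print(" ".join(cmd))
```

The detail worth noticing is the ordering: everything before the class name is consumed by the JVM, and everything after it reaches the main method of the topology class.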
In the jar you submit, the main class usually finishes by calling StormSubmitter.submitTopology. For example, WordCountTopology in storm-starter:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
…
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
Taking this as the entry point of the Storm client, let's trace through the source code.
// https://github.com/nathanmarz/storm/wiki/Lifecycle-of-a-topology
// Starting a topology
//
// StormSubmitter.submitTopology
// Read the configuration: defaults.yaml, storm.yaml ("storm.conf.file")
Map conf = Utils.readStormConfig();
conf.putAll(stormConf);
String serConf = JSONValue.toJSONString(stormConf); // serialize to JSON
if (localNimbus != null) {
    // Local mode: call localNimbus.
    // The localNimbus object is implemented in LocalCluster.clj and implements the ILocalCluster interface
    localNimbus.submitTopology(name, null, serConf, topology);
} else {
    NimbusClient client = NimbusClient.getConfiguredClient(conf);
    // Check ClusterInfo for a topology with the same name
    …
    // Upload the jar given on the command line, which is passed in as the system property "storm.jar"
    submitJar(conf);
    try {
        LOG.info("Submitting topology " + name + " in distributed mode");
        if (opts != null) {
            client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
        } else {
            // this is for backwards compatibility
            client.getClient().submitTopology(name, submittedJar, serConf, topology);
        }
    } catch (InvalidTopologyException e) {
        …
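The control flow above can be condensed into a small sketch. This is an illustration only: RecordingNimbus and submit are hypothetical stand-ins, the upload location is made up, and the duplicate-name check and error handling are left out.

```python
import json


class RecordingNimbus:
    """Hypothetical stand-in for a local or remote Nimbus; it only records calls."""

    def __init__(self):
        self.calls = []

    def upload_jar(self, path):
        self.calls.append(("upload_jar", path))
        return "/nimbus/inbox/" + path  # made-up upload location

    def submit_topology(self, name, jar_location, json_conf, topology):
        self.calls.append(("submit_topology", name, jar_location, json_conf))


def submit(name, topology_conf, topology, cluster_conf,
           local_nimbus=None, remote_nimbus=None, jar_path=None):
    # Topology settings are layered on top of the cluster configuration.
    conf = dict(cluster_conf)
    conf.update(topology_conf)
    # Only the topology-specific settings are serialized and sent along.
    ser_conf = json.dumps(topology_conf, sort_keys=True)
    if local_nimbus is not None:
        # Local mode: nothing to upload, so the jar location is None.
        local_nimbus.submit_topology(name, None, ser_conf, topology)
    else:
        # Distributed mode: upload the jar first, then submit its location.
        location = remote_nimbus.upload_jar(jar_path)
        remote_nimbus.submit_topology(name, location, ser_conf, topology)
    return conf


local = RecordingNimbus()
submit("wordcount", {"topology.workers": 2}, object(),
       {"nimbus.host": "localhost"}, local_nimbus=local)

remote = RecordingNimbus()
merged = submit("wordcount", {"topology.workers": 2}, object(),
                {"nimbus.host": "localhost"},
                remote_nimbus=remote, jar_path="all-my-code.jar")
print(local.calls)
print(remote.calls)
```

The two modes differ only in whether a jar is uploaded first; the submit call itself carries the same name, serialized configuration, and topology in both cases.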
// NimbusClient.getConfiguredClient
// NimbusClient extends ThriftClient and creates a Nimbus.Client from the configured host and port
public static NimbusClient getConfiguredClient(Map conf) {
    try {
        String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
        int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
        return new NimbusClient(conf, nimbusHost, nimbusPort);
    …
// NimbusClient constructor
public NimbusClient(Map conf, String host, int port, Integer timeout) throws TTransportException {
    super(conf, host, port, timeout);
    _client = new Nimbus.Client(_protocol);
}
// ThriftClient, the parent class of NimbusClient: the boundary with Thrift
// super(conf, host, port, timeout)
// Security configuration: java.security.auth.login.config
// construct a transport plugin
ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(storm_conf, login_conf);
// Create the TSocket
TSocket socket = new TSocket(host, port);
_transport = transportPlugin.connect(socket, host);
_protocol = new TBinaryProtocol(_transport);
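// NimbusClient._client is a Nimbus$Client object.
// Nimbus$Client extends TServiceClient and includes the call that submits the topology.
// TServiceClient itself only defines the send and receive methods, sendBase and receiveBase.
// The two sides appear to communicate by methodName:
// both sendBase and receiveBase take a methodName.
// Every method offered by Nimbus$Client ends up calling TServiceClient to send and receive.
// For example:
// submitJar
// Get the upload location, upload the file chunk by chunk, then finish the upload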
NimbusClient client = NimbusClient.getConfiguredClient(conf);
try {
    String uploadLocation = client.getClient().beginFileUpload();
    BufferFileInputStream is = new BufferFileInputStream(localJar);
    while (true) {
        byte[] toSubmit = is.read();
        if (toSubmit.length == 0) break;
        client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
    }
    client.getClient().finishFileUpload(uploadLocation);
}
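The begin / upload chunk / finish sequence is easy to try out against an in-memory stand-in. The sketch below is a rough illustration under stated assumptions: InMemoryNimbus is hypothetical, its location names are invented, and the tiny chunk size is chosen only to force several round trips.

```python
import io


class InMemoryNimbus:
    """Hypothetical server stub that keeps uploaded bytes in memory."""

    def __init__(self):
        self.uploads = {}
        self.finished = set()

    def begin_file_upload(self):
        location = "upload-%d" % len(self.uploads)  # invented naming scheme
        self.uploads[location] = bytearray()
        return location

    def upload_chunk(self, location, chunk):
        self.uploads[location].extend(chunk)

    def finish_file_upload(self, location):
        self.finished.add(location)


def upload_jar(client, stream, chunk_size=4):
    """Send a stream in chunks: begin, upload until an empty read, then finish."""
    location = client.begin_file_upload()
    while True:
        chunk = stream.read(chunk_size)
        if len(chunk) == 0:  # an empty read marks the end of the file
            break
        client.upload_chunk(location, chunk)
    client.finish_file_upload(location)
    return location


nimbus = InMemoryNimbus()
location = upload_jar(nimbus, io.BytesIO(b"pretend jar bytes"))
print(location, bytes(nimbus.uploads[location]))
```

Each of the three steps is a separate remote call in the real client, so a large jar turns into one begin call, many chunk calls, and one finish call.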
public String beginFileUpload() throws org.apache.thrift7.TException
{
    send_beginFileUpload();
    return recv_beginFileUpload();
}
public void send_beginFileUpload() throws org.apache.thrift7.TException {
    beginFileUpload_args args = new beginFileUpload_args();
    sendBase("beginFileUpload", args);
}
protected void sendBase(String methodName, TBase args) throws TException {
    oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));
    args.write(oprot_);
    oprot_.writeMessageEnd();
    oprot_.getTransport().flush();
}
protected void receiveBase(TBase result, String methodName) throws TException {
    TMessage msg = iprot_.readMessageBegin();
    if (msg.seqid != seqid_) {
        throw new TApplicationException("xxx");
    }
    result.read(iprot_);
    iprot_.readMessageEnd();
}
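To see what a call looks like once it leaves sendBase, here is a simplified sketch of the message header: a version-and-type word, the method name, and the sequence id, followed by the argument struct. It assumes the strict binary protocol encoding and an argument struct with no fields (a single stop byte, as for beginFileUpload). The function and class names are hypothetical, and this is not a replacement for a real Thrift library.

```python
import io
import struct

VERSION_1 = 0x80010000  # version marker in the high 16 bits (assumed strict encoding)
CALL, REPLY = 1, 2      # message types
STOP = b"\x00"          # stop byte that ends a struct; an empty struct is just this


def write_message(out, method_name, msg_type, seqid, body=STOP):
    name = method_name.encode("utf-8")
    out.write(struct.pack(">I", VERSION_1 | msg_type))
    out.write(struct.pack(">i", len(name)) + name)
    out.write(struct.pack(">i", seqid))
    out.write(body)


def read_message(inp):
    (word,) = struct.unpack(">I", inp.read(4))
    if (word & 0xFFFF0000) != VERSION_1:
        raise ValueError("unexpected protocol version")
    (length,) = struct.unpack(">i", inp.read(4))
    name = inp.read(length).decode("utf-8")
    (seqid,) = struct.unpack(">i", inp.read(4))
    return name, word & 0xFF, seqid


class SketchClient:
    """Hypothetical client mirroring the sendBase/receiveBase pair."""

    def __init__(self, out):
        self.out = out
        self.seqid = 0

    def send_base(self, method_name, body=STOP):
        self.seqid += 1  # every call gets a fresh sequence id
        write_message(self.out, method_name, CALL, self.seqid, body)

    def receive_base(self, inp):
        name, msg_type, seqid = read_message(inp)
        if seqid != self.seqid:  # the reply must answer the latest call
            raise ValueError("out of sequence response")
        return name, msg_type


wire = io.BytesIO()
client = SketchClient(wire)
client.send_base("beginFileUpload")
request = read_message(io.BytesIO(wire.getvalue()))

reply = io.BytesIO()
write_message(reply, "beginFileUpload", REPLY, seqid=1)
reply.seek(0)
answer = client.receive_base(reply)
print(request, answer)
```

The method name travels as a plain string in every message, which is what lets the server pick the right handler, while the sequence id lets the client reject a reply that does not belong to its latest call.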
// The submitTopology method called next works the same way
// send_submitTopology
submitTopology_args args = new submitTopology_args();
args.set_name(name);
args.set_uploadedJarLocation(uploadedJarLocation);
args.set_jsonConf(jsonConf);
args.set_topology(topology);
sendBase("submitTopology", args);
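So the client and the server communicate through Thrift, and every method has a corresponding methodName. To confirm this, let's look at backtype.storm.generated.Nimbus.
The Nimbus client class backtype.storm.generated.Nimbus is generated by Thrift. It contains two clients, Client and AsyncClient, which implement the Iface and AsyncIface interfaces respectively. Each method defined by these interfaces has its own argument class and result class. There is also a Processor class, which implements the processor interface and is invoked on the Nimbus server side.
The Iface/AsyncIface implementations call send_xxx and recv_xxx. These wrap the call in the corresponding nested classes xxx_args and xxx_result and then call down into sendBase/receiveBase. From there, Thrift (TServiceClient) takes over.
The services exposed through Thrift include the following: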
getClusterInfo : returns the cluster's ClusterSummary, including topology and supervisor information
beginFileUpload : starts a file upload
uploadChunk : uploads one chunk of a file
finishFileUpload : finishes a file upload
submitTopologyWithOpts : submits a topology
submitTopology : …
…
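On the server side, the role of the Processor can be pictured as a lookup from methodName to a handler method. The sketch below illustrates that idea only: SketchHandler and SketchProcessor are hypothetical, arguments are passed as plain Python values instead of xxx_args structs, and the error message is invented.

```python
class SketchHandler:
    """Hypothetical server-side implementation of two of the listed services."""

    def __init__(self):
        self.topologies = {}

    def getClusterInfo(self):
        return {"topologies": sorted(self.topologies)}

    def submitTopology(self, name, uploaded_jar_location, json_conf, topology):
        self.topologies[name] = uploaded_jar_location


class SketchProcessor:
    """Routes an incoming (methodName, args) pair to the matching handler method."""

    def __init__(self, handler, method_names):
        self.process_map = {n: getattr(handler, n) for n in method_names}

    def process(self, method_name, args=()):
        fn = self.process_map.get(method_name)
        if fn is None:
            raise KeyError("unknown method: " + method_name)
        return fn(*args)


processor = SketchProcessor(SketchHandler(), ["getClusterInfo", "submitTopology"])
processor.process("submitTopology", ("wordcount", "/nimbus/inbox/all-my-code.jar", "{}", None))
info = processor.process("getClusterInfo")
print(info)
```

This is why the method name has to be part of every message: it is the only key the server has for choosing which handler to run.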