最近正好在做一个OEP + Coherence的实时数据流分析PoC。突然想到开源世界里还有Storm这么个东东,于是心血来潮想要研究一下。
开源东西好的一点就是源码易得,但是Src拿的到不代表看的懂。这里就把每日抽空理解到的内容记录一下,也便于日后自己复习。
本次源码分析的版本是最新的0.9.3。源码分析是从一个本地例子开始的,源码如下:
public class TopologyMain {
public static void main(String[] args) throws InterruptedException {
//Topology definition
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader",new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer())
.shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter(),1)
.fieldsGrouping("word-normalizer", new Fields("word"));
//Configuration
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(false);
//Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
Thread.sleep(1000);
cluster.shutdown();
}
}从这个例子可以看出Storm的主程序还是很简洁的,定义好topologybuilder以后,相关的Spout,Bolt定义好,在声明好拓扑结构,往LocalCluster里一扔就好了。
这里的LocalCluster是为了本地开发方便而使用的一个开发类,但是其代码调用过程和核心内容与实际生产中使用的类似,于是就先从它下手。
我们可以看到,该类中具体调用时是通过cluster.submitTopology来调用的,参数是 cluster的名字,参数配置cof以及我们创建好的topology。
这段代码运行时,会调用\storm-core\src\clj\backtype\storm\LocalCluster.clj。LocalCluster.clj是Clojure编写的代码,本人对Clojure完全不了解,为了阅读也是硬着头皮硬生生地查了半天。(话说讨厌开源就在这点,就不能用统一的语言来编写啊,非搞这么小众!)。
打开LocalCluster.clj我们能够看到开始两段代码:
(ns backtype.storm.LocalCluster
(:use [backtype.storm testing config])
(:import [java.util Map])
(:gen-class
:init init
:implements [backtype.storm.ILocalCluster]
:constructors {[] [] [java.util.Map] [] [String Long] []}
:state state))
(defn -init
([]
(let [ret (mk-local-storm-cluster
:daemon-conf
{TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})]
[[] ret]))
([^String zk-host ^Long zk-port]
(let [ret (mk-local-storm-cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true
STORM-ZOOKEEPER-SERVERS (list zk-host)
STORM-ZOOKEEPER-PORT zk-port})]
[[] ret]))
([^Map stateMap]
[[] stateMap]))
其中第一段有:use [backtype.storm testing config] 这样一句声明。这句声明就意味着该clj中会使用相同目录下testing.clj和config.clj这两个文件。
下一段代码 (:gen-class
:init init 就类似于Java中的构造类,一位着该代码运行时,需要先运行一下init方法,这个init方法的名字就叫"init"(太糙了。。)。
在第二段代码中,我们看到(defn -init 这个声明就是init方法了。在Init方法中会调用mk-local-storm-cluster 这个方法在testing.clj里定义,我们看看代码
(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024]
(let [zk-tmp (local-temp-path)
[zk-port zk-handle] (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
(zk/mk-inprocess-zookeeper zk-tmp))
daemon-conf (merge (read-storm-config)
{TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
ZMQ-LINGER-MILLIS 0
TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false
TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50
STORM-CLUSTER-MODE "local"}
(if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
{STORM-ZOOKEEPER-PORT zk-port
STORM-ZOOKEEPER-SERVERS ["localhost"]})
daemon-conf)
nimbus-tmp (local-temp-path)
port-counter (mk-counter supervisor-slot-port-min)
nimbus (nimbus/service-handler
(assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
(if inimbus inimbus (nimbus/standalone-nimbus)))
context (mk-shared-context daemon-conf)
其中,关于配置信息,我们看到调用了read-storm-config这个方法在config.clj里定义,具体代码如下
(defn read-storm-config
[]
(let [conf (clojurify-structure (Utils/readStormConfig))]
(validate-configs-with-schemas conf)
conf))
这个定义意味着调用了java backtype.storm.utils.Utils.java的方法,我们具体看一下代码
public static Map readStormConfig() {
Map ret = readDefaultConfig();
String confFile = System.getProperty("storm.conf.file");
Map storm;
if (confFile==null || confFile.equals("")) {
storm = findAndReadConfigFile("storm.yaml", false);
} else {
storm = findAndReadConfigFile(confFile, true);
}
ret.putAll(storm);
ret.putAll(readCommandLineOpts());
return ret;
}
这个方法就是真正去解析配置的入口,解析过程:先读取默认的defualts.yaml的配置,对于源码来说该文件是在conf目录下,对于release版本则是该文件打到了storm.jar内。
其次,再解析用户配置的storm.yaml中的配置项,如果strom.yaml中有配置项与默认配置文件的配置项有冲突,则会覆盖掉默认配置项。最后,取系统环境变量中设置的storm.options的值,这一般都是没有的,因此这步可以跳过。
注:storm的配置文件用到了yaml这种配置格式,可参考其官方http://www.yaml.org/
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请
点击举报。