天天看点

Storm入门 第二章准备开始

准备开始

在本章,我们要创建一个storm工程和我们的第一个storm拓扑结构。

操作模式

开始之前,有必要了解一下storm的操作模式。有下面两种方式。

本地模式

在本地模式下,storm拓扑结构运行在本地计算机的单一jvm进程上。这个模式用于开发、测试以及调试,因为这是观察所有组件如何协同工作的最简单方法。在这种模式下,我们可以调整参数,观察我们的拓扑结构如何在不同的storm配置环境下运行。要在本地模式下运行,我们要下载storm开发依赖,以便用来开发并测试我们的拓扑结构。我们创建了第一个storm工程以后,很快就会明白如何使用本地模式了。

note: 在本地模式下,跟在集群环境运行很像。不过很有必要确认一下所有组件都是线程安全的,因为当把它们部署到远程模式时它们可能会运行在不同的jvm进程甚至不同的物理机上,这个时候它们之间没有直接的通讯或共享内存。

我们要在本地模式运行本章的所有例子。

远程模式

在远程模式下,我们向storm集群提交拓扑,它通常由许多运行在不同机器上的流程组成。远程模式不会出现调试信息, 因此它也称作生产模式。不过在单一开发机上建立一个storm集群是一个好主意,可以在部署到生产环境之前,用来确认拓扑在集群环境下没有任何问题。

我们在这个工程里创建一个简单的拓扑,数单词数量。我们可以把这个看作storm的“hello world”。不过,这是一个非常强大的拓扑,因为它能够扩展到几乎无限大的规模,而且只需要做一些小修改,就能用它构建一个统计系统。举个例子,我们可以修改一下工程用来找出twitter上的热点话题。

要创建这个拓扑,我们要用一个spout读取文本,第一个bolt用来标准化单词,第二个bolt为单词计数,如图2-1所示。

Storm入门 第二章准备开始

构建storm运行环境的第一步是检查你安装的java版本。打开一个控制台窗口并执行命令:java -version。控制台应该会显示出类似如下的内容:

开始之前,先为这个应用建一个目录(就像你平常为java应用做的那样)。这个目录用来存放工程源码。

接下来我们要下载storm依赖包,这是一些jar包,我们要把它们添加到应用类路径中。你可以采用如下两种方式之一完成这一步:

下载所有依赖,解压缩它们,把它 们添加到类路径

note: storm的maven依赖引用了运行storm本地模式的所有库。

要运行我们的拓扑,我们可以编写一个包含基本组件的pom.xml文件。

开头几行指定了工程名称和版本号。然后我们添加了一个编译器插件,告知maven我们的代码要用java1.6编译。接下来我们定义了maven仓库(maven支持为同一个工程指定多个仓库)。clojars是存放storm依赖的仓库。maven会为运行本地模式自动下载必要的所有子包依赖。

一个典型的maven java工程会拥有如下结构:

java目录下的子目录包含我们的代码,我们把要统计单词数的文件保存在resource目录下。

note:命令mkdir -p 会创建所有需要的父目录。

我们将为运行单词计数创建所有必要的类。可能这个例子中的某些部分,现在无法讲的很清楚,不过我们会在随后的章节做进一步的讲解。

note: 一个spout发布一个定义域列表。这个架构允许你使用不同的bolts从同一个spout流读取数据,它们的输出也可作为其它bolts的定义域,以此类推。

例2-1包含wordread类的完整代码(我们将会分析下述代码的每一部分)。

第一个被调用的spout方法都是public void open(map conf, topologycontext context, spoutoutputcollector collector)。它接收如下参数:配置对象,在定义topology对象是创建;topologycontext对象,包含所有拓扑数据;还有spoutoutputcollector对象,它能让我们发布交给bolts处理的数据。下面的代码主是这个方法的实现。

我们在这个方法里创建了一个filereader对象,用来读取文件。接下来我们要实现public void nexttuple(),我们要通过它向bolts发布待处理的数据。在这个例子里,这个方法要读取文件并逐行发布数据。

note: values是一个arrarlist实现,它的元素就是传入构造器的参数。

nexttuple()会在同一个循环内被ack()和fail()周期性的调用。没有任务时它必须释放对线程的控制,其它方法才有机会得以执行。因此nexttuple的第一行就要检查是否已处理完成。如果完成了,为了降低处理器负载,会在返回前休眠一毫秒。如果任务完成了,文件中的每一行都已被读出并分发了。

note:元组(tuple)是一个具名值列表,它可以是任意java对象(只要它是可序列化的)。默认情况,storm会序列化字符串、字节数组、arraylist、hashmap和hashset等类型。

现在我们有了一个spout,用来按行读取文件并每行发布一个元组,还要创建两个bolts,用来处理它们(看图2-1)。bolts实现了接口backtype.storm.topology.irichbolt。

bolt最重要的方法是void execute(tuple input),每次接收到元组时都会被调用一次,还会再发布若干个元组。

第一个bolt,wordnormalizer,负责得到并标准化每行文本。它把文本行切分成单词,大写转化成小写,去掉头尾空白符。

首先我们要声明bolt的出参:

这里我们声明bolt将发布一个名为“word”的域。

下一步我们实现public void execute(tuple input),处理传入的元组:

第一行从元组读取值。值可以按位置或名称读取。接下来值被处理并用collector对象发布。最后,每次都调用collector对象的ack()方法确认已成功处理了一个元组。

例2-2是这个类的完整代码。

note:通过这个例子,我们了解了在一次execute调用中发布多个元组。如果这个方法在一次调用中接收到句子“this is the storm book”,它将会发布五个元组。

下一个bolt,wordcounter,负责为单词计数。这个拓扑结束时(cleanup()方法被调用时),我们将显示每个单词的数量。

note: 这个例子的bolt什么也没发布,它把数据保存在map里,但是在真实的场景中可以把数据保存到数据库。

execute方法使用一个map收集单词并计数。拓扑结束时,将调用clearup()方法打印计数器map。(虽然这只是一个例子,但是通常情况下,当拓扑关闭时,你应当使用cleanup()方法关闭活动的连接和其它资源。)

note:所有拓扑节点的各个进程必须能够独立运行,而不依赖共享数据(也就是没有全局变量或类变量),因为当拓扑运行在真实的集群环境时,这些进程可能会运行在不同的机器上。

接下来,topologybuilder将用来创建拓扑,它决定storm如何安排各节点,以及它们交换数据的方式。

在spout和bolts之间通过shufflegrouping方法连接。这种分组方式决定了storm会以随机分配方式从源节点向目标节点发送消息。

下一步,创建一个包含拓扑配置的config对象,它会在运行时与集群配置合并,并通过prepare方法发送给所有节点。

由spout读取的文件的文件名,赋值给wordfile属性。由于是在开发阶段,设置debug属性为true,strom会打印节点间交换的所有消息,以及其它有助于理解拓扑运行方式的调试数据。

正如之前讲过的,你要用一个localcluster对象运行这个拓扑。在生产环境中,拓扑会持续运行,不过对于这个例子而言,你只要运行它几秒钟就能看到结果。

调用createtopology和submittopology,运行拓扑,休眠两秒钟(拓扑在另外的线程运行),然后关闭集群。

例2-3是完整的代码

<a href="https://github.com/runfriends/gettingstartedwithstorm-cn/blob/master/chapter2/hello%20world%20storm.md#%e8%a7%82%e5%af%9f%e8%bf%90%e8%a1%8c%e6%83%85%e5%86%b5">观察运行情况</a>

你已经为运行你的第一个拓扑准备好了。在这个目录下面创建一个文件,/src/main/resources/words.txt,一个单词一行,然后用下面的命令运行这个拓扑:mvn exec:java -dexec.mainclass=”topologymain” -dexec.args=”src/main/resources/words.txt。

举个例子,如果你的words.txt文件有如下内容: storm test are great is an storm simple application but very powerful really storm is great 你应该会在日志中看到类似下面的内容: is: 2 application: 1 but: 1 great: 1 test: 1 simple: 1 storm: 3 really: 1 are: 1 great: 1 an: 1 powerful: 1 very: 1 在这个例子中,每类节点只有一个实例。但是如果你有一个非常大的日志文件呢?你能够很轻松的改变系统中的节点数量实现并行工作。这个时候,你就要创建两个wordcounter实例。

程序返回时,你将看到: — 单词数 【word-counter-2】 — application: 1 is: 1 great: 1 are: 1 powerful: 1 storm: 3 — 单词数 [word-counter-3] — really: 1 is: 1 but: 1 great: 1 test: 1 simple: 1 an: 1 very: 1 棒极了!修改并行度实在是太容易了(当然对于实际情况来说,每个实例都会运行在单独的机器上)。不过似乎有一个问题:单词is和great分别在每个wordcounter各计数一次。怎么会这样?当你调用shufflegrouping时,就决定了storm会以随机分配的方式向你的bolt实例发送消息。在这个例子中,理想的做法是相同的单词问题发送给同一个wordcounter实例。你把shufflegrouping(“word-normalizer”)换成fieldsgrouping(“word-normalizer”, new fields(“word”))就能达到目的。试一试,重新运行程序,确认结果。 你将在后续章节学习更多分组方式和消息流类型。

<a href="https://github.com/runfriends/gettingstartedwithstorm-cn/blob/master/chapter2/hello%20world%20storm.md#%e7%bb%93%e8%ae%ba">结论 </a>

我们已经讨论了storm的本地和远程操作模式之间的不同,以及storm的强大和易于开发的特性。你也学习了一些storm的基本概念,我们将在后续章节深入讲解它们。