
Storm Internals

Introduction

Storm is one of the most popular realtime stream processing systems in the world. It has been adopted by many companies and is also a critical component of our infrastructure. After reading its source code and tuning it to run stably for more than two months, I decided to share my insight into its internal implementation as well as some suggestions for tuning Storm. This article will not cover the basic concepts of Storm; if you are not familiar with them, please read the Storm documentation first.

Storm Internals

Architecture

Worker


The worker is the most important component, since it executes the jobs assigned to it by Nimbus. Each worker has one receive queue and several threads dedicated to distributing received messages to the executors' receive queues; the number of threads is configurable and is 1 by default. For each executor assigned to it, the worker creates two threads: one for executing the bolt or spout defined by the user, and the other for transferring messages from the executor's out queue to the worker's global out queue. This means that each executor has two queues attached to it: an in queue and an out queue. There is also a dedicated thread for sending messages from the global out queue to other workers.

If the target task of a message is in the same worker as the source task, the message is put directly into the in queue of the target task without serialization and deserialization. Deserialization and serialization of tuples happen in the user logic thread and the sending thread of each executor respectively. This is a reasonable design, since users only need to adjust the parallelism of bolts and spouts to remove bottlenecks from the system. Each executor also has an overflow buffer to ensure that the emit method never blocks; the execute or nextTuple method will not be called until all the messages in the overflow buffer have been sent out.
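
To make the queue layout concrete, below is a minimal sketch in plain Java of the two-queues-per-executor model described above. It is an illustration only: Storm's real implementation is written in Clojure and uses Disruptor-based queues, and all names here (ExecutorSketch, inQueue, outQueue, and so on) are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Simplified model of one executor inside a worker: an in queue feeding the
// user-logic thread, and an out queue drained by a transfer thread into the
// worker's global out queue. Names and sizes are illustrative, not Storm's.
public class ExecutorSketch {
    private final BlockingQueue<Object> inQueue = new ArrayBlockingQueue<>(1024);
    private final BlockingQueue<Object> outQueue = new ArrayBlockingQueue<>(1024);

    public void start(BlockingQueue<Object> workerGlobalOutQueue) {
        // User-logic thread: deserialization and execute()/nextTuple() run here;
        // emitted tuples go into the executor's out queue.
        Thread userLogic = new Thread(() -> {
            try {
                while (true) {
                    Object tuple = inQueue.take();
                    // ... run the bolt's execute() or the spout's nextTuple() ...
                    outQueue.put(tuple);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Transfer thread: serialization happens on this side before tuples are
        // handed to the worker's single global sending thread.
        Thread transfer = new Thread(() -> {
            try {
                while (true) {
                    workerGlobalOutQueue.put(outQueue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        userLogic.start();
        transfer.start();
    }
}
```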

Nimbus and supervisor

Nimbus is the manager of the whole cluster. It scans the state of each topology's components from ZooKeeper and reassigns jobs when it finds that a worker or supervisor is dead. I don't think this is a good design, since it reads too much and too frequently from ZooKeeper. Supervisors and executors also write their heartbeats to ZooKeeper to tell Nimbus that they are alive. This design is not optimal, and I think that's the main reason why Twitter suggests deploying a separate cluster for each topology.
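
The following is a minimal sketch of the kind of heartbeat check described above. It is not Nimbus's actual code; the ClusterState interface, the readHeartbeatMillis helper, and the timeout value are all assumptions made for illustration.

```java
import java.util.List;

// Illustrative scan loop: read each worker's last heartbeat timestamp from
// ZooKeeper-backed state and reassign its jobs if the heartbeat is stale.
public class HeartbeatScanSketch {
    static final long TIMEOUT_MS = 30_000; // hypothetical liveness timeout

    interface ClusterState {
        List<String> workerIds();
        long readHeartbeatMillis(String workerId); // last heartbeat written by the worker
        void reassign(String workerId);            // hand the worker's jobs to others
    }

    void scanOnce(ClusterState state) {
        long now = System.currentTimeMillis();
        for (String worker : state.workerIds()) {
            if (now - state.readHeartbeatMillis(worker) > TIMEOUT_MS) {
                state.reassign(worker); // worker considered dead
            }
        }
    }
}
```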

Best practice for tuning storm

After deploying Storm in our production environment for more than two months, we've learned some lessons about tuning it. The following are some rules we insist on:

  • There should be only one worker per machine per topology. There is no reason to start more than one worker for the same topology on one machine; doing so only increases the overhead of serialization, process switching, etc.
  • The number of CPU-intensive executors should not exceed the number of cores in the cluster, while you can have several I/O-intensive executors per CPU core.
  • Monitor the state of Storm. We use Metrics to measure the state of our system and export the data to OpenTSDB, which is visualized by Grafana. From the data we can easily identify the bottleneck of our system and adjust its parallelism accordingly.
  • Tune the JVM to avoid full GC as much as possible. Since Storm uses heartbeats to detect whether workers are still alive, when a full GC happens and the heartbeat is not written to ZooKeeper in time, Storm assumes the worker is dead, kills it, and reassigns its jobs. It's quite important to add “-XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=65” to your JVM args. This setting tells CMS to start its concurrent collection when the old generation is about 65% full, which avoids most full GCs (see the sketch after this list).
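
As a concrete example, the flags above can be attached to the worker JVMs through the worker child options. This is a sketch, not our exact production configuration: the heap size is arbitrary, topology.worker.childopts is the per-topology key, and a cluster-wide worker.childopts entry in storm.yaml works as well. Depending on your Storm version the Config class lives in org.apache.storm or backtype.storm.

```java
import org.apache.storm.Config;

// Attach GC-friendly JVM flags to this topology's workers. The heap size
// and flag set are illustrative; tune them for your own machines.
public class GcTuningSketch {
    public static Config buildConfig() {
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS,
                 "-Xmx2g -XX:+UseConcMarkSweepGC"
                 + " -XX:+UseCMSInitiatingOccupancyOnly"
                 + " -XX:CMSInitiatingOccupancyFraction=65");
        return conf;
    }
}
```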

About auto-tuning of topology.max.spout.pending

Twitter mentioned in their paper that they developed an algorithm to tune the parameter “topology.max.spout.pending” automatically. This is an interesting idea, but I don’t think it’s useful in practice. Storm is a realtime service, which means the resources assigned to the cluster should be able to cover peak load. When the number of requests is smaller than the peak, a larger value of “topology.max.spout.pending” does not cause any problems. This means the auto-tuning does not buy you much.
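
For reference, here is a minimal sketch of how the parameter is set when submitting a topology. The value 2000 is only a placeholder, and Config.setMaxSpoutPending is the standard setter (in older releases the class lives in backtype.storm rather than org.apache.storm).

```java
import org.apache.storm.Config;

// Cap the number of un-acked tuples each spout task may have in flight.
// The value is a placeholder; pick one that covers your peak throughput.
public class MaxSpoutPendingSketch {
    public static Config buildConfig() {
        Config conf = new Config();
        conf.setMaxSpoutPending(2000); // topology.max.spout.pending
        return conf;
    }
}
```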
