Design Differences Between Akka and Storm
Akka and Storm are both important tools for low-latency, high-throughput computation, but they are not direct competitors. If Akka is the Linux kernel, Storm is more like a distribution such as Ubuntu. Storm is not, however, a distribution built on top of Akka, so a better analogy might be Akka as BSD and Storm as Ubuntu.
Differences in scope
Akka provides an API and an execution engine.
Storm provides, in addition to an API and an execution engine, metrics, a web UI, cluster management, and a message-delivery guarantee mechanism.
This article focuses on the part where Akka and Storm overlap: the API and the execution engine.
API differences
Let's look at Storm's two main interfaces:
```java
public interface ISpout extends Serializable {
    /**
     * Called when a task for this component is initialized within a worker on the cluster.
     * It provides the spout with the environment in which the spout executes.
     *
     * <p>This includes the:</p>
     *
     * @param conf The Storm configuration for this spout. This is the configuration provided to the topology merged in with cluster configuration on this machine.
     * @param context This object can be used to get information about this task's place within the topology, including the task id and component id of this task, input and output information, etc.
     * @param collector The collector is used to emit tuples from this spout. Tuples can be emitted at any time, including the open and close methods. The collector is thread-safe and should be saved as an instance variable of this spout object.
     */
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

    /**
     * Called when an ISpout is going to be shutdown. There is no guarentee that close
     * will be called, because the supervisor kill -9's worker processes on the cluster.
     *
     * <p>The one context where close is guaranteed to be called is a topology is
     * killed when running Storm in local mode.</p>
     */
    void close();

    /**
     * Called when a spout has been activated out of a deactivated mode.
     * nextTuple will be called on this spout soon. A spout can become activated
     * after having been deactivated when the topology is manipulated using the
     * `storm` client.
     */
    void activate();

    /**
     * Called when a spout has been deactivated. nextTuple will not be called while
     * a spout is deactivated. The spout may or may not be reactivated in the future.
     */
    void deactivate();

    /**
     * When this method is called, Storm is requesting that the Spout emit tuples to the
     * output collector. This method should be non-blocking, so if the Spout has no tuples
     * to emit, this method should return. nextTuple, ack, and fail are all called in a tight
     * loop in a single thread in the spout task. When there are no tuples to emit, it is courteous
     * to have nextTuple sleep for a short amount of time (like a single millisecond)
     * so as not to waste too much CPU.
     */
    void nextTuple();

    /**
     * Storm has determined that the tuple emitted by this spout with the msgId identifier
     * has been fully processed. Typically, an implementation of this method will take that
     * message off the queue and prevent it from being replayed.
     */
    void ack(Object msgId);

    /**
     * The tuple emitted by this spout with the msgId identifier has failed to be
     * fully processed. Typically, an implementation of this method will put that
     * message back on the queue to be replayed at a later time.
     */
    void fail(Object msgId);
}
```
and
```java
public interface IBasicBolt extends IComponent {
    void prepare(Map stormConf, TopologyContext context);

    /**
     * Process the input tuple and optionally emit new tuples based on the input tuple.
     *
     * All acking is managed for you. Throw a FailedException if you want to fail the tuple.
     */
    void execute(Tuple input, BasicOutputCollector collector);

    void cleanup();
}
```
and Akka's Actor API:
```scala
trait Actor {

  import Actor._ // to make type Receive known in subclasses without import

  type Receive = Actor.Receive

  /**
   * Stores the context for this actor, including self, and sender.
   * It is implicit to support operations such as `forward`.
   *
   * WARNING: Only valid within the Actor itself, so do not close over it and
   * publish it to other threads!
   *
   * [[akka.actor.ActorContext]] is the Scala API. `getContext` returns a
   * [[akka.actor.UntypedActorContext]], which is the Java API of the actor
   * context.
   */
  implicit val context: ActorContext = {
    val contextStack = ActorCell.contextStack.get
    if ((contextStack.isEmpty) || (contextStack.head eq null))
      throw ActorInitializationException(
        s"You cannot create an instance of [${getClass.getName}] explicitly using the constructor (new). " +
          "You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.")
    val c = contextStack.head
    ActorCell.contextStack.set(null :: contextStack)
    c
  }

  /**
   * The 'self' field holds the ActorRef for this actor.
   * <p/>
   * Can be used to send messages to itself:
   * <pre>
   * self ! message
   * </pre>
   */
  implicit final val self = context.self //MUST BE A VAL, TRUST ME

  /**
   * The reference sender Actor of the last received message.
   * Is defined if the message was sent from another Actor,
   * else `deadLetters` in [[akka.actor.ActorSystem]].
   *
   * WARNING: Only valid within the Actor itself, so do not close over it and
   * publish it to other threads!
   */
  final def sender(): ActorRef = context.sender()

  /**
   * This defines the initial actor behavior, it must return a partial function
   * with the actor logic.
   */
  //#receive
  def receive: Actor.Receive
  //#receive

  /**
   * INTERNAL API.
   *
   * Can be overridden to intercept calls to this actor's current behavior.
   *
   * @param receive current behavior.
   * @param msg current message.
   */
  protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = receive.applyOrElse(msg, unhandled)

  /**
   * Can be overridden to intercept calls to `preStart`. Calls `preStart` by default.
   */
  protected[akka] def aroundPreStart(): Unit = preStart()

  /**
   * Can be overridden to intercept calls to `postStop`. Calls `postStop` by default.
   */
  protected[akka] def aroundPostStop(): Unit = postStop()

  /**
   * Can be overridden to intercept calls to `preRestart`. Calls `preRestart` by default.
   */
  protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = preRestart(reason, message)

  /**
   * Can be overridden to intercept calls to `postRestart`. Calls `postRestart` by default.
   */
  protected[akka] def aroundPostRestart(reason: Throwable): Unit = postRestart(reason)

  /**
   * User overridable definition the strategy to use for supervising
   * child actors.
   */
  def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy

  /**
   * User overridable callback.
   * <p/>
   * Is called when an Actor is started.
   * Actors are automatically started asynchronously when created.
   * Empty default implementation.
   */
  @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
  //#lifecycle-hooks
  def preStart(): Unit = ()
  //#lifecycle-hooks

  /**
   * User overridable callback.
   * <p/>
   * Is called asynchronously after 'actor.stop()' is invoked.
   * Empty default implementation.
   */
  @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
  //#lifecycle-hooks
  def postStop(): Unit = ()
  //#lifecycle-hooks

  /**
   * User overridable callback: '''By default it disposes of all children and then calls `postStop()`.'''
   * @param reason the Throwable that caused the restart to happen
   * @param message optionally the current message the actor processed when failing, if applicable
   * <p/>
   * Is called on a crashed Actor right BEFORE it is restarted to allow clean
   * up of resources before Actor is terminated.
   */
  @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
  //#lifecycle-hooks
  def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    context.children foreach { child ⇒
      context.unwatch(child)
      context.stop(child)
    }
    postStop()
  }
  //#lifecycle-hooks

  /**
   * User overridable callback: By default it calls `preStart()`.
   * @param reason the Throwable that caused the restart to happen
   * <p/>
   * Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
   */
  @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
  //#lifecycle-hooks
  def postRestart(reason: Throwable): Unit = {
    preStart()
  }
  //#lifecycle-hooks

  /**
   * User overridable callback.
   * <p/>
   * Is called when a message isn't handled by the current behavior of the actor
   * by default it fails with either a [[akka.actor.DeathPactException]] (in
   * case of an unhandled [[akka.actor.Terminated]] message) or publishes an [[akka.actor.UnhandledMessage]]
   * to the actor's system's [[akka.event.EventStream]]
   */
  def unhandled(message: Any): Unit = {
    message match {
      case Terminated(dead) ⇒ throw new DeathPactException(dead)
      case _                ⇒ context.system.eventStream.publish(UnhandledMessage(message, sender(), self))
    }
  }
}
```
Storm's main APIs, one could say, look remarkably like an Actor. Judging by the timeline, though, Storm and Akka started development at roughly the same time, so it is much more likely that Storm's author was inspired by Erlang's Actor implementation.
From where things stand today, it seems quite possible that the author set out to write a "plain" Actor implementation in Clojure; since that plain implementation already met Storm's design goals, there was no need to push Storm any further toward a complete Actor implementation on Clojure.
So, looking at the APIs alone, how do Spout/Bolt and Actor actually differ?
What Storm's API has that Actor lacks
Storm's API adds two callbacks that Actor does not have: ack and fail. They exist mainly because Storm targets a much narrower set of use cases than Akka (essentially analytics and aggregation), so fault tolerance is built in and users in that niche get it out of the box.
In addition, Storm's Tuple class carries some context information, again packaged to serve those target use cases.
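A spout only opts into this tracking by emitting each tuple together with a message id. Here is a minimal sketch in Scala against the classic backtype.storm API; ReliableSpout, fetchNext and the replay-on-fail strategy are illustrative stand-ins, not code from Storm or from this article:

```scala
import java.util.{Map => JMap, UUID}
import scala.collection.mutable
import backtype.storm.spout.SpoutOutputCollector
import backtype.storm.task.TopologyContext
import backtype.storm.topology.OutputFieldsDeclarer
import backtype.storm.topology.base.BaseRichSpout
import backtype.storm.tuple.{Fields, Values}

// Emitting with a message id is what opts a tuple into Storm's tracking;
// ack()/fail() are then called back once the whole tuple tree succeeds or fails.
class ReliableSpout extends BaseRichSpout {
  @transient private var collector: SpoutOutputCollector = _
  private val pending = mutable.Map.empty[String, String]   // msgId -> original message

  override def open(conf: JMap[_, _], context: TopologyContext,
                    collector: SpoutOutputCollector): Unit =
    this.collector = collector

  override def nextTuple(): Unit = {
    val msg = fetchNext()                       // hypothetical queue poll
    if (msg != null) {
      val msgId = UUID.randomUUID().toString
      pending += msgId -> msg
      collector.emit(new Values(msg), msgId)    // the msgId switches tracking on
    }
  }

  // Called by Storm when the tuple tree completed or failed.
  override def ack(msgId: AnyRef): Unit = pending -= msgId.toString
  override def fail(msgId: AnyRef): Unit =
    pending.get(msgId.toString).foreach(m => collector.emit(new Values(m), msgId))  // naive replay

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit =
    declarer.declare(new Fields("line"))

  private def fetchNext(): String = null        // placeholder for a real source
}
```

Once a message id is attached, Storm calls ack or fail back on the spout when the entire tuple tree succeeds or fails; this is exactly the fault-tolerance surface the Actor API leaves for the user to build.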
What Actor's API has that Storm lacks
context: a Spout's open method also receives a context, but in an actor the context is accessible at any time, which shows that Akka encourages using it far more than Storm does; the data inside context is also updated dynamically.
self: the actor's reference to itself. You can read this as the Actor model actively supporting a downstream component sending data back upstream, or even an actor messaging itself (see the sketch after this list). In Storm, data flow is one-way by default: a downstream component gives no feedback to the upstream one, apart from the system-defined ack and fail.
postRestart: distinguishes an actor's first start from a restart, which turns out to be quite handy. Storm's lack of it was probably a matter of not thinking of it (or not bothering) early on, and later nobody wanted to change the core API.
unhandled: handles messages the actor never expected to receive; by default they are published to a system stream. An actor is inherently open, so any external application that knows its address can send it messages, and this hook is needed. A Storm component only receives the messages you designed for it, so the need never arises.
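A minimal sketch of these extras in use (the Echo actor and the messages are made up for illustration): preStart sends the actor a message via self, replies go back through sender(), and anything the behavior does not match falls into unhandled instead of disappearing silently.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}

class Echo extends Actor {
  // An actor may message itself, for example to kick off its own initialization.
  override def preStart(): Unit = self ! "warm-up"

  def receive: Receive = {
    case "warm-up"    => // one-time setup could go here
    case text: String => sender() ! s"echo: $text"   // reply upstream, to whoever sent it
  }

  // Messages the behavior does not cover land here instead of vanishing.
  override def unhandled(message: Any): Unit =
    context.system.log.warning("unexpected message: {}", message)
}

object EchoMain extends App {
  val system = ActorSystem("demo")
  val echo: ActorRef = system.actorOf(Props[Echo], "echo")
  echo ! "hello"   // fire-and-forget from outside an actor: the reply goes to deadLetters
  echo ! 42        // not a String, so it ends up in unhandled
}
```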
Runtime differences
This covers the comparison between Actor and Component, the different thread-scheduling models, hot code deployment, and the constraints Storm's ack mechanism places on asynchronous code.
Actor vs. Component
Component is the umbrella term for Spouts and Bolts, the basic units that execute user code in Storm. They have a lot in common with actors: both react to messages, both can hold state, and only one thread enters them at a time unless you explicitly start extra threads yourself. The main difference is weight. An actor is extremely lightweight: creating tens of thousands of actors in a single program, or putting every ten lines of code into its own actor, is perfectly fine. Switch to Storm Components and the picture changes completely: you are better off describing only the top-level abstraction, with just a handful of Components.
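A rough illustration of just how cheap actors are (the Counter actor and the figure of 50,000 are arbitrary choices, not from the article):

```scala
import akka.actor.{Actor, ActorSystem, Props}

// Each actor below is just an object plus a mailbox; no thread is dedicated to it.
class Counter extends Actor {
  private var n = 0
  def receive: Receive = { case "inc" => n += 1 }
}

object ManyActors extends App {
  val system = ActorSystem("many")
  // 50,000 actors, all multiplexed over a small dispatcher thread pool.
  val counters = (1 to 50000).map(i => system.actorOf(Props[Counter], s"counter-$i"))
  counters.foreach(_ ! "inc")
}
```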
Thread-scheduling model
If the APIs are so similar, why can you spin up actors freely while Components should be kept to a minimum? The secret lies in Akka's dispatchers. All asynchronous code in an Akka program (Actors, Futures, Runnables, even ParIterables), in fact every thread other than the main thread you use to start the ActorSystem, can be handed over to a Dispatcher to manage. Dispatchers are customizable; by default Akka uses a "fork-join-executor", which, compared with an ordinary thread pool, is especially well suited to the Actor model and delivers excellent performance.
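As a hedged sketch, this is roughly what tuning or isolating a dispatcher looks like; the name "compute-dispatcher" and the pool sizes are made-up values, and in most applications the default fork-join dispatcher is already fine:

```scala
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

// A worker whose messages should run on a dedicated fork-join pool.
class Worker extends Actor {
  def receive: Receive = { case job => /* heavy computation would go here */ }
}

object DispatcherDemo extends App {
  // Dispatcher configuration is ordinary Akka config; these sizes are made up.
  val config = ConfigFactory.parseString(
    """
      |compute-dispatcher {
      |  type = Dispatcher
      |  executor = "fork-join-executor"
      |  fork-join-executor {
      |    parallelism-min    = 2
      |    parallelism-factor = 2.0
      |    parallelism-max    = 16
      |  }
      |  throughput = 100   # messages handled per actor before yielding the thread
      |}
    """.stripMargin)

  val system = ActorSystem("tuned", config.withFallback(ConfigFactory.load()))

  // Every Worker created from this Props runs on the custom fork-join pool.
  val worker = system.actorOf(Props[Worker].withDispatcher("compute-dispatcher"), "worker")
}
```

Because a dispatcher is also an ExecutionContext, the same pool can back Futures and other asynchronous code, which is what lets one thread pool serve the whole program.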
Storm's thread-scheduling model, by comparison, is much plainer: each Component gets a thread of its own, or several Components take turns sharing one thread. That is precisely why you cannot create too many Components.
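Put differently, a Storm topology's thread budget is declared up front through each component's parallelism hint. A sketch, reusing the ReliableSpout from the earlier sketch; the component names and parallelism numbers are arbitrary:

```scala
import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer, TopologyBuilder}
import backtype.storm.topology.base.BaseBasicBolt
import backtype.storm.tuple.Tuple

// A bolt that only counts; it emits nothing downstream.
class CountBolt extends BaseBasicBolt {
  private var n = 0L
  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = n += 1
  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = ()
}

object TopologyDemo {
  def build(): TopologyBuilder = {
    val builder = new TopologyBuilder
    // The parallelism hints below are the topology's entire thread budget;
    // executors are allocated at submission time, not on demand.
    builder.setSpout("lines", new ReliableSpout, 2)   // 2 executors
    builder.setBolt("count", new CountBolt, 8)        // 8 executors
           .shuffleGrouping("lines")
    builder
  }
}
```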
Hot code deployment
In real-time computation, the typical hot-deployment need is something like changing a ranking algorithm: replace one algorithm module and leave everything else alone.
Because Storm can support any programming language via Thrift, if the algorithm is written in a scripting language such as Python, swapping the algorithm without a restart is as simple as replacing the .py file at the corresponding path on every machine. The trade-off is that this locks the program into languages of that kind.
On the Akka side, since the Actor model uses the same messaging interface for in-process and inter-process communication, the actors responsible for the algorithm can be started as a separate process; when the code is updated, you simply restart that process. One process in the system restarts, yet the system as a whole keeps running without missing a beat.
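What makes this work is location transparency: the sending side only holds an actor path and does not care which process answers it. A sketch under the assumption that akka-remote is on the classpath and remoting is enabled in configuration; the system names, host, port and actor name are all illustrative:

```scala
import akka.actor.{ActorSelection, ActorSystem}

object AlgoClient extends App {
  val system = ActorSystem("frontend")

  // The caller only knows a path. Whether "ranker" runs in this JVM or in a
  // separate process that gets restarted on every deploy, this code is unchanged.
  val algo: ActorSelection =
    system.actorSelection("akka.tcp://algo-service@127.0.0.1:2552/user/ranker")

  algo ! "score item-42"   // delivered across the process boundary by Akka remoting
}
```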
Storm's ack mechanism
Storm's message-guarantee mechanism is genuinely original: using bitwise XOR, it tracks the success or failure of in-flight processing with tiny memory overhead and high performance. By default, all your code needs to do is supply a MessageId and the ack mechanism happily takes care of the rest, so users normally never have to think about it. The problem with this default interface is that it breaks down as soon as asynchronous code is involved: work scheduled for later or submitted as a Runnable is invisible to the ack mechanism, so if that asynchronous logic fails, the acker never finds out. How to make Storm's ack mechanism coexist gracefully with asynchronous code is still an open question.
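The sketch below shows the failure mode rather than a recommended pattern (AsyncBolt and riskyWork are invented for illustration): with IBasicBolt, the tuple is acked the moment execute returns, so a failure inside the Future can never reach the acker.

```scala
import java.util.{Map => JMap}
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}
import backtype.storm.task.TopologyContext
import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import backtype.storm.topology.base.BaseBasicBolt
import backtype.storm.tuple.Tuple

class AsyncBolt extends BaseBasicBolt {
  @transient private var ec: ExecutionContext = _

  override def prepare(stormConf: JMap[_, _], context: TopologyContext): Unit =
    ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    implicit val e: ExecutionContext = ec
    Future {
      riskyWork(input.getString(0))   // may fail long after execute() has returned
    }
    // execute() returns right here, and with IBasicBolt that means the tuple is
    // acked, no matter what the Future does afterwards. Even throwing a
    // FailedException inside the Future can no longer reach the acker.
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = ()

  private def riskyWork(s: String): Unit =
    if (s == null) throw new RuntimeException("this failure is invisible to Storm")
}
```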
Summary
In my view, Storm's API is excellent and its reliability has been proven by years of real-world use, yet its rather plain core machinery gives it the feeling of a hero past his prime. Twitter, Storm's original user, recently announced Heron, a new system compatible with Storm's interfaces, but it has not been open-sourced. An open-source project that "re-implements" Storm on top of Akka would be something to look forward to; so far, gearpump is one such project I have found.
References
Akka Dispatcher
[Akka and the Java Memory Model](http://doc.akka.io/docs/akka/snapshot/general/jmm.html)
A Java Fork/Join Framework
[聊聊并发(三)——JAVA线程池的分析和使用](http://www.infoq.com/cn/articles/java-threadPool)
Java Tip: When to use ForkJoinPool vs ExecutorService
Akka VS Storm
Guaranteeing Message Processing
Storm Source Code