天天看点

消息总线重构之EventBusEventBus简介改进广播通知事件驱动替代责任链模式

最近花了不少时间对消息总线进行了重构。重构的重点是在消息总线中加入了Guava的EventBus,并应用于以下两个场景:

(1)改进广播通知

(2)业务逻辑串联,用事件驱动替代责任链模式

广播通知是消息总线提供的功能之一。在重构之前,客户端接收广播通知是通过消息总线客户端SDK的一个API来实现的:

但之前的广播通知设计并不合理。它受限于之前的基于RabbitMQ的树形路由拓扑模型:

消息总线重构之EventBusEventBus简介改进广播通知事件驱动替代责任链模式

这个拓扑结构中有些只发送不接受的“虚拟队列”并不是真实存在的队列。这些消息生产者无法接收消息,这是非常大的一个缺陷。我一直在想办法重新设计它,之前的关注点都集中在RabbitMQ上,想在MQ上找到一种解决方案,但这很难,除非摈弃“虚拟队列”的设计。于是,我将关注点转移到消息总线中另一个可以提供pub/sub的组件上(后称之为pubsuber),该组件目前可以是redis也可以是zookeeper。因为每个client(更准确得说是每个创建client的pool)都会以长连接的方式挂在pubsuber上。所以,它本身就是一个很不错的广播渠道,并且因为它脱离RabbitMQ单独实现,跟虚拟队列的设计不相冲突。

上面的思路没有问题,但语义与实现上并不对等。通知的收发从语义上来说应该是Client

API级别的。而PubSuber接收到的广播事件却是Pool级别的,并不依赖client(Pool创建PubSuber以及Client)。我们不应该在Pool层面上接收广播事件。因此这里存在一个事件的截获与二次转发的过程。这是我们针对EventBus的第一个应用场景:用它转发PubSuber接收到的广播通知给client。

PubSuber接收到广播消息之后通过EventBus 作二次转发:

事件发布完了之后,EventBus会将其分发到该事件的订阅者处理,这里需要注意的是创建的EventBus是一个异步EventBus的实例,它在一个独立的线程上执行事件处理器方法。而所有的事件处理器都需要通过Client进行注册:

以上这一步,就将消息通知跟Client关联起来。而且对多个client注册不同的事件处理器,还可以起到多播的作用(原来在Pool级别是一个事件,现在在Client级别,多个Client可以应对若干个处理器)。

EventBus通过注解来解析事件处理器与事件之间的关联关系,更多的实现细节,请参考之前的文章。下面就是订阅广播通知的方式:

仅仅需要一个注解即可。当然最后别忘记移除注册,如果你不再希望接收通知的话,整个过程如下:

这样,原先的拓扑结构就不再包含广播通知的实现了:

消息总线重构之EventBusEventBus简介改进广播通知事件驱动替代责任链模式

但基于同步事件驱动的方式似乎能起到跟责任链模式一样的效果。它通过事件分发来驱动业务逻辑调用。将chain的每一个调用都看做是一个事件处理方法,一个单向通信逻辑(比如produce)对应一个事件处理器(produceEventProcessor)。因为此处的EventBus是同步的(事件处理逻辑在调用线程上执行,执行顺序跟事件发生的顺序相同),所以只要编排好事件顺序,一一触发事件,事件处理器也就会一一按照事件触发的顺序执行。

我们以消息生产者来看一下通过EventBus改造后的业务逻辑是什么样子。

首先我们定义一个生产消息的事件处理器:

为了使得逻辑关系紧凑,我们将事件以内部类的方式定义在生产消息的事件处理器内部:

定义每个事件的事件处理方法:

在client被调用以生产消息时,首先创建该事件处理器的实例,然后向EventBus注册事件处理器:

只有注册了该实例,在发布事件时,才会触发该实例的事件处理方法。注册完成该实例之后,需要初始化事件对象,这里事件之间以及事件处理器之间没有必然联系,我们以一个消息上下文对象的引用来让它们以共享“内存”的方式进行数据交换:

准备工作就绪,现在开始发布事件。这里事件的发布顺序跟执行顺序是一致的,所以我们需要根据业务逻辑来编排事件,以形成原先的串联调用的效果:

这就是重构的整个过程。我们发现这里不再存在链式(递归)调用了,各个事件处理器方法之间也没有耦合性,它们通过MessageContext来共享上下文。如果我们要增加新的业务逻辑,如何扩展?四步走:

(1)定义一个新事件对象

(2)定义一个新的事件处理器方法

(3)实例化该事件对象

(4)根据需要插入原先的编排过的事件中去并发布该事件

跟原先的事件没有任何关系。

原文发布时间为:2015-06-30

本文作者:vinoYang