IceStorm 是一个高效的publish/subscribe服务。程序需要分发给多个信息的情况很多。比如一个天气监控程序,它从气象塔收集风速和温度情况然后定期的把他们分发到天气监控站,架构如下图
但是,这种架构的缺点就是,collector和monitor耦合在一起。这种耦合使得collector的实现变得复杂,因为他需要处理monitor的注册,投递,和错误恢复等等。
有了Ice我们可以摆脱这些,新架构如下图
通过解耦,IceStorm简化了collector的实现。它作为collctor(publisher)和monitor(subscriber)的中介者,提供了几个优点:
1当collector准备提交一批新的数据时,他只需要发送一条请求给IceStorm server。server会负责将请求分发给monitor,包括处理其中产生的各种异常。collector不用再感知到monitror,甚至不用知道当时是否有monitor存在。
2 同样,monitor也不再需要处理订阅,取消订阅之类的事,它可以专注与程序业务本身。
3 为了使用IceStorm,monitor和collector需要改动的代码很少。
IceStorm Concepts
下面介绍几个概念对于理解IceStorm的能力非常重要。
Message
IceStorm message是强类型的,他代表了Slice操作。操作的名字确定message的类型,操作的参数确定message的内容。message一般被用来发布,subscriber回调函数接受message.IceStorm使用push模型来进行message的投递,不支持轮询。
IceStorm Topics
应用程序订阅他感兴趣的topic。一个server支持任意数量的topic,这些topic被动态的创建,并以唯一的名字来进行区分。每个topic可能有若干个publisher和subscriber。
topic本质就等于程序定义的Slice接口:接口的操作定义了消息支持的topic。publisher使用topic接口的代理来发送message,subscriber通过实现这个topic接口来接受message。
Unidirectional Messages
Unidirectional的意思就是,他们必须返回void,不能有输出参数,不能产生用户异常。subscriber不会回复publisher。
Federation
IceStorm 支持topic graphs,也被称为federation。topic graphs是由topic连接在一起形成的,这些link都是undirectional。
如下图所示。
IceStorm不能防止subscriber接受到重复的信息。加入一个subscriber订阅了T2和T3,那么在T上发布一条message,这个subscriber将接收到2个请求。
Quality of Service
IceStorm允许每个subscriber有自己的QOS参数,用名值对来代表。
Replication
支持同步
Persistent Mode
topic,links和subscriber的信息会被维护在一个数据库中。但是,message不会被存储。当一个message被投递到topic后,他马上就会被丢弃。
Transient Mode
IceStorm可以不使用数据库。这种模式下不支持同步。
Subscriber Errors
如果在投递message过程中发生错误,那么subscriber会立即停止订阅这个发生错误的topic。
Configuring IceStorm
IceStorm是基于IceBox服务实现的一个轻量服务,使用时只需要很少的配置。IceStorm支持的配置属性在IceStorm Properties中描述。这些配置属性的前缀都是IceStorm的IceBox服务名。比如,当IceStorm被配置成一个IceBox服务,服务名是DemoIceStorm的时候。属性service.TopicManager.Endpoints就变成了 DemoIceStorm.TopicManager.Endpoints。
创建config.icebox文件。
IceBox.ServiceManager.Endpoints=tcp -h localhost -p 9998
IceBox.Service.IceStormDemo=IceStormService,35:createIceStorm --Ice.Config=config.service
创建config.service文件
IceStormDemo.InstanceName=WeatherService
IceStormDemo.TopicManager.Endpoints=tcp -p 9999
IceStormDemo.Publish.Endpoints=tcp -p 10000
Freeze.DbEnv.IceStormDemo.DbHome=db
之后在命令行中执行
icebox --Ice.Config=config.icebox
IceStorm服务就启动了。
Using IceStorm
现在我们用之前的天气监控来说明如何订阅发布消息。
1.Slice定义
//Monitor.ice
module Demo {
struct Measurement {
string tower;
float windSpeed;
short windDirection;
float temperature;
};
interface Monitor{
void report(Measurement m);
};
};
};
monitor是topic接口。为了简单起见,只定义了一个操作report。
2.实现发布者
一个天气数据collector的程序可以简单概括为
a.取得TopicManager的代理。这是IceStorm最主要的对象,publisher和subscriber都会用到。
b.取得天气topic的代理,通过创建topic或者从已有的topic中获得。
c.取得天气topic的publisher object的代理。这个代理用来发布message。
d.收集报道天气信息。
下边是C++的实现
// IceStorm_Publisher.cpp : 定义控制台应用程序的入口点。
//
#include <Ice/Ice.h>
#include <IceStorm/IceStorm.h>
#include <Monitor.h>
using namespace Demo;
//getMeasurement 取得天气信息。
Measurement getMeasurement()
{
Measurement m;
m.temperature=10.1f;
m.tower="气象塔";
m.windDirection=1;
m.windSpeed=10;
return m;
}
int main(int argc, char* argv[])
{
Ice::CommunicatorPtr communicator = Ice::initialize(argc, argv);
//注意这里要用config.service中定义的instance名。
Ice::ObjectPrx obj = communicator->stringToProxy("WeatherService/TopicManager:default -h localhost -p 9999");
IceStorm::TopicManagerPrx topicManager = IceStorm::TopicManagerPrx::checkedCast(obj);
IceStorm::TopicPrx topic;
while (!topic) {
try {
topic = topicManager->retrieve("Weather");
} catch (const IceStorm::NoSuchTopic&) {
try {
topic = topicManager->create("Weather");
} catch (const IceStorm::TopicExists&) {
// Another client created the topic.
}
}
}
//取得publisher对象的代理
Ice::ObjectPrx pub = topic->getPublisher()->ice_oneway();
MonitorPrx monitor = MonitorPrx::uncheckedCast(pub);
while (true) {
Measurement m = getMeasurement();
monitor->report(m);
Sleep(2000);
}
}
3.实现订阅者
天气信息订阅者的实现需要以下步骤
a.取得TopicManager的代理。这是IceStorm最主要的对象,publisher和subscriber都会用到。
b.创建一个adapter作为Monitor Servant的宿主。
c.初始化Monitor Servant,并把它和adapter一起激活。
d.订阅topic。
e.处理report message直到关闭。
f.取消topic订阅。
下边是C++的实现。
#include <Ice/Ice.h>
#include <IceStorm/IceStorm.h>
#include <Monitor.h>
using namespace Demo;
using namespace std;
//Monitor servant 这里的实现很简单,实际中可能包括实时计算天气信息等功能。
class MonitorI : virtual public Monitor {
public:
virtual void report(const Measurement& m, const Ice::Current&) {
cout << "Measurement report:" << endl
<< " Tower: " << m.tower << endl
<< " W Spd: " << m.windSpeed << endl
<< " W Dir: " << m.windDirection << endl
<< " Temp: " << m.temperature << endl
<< endl;
}
};
int main(int argc, char* argv[])
{
Ice::CommunicatorPtr communicator = Ice::initialize(argc, argv);
//注意这里要用config.service中定义的instance名。
Ice::ObjectPrx obj = communicator->stringToProxy("WeatherService/TopicManager:default -h localhost -p 9999");
IceStorm::TopicManagerPrx topicManager = IceStorm::TopicManagerPrx::checkedCast(obj);
Ice::ObjectAdapterPtr adapter = communicator->createObjectAdapterWithEndpoints("MonitorAdapter","default -h localhost -p 5438");
MonitorPtr monitor = new MonitorI;
Ice::ObjectPrx proxy = adapter->addWithUUID(monitor)->ice_oneway();
adapter->activate();
IceStorm::TopicPrx topic;
try {
topic = topicManager->retrieve("Weather");
IceStorm::QoS qos;
topic->subscribeAndGetPublisher(qos, proxy);
}
catch (const IceStorm::NoSuchTopic&) {
// Error! No topic found!
}
communicator->waitForShutdown();
topic->unsubscribe(proxy);
}
运行IceStorm,publisher,subscriber,如下图