天天看点

Poco库使用:事件通知

文章目录

    • 通知中心NotificationCenter
    • 通知队列NotificationQueue
    • 优先级通知队列PriorityNotificationQueue
    • 带时间戳的通知队列TimedNotificationQueue

Poco为模块之间的通信提供了一套事件通知机制,可以实现模块间和线程间的通信。有点类似于QT中的信号和槽机制。通过该机制我们可以在不同的模块单元之间传递自定义数据。通知框架支持两种策略:NotificationCenter(通知中心)和NotificationQueue(通知队列) 。其中通知队列又包含优先级通知队列和带时间戳的通知队列。

通知中心NotificationCenter

通知中心有点像消息订阅的模式。我们通过给通知中心添加观察者(也就是回调函数),来实现消息的订阅。添加了消息观察者之后当我们发送消息的时候,对应的消息观察者就可以收到对应的消息了。有一点需要注意:每个消息观察者绑定的消息类型是固定的,我们可以通过消息类型来实现将特定的消息发送给特定的观察者。对应的调用实现如下:

//自定义消息
class TestNotification: public Notification
{
};

//通知中心
class NotificationCenterSample
{
public:
	NotificationCenterSample();
	~NotificationCenterSample();
public:
	void useNotification();

protected:
	void handle1(Poco::Notification* pNf);
	void handle2(Poco::Notification* pNf);
	void handleAuto(const Poco::AutoPtr<Poco::Notification>& pNf);

private:
	std::set<std::string> _set;
};

void NotificationCenterSample::handle1(Poco::Notification* pNf)
{
	poco_check_ptr (pNf);
	AutoPtr<Notification> nf = pNf;
	_set.insert("handle1");
}
void NotificationCenterSample::handle2(TestNotification* pNf)
{
	poco_check_ptr (pNf);
	AutoPtr<TestNotification> nf = pNf;
	_set.insert("handleTest");
}
void NotificationCenterSample::handleAuto(const AutoPtr<Notification>& pNf)
{
	_set.insert("handleAuto");
}

void NotificationCenterSample::useNotification()
{
	//缺省的通知类
    //NotificationCenter& nc = NotificationCenter::defaultCenter();
	//通知的观察者
	NotificationCenter nc;
	Observer<NotificationCenterSample, Notification> o(*this, &NotificationCenterSample::handle1);
	//使用自定义的消息
	Observer<NotificationCenterSample, TestNotification> o2(*this, &NotificationCenterSample::handle2);
	//对消息参数进行保护
	nc.addObserver(NObserver<NotificationCenterSample, Notification>(*this, &NotificationCenterSample::handleAuto));
   //添加消息的观察者
	nc.addObserver(o);
	nc.addObserver(o2);
	//发送通知
	nc.postNotification(new Notification);
	//移除观察者
	nc.removeObserver(Observer<NotificationCenterSample, Notification>(*this, &NotificationCenterSample::handle1));
	//判断是否包含对应的回调函数
	//assertTrue (!nc.hasObserver(o));
	//assertTrue (!nc.hasObservers());
	//assertTrue (nc.countObservers() == 0);
}
           

通知队列NotificationQueue

通知队列和通知中心不同,通知队列是一个消息队列。通知队列的两端一端是消息的生产者一端是消息的消费者。生产者负责向消息队列中添加消息,消费者负责读取队列中的消息进行处理。在一些并发性比较高的场景下,我们可以通过引入通知队列,缓解服务端的压力。下面介绍一下通知队列的用法:

//自定义通知
class QTestNotification: public Notification
{
public:
	QTestNotification(const std::string& data): _data(data)
	{
	}
	~QTestNotification()
	{
	}
	const std::string& data() const
	{
		return _data;
	}

private:
	std::string _data;
};

//使用通知队列
void useQueueDequeue()
{
    //可以使用缺省的消息队列也可以使用自定义的消息队列。
	//缺省队列
	//NotificationQueue& queue = NotificationQueue::defaultQueue();
	NotificationQueue queue;
	//队列的长度
	int queue_size = queue.size();
	//入队列
	queue.enqueueNotification(new Notification);
	//出队列
	Notification* pNf = queue.dequeueNotification();
	//释放通知
	pNf->release();
	
	//添加自定义通知
	queue.enqueueNotification(new QTestNotification("first"));
	queue.enqueueNotification(new QTestNotification("second"));
	
    //添加紧急通知,紧急通知优先级更高
	queue.enqueueUrgentNotification(new QTestNotification("third"));
	QTestNotification* pTNf = dynamic_cast<QTestNotification*>(queue.dequeueNotification());
	pTNf->release();
	
	//等待消息来临,可以指定超时时间
	QTestNotification* pTNf = dynamic_cast<QTestNotification*>(queue.waitDequeueNotification(10));
}

//在线程中使用通知队列
class TestClass
{
public:
	void useThreads()

protected:
	void work();

private:
	Poco::NotificationQueue    _queue;
	std::multiset<std::string> _handled;
	Poco::FastMutex            _mutex;
};
void TestClass::work()
{
	Poco::Random rnd;
	Thread::sleep(50);
	Notification* pNf = _queue.waitDequeueNotification();
	while (pNf)
	{
		pNf->release();
		_mutex.lock();
		_handled.insert(Thread::current()->name());
		_mutex.unlock();
		Thread::sleep(rnd.next(5));
		pNf = _queue.waitDequeueNotification();
	}
}
void TestClass::useThreads()
{
	const int NOTIFICATION_COUNT = 5000;
	Thread t1("thread1");
	Thread t2("thread2");
    //开辟多个线程,在线程中添加通知
	RunnableAdapter<TestClass> ra(*this, &TestClass::work);
	t1.start(ra);
	t2.start(ra);
	
	for (int i = 0; i < NOTIFICATION_COUNT; ++i)
	{
       _queue.enqueueNotification(new Notification);
	}
	while (!_queue.empty()) Thread::sleep(50);
	Thread::sleep(20);
   //唤醒所有的等待
	_queue.wakeUpAll();
	t1.join();
	t2.join();
	t3.join();
}
           

优先级通知队列PriorityNotificationQueue

普通通知队列的消息顺序是先进先出,后进后出(FIFO),消息队列中消息的顺序跟进入的时间是有关系的。而优先级队列中我们可以给消息指定一个固定的优先级,队列通过优先级的大小来判断消息处理的先后顺序。优先级通知队列的使用方法如下:

//自定义消息
class QTestNotification: public Notification
{
public:
	QTestNotification(const std::string& data): _data(data)
	{
	}
	~QTestNotification()
	{
	}
	const std::string& data() const
	{
		return _data;
	}

private:
	std::string _data;
};

void usePriorityQueueDequeue()
{
	//默认优先级队列或自定义优先级队列
	//PriorityNotificationQueue& queue = PriorityNotificationQueue::defaultQueue();
	PriorityNotificationQueue queue;
   //指定消息的优先级
	queue.enqueueNotification(new Notification, 1);
   //获得队列的大小
	int queue_size = queue.size();
   //出队列
	Notification* pNf = queue.dequeueNotification();
	pNf->release();
	//使用自定义通知
	queue.enqueueNotification(new QTestNotification("first"), 1);
	queue.enqueueNotification(new QTestNotification("fourth"), 4);
	QTestNotification* pTNf = dynamic_cast<QTestNotification*>(queue.dequeueNotification());
	pTNf->release();
	//等待通知
	pTNf = dynamic_cast<QTestNotification*>(queue.waitDequeueNotification(10));
}
           

带时间戳的通知队列TimedNotificationQueue

带事件戳的消息队列中我们可以自己指定消息的事件戳。消息队列根据消息的时间戳的大小来判定通知的优先级。带时间戳的通知队列的使用方法如下所示:

//自定义消息
class QTestNotification: public Notification
{
public:
	QTestNotification(const std::string& data): _data(data)
	{
	}
	~QTestNotification()
	{
	}
	const std::string& data() const
	{
		return _data;
	}

private:
	std::string _data;
};

void useTimeDequeue()
{
    //发送缺省消息
	TimedNotificationQueue queue;
	queue.enqueueNotification(new Notification, Timestamp());
	Notification* pNf = queue.dequeueNotification();
	pNf->release();

	//使用自定义通知
   //为每个通知添加一个时间戳
	Poco::Clock ts1; ts1 += 100000;
	Poco::Clock ts2; ts2 += 200000;
	queue.enqueueNotification(new QTestNotification("first"), ts1);
	queue.enqueueNotification(new QTestNotification("second"), ts2);
	QTestNotification* pTNf = 0;
	while (!pTNf)
	{
       pTNf = dynamic_cast<QTestNotification*>(queue.dequeueNotification());
	}
	pTNf->release();
	//等待通知,可以指定超时时间
	QTestNotification* pTNf = dynamic_cast<QTestNotification*>(queue.waitDequeueNotification());

}