天天看點

基于nodejs和activeMQ的消息推送

       好久沒來寫部落格了,感覺自己堕落了,哈哈,LZ糊裡糊塗的又換了家機關,空餘時間研究了一下nodejs,順勢搞了這個demo,今天來聊聊基于nodejs和activeMQ的消息推送,内容不算複雜,新手也能一看即會。

       首先介紹下一點背景,為什麼搞這個東西,LZ上家公司是做監控項目的,很多告警都要實時推送到web端,以前的技術架構是flex+corba+mq(自己封裝的),早期B/S架構的實時推送無非就兩種,一個是基于插件的長連接配接,另外一個就是輪訓或者Comet,前者受限于flash技術的衰敗肯定會逐漸退出曆史舞台,後者的低效率在資料量較大的情況下有随時崩潰的可能。随着html5的web socket技術的産生,很多前端socket技術,例如socket.io逐漸得到重用,配合nodejs使用,一個前後端socket連接配接就可以很輕松地解決問題。

       本篇使用技術socket.io+nodeJs+stomp+activeMQ,具體的技術簡介就不貼了,各位可以在網絡上輕松找到,這裡直接代碼開路。首先是頁面index.htm

<html>
<head>
  <style type="text/css">
    #messages { padding: 0px; list-style-type: none;}
    #messages li { padding: 2px 0px; border-bottom: 1px solid #ccc; }
  </style>
  <script src="http://code.jquery.com/jquery-1.7.1.min.js"></script>
  <script src="/socket.io/socket.io.js"></script>
  <script>
      $(function() {
          var socket = io.connect();
          socket.on('connect', function() {
				//
          });
		  socket.on('message', function(message) {
			  $('#messages').append($('<li></li>').text(message));
		  });
		  socket.on('disconnect', function() {
			  $('#messages').append('<li>Disconnected</li>');
		  });
      });
  </script>
</head>
<body>
 <ul id="messages"></ul>
 <hr>
</body>
</html>
           

        代碼中的/socket.io/socket.io.js,不是真實存在的,是nodejs加載的socket.io子產品,這個插件需要在nodejs中進行添加,指令如下:

npm install socket.io      

       接下來是nodejs端代碼,運作該代碼前,要首先啟動activeMQ,保證activeMQ,監聽了stomp協定和61613端口,LZ在這方面吃了虧,浪費了好長時間,配置檔案目錄:apache-activemq-5.8.0\conf\activemq.xml

<transportConnectors>
        <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
        <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
        <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
	<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
</transportConnectors>      
var Stomp = require('stomp-client');  
var destination = '/topic/myTopic';  
var client = new Stomp('127.0.0.1', 61613, 'user', 'pass');  

var fs = require('fs'),
    http = require('http'),
    sio = require('socket.io');

var server = http.createServer(function(req, res) {
    res.writeHead(200, { 'Content-type': 'text/html' });
    res.end(fs.readFileSync('./index.htm'));
});
server.listen(8888, function() {
    console.log('Server listening at http://localhost:8888/');
});
// Attach the socket.io server
io = sio.listen(server);
var tempSockets = [];
 
io.sockets.on('connection', function(socket) {
	console.log('socket connection success!!!');
	tempSockets.push(socket);
	socket.on('disconnect',function(){
	});
});

client.connect(function(sessionId) {  
	client.subscribe(destination, function(body, headers) {  
		console.log('From MQ:', body);
		//tempSocket.broadcast.emit('message', body);
		tempSockets.forEach(function(socket) {
			if(socket.connected)
			{
				socket.send(body);
			}
		})
	});
	var i = 0;
	function publish(){
		setTimeout(function(){
			client.publish(destination, 'Hello World!'+(i++));
			//publish();
		},1000);
	}
	publish();
});
      

       文中的stomp-client子產品也需要nodejs添加

npm install stomp-client
           

        上面代碼中,client.publish(destination, 'Hello World!'+(i++)),是nodejs作為提供者向activeMQ發送消息,實際應用中是java端的提供者,代碼如下:

package com.feng.activemq;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author songfeng
 * @version 1.0.
 */
public class SendMessage
{

	private static final String url = "tcp://localhost:61616";
	private static final String QUEUE_NAME = "choice.queue";
	private static final String TOPIC_NAME = "myTopic";
	protected String expectedBody = "<hello>world!</hello>";

	public void sendMessage() throws JMSException
	{
		Connection connection = null;
		MessageProducer producer = null;
		Session session = null;
		try
		{
			ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
			connection = (Connection) connectionFactory.createConnection();
			connection.start();
			session = (Session) connection.createSession(false,
					Session.AUTO_ACKNOWLEDGE);
			//Destination destination = session.createQueue(QUEUE_NAME);
			Destination destination = session.createTopic(TOPIC_NAME);
			producer = session.createProducer(destination);
			for(int i = 1 ; i <= 20; i++)
			{
				TextMessage message = session.createTextMessage(expectedBody+i);
				message.setStringProperty("headname", "remoteB");
				producer.send(message);
				Thread.sleep(1000l);
			}
		}
		catch (Exception e)
		{
			e.printStackTrace();
		}
		finally 
		{
			producer.close();
			session.close();
			connection.close();
		}
	}

	public static void main(String[] args)
	{
		SendMessage sendMsg = new SendMessage();
		try
		{
			sendMsg.sendMessage();
		}
		catch (Exception ex)
		{
			System.out.println(ex.toString());
		}
	}
}
           

       是不是很簡單,重要的是研究的過程,哈哈,繼續撸碼。。。