天天看點

COMET SERVER PUSH (IM Demo)(二)基于SERVER PUSH的消息傳輸COMET彗星(三)建構自己的COMET核心

(一)COMET介紹:

      還在為AJAX贊歎的時候,COMET竟也悄悄降臨,更有甚者已經将COMET比作是AJAX的接班人。暫且不考慮服務性能和維持connection的負擔,COMET的日益走紅,讓SERVER PUSH這樣在傳統C/S模式下需要借助線程服務和SOCKET通信的實作,衍生成為了借助事件和Script注冊機制的WEB應用架構。

      SERVER PUSH就如同它自己的名字一樣,将更多華麗的web體驗“推”進了我們的視野。

      一位叫Grace Lin的工程師在自己的部落格中對應用SERVER PUSH技術的線上股票系統進行了一番詳細的介紹 。(位址:http://www.zkoss.org/smalltalks/stockchart/)同時,更是有很多IM應用湧現出來。本主題将從AJAX和COMET兩者的結構特點進行比較,同時層層深入剖析COMET技術特點,和相應應用。

AJAX技術架構:

      大家都了解AJAX的結構,這裡不再深入探究。AJAX的架構千奇百怪,但實際上就是強調了異步這個特點:資料請求後,異步的伺服器處理和使用者界面操作。

COMET SERVER PUSH (IM Demo)(二)基于SERVER PUSH的消息傳輸COMET彗星(三)建構自己的COMET核心

圖1.1 傳統模型與AJAX模型的對比

AJAX加入了異步運作的通訊架構。

COMET SERVER PUSH (IM Demo)(二)基于SERVER PUSH的消息傳輸COMET彗星(三)建構自己的COMET核心

圖1.2 AJAX異步通訊模型

      似乎一切都是那麼完美,但是實際上AJAX隻是實作了單使用者的響應回調,與伺服器的異步通訊,但是并不能反映多使用者的協同相應。一個頁面中同時存在的多個AJAX的異步很可能讓使用者修改了沒有顯示出來的資料,在邏輯上存在資料庫事務中“髒讀”或者“幻影讀”的概念,還沒有看到資料,發出下一個請求(當然請求是發送到了别的頁面元素,但願有頁面元素中仍然是重新整理了未修改的AJAX請求資訊。)這些似乎都是AJAX的軟肋。而COMET實際上就是解決這些問題的解決架構。

COMET技術架構:

COMET SERVER PUSH (IM Demo)(二)基于SERVER PUSH的消息傳輸COMET彗星(三)建構自己的COMET核心

圖2.1 AJAX與COMET架構對比

      在應用COMET架構的網頁中,頁面初始化後,會維持一連接配接,同時監聽伺服器端的事件資訊。伺服器通過事件機制來完成對浏覽器(也可以是用戶端)的“推”機制。

      不同用戶端同時監聽到伺服器端的事件,并獲得伺服器傳來的資料,而每一個用戶端的請求都變成伺服器的事件在網絡中進行“廣播”。

      當然COMET需要基于特定的伺服器環境,這也是它與AJAX的很大不同。

      網上出現過基于用戶端的伺服器推技術,實際上都是使用IFRAME,APPLET,FLASH這樣的元素實作的長連接配接,在http://www.ibm.com/developerworks/cn/web/wa-lo-comet/一文中,IBM工程師對這幾者做了相應介紹,有時間的話,可以做幾個demo出來與大家交流,但下一篇中,還是為大家分析一個基于comet的IM架構。

(二)基于SERVER PUSH的消息傳輸

引言:

      在上一篇随筆中,對COMET的機制和原理做了簡短的介紹。網上的确有很多類似的資料可以檢視,基于PHP,JSP,.NET的架構更是層出不窮。本文将簡要介紹并分析一個基于.NET的SERVER PUSH架構,實作了從消息的釋出,廣播,到接收等一系列SERVER PUSH關鍵技術。

COMET技術要點:

      COMET的理念是先進而直接的,從伺服器釋出資訊,同時讓浏覽器(也可以是用戶端)接收并響應消息事件。就是這樣簡單的理念,造成了系統設計上的多重麻煩。

      首先就是如何讓我們的浏覽器接收到伺服器發來的事件呢?

      有一種實作方式是借由無實體大小的FLASH,IFRAME或者APPLET等元件來間接打通用戶端SOCKET,然後向這些元件中推入資訊,并通過javascript線程,得到元件傳回的資訊(當然也有極個别情況下,由這些元件來直接向頁面注入資訊)。

      從最終結果看,我們好似得到了一個伺服器的穩定連接配接,并重新整理了頁面資訊。但是實際上,這樣的實作方式實在是有點不夠“直接”。

      FLASH(或FLEX)本身可以通過WEBSERVICE元件調用WSDL,或者直接調用java serverlet,這是FLASH PLAYER給我們的便利條件,但通過FLASH通訊的辦法來實作SERVER PUSH總是讓人覺得無法接受,為何不直接做一個FLASH通訊呢。

      APPLET更不用說,直接将JAR包“推”到頁面運作,啟動麻煩的JRE的同時還要下載下傳龐大的JAVA程式。沒有JRE的情況下,還需要相應下載下傳。

      SILVER LIGHT的機制與FLASH類似,那是由于并沒有FLASH PLAYER普及,使用者同樣需要下載下傳相應播放器。

      以上三者實在談不上是完全的web應用,我們隻是想用最純粹的javascript和html來解決問題,又何必勞師動衆呢?單純的javascript事件和線程,難道就不能滿足我們的需要麼?

      于是又有人想到了IFRAME,通過一個隐形的IFRAME來發送AJAX請求,通過長輪詢得到消息。但這也不是真正意義上COMET,頂多是一個不成熟的AJAX應用。

      COMET的精髓就在于用伺服器與javascript來維持浏覽器的長連接配接,同時完成伺服器端事件的浏覽器端響應。這樣的事件廣播機制是跨網絡的,同時也是實時的。

COMET技術難點:

      1.維持長連接配接

      2.維持伺服器端的“心跳”

      3.浏覽器端對“心跳”的“感應”

      4.維持長連接配接與逾時的平衡

      5.線程同步

      6.通用的接口設計

     PS:這裡需要注意的是,我們是在說COMET技術,而不是在談論如何找女朋友。雖然從某個角度來看這二者似乎存在着很多類似的地方……

.NET下的COMET實作:

      我們需要COMET,我們需要源碼,我們需要最單純的實作。本着這三點需求,我為大家準備了以下這個例子,James Simpson的基于COMET技術的.NET IM。

COMET SERVER PUSH (IM Demo)(二)基于SERVER PUSH的消息傳輸COMET彗星(三)建構自己的COMET核心

圖1.1 COMET基類類圖

      實體類:

      1.CometClient:                      COMET的服務實體,執行個體化一個CometClinet的意義在于記錄必要的服務資訊,比如使用者名稱逾時設定等。

      2.CometMessage:                  包含對服務資訊和相信屬性的實體類,是消息傳輸的主體。

      3.InProcCometStateProvider:繼承了ICometStateProvider并實作了消息操作的方法,同時将消息儲存在記憶體中。

      接口類:

      1.ICometStateProvider:          定義了一系列的接口來提供對CometClient實體和CometMessage的操作。

      異常類:

      1.CometException:                沒有什麼特殊的異常類。

      事件:

      1.CometClientEventHandler:  CometClient的事件監聽類和廣播類。

      線程相關:

      1.CometWaitThread:              伺服器端線程池,排隊等待資訊請求并調用,同時提供逾時等操作。

      2.CometWaitRequest:            伺服器端線程,監聽用戶端的message請求。

      3.CometAsyncResult:           異步請求的管理類,通過繼承SYSTEM.IAsyncResult來實作異步。

      主體:

      1.CometStateManager:         用來管理線程池,管理線程和使用者連接配接,消息轉發等一系列操作的工廠類。

     ps:從某種意義上來說,這個例子最好的地方就是在于它的CometStateManager實作是可以被拓展的,定制自己的管理模式,定制自己的存儲操作等都可以通過這個類來進行擴充。線程管理在這個DEMO裡做得相對成熟,而且事件機制可以通過定制Adapter更加豐富。

      由于時間有限,下一篇随筆中将介紹幾個關鍵類的具體實作,和一些關于設計模式的思考。年末了,希望園子裡的朋友們都過個收獲滿滿的年。

COMET彗星(三)建構自己的COMET核心

引言:

      在上一篇随筆中,對COMET使用的類和作用進行了簡短的介紹,從本篇随筆開始,将從實體類開始,對COMET的核心進行建構分析。

CORE架構:

COMET SERVER PUSH (IM Demo)(二)基于SERVER PUSH的消息傳輸COMET彗星(三)建構自己的COMET核心

圖1.1 COMET核心架構

CometMessage類:

     CometMessage類是COMET的通信載體,對消息的主體進行抽象,實際上這個類是最容易進行擴充的,因為從設計上看,它隻是一個消息的容器。而諸如地理坐标,業務資料等,都可以通過這個類來進行直接擴充。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.Serialization;
namespace MethodWorx.AspNetComet.Core
{
/// <summary>
/// CometMessage Class
///
/// This is a CometMessage that has been sent to the client, the DataContract names have been
/// shortened to remove any bytes we dont need from the message (ok, did'nt save much, but we can do it!)
/// </summary>
    [DataContract(Name="cm")]
public class CometMessage
{
[DataMember(Name="mid")]
private long messageId;
[DataMember(Name="n")]
private string name;
[DataMember(Name="c")]
private object contents;
/// <summary>
/// Gets or Sets the MessageId, used to track which message the Client last received
/// </summary>
        public long MessageId
{
get { return this.messageId; }
set { this.messageId = value; }
}
/// <summary>
/// Gets or Sets the Content of the Message
/// </summary>
        public object Contents
{
get { return this.contents; }
set { this.contents = value; }
}
/// <summary>
/// Gets or Sets the error message if this is a failure
/// </summary>
        public string Name
{
get { return this.name; }
set { this.name = value; }
}
}
}
           

      類的設計簡單明了,這裡有必要解釋下使用System.Runtime.Serialization命名空間的意義。

      “System.Runtime.Serialization 命名空間包含可用于将對象序列化和反序列化的類。序列化是将對象或對象圖形轉換為線性位元組序列,以存儲或傳輸到另一個位置的過程。反序列化是接受存儲的資訊并利用它重新建立對象的過程。”

      這是MSDN給我們的解釋,将對象轉變為線性位元組,然後友善傳輸與調用。當然這個例子中的資料類型并不複雜,但也包含了LONG,OBJECT,STRING這樣的資料類型。其中Contents成員為object對象,這給我們留下了非常大的想像空間。(圖檔?複雜對象類型?自定義對象類型?……)

CometClient類:

      CometClient類是對用戶端資訊的抽象類,同時包含了兩個關鍵屬性ConnectionIdleSeconds和ConnectionTimeoutSeconds。由于考慮到不同用戶端間傳遞屬性,仍然使用System.Runtime.Serialization來序列化資訊。

      關于JSON的應用,這個架構其實也提供了相應支援。後面的随筆中我會介紹。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.Serialization;
namespace MethodWorx.AspNetComet.Core
{
/// <summary>
/// CometClient Class
///
/// This represents a logged in client within the COMET application.  This marked as a DataContract becuase
/// it can be seralized to the client using JSON
/// </summary>
    [DataContract]
public class CometClient
{
[DataMember]
private string privateToken;
[DataMember]
private string publicToken;
[DataMember]
private string displayName;
[DataMember]
private DateTime lastActivity;
[DataMember]
private int connectionIdleSeconds;
[DataMember]
private int connectionTimeoutSeconds;
/// <summary>
/// Gets or Sets the token used to identify the client to themselves
/// </summary>
        public string PrivateToken
{
get { return this.privateToken; }
set { this.privateToken = value; }
}
/// <summary>
/// Gets or Sets the token used to identify the client to other clients
/// </summary>
        public string PublicToken
{
get { return this.publicToken; }
set { this.publicToken = value; }
}
/// <summary>
/// Gets or Sets the display name of the client
/// </summary>
        public string DisplayName
{
get { return this.displayName; }
set { this.displayName = value; }
}
/// <summary>
/// Gets or Sets the last activity of the client
/// </summary>
        public DateTime LastActivity
{
get { return this.lastActivity; }
set { this.lastActivity = value; }
}
/// <summary>
/// Gets or Sets the ConnectionIdleSections property which is the number of seconds a connection will remain
/// alive for without being connected to a client, after this time has expired the client will
/// be removed from the state manager
/// </summary>
        public int ConnectionIdleSeconds
{
get { return this.connectionIdleSeconds; }
set { this.connectionIdleSeconds = value; }
}
/// <summary>
/// Gets or Sets the ConnectionTimeOutSections property which is the number of seconds a connection will remain
/// alive for whilst being connected to a client, but without receiving any messages.  After a timeout has expired
/// A client should restablish a connection to the server
/// </summary>
        public int ConnectionTimeoutSeconds
{
get { return this.connectionTimeoutSeconds; }
set { this.connectionTimeoutSeconds = value; }
}
}
}
           

ConnectionIdleSeconds:用來設定連接配接線程,當connection斷線後,背景Thread的存活時間。

ConnectionTimeoutSeconds:用戶端的逾時時間,當超過時間後,用戶端重新連接配接伺服器。

ps:有這兩個屬性後,基本上完成了用戶端連接配接的控制,逾時重連接配接,無連接配接時殺死背景線程。

ICometStateProvider接口:

     ICometStateProvider接口直接被CometStateMessager建立,這樣的好處是執行個體化CometStateMessager對象後,CometStateMessager對象可以直接調用ICometStateProvider接口的實作,實際上實作了Adapter的方式,我們可以定制不同的InProcCometStateProvider類(在下面會提到)來定制自己的接口。

      ICometStateProvider接口類提供了如下幾個接口。

      InitializeClient:提供ComentClient的初始化操作。

      GetMessages:得到一個Messages的消息隊列。

      SendMessage:發送Message資料。

      SendMessage:可以針對Client name來進行消息釋出(私人會話)。

      GetCometClient:傳回一個Client。

      KillIdleCometClient:殺掉一個無效的Client對象。

      ps:當然這個接口是可以被拓展的,而且實作起來非常簡單。得到Client隊列資訊,得到Client狀态等等。甚至你可以想象一些更複雜的應用(比如加入一個流媒體消息……)。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace MethodWorx.AspNetComet.Core
{
/// <summary>
/// This interface can be implemented to provide a custom state provider
/// for the CometStateManager class.  Typical examples may be using SqlServer
/// to enable the operation over a server farm
/// </summary>
    public interface ICometStateProvider
{
/// <summary>
/// Implementation of this method should store the cometClient instance in some sort
/// of cache (eg Memory, Db etc..)
/// </summary>
/// <param name="cometClient"></param>
        void InitializeClient(CometClient cometClient);
/// <summary>
/// Imeplementation of this method should return all the messages that are queued
/// for a specific client, it is only interested in messages that have a greater id than
/// lastMessageId
/// </summary>
/// <param name="clientPrivateToken"></param>
/// <param name="lastMessageId"></param>
/// <returns></returns>
        CometMessage[] GetMessages(string clientPrivateToken, long lastMessageId);
/// <summary>
/// Implementation of this method should queue a message for the specific client
/// </summary>
/// <param name="clientPublicToken"></param>
/// <param name="name"></param>
/// <param name="contents"></param>
        void SendMessage(string clientPublicToken, string name, object contents);
/// <summary>
/// Implementation of this method should queue a message for all the clients
/// </summary>
/// <param name="name"></param>
/// <param name="contents"></param>
        void SendMessage(string name, object contents);
/// <summary>
/// Implementation of this method should return a specific comet client
/// </summary>
/// <param name="clientPrivateToken"></param>
/// <returns></returns>
        CometClient GetCometClient(string clientPrivateToken);
/// <summary>
/// Implementation of this method should remove a client from the cache
/// </summary>
/// <param name="clientPrivateToken"></param>
        void KillIdleCometClient(string clientPrivateToken);
}
}
           

InProcCometStateProvider類:

      InProcCometStateProvider類實作了ICometStateProvider接口,并且提供了一個很好的範例,針對這個類,我們可以想象很多很好的拓展,諸如調用AO元件,封裝報警資訊等等。

      InProcCometStateProvider類包含類一個私有的内部類InProcCometStateProvider,實際上可以了解為一種對消息的簡單封裝,設計的時候考慮到需要關聯CometClient和Message,其實也可以單獨作為一個外部類來設計。不過Adapter本身不應該太多關聯類,這樣做也是權衡了一些拓展上的需求。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace MethodWorx.AspNetComet.Core
{
/// <summary>
/// Class InProcCometStateProvider
///
/// This class provides an implementation of ICometStateProvider that keeps the
/// information in memory.  This provider is not scalable as it will not run on a server
/// farm but demonstrates how you should implemement the provider.
/// </summary>
    public class InProcCometStateProvider : ICometStateProvider
{
/// <summary>
/// Private class which holds the state of each connected client
/// </summary>
        private class InProcCometClient
{
public CometClient CometClient;
public Dictionary<long, CometMessage> Messages = new Dictionary<long, CometMessage>();
public long NextMessageId = 1;
}
/// <summary>
/// Cache of clients
/// </summary>
        private Dictionary<string, InProcCometClient> publicClients = new Dictionary<string, InProcCometClient>();
private Dictionary<string, InProcCometClient> privateClients = new Dictionary<string, InProcCometClient>();
private static object state = new object();
#region ICometStateProvider Members
/// <summary>
/// Store the new client in memory
/// </summary>
/// <param name="cometClient"></param>
        public void InitializeClient(CometClient cometClient)
{
if (cometClient == null)
throw new ArgumentNullException("cometClient");
lock (state)
{
//  ok, ensure we dont already exist
                if (publicClients.ContainsKey(cometClient.PublicToken) || privateClients.ContainsKey(cometClient.PrivateToken))
throw CometException.CometClientAlreadyExistsException();
InProcCometClient inProcCometClient = new InProcCometClient()
{
CometClient = cometClient
};
//  stick the client int he arrays
//  ready to be used
                publicClients.Add(cometClient.PublicToken, inProcCometClient);
privateClients.Add(cometClient.PrivateToken, inProcCometClient);
}
//  ok, they are in there ready to be used
        }
/// <summary>
/// Get the messages for a specific client
/// </summary>
/// <param name="clientPrivateToken"></param>
/// <param name="lastMessageId"></param>
/// <returns></returns>
        public CometMessage[] GetMessages(string clientPrivateToken, long lastMessageId)
{
if(string.IsNullOrEmpty(clientPrivateToken))
throw new ArgumentNullException("clientPrivateToken");
lock (state)
{
if (!privateClients.ContainsKey(clientPrivateToken))
throw CometException.CometClientDoesNotExistException();
//
//  ok, get the client
                InProcCometClient cometClient = privateClients[clientPrivateToken];
List<long> toDelete = new List<long>();
List<long> toReturn = new List<long>();
//  wicked, we have the client, so we can get its messages from our list
//  we delete any before the last messageId becuase we dont want them
                foreach(long key in cometClient.Messages.Keys)
{
if(key <= lastMessageId)
toDelete.Add(key);
else
toReturn.Add(key);
}
//  delete the ones from the messages
                foreach (long key in toDelete)
{
cometClient.Messages.Remove(key);
}
//  and return the ones in the toReturn array
                List<CometMessage> cometMessages = new List<CometMessage>();
foreach (long key in toReturn)
{
cometMessages.Add(cometClient.Messages[key]);
}
return cometMessages.ToArray();
}
}
/// <summary>
/// Send a message to a specific client
/// </summary>
/// <param name="clientPublicToken"></param>
/// <param name="name"></param>
/// <param name="contents"></param>
        public void SendMessage(string clientPublicToken, string name, object contents)
{
if (string.IsNullOrEmpty(clientPublicToken))
throw new ArgumentNullException("clientPublicToken");
if (contents == null)
throw new ArgumentNullException("contents");
lock (state)
{
if (!publicClients.ContainsKey(clientPublicToken))
throw CometException.CometClientDoesNotExistException();
//
//  ok, get the client
                InProcCometClient cometClient = publicClients[clientPublicToken];
// ok, stick the message in the array
                CometMessage message = new CometMessage();
message.Contents = contents;
message.Name = name;
message.MessageId = cometClient.NextMessageId;
//  increment
                cometClient.NextMessageId++;
cometClient.Messages.Add(message.MessageId, message);
}
}
/// <summary>
/// Send a message to all the clients
/// </summary>
/// <param name="name"></param>
/// <param name="contents"></param>
        public void SendMessage(string name, object contents)
{
if (contents == null)
throw new ArgumentNullException("contents");
lock (state)
{
foreach (InProcCometClient cometClient in publicClients.Values)
{
// ok, stick the message in the array
                    CometMessage message = new CometMessage();
message.Contents = contents;
message.Name = name;
message.MessageId = cometClient.NextMessageId;
//  increment
                    cometClient.NextMessageId++;
cometClient.Messages.Add(message.MessageId, message);
}
}
}
/// <summary>
/// Get the client from the state provider
/// </summary>
/// <param name="clientPrivateToken"></param>
/// <returns></returns>
        public CometClient GetCometClient(string clientPrivateToken)
{
if (!this.privateClients.ContainsKey(clientPrivateToken))
throw CometException.CometClientDoesNotExistException();
//  return the client private token
            return this.privateClients[clientPrivateToken].CometClient;
}
/// <summary>
/// Remove an idle client from the memory
/// </summary>
/// <param name="clientPrivateToken"></param>
        public void KillIdleCometClient(string clientPrivateToken)
{
if (!this.privateClients.ContainsKey(clientPrivateToken))
throw CometException.CometClientDoesNotExistException();
//  get the client
            InProcCometClient ipCometClient = this.privateClients[clientPrivateToken];
//  and remove the dictionarys
            this.privateClients.Remove(ipCometClient.CometClient.PrivateToken);
this.publicClients.Remove(ipCometClient.CometClient.PublicToken);
}
#endregion
}
}
           

繼續閱讀