天天看点

COMET SERVER PUSH (IM Demo)(二)基于SERVER PUSH的消息传输COMET彗星(三)构建自己的COMET核心

(一)COMET介绍:

      还在为AJAX赞叹的时候,COMET竟也悄悄降临,更有甚者已经将COMET比作是AJAX的接班人。暂且不考虑服务性能和维持connection的负担,COMET的日益走红,让SERVER PUSH这样在传统C/S模式下需要借助线程服务和SOCKET通信的实现,衍生成为了借助事件和Script注册机制的WEB应用框架。

      SERVER PUSH就如同它自己的名字一样,将更多华丽的web体验“推”进了我们的视野。

      一位叫Grace Lin的工程师在自己的博客中对应用SERVER PUSH技术的在线股票系统进行了一番详细的介绍 。(地址:http://www.zkoss.org/smalltalks/stockchart/)同时,更是有很多IM应用涌现出来。本主题将从AJAX和COMET两者的结构特点进行比较,同时层层深入剖析COMET技术特点,和相应应用。

AJAX技术框架:

      大家都理解AJAX的结构,这里不再深入探究。AJAX的框架千奇百怪,但实际上就是强调了异步这个特点:数据请求后,异步的服务器处理和用户界面操作。

COMET SERVER PUSH (IM Demo)(二)基于SERVER PUSH的消息传输COMET彗星(三)构建自己的COMET核心

图1.1 传统模型与AJAX模型的对比

AJAX加入了异步运行的通讯框架。

COMET SERVER PUSH (IM Demo)(二)基于SERVER PUSH的消息传输COMET彗星(三)构建自己的COMET核心

图1.2 AJAX异步通讯模型

      似乎一切都是那么完美,但是实际上AJAX只是实现了单用户的响应回调,与服务器的异步通讯,但是并不能反映多用户的协同相应。一个页面中同时存在的多个AJAX的异步很可能让用户修改了没有显示出来的数据,在逻辑上存在数据库事务中“脏读”或者“幻影读”的概念,还没有看到数据,发出下一个请求(当然请求是发送到了别的页面元素,但愿有页面元素中仍然是刷新了未修改的AJAX请求信息。)这些似乎都是AJAX的软肋。而COMET实际上就是解决这些问题的解决框架。

COMET技术框架:

COMET SERVER PUSH (IM Demo)(二)基于SERVER PUSH的消息传输COMET彗星(三)构建自己的COMET核心

图2.1 AJAX与COMET框架对比

      在应用COMET框架的网页中,页面初始化后,会维持一连接,同时监听服务器端的事件信息。服务器通过事件机制来完成对浏览器(也可以是客户端)的“推”机制。

      不同客户端同时监听到服务器端的事件,并获得服务器传来的数据,而每一个客户端的请求都变成服务器的事件在网络中进行“广播”。

      当然COMET需要基于特定的服务器环境,这也是它与AJAX的很大不同。

      网上出现过基于客户端的服务器推技术,实际上都是使用IFRAME,APPLET,FLASH这样的元素实现的长连接,在http://www.ibm.com/developerworks/cn/web/wa-lo-comet/一文中,IBM工程师对这几者做了相应介绍,有时间的话,可以做几个demo出来与大家交流,但下一篇中,还是为大家分析一个基于comet的IM框架。

(二)基于SERVER PUSH的消息传输

引言:

      在上一篇随笔中,对COMET的机制和原理做了简短的介绍。网上的确有很多类似的资料可以查看,基于PHP,JSP,.NET的框架更是层出不穷。本文将简要介绍并分析一个基于.NET的SERVER PUSH框架,实现了从消息的发布,广播,到接收等一系列SERVER PUSH关键技术。

COMET技术要点:

      COMET的理念是先进而直接的,从服务器发布信息,同时让浏览器(也可以是客户端)接收并响应消息事件。就是这样简单的理念,造成了系统设计上的多重麻烦。

      首先就是如何让我们的浏览器接收到服务器发来的事件呢?

      有一种实现方式是借由无实体大小的FLASH,IFRAME或者APPLET等组件来间接打通客户端SOCKET,然后向这些元件中推入信息,并通过javascript线程,得到元件返回的信息(当然也有极个别情况下,由这些元件来直接向页面注入信息)。

      从最终结果看,我们好似得到了一个服务器的稳定连接,并刷新了页面信息。但是实际上,这样的实现方式实在是有点不够“直接”。

      FLASH(或FLEX)本身可以通过WEBSERVICE组件调用WSDL,或者直接调用java serverlet,这是FLASH PLAYER给我们的便利条件,但通过FLASH通讯的办法来实现SERVER PUSH总是让人觉得无法接受,为何不直接做一个FLASH通讯呢。

      APPLET更不用说,直接将JAR包“推”到页面运行,启动麻烦的JRE的同时还要下载庞大的JAVA程序。没有JRE的情况下,还需要相应下载。

      SILVER LIGHT的机制与FLASH类似,那是由于并没有FLASH PLAYER普及,用户同样需要下载相应播放器。

      以上三者实在谈不上是完全的web应用,我们只是想用最纯粹的javascript和html来解决问题,又何必劳师动众呢?单纯的javascript事件和线程,难道就不能满足我们的需要么?

      于是又有人想到了IFRAME,通过一个隐形的IFRAME来发送AJAX请求,通过长轮询得到消息。但这也不是真正意义上COMET,顶多是一个不成熟的AJAX应用。

      COMET的精髓就在于用服务器与javascript来维持浏览器的长连接,同时完成服务器端事件的浏览器端响应。这样的事件广播机制是跨网络的,同时也是实时的。

COMET技术难点:

      1.维持长连接

      2.维持服务器端的“心跳”

      3.浏览器端对“心跳”的“感应”

      4.维持长连接与超时的平衡

      5.线程同步

      6.通用的接口设计

     PS:这里需要注意的是,我们是在说COMET技术,而不是在谈论如何找女朋友。虽然从某个角度来看这二者似乎存在着很多类似的地方……

.NET下的COMET实现:

      我们需要COMET,我们需要源码,我们需要最单纯的实现。本着这三点需求,我为大家准备了以下这个例子,James Simpson的基于COMET技术的.NET IM。

COMET SERVER PUSH (IM Demo)(二)基于SERVER PUSH的消息传输COMET彗星(三)构建自己的COMET核心

图1.1 COMET基类类图

      实体类:

      1.CometClient:                      COMET的服务实体,实例化一个CometClinet的意义在于记录必要的服务信息,比如用户名称超时设置等。

      2.CometMessage:                  包含对服务信息和相信属性的实体类,是消息传输的主体。

      3.InProcCometStateProvider:继承了ICometStateProvider并实现了消息操作的方法,同时将消息保存在内存中。

      接口类:

      1.ICometStateProvider:          定义了一系列的接口来提供对CometClient实体和CometMessage的操作。

      异常类:

      1.CometException:                没有什么特殊的异常类。

      事件:

      1.CometClientEventHandler:  CometClient的事件监听类和广播类。

      线程相关:

      1.CometWaitThread:              服务器端线程池,排队等待信息请求并调用,同时提供超时等操作。

      2.CometWaitRequest:            服务器端线程,监听客户端的message请求。

      3.CometAsyncResult:           异步请求的管理类,通过继承SYSTEM.IAsyncResult来实现异步。

      主体:

      1.CometStateManager:         用来管理线程池,管理线程和用户连接,消息转发等一系列操作的工厂类。

     ps:从某种意义上来说,这个例子最好的地方就是在于它的CometStateManager实现是可以被拓展的,定制自己的管理模式,定制自己的存储操作等都可以通过这个类来进行扩展。线程管理在这个DEMO里做得相对成熟,而且事件机制可以通过定制Adapter更加丰富。

      由于时间有限,下一篇随笔中将介绍几个关键类的具体实现,和一些关于设计模式的思考。年末了,希望园子里的朋友们都过个收获满满的年。

COMET彗星(三)构建自己的COMET核心

引言:

      在上一篇随笔中,对COMET使用的类和作用进行了简短的介绍,从本篇随笔开始,将从实体类开始,对COMET的核心进行构建分析。

CORE框架:

COMET SERVER PUSH (IM Demo)(二)基于SERVER PUSH的消息传输COMET彗星(三)构建自己的COMET核心

图1.1 COMET核心框架

CometMessage类:

     CometMessage类是COMET的通信载体,对消息的主体进行抽象,实际上这个类是最容易进行扩展的,因为从设计上看,它只是一个消息的容器。而诸如地理坐标,业务数据等,都可以通过这个类来进行直接扩充。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.Serialization;
namespace MethodWorx.AspNetComet.Core
{
/// <summary>
/// CometMessage Class
///
/// This is a CometMessage that has been sent to the client, the DataContract names have been
/// shortened to remove any bytes we dont need from the message (ok, did'nt save much, but we can do it!)
/// </summary>
    [DataContract(Name="cm")]
public class CometMessage
{
[DataMember(Name="mid")]
private long messageId;
[DataMember(Name="n")]
private string name;
[DataMember(Name="c")]
private object contents;
/// <summary>
/// Gets or Sets the MessageId, used to track which message the Client last received
/// </summary>
        public long MessageId
{
get { return this.messageId; }
set { this.messageId = value; }
}
/// <summary>
/// Gets or Sets the Content of the Message
/// </summary>
        public object Contents
{
get { return this.contents; }
set { this.contents = value; }
}
/// <summary>
/// Gets or Sets the error message if this is a failure
/// </summary>
        public string Name
{
get { return this.name; }
set { this.name = value; }
}
}
}
           

      类的设计简单明了,这里有必要解释下使用System.Runtime.Serialization命名空间的意义。

      “System.Runtime.Serialization 命名空间包含可用于将对象序列化和反序列化的类。序列化是将对象或对象图形转换为线性字节序列,以存储或传输到另一个位置的过程。反序列化是接受存储的信息并利用它重新创建对象的过程。”

      这是MSDN给我们的解释,将对象转变为线性字节,然后方便传输与调用。当然这个例子中的数据类型并不复杂,但也包含了LONG,OBJECT,STRING这样的数据类型。其中Contents成员为object对象,这给我们留下了非常大的想像空间。(图片?复杂对象类型?自定义对象类型?……)

CometClient类:

      CometClient类是对客户端信息的抽象类,同时包含了两个关键属性ConnectionIdleSeconds和ConnectionTimeoutSeconds。由于考虑到不同客户端间传递属性,仍然使用System.Runtime.Serialization来序列化信息。

      关于JSON的应用,这个框架其实也提供了相应支持。后面的随笔中我会介绍。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.Serialization;
namespace MethodWorx.AspNetComet.Core
{
/// <summary>
/// CometClient Class
///
/// This represents a logged in client within the COMET application.  This marked as a DataContract becuase
/// it can be seralized to the client using JSON
/// </summary>
    [DataContract]
public class CometClient
{
[DataMember]
private string privateToken;
[DataMember]
private string publicToken;
[DataMember]
private string displayName;
[DataMember]
private DateTime lastActivity;
[DataMember]
private int connectionIdleSeconds;
[DataMember]
private int connectionTimeoutSeconds;
/// <summary>
/// Gets or Sets the token used to identify the client to themselves
/// </summary>
        public string PrivateToken
{
get { return this.privateToken; }
set { this.privateToken = value; }
}
/// <summary>
/// Gets or Sets the token used to identify the client to other clients
/// </summary>
        public string PublicToken
{
get { return this.publicToken; }
set { this.publicToken = value; }
}
/// <summary>
/// Gets or Sets the display name of the client
/// </summary>
        public string DisplayName
{
get { return this.displayName; }
set { this.displayName = value; }
}
/// <summary>
/// Gets or Sets the last activity of the client
/// </summary>
        public DateTime LastActivity
{
get { return this.lastActivity; }
set { this.lastActivity = value; }
}
/// <summary>
/// Gets or Sets the ConnectionIdleSections property which is the number of seconds a connection will remain
/// alive for without being connected to a client, after this time has expired the client will
/// be removed from the state manager
/// </summary>
        public int ConnectionIdleSeconds
{
get { return this.connectionIdleSeconds; }
set { this.connectionIdleSeconds = value; }
}
/// <summary>
/// Gets or Sets the ConnectionTimeOutSections property which is the number of seconds a connection will remain
/// alive for whilst being connected to a client, but without receiving any messages.  After a timeout has expired
/// A client should restablish a connection to the server
/// </summary>
        public int ConnectionTimeoutSeconds
{
get { return this.connectionTimeoutSeconds; }
set { this.connectionTimeoutSeconds = value; }
}
}
}
           

ConnectionIdleSeconds:用来设置连接线程,当connection断线后,后台Thread的存活时间。

ConnectionTimeoutSeconds:客户端的超时时间,当超过时间后,客户端重新连接服务器。

ps:有这两个属性后,基本上完成了客户端连接的控制,超时重连接,无连接时杀死后台线程。

ICometStateProvider接口:

     ICometStateProvider接口直接被CometStateMessager建立,这样的好处是实例化CometStateMessager对象后,CometStateMessager对象可以直接调用ICometStateProvider接口的实现,实际上实现了Adapter的方式,我们可以定制不同的InProcCometStateProvider类(在下面会提到)来定制自己的接口。

      ICometStateProvider接口类提供了如下几个接口。

      InitializeClient:提供ComentClient的初始化操作。

      GetMessages:得到一个Messages的消息队列。

      SendMessage:发送Message数据。

      SendMessage:可以针对Client name来进行消息发布(私人会话)。

      GetCometClient:返回一个Client。

      KillIdleCometClient:杀掉一个无效的Client对象。

      ps:当然这个接口是可以被拓展的,而且实现起来非常简单。得到Client队列信息,得到Client状态等等。甚至你可以想象一些更复杂的应用(比如加入一个流媒体消息……)。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace MethodWorx.AspNetComet.Core
{
/// <summary>
/// This interface can be implemented to provide a custom state provider
/// for the CometStateManager class.  Typical examples may be using SqlServer
/// to enable the operation over a server farm
/// </summary>
    public interface ICometStateProvider
{
/// <summary>
/// Implementation of this method should store the cometClient instance in some sort
/// of cache (eg Memory, Db etc..)
/// </summary>
/// <param name="cometClient"></param>
        void InitializeClient(CometClient cometClient);
/// <summary>
/// Imeplementation of this method should return all the messages that are queued
/// for a specific client, it is only interested in messages that have a greater id than
/// lastMessageId
/// </summary>
/// <param name="clientPrivateToken"></param>
/// <param name="lastMessageId"></param>
/// <returns></returns>
        CometMessage[] GetMessages(string clientPrivateToken, long lastMessageId);
/// <summary>
/// Implementation of this method should queue a message for the specific client
/// </summary>
/// <param name="clientPublicToken"></param>
/// <param name="name"></param>
/// <param name="contents"></param>
        void SendMessage(string clientPublicToken, string name, object contents);
/// <summary>
/// Implementation of this method should queue a message for all the clients
/// </summary>
/// <param name="name"></param>
/// <param name="contents"></param>
        void SendMessage(string name, object contents);
/// <summary>
/// Implementation of this method should return a specific comet client
/// </summary>
/// <param name="clientPrivateToken"></param>
/// <returns></returns>
        CometClient GetCometClient(string clientPrivateToken);
/// <summary>
/// Implementation of this method should remove a client from the cache
/// </summary>
/// <param name="clientPrivateToken"></param>
        void KillIdleCometClient(string clientPrivateToken);
}
}
           

InProcCometStateProvider类:

      InProcCometStateProvider类实现了ICometStateProvider接口,并且提供了一个很好的范例,针对这个类,我们可以想象很多很好的拓展,诸如调用AO组件,封装报警信息等等。

      InProcCometStateProvider类包含类一个私有的内部类InProcCometStateProvider,实际上可以理解为一种对消息的简单封装,设计的时候考虑到需要关联CometClient和Message,其实也可以单独作为一个外部类来设计。不过Adapter本身不应该太多关联类,这样做也是权衡了一些拓展上的需求。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace MethodWorx.AspNetComet.Core
{
/// <summary>
/// Class InProcCometStateProvider
///
/// This class provides an implementation of ICometStateProvider that keeps the
/// information in memory.  This provider is not scalable as it will not run on a server
/// farm but demonstrates how you should implemement the provider.
/// </summary>
    public class InProcCometStateProvider : ICometStateProvider
{
/// <summary>
/// Private class which holds the state of each connected client
/// </summary>
        private class InProcCometClient
{
public CometClient CometClient;
public Dictionary<long, CometMessage> Messages = new Dictionary<long, CometMessage>();
public long NextMessageId = 1;
}
/// <summary>
/// Cache of clients
/// </summary>
        private Dictionary<string, InProcCometClient> publicClients = new Dictionary<string, InProcCometClient>();
private Dictionary<string, InProcCometClient> privateClients = new Dictionary<string, InProcCometClient>();
private static object state = new object();
#region ICometStateProvider Members
/// <summary>
/// Store the new client in memory
/// </summary>
/// <param name="cometClient"></param>
        public void InitializeClient(CometClient cometClient)
{
if (cometClient == null)
throw new ArgumentNullException("cometClient");
lock (state)
{
//  ok, ensure we dont already exist
                if (publicClients.ContainsKey(cometClient.PublicToken) || privateClients.ContainsKey(cometClient.PrivateToken))
throw CometException.CometClientAlreadyExistsException();
InProcCometClient inProcCometClient = new InProcCometClient()
{
CometClient = cometClient
};
//  stick the client int he arrays
//  ready to be used
                publicClients.Add(cometClient.PublicToken, inProcCometClient);
privateClients.Add(cometClient.PrivateToken, inProcCometClient);
}
//  ok, they are in there ready to be used
        }
/// <summary>
/// Get the messages for a specific client
/// </summary>
/// <param name="clientPrivateToken"></param>
/// <param name="lastMessageId"></param>
/// <returns></returns>
        public CometMessage[] GetMessages(string clientPrivateToken, long lastMessageId)
{
if(string.IsNullOrEmpty(clientPrivateToken))
throw new ArgumentNullException("clientPrivateToken");
lock (state)
{
if (!privateClients.ContainsKey(clientPrivateToken))
throw CometException.CometClientDoesNotExistException();
//
//  ok, get the client
                InProcCometClient cometClient = privateClients[clientPrivateToken];
List<long> toDelete = new List<long>();
List<long> toReturn = new List<long>();
//  wicked, we have the client, so we can get its messages from our list
//  we delete any before the last messageId becuase we dont want them
                foreach(long key in cometClient.Messages.Keys)
{
if(key <= lastMessageId)
toDelete.Add(key);
else
toReturn.Add(key);
}
//  delete the ones from the messages
                foreach (long key in toDelete)
{
cometClient.Messages.Remove(key);
}
//  and return the ones in the toReturn array
                List<CometMessage> cometMessages = new List<CometMessage>();
foreach (long key in toReturn)
{
cometMessages.Add(cometClient.Messages[key]);
}
return cometMessages.ToArray();
}
}
/// <summary>
/// Send a message to a specific client
/// </summary>
/// <param name="clientPublicToken"></param>
/// <param name="name"></param>
/// <param name="contents"></param>
        public void SendMessage(string clientPublicToken, string name, object contents)
{
if (string.IsNullOrEmpty(clientPublicToken))
throw new ArgumentNullException("clientPublicToken");
if (contents == null)
throw new ArgumentNullException("contents");
lock (state)
{
if (!publicClients.ContainsKey(clientPublicToken))
throw CometException.CometClientDoesNotExistException();
//
//  ok, get the client
                InProcCometClient cometClient = publicClients[clientPublicToken];
// ok, stick the message in the array
                CometMessage message = new CometMessage();
message.Contents = contents;
message.Name = name;
message.MessageId = cometClient.NextMessageId;
//  increment
                cometClient.NextMessageId++;
cometClient.Messages.Add(message.MessageId, message);
}
}
/// <summary>
/// Send a message to all the clients
/// </summary>
/// <param name="name"></param>
/// <param name="contents"></param>
        public void SendMessage(string name, object contents)
{
if (contents == null)
throw new ArgumentNullException("contents");
lock (state)
{
foreach (InProcCometClient cometClient in publicClients.Values)
{
// ok, stick the message in the array
                    CometMessage message = new CometMessage();
message.Contents = contents;
message.Name = name;
message.MessageId = cometClient.NextMessageId;
//  increment
                    cometClient.NextMessageId++;
cometClient.Messages.Add(message.MessageId, message);
}
}
}
/// <summary>
/// Get the client from the state provider
/// </summary>
/// <param name="clientPrivateToken"></param>
/// <returns></returns>
        public CometClient GetCometClient(string clientPrivateToken)
{
if (!this.privateClients.ContainsKey(clientPrivateToken))
throw CometException.CometClientDoesNotExistException();
//  return the client private token
            return this.privateClients[clientPrivateToken].CometClient;
}
/// <summary>
/// Remove an idle client from the memory
/// </summary>
/// <param name="clientPrivateToken"></param>
        public void KillIdleCometClient(string clientPrivateToken)
{
if (!this.privateClients.ContainsKey(clientPrivateToken))
throw CometException.CometClientDoesNotExistException();
//  get the client
            InProcCometClient ipCometClient = this.privateClients[clientPrivateToken];
//  and remove the dictionarys
            this.privateClients.Remove(ipCometClient.CometClient.PrivateToken);
this.publicClients.Remove(ipCometClient.CometClient.PublicToken);
}
#endregion
}
}
           

继续阅读