天天看點

異構SOA系統架構之Asp.net實作(相容dubbo)

原文: 異構SOA系統架構之Asp.net實作(相容dubbo)

我們公司技術部門情況比較複雜,分到多個集團,每個集團又可能分為幾個部門,每個部門又可能分為多個小組,組織架構比較複雜,開發人員比較多。

使用的程式設計語言也有點複雜,主流語言有.net(C#)、Java、PHP等。

是以SOA架構需要的是異構SOA。

有的同學可能說這個簡單嗎?“把部門合并扁平化合并為一個團隊,把語言統一一種,要麼.net要麼Java。”

其實這樣的簡單粗暴并不能很好的解決問題的

首先公司組織架構就是不能随便修改的,一個公司的組織架構就是服務于這個公司的經營理念和營銷模式,技術部門是服務機構并不直接産生價值,技術部門架構和公司組織架構高度一緻能帶來業務的高效性。

其次多語言技術體系也有其可取性

      某個項目哪種語言能做的更快更好就用哪種語言

      哪種語言的程式員好招,就多招一些,能在各種技術方向的變化中立于不敗之地

現在繼續說SOA,說起公司對SOA選型對于.net程式員開始還是一件挺悲催的事情,因為公司選的是dubbo

dubbo是阿裡巴巴公司開源的一個高性能優秀的服務架構,說它是個偉大的開源項目并不為過,在很多網際網路公司都有運用。

但是,dubbo是個Java項目,.net程式員就悲催了

為了更好的支援多語言的異構系統現狀,具體選型是dubbox+ZooKeeper+Thrift,其中Thrift是facebook開發的高效RPC,支援語言非常多, C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml等。

有了Thrift,.net程式員的“春天”是不是就來了呢?

還是挺悲催,Java程式員幾乎不用寫額外代碼配置一下就可以調用SOA服務或者釋出服務,.net程式員要自己維護和ZooKeeper的通信和Thrift通信及日志統計和報送。

.net程式員苦不堪言,有些人質疑SOA選型(對.net程式員不公平),有些人"喊"着要.net程式員使用其他架構單幹 ......

後來機緣巧合,.net的SOA這個事情就落在我的身上

上司把這個任務交給我的時候,我輕松的說沒問題,但是時間證明這個事情比我原來想象的複雜得多,我也走了一些彎路,有過一些不太現實的想法,最終還是有了一個比較滿意的結果

一、先說ZooKeeper

1、ZooKeeper是開源項目,其原理和作用這裡不說,自行度娘

2、ZooKeeper的.net用戶端使用nuget就可以安裝使用

異構SOA系統架構之Asp.net實作(相容dubbo)

ZooKeeper用戶端庫也有很多開源項目支援,我這裡選的是Apache基金會的官方版本

3、本地啟動一個ZooKeeper來測試

異構SOA系統架構之Asp.net實作(相容dubbo)

ZooKeeper服務是Java開發的,ZooKeeper是個非常優秀的中間件,使用.net和Java調用差別并不大

這裡檢視ZooKeeper的工具也是Java開發的ZooInspector,正式環境我們有專門的背景來管理,本地調試ZooInspector就夠用了。

二、再說Thrift

1、Thrift也是開源項目,其原理和作用這裡不說,自行度娘

2、Thrift的.net庫使用nuget就可以安裝使用

異構SOA系統架構之Asp.net實作(相容dubbo)

Thrift用戶端庫也有很多開源項目支援,我這裡還是選Apache基金會的官方版本

三、使用.net開發一個HelloWord服務

1、按Thrift的IDL規範定義接口Thrift檔案

namespace java SOATest
namespace csharp SOATest
namespace php SOATest

service  HelloWorldService {
  string sayHello(1:string username)
}      

注:Thrift規範還是自行度娘

2、使用Thrift.exe生成代碼

異構SOA系統架構之Asp.net實作(相容dubbo)

Thrift.exe使用方法可以使用Thrift的help指令檢視,最好的方法還是自行度娘

異構SOA系統架構之Asp.net實作(相容dubbo)

3、到gen-csharp中找到剛生成的代碼複制到項目中使用

異構SOA系統架構之Asp.net實作(相容dubbo)
異構SOA系統架構之Asp.net實作(相容dubbo)
/**
 * Autogenerated by Thrift Compiler (0.9.3)
 *
 * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
 *  @generated
 */
using System;
using System.Collections;
using System.Collections.Generic;
using System.Text;
using System.IO;
using Thrift;
using Thrift.Collections;
using System.Runtime.Serialization;
using Thrift.Protocol;
using Thrift.Transport;

namespace SOATest
{
  public partial class HelloWorldService {
    public interface Iface {
      string sayHello(string username);
      #if SILVERLIGHT
      IAsyncResult Begin_sayHello(AsyncCallback callback, object state, string username);
      string End_sayHello(IAsyncResult asyncResult);
      #endif
    }

    public class Client : IDisposable, Iface {
      public Client(TProtocol prot) : this(prot, prot)
      {
      }

      public Client(TProtocol iprot, TProtocol oprot)
      {
        iprot_ = iprot;
        oprot_ = oprot;
      }

      protected TProtocol iprot_;
      protected TProtocol oprot_;
      protected int seqid_;

      public TProtocol InputProtocol
      {
        get { return iprot_; }
      }
      public TProtocol OutputProtocol
      {
        get { return oprot_; }
      }


      #region " IDisposable Support "
      private bool _IsDisposed;

      // IDisposable
      public void Dispose()
      {
        Dispose(true);
      }
      

      protected virtual void Dispose(bool disposing)
      {
        if (!_IsDisposed)
        {
          if (disposing)
          {
            if (iprot_ != null)
            {
              ((IDisposable)iprot_).Dispose();
            }
            if (oprot_ != null)
            {
              ((IDisposable)oprot_).Dispose();
            }
          }
        }
        _IsDisposed = true;
      }
      #endregion


      
      #if SILVERLIGHT
      public IAsyncResult Begin_sayHello(AsyncCallback callback, object state, string username)
      {
        return send_sayHello(callback, state, username);
      }

      public string End_sayHello(IAsyncResult asyncResult)
      {
        oprot_.Transport.EndFlush(asyncResult);
        return recv_sayHello();
      }

      #endif

      public string sayHello(string username)
      {
        #if !SILVERLIGHT
        send_sayHello(username);
        return recv_sayHello();

        #else
        var asyncResult = Begin_sayHello(null, null, username);
        return End_sayHello(asyncResult);

        #endif
      }
      #if SILVERLIGHT
      public IAsyncResult send_sayHello(AsyncCallback callback, object state, string username)
      #else
      public void send_sayHello(string username)
      #endif
      {
        oprot_.WriteMessageBegin(new TMessage("sayHello", TMessageType.Call, seqid_));
        sayHello_args args = new sayHello_args();
        args.Username = username;
        args.Write(oprot_);
        oprot_.WriteMessageEnd();
        #if SILVERLIGHT
        return oprot_.Transport.BeginFlush(callback, state);
        #else
        oprot_.Transport.Flush();
        #endif
      }

      public string recv_sayHello()
      {
        TMessage msg = iprot_.ReadMessageBegin();
        if (msg.Type == TMessageType.Exception) {
          TApplicationException x = TApplicationException.Read(iprot_);
          iprot_.ReadMessageEnd();
          throw x;
        }
        sayHello_result result = new sayHello_result();
        result.Read(iprot_);
        iprot_.ReadMessageEnd();
        if (result.__isset.success) {
          return result.Success;
        }
        throw new TApplicationException(TApplicationException.ExceptionType.MissingResult, "sayHello failed: unknown result");
      }

    }
    public class Processor : TProcessor {
      public Processor(Iface iface)
      {
        iface_ = iface;
        processMap_["sayHello"] = sayHello_Process;
      }

      protected delegate void ProcessFunction(int seqid, TProtocol iprot, TProtocol oprot);
      private Iface iface_;
      protected Dictionary<string, ProcessFunction> processMap_ = new Dictionary<string, ProcessFunction>();

      public bool Process(TProtocol iprot, TProtocol oprot)
      {
        try
        {
          TMessage msg = iprot.ReadMessageBegin();
          ProcessFunction fn;
          processMap_.TryGetValue(msg.Name, out fn);
          if (fn == null) {
            TProtocolUtil.Skip(iprot, TType.Struct);
            iprot.ReadMessageEnd();
            TApplicationException x = new TApplicationException (TApplicationException.ExceptionType.UnknownMethod, "Invalid method name: '" + msg.Name + "'");
            oprot.WriteMessageBegin(new TMessage(msg.Name, TMessageType.Exception, msg.SeqID));
            x.Write(oprot);
            oprot.WriteMessageEnd();
            oprot.Transport.Flush();
            return true;
          }
          fn(msg.SeqID, iprot, oprot);
        }
        catch (IOException)
        {
          return false;
        }
        return true;
      }

      public void sayHello_Process(int seqid, TProtocol iprot, TProtocol oprot)
      {
        sayHello_args args = new sayHello_args();
        args.Read(iprot);
        iprot.ReadMessageEnd();
        sayHello_result result = new sayHello_result();
        result.Success = iface_.sayHello(args.Username);
        oprot.WriteMessageBegin(new TMessage("sayHello", TMessageType.Reply, seqid)); 
        result.Write(oprot);
        oprot.WriteMessageEnd();
        oprot.Transport.Flush();
      }

    }


    #if !SILVERLIGHT
    [Serializable]
    #endif
    public partial class sayHello_args : TBase
    {
      private string _username;

      public string Username
      {
        get
        {
          return _username;
        }
        set
        {
          __isset.username = true;
          this._username = value;
        }
      }


      public Isset __isset;
      #if !SILVERLIGHT
      [Serializable]
      #endif
      public struct Isset {
        public bool username;
      }

      public sayHello_args() {
      }

      public void Read (TProtocol iprot)
      {
        iprot.IncrementRecursionDepth();
        try
        {
          TField field;
          iprot.ReadStructBegin();
          while (true)
          {
            field = iprot.ReadFieldBegin();
            if (field.Type == TType.Stop) { 
              break;
            }
            switch (field.ID)
            {
              case 1:
                if (field.Type == TType.String) {
                  Username = iprot.ReadString();
                } else { 
                  TProtocolUtil.Skip(iprot, field.Type);
                }
                break;
              default: 
                TProtocolUtil.Skip(iprot, field.Type);
                break;
            }
            iprot.ReadFieldEnd();
          }
          iprot.ReadStructEnd();
        }
        finally
        {
          iprot.DecrementRecursionDepth();
        }
      }

      public void Write(TProtocol oprot) {
        oprot.IncrementRecursionDepth();
        try
        {
          TStruct struc = new TStruct("sayHello_args");
          oprot.WriteStructBegin(struc);
          TField field = new TField();
          if (Username != null && __isset.username) {
            field.Name = "username";
            field.Type = TType.String;
            field.ID = 1;
            oprot.WriteFieldBegin(field);
            oprot.WriteString(Username);
            oprot.WriteFieldEnd();
          }
          oprot.WriteFieldStop();
          oprot.WriteStructEnd();
        }
        finally
        {
          oprot.DecrementRecursionDepth();
        }
      }

      public override string ToString() {
        StringBuilder __sb = new StringBuilder("sayHello_args(");
        bool __first = true;
        if (Username != null && __isset.username) {
          if(!__first) { __sb.Append(", "); }
          __first = false;
          __sb.Append("Username: ");
          __sb.Append(Username);
        }
        __sb.Append(")");
        return __sb.ToString();
      }

    }


    #if !SILVERLIGHT
    [Serializable]
    #endif
    public partial class sayHello_result : TBase
    {
      private string _success;

      public string Success
      {
        get
        {
          return _success;
        }
        set
        {
          __isset.success = true;
          this._success = value;
        }
      }


      public Isset __isset;
      #if !SILVERLIGHT
      [Serializable]
      #endif
      public struct Isset {
        public bool success;
      }

      public sayHello_result() {
      }

      public void Read (TProtocol iprot)
      {
        iprot.IncrementRecursionDepth();
        try
        {
          TField field;
          iprot.ReadStructBegin();
          while (true)
          {
            field = iprot.ReadFieldBegin();
            if (field.Type == TType.Stop) { 
              break;
            }
            switch (field.ID)
            {
              case 0:
                if (field.Type == TType.String) {
                  Success = iprot.ReadString();
                } else { 
                  TProtocolUtil.Skip(iprot, field.Type);
                }
                break;
              default: 
                TProtocolUtil.Skip(iprot, field.Type);
                break;
            }
            iprot.ReadFieldEnd();
          }
          iprot.ReadStructEnd();
        }
        finally
        {
          iprot.DecrementRecursionDepth();
        }
      }

      public void Write(TProtocol oprot) {
        oprot.IncrementRecursionDepth();
        try
        {
          TStruct struc = new TStruct("sayHello_result");
          oprot.WriteStructBegin(struc);
          TField field = new TField();

          if (this.__isset.success) {
            if (Success != null) {
              field.Name = "Success";
              field.Type = TType.String;
              field.ID = 0;
              oprot.WriteFieldBegin(field);
              oprot.WriteString(Success);
              oprot.WriteFieldEnd();
            }
          }
          oprot.WriteFieldStop();
          oprot.WriteStructEnd();
        }
        finally
        {
          oprot.DecrementRecursionDepth();
        }
      }

      public override string ToString() {
        StringBuilder __sb = new StringBuilder("sayHello_result(");
        bool __first = true;
        if (Success != null && __isset.success) {
          if(!__first) { __sb.Append(", "); }
          __first = false;
          __sb.Append("Success: ");
          __sb.Append(Success);
        }
        __sb.Append(")");
        return __sb.ToString();
      }

    }

  }
}      

HelloWorldService

     注:強烈建議大家别去修改Thrift生成的代碼

4、建立類實作生成代碼的服務接口(實際邏輯調用類)

    實作接口HelloWorldService.Iface

public class HelloWorldImp : HelloWorldService.Iface
    {
        public string sayHello(string username)
        {
            if (string.IsNullOrWhiteSpace(username))
                return null;
            string msg = string.Concat("Hello ", username);
            Console.WriteLine(msg);
            return msg;
        }
    }      

5、釋出并注冊服務到ZooKeeper

public class ServeTest
    {
        public static void Test()
        {
            ZKConsumer zooKeeper = ZKInit();
            string serviceName = "com.fang.HelloWorld$Iface";//服務名
            HelloWorldService.Iface service = new HelloWorldImp();//服務實作邏輯
            string serviceIp = "192.168.109.166";//釋出服務使用ip
            int servicePort = 5000;//釋出服務使用端口
            string group = "kg";//應用程式分組
            string serviceVersion = "1.0.0";//服務版本
            int serviceTimeOut = 5000; //服務逾時門檻值(機關Millisecond)
            int alertElapsed = 3000; //服務執行耗時監控報警門檻值(機關Millisecond)
            int alertFailure = 10; //服務每分鐘出錯次數監控報警門檻值
            //注冊并釋出服務
            zooKeeper.RegistService<HelloWorldService.Iface>(serviceName, service, serviceIp, servicePort, group, serviceVersion, serviceTimeOut, alertElapsed, alertFailure);
        }
        /// <summary>
        /// 初始化zooKeeper
        /// </summary>
        /// <returns></returns>
        private static ZKConsumer ZKInit()
        {
            ZKConsumer zooKeeper = new ZKConsumer();
            zooKeeper.Connectstring = "192.168.109.166:2181";
            zooKeeper.Logger = Fang.Log.Loger.CreateDayLog("ServeTest");
            zooKeeper.Init();
            return zooKeeper;
        }
    }      

注:其中ZKConsumer就是我定義的和ZooKeeper互動的類,也幾乎是.net SOA直接互動的唯一一個類,使用起來是不是非常簡單,其實實作還是比較複雜的,随後再說

6、啟動服務看一下

6.1 其實執行時候就是開了一個socket監聽,很簡單

異構SOA系統架構之Asp.net實作(相容dubbo)

6.2 看一下日志資訊

11:46:21
ZooKeeper Init
11:46:21
ZooKeeper Connect
11:46:21
ZK觸發了None事件(path:)!
11:46:21
ZooKeeper CONNECTED
11:46:21
Collecter Start
11:46:21
Consumer Subcribe:/dubbo/com.alibaba.dubbo.monitor.gen.thrift.MonitorService%24Iface/providers
11:46:22
Collecter Run
11:46:22
Collecter OnFail
11:46:27
Collecter Run
11:46:27
Collecter OnFail      

以上是日志檔案,有ZooKeeper連接配接資訊和訂閱日志收集服務資訊及日志收集資訊

日志收集是一個線程排程,由于還沒有連接配接沒有日志,是以Collecter都是Fail

6.3 看一下ZooKeeper的變化

異構SOA系統架構之Asp.net實作(相容dubbo)

ZooKeeper在dubbo節點下多出了一個節點"com.fang.HelloWorld%24Iface"及其多個子節點,其中重點是其providers子節點下有一個很長的節點,那個節點就是表示目前服務資訊的,如果服務關閉,這個資訊也會消失

都在dubbo下不難了解,因為我們選型就是dubbo,.net要相容dubbo的一些特性

四、做個用戶端來調用HelloWorld服務

1、使用Thrift.exe生成代碼并複制到項目中

  服務端和用戶端生成代碼是沒有差別的,這個就不再展開,需要再了解參考服務端生成代碼部分

2、調用調用HelloWorld服務的源碼

異構SOA系統架構之Asp.net實作(相容dubbo)
異構SOA系統架構之Asp.net實作(相容dubbo)
public class HelloWorldTest
    {
        public static void Test()
        {
            ZKConsumer zooKeeper = ZKInit();
            Subcribe(zooKeeper);//訂閱com.fang.HelloWorld
            string str = null;
            do
            {
                str = Console.ReadLine();
                if (string.Equals(str, "Exit", StringComparison.CurrentCultureIgnoreCase))
                    return;
                Console.WriteLine("callDemo");
                CallService();//調用服務
            } while (true);
        }
        /// <summary>
        /// 訂閱AskSearchService
        /// </summary>
        /// <param name="zooKeeper"></param>
        private static void Subcribe(ZKConsumer zooKeeper)
        {
            string serviceName = "com.fang.HelloWorld$Iface";//服務名
            string serviceGroup = "kg";//服務分組
            string serviceVersion = "1.0.0.0";//服務版本
            int serviceTimeOut = 5000; //服務逾時門檻值(機關Millisecond)
            int alertElapsed = 3000; //服務執行耗時監控報警門檻值(機關Millisecond)
            int alertFailure = 10; //服務每分鐘出錯次數監控報警門檻值
            //訂閱服務
            bool state = zooKeeper.SubcribeService<HelloWorldService.Iface>(serviceName, serviceGroup, serviceVersion, serviceTimeOut, alertElapsed, alertFailure);
            Console.WriteLine(string.Concat("SubcribeService(", serviceName, ") is ", state.ToString()));
        }
        /// <summary>
        /// 初始化zooKeeper
        /// </summary>
        /// <returns></returns>
        private static ZKConsumer ZKInit()
        {
            ZKConsumer zooKeeper = new ZKConsumer();
            zooKeeper.Connectstring = "192.168.109.166:2181";
            zooKeeper.Logger = Fang.Log.Loger.CreateDayLog("HelloWorldTest");
            zooKeeper.Init();
            return zooKeeper;
        }
        /// <summary>
        /// 調用服務
        /// </summary>
        private static void CallService()
        {
            using (var resource = ZKConsumer.GetServiceByContainer<HelloWorldService.Iface>())
            {
                HelloWorldService.Iface service = resource.Service;
                if (service == null)
                    Console.WriteLine("service is null");
                string results = null;
                try
                {
                    results = service.sayHello("Word");
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
                if (results != null)
                    Console.WriteLine(results.ToString());
            }
        }
    }      

HelloWorldTest

   注:以上看上去洋洋灑灑幾十行,貌似很複雜,其實不然

3、以上代碼解析

 2.1 初始化和訂閱服務

  ZKInit是初始化ZooKeeper的,很簡單

  Subcribe是訂閱服務,看上去很複雜,其實就是一行代碼,隻是為了便于了解拆分寫成這樣 

  以上初始化對于每個應用程式都隻需要一次

    web應用程式(站點)可以在Global.asax的Application_Start中初始化一次,也可以配置一個IHttpModule來初始化(在Init)

    控制台和windows服務在Main方法中的開始部分初始化即可

   2.2 Test是便于測試寫了一個while循環,實際開發可以無視

   2.3 CallService是實際調用服務代碼

    核心就是一個using及其中的GetServiceByContainer方法,及調用sayHello方法,其他都是安全檢測異常處理測試代碼,算下來核心代碼也就是兩三行

    應該說還是挺簡單的吧,當然沒有java同學用dubbo簡單,但至少這裡封裝了ZooKeeper、Thrift和日志等。讓大家盡量少和業務無關的東西打交道

4、運作測試一下

4.1 執行之後效果如下

異構SOA系統架構之Asp.net實作(相容dubbo)

4.2 看一下ZooKeeper的變化

異構SOA系統架構之Asp.net實作(相容dubbo)

這次在consumers下增加了一個很長的節點,證明用戶端和服務端都和ZooKeeper連接配接上了

5、調用幾次試試

5.1 用戶端截圖

異構SOA系統架構之Asp.net實作(相容dubbo)

5.2 服務端截圖

異構SOA系統架構之Asp.net實作(相容dubbo)

以上測試證明是服務端和用戶端通信沒有問題

實際上我和Java的同僚也聯測了,Java調用.net的服務也沒有問題,.net調用dubbo(Java)的服務也沒有問題

另外,多個服務端和多個用戶端也是測試通過了,限于篇幅這個就不再舉例

五、主要源碼解析

1、項目截圖

異構SOA系統架構之Asp.net實作(相容dubbo)

2、逐個解析一下

  2.1 ApplicationInfo很簡單就是讀取一些應用程式配置資訊

        AppSettings["ZooKeeperConnectstring"]是ZooKeeper連接配接位址

    AppSettings["ApplicationName"]是應用程式名用來程式定位及服務依賴關系圖繪制

        AppSettings["ApplicationOwner"]是項目負責人等

     2.2 Collecter收集日志的邏輯及其線程排程

     2.3 Connecter用來連接配接ZooKeeper及維護ZooKeeper連接配接(重連)

     2.4 ConsumAop是客戶AOP攔截,記錄日志到隊列

     2.5 Consumer用來客戶服務訂閱及服務路由管理

     2.6 HostStat用于服務主機資訊解析

     2.7 ISubcribe是ZooKeeper訂閱接口

     2.8 MethodReport是方法執行日志

     2.9 Monitor是日志定時報送作業(線程排程)

     2.10 Provider用于服務端啟動Socket服務及注冊到ZooKeeper

     2.11 ReportAopHandler是方法執行Aop攔截,用于服務端執行攔截,ConsumAop繼承該類

     2.12 ReportStorage日志存儲器,并維護一個Collecter和一個Monitor線程排程

     2.13 ServiceConcurrent用于服務執行并發統計(用戶端及服務端)

     2.14 ServiceConfig是用戶端和服務端的公共配置

     2.15 ServiceFactor是用戶端服務工廠及Socket連接配接池調用

     2.16 ServiceHost是單個服務主機的Socket工廠及連接配接池

     2.17 ServiceHostManager用于服務主機叢集管理

     2.18 ServiceResource用于服務資源管理,現在用于回收Socket主機(以後可能要做成服務對象也可以回收)

     2.19 ServiceSocket,本來用于維護Socket連接配接,重連的,現在隻是Socket包裝類

     2.20 Statistics用于日志統計彙總

     2.21 ZKConsumer是ZooKeeper消費者,用來維護ZooKeeper連接配接及基于ZooKeeper的功能

     注:另外Aop、容器、資源池。線程排程、類型轉化等來源于面向接口主架構

     以上功能雖然可以達到.net使用和dubbo相容的服務功能,但是離dubbo在功能和穩定性上還有差距,這個建設過程需要持續下去

    最後,我暢想到一個夢境。一個.net小夥深情的望着Java小姑娘說,我做好準備了,我們做朋友吧。Java小姑娘點點頭。此時響起了優美的華爾茲。.net和Java手拉手在舞池裡翩翩起舞...