天天看點

分布式事務架構 seata-golang 通信模型詳解

Java 的世界裡,大家廣泛使用的一個高性能網絡通信架構 netty,很多 RPC 架構都是基于 netty 來實作的。在 golang 的世界裡,getty 也是一個類似 netty 的高性能網絡通信庫。getty 最初由 dubbogo 項目負責人于雨開發,作為底層通信庫在 dubbo-go 中使用。随着 dubbo-go 捐獻給 apache 基金會,在社群小夥伴的共同努力下,getty 也最終進入到 apache 這個大家庭,并改名 dubbo-getty 。

分布式事務架構 seata-golang 通信模型詳解
作者 | 劉曉敏 于雨

一、簡介

18 年的時候,我在公司裡實踐微服務,當時遇到最大的問題就是分布式事務問題。同年,阿裡在社群開源他們的分布式事務解決方案,我也很快關注到這個項目,起初還叫 fescar,後來更名 seata。由于我對開源技術很感興趣,加了很多社群群,當時也很關注 dubbo-go 這個項目,在裡面默默潛水。随着對 seata 的了解,逐漸萌生了做一個 go 版本的分布式事務架構的想法。

要做一個 golang 版的分布式事務架構,首要的一個問題就是如何實作 RPC 通信。dubbo-go 就是很好的一個例子擺在眼前,遂開始研究 dubbo-go 的底層 getty。

二、如何基于 getty 實作 RPC 通信

getty 架構的整體模型圖如下:

分布式事務架構 seata-golang 通信模型詳解

下面結合相關代碼,詳述 seata-golang 的 RPC 通信過程。

1. 建立連接配接

實作 RPC 通信,首先要建立網絡連接配接吧,我們從 client.go 開始看起。

func (c *client) connect() {
	var (
		err error
		ss  Session
	)

	for {
        // 建立一個 session 連接配接
		ss = c.dial()
		if ss == nil {
			// client has been closed
			break
		}
		err = c.newSession(ss)
		if err == nil {
            // 收發封包
			ss.(*session).run()
			// 此處省略部分代碼
      
			break
		}
		// don't distinguish between tcp connection and websocket connection. Because
		// gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close()
		ss.Conn().Close()
	}
}
           

connect()

方法通過

dial()

方法得到了一個 session 連接配接,進入 dial() 方法:

func (c *client) dial() Session {
	switch c.endPointType {
	case TCP_CLIENT:
		return c.dialTCP()
	case UDP_CLIENT:
		return c.dialUDP()
	case WS_CLIENT:
		return c.dialWS()
	case WSS_CLIENT:
		return c.dialWSS()
	}

	return nil
}
           

我們關注的是 TCP 連接配接,是以繼續進入

c.dialTCP()

方法:

func (c *client) dialTCP() Session {
	var (
		err  error
		conn net.Conn
	)

	for {
		if c.IsClosed() {
			return nil
		}
		if c.sslEnabled {
			if sslConfig, err := c.tlsConfigBuilder.BuildTlsConfig(); err == nil && sslConfig != nil {
				d := &net.Dialer{Timeout: connectTimeout}
				// 建立加密連接配接
				conn, err = tls.DialWithDialer(d, "tcp", c.addr, sslConfig)
			}
		} else {
            // 建立 tcp 連接配接
			conn, err = net.DialTimeout("tcp", c.addr, connectTimeout)
		}
		if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
			conn.Close()
			err = errSelfConnect
		}
		if err == nil {
            // 傳回一個 TCPSession
			return newTCPSession(conn, c)
		}

		log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err))
		<-wheel.After(connectInterval)
	}
}
           

至此,我們知道了 getty 如何建立 TCP 連接配接,并傳回 TCPSession。

2. 收發封包

那它是怎麼收發封包的呢,我們回到 connection 方法接着往下看,有這樣一行

ss.(*session).run()

,在這行代碼之後代碼都是很簡單的操作,我們猜測這行代碼運作的邏輯裡面一定包含收發封包的邏輯,接着進入

run()

func (s *session) run() {
	// 省略部分代碼
  
	go s.handleLoop()
	go s.handlePackage()
}
           

這裡起了兩個 goroutine,

handleLoop

handlePackage

,看字面意思符合我們的猜想,進入

handleLoop()

func (s *session) handleLoop() {
    // 省略部分代碼
  
	for {
		// A select blocks until one of its cases is ready to run.
		// It choose one at random if multiple are ready. Otherwise it choose default branch if none is ready.
		select {
		// 省略部分代碼
      
		case outPkg, ok = <-s.wQ:
			// 省略部分代碼

			iovec = iovec[:0]
			for idx := 0; idx < maxIovecNum; idx++ {
        // 通過 s.writer 将 interface{} 類型的 outPkg 編碼成二進制的比特
				pkgBytes, err = s.writer.Write(s, outPkg)
				// 省略部分代碼
        
				iovec = append(iovec, pkgBytes)

                //省略部分代碼
			}
            // 将這些二進制比特發送出去
			err = s.WriteBytesArray(iovec[:]...)
			if err != nil {
				log.Errorf("%s, [session.handleLoop]s.WriteBytesArray(iovec len:%d) = error:%+v",
					s.sessionToken(), len(iovec), perrors.WithStack(err))
				s.stop()
				// break LOOP
				flag = false
			}

		case <-wheel.After(s.period):
			if flag {
				if wsFlag {
					err := wsConn.writePing()
					if err != nil {
						log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
					}
				}
                // 定時執行的邏輯,心跳等
				s.listener.OnCron(s)
			}
		}
	}
}
           

通過上面的代碼,我們不難發現,

handleLoop()

方法處理的是發送封包的邏輯,RPC 需要發送的消息首先由

s.writer

編碼成二進制比特,然後通過建立的 TCP 連接配接發送出去。這個

s.writer

對應的 Writer 接口是 RPC 架構必須要實作的一個接口。

繼續看

handlePackage()

func (s *session) handlePackage() {
    // 省略部分代碼

	if _, ok := s.Connection.(*gettyTCPConn); ok {
		if s.reader == nil {
			errStr := fmt.Sprintf("session{name:%s, conn:%#v, reader:%#v}", s.name, s.Connection, s.reader)
			log.Error(errStr)
			panic(errStr)
		}

		err = s.handleTCPPackage()
	} else if _, ok := s.Connection.(*gettyWSConn); ok {
		err = s.handleWSPackage()
	} else if _, ok := s.Connection.(*gettyUDPConn); ok {
		err = s.handleUDPPackage()
	} else {
		panic(fmt.Sprintf("unknown type session{%#v}", s))
	}
}
           

進入

handleTCPPackage()

func (s *session) handleTCPPackage() error {
    // 省略部分代碼

	conn = s.Connection.(*gettyTCPConn)
	for {
		// 省略部分代碼

		bufLen = 0
		for {
			// for clause for the network timeout condition check
			// s.conn.SetReadTimeout(time.Now().Add(s.rTimeout))
            // 從 TCP 連接配接中收到封包
			bufLen, err = conn.recv(buf)
			// 省略部分代碼
      
			break
		}
		// 省略部分代碼
    
        // 将收到的封包二進制比特寫入 pkgBuf
		pktBuf.Write(buf[:bufLen])
		for {
			if pktBuf.Len() <= 0 {
				break
			}
            // 通過 s.reader 将收到的封包解碼成 RPC 消息
			pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes())
			// 省略部分代碼

      s.UpdateActive()
            // 将收到的消息放入 TaskQueue 供 RPC 消費端消費
			s.addTask(pkg)
			pktBuf.Next(pkgLen)
			// continue to handle case 5
		}
		if exit {
			break
		}
	}

	return perrors.WithStack(err)
}
           

從上面的代碼邏輯我們分析出,RPC 消費端需要将從 TCP 連接配接收到的二進制比特封包解碼成 RPC 能消費的消息,這個工作由 s.reader 實作,是以,我們要建構 RPC 通信層也需要實作 s.reader 對應的 Reader 接口。

3. 底層處理網絡封包的邏輯如何與業務邏輯解耦

我們都知道,netty 通過 boss 線程和 worker 線程實作了底層網絡邏輯和業務邏輯的解耦。那麼,getty 是如何實作的呢?

handlePackage()

方法最後,我們看到,收到的消息被放入了

s.addTask(pkg)

這個方法,接着往下分析:

func (s *session) addTask(pkg interface{}) {
	f := func() {
		s.listener.OnMessage(s, pkg)
		s.incReadPkgNum()
	}
	if taskPool := s.EndPoint().GetTaskPool(); taskPool != nil {
		taskPool.AddTaskAlways(f)
		return
	}
	f()
}
           

pkg

參數傳遞到了一個匿名方法,這個方法最終放入了

taskPool

。這個方法很關鍵,在我後來寫 seata-golang 代碼的時候,就遇到了一個坑,這個坑後面分析。

接着我們看一下 taskPool 的定義:

// NewTaskPoolSimple build a simple task pool
func NewTaskPoolSimple(size int) GenericTaskPool {
	if size < 1 {
		size = runtime.NumCPU() * 100
	}
	return &taskPoolSimple{
		work: make(chan task),
		sem:  make(chan struct{}, size),
		done: make(chan struct{}),
	}
}
           

建構了一個緩沖大小為 size (預設為  

runtime.NumCPU() * 100

) 的 channel

sem

。再看方法

AddTaskAlways(t task)

func (p *taskPoolSimple) AddTaskAlways(t task) {
	select {
	case <-p.done:
		return
	default:
	}

	select {
	case p.work <- t:
		return
	default:
	}
	select {
	case p.work <- t:
	case p.sem <- struct{}{}:
		p.wg.Add(1)
		go p.worker(t)
	default:
		goSafely(t)
	}
}
           

加入的任務,會先由 len(p.sem) 個 goroutine 去消費,如果沒有 goroutine 空閑,則會啟動一個臨時的 goroutine 去運作 t()。相當于有  len(p.sem) 個 goroutine 組成了 goroutine pool,pool 中的 goroutine 去處理業務邏輯,而不是由處理網絡封包的 goroutine 去運作業務邏輯,進而實作了解耦。寫 seata-golang 時遇到的一個坑,就是忘記設定 taskPool 造成了處理業務邏輯和處理底層網絡封包邏輯的 goroutine 是同一個,我在業務邏輯中阻塞等待一個任務完成時,阻塞了整個 goroutine,使得阻塞期間收不到任何封包。

4. 具體實作

下面的代碼見 getty.go:

// Reader is used to unmarshal a complete pkg from buffer
type Reader interface {
	Read(Session, []byte) (interface{}, int, error)
}

// Writer is used to marshal pkg and write to session
type Writer interface {
	// if @Session is udpGettySession, the second parameter is UDPContext.
	Write(Session, interface{}) ([]byte, error)
}

// ReadWriter interface use for handle application packages
type ReadWriter interface {
	Reader
	Writer
}
           
// EventListener is used to process pkg that received from remote session
type EventListener interface {
	// invoked when session opened
	// If the return error is not nil, @Session will be closed.
	OnOpen(Session) error

	// invoked when session closed.
	OnClose(Session)

	// invoked when got error.
	OnError(Session, error)

	// invoked periodically, its period can be set by (Session)SetCronPeriod
	OnCron(Session)

	// invoked when getty received a package. Pls attention that do not handle long time
	// logic processing in this func. You'd better set the package's maximum length.
	// If the message's length is greater than it, u should should return err in
	// Reader{Read} and getty will close this connection soon.
	//
	// If ur logic processing in this func will take a long time, u should start a goroutine
	// pool(like working thread pool in cpp) to handle the processing asynchronously. Or u
	// can do the logic processing in other asynchronous way.
	// !!!In short, ur OnMessage callback func should return asap.
	//
	// If this is a udp event listener, the second parameter type is UDPContext.
	OnMessage(Session, interface{})
}
           

通過對整個 getty 代碼的分析,我們隻要實作  

ReadWriter

來對 RPC  消息編解碼,再實作

EventListener

來處理 RPC 消息的對應的具體邏輯,将

ReadWriter

實作和

EventLister

實作注入到 RPC 的 Client 和 Server 端,則可實作 RPC 通信。

4.1 編解碼協定實作

下面是 seata 協定的定義:

分布式事務架構 seata-golang 通信模型詳解

在 ReadWriter 接口的實作

RpcPackageHandler

中,調用 Codec 方法對消息體按照上面的格式編解碼:

// 消息編碼為二進制比特
func MessageEncoder(codecType byte, in interface{}) []byte {
	switch codecType {
	case SEATA:
		return SeataEncoder(in)
	default:
		log.Errorf("not support codecType, %s", codecType)
		return nil
	}
}

// 二進制比特解碼為消息體
func MessageDecoder(codecType byte, in []byte) (interface{}, int) {
	switch codecType {
	case SEATA:
		return SeataDecoder(in)
	default:
		log.Errorf("not support codecType, %s", codecType)
		return nil, 0
	}
}
           

4.2 Client 端實作

再來看 client 端

EventListener

的實作

RpcRemotingClient

func (client *RpcRemoteClient) OnOpen(session getty.Session) error {
	go func() 
		request := protocal.RegisterTMRequest{AbstractIdentifyRequest: protocal.AbstractIdentifyRequest{
			ApplicationId:           client.conf.ApplicationId,
			TransactionServiceGroup: client.conf.TransactionServiceGroup,
		}}
    // 建立連接配接後向 Transaction Coordinator 發起注冊 TransactionManager 的請求
		_, err := client.sendAsyncRequestWithResponse(session, request, RPC_REQUEST_TIMEOUT)
		if err == nil {
      // 将與 Transaction Coordinator 建立的連接配接儲存在連接配接池供後續使用
			clientSessionManager.RegisterGettySession(session)
			client.GettySessionOnOpenChannel <- session.RemoteAddr()
		}
	}()

	return nil
}

// OnError ...
func (client *RpcRemoteClient) OnError(session getty.Session, err error) {
	clientSessionManager.ReleaseGettySession(session)
}

// OnClose ...
func (client *RpcRemoteClient) OnClose(session getty.Session) {
	clientSessionManager.ReleaseGettySession(session)
}

// OnMessage ...
func (client *RpcRemoteClient) OnMessage(session getty.Session, pkg interface{}) {
	log.Info("received message:{%v}", pkg)
	rpcMessage, ok := pkg.(protocal.RpcMessage)
	if ok {
		heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage)
		if isHeartBeat && heartBeat == protocal.HeartBeatMessagePong {
			log.Debugf("received PONG from %s", session.RemoteAddr())
		}
	}

	if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST ||
		rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY {
		log.Debugf("msgId:%s, body:%v", rpcMessage.Id, rpcMessage.Body)
      
		// 處理事務消息,送出 or 復原
		client.onMessage(rpcMessage, session.RemoteAddr())
	} else {
		resp, loaded := client.futures.Load(rpcMessage.Id)
		if loaded {
			response := resp.(*getty2.MessageFuture)
			response.Response = rpcMessage.Body
			response.Done <- true
			client.futures.Delete(rpcMessage.Id)
		}
	}
}

// OnCron ...
func (client *RpcRemoteClient) OnCron(session getty.Session) {
  // 發送心跳
	client.defaultSendRequest(session, protocal.HeartBeatMessagePing)
}
           

clientSessionManager.RegisterGettySession(session)

的邏輯将在下文中分析。

4.3 Server 端 Transaction Coordinator 實作

代碼見

DefaultCoordinator

func (coordinator *DefaultCoordinator) OnOpen(session getty.Session) error {
	log.Infof("got getty_session:%s", session.Stat())
	return nil
}

func (coordinator *DefaultCoordinator) OnError(session getty.Session, err error) {
	// 釋放 TCP 連接配接
  SessionManager.ReleaseGettySession(session)
	session.Close()
	log.Errorf("getty_session{%s} got error{%v}, will be closed.", session.Stat(), err)
}

func (coordinator *DefaultCoordinator) OnClose(session getty.Session) {
	log.Info("getty_session{%s} is closing......", session.Stat())
}

func (coordinator *DefaultCoordinator) OnMessage(session getty.Session, pkg interface{}) {
	log.Debugf("received message:{%v}", pkg)
	rpcMessage, ok := pkg.(protocal.RpcMessage)
	if ok {
		_, isRegTM := rpcMessage.Body.(protocal.RegisterTMRequest)
		if isRegTM {
      // 将 TransactionManager 資訊和 TCP 連接配接建立映射關系
			coordinator.OnRegTmMessage(rpcMessage, session)
			return
		}

		heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage)
		if isHeartBeat && heartBeat == protocal.HeartBeatMessagePing {
			coordinator.OnCheckMessage(rpcMessage, session)
			return
		}

		if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST ||
			rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY {
			log.Debugf("msgId:%s, body:%v", rpcMessage.Id, rpcMessage.Body)
			_, isRegRM := rpcMessage.Body.(protocal.RegisterRMRequest)
			if isRegRM {
        // 将 ResourceManager 資訊和 TCP 連接配接建立映射關系
				coordinator.OnRegRmMessage(rpcMessage, session)
			} else {
				if SessionManager.IsRegistered(session) {
					defer func() {
						if err := recover(); err != nil {
							log.Errorf("Catch Exception while do RPC, request: %v,err: %w", rpcMessage, err)
						}
					}()
          // 處理事務消息,全局事務注冊、分支事務注冊、分支事務送出、全局事務復原等
					coordinator.OnTrxMessage(rpcMessage, session)
				} else {
					session.Close()
					log.Infof("close a unhandled connection! [%v]", session)
				}
			}
		} else {
			resp, loaded := coordinator.futures.Load(rpcMessage.Id)
			if loaded {
				response := resp.(*getty2.MessageFuture)
				response.Response = rpcMessage.Body
				response.Done <- true
				coordinator.futures.Delete(rpcMessage.Id)
			}
		}
	}
}

func (coordinator *DefaultCoordinator) OnCron(session getty.Session) {

}
           

coordinator.OnRegTmMessage(rpcMessage, session)

注冊 Transaction Manager,

coordinator.OnRegRmMessage(rpcMessage, session)

注冊 Resource Manager。具體邏輯分析見下文。

消息進入

coordinator.OnTrxMessage(rpcMessage, session)

方法,将按照消息的類型碼路由到具體的邏輯當中:

switch msg.GetTypeCode() {
	case protocal.TypeGlobalBegin:
		req := msg.(protocal.GlobalBeginRequest)
		resp := coordinator.doGlobalBegin(req, ctx)
		return resp
	case protocal.TypeGlobalStatus:
		req := msg.(protocal.GlobalStatusRequest)
		resp := coordinator.doGlobalStatus(req, ctx)
		return resp
	case protocal.TypeGlobalReport:
		req := msg.(protocal.GlobalReportRequest)
		resp := coordinator.doGlobalReport(req, ctx)
		return resp
	case protocal.TypeGlobalCommit:
		req := msg.(protocal.GlobalCommitRequest)
		resp := coordinator.doGlobalCommit(req, ctx)
		return resp
	case protocal.TypeGlobalRollback:
		req := msg.(protocal.GlobalRollbackRequest)
		resp := coordinator.doGlobalRollback(req, ctx)
		return resp
	case protocal.TypeBranchRegister:
		req := msg.(protocal.BranchRegisterRequest)
		resp := coordinator.doBranchRegister(req, ctx)
		return resp
	case protocal.TypeBranchStatusReport:
		req := msg.(protocal.BranchReportRequest)
		resp := coordinator.doBranchReport(req, ctx)
		return resp
	default:
		return nil
	}
           

4.4 session manager 分析

Client 端同 Transaction Coordinator 建立連接配接起連接配接後,通過

clientSessionManager.RegisterGettySession(session)

将連接配接儲存在

serverSessions = sync.Map{}

這個 map 中。map 的 key 為從 session 中擷取的 RemoteAddress 即 Transaction Coordinator 的位址,value 為 session。這樣,Client 端就可以通過 map 中的一個 session 來向 Transaction Coordinator 注冊 Transaction Manager 和 Resource Manager 了。具體代碼見

getty_client_session_manager.go

Transaction Manager 和 Resource Manager 注冊到 Transaction Coordinator 後,一個連接配接既有可能用來發送 TM 消息也有可能用來發送 RM 消息。我們通過 RpcContext 來辨別一個連接配接資訊:

type RpcContext struct {
	Version                 string
	TransactionServiceGroup string
	ClientRole              meta.TransactionRole
	ApplicationId           string
	ClientId                string
	ResourceSets            *model.Set
	Session                 getty.Session
}
           

當收到事務消息時,我們需要構造這樣一個 RpcContext 供後續事務處理邏輯使用。是以,我們會構造下列 map 來緩存映射關系:

var (
	// session -> transactionRole
	// TM will register before RM, if a session is not the TM registered,
	// it will be the RM registered
	session_transactionroles = sync.Map{}

	// session -> applicationId
	identified_sessions = sync.Map{}

	// applicationId -> ip -> port -> session
	client_sessions = sync.Map{}

	// applicationId -> resourceIds
	client_resources = sync.Map{}
)
           

這樣,Transaction Manager 和 Resource Manager 分别通過

coordinator.OnRegTmMessage(rpcMessage, session)

coordinator.OnRegRmMessage(rpcMessage, session)

注冊到 Transaction Coordinator 時,會在上述 client_sessions map 中緩存 applicationId、ip、port 與 session 的關系,在 client_resources map 中緩存 applicationId 與 resourceIds(一個應用可能存在多個 Resource Manager) 的關系。在需要時,我們就可以通過上述映射關系構造一個 RpcContext。這部分的實作和 java 版 seata 有很大的不同,感興趣的可以深入了解一下。具體代碼見

getty_session_manager.go

至此,我們就分析完了 seata-golang 整個 RPC 通信模型的機制。

三、seata-golang 的未來

seata-golang  從今年 4 月份開始開發,到 8 月份基本實作和 java 版 seata 1.2 協定的互通,對 mysql 資料庫實作了 AT 模式(自動協調分布式事務的送出復原),實作了 TCC 模式,TC 端使用 mysql 存儲資料,使 TC 變成一個無狀态應用支援高可用部署。下圖展示了 AT 模式的原理:

分布式事務架構 seata-golang 通信模型詳解

後續,還有許多工作可以做,比如:對注冊中心的支援、對配置中心的支援、和 java 版 seata 1.4 的協定互通、其他資料庫的支援、raft transaction coordinator 的實作等,希望對分布式事務問題感興趣的開發者可以加入進來一起來打造一個完善的 golang 的分布式事務架構。如果你有任何疑問,歡迎加入交流群【釘釘群号 33069364】。

另外,歡迎對 dubbogo 感興趣的朋友到 dubbogo 社群釘釘群(釘釘群号 31363295)溝通 dubbogo 技術問題。

參考資料

  • seata 官方:https://seata.io
  • java 版 seata:https://github.com/seata/seata
  • seata-golang 項目位址:https://github.com/opentrx/seata-golang
  • seata-golang go 夜讀 b站分享:https://www.bilibili.com/video/BV1oz411e72T

作者簡介

劉曉敏 (GitHubID dk-lockdown),目前就職于 h3c 成都分公司,擅長使用 Java/Go 語言,在雲原生和微服務相關技術方向均有涉獵,目前專攻分布式事務。

于雨(github @AlexStocks),dubbo-go 項目和社群負責人,一個有十多年服務端基礎架構研發一線工作經驗的程式員,陸續參與改進過 Muduo/Pika/Dubbo/Sentinel-go 等知名項目,目前在螞蟻金服可信原生部從事容器編排和 service mesh 工作。