天天看點

istio mcp探究介紹資料模型建立連接配接配置更新mcp實作探Server.EstablishResourceStreamSource.ProcessStreamconnection.processClientRequest官方mcp watcher 實作

介紹

MCP是基于訂閱的配置分發API。配置使用者(即sink)從配置生産者(即source)請求更新資源集合.添加,更新或删除資源時,source會将資源更新推送到sink.sink積極确認資源更新,如果sink接受,則傳回ACK,如果被拒絕則傳回NACK,例如: 因為資源無效。一旦對先前的更新進行了ACK/NACK,則源可以推送其他更新.該源一次隻能運作一個未完成的更新(每個集合).

MCP是一對雙向流gRPC API服務(ResourceSource和ResourceSink)。

  • 當source是伺服器而sink是用戶端時,将使用ResourceSource服務.預設情況下,Galley實作ResourceSource服務,并且Pilot/Mixer連接配接作為用戶端。
  • 當source是用戶端,而接收器是伺服器時,将使用ResourceSink服務.可以将Galley配置為可選地

    dial-out

    到遠端配置sink,例如 Pilot位于另一個群集中,在該群集中,它不能作為用戶端啟動與Galley的連接配接.在這種情況下,Pilot将實作ResourceSink服務,而Galley将作為用戶端進行連接配接。

就消息交換而言,ResourceSource和ResourceSink在語義上是等效的.唯一有意義的差別是誰啟動連接配接并打開grpc流。

資料模型

MCP是一種傳輸機制,可以通過管理器元件配置先導和混合器.MCP定義了每種資源的通用中繼資料格式,而資源特定的内容則在其他位置定義(例如https://github.com/istio/api/tree/master/networking/v1alpha3).

Collections

相同類型的資源被組織到命名集合中.Istio API集合名稱的格式為istio///,其中,和<api由API樣式準則定義.例如,VirtualService的collection名稱為istio/networking/v1alpha3/virtualservices。

中繼資料

建立連接配接

ResourceSource服務-用戶端是reource sink.用戶端dail伺服器并建立新的gRPC流.用戶端發送RequestResources并接收Resources消息。

istio mcp探究介紹資料模型建立連接配接配置更新mcp實作探Server.EstablishResourceStreamSource.ProcessStreamconnection.processClientRequest官方mcp watcher 實作

ResourceSink服務-用戶端是資源源.用戶端撥打伺服器并建立新的gRPC流.伺服器發送RequestResources并接收Resources消息。

istio mcp探究介紹資料模型建立連接配接配置更新mcp實作探Server.EstablishResourceStreamSource.ProcessStreamconnection.processClientRequest官方mcp watcher 實作

配置更新

以下概述适用于ResourceSink和ResourceSource服務,無論用戶端/伺服器角色如何。

資源更新協定由增量xDS協定派生。除了資源提示已被删除之外,協定交換幾乎是相同的。下面的大多數文本和圖表都是從增量xDS文檔中複制并進行相應調整的。

在MCP中,資源首先按collection進行組織。在每個collection中,資源可以通過中繼資料名稱唯一地辨別。對各個資源進行版本控制,以區分同一命名資源的較新版本。

可以在兩種情況下發送RequestResource消息:

  • MCP雙向更改流中的初始消息
  • 作為對先前資源消息的ACK或NACK響應。在這種情況下,response_nonce設定為資源消息中的現時值.ACK/NACK由後續請求中是否存在error_detail決定。

初始的RequestResources消息包括與所訂閱的資源集(例如VirtualService)相對應的集合,節點接收器辨別符和nonce字段以及initial_resource_version(稍後會詳細介紹)。當請求的資源可用時,source将發送資源消息。處理資源消息後,sink在流上發送新的RequestResources消息,指定成功應用的最後一個版本以及源提供的随機數。

随機數字段用于将每個集合的RequestResources和Resources消息配對。源一次隻能發送一個未完成的資源消息(每個collection),并等待接收器進行ACK/NACK。接收到更新後,接收器将在解碼,驗證更新并将更新持久儲存到其内部配置存儲後,期望相對較快地發送ACK/NACK。

source應忽略具有過期和未知随機數的請求,這些請求與最近發送的Resource消息中的随機數不比對。

成功示例

以下示例顯示接收器接收到已成功ACK的一系列更改。

istio mcp探究介紹資料模型建立連接配接配置更新mcp實作探Server.EstablishResourceStreamSource.ProcessStreamconnection.processClientRequest官方mcp watcher 實作

以下示例顯示了與增量更新一起傳遞的所需資源.此示例假定source支援增量.當source不支援增量更新時,考慮到接收器是否請求增量更新,推送的資源将始終将增量設定為false.在任何時候,源都可以決定推送完整狀态更新,而不必考慮接收器的請求.雙方必須協商(即同意)在每個請求/響應的基礎上使用增量,以增量發送更新。

istio mcp探究介紹資料模型建立連接配接配置更新mcp實作探Server.EstablishResourceStreamSource.ProcessStreamconnection.processClientRequest官方mcp watcher 實作

錯誤示例

以下示例顯示了無法應用更改時發生的情況

istio mcp探究介紹資料模型建立連接配接配置更新mcp實作探Server.EstablishResourceStreamSource.ProcessStreamconnection.processClientRequest官方mcp watcher 實作

接收器僅在特殊情況下應為NACK。例如,如果一組資源無效,格式錯誤或無法解碼。NACK的更新應發出警報,以供人類随後進行調查.源不應該重新發送先前NACK相同的資源集.在将金絲雀推送到更大數量的資源接收器之前,也可以将金絲雀推送到專用接收器,以驗證正确性(非NACK)。

MCP中的随機數用于比對RequestResources和Resource。在重新連接配接時,接收器可以通過為每個集合指定帶有initial_resource_version的已知資源版本來嘗試恢複與同一源的會話。

mcp實作探

接下來以官方的測試用例分析官方的mcp實作,代碼位址:https://github.com/istio/istio/blob/master/pkg/mcp/testing/server.go

對于mcpsource通過

mcp.RegisterResourceSourceServer(gs, srcServer) 
           

進行注冊

Server.EstablishResourceStream

擷取用戶端資訊,并進行鑒權,交給ProcessStream進行處理

func (s *Server) EstablishResourceStream(stream mcp.ResourceSource_EstablishResourceStreamServer) error {
	if s.rateLimiter != nil {
		if err := s.rateLimiter.Wait(stream.Context()); err != nil {
			return err
		}

	}
	var authInfo credentials.AuthInfo
	if peerInfo, ok := peer.FromContext(stream.Context()); ok { //擷取用戶端資訊
		authInfo = peerInfo.AuthInfo
	} else {
		scope.Warnf("No peer info found on the incoming stream.")
	}

	if err := s.authCheck.Check(authInfo); err != nil { // 認證
		return status.Errorf(codes.Unauthenticated, "Authentication failure: %v", err)
	}

	if err := stream.SendHeader(s.metadata); err != nil {
		return err
	}
	err := s.src.ProcessStream(stream) 
	code := status.Code(err)
	if code == codes.OK || code == codes.Canceled || err == io.EOF {
		return nil
	}
	return err
}
           

Source.ProcessStream

  • s.newConnection(stream)

初始化connection,

con := &connection{
    stream:   stream,
    peerAddr: peerAddr,
    requestC: make(chan *mcp.RequestResources),
    watches:  make(map[string]*watch),
    watcher:  s.watcher,
    id:       atomic.AddInt64(&s.nextStreamID, 1),
    reporter: s.reporter,
    limiter:  s.requestLimiter.Create(),
    queue:    internal.NewUniqueScheduledQueue(len(s.collections)),
}
           

watcher即為Server.src的watcher

func NewServer(srcOptions *Options, serverOptions *ServerOptions) *Server {
	s := &Server{
		src:         New(srcOptions),
		authCheck:   serverOptions.AuthChecker,
		rateLimiter: serverOptions.RateLimiter,
		metadata:    serverOptions.Metadata,
	}
	return s
}
           
  • con.receive()

通過receive不斷将資料寫入requestC channel

  • 從requestC 讀取請求
case req, more := <-con.requestC:
    if !more {
        return con.reqError
    }
    if con.limiter != nil {
        if err := con.limiter.Wait(stream.Context()); err != nil {
            return err
        }

    }
    if err := con.processClientRequest(req); err != nil {
        return err
    }
           
  • 響應入隊
func (con *connection) queueResponse(resp *WatchResponse) {
	if resp == nil {
		con.queue.Close()
	} else {
		con.queue.Enqueue(resp.Collection, resp) //響應入隊
	}
}
           
  • 響應出隊

從queue中擷取需要傳回的response,傳回給sink

collection, item, ok := con.queue.Dequeue() // 從queue讀取處理後的傳回結果
if !ok {
    break
}

resp := item.(*WatchResponse)

w, ok := con.watches[collection]
if !ok {
    scope.Errorf("unknown collection in dequeued watch response: %v", collection)
    break // bug?
}

// the response may have been cleared before we got to it
if resp != nil {
    if err := con.pushServerResponse(w, resp); err != nil { //通過pushServerResponse 将響應發送給sink
        return err
    }
}
           

connection.processClientRequest

通過調用Watch方法處理請求

sr := &Request{
			SinkNode:    req.SinkNode,
			Collection:  collection,
			VersionInfo: versionInfo,
			incremental: req.Incremental,
		}
w.cancel = con.watcher.Watch(sr, con.queueResponse, con.peerAddr)
           

由此我們可以看出我們實作一個mcp的核心在于實作一個watcher,傳遞給mcp source server

type Watcher interface {
	Watch(*Request, PushResponseFunc, string) CancelWatchFunc
}
           

官方mcp watcher 實作

pkg/mcp/testing/server.go

cache := snapshot.New(groups.DefaultIndexFn)

重點關注cache.Watch

func (c *Cache) Watch(
	request *source.Request,
	pushResponse source.PushResponseFunc,
	peerAddr string) source.CancelWatchFunc {
	group := c.groupIndex(request.Collection, request.SinkNode) // 擷取sink要擷取的group

	c.mu.Lock()
	defer c.mu.Unlock()

	info := c.fillStatus(group, request, peerAddr) // 初始化對應group peer的同步狀态資訊

	collection := request.Collection

	// return an immediate response if a snapshot is available and the
	// requested version doesn't match.
	if snapshot, ok := c.snapshots[group]; ok {  // 擷取對應組的snapshot

		version := snapshot.Version(request.Collection) // 計算版本資訊
		scope.Debugf("Found snapshot for group: %q for %v @ version: %q",
			group, request.Collection, version)

		if version != request.VersionInfo {        // 如果sink的目前版本和source的版本不一緻則推送response
			scope.Debugf("Responding to group %q snapshot:\n%v\n", group, snapshot)
			response := &source.WatchResponse{
				Collection: request.Collection,
				Version:    version,
				Resources:  snapshot.Resources(request.Collection),
				Request:    request,
			}
			pushResponse(response)
			return nil
		}
		info.synced[request.Collection][peerAddr] = true
	}

	// 如果版本一緻則.傳回一個cancel,同時記錄對應的watchs,當SetSnapshot時,也就是有更新時會進行調用
	c.watchCount++
	watchID := c.watchCount

	scope.Infof("Watch(): created watch %d for %s from group %q, version %q",
		watchID, collection, group, request.VersionInfo)

	info.mu.Lock()
	info.watches[watchID] = &responseWatch{request: request, pushResponse: pushResponse}
	info.mu.Unlock()

	cancel := func() {
		c.mu.Lock()
		defer c.mu.Unlock()
		if s, ok := c.status[group]; ok {
			s.mu.Lock()
			delete(s.watches, watchID)
			s.mu.Unlock()
		}
	}
	return cancel
}
           

掃描關注我:

istio mcp探究介紹資料模型建立連接配接配置更新mcp實作探Server.EstablishResourceStreamSource.ProcessStreamconnection.processClientRequest官方mcp watcher 實作

繼續閱讀