介紹
MCP是基于訂閱的配置分發API。配置使用者(即sink)從配置生産者(即source)請求更新資源集合.添加,更新或删除資源時,source會将資源更新推送到sink.sink積極确認資源更新,如果sink接受,則傳回ACK,如果被拒絕則傳回NACK,例如: 因為資源無效。一旦對先前的更新進行了ACK/NACK,則源可以推送其他更新.該源一次隻能運作一個未完成的更新(每個集合).
MCP是一對雙向流gRPC API服務(ResourceSource和ResourceSink)。
- 當source是伺服器而sink是用戶端時,将使用ResourceSource服務.預設情況下,Galley實作ResourceSource服務,并且Pilot/Mixer連接配接作為用戶端。
- 當source是用戶端,而接收器是伺服器時,将使用ResourceSink服務.可以将Galley配置為可選地
到遠端配置sink,例如 Pilot位于另一個群集中,在該群集中,它不能作為用戶端啟動與Galley的連接配接.在這種情況下,Pilot将實作ResourceSink服務,而Galley将作為用戶端進行連接配接。dial-out
就消息交換而言,ResourceSource和ResourceSink在語義上是等效的.唯一有意義的差別是誰啟動連接配接并打開grpc流。
資料模型
MCP是一種傳輸機制,可以通過管理器元件配置先導和混合器.MCP定義了每種資源的通用中繼資料格式,而資源特定的内容則在其他位置定義(例如https://github.com/istio/api/tree/master/networking/v1alpha3).
Collections
相同類型的資源被組織到命名集合中.Istio API集合名稱的格式為istio///,其中,和<api由API樣式準則定義.例如,VirtualService的collection名稱為istio/networking/v1alpha3/virtualservices。
中繼資料
建立連接配接
ResourceSource服務-用戶端是reource sink.用戶端dail伺服器并建立新的gRPC流.用戶端發送RequestResources并接收Resources消息。

ResourceSink服務-用戶端是資源源.用戶端撥打伺服器并建立新的gRPC流.伺服器發送RequestResources并接收Resources消息。
配置更新
以下概述适用于ResourceSink和ResourceSource服務,無論用戶端/伺服器角色如何。
資源更新協定由增量xDS協定派生。除了資源提示已被删除之外,協定交換幾乎是相同的。下面的大多數文本和圖表都是從增量xDS文檔中複制并進行相應調整的。
在MCP中,資源首先按collection進行組織。在每個collection中,資源可以通過中繼資料名稱唯一地辨別。對各個資源進行版本控制,以區分同一命名資源的較新版本。
可以在兩種情況下發送RequestResource消息:
- MCP雙向更改流中的初始消息
- 作為對先前資源消息的ACK或NACK響應。在這種情況下,response_nonce設定為資源消息中的現時值.ACK/NACK由後續請求中是否存在error_detail決定。
初始的RequestResources消息包括與所訂閱的資源集(例如VirtualService)相對應的集合,節點接收器辨別符和nonce字段以及initial_resource_version(稍後會詳細介紹)。當請求的資源可用時,source将發送資源消息。處理資源消息後,sink在流上發送新的RequestResources消息,指定成功應用的最後一個版本以及源提供的随機數。
随機數字段用于将每個集合的RequestResources和Resources消息配對。源一次隻能發送一個未完成的資源消息(每個collection),并等待接收器進行ACK/NACK。接收到更新後,接收器将在解碼,驗證更新并将更新持久儲存到其内部配置存儲後,期望相對較快地發送ACK/NACK。
source應忽略具有過期和未知随機數的請求,這些請求與最近發送的Resource消息中的随機數不比對。
成功示例
以下示例顯示接收器接收到已成功ACK的一系列更改。
以下示例顯示了與增量更新一起傳遞的所需資源.此示例假定source支援增量.當source不支援增量更新時,考慮到接收器是否請求增量更新,推送的資源将始終将增量設定為false.在任何時候,源都可以決定推送完整狀态更新,而不必考慮接收器的請求.雙方必須協商(即同意)在每個請求/響應的基礎上使用增量,以增量發送更新。
錯誤示例
以下示例顯示了無法應用更改時發生的情況
接收器僅在特殊情況下應為NACK。例如,如果一組資源無效,格式錯誤或無法解碼。NACK的更新應發出警報,以供人類随後進行調查.源不應該重新發送先前NACK相同的資源集.在将金絲雀推送到更大數量的資源接收器之前,也可以将金絲雀推送到專用接收器,以驗證正确性(非NACK)。
MCP中的随機數用于比對RequestResources和Resource。在重新連接配接時,接收器可以通過為每個集合指定帶有initial_resource_version的已知資源版本來嘗試恢複與同一源的會話。
mcp實作探
接下來以官方的測試用例分析官方的mcp實作,代碼位址:https://github.com/istio/istio/blob/master/pkg/mcp/testing/server.go
對于mcpsource通過
mcp.RegisterResourceSourceServer(gs, srcServer)
進行注冊
Server.EstablishResourceStream
擷取用戶端資訊,并進行鑒權,交給ProcessStream進行處理
func (s *Server) EstablishResourceStream(stream mcp.ResourceSource_EstablishResourceStreamServer) error {
if s.rateLimiter != nil {
if err := s.rateLimiter.Wait(stream.Context()); err != nil {
return err
}
}
var authInfo credentials.AuthInfo
if peerInfo, ok := peer.FromContext(stream.Context()); ok { //擷取用戶端資訊
authInfo = peerInfo.AuthInfo
} else {
scope.Warnf("No peer info found on the incoming stream.")
}
if err := s.authCheck.Check(authInfo); err != nil { // 認證
return status.Errorf(codes.Unauthenticated, "Authentication failure: %v", err)
}
if err := stream.SendHeader(s.metadata); err != nil {
return err
}
err := s.src.ProcessStream(stream)
code := status.Code(err)
if code == codes.OK || code == codes.Canceled || err == io.EOF {
return nil
}
return err
}
Source.ProcessStream
- s.newConnection(stream)
初始化connection,
con := &connection{
stream: stream,
peerAddr: peerAddr,
requestC: make(chan *mcp.RequestResources),
watches: make(map[string]*watch),
watcher: s.watcher,
id: atomic.AddInt64(&s.nextStreamID, 1),
reporter: s.reporter,
limiter: s.requestLimiter.Create(),
queue: internal.NewUniqueScheduledQueue(len(s.collections)),
}
watcher即為Server.src的watcher
func NewServer(srcOptions *Options, serverOptions *ServerOptions) *Server {
s := &Server{
src: New(srcOptions),
authCheck: serverOptions.AuthChecker,
rateLimiter: serverOptions.RateLimiter,
metadata: serverOptions.Metadata,
}
return s
}
- con.receive()
通過receive不斷将資料寫入requestC channel
- 從requestC 讀取請求
case req, more := <-con.requestC:
if !more {
return con.reqError
}
if con.limiter != nil {
if err := con.limiter.Wait(stream.Context()); err != nil {
return err
}
}
if err := con.processClientRequest(req); err != nil {
return err
}
- 響應入隊
func (con *connection) queueResponse(resp *WatchResponse) {
if resp == nil {
con.queue.Close()
} else {
con.queue.Enqueue(resp.Collection, resp) //響應入隊
}
}
- 響應出隊
從queue中擷取需要傳回的response,傳回給sink
collection, item, ok := con.queue.Dequeue() // 從queue讀取處理後的傳回結果
if !ok {
break
}
resp := item.(*WatchResponse)
w, ok := con.watches[collection]
if !ok {
scope.Errorf("unknown collection in dequeued watch response: %v", collection)
break // bug?
}
// the response may have been cleared before we got to it
if resp != nil {
if err := con.pushServerResponse(w, resp); err != nil { //通過pushServerResponse 将響應發送給sink
return err
}
}
connection.processClientRequest
通過調用Watch方法處理請求
sr := &Request{
SinkNode: req.SinkNode,
Collection: collection,
VersionInfo: versionInfo,
incremental: req.Incremental,
}
w.cancel = con.watcher.Watch(sr, con.queueResponse, con.peerAddr)
由此我們可以看出我們實作一個mcp的核心在于實作一個watcher,傳遞給mcp source server
type Watcher interface {
Watch(*Request, PushResponseFunc, string) CancelWatchFunc
}
官方mcp watcher 實作
pkg/mcp/testing/server.go
cache := snapshot.New(groups.DefaultIndexFn)
重點關注cache.Watch
func (c *Cache) Watch(
request *source.Request,
pushResponse source.PushResponseFunc,
peerAddr string) source.CancelWatchFunc {
group := c.groupIndex(request.Collection, request.SinkNode) // 擷取sink要擷取的group
c.mu.Lock()
defer c.mu.Unlock()
info := c.fillStatus(group, request, peerAddr) // 初始化對應group peer的同步狀态資訊
collection := request.Collection
// return an immediate response if a snapshot is available and the
// requested version doesn't match.
if snapshot, ok := c.snapshots[group]; ok { // 擷取對應組的snapshot
version := snapshot.Version(request.Collection) // 計算版本資訊
scope.Debugf("Found snapshot for group: %q for %v @ version: %q",
group, request.Collection, version)
if version != request.VersionInfo { // 如果sink的目前版本和source的版本不一緻則推送response
scope.Debugf("Responding to group %q snapshot:\n%v\n", group, snapshot)
response := &source.WatchResponse{
Collection: request.Collection,
Version: version,
Resources: snapshot.Resources(request.Collection),
Request: request,
}
pushResponse(response)
return nil
}
info.synced[request.Collection][peerAddr] = true
}
// 如果版本一緻則.傳回一個cancel,同時記錄對應的watchs,當SetSnapshot時,也就是有更新時會進行調用
c.watchCount++
watchID := c.watchCount
scope.Infof("Watch(): created watch %d for %s from group %q, version %q",
watchID, collection, group, request.VersionInfo)
info.mu.Lock()
info.watches[watchID] = &responseWatch{request: request, pushResponse: pushResponse}
info.mu.Unlock()
cancel := func() {
c.mu.Lock()
defer c.mu.Unlock()
if s, ok := c.status[group]; ok {
s.mu.Lock()
delete(s.watches, watchID)
s.mu.Unlock()
}
}
return cancel
}
掃描關注我: