天天看點

VictoriaMetrics報錯cannot handle more than..... (源碼分析)

作者:雲原生驿站

1.前言

之前不是在搞VictoriaMetrics嘛,自己在公網有兩台2C4G的機器,就跑個VictoriaMetrics叢集上去,但是我上面不止跑這玩意,結果就把VictoriaMetrics叢集跑挂了,報了什麼 cannot handle more than .... possible solutions .... increase

-insert.maxQueueDuration

, increase

-maxConcurrentInserts

, increase server capacity。雖然不知道就是隊列滿了,要等其他請求完成,但是作為一個開發,還是得本着知其然知其是以然,是以繼續fake source code吧。

2.定位核心代碼

直接拷貝源碼搜尋這段相關文本,很快就可以定位到相關位置了

/lib/writeconcurrencylimiter/concurrencylimiter.go#Do

可以看到這段報錯其實就在一個select..case...當中,上面相關的注釋也說明了:所有任務都在忙中,等待maxQueueDuration時長,如果在maxQueueDuration時間内沒有擷取到隊列可以處理資源,那麼就丢棄請求并傳回錯誤資訊。

// Do calls f with the limited concurrency.
func Do(f func() error) error {
// Limit the number of conurrent f calls in order to prevent from excess
// memory usage and CPU thrashing.
select {
// 往通道寫入一個資料,辨別着開始處理一個請求
case ch <- struct{}{}:
// 阻塞進入等待請求完成
 err := f()
// 請求處理完之後釋放通道内的一個資料,釋放出來之後才可以接收多一個任務
 <-ch
return err
default:
 }

// All the workers are busy.
// Sleep for up to *maxQueueDuration.
// 達到處理的最大數,需等其他Do(f func() error)釋放資源
 concurrencyLimitReached.Inc()
// 擷取time.Timer,設定逾時等待時間為maxQueueDuration
 t := timerpool.Get(*maxQueueDuration)
select {
// 在maxQueueDuration有其他請求釋放了資源
case ch <- struct{}{}:
// 回收time.Timer
 timerpool.Put(t)
// 繼續處理任務
 err := f()
 <-ch
return err
// 逾時
case <-t.C:
// 回收time.Timer
 timerpool.Put(t)
// pacelimiter(步長限制器)
 concurrencyLimitTimeout.Inc()
// 逾時資訊提示
return &httpserver.ErrorWithStatusCode{
 Err: fmt.Errorf("cannot handle more than %d concurrent inserts during %s; possible solutions: "+
"increase `-insert.maxQueueDuration`, increase `-maxConcurrentInserts`, increase server capacity", *maxConcurrentInserts, *maxQueueDuration),
 StatusCode: http.StatusServiceUnavailable,
 }
 }
}
           

上面的這個方法是在

concurrencylimiter.go

當中的,這個看名字跟看源碼,也沒具體業務,是以我們看下有什麼調用了這個func

VictoriaMetrics報錯cannot handle more than..... (源碼分析)

可以看到非常多的方法都調用了,我們可以看下限速器是用那個value,看看這個是限速什麼的并發。

concurrencyLimitReached = metrics.NewCounter(`vm_concurrent_insert_limit_reached_total`)
concurrencyLimitTimeout = metrics.NewCounter(`vm_concurrent_insert_limit_timeout_total`)
           

可以看到這個是給vminsert的使用的,

VictoriaMetrics

當中,

vminsert

負責接收

vmagent

的流量并将其轉發到vmstorage當中,在

vmstorage

出錯、處理過慢、卡死的情況下,有可能會導緻無法轉發流量,進而造成

vminsert

的CPU和記憶體飙升,為了防止

vminsert

過載導緻元件故障,

vminsert

引入了限速器,在犧牲一部分資料的情況下保證了系統的正常運作。

但是!上面雖然那麼多會調用這個方法,實際上我隻有Prometheus的RemoteWrite

VictoriaMetrics報錯cannot handle more than..... (源碼分析)

那麼問題肯定是出現在

vmstorage

上面了。

3. 鍊路分析

VictoriaMetrics報錯cannot handle more than..... (源碼分析)

上面是promeremotewrite的調用鍊,到Storage.AddRows時,有一處并發控制,也就是

addRowsConcurrencyCh

而這個變量的初始化我們可以看看

var (
// Limit the concurrency for data ingestion to GOMAXPROCS, since this operation
// is CPU bound, so there is no sense in running more than GOMAXPROCS concurrent
// goroutines on data ingestion path.
 addRowsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
 addRowsTimeout = 30 * time.Second
)
           

可以看到這裡是根據CPU的核數來限制并發的,假設有3個核心,則寫入操作最多3個并發。

// AddRows adds the given mrs to s.
func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
if len(mrs) == 0 {
return nil
 }

// Limit the number of concurrent goroutines that may add rows to the storage.
// This should prevent from out of memory errors and CPU thrashing when too many
// goroutines call AddRows.
select {
// 如果可以寫入addRowsConcurrencyCh,證明隊列此時還沒滿,跳過目前的select
case addRowsConcurrencyCh <- struct{}{}:
// 無法寫入addRowsConcurrencyCh,此時并發數已經超過CPU的核數了
default:
// Sleep for a while until giving up
 atomic.AddUint64(&s.addRowsConcurrencyLimitReached, 1)
// 這裡擷取到一個time.Timer來控制逾時
 t := timerpool.Get(addRowsTimeout)

// Prioritize data ingestion over concurrent searches.
// 依然是pacelimiter來進行計數
 storagepacelimiter.Search.Inc()

select {
// 如果在addRowsTimeout時間内,入隊成功
case addRowsConcurrencyCh <- struct{}{}:
// 歸還time.Timer對象
 timerpool.Put(t)
// 計數器減一,然後會一直減1,直到等待數量為0了,調用cond.Broadcast()來通知select工作,可以進去Dec()看看cond.Broadcast()
 storagepacelimiter.Search.Dec()
// 等待了addRowsTimeout時間後仍然沒有CPU資源,隻能報錯
case <-t.C:
 timerpool.Put(t)
 storagepacelimiter.Search.Dec()
 atomic.AddUint64(&s.addRowsConcurrencyLimitTimeout, 1)
 atomic.AddUint64(&s.addRowsConcurrencyDroppedRows, uint64(len(mrs)))
return fmt.Errorf("cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers; add more CPUs or reduce load",
len(mrs), addRowsTimeout, cap(addRowsConcurrencyCh))
 }
 }

// Add rows to the storage in blocks with limited size in order to reduce memory usage.
// 這一步才開始真正的資料插入邏輯
var firstErr error
 ic := getMetricRowsInsertCtx()
 maxBlockLen := len(ic.rrs)
for len(mrs) > 0 {
 mrsBlock := mrs
if len(mrs) > maxBlockLen {
 mrsBlock = mrs[:maxBlockLen]
 mrs = mrs[maxBlockLen:]
 } else {
 mrs = nil
 }
if err := s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits); err != nil {
if firstErr == nil {
 firstErr = err
 }
continue
 }
 atomic.AddUint64(&rowsAddedTotal, uint64(len(mrsBlock)))
 }
 putMetricRowsInsertCtx(ic)

 <-addRowsConcurrencyCh

return firstErr
}
           

4. 總結

到上面那一步就可以發現,這個插入協程數跟cpu核心數的關系比較大,結合我的服務實際情況,因為在安裝VictoriaMetrics時,使用的是operator部署,給他配置設定的cpu限制到了非常小的核心數,這樣就會導緻我的VictoriaMetrics會因為在太多的Prometheus remotewrite而逾時丢棄資訊,進而給出報錯cannot handle more than .... possible solutions .... increase .....

繼續閱讀