1.前言
之前不是在搞VictoriaMetrics嘛,自己在公網有兩台2C4G的機器,就跑個VictoriaMetrics叢集上去,但是我上面不止跑這玩意,結果就把VictoriaMetrics叢集跑挂了,報了什麼 cannot handle more than .... possible solutions .... increase
-insert.maxQueueDuration
, increase
-maxConcurrentInserts
, increase server capacity。雖然不知道就是隊列滿了,要等其他請求完成,但是作為一個開發,還是得本着知其然知其是以然,是以繼續fake source code吧。
2.定位核心代碼
直接拷貝源碼搜尋這段相關文本,很快就可以定位到相關位置了
/lib/writeconcurrencylimiter/concurrencylimiter.go#Do
可以看到這段報錯其實就在一個select..case...當中,上面相關的注釋也說明了:所有任務都在忙中,等待maxQueueDuration時長,如果在maxQueueDuration時間内沒有擷取到隊列可以處理資源,那麼就丢棄請求并傳回錯誤資訊。
// Do calls f with the limited concurrency.
func Do(f func() error) error {
// Limit the number of conurrent f calls in order to prevent from excess
// memory usage and CPU thrashing.
select {
// 往通道寫入一個資料,辨別着開始處理一個請求
case ch <- struct{}{}:
// 阻塞進入等待請求完成
err := f()
// 請求處理完之後釋放通道内的一個資料,釋放出來之後才可以接收多一個任務
<-ch
return err
default:
}
// All the workers are busy.
// Sleep for up to *maxQueueDuration.
// 達到處理的最大數,需等其他Do(f func() error)釋放資源
concurrencyLimitReached.Inc()
// 擷取time.Timer,設定逾時等待時間為maxQueueDuration
t := timerpool.Get(*maxQueueDuration)
select {
// 在maxQueueDuration有其他請求釋放了資源
case ch <- struct{}{}:
// 回收time.Timer
timerpool.Put(t)
// 繼續處理任務
err := f()
<-ch
return err
// 逾時
case <-t.C:
// 回收time.Timer
timerpool.Put(t)
// pacelimiter(步長限制器)
concurrencyLimitTimeout.Inc()
// 逾時資訊提示
return &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("cannot handle more than %d concurrent inserts during %s; possible solutions: "+
"increase `-insert.maxQueueDuration`, increase `-maxConcurrentInserts`, increase server capacity", *maxConcurrentInserts, *maxQueueDuration),
StatusCode: http.StatusServiceUnavailable,
}
}
}
上面的這個方法是在
concurrencylimiter.go
當中的,這個看名字跟看源碼,也沒具體業務,是以我們看下有什麼調用了這個func
可以看到非常多的方法都調用了,我們可以看下限速器是用那個value,看看這個是限速什麼的并發。
concurrencyLimitReached = metrics.NewCounter(`vm_concurrent_insert_limit_reached_total`)
concurrencyLimitTimeout = metrics.NewCounter(`vm_concurrent_insert_limit_timeout_total`)
可以看到這個是給vminsert的使用的,
VictoriaMetrics
當中,
vminsert
負責接收
vmagent
的流量并将其轉發到vmstorage當中,在
vmstorage
出錯、處理過慢、卡死的情況下,有可能會導緻無法轉發流量,進而造成
vminsert
的CPU和記憶體飙升,為了防止
vminsert
過載導緻元件故障,
vminsert
引入了限速器,在犧牲一部分資料的情況下保證了系統的正常運作。
但是!上面雖然那麼多會調用這個方法,實際上我隻有Prometheus的RemoteWrite
那麼問題肯定是出現在
vmstorage
上面了。
3. 鍊路分析
上面是promeremotewrite的調用鍊,到Storage.AddRows時,有一處并發控制,也就是
addRowsConcurrencyCh
而這個變量的初始化我們可以看看
var (
// Limit the concurrency for data ingestion to GOMAXPROCS, since this operation
// is CPU bound, so there is no sense in running more than GOMAXPROCS concurrent
// goroutines on data ingestion path.
addRowsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
addRowsTimeout = 30 * time.Second
)
可以看到這裡是根據CPU的核數來限制并發的,假設有3個核心,則寫入操作最多3個并發。
// AddRows adds the given mrs to s.
func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
if len(mrs) == 0 {
return nil
}
// Limit the number of concurrent goroutines that may add rows to the storage.
// This should prevent from out of memory errors and CPU thrashing when too many
// goroutines call AddRows.
select {
// 如果可以寫入addRowsConcurrencyCh,證明隊列此時還沒滿,跳過目前的select
case addRowsConcurrencyCh <- struct{}{}:
// 無法寫入addRowsConcurrencyCh,此時并發數已經超過CPU的核數了
default:
// Sleep for a while until giving up
atomic.AddUint64(&s.addRowsConcurrencyLimitReached, 1)
// 這裡擷取到一個time.Timer來控制逾時
t := timerpool.Get(addRowsTimeout)
// Prioritize data ingestion over concurrent searches.
// 依然是pacelimiter來進行計數
storagepacelimiter.Search.Inc()
select {
// 如果在addRowsTimeout時間内,入隊成功
case addRowsConcurrencyCh <- struct{}{}:
// 歸還time.Timer對象
timerpool.Put(t)
// 計數器減一,然後會一直減1,直到等待數量為0了,調用cond.Broadcast()來通知select工作,可以進去Dec()看看cond.Broadcast()
storagepacelimiter.Search.Dec()
// 等待了addRowsTimeout時間後仍然沒有CPU資源,隻能報錯
case <-t.C:
timerpool.Put(t)
storagepacelimiter.Search.Dec()
atomic.AddUint64(&s.addRowsConcurrencyLimitTimeout, 1)
atomic.AddUint64(&s.addRowsConcurrencyDroppedRows, uint64(len(mrs)))
return fmt.Errorf("cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers; add more CPUs or reduce load",
len(mrs), addRowsTimeout, cap(addRowsConcurrencyCh))
}
}
// Add rows to the storage in blocks with limited size in order to reduce memory usage.
// 這一步才開始真正的資料插入邏輯
var firstErr error
ic := getMetricRowsInsertCtx()
maxBlockLen := len(ic.rrs)
for len(mrs) > 0 {
mrsBlock := mrs
if len(mrs) > maxBlockLen {
mrsBlock = mrs[:maxBlockLen]
mrs = mrs[maxBlockLen:]
} else {
mrs = nil
}
if err := s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits); err != nil {
if firstErr == nil {
firstErr = err
}
continue
}
atomic.AddUint64(&rowsAddedTotal, uint64(len(mrsBlock)))
}
putMetricRowsInsertCtx(ic)
<-addRowsConcurrencyCh
return firstErr
}
4. 總結
到上面那一步就可以發現,這個插入協程數跟cpu核心數的關系比較大,結合我的服務實際情況,因為在安裝VictoriaMetrics時,使用的是operator部署,給他配置設定的cpu限制到了非常小的核心數,這樣就會導緻我的VictoriaMetrics會因為在太多的Prometheus remotewrite而逾時丢棄資訊,進而給出報錯cannot handle more than .... possible solutions .... increase .....