前言
上周六馬上就下班了,正興高采烈的想着下班吃什麼呢!突然QA找到我,說我們的DB與es無法同步資料了,真是令人頭皮發秃,好不容易休一天,啊啊啊,難受呀,沒辦法,還是趕緊找吧。下面我就把我這次的
bug
原因分享給大家,避免踩坑~。
bug
bug原因之 bulk
隐藏錯誤資訊
bulk
第一時間,我去看了一下錯誤日志,竟然沒有錯誤日志,很是神奇,既然這樣,那我們就
DEBUG
一下吧,
DEBUG
之前我先貼一段代碼:
func (es *UserES) batchAdd(ctx context.Context, user []*model.UserEs) error {
req := es.client.Bulk().Index(es.index)
for _, u := range user {
u.UpdateTime = uint64(time.Now().UnixNano()) / uint64(time.Millisecond)
u.CreateTime = uint64(time.Now().UnixNano()) / uint64(time.Millisecond)
doc := elastic.NewBulkIndexRequest().Id(strconv.FormatUint(u.ID, 10)).Doc(u)
req.Add(doc)
}
if req.NumberOfActions() < 0 {
return nil
}
if _, err := req.Do(ctx); err != nil {
return err
}
return nil
}
就是上面這段代碼,使用
es
的
bulk
批量操作,經過
DEBUG
仍然沒有發現任何問題,卧槽!!!沒有頭緒了,那就看一看
es
源碼吧,裡面是不是有什麼隐藏的點沒有注意到。還真被我找到了,我們先看一下
req.Do(ctx)
的實作:
// Do sends the batched requests to Elasticsearch. Note that, when successful,
// you can reuse the BulkService for the next batch as the list of bulk
// requests is cleared on success.
func (s *BulkService) Do(ctx context.Context) (*BulkResponse, error) {
/**
...... 省略部分代碼
**/
// Get response
res, err := s.client.PerformRequest(ctx, PerformRequestOptions{
Method: "POST",
Path: path,
Params: params,
Body: body,
ContentType: "application/x-ndjson",
Retrier: s.retrier,
Headers: s.headers,
})
if err != nil {
return nil, err
}
// Return results
ret := new(BulkResponse)
if err := s.client.decoder.Decode(res.Body, ret); err != nil {
return nil, err
}
// Reset so the request can be reused
s.Reset()
return ret, nil
}
我隻把重要部分代碼貼出來,看這一段就好了,我來解釋一下:
- 首先建構
請求Http
- 發送
請求并分析,并解析Http
response
- 重置
可以重複使用request
這裡的重點就是
ret := new(BulkResponse)
,
new
了一個
BulkResponse
結構,他的結構如下:
type BulkResponse struct {
Took int `json:"took,omitempty"`
Errors bool `json:"errors,omitempty"`
Items []map[string]*BulkResponseItem `json:"items,omitempty"`
}
// BulkResponseItem is the result of a single bulk request.
type BulkResponseItem struct {
Index string `json:"_index,omitempty"`
Type string `json:"_type,omitempty"`
Id string `json:"_id,omitempty"`
Version int64 `json:"_version,omitempty"`
Result string `json:"result,omitempty"`
Shards *ShardsInfo `json:"_shards,omitempty"`
SeqNo int64 `json:"_seq_no,omitempty"`
PrimaryTerm int64 `json:"_primary_term,omitempty"`
Status int `json:"status,omitempty"`
ForcedRefresh bool `json:"forced_refresh,omitempty"`
Error *ErrorDetails `json:"error,omitempty"`
GetResult *GetResult `json:"get,omitempty"`
}
先來解釋一個每個字段的意思:
-
:總共耗費了多長時間,機關是毫秒took
-
:如果其中任何子請求失敗,該 Errors
标志被設定為 errors
,并且在相應的請求報告出錯誤明細(看下面的Items解釋)true
-
:這個裡就是存儲每一個子請求的Items
,這裡的response
存儲的是詳細的錯誤資訊Error
現在我想大家應該知道為什麼我們的代碼沒有報
err
資訊了,
bulk
的每個請求都是獨立的執行,是以某個子請求的失敗不會對其他子請求的成功與否造成影響,是以其中某一條出現錯誤我們需要從
BulkResponse
解出來。現在我們把代碼改正确:
func (es *UserES) batchAdd(ctx context.Context, user []*model.UserEs) error {
req := es.client.Bulk().Index(es.index)
for _, u := range user {
u.UpdateTime = uint64(time.Now().UnixNano()) / uint64(time.Millisecond)
u.CreateTime = uint64(time.Now().UnixNano()) / uint64(time.Millisecond)
doc := elastic.NewBulkIndexRequest().Id(strconv.FormatUint(u.ID, 10)).Doc(u)
req.Add(doc)
}
if req.NumberOfActions() < 0 {
return nil
}
res, err := req.Do(ctx)
if err != nil {
return err
}
// 任何子請求失敗,該 `errors` 标志被設定為 `true` ,并且在相應的請求報告出錯誤明細
// 是以如果沒有出錯,說明全部成功了,直接傳回即可
if !res.Errors {
return nil
}
for _, it := range res.Failed() {
if it.Error == nil {
continue
}
return &elastic.Error{
Status: it.Status,
Details: it.Error,
}
}
return nil
}
這裡再解釋一下
res.Failed
方法,這裡會把
items
中
bulk response
帶錯誤的傳回,是以在這裡面找錯誤資訊就可以了。
至此,這個
bug
原因終于被我找到了,接下來可以看下一個
bug
了,我們先簡單總結一下:
bulk
API 允許在單個步驟中進行多次
create
、
index
、
update
或
delete
請求,每個子請求都是獨立執行,是以某個子請求的失敗不會對其他子請求的成功與否造成影響。
bulk
的response結構中
Erros
字段,如果其中任何子請求失敗,該
errors
标志被設定為
true
,并且在相應的請求報告出錯誤明細,
items
字段是一個數組,,這個數組的内容是以請求的順序列出來的每個請求的結果。是以在使用
bulk
時一定要從
response
中判斷是否有
err
。
bug原因之數值範圍越界
這裡完全是自己使用不當造成,但還是想說一說
es
的映射數字類型範圍的問題:
數字類型有如下分類:
類型 | 說明 |
byte | 有符号的8位整數, 範圍: [-128 ~ 127] |
short | 有符号的16位整數, 範圍: [-32768 ~ 32767] |
integer | 有符号的32位整數, 範圍: [−231−231 ~ 231231-1] |
long | 有符号的64位整數, 範圍: [−263−263 ~ 263263-1] |
float | 32位單精度浮點數 |
double | 64位雙精度浮點數 |
half_float | 16位半精度IEEE 754浮點類型 |
scaled_float | 縮放類型的的浮點數, 比如price字段隻需精确到分, 57.34縮放因子為100, 存儲結果為5734 |
這裡都是有符号類型的,無符号在
es7.10.1
版本才開始支援,有興趣的同學戳這裡。
這裡把這些數字類型及範圍列出來就是友善說我的
bug
原因,這裡直接解釋一下:
我在DB設定字段的類型是
tinyint unsigned
,
tinyint
是一個位元組存儲,無符号的話範圍是
0-255
,而我在
es
中映射類型選擇的是
byte
,範圍是
-128~127
,當DB中數值超過這個範圍是,在進行同步時就會出現這個問題,這裡需要大家注意一下數值範圍的問題,不要像我一樣,因為這個還排查了好久的
bug
,有些空間沒必要省,反正也占不了多少空間。
總結
這篇文章就是簡單總結一下我在工作中遇到的問題,發表出來就是給大家提個醒,有人踩過的坑,就不要在踩了,浪費時間!!!!