天天看點

踩坑日志之elasticSearch

前言

上周六馬上就下班了,正興高采烈的想着下班吃什麼呢!突然QA找到我,說我們的DB與es無法同步資料了,真是令人頭皮發秃,好不容易休一天,啊啊啊,難受呀,沒辦法,還是趕緊找​

​bug​

​​吧。下面我就把我這次的​

​bug​

​原因分享給大家,避免踩坑~。

bug原因之​

​bulk​

​隐藏錯誤資訊

第一時間,我去看了一下錯誤日志,竟然沒有錯誤日志,很是神奇,既然這樣,那我們就​

​DEBUG​

​​一下吧,​

​DEBUG​

​之前我先貼一段代碼:

func (es *UserES) batchAdd(ctx context.Context, user []*model.UserEs) error {
 req := es.client.Bulk().Index(es.index)
 for _, u := range user {
  u.UpdateTime = uint64(time.Now().UnixNano()) / uint64(time.Millisecond)
  u.CreateTime = uint64(time.Now().UnixNano()) / uint64(time.Millisecond)
  doc := elastic.NewBulkIndexRequest().Id(strconv.FormatUint(u.ID, 10)).Doc(u)
  req.Add(doc)
 }
 if req.NumberOfActions() < 0 {
  return nil
 }
 if _, err := req.Do(ctx); err != nil {
  return err
 }
 return nil
}      

就是上面這段代碼,使用​

​es​

​​的​

​bulk​

​​批量操作,經過​

​DEBUG​

​​仍然沒有發現任何問題,卧槽!!!沒有頭緒了,那就看一看​

​es​

​​源碼吧,裡面是不是有什麼隐藏的點沒有注意到。還真被我找到了,我們先看一下​

​req.Do(ctx)​

​的實作:

// Do sends the batched requests to Elasticsearch. Note that, when successful,
// you can reuse the BulkService for the next batch as the list of bulk
// requests is cleared on success.
func (s *BulkService) Do(ctx context.Context) (*BulkResponse, error) {
 /**
 ...... 省略部分代碼
  **/
 // Get response
 res, err := s.client.PerformRequest(ctx, PerformRequestOptions{
  Method:      "POST",
  Path:        path,
  Params:      params,
  Body:        body,
  ContentType: "application/x-ndjson",
  Retrier:     s.retrier,
  Headers:     s.headers,
 })
 if err != nil {
  return nil, err
 }

 // Return results
 ret := new(BulkResponse)
 if err := s.client.decoder.Decode(res.Body, ret); err != nil {
  return nil, err
 }

 // Reset so the request can be reused
 s.Reset()

 return ret, nil
}      

我隻把重要部分代碼貼出來,看這一段就好了,我來解釋一下:

  • 首先建構​

    ​Http​

    ​請求
  • 發送​

    ​Http​

    ​請求并分析,并解析​

    ​response​

  • 重置​

    ​request​

    ​可以重複使用

這裡的重點就是​

​ret := new(BulkResponse)​

​​,​

​new​

​​了一個​

​BulkResponse​

​結構,他的結構如下:

type BulkResponse struct {
 Took   int                            `json:"took,omitempty"`
 Errors bool                           `json:"errors,omitempty"`
 Items  []map[string]*BulkResponseItem `json:"items,omitempty"`
}
// BulkResponseItem is the result of a single bulk request.
type BulkResponseItem struct {
 Index         string        `json:"_index,omitempty"`
 Type          string        `json:"_type,omitempty"`
 Id            string        `json:"_id,omitempty"`
 Version       int64         `json:"_version,omitempty"`
 Result        string        `json:"result,omitempty"`
 Shards        *ShardsInfo   `json:"_shards,omitempty"`
 SeqNo         int64         `json:"_seq_no,omitempty"`
 PrimaryTerm   int64         `json:"_primary_term,omitempty"`
 Status        int           `json:"status,omitempty"`
 ForcedRefresh bool          `json:"forced_refresh,omitempty"`
 Error         *ErrorDetails `json:"error,omitempty"`
 GetResult     *GetResult    `json:"get,omitempty"`
}      

先來解釋一個每個字段的意思:

  • ​took​

    ​:總共耗費了多長時間,機關是毫秒
  • ​Errors​

    ​​:如果其中任何子請求失敗,該 ​

    ​errors​

    ​ 标志被設定為 ​

    ​true​

    ​ ,并且在相應的請求報告出錯誤明細(看下面的Items解釋)
  • ​Items​

    ​​:這個裡就是存儲每一個子請求的​

    ​response​

    ​,這裡的​

    ​Error​

    ​存儲的是詳細的錯誤資訊

現在我想大家應該知道為什麼我們的代碼沒有報​

​err​

​​資訊了,​

​bulk​

​的每個請求都是獨立的執行,是以某個子請求的失敗不會對其他子請求的成功與否造成影響,是以其中某一條出現錯誤我們需要從​

​BulkResponse​

​解出來。現在我們把代碼改正确:

func (es *UserES) batchAdd(ctx context.Context, user []*model.UserEs) error {
 req := es.client.Bulk().Index(es.index)
 for _, u := range user {
  u.UpdateTime = uint64(time.Now().UnixNano()) / uint64(time.Millisecond)
  u.CreateTime = uint64(time.Now().UnixNano()) / uint64(time.Millisecond)
  doc := elastic.NewBulkIndexRequest().Id(strconv.FormatUint(u.ID, 10)).Doc(u)
  req.Add(doc)
 }
 if req.NumberOfActions() < 0 {
  return nil
 }
 res, err := req.Do(ctx)
 if err != nil {
  return err
 }
 // 任何子請求失敗,該 `errors` 标志被設定為 `true` ,并且在相應的請求報告出錯誤明細
 // 是以如果沒有出錯,說明全部成功了,直接傳回即可
 if !res.Errors {
  return nil
 }
 for _, it := range res.Failed() {
  if it.Error == nil {
   continue
  }
  return &elastic.Error{
   Status:  it.Status,
   Details: it.Error,
  }
 }
 return nil
}      

這裡再解釋一下​

​res.Failed​

​​方法,這裡會把​

​items​

​​中​

​bulk response​

​帶錯誤的傳回,是以在這裡面找錯誤資訊就可以了。

至此,這個​

​bug​

​​原因終于被我找到了,接下來可以看下一個​

​bug​

​了,我們先簡單總結一下:

​bulk​

​ API 允許在單個步驟中進行多次 ​

​create​

​ 、 ​

​index​

​ 、 ​

​update​

​ 或 ​

​delete​

​ 請求,每個子請求都是獨立執行,是以某個子請求的失敗不會對其他子請求的成功與否造成影響。​

​bulk​

​的response結構中​

​Erros​

​字段,如果其中任何子請求失敗,該 ​

​errors​

​ 标志被設定為 ​

​true​

​ ,并且在相應的請求報告出錯誤明細,​

​items​

​字段是一個數組,,這個數組的内容是以請求的順序列出來的每個請求的結果。是以在使用​

​bulk​

​時一定要從​

​response​

​中判斷是否有​

​err​

​。

bug原因之數值範圍越界

這裡完全是自己使用不當造成,但還是想說一說​

​es​

​的映射數字類型範圍的問題:

數字類型有如下分類:

類型 說明
byte 有符号的8位整數, 範圍: [-128 ~ 127]
short 有符号的16位整數, 範圍: [-32768 ~ 32767]
integer 有符号的32位整數, 範圍: [−231−231 ~ 231231-1]
long 有符号的64位整數, 範圍: [−263−263 ~ 263263-1]
float 32位單精度浮點數
double 64位雙精度浮點數
half_float 16位半精度IEEE 754浮點類型
scaled_float 縮放類型的的浮點數, 比如price字段隻需精确到分, 57.34縮放因子為100, 存儲結果為5734

這裡都是有符号類型的,無符号在​

​es7.10.1​

​版本才開始支援,有興趣的同學戳這裡。

這裡把這些數字類型及範圍列出來就是友善說我的​

​bug​

​原因,這裡直接解釋一下:

我在DB設定字段的類型是​

​tinyint unsigned​

​​,​

​tinyint​

​​是一個位元組存儲,無符号的話範圍是​

​0-255​

​​,而我在​

​es​

​​中映射類型選擇的是​

​byte​

​​,範圍是​

​-128~127​

​​,當DB中數值超過這個範圍是,在進行同步時就會出現這個問題,這裡需要大家注意一下數值範圍的問題,不要像我一樣,因為這個還排查了好久的​

​bug​

​,有些空間沒必要省,反正也占不了多少空間。

總結

這篇文章就是簡單總結一下我在工作中遇到的問題,發表出來就是給大家提個醒,有人踩過的坑,就不要在踩了,浪費時間!!!!

繼續閱讀