天天看點

64_elasticSearch 基于document鎖實作悲觀鎖并發控制64_基于document鎖實作悲觀鎖并發控制

64_基于document鎖實作悲觀鎖并發控制

更多幹貨

  • 分布式實戰(幹貨)
  • spring cloud 實戰(幹貨)
  • mybatis 實戰(幹貨)
  • spring boot 實戰(幹貨)
  • React 入門實戰(幹貨)
  • 建構中小型網際網路企業架構(幹貨)
  • python 學習持續更新
  • ElasticSearch 筆記

概述

1、對document level鎖,詳細的講解

  • 全局鎖,一次性就鎖整個index,對這個index的所有增删改操作都會被block住,如果上鎖不頻繁,還可以,比較簡單
  • 細粒度的一個鎖,document鎖,顧名思義,每次就鎖你要操作的,你要執行增删改的那些doc,doc鎖了,其他線程就不能對這些doc執行增删改操作了
  • 但是你隻是鎖了部分doc,其他線程對其他的doc還是可以上鎖和執行增删改操作的

document鎖,是用腳本進行上鎖

POST /fs/lock/1/_update
{
  "upsert": { "process_id": 123 },
  "script": "if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';"
  "params": {
    "process_id": 123
  }
}
           
  • /fs/lock,是固定的,就是說fs下的lock type,專門用于進行上鎖
  • /fs/lock/id,比如1,id其實就是你要上鎖的那個doc的id,代表了某個doc資料對應的lock(也是一個doc)
  • update + upsert:執行upsert操作

params,裡面有個process_id,process_id,是你的要執行增删改操作的程序的唯一id,比如說可以在java系統,啟動的時候,給你的每個線程都用UUID自動生成一個thread id,你的系統程序啟動的時候給整個程序也配置設定一個UUID。process_id + thread_id就代表了某一個程序下的某個線程的唯一辨別。可以自己用UUID生成一個唯一ID

process_id很重要,會在lock中,設定對對應的doc加鎖的程序的id,這樣其他程序過來的時候,才知道,這條資料已經被别人給鎖了

assert false,不是目前程序加鎖的話,則抛出異常
ctx.op='noop',不做任何修改
           

如果該document之前沒有被鎖,/fs/lock/1之前不存在,也就是doc id=1沒有被别人上過鎖; upsert的文法,那麼執行index操作,建立一個/fs/lock/id這條資料,而且用params中的資料作為這個lock的資料。process_id被設定為123,script不執行。這個時候象征着process_id=123的程序已經鎖了一個doc了。

如果document被鎖了,就是說/fs/lock/1已經存在了,代表doc id=1已經被某個程序給鎖了。那麼執行update操作,script,此時會比對process_id,如果相同,就是說,某個程序,之前鎖了這個doc,然後這次又過來,就可以直接對這個doc執行操作,說明是該程序之前鎖的doc,則不報錯,不執行任何操作,傳回success; 如果process_id比對不上,說明doc被其他doc給鎖了,此時報錯

/fs/lock/1
{
  "process_id": 123
}
           
POST /fs/lock/1/_update
{
  "upsert": { "process_id": 123 },
  "script": "if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';"
  "params": {
    "process_id": 123
  }
}
           
  • script:ctx._source.process_id,123
  • process_id:加鎖的upsert請求中帶過來額proess_id
  • 如果兩個process_id相同,說明是一個程序先加鎖,然後又過來嘗試加鎖,可能是要執行另外一個操作,此時就不會block,對同一個process_id是不會block,ctx.op= 'noop',什麼都不做,傳回一個success
  • 如果說已經有一個程序加了鎖了
/fs/lock/1
{
  "process_id": 123
}

           
POST /fs/lock/1/_update
{
  "upsert": { "process_id": 123 },
  "script": "if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';"
  "params": {
    "process_id": 234
  }
}
           
"script": "if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';"
           
ctx._source.process_id:123
process_id: 234
           
  • process_id不相等,說明這個doc之前已經被别人上鎖了,process_id=123上鎖了; process_id=234過來再次嘗試上鎖,失敗,assert false,就會報錯
  • 此時遇到報錯的process,就應該嘗試重新上鎖,直到上鎖成功
  • 有報錯的話,如果有些doc被鎖了,那麼需要重試
  • 直到所有鎖定都成功,執行自己的操作。
  • 釋放所有的鎖

2、上document鎖的完整實驗過程

scripts/judge-lock.groovy: if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';
           
POST /fs/lock/1/_update
{
  "upsert": { "process_id": 123 },
  "script": {
    "lang": "groovy",
    "file": "judge-lock", 
    "params": {
      "process_id": 123
    }
  }
}
           
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}
           
GET /fs/lock/1
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 1,
  "found": true,
  "_source": {
    "process_id": 123
  }
}
           
POST /fs/lock/1/_update
{
  "upsert": { "process_id": 234 },
  "script": {
    "lang": "groovy",
    "file": "judge-lock", 
    "params": {
      "process_id": 234
    }
  }
}
           
{
  "error": {
    "root_cause": [
      {
        "type": "remote_transport_exception",
        "reason": "[4onsTYV][127.0.0.1:9300][indices:data/write/update[s]]"
      }
    ],
    "type": "illegal_argument_exception",
    "reason": "failed to execute script",
    "caused_by": {
      "type": "script_exception",
      "reason": "error evaluating judge-lock",
      "caused_by": {
        "type": "power_assertion_error",
        "reason": "assert false\n"
      },
      "script_stack": [],
      "script": "",
      "lang": "groovy"
    }
  },
  "status": 400
}
           
POST /fs/lock/1/_update
{
  "upsert": { "process_id": 123 },
  "script": {
    "lang": "groovy",
    "file": "judge-lock", 
    "params": {
      "process_id": 123
    }
  }
}
           
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 1,
  "result": "noop",
  "_shards": {
    "total": 0,
    "successful": 0,
    "failed": 0
  }
}
           
POST /fs/file/1/_update
{
  "doc": {
    "name": "README1.txt"
  }
}
           
{
  "_index": "fs",
  "_type": "file",
  "_id": "1",
  "_version": 4,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}
           
POST /fs/_refresh 
           
GET /fs/lock/_search?scroll=1m
{
  "query": {
    "term": {
      "process_id": 123
    }
  }
}
           
{
  "_scroll_id": "DnF1ZXJ5VGhlbkZldGNoBQAAAAAAACPkFjRvbnNUWVZaVGpHdklqOV9zcFd6MncAAAAAAAAj5RY0b25zVFlWWlRqR3ZJajlfc3BXejJ3AAAAAAAAI-YWNG9uc1RZVlpUakd2SWo5X3NwV3oydwAAAAAAACPnFjRvbnNUWVZaVGpHdklqOV9zcFd6MncAAAAAAAAj6BY0b25zVFlWWlRqR3ZJajlfc3BXejJ3",
  "took": 51,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "fs",
        "_type": "lock",
        "_id": "1",
        "_score": 1,
        "_source": {
          "process_id": 123
        }
      }
    ]
  }
}
           
PUT /fs/lock/_bulk
{ "delete": { "_id": 1}}
           
{
  "took": 20,
  "errors": false,
  "items": [
    {
      "delete": {
        "found": true,
        "_index": "fs",
        "_type": "lock",
        "_id": "1",
        "_version": 2,
        "result": "deleted",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "status": 200
      }
    }
  ]
}
           
POST /fs/lock/1/_update
{
  "upsert": { "process_id": 234 },
  "script": {
    "lang": "groovy",
    "file": "judge-lock", 
    "params": {
      "process_id": 234
    }
  }
}
           

process_id=234上鎖就成功了

相關文章

  • ElasticSearch 筆記
  • 1_ElasticSearch使用term filter來搜尋資料
  • 2_ElasticSearch filter執行原理 bitset機制與caching機制
  • 3_ElasticSearch 基于bool組合多個filter條件來搜尋資料
  • 4_ElasticSearch 使用terms搜尋多個值
  • 5_ElasticSearch 基于range filter來進行範圍過濾
  • 6_ElasticSearch 控制全文檢索結果的精準度
  • 7_ElasticSearch term+bool實作的multiword搜尋原理
  • 8_基于boost的搜尋條件權重控制
  • 9_ElasticSearch 多shard場景下relevance score不準确
  • 10_ElasticSearch dis_max實作best fields政策進行多字段搜尋
  • 11_ElasticSearch 基于tie_breaker參數優化dis_max搜尋效果
  • 12_ElasticSearch multi_match文法實作dis_max+tie_breaker
  • 13_ElasticSearch multi_match+most fiels政策進行multi-field搜尋
  • 14_ElasticSearch 使用most_fields政策進行cross-fields search
  • 15_ElasticSearch copy_to定制組合field進行cross-fields搜尋
  • 16_ElasticSearch 使用原生cross-fiels 查詢
  • 17_ElasticSearch phrase matching搜尋
  • 18_ElasticSearch 基于slop參數實作近似比對
  • 19_ElasticSearch 使用match和近似比對實作召回率與精準度的平衡
  • 20_ElasticSearch rescoring機制優化近似比對搜尋的性能
  • 21_ElasticSearch 字首搜尋、通配符搜尋、正則搜尋
  • 22_ElasticSearch 搜尋推薦match_phrase_prefix實作search-time
  • 23_ElsaticSearch 搜尋推薦ngram分詞機制實作index-time更多幹貨
  • 24_ElasticSearch TF&IDF算法以及向量空間模型
  • 25_ElasticSearch 揭秘lucene的相關度分數算法
  • 26_ElasticSearch 四種常見的相關度分數優化方法
  • 27_ElasticSearch用function_score自定義相關度分數算法
  • 28_ElasticSearch誤拼寫時的fuzzy模糊搜尋技術
  • 29_ElasticSearchIK中文分詞器的安裝和使用
  • 30_ElasticSearch IK分詞器配置檔案 以及自定義詞庫
  • ElasticSearchIK中文分詞器的安裝和使用
  • 日志管理ELK

繼續閱讀