mongo-connector: Real-Time MongoDB-to-Elasticsearch Synchronization in Depth

Introduction:

Testing shows that mongo-connector synchronizes insert, delete, and update operations from MongoDB to ES in real time.

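In this test, historical data (documents that already existed in MongoDB) was not synchronized to ES. It is still unclear whether the tool does not support this (preliminary conclusion) or whether this scenario simply was not covered. This post will be updated after further investigation.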

1. mongo-connector repository:

https://github.com/mongodb-labs/mongo-connector

2. Overview of mongo-connector

mongo-connector creates a pipeline from a MongoDB cluster to one or more target systems, such as Solr, Elasticsearch, or another MongoDB cluster.

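It synchronizes data from MongoDB to the target system. It also tails MongoDB's oplog so that the target stays in sync with operations in MongoDB in real time.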
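The oplog-tailing idea can be illustrated with a small Python 3 sketch. It replays simplified oplog entries against an in-memory dict that stands in for the target system. The sketch assumes the classic (MongoDB 3.x era) entry shape:

- `op` is 'i', 'u', or 'd'.
- `ns` is "database.collection".
- `o` holds the new document, the update spec, or the deleted `_id`.
- `o2` holds the `_id` of an updated document.

The function `apply_oplog_entry` is hypothetical. It handles only `$set`/`$unset` and full-document replacement, while mongo-connector's real logic covers many more cases.

```python
from typing import Any, Dict

# Hypothetical in-memory "target": {namespace: {_id: document}}
Target = Dict[str, Dict[Any, Dict[str, Any]]]


def apply_oplog_entry(target: Target, entry: Dict[str, Any]) -> None:
    """Replay one simplified oplog entry against the in-memory target."""
    op, ns = entry["op"], entry["ns"]
    coll = target.setdefault(ns, {})
    if op == "i":  # insert: 'o' holds the full new document
        doc = dict(entry["o"])
        coll[doc["_id"]] = doc
    elif op == "u":  # update: 'o2' identifies the document, 'o' is the change
        doc_id = entry["o2"]["_id"]
        change = entry["o"]
        if "$set" in change or "$unset" in change:
            # modify fields in place (creating a stub if the _id is unknown)
            doc = coll.setdefault(doc_id, {"_id": doc_id})
            doc.update(change.get("$set", {}))
            for field in change.get("$unset", {}):
                doc.pop(field, None)
        else:  # no operators: treat 'o' as a full replacement document
            coll[doc_id] = dict(change, _id=doc_id)
    elif op == "d":  # delete: 'o' holds the _id of the removed document
        coll.pop(entry["o"]["_id"], None)
    # other op types (commands 'c', no-ops 'n') are ignored in this sketch
```

Replaying an insert, a `$set` update, and a delete in that order gives the same end state that sections 5 to 7 verify in ES.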

The tool has been tested with Python 2.6, 2.7, and 3.3+.

mongo-connector is a real-time sync service written in Python. It requires MongoDB to run as a replica set, and it needs elastic2_doc_manager to write data into ES.

3. Overview of elastic2-doc-manager

This is the doc manager for Elasticsearch 2.x. For Elasticsearch 1.x, use elastic-doc-manager instead.
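The version rule above can be captured in a small lookup that returns the value to pass to mongo-connector's `-d` option. The function name `doc_manager_for` is hypothetical. The mapping covers only the 1.x and 2.x cases described here. The 1.x module name `elastic_doc_manager` is an assumption based on the package name, so check each doc manager's README for other Elasticsearch versions.

```python
def doc_manager_for(es_version: str) -> str:
    """Return the mongo-connector -d value for an Elasticsearch version string.

    Hypothetical helper covering only the two cases described in this post:
    1.x -> elastic_doc_manager  (pip package: elastic-doc-manager)
    2.x -> elastic2_doc_manager (pip package: elastic2-doc-manager)
    """
    major = es_version.strip().split(".")[0]
    managers = {"1": "elastic_doc_manager", "2": "elastic2_doc_manager"}
    if major not in managers:
        raise ValueError(f"no doc manager mapping for Elasticsearch {es_version!r} in this sketch")
    return managers[major]
```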

4. Steps to sync MongoDB to ES:

(1) Install mongo-connector.

pip install mongo-connector           

(2) Install elastic2-doc-manager.

pip install elastic2-doc-manager           

Note:

If you skip step (2) and go straight to steps (3) and (4), mongo-connector fails with an error:

[root@5b9dbaaa148a bin]# mongo-connector -m 10.8.5.99:27017 -t 10.8.5.101:9200 -d elastic2_doc_manager
Logging to mongo-connector.log.
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib64/python2.6/threading.py", line 532, in __bootstrap_inner
  self.run()           
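You can avoid this failure with a quick pre-flight check that the doc manager module is importable before starting mongo-connector. This Python 3 sketch uses `importlib.util.find_spec`. The module path `mongo_connector.doc_managers.elastic2_doc_manager` is an assumption about where the elastic2-doc-manager package installs itself, and `module_available` is a hypothetical helper.

```python
import importlib.util

# Assumed module path installed by `pip install elastic2-doc-manager`
DOC_MANAGER = "mongo_connector.doc_managers.elastic2_doc_manager"


def module_available(dotted_name: str) -> bool:
    """Return True if `dotted_name` can be located on the import path.

    find_spec() imports parent packages, so a missing parent package raises
    ModuleNotFoundError; that case is reported as "not available".
    """
    try:
        return importlib.util.find_spec(dotted_name) is not None
    except ModuleNotFoundError:
        return False


if __name__ == "__main__":
    if not module_available(DOC_MANAGER):
        print("elastic2-doc-manager not found; run: pip install elastic2-doc-manager")
```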

(3) Start MongoDB

MongoDB must run as a replica set. If it already does, skip this step:

1) Start mongod with a replica set name, set via --replSet:
[root@b48eafd69929 bin]# ./mongod --replSet "rs0"           

2) Connect to the replica set member with the mongo shell

[root@b48eafd69929 bin]# ./mongo
MongoDB shell version: 3.2.4
connecting to: test
Server has startup warnings:
2016-07-05T09:49:01.330+0100 I CONTROL [initandlisten] ** WARNING: You are running this process as the root user, which is not recommended.
2016-07-05T09:49:01.330+0100 I CONTROL [initandlisten]
2016-07-05T09:49:01.331+0100 I CONTROL [initandlisten]
2016-07-05T09:49:01.331+0100 I CONTROL [initandlisten] ** WARNING: You are running on a NUMA machine.
2016-07-05T09:49:01.331+0100 I CONTROL [initandlisten] ** We suggest launching mongod like this to avoid performance problems:
2016-07-05T09:49:01.332+0100 I CONTROL [initandlisten] ** numactl --interleave=all mongod [other options]
2016-07-05T09:49:01.332+0100 I CONTROL [initandlisten]
2016-07-05T09:49:01.332+0100 I CONTROL [initandlisten] ** WARNING: /sys/kernel/mm/transparent_hugepage/enabled is 'always'.
2016-07-05T09:49:01.332+0100 I CONTROL [initandlisten] ** We suggest setting it to 'never'
2016-07-05T09:49:01.332+0100 I CONTROL [initandlisten]
2016-07-05T09:49:01.332+0100 I CONTROL [initandlisten] ** WARNING: /sys/kernel/mm/transparent_hugepage/defrag is 'always'.
2016-07-05T09:49:01.332+0100 I CONTROL [initandlisten] ** We suggest setting it to 'never'
2016-07-05T09:49:01.332+0100 I CONTROL [initandlisten]           

3) Initialize the replica set

> rs.initiate()
{
  "info2" : "no configuration specified. Using a default configuration for the set",
  "me" : "b48eafd69929:27017",
  "ok" : 1
}           
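Calling `rs.initiate()` with no argument makes MongoDB build a default configuration from the local host name, as the "info2" message says. Passing an explicit configuration lets you choose the host name that the member advertises.

The Python 3 sketch below builds that configuration document and sends it as the `replSetInitiate` admin command. It has these assumptions:

- The helper names are hypothetical.
- The pymongo part assumes pymongo 3.11 or later (for `directConnection`).
- It assumes a reachable mongod started with `--replSet rs0`.

```python
from typing import Any, Dict, List


def build_replset_config(name: str, hosts: List[str]) -> Dict[str, Any]:
    """Build an explicit replica set config document (the argument to rs.initiate())."""
    return {
        "_id": name,  # must match the --replSet name
        "members": [{"_id": i, "host": host} for i, host in enumerate(hosts)],
    }


def initiate_replset(uri: str, config: Dict[str, Any]) -> Dict[str, Any]:
    """Send replSetInitiate through pymongo (assumes pymongo >= 3.11 is installed)."""
    from pymongo import MongoClient  # imported lazily so the builder works without pymongo

    # connect directly to the single, not-yet-initiated member
    client = MongoClient(uri, directConnection=True)
    return client.admin.command("replSetInitiate", config)
```

For the setup in this post, this would be something like `initiate_replset("mongodb://10.8.5.99:27017", build_replset_config("rs0", ["10.8.5.99:27017"]))`.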

4) [Verify] Check the replica set configuration

rs0:SECONDARY> rs.conf()
{
  "_id" : "rs0",
  "version" : 1,
  "protocolVersion" : NumberLong(1),
  "members" : [
    {
      "_id" : 0,
      "host" : "b48eafd69929:27017",
      "arbiterOnly" : false,
      "buildIndexes" : true,
      "hidden" : false,
      "priority" : 1,
      "tags" : {

      },
      "slaveDelay" : NumberLong(0),
      "votes" : 1
    }
  ],
  "settings" : {
    "chainingAllowed" : true,
    "heartbeatIntervalMillis" : 2000,
    "heartbeatTimeoutSecs" : 10,
    "electionTimeoutMillis" : 10000,
    "getLastErrorModes" : {

    },
    "getLastErrorDefaults" : {
      "w" : 1,
      "wtimeout" : 0
    },
    "replicaSetId" : ObjectId("577b74bd0ba41a313110ad62")
  }
}

5) [Verify] Check the replica set status

rs0:PRIMARY> rs.status()
{
  "set" : "rs0",
  "date" : ISODate("2016-07-05T08:50:55.272Z"),
  "myState" : 1,
  "term" : NumberLong(1),
  "heartbeatIntervalMillis" : NumberLong(2000),
  "members" : [
    {
      "_id" : 0,
      "name" : "b48eafd69929:27017",
      "health" : 1,
      "state" : 1,
      "stateStr" : "PRIMARY",
      "uptime" : 115,
      "optime" : {
        "ts" : Timestamp(1467708606, 1),
        "t" : NumberLong(1)
      },
      "optimeDate" : ISODate("2016-07-05T08:50:06Z"),
      "infoMessage" : "could not find member to sync from",
      "electionTime" : Timestamp(1467708605, 2),
      "electionDate" : ISODate("2016-07-05T08:50:05Z"),
      "configVersion" : 1,
      "self" : true
    }
  ],
  "ok" : 1
}
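Before starting mongo-connector, it is worth confirming that the set has elected a primary. Note that the prompt changed from `rs0:SECONDARY>` to `rs0:PRIMARY>` above.

This Python 3 sketch inspects an `rs.status()` document as a plain dict, for example the result of pymongo's `client.admin.command("replSetGetStatus")`. The function name `primary_of` is hypothetical.

```python
from typing import Any, Dict, Optional


def primary_of(status: Dict[str, Any]) -> Optional[str]:
    """Return the host name of the PRIMARY member in an rs.status() document, if any."""
    for member in status.get("members", []):
        # state 1 (stateStr "PRIMARY") marks the primary, as in the output above
        if member.get("state") == 1:
            return member.get("name")
    return None  # no primary elected yet
```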

(4) Start synchronization to ES

[root@5b9dbaaa148a bin]# mongo-connector -m 10.8.5.99:27017 -t 10.8.5.101:9200 -d elastic2_doc_manager
Logging to mongo-connector.log.           

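Parameters:

-m: MongoDB host and port; the port defaults to 27017.

-t: Elasticsearch host and port; the port defaults to 9200.

-d: name of the doc manager. For Elasticsearch 2.x this is elastic2_doc_manager (installed by the elastic2-doc-manager package).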
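When the connector is launched from a script, the arguments above can be assembled with the default ports filled in. This is a Python 3.8+ sketch (`shlex.join`). The helper names are hypothetical, and the port defaults are taken from the parameter list above. Only the `-m`, `-t`, and `-d` options used in this post are covered.

```python
import shlex
from typing import List


def with_default_port(address: str, default_port: int) -> str:
    """Append the default port when `address` has none (hostname/IPv4 only)."""
    return address if ":" in address else f"{address}:{default_port}"


def connector_command(mongo: str, es: str, doc_manager: str = "elastic2_doc_manager") -> List[str]:
    """Assemble the mongo-connector argv used in step (4)."""
    return [
        "mongo-connector",
        "-m", with_default_port(mongo, 27017),  # MongoDB host:port
        "-t", with_default_port(es, 9200),       # Elasticsearch host:port
        "-d", doc_manager,                       # doc manager module name
    ]


if __name__ == "__main__":
    print(shlex.join(connector_command("10.8.5.99", "10.8.5.101")))
```

The list form can be passed to `subprocess.run` as is, which avoids shell quoting issues.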

5. Verifying insert synchronization between MongoDB and ES

(1) Insert data in MongoDB:

// Switch to (and implicitly create) a MongoDB database; it maps to an ES index
rs0:PRIMARY> use zhang_index
switched to db zhang_index

// Insert documents into MongoDB (the collection col_02 maps to an ES type)
rs0:PRIMARY> db.col_02.insert({name:"laoluo", birth:"1964-03-21", sex:"man", company:"chuizi"});
WriteResult({ "nInserted" : 1 })
rs0:PRIMARY> db.col_02.insert({name:"renzhengfei", birth:"1954-03-21", sex:"man", company:"huawei"});           
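The mapping used in this test can be sketched in a few lines of Python 3. The database becomes the index, the collection becomes the type, and the ObjectId becomes the `_id` string. The output mirrors the ES search results that follow. `to_es_doc` is a hypothetical helper, not mongo-connector's own code.

```python
from typing import Any, Dict


def to_es_doc(namespace: str, doc: Dict[str, Any]) -> Dict[str, Any]:
    """Map a MongoDB document to the ES _index/_type/_id/_source layout."""
    # database names cannot contain dots, so split once: "zhang_index.col_02" -> ("zhang_index", "col_02")
    database, collection = namespace.split(".", 1)
    source = {k: v for k, v in doc.items() if k != "_id"}  # _id is stored as metadata, not in _source
    return {
        "_index": database,
        "_type": collection,
        "_id": str(doc["_id"]),  # str(ObjectId) yields its 24-character hex string
        "_source": source,
    }
```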

(2) Query ES to verify

[root@5b9dbaaa148a test_log]# curl -XGET http://10.8.5.101:9200/zhang_index/col_02/_search?pretty
{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    "total" : 8,
    "successful" : 8,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "zhang_index",
      "_type" : "col_02",
      "_id" : "577b7d8ceb8e3dc2d1db12a9",
      "_score" : 1.0,
      "_source" : {
        "company" : "huawei",
        "name" : "renzhengfei",
        "birth" : "1954-03-21",
        "sex" : "man"
      }
    }, {
      "_index" : "zhang_index",
      "_type" : "col_02",
      "_id" : "577b7d4aeb8e3dc2d1db12a7",
      "_score" : 1.0,
      "_source" : {
        "company" : "chuizi",
        "name" : "laoluo",
        "birth" : "1964-03-21",
        "sex" : "man"
      }
    } ]
  }
}
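The same check can be scripted instead of read by eye. This Python 3 sketch fetches a `_search` URL with the standard library and extracts the hit count and documents. It assumes the Elasticsearch 2.x response shape shown above, where `hits.total` is a plain number (later ES versions changed it to an object). The function names are hypothetical.

```python
import json
from typing import Any, Dict, List, Tuple
from urllib.request import urlopen

Hits = Tuple[int, List[Tuple[str, Dict[str, Any]]]]


def parse_hits(body: str) -> Hits:
    """Return (hits.total, [(_id, _source), ...]) from an ES 2.x _search response body."""
    hits = json.loads(body)["hits"]
    return hits["total"], [(h["_id"], h["_source"]) for h in hits["hits"]]


def search(url: str) -> Hits:
    """Fetch a _search URL (such as the one used with curl above) and parse the response."""
    with urlopen(url) as resp:
        return parse_hits(resp.read().decode("utf-8"))
```

Calling `search("http://10.8.5.101:9200/zhang_index/col_02/_search")` should report a total of 2 for the data inserted above.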

6. Verifying update synchronization between MongoDB and ES

(1) Update a document in MongoDB

rs0:PRIMARY> db.col_02.update({'name':'laoluo'}, {$set:{'name':'luoyonghao'}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
rs0:PRIMARY>
rs0:PRIMARY> db.col_02.find().pretty()
{
  "_id" : ObjectId("577b7d4aeb8e3dc2d1db12a7"),
  "name" : "luoyonghao",
  "birth" : "1964-03-21",
  "sex" : "man",
  "company" : "chuizi"
}
{
  "_id" : ObjectId("577b7d8ceb8e3dc2d1db12a9"),
  "name" : "renzhengfei",
  "birth" : "1954-03-21",
  "sex" : "man",
  "company" : "huawei"
}           

(2) Query ES after the update

[root@5b9dbaaa148a test_log]# curl -XGET http://10.8.5.101:9200/zhang_index/col_02/_search?pretty
{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 8,
    "successful" : 8,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "zhang_index",
      "_type" : "col_02",
      "_id" : "577b7d8ceb8e3dc2d1db12a9",
      "_score" : 1.0,
      "_source" : {
        "company" : "huawei",
        "name" : "renzhengfei",
        "birth" : "1954-03-21",
        "sex" : "man"
      }
    }, {
      "_index" : "zhang_index",
      "_type" : "col_02",
      "_id" : "577b7d4aeb8e3dc2d1db12a7",
      "_score" : 1.0,
      "_source" : {
        "company" : "chuizi",
        "name" : "luoyonghao",
        "birth" : "1964-03-21",
        "sex" : "man"
      }
    } ]
  }
}
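Comparing the `find()` output with the ES hits by eye works for two documents. For larger collections, a small diff helps. In this Python 3 sketch, both sides are plain dicts (MongoDB documents with `_id` as a string, and ES hits with `_id` and `_source`). It reports ids that are missing from ES, extra in ES, or whose fields differ. `diff_sync` is a hypothetical helper.

```python
from typing import Any, Dict, List


def diff_sync(mongo_docs: List[Dict[str, Any]], es_hits: List[Dict[str, Any]]) -> Dict[str, List[str]]:
    """Compare MongoDB documents with ES hits and report ids that are out of sync."""
    # key both sides by the string form of _id; drop _id from the Mongo fields
    mongo = {str(d["_id"]): {k: v for k, v in d.items() if k != "_id"} for d in mongo_docs}
    es = {h["_id"]: h["_source"] for h in es_hits}
    return {
        "missing_in_es": sorted(mongo.keys() - es.keys()),
        "extra_in_es": sorted(es.keys() - mongo.keys()),
        "mismatched": sorted(i for i in mongo.keys() & es.keys() if mongo[i] != es[i]),
    }
```

After a successful sync, all three lists are empty. A stale ES document after an update shows up under `mismatched`.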

7. Verifying delete synchronization between MongoDB and ES

(1) Delete a document in MongoDB

rs0:PRIMARY> db.col_02.remove({'name':'renzhengfei'})
WriteResult({ "nRemoved" : 1 })
rs0:PRIMARY> db.col_02.find()
{ "_id" : ObjectId("577b7d4aeb8e3dc2d1db12a7"), "name" : "luoyonghao", "birth" : "1964-03-21", "sex" : "man", "company" : "chuizi" }
rs0:PRIMARY> db.col_02.find().pretty()
{
  "_id" : ObjectId("577b7d4aeb8e3dc2d1db12a7"),
  "name" : "luoyonghao",
  "birth" : "1964-03-21",
  "sex" : "man",
  "company" : "chuizi"
}           

(2) Query ES after the delete

The result shows that the document deleted in MongoDB has also been deleted from ES.

[root@5b9dbaaa148a test_log]# curl -XGET http://10.8.5.101:9200/zhang_index/col_02/_search?pretty
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 8,
    "successful" : 8,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "zhang_index",
      "_type" : "col_02",
      "_id" : "577b7d4aeb8e3dc2d1db12a7",
      "_score" : 1.0,
      "_source" : {
        "company" : "chuizi",
        "name" : "luoyonghao",
        "birth" : "1964-03-21",
        "sex" : "man"
      }
    } ]
  }
}
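Synchronization is asynchronous. mongo-connector applies an oplog entry after MongoDB writes it, and Elasticsearch makes changes searchable only after an index refresh (every second by default). Scripted checks like the ones in sections 5 to 7 should therefore poll instead of querying once.

This Python 3 sketch retries a caller-supplied check until it passes or a timeout expires. `wait_until` is a hypothetical helper. The check could, for example, fetch the `_search` URL and compare `hits.total` with the expected count.

```python
import time
from typing import Callable


def wait_until(check: Callable[[], bool], timeout: float = 10.0, interval: float = 0.5) -> bool:
    """Poll `check` until it returns True (-> True) or `timeout` seconds pass (-> False)."""
    deadline = time.monotonic() + timeout  # monotonic clock is immune to wall-clock changes
    while True:
        if check():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```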

For details on deploying a replica set, see:

https://docs.mongodb.com/manual/tutorial/deploy-replica-set/

Five ways to sync data from MongoDB to ES:

https://www.linkedin.com/pulse/5-way-sync-data-from-mongodb-es-kai-hao

Common issues:

How to setup a MongoDB replica set for the connector?

Author: 銘毅天下

Please credit the source when republishing. Original article:

http://blog.csdn.net/laoyang360/article/details/51842822