天天看點

實時資料交換平台 - BottledWater-pg with confluent

postgresql , bottled water , kafka , confluent , iot

想必大家都在圖書館借過書,小時候有好看的書也會在小夥伴之間傳閱。

借書和資料泵有點類似,一份資料通過資料泵實時的分享給訂閱者。

例如在iot的場景中,有流式分析的需求,也有存儲曆史資料的需求,同時還有資料挖掘的需求,搜尋引擎可能也需要同一份資料,還有一些業務可能也要用到同一份資料。

但是如果把資料統統放到一個地方,這麼多的業務,它們有的要求實時處理,有的要求批量處理,有的可能需要實時的更新資料,有的可能要對大資料進行分析。

顯然一個産品可能無法滿足這麼多的需求。

就好比資料庫就分了關系資料庫,nosql,oltp場景,olap場景一樣。 也是因為一個産品無法滿足所有的業務需求。

在企業中通常是借助資料備援來解決各類場景的需求。

那麼如何才能夠更好的分享資料,保證資料的一緻性,提高分享的實時性呢?

<a href="http://docs.confluent.io/3.1.0/platform.html">http://docs.confluent.io/3.1.0/platform.html</a>

實時資料交換平台 - BottledWater-pg with confluent

confluent 是一個實時的資料中轉服務,來自各個平台的資料可以使用confluent進行流轉,達到分享和交換資料的目的。

例如來自物聯網傳感器的資料,來自資料庫的資料,來自http,移動app的資料,來自應用日志的資料,來自一些事件觸發的資料 等等。

confluent需要依賴一些基本的元件,核心元件如kafka.

使用者可以自定義消息的生産者和消費者,在confluent提供的平台上交換資料。

實時資料交換平台 - BottledWater-pg with confluent
實時資料交換平台 - BottledWater-pg with confluent
實時資料交換平台 - BottledWater-pg with confluent
實時資料交換平台 - BottledWater-pg with confluent

bottledwater-pg是confluent平台的一種消息生産者,針對postgresql資料庫,即将postgresql資料庫的資料寫入confluent kafka,進而實時的分享給消息訂閱者。

支援postgresql 9.4以及以上版本,支援全量快照,以及持續的增量資料寫入kafka。

bottledwater-pg使用postgresql快照技術,可以讀取一緻性的快照寫入kafka。使用資料庫logical decode技術,從postgresql的wal日志中,解析為row資料寫入kafka。

在kafka中,每個topic代表一張資料庫的表。

資料在使用decode從wal取出後,寫入kafka之前,使用avro将資料row打包成json, or protobuf, or thrift, or any number of formats,再寫入kafka。

avro支援的資料類型比較豐富,可以很好的支撐postgresql豐富的資料類型。

為什麼使用avro請參考

<a href="http://radar.oreilly.com/2014/11/the-problem-of-managing-schemas.html">http://radar.oreilly.com/2014/11/the-problem-of-managing-schemas.html</a>

bottledwater-pg是pg的一個插件,它的目的是解析wal,同時使用avro封裝為json/protobuf/thrift/其他formats。 并寫入kafka。

是以它依賴這些庫或軟體

最好部署較新版本的,否則可能會有編譯問題。

可選,一種比較高效的壓縮和解壓縮庫。

由于avro還支援xz,可不安裝snappy

json parser,必須安裝,建議測試時安裝在預設目錄,否則可能遇到編譯問題,或者設定rpath。

建議測試時安裝在預設路徑中,如下。

可以不裝,如果你要安裝avro的doc時才需要安裝boost。

<a href="http://avro.apache.org/">http://avro.apache.org/</a>

<a href="http://www.apache.org/dyn/closer.cgi/avro/">http://www.apache.org/dyn/closer.cgi/avro/</a>

測試時建議按照在預設目錄,否則可能又會有編譯錯誤的問題。

安裝略

由于bottledwater-pg是postgresql的一個插件,是以首先要安裝postgresql。

可能需要重新開機資料庫,加載bottledwater.so

在資料庫中建立插件

這一步是基礎,搭建好confluent,後面才能測試一下bottledwater-pg生産消息,以及從confluent platform消費消息的過程。

<a href="http://docs.confluent.io/3.1.0/installation.html">http://docs.confluent.io/3.1.0/installation.html</a>

bottledwater-pg需要從wal解析logical row,是以必須配置wal_level=logical級别。

同時wal sender程序數必須&gt;=1。

worker process數必須&gt;=1。

同時由于bottledwater為了保證可以支援斷點續傳,以及確定沒有轉換的wal日志不會被主庫删掉或覆寫掉,需要用到replication slot,是以需要配置replication_slots&gt;=1。

同時為了保證資料庫的wal訂閱者可以通過流複制協定連接配接到資料庫,需要配置pg_hba.conf

建立replication角色使用者

bottledwater-pg用戶端指令的目的是從wal解析日志,寫入kafka。

command option需要配置如何連接配接到資料庫(使用流複制連接配接),output格式,topic-prefix(建議為庫名),是否需要初始化快照,是否允許沒有主鍵的表,kafka broker的連接配接位址和端口,schema-registry的連接配接位址和端口。

以及一些kafka相關的配置。

bottledwater配置檔案說明如下

由于confluent中存儲的是avro封裝的binary格式,是以消費時,需要使用解析avro的消費者。

1. 首次連接配接資料庫時,會自動建立slot,同時自動開始将快照資料寫入kafka,如果資料庫很大,這個過程會很漫長。

2. 為了得到一緻的資料,會開啟repeatable read的事務隔離級别,如果是9.6,并且配置了snapshot too old參數,可能導緻快照拷貝失敗。

3. 由于是邏輯decode,被複制的表必須包含逐漸,或指定非空唯一限制列,作為複制時的key。

4. 增,删,改在wal中被解析為:

如果使用--allow-unkeyed跳過了主鍵,那麼delete該表時,不會将任何資料寫入kafka,插入和更新則将所有列發給kafka。

5. ddl操作不會記錄到wal日志中,如果你需要将ddl也寫入kafka怎麼辦?

你可以使用event trigger,發生ddl時,将ddl封裝并寫入表中,然後這些表的dml會寫入kafka,進而實作ddl的傳遞。

6. 如果要删除生産者,務必删除資料庫中對應的slot ,否則postgresql會一直保留slot未讀取的日志。 導緻wal目錄撐爆。

7. 如果資料庫産生的redo沒有被及時的解析并寫入kafka,可能導緻未取走的資料庫的wal檔案一直留在資料庫伺服器,甚至導緻資料庫空間撐爆。

請謹慎使用slot,同時請将監控做得健壯。

8. kafka topic與table一一對應,命名規則如下

由于命名中隻有三個部分 <code>[topic_prefix].[postgres_schema_name].table_name</code> 沒有考慮庫名,是以如果有多個資料庫時,建議配置top_prefix,和庫名對應即可。