postgresql , bottled water , kafka , confluent , iot
想必大家都在圖書館借過書,小時候有好看的書也會在小夥伴之間傳閱。
借書和資料泵有點類似,一份資料通過資料泵實時的分享給訂閱者。
例如在iot的場景中,有流式分析的需求,也有存儲曆史資料的需求,同時還有資料挖掘的需求,搜尋引擎可能也需要同一份資料,還有一些業務可能也要用到同一份資料。
但是如果把資料統統放到一個地方,這麼多的業務,它們有的要求實時處理,有的要求批量處理,有的可能需要實時的更新資料,有的可能要對大資料進行分析。
顯然一個産品可能無法滿足這麼多的需求。
就好比資料庫就分了關系資料庫,nosql,oltp場景,olap場景一樣。 也是因為一個産品無法滿足所有的業務需求。
在企業中通常是借助資料備援來解決各類場景的需求。
那麼如何才能夠更好的分享資料,保證資料的一緻性,提高分享的實時性呢?
<a href="http://docs.confluent.io/3.1.0/platform.html">http://docs.confluent.io/3.1.0/platform.html</a>

confluent 是一個實時的資料中轉服務,來自各個平台的資料可以使用confluent進行流轉,達到分享和交換資料的目的。
例如來自物聯網傳感器的資料,來自資料庫的資料,來自http,移動app的資料,來自應用日志的資料,來自一些事件觸發的資料 等等。
confluent需要依賴一些基本的元件,核心元件如kafka.
使用者可以自定義消息的生産者和消費者,在confluent提供的平台上交換資料。
bottledwater-pg是confluent平台的一種消息生産者,針對postgresql資料庫,即将postgresql資料庫的資料寫入confluent kafka,進而實時的分享給消息訂閱者。
支援postgresql 9.4以及以上版本,支援全量快照,以及持續的增量資料寫入kafka。
bottledwater-pg使用postgresql快照技術,可以讀取一緻性的快照寫入kafka。使用資料庫logical decode技術,從postgresql的wal日志中,解析為row資料寫入kafka。
在kafka中,每個topic代表一張資料庫的表。
資料在使用decode從wal取出後,寫入kafka之前,使用avro将資料row打包成json, or protobuf, or thrift, or any number of formats,再寫入kafka。
avro支援的資料類型比較豐富,可以很好的支撐postgresql豐富的資料類型。
為什麼使用avro請參考
<a href="http://radar.oreilly.com/2014/11/the-problem-of-managing-schemas.html">http://radar.oreilly.com/2014/11/the-problem-of-managing-schemas.html</a>
bottledwater-pg是pg的一個插件,它的目的是解析wal,同時使用avro封裝為json/protobuf/thrift/其他formats。 并寫入kafka。
是以它依賴這些庫或軟體
最好部署較新版本的,否則可能會有編譯問題。
可選,一種比較高效的壓縮和解壓縮庫。
由于avro還支援xz,可不安裝snappy
json parser,必須安裝,建議測試時安裝在預設目錄,否則可能遇到編譯問題,或者設定rpath。
建議測試時安裝在預設路徑中,如下。
可以不裝,如果你要安裝avro的doc時才需要安裝boost。
<a href="http://avro.apache.org/">http://avro.apache.org/</a>
<a href="http://www.apache.org/dyn/closer.cgi/avro/">http://www.apache.org/dyn/closer.cgi/avro/</a>
測試時建議按照在預設目錄,否則可能又會有編譯錯誤的問題。
安裝略
由于bottledwater-pg是postgresql的一個插件,是以首先要安裝postgresql。
可能需要重新開機資料庫,加載bottledwater.so
在資料庫中建立插件
這一步是基礎,搭建好confluent,後面才能測試一下bottledwater-pg生産消息,以及從confluent platform消費消息的過程。
<a href="http://docs.confluent.io/3.1.0/installation.html">http://docs.confluent.io/3.1.0/installation.html</a>
bottledwater-pg需要從wal解析logical row,是以必須配置wal_level=logical級别。
同時wal sender程序數必須>=1。
worker process數必須>=1。
同時由于bottledwater為了保證可以支援斷點續傳,以及確定沒有轉換的wal日志不會被主庫删掉或覆寫掉,需要用到replication slot,是以需要配置replication_slots>=1。
同時為了保證資料庫的wal訂閱者可以通過流複制協定連接配接到資料庫,需要配置pg_hba.conf
建立replication角色使用者
bottledwater-pg用戶端指令的目的是從wal解析日志,寫入kafka。
command option需要配置如何連接配接到資料庫(使用流複制連接配接),output格式,topic-prefix(建議為庫名),是否需要初始化快照,是否允許沒有主鍵的表,kafka broker的連接配接位址和端口,schema-registry的連接配接位址和端口。
以及一些kafka相關的配置。
bottledwater配置檔案說明如下
由于confluent中存儲的是avro封裝的binary格式,是以消費時,需要使用解析avro的消費者。
1. 首次連接配接資料庫時,會自動建立slot,同時自動開始将快照資料寫入kafka,如果資料庫很大,這個過程會很漫長。
2. 為了得到一緻的資料,會開啟repeatable read的事務隔離級别,如果是9.6,并且配置了snapshot too old參數,可能導緻快照拷貝失敗。
3. 由于是邏輯decode,被複制的表必須包含逐漸,或指定非空唯一限制列,作為複制時的key。
4. 增,删,改在wal中被解析為:
如果使用--allow-unkeyed跳過了主鍵,那麼delete該表時,不會将任何資料寫入kafka,插入和更新則将所有列發給kafka。
5. ddl操作不會記錄到wal日志中,如果你需要将ddl也寫入kafka怎麼辦?
你可以使用event trigger,發生ddl時,将ddl封裝并寫入表中,然後這些表的dml會寫入kafka,進而實作ddl的傳遞。
6. 如果要删除生産者,務必删除資料庫中對應的slot ,否則postgresql會一直保留slot未讀取的日志。 導緻wal目錄撐爆。
7. 如果資料庫産生的redo沒有被及時的解析并寫入kafka,可能導緻未取走的資料庫的wal檔案一直留在資料庫伺服器,甚至導緻資料庫空間撐爆。
請謹慎使用slot,同時請将監控做得健壯。
8. kafka topic與table一一對應,命名規則如下
由于命名中隻有三個部分 <code>[topic_prefix].[postgres_schema_name].table_name</code> 沒有考慮庫名,是以如果有多個資料庫時,建議配置top_prefix,和庫名對應即可。