天天看點

如何基于日志,同步實作資料的一緻性和實時抽取?

本文根據dbaplus社群第85期線上分享整理而成

講師介紹

如何基于日志,同步實作資料的一緻性和實時抽取?

王東

宜信技術研發中心架構師

目前就職于宜信技術研發中心,任架構師,負責流式計算和大資料業務産品解決方案。

曾任職于naver china(南韓最大搜尋引擎公司)中國研發中心資深工程師,多年從事cubrid分布式資料庫叢集開發和cubrid資料庫引擎開發

http://www.cubrid.org/blog/news/cubrid-cluster-introduction/

主題簡介:

dws 的背景介紹

dbus+wormhole 總體架構和技術實作方案

dws的實際運用案例

前言  

大家好,我是王東,來自宜信技術研發中心,這是我來社群的第一次分享,如果有什麼不足,請大家多多指正、包涵。

本次分享的主題是《基于日志的dws平台實作和應用》, 主要是分享一下目前我們在宜信做的一些事情。這個主題裡面包含到2個團隊很多兄弟姐妹的努力的結果(我們團隊和山巍團隊的成果)。這次就由我代為執筆,盡我努力給大家介紹一下。

其實整個實作從原理上來說是比較簡單的,當然也涉及到不少技術。我會嘗試用盡量簡單的方式來表達,讓大家了解這個事情的原理和意義。在過程中,大家有問題可以随時提出,我會盡力去解答。

dws是一個簡稱,是由3個子項目組成,我稍後做解釋。

一、背景

事情是從公司前段時間的需求說起,大家知道宜信是一個網際網路金融企業,我們的很多資料與标準網際網路企業不同,大緻來說就是:

如何基于日志,同步實作資料的一緻性和實時抽取?

玩資料的人都知道資料是非常有價值的,然後這些資料是儲存在各個系統的資料庫中,如何讓需要資料的使用方得到一緻性、實時的資料呢?

過去的通用做法有幾種是:

dba開放各個系統的備庫,在業務低峰期(比如夜間),使用方各自抽取所需資料。由于抽取時間不同,各個資料使用方資料不一緻,資料發生沖突,而且重複抽取,相信不少dba很頭疼這個事情。

公司統一的大資料平台,通過sqoop 在業務低峰期到各個系統統一抽取資料, 并儲存到hive表中, 然後為其他資料使用方提供資料服務。這種做法解決了一緻性問題,但時效性差,基本是t+1的時效。

基于trigger的方式擷取增量變更,主要問題是業務方侵入性大,而且trigger也帶來性能損失。

這些方案都不算完美。我們在了解和考慮了不同實作方式後,最後借鑒了 linkedin的思想,認為要想同時解決資料一緻性和實時性,比較合理的方法應該是來自于log。

如何基于日志,同步實作資料的一緻性和實時抽取?

(此圖來自:https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/)

把增量的log作為一切系統的基礎。後續的資料使用方,通過訂閱kafka來消費log。

比如:

大資料的使用方可以将資料儲存到hive表或者parquet檔案給hive或spark查詢;

提供搜尋服務的使用方可以儲存到elasticsearch或hbase 中;

提供緩存服務的使用方可以将日志緩存到redis或alluxio中;

資料同步的使用方可以将資料儲存到自己的資料庫中;

由于kafka的日志是可以重複消費的,并且緩存一段時間,各個使用方可以通過消費kafka的日志來達到既能保持與資料庫的一緻性,也能保證明時性;

為什麼使用log和kafka作為基礎,而不使用sqoop進行抽取呢? 因為:

如何基于日志,同步實作資料的一緻性和實時抽取?

為什麼不使用dual write(雙寫)呢?,請參考https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/

我這裡就不多做解釋了。

二、總體架構

于是我們提出了建構一個基于log的公司級的平台的想法。

下面解釋一下dws平台, dws平台是有3個子項目組成:

dbus(資料總線):負責實時将資料從源端實時抽出,并轉換為約定的自帶schema的json格式資料(ums 資料),放入kafka中;

wormhole(資料交換平台):負責從kafka讀出資料 将資料寫入到目标中;

swifts(實時計算平台):負責從kafka中讀出資料,實時計算,并将資料寫回kafka中。

如何基于日志,同步實作資料的一緻性和實時抽取?

圖中:

log extractor和dbus共同完成資料抽取和資料轉換,抽取包括全量和增量抽取。

wormhole可以将所有日志資料儲存到hdfs中; 還可以将資料落地到所有支援jdbc的資料庫,落地到hbash,elasticsearch,cassandra等;

swifts支援以配置和sql的方式實作對進行流式計算,包括支援流式join,look up,filter,window aggregation等功能;

dbus web是dbus的配置管理端,rider除了配置管理以外,還包括對wormhole和swifts運作時管理,資料品質校驗等。

由于時間關系,我今天主要介紹dws中的dbus和wormhole,在需要的時候附帶介紹一下swifts。

三、dbus解決方案

日志解析

如前面所說,dbus主要解決的是将日志從源端實時的抽出。 這裡我們以mysql為例子,簡單說明如何實作。

我們知道,雖然mysql innodb有自己的log,mysql主備同步是通過binlog來實作的。如下圖:

如何基于日志,同步實作資料的一緻性和實時抽取?

圖檔來自:https://github.com/alibaba/canal

而binlog有三種模式:

row 模式:日志中會記錄成每一行資料被修改的形式,然後在slave端再對相同的資料進行修改。

statement 模式: 每一條會修改資料的sql都會記錄到 master的bin-log中。slave在複制的時候sql程序會解析成和原來master端執行過的相同的sql來再次執行。

mixed模式: mysql會根據執行的每一條具體的sql語句來區分對待記錄的日志形式,也就是在statement和row之間選擇一種。

他們各自的優缺點如下:

如何基于日志,同步實作資料的一緻性和實時抽取?

此處來自:http://www.jquerycn.cn/a_13625

由于statement 模式的缺點,在與我們的dba溝通過程中了解到,實際生産過程中都使用row 模式進行複制。這使得讀取全量日志成為可能。

通常我們的mysql布局是采用 2個master主庫(vip)+ 1個slave從庫 + 1個backup容災庫 的解決方案,由于容災庫通常是用于異地容災,實時性不高也不便于部署。

為了最小化對源端産生影響,顯然我們讀取binlog日志應該從slave從庫讀取。

讀取binlog的方案比較多,github上不少,參考https://github.com/search?utf8=%e2%9c%93&q=binlog。最終我們選用了阿裡的canal做位日志抽取方。

canal最早被用于阿裡中美機房同步, canal原理相對比較簡單:

canal模拟mysql slave的互動協定,僞裝自己為mysql slave,向mysql slave發送dump協定

mysql master收到dump請求,開始推送binary log給slave(也就是canal)

canal解析binary log對象(原始為byte流)

如何基于日志,同步實作資料的一緻性和實時抽取?

解決方案

dbus 的mysql版主要解決方案如下:

如何基于日志,同步實作資料的一緻性和實時抽取?

對于增量的log,通過訂閱canal server的方式,我們得到了mysql的增量日志:

按照canal的輸出,日志是protobuf格式,開發增量storm程式,将資料實時轉換為我們定義的ums格式(json格式,稍後我會介紹),并儲存到kafka中;

增量storm程式還負責捕獲schema變化,以控制版本号;

增量storm的配置資訊儲存在zookeeper中,以滿足高可用需求。

kafka既作為輸出結果也作為處理過程中的緩沖器和消息解構區。

在考慮使用storm作為解決方案的時候,我們主要是認為storm有以下優點:

技術相對成熟,比較穩定,與kafka搭配也算标準組合;

實時性比較高,能夠滿足實時性需求;

滿足高可用需求;

通過配置storm并發度,可以活動性能擴充的能力;

全量抽取

對于流水表,有增量部分就夠了,但是許多表需要知道最初(已存在)的資訊。這時候我們需要initial load(第一次加載)。

對于initial load(第一次加載),同樣開發了全量抽取storm程式通過jdbc連接配接的方式,從源端資料庫的備庫進行拉取。initial load是拉全部資料,是以我們推薦在業務低峰期進行。好在隻做一次,不需要每天都做。

全量抽取,我們借鑒了sqoop的思想。将全量抽取storm分為了2 個部分:

資料分片

實際抽取

資料分片需要考慮分片列,按照配置和自動選擇列将資料按照範圍來分片,并将分片資訊儲存到kafka中。

如何基于日志,同步實作資料的一緻性和實時抽取?

下面是具體的分片政策:

如何基于日志,同步實作資料的一緻性和實時抽取?

全量抽取的storm程式是讀取kafka的分片資訊,采用多個并發度并行連接配接資料庫備庫進行拉取。因為抽取的時間可能很長。抽取過程中将實時狀态寫到zookeeper中,便于心跳程式監控。

如何基于日志,同步實作資料的一緻性和實時抽取?

統一消息格式

無論是增量還是全量,最終輸出到kafka中的消息都是我們約定的一個統一消息格式,稱為ums(unified message schema)格式。

如下圖所示:

如何基于日志,同步實作資料的一緻性和實時抽取?
如何基于日志,同步實作資料的一緻性和實時抽取?

消息中schema部分,定義了namespace 是由 類型+資料源名+schema名+表名+版本号+分庫号+分表号 能夠描述整個公司的所有表,通過一個namespace就能唯一定位。

_ums_op_ 表明資料的類型是i(insert),u(update),d(删除);

_ums_ts_ 發生增删改的事件的時間戳,顯然新的資料發生的時間戳更新;

_ums_id_ 消息的唯一id,保證消息是唯一的,但這裡我們保證了消息的先後順序(稍後解釋);

payload是指具體的資料,一個json包裡面可以包含1條至多條資料,提高資料的有效載荷。

ums中支援的資料類型,參考了hive類型并進行簡化,基本上包含了所有資料類型。

全量和增量的一緻性

在整個資料傳輸中,為了盡量的保證日志消息的順序性,kafka我們使用的是1個partition的方式。在一般情況下,基本上是順序的和唯一的。

但是我們知道寫kafka會失敗,有可能重寫,storm也用重做機制,是以,我們并不嚴格保證exactly once和完全的順序性,但保證的是at least once。

是以_ums_id_變得尤為重要。

對于全量抽取,_ums_id_是唯一的,從zk中每個并發度分别取不同的id片區,保證了唯一性和性能,填寫負數,不會與增量資料沖突,也保證他們是早于增量消息的。

對于增量抽取,我們使用的是mysql的日志檔案号 + 日志偏移量作為唯一id。id作為64位的long整數,高7位用于日志檔案号,低12位作為日志偏移量。

例如:000103000012345678。 103 是日志檔案号,12345678 是日志偏移量。

這樣,從日志層面保證了實體唯一性(即便重做也這個id号也不變),同時也保證了順序性(還能定位日志)。通過比較_ums_id_ 消費日志就能通過比較_ums_id_知道哪條消息更新。

其實_ums_ts_與_ums_id_意圖是類似的,隻不過有時候_ums_ts_可能會重複,即在1毫秒中發生了多個操作,這樣就得靠比較_ums_id_了。

心跳監控和預警

整個系統涉及到資料庫的主備同步,canal server,多個并發度storm程序等各個環節。

是以對流程的監控和預警就尤為重要。

通過心跳子產品,例如每分鐘(可配置)對每個被抽取的表插入一條心态資料并儲存發送時間,這個心跳表也被抽取,跟随着整個流程下來,與被同步表在實際上走相同的邏輯(因為多個并發的的storm可能有不同的分支),當收到心跳包的時候,即便沒有任何增删改的資料,也能證明整條鍊路是通的。

storm程式和心跳程式将資料發送公共的統計topic,再由統計程式儲存到influxdb中,使用grafana進行展示,就可以看到如下效果:

如何基于日志,同步實作資料的一緻性和實時抽取?

圖中是某業務系統的實時監控資訊。上面是實時流量情況,下面是實時延時情況。可以看到,實時性還是很不錯的,基本上1~2秒資料就已經到末端kafka中。

granfana提供的是一種實時監控能力。

如果出現延時,則是通過dbus的心跳子產品發送郵件報警或短信報警。

實時脫敏

考慮到資料安全性,對于有脫敏需求的場景,dbus的全量storm和增量storm程式也完成了實時脫敏的功能。脫敏方式有3種:

如何基于日志,同步實作資料的一緻性和實時抽取?

總結一下:簡單的說,dbus就是将各種源的資料,實時的導出,并以ums的方式提供訂閱, 支援實時脫敏,實際監控和報警。

四、wormhole解決方案

說完dbus,該說一下wormhole,為什麼兩個項目不是一個,而要通過kafka來對接呢?

其中很大一個原因就是解耦,kafka具有天然的解耦能力,程式直接可以通過kafka做異步的消息傳遞。dbus和wornhole内部也使用了kafka做消息傳遞和解耦。

另外一個原因就是,ums是自描述的,通過訂閱kafka,任何有能力的使用方來直接消費ums來使用。

雖然ums的結果可以直接訂閱,但還需要開發的工作。wormhole解決的是:提供一鍵式的配置,将kafka中的資料落地到各種系統中,讓沒有開發能力的資料使用方通過wormhole來實作使用資料。

如何基于日志,同步實作資料的一緻性和實時抽取?

如圖所示,wormhole 可以将kafka中的ums 落地到各種系統,目前用的最多的hdfs,jdbc的資料庫和hbase。

在技術棧上, wormhole選擇使用spark streaming來進行。

在wormhole中,一條flow是指從一個namaspace從源端到目标端。一個spark streaming服務于多條flow。

如何基于日志,同步實作資料的一緻性和實時抽取?

選用spark的理由是很充分的:

spark天然的支援各種異構存儲系統;

雖然spark stream比storm延時稍差,但spark有着更好的吞吐量和更好的計算性能;

spark在支援并行計算方面有更強的靈活性;

spark提供了一個技術棧内解決sparking job,spark streaming,spark sql的統一功能,便于後期開發;

這裡補充說一下swifts的作用:

swifts的本質是讀取kafka中的ums資料,進行實時計算,将結果寫入到kafka的另外一個topic。

實時計算可以是很多種方式:比如過濾filter,projection(投影),lookup, 流式join window aggregation,可以完成各種具有業務價值的流式實時計算。

wormhole和swifts對比如下:

如何基于日志,同步實作資料的一緻性和實時抽取?

落hdfs

通過wormhole wpark streaming程式消費kafka的ums,首先ums log可以被儲存到hdfs上。

kafka一般隻儲存若幹天的資訊,不會儲存全部資訊,而hdfs中可以儲存所有的曆史增删改的資訊。這就使得很多事情變為可能:

通過重放hdfs中的日志,我們能夠還原任意時間的曆史快照。

可以做拉連結清單,還原每一條記錄的曆史資訊,便于分析;

當程式出現錯誤是,可以通過回灌(backfill),重新消費消息,重新形成新的快照。

可以說hdfs中的日志是很多的事情基礎。

介于spark原生對parquet支援的很好,spark sql能夠對parquet提供很好的查詢。ums落地到hdfs上是儲存到parquet檔案中的。parquet的内容是所有log的增删改資訊以及_ums_id_,_ums_ts_都存下來。

wormhole spark streaming根據namespace 将資料分布存儲到不同的目錄中,即不同的表和版本放在不同目錄中。

如何基于日志,同步實作資料的一緻性和實時抽取?

由于每次寫的parquet都是小檔案,大家知道hdfs對于小檔案性能并不好,是以另外還有一個job,每天定時将這些的parquet檔案進行合并成大檔案。

每個parquet檔案目錄都帶有檔案資料的起始時間和結束時間。這樣在回灌資料時,可以根據選取的時間範圍來決定需要讀取哪些parquet檔案,不必讀取全部資料。

插入或更新資料的幂等性

常常我們遇到的需求是,将資料經過加工落地到資料庫或hbase中。那麼這裡涉及到的一個問題就是,什麼樣的資料可以被更新到資料?

這裡最重要的一個原則就是資料的幂等性。

無論是遇到增删改任何的資料,我們面臨的問題都是:

該更新哪一行;

更新的政策是什麼。

對于第一個問題,其實就需要定位資料要找一個唯一的鍵,常見的有:

使用業務庫的主鍵;

由業務方指定幾個列做聯合唯一索引;

對于第二個問題,就涉及到_ums_id_了,因為我們已經保證了_ums_id_大的值更新,是以在找到對應資料行後,根據這個原則來進行替換更新。

如何基于日志,同步實作資料的一緻性和實時抽取?

之是以要軟删除和加入_is_active_列,是為了這樣一種情況:

如果已經插入的_ums_id_比較大,是删除的資料(表明這個資料已經删除了), 如果不是軟删除,此時插入一個_ums_id_小的資料(舊資料),就會真的插入進去。

這就導緻舊資料被插入了。不幂等了。是以被删除的資料依然保留(軟删除)是有價值的,它能被用于保證資料的幂等性。

hbase的儲存

插入資料到hbase中,相當要簡單一些。不同的是hbase可以保留多個版本的資料(當然也可以隻保留一個版本)預設是保留3個版本;

是以插入資料到hbase,需要解決的問題是:

選擇合适的rowkey:rowkey的設計是可以選的,使用者可以選擇源表的主鍵,也可以選擇若幹列做聯合主鍵。

選擇合适的version:使用_ums_id_+ 較大的偏移量(比如100億) 作為row的version。

version的選擇很有意思,利用_ums_id_的唯一性和自增性,與version自身的比較關系一緻:即version較大等價于_ums_id_較大,對應的版本較新。

從提高性能的角度,我們可以将整個spark streaming的dataset集合直接插入到hbase,不需要比較。讓hbase基于version自動替我們判斷哪些資料可以保留,哪些資料不需要保留。

jdbc的插入資料:

插入資料到資料庫中,保證幂等的原理雖然簡單,要想提高性能在實作上就變得複雜很多,總不能一條一條的比較然後在插入或更新。

我們知道spark的rdd/dataset都是以集合的方式來操作以提高性能,同樣的我們需要以集合操作的方式實作幂等性。

具體思路是:

首先根據集合中的主鍵到目标資料庫中查詢,得到一個已有資料集合;

與dataset中的集合比較,分出兩類:

      a:不存在的資料,即這部分資料insert就可以;

      b:存在的資料,比較_ums_id_, 最終隻将哪些_ums_id_更新較大row到目标資料庫,小的直接抛棄。

使用spark的同學都知道,rdd/dataset都是可以partition的,可以使用多個worker并進行操作以提高效率。

在考慮并發情況下,插入和更新都可能出現失敗,那麼還有考慮失敗後的政策。

比如:因為别的worker已經插入,那麼因為唯一性限制插入失敗,那麼需要改為更新,還要比較_ums_id_看是否能夠更新。

對于無法插入其他情況(比如目标系統有問題),wormhole還有重試機制。說起來細節特别多。這裡就不多介紹了。

有些還在開發中。

插入到其他存儲中的就不多介紹了,總的原則是:根據各自存儲自身特性,設計基于集合的,并發的插入資料實作。這些都是wormhole為了性能而做的努力,使用Wormhole的使用者不必關心 。

五、運用案例

實時營銷

說了那麼多,dws有什麼實際運用呢?下面我來介紹某系統使用dws實作了的實時營銷。

如何基于日志,同步實作資料的一緻性和實時抽取?

如上圖所示:

系統a的資料都儲存到自己的資料庫中,我們知道,宜信提供很多金融服務,其中包括借款,而借款過程中很重要的就是信用稽核。

借款人需要提供證明具有信用價值的資訊,比如央行征信報告,是具有最強信用資料的資料。 而銀行流水,網購流水也是具有較強的信用屬性的資料。

借款人通過web或手機app在系統a中填寫信用資訊時,可能會某些原因無法繼續,雖然可能這個借款人是一個優質潛在客戶,但以前由于無法或很久才能知道這個資訊,是以實際上這樣的客戶是流失了。

應用了dws以後,借款人已經填寫的資訊已經記錄到資料庫中,并通過dws實時的進行抽取、計算和落地到目标庫中。根據對客戶的打分,評價出優質客戶。然後立刻将這個客戶的資訊輸出到客服系統中。

客服人員在很短的時間(幾分鐘以内)就通過打電話的方式聯系上這個借款人(潛客),進行客戶關懷,将這個潛客轉換為真正的客戶。我們知道借款是有時效性的,如果時間太久就沒有價值了。

如果沒有實時抽取/計算/落庫的能力,那麼這一切都無法實作。

實時報表系統

另外一個實時報表的應用如下:

如何基于日志,同步實作資料的一緻性和實時抽取?

我們資料使用方的資料來自多個系統,以前是通過t+1的方式獲得報表資訊,然後指導第二天的營運,這樣時效性很差。

通過dws,将資料從多個系統中實時抽取,計算和落地,并提供報表展示,使得營運可以及時作出部署和調整,快速應對。

六、總結

說了那麼多,大緻總結一下:

dws技術上基于主流實時流式大資料技術架構,高可用大吞吐強水準擴容,低延遲高容錯最終一緻。

dws能力上支援異構多源多目标系統,支援多資料格式(結構化半結構化非結構化資料)和實時技術能力。

dws将三個子項目合并作為一個平台推出,使得我們具備了實時的能力, 驅動各種實時場景應用。

适合場景包括:實時同步/實時計算/實時監控/實時報表/實時分析/實時洞察/實時管理/實時營運/實時決策

感謝大家的聆聽,此次分享到此為止。

q&a

q1:oracle log reader有開源方案嗎?

A1:對于oracle業界也有許多商業解決方案,例如:oracle goldengate(原來的goldengate), oracle xstream, ibm infosphere change data capture(原來的datamirror),dell shareplex (原來的quest),國内的dsg supersync等,開源的方案好用的很少。

q2:這個項目投入了多少人力物力?感覺有點複雜。

q2:dws是三個子項目組成,平均每個項目5~7人。是有點複雜,其實也是試圖使用大資料技術來解決我們公司目前遇到的困難。

因為是搞大資料相關技術,所有團隊裡面的兄弟姐妹都還是比較happy的:)

其實這裡面,dbus和wormhole相對固定模式化,容易輕松複用。swifts實時計算是與每個業務相關比較大的,自定義比較強,相對比較麻煩一些。

q3:宜信的這個dws系統會開源麼?

a3:我們也考慮過向社群貢獻,就像宜信的其他開源項目一樣,目前項目剛剛成形,還有待進一步磨煉,我相信未來的某個時候,我們會給它開源出來。

q4:架構師怎麼了解,是不是系統工程師?

a4:不是系統工程師,在我們宜信有多位架構師,應該算是以技術驅動業務的技術管理人員。包含産品設計,技術管理等。

q5:複制方案是否是ogg?

a5:ogg與上面提到的其他商業解決方案都是可選方案。

原文釋出時間為:2016-12-19

本文來自雲栖社群合作夥伴dbaplus