天天看點

設計資料密集型應用第三部分:派生資料

  《Designing Data-Intensive Applications》的第一部分,基于單點(single node)介紹了資料系統的基礎理論與知識;在第二部分,則是将視野擴充到了分布式資料系統,主要是Partition和Repliacation。在第三部分,則聚焦于派生資料系統。

integrating multiple different data systems, potentially with different data models and optimized for different access patterns, into one coherent application architecture.

  對于目前日益複雜的應用,沒有哪一種單一的資料系統可以滿足應用的所有需求,是以本章就是介紹如何将不同的資料系統整合到單一應用中。資料系統可以分為兩類

  system of record:原始資料,source of truth

  derived data system:派生資料系統,即資料來自其他資料系統。派生資料系統包括但不限于:cache、索引、視圖,本質上派生資料是原始資料的備援,為了性能而做的備援。

  值得注意的是,原始資料系統與派生資料系統的差別并不在于對應的工具,而在于在應用中的具體使用方式。

The distinction between system of record and derived data system depends not on the tool, but on how you use it in your application.

本文位址:https://www.cnblogs.com/xybaby/p/9895725.html

  在某個層面,可以資料系統進行以下分類

Services (online systems)

    追求response time

Batch processing systems (offline systems)

    追求throughput

Stream processing systems (near-real-time systems)

    sth between online and batch system

  本章讨論的是批處理系統(Batch processing),MapReduce是批處理系統的典型代表,在MapReduce的諸多設計中,都可以看到unix的一些影子。

一個程式隻做一件事,做好這件事,如果有新需求,那麼重新寫一個程式,而不是在原來的程式上修修補補

盡量讓每個程式的輸出做其他程式的輸入,即輸出要通用、普适性,不要堅持互動式輸入

設計和構造軟體,即使是作業系統,都要及早嘗試,對于不合理的地方,要毫不猶豫的推到重構

盡早借助工具來避免重複性的勞動,自動化

automation, rapid prototyping, incremental iteration, being friendly to experimentation, and breaking down large projects into manageable chunks Separating the input/output wiring from the program logic makes

  unix pipeline的最大缺陷在于這些組合工具隻能在單個機器上運作,需要擴充到多個節點時,就需要Hadoop這樣的分布式系統

  與unix差別

從單機到多機

stdin stdout 到 file

  關于MapReduce的原理與架構,之前在《典型分布式系統分析:MapReduce》一文中描述過。下面關注一些不是在MapReduce論文中出現的一些讨論。

  批進行中,經常也需要join操作,通過join操作來補充完整一個事件(event)。在批進行中,既可以在Map的時候join,也可以在Reduce的時候join,如下所示

  

設計資料密集型應用第三部分:派生資料

  event log中隻記錄uid,而其他屬性需要從user database(profile information)讀取,這樣避免了profile資料的備援

  每次通過網路去讀取user profile 顯然是不切實際的,拖慢批處理速度;而且由于profile 是可變的,導緻批處理 結果不是确定性的。一個友好的解決辦法是:備援一份資料,放到批處理系統中。

  下面是一個reduce_side join的例子。稱之為sort-merge join,因為不管是User event 還是 User profile都按照userID進行hash,是以都一個使用者的event 和 profile會配置設定到都一個reducer。

設計資料密集型應用第三部分:派生資料

搜尋引擎 search index

  關于增量建立search index,寫入新的segment檔案,背景批量合并壓縮。

  new segment files and asynchronously merges and compacts segment files in the background.

機器學習與推薦

  一般來說,将資料寫入到一個key value store,然後給使用者查詢

  怎麼講批處理的結果導入到kvs? 直接導入是不大可能的。寫入到一個新的db,然後切換。

  build a brand-new database inside the batch job and write it as files to the job’s output directory in the distributed filesystem

  MapReduce的問題

  (1)比較底層,需要寫大量代碼:using the raw MapReduce APIs is actually quite hard and laborious

  解決辦法:higher-level programming models (Pig, Hive, Cascading, Crunch) were created as abstractions on top of MapReduce.

  (2) mapreduce execution model的問題,如下

  materiallization(物化)是指:每一個MapReduce的輸出都需要寫入到檔案再給下一個MapReduce Task Job。

  顯然,materiallization是提前計算,而不是按需計算。而Unix pipleline 是通過stream按需計算,隻占用少量記憶體空間。

  MapReduce相比unix pipeline缺陷

  MapReduce job完成之後才能進行下一個,而unix pipeline是同時執行的

  Mapper經常是多餘的:很多時候僅僅是出去上一個reducer的輸出

  中間狀态的存儲也是要備援的,有點浪費

  dataflow engines如Spark、Tez、Flink試圖解決Mapreduce問題的

they handle an entire workflow as one job, rather than breaking it up into independent subjobs. 

  dataflow engines 沒有明顯的map reduce , 而是一個接一個的operator。其優勢:

避免了無謂的sort(mr 在map和reduce之間總是要sort)

較少非必需的map task

由于知道整個流程,可以實作locality optimizations

中間狀态寫入記憶體或者本地檔案,而不是HDFS

operator流水線工作,不同等到上一個stage完全結束

在運作新的operator時,可以複用JVM

  批處理與流處理的最大差別在于,批處理的輸入是确定的、有限的,而流處理的輸入是源源不斷的,是以流處理系統一般比批處理系統有更好的實時性。

  流處理相關術語

  event:In a stream processing context, a record is more commonly known as an event

  producer、publisher、sender

  consumer、subscriber、recipient

  topic、stream,一組相關event

  用于事件發生時,通知消費者,對于某個topic 一般是多生産者 多消費者。

  如何對消息系統分類:

  (1)What happens if the producers send messages faster than the consumers can process them?

  第一個問題,生産速度大于消費速度,對應的處理方式包括:丢包、緩存、流控(限制寫入速度)

  (2)What happens if nodes crash or temporarily go offline—are any messages lost?

  第二個問題,當節點crash或者臨時故障,消息會不會丢

(1)直達消息系統(沒有中間商)

  即一個event直接從producer到達consumer,如UDP廣播,brokerless : zeroMQ,這樣的系統有消息丢失的風險。

(2)message broker(message queue)

  定制化的DB

  異步過程

  保證消息可靠性

  shared subscriptions,一條消息任意一個consumer處理即可;負載均衡;可擴充性

  topic subscriptions 一條消息需要被不同的comsumer消費

設計資料密集型應用第三部分:派生資料

  上圖(a)中的event隻需要被任意一個consumer消費即可,而(b)中的每一個event則需要被所有關注該topic的consumer處理

  需要consumer的ack來保證消息已被消費,消息可能會被重複投遞,是以需要幂等性

  當 load balancing遇上redeliver,可能會出現messgae 亂序

  一般的消息隊列都是一次性消費,基于log的消息隊列可以重複消費

The log-based approach trivially supports fan-out messaging, because several consumers can independently read the log without affecting each other—reading a message does not delete it from the log

  其優點在于:持久化且immutable的日志允許comsumer重新處理所有的事件

This aspect makes log-based messaging more like the batch processes of the last chapter, where derived data is clearly separated from input data through a repeatable transformation process. It allows more experimentation and easier recovery from errors and bugs, making it a good tool for integrating dataflows within an organization

  在log-based message broker有資料庫的影子,即資料在log中,那麼反過來呢,能否将message的思想應用于db,或者說db中是否本身就有message的思想?

  其實是有的,在primary-secondary 中,primary寫oplog, produce event;secondary讀oplog, consume event。

  一份資料以不同的形式儲存多分,db、cache、search index、recommend system、OLAP

  很多都是使用full database dumps(batch process),這個速度太慢,又有滞後; 多寫(dual write)也是不現實的,增加應用層負擔、耦合嚴重。

  一般來說,應用(db_client)按照db的限制來使用db,而不是直接讀取、解析replication log。但如果可以直接讀取,則有很多用處,例如用來建立serach index、cache、data warehouse。

  如下圖所示

設計資料密集型應用第三部分:派生資料

  前面是DB(leader),中間是log-based message broker,後面是derived data system(cache, data warehouse) as followers

  這樣做的潛在問題是,日志會越來越多,耗光磁盤,直接删除就的log也是不行的,可以周期性的log compaction:處理對一個key重複的操作,或者說已經被删除的key。這樣也能解決新增加一個consumber,且consumber需要所有完整資料的情況。

event sourcing involves storing all changes to the application state as a log of change events.

  CDC在資料層記錄,增删改查,一個event可能對應多個data change;mutable

  event sourcing 在應用層記錄,immutable(不應該修改 删除)

  event soucing 一般隻記錄操作,不記錄操作後的結果,是以需要所有資料才能恢複目前的狀态

  周期性的snapshot有助于性能

  Commands and events: 二者并不等價,Command隻是意圖(比如想預定座位),隻有通過檢查,執行成功,才會生成對應的event,event 代表 fact

設計資料密集型應用第三部分:派生資料

  上圖非常有意思:state是event stream的累計值,積分的效果,而stream是state的瞬時值,微分的效果

  Advantages of immutable events

immutable event log 有利于追溯到任意時間點,也可以更容易從錯誤中恢複

immutable event log 比目前狀态有更多的資訊:使用者添加物品到購物籃,然後從購物籃移除;從狀态來看,什麼都沒有發生,但event log卻意義豐富

Deriving several views from the same event log  當有event log,很容易回放event,産生新的資料視圖,而不用冒險修改目前使用的資料視圖,做到灰階更新

  資料流應用廣泛:

寫到其他資料系統:db、cache等

推送給使用者,或者實時展示

産生其他的資料流,形成鍊路

  stream processing 通常用于監控:風控、實時交易系統、機器狀态、軍事系統

  CEP(Complex event processing)是對特定事件的監控,對于stream,設定比對規則,滿足條件則觸發 complex event

  In these systems, the relationship between queries and data is reversed compared to normal databases.

  DB持久化資料,查詢是臨時的

  而CEP持久化的是查詢語句,資料時源源不斷的

  批處理一般使用event time,而流處理可能采用本地時間(stream processing time),這可能導緻不準确的時間視窗(尤其兩個時間差抖動的時候)

  以event time作為時間視窗的問題:不确定是否收到了某個window内所有的event。

  通常,需要結合使用本地時鐘與伺服器時鐘,考慮一個情況,用戶端采集日志發送到伺服器,在未聯網的時候本地緩存。如果用本地時間,本地時間可能不準,用伺服器時間,不能反映事件發生的時刻(可能過了很長時間才從緩存發送到伺服器),解決辦法:

  用device clock記錄事件發生時間;

  用device clock記錄事件上傳時間;

  用server clock記錄伺服器收到event的時間

  用(3)減去(2)可以得到時間偏差(忽略了網絡延時),在用(1)加上這個時間偏差就得到了事件的真正發生時間。

滾動 tumbling window

  正交的時間塊,一個5分鐘,接下來又一個5分鐘

跳動 hopping window

  相交的時間塊,5分鐘,然後前進1分鐘,有一個5分鐘

滑動 Sliding window

  無固定的邊界,一點點向前滑

  流處理系統中也是需要一些join操作

stream-stream joins

  for example.click-through rate 網頁搜尋、點選事件

stream-table joins(stream enrichment)

  a set of user activity events and a database of user profiles

Time-dependence of joins

  if events on different streams happen around a similar time, in which order are they processed?

  比如跨境交易,匯率是實時變化的,那麼交易事件與事件發生時間的匯率綁定。解決辦法是 交易事件裡面維護當時的匯率id,但這導緻沒法做log compaction

  stream processing system的容錯性,batch processing 可以做到 exactly-once semantics,雖然可能會有失敗-重試。對于流處理

Microbatching and checkpointing 做到了可重試

Idempotence 保證了可重複執行

  the most appropriate choice of software tool also depends on the circumstances.   in complex applications, data is often used in several different ways. There is unlikely to be one piece of software that is suitable for all the different circumstances in which the data is used

  複雜的應用可能需要內建多個資料系統,讓單個資料系統各司其職,那麼如何保證多個資料系統資料的一緻性:以其中一個資料系統為準(Primary),然後通過CDC或者event sourcing複制到其他資料系統。

  分布式事務通過互斥鎖決定寫操作順序;而CDC 使用log來保證順序

  分布式事務通過atomic保證隻生效一次;而log based依賴于确定性的重試與幂等

batch processing is used to reprocess historical data, and stream processing is used to process recent updates, then how do you combine the two?

  結合批處理與流處理

  批處理:慢但是準确

  流處理:快速但不一定精确

  潛在的問題:

  在batch、stream processing framework上維護兩份一樣的邏輯

  batch pipeline、stream pipeline輸出不同,導緻讀取的時候需要merge

  增量batch 需要解決時間視窗、stragglers 問題

  解決辦法如下:Unifying batch and stream processing

Unix and relational databases have approached the information management problem with very different philosophies.

  Unix 提供了是比較底層的對硬體的封裝,thin wrapper; 而relationaldb 對程式員提供high-level抽象

it seems that there are parallels between the features that are built into databases and the derived data systems that people are building with batch and stream processors.

  資料庫的feature(secondary index、view、replication log, full-text search index)與 derived data system有一些類似之處

  以建立新索引為例:

快照,然後處理已有資料

處理在上一步過程中新加入的資料

索引建立完畢,處理後續資料

  這個過程類似于

增加新的secondary(follower)

在流處理系統中增加新的消費者

設計資料密集型應用第三部分:派生資料

  write path:precomputed; eager evaluation

   whenever some piece of information is written to the system, it may go through multiple stages of batch and stream processing, and eventually every derived dataset is updated to incorporate the data that was written

  read path:lazy evaluation

   when serving a user request you read from the derived dataset, perhaps perform some more processing on the results, and construct the response to the user.
  The derived dataset is the place where the write path and the read path meet, as illustrated in Figure 12-1. It represents a trade-off between the amount of work that needs to be done at write time and the mount that needs to be done at read time.

  derived dataset是write path與read path連接配接的地方,是寫入時處理與讀取時工作量的折中。caches, indexes, and materialized views 都是在write path上多做一些工作,減輕read path負擔

  寫 與 讀的折中;twinter的例子,名人 普通人政策不一樣

  目前網際網路應用都是client server模式,client無狀态,資料都在server;但single path application或者mobile app在斷網的時候也能使用,提供更好的使用者體驗;而且,web-socket等技術提供了server主動向client推送的能力,這就是的write-path 進一步擴充到了用戶端

  大多數的db,lib,framework、protocol都是按照staleless and request/response的思想來設計的,根深蒂固

In order to extend the write path all the way to the end user, we would need to fundamentally rethink the way we build many of these systems: moving away from request response interaction and toward publish/subscribe dataflow
  build applications that are reliable and correct   In this section I will suggest some ways of thinking about correctness in the context of dataflow architectures.

  即使使用了強事務性,也不能保證資料不會有問題,因為由于代碼bug、人為錯誤,導緻資料的損壞 丢失,immutable and append-only data helps

  考慮一個複雜的問題 exactly-once

  其中一個解決辦法:Idempotent(幂等)。 但需要額外的一些工作,而且需要非常細心的實作

  TCP的seq number也是為了保證excat once, 但這隻對單個TCP連接配接生效

In many databases, a transaction is tied to a client connection。 If the client suffers a network interruption and connection timeout after sending the COMMIT, but before hearing back from the database server, it does not know whether the transaction has been committed or aborted。

  2pc break the 1:1 mapping between a TCP connection and a transaction,是以 suppress duplicate transactions between the database client and server;但是end-user與 application server之間呢

  終極解決辦法,Unique Operation ID

設計資料密集型應用第三部分:派生資料
  Besides suppressing duplicate requests, the requests table in Example 12-2 acts as a kind of event log, hinting in the direction of event sourcing

  除了保證點到點的限制,也充當了event log,可以用于event sourcing

  限制:如unique constraint,賬戶餘額不能為負等

  通過consume log來實作限制:

  Its fundamental principle is that any writes that may conflict are routed to the same partition and processed sequentially

  consistency可能包含兩重意義

  及時性(Timeliness ):user讀取到的是實時狀态

  完整性(Integrity):user讀取到的是完整的狀态

violations of timeliness are “eventual consistency,” whereas violations of integrity are “perpetual inconsistency.”

  ACID同時保證及時性與完整性,但基于時間的資料流一般不保證及時性,exactly-once保證完整性

  在資料流系統如何保證完整性?

  (1) 寫操作是一個單一的message,原子性寫入

  (2)derived datasystem從單一消息确定性地提取狀态

  (3)用戶端生成reqid,在整個流程用整個reqid保證幂等性

  (4)單一消息不可變,持久化,允許derived datasystem重新處理所有消息

  盡量避免協調的資料系統 Coordination-avoiding data systems

  (1)資料流系統通過derived data,無需原子性送出、線性、跨分片的同步協調就能保證完整性

  (2)雖然嚴格的unique 限制要求實時性(timeliness)和協調,很多應用可以通過事後補償放松限制

  對資料完整性的校驗,防止資料默默出錯silent corruption,多副本不能解決這個問題

  基于事件的系統提供了更好的可審計性, 如記錄A給B轉賬,比記錄A扣錢,B加錢更好

  Checking the integrity of data systems is best done in an end-to-end fashion

  軟體和資料大大影響了我們生存的世界,對于我們這些工程師,需要承擔起責任:建立一個充滿人文關懷和尊重的世界。

  科技是放大鏡:放大了善與惡

本文版權歸作者xybaby(博文位址:http://www.cnblogs.com/xybaby/)所有,歡迎轉載和商用,請在文章頁面明顯位置給出原文連結并保留此段聲明,否則保留追究法律責任的權利,其他事項,可留言咨詢。

繼續閱讀