天天看點

事件來源Event sourcing

1.事件來源基本思想:當一個持久化actor接收到一個(非持久化)指令,首先它要驗證(這個指令)是否可以運用到目前狀态。

如果指令驗證成功,根據這個指令産生一個事件。在事件成功持久化之後,可以用來改變actor的狀态。

當持久化actor需要恢複時,因為之前已經驗證過可以運用到目前狀态,我們可以直接将持久化的事件進行重放。

Akka持久化通過

Abstract

PersistentActor

支援事件來源。actor可以使用persist方法持久化和處理事件。通過實作

CreateReceiveRecover

CreateReceive

定義

AbstractPersistentActor

的行為。

CreateReceiveRecover

方法是在恢複過程中處理事件和快照消息。

CreateReceive

方法是用來處理普通的Actor的消息。若Actor收到指令的話會調用persist方法

persist方法是異步的方式持久化事件。它有兩個參數 ,一個是事件 ,另一個是事件處理程式(event handler)。

事件處理程式是将之前持久化過的事件進行處理,該事件在内部作為獨立消息發送回持久化actor 來使事件處理程式執行,來

改變或關閉持久化actor的狀态。持久化事件的發送者也是相應指令的發送者,是以當指令的發送者沒顯出時事件處理程式也可以回複。

當使用persist方法來持久化事件時在調用和執行相關事件處理程式的過程中,要保證持久化actor不會收到下一步的指令,否則會受到影響。當在某個指令的上下文中多次調用

persist方法時。這個過程中收到的消息一直被

​​暫存​​直到

presist方法運作結束。

如果執行個體化事件失敗,onPersistFailure方法将被調用(預設記錄為error),并且actor将無條件地被停止。如果持久化事件在存儲之前被拒絕,比如事件發生連續錯誤,onPersistRejected将被調用(預設記錄為warning)并且actor繼續下一條消息

辨別符

   一個持久化actor必須有個辨別符 這個辨別符必須用

persistenceId

 方法來定義。

恢複

PersistentActor在啟動和重新開機時通過重放之前持久化的日志消息來實作自動恢複,如果在恢複過程中收到新的消息,會将新消息先存儲起來等恢複完成後,在收到新的消息。

可以限制同一時間并發的恢複的數量,來限制系統和後端的資料存儲不超載,如果超過限制actor将等待到其他恢複都完成後才開始。配置方式:

akka.persistence.max-concurrent-recoveries = 50      

自定義恢複

在應用程式中有時也需要依照客戶具體要求來恢複,通過傳回

recovery 方法中的自定義

Recovery對象來執行自定義恢複。

recovery 是

PersistentActor的一個方法。

你可以使用

SnapshotSelectionCriteria.None.

 來跳過加載快照和重放所有事件。它用于将快照序列化格式變成互不相容的方式時。不适宜用于事件被删除的情況下。

@Overridepublic Recovery recovery() { return Recovery.create(SnapshotSelectionCriteria.none());}      

另一個可能的自定義恢複是設定重放的上界,對debug很有幫助,使得actor僅在過去的某個點重放。

@Overridepublic Recovery recovery() { return Recovery.create(457L);}      

在PersistentActor的recovery 方法中傳回Recovery.none()可以使恢複失效。

@Overridepublic Recovery recovery() { return Recovery.none();}      

恢複狀态

持久化actor可以通過以下方法查詢它自己的恢複狀态

public boolean recoveryRunning();public boolean recoveryFinished();      

持久化actor在回複完成後會收到一個特殊的RecoveryCompleted 消息。然後再執行下一步操作

如果actor從日志中的恢複狀态有問題,onRecoveryFailure

 會被調用(記錄為error)并且actor将被停止。

内部暫存(stash)

持久化actor有一個私有的

​​暫存​​用來緩存整個恢複過程中進來的消息或者暫存

persist\persistAll方法持久化的事件。内部暫存通過挂鈎到

unstashAll 與普通暫存協作

你應該控制消息的産出不要超過持久化actor的處理能力,否則暫存消息的數量将無限增長。是以我們要在mailbox配置中定義暫存的容量來保護暫存并防止發生

OutOfMemoryError

akka.actor.default-mailbox.stash-capacity=10000

注意,如果你有很多持久化actor,要定義一個小的暫存容量,防止占用過多的記憶體

持久化actor定義了三個政策來處理内部暫存容量超出的故障。預設的溢出政策是

ThrowOverflowExceptionStrategy,具體内容是丢棄目前的資訊,抛出

StashOverflowException異常,造成actor重新開機。

你可以覆寫

internalStashOverflowStrategy 方法為了“獨特的”持久化actor來傳回

DiscardToDeadLetterStrategy 或者

ReplyToStrategy

 或者通過提供FQCN(Fully Qualified Class Name完全限定類名)來給所有的持久化actor來定義“預設值”。

在persistence 的配置中:

akka.persistence.internal-stash-overflow-strategy="akka.persistence.ThrowExceptionConfigurator"      

DiscardToDeadLetterStrategy 政策也有一個打包好的配akka.persistence.DiscardConfigurator.

你也可以查詢預設政策:

Persistence(context.system).defaultInternalStashOverflowStrategy      

放寬的局部一緻性要求和高吞吐量的用例

如果面臨放寬的局部一緻性要求和高吞吐量,有時

PersistentActor及其

persist在處理大量湧入的指令時可能會不夠,有時你可能會放寬一緻性要求——例如你會想要盡可能快速地處理指令,假設事件最終會持久化并在背景恰當處理,并在需要時追溯性地回應持久性故障。

persistAsync方法提供了一個工具,用于實作高吞吐量的持久化actor。當日志仍在緻力于持久化和執行使用者事件回調代碼時,它

不會暫存傳入的指令。

推遲操作,直到持久化處理程式已經執行

PersistentActor 提供了一個實用的方法

deferAsync(延遲異步),它工作起來類似于

persistAsync但是不持久化傳遞過的事件,它将保留在記憶體中,并在調用處理程式時使用。建議将其用于

讀取操作,以及在domain模型中沒有相應事件的操作。

請注意,

sender()在處理程式回調中是安全的,将指向該指令的原始發送方,該指令将調用這個

deferAsync處理程式。

持久化嵌套調用

可以在各自的回調塊中調用

persistAsync 和

persist,它們将适當地保留線程安全性(包括sender的正确值)和存儲保證。