前言
哈喽,大家好,我是asong,這次給大家介紹一個go的異步任務架構machinery。使用過python的同學們都知道架構,
Celery
架構就類似于
machinery
架構。下面我們就來學習一下
Celery
machinery
的基本使用。
github下載下傳:https://github.com/asong2020/Golang_Dream/tree/master/machinery
抛磚引玉
我們在使用某些APP時,登陸系統後一般會收到一封郵件或者一個短信提示我們在某個時間某某地點登陸了。而郵件或短信都是在我們已經登陸後才收到,這裡就是采用的異步機制。大家有沒有想過這裡為什麼沒有使用同步機制實作呢?我們來分析一下。假設我們現在采用同步的方式實作,使用者在登入時,首先會去檢驗一下賬号密碼是否正确,驗證通過後去給使用者發送登陸提示資訊,假如在這一步出錯了,那麼就會導緻使用者登陸失敗,這樣是大大影響使用者的體驗感的,一個登陸提示的優先級别并不是很高,是以我們完全可以采用異步的機制實作,即使失敗了也不會影響使用者的體驗。前面說了這麼多,那麼異步機制該怎麼實作呢?對,沒錯,就是
machinery
架構,聽說你們還不會使用它,今天我就寫一個小例子,我們一起來學習一下他吧。
特性
上面隻是簡單舉了個例子,任務隊列有着廣泛的應用場景,比如大批量的計算任務,當有大量資料插入,通過拆分并分批插入任務隊列,進而實作串行鍊式任務處理或者實作分組并行任務處理,提高系統魯棒性,提高系統并發度;或者對資料進行預處理,定期的從後端存儲将資料同步到到緩存系統,進而在查詢請求發生時,直接去緩存系統中查詢,提高查詢請求的響應速度。适用任務隊列的場景有很多,這裡就不一一列舉了。回歸本文主題,既然我們要學習
machinery
,就要先了解一下他都有哪些特性呢?
- 任務重試機制
- 延遲任務支援
- 任務回調機制
- 任務結果記錄
- 支援Workflow模式:Chain,Group,Chord
- 多Brokers支援:Redis, AMQP, AWS SQS
- 多Backends支援:Redis, Memcache, AMQP, MongoDB
架構
任務隊列,簡而言之就是一個放大的生産者消費者模型,使用者請求會生成任務,任務生産者不斷的向隊列中插入任務,同時,隊列的處理器程式充當消費者不斷的消費任務。基于這種架構設計思想,我們來看下machinery的簡單設計結構圖例:
- Sender:業務推送子產品,生成具體任務,可根據業務邏輯中,按互動進行拆分;
- Broker:存儲具體序列化後的任務,machinery中目前支援到Redis, AMQP,和SQS;
- Worker:工作程序,負責消費者功能,處理具體的任務;
- Backend:後端存儲,用于存儲任務執行狀态的資料;
e.g
學習一門新東西,我都習慣先寫一個demo,先學會了走,再學會跑。是以先來看一個例子,功能很簡單,異步計算1到10的和。
先看一下配置檔案代碼:
broker: redis://localhost:6379
default_queue: "asong"
result_backend: redis://localhost:6379
redis:
max_idle: 3
max_active: 3
max_idle_timeout: 240
wait: true
read_timeout: 15
write_timeout: 15
connect_timeout: 15
normal_tasks_poll_period: 1000
delayed_tasks_poll_period: 500
delayed_tasks_key: "asong"
複制
這裡
broker
與
result_backend
來實作。
主代碼,完整版github擷取:
func main() {
cnf,err := config.NewFromYaml("./config.yml",false)
if err != nil{
log.Println("config failed",err)
return
}
server,err := machinery.NewServer(cnf)
if err != nil{
log.Println("start server failed",err)
return
}
// 注冊任務
err = server.RegisterTask("sum",Sum)
if err != nil{
log.Println("reg task failed",err)
return
}
worker := server.NewWorker("asong", 1)
go func() {
err = worker.Launch()
if err != nil {
log.Println("start worker error",err)
return
}
}()
//task signature
signature := &tasks.Signature{
Name: "sum",
Args: []tasks.Arg{
{
Type: "[]int64",
Value: []int64{1,2,3,4,5,6,7,8,9,10},
},
},
}
asyncResult, err := server.SendTask(signature)
if err != nil {
log.Fatal(err)
}
res, err := asyncResult.Get(1)
if err != nil {
log.Fatal(err)
}
log.Printf("get res is %v\n", tasks.HumanReadableResults(res))
}
複制
運作結果:
INFO: 2020/10/31 11:32:15 file.go:19 Successfully loaded config from file ./config.yml
INFO: 2020/10/31 11:32:15 worker.go:58 Launching a worker with the following settings:
INFO: 2020/10/31 11:32:15 worker.go:59 - Broker: redis://localhost:6379
INFO: 2020/10/31 11:32:15 worker.go:61 - DefaultQueue: asong
INFO: 2020/10/31 11:32:15 worker.go:65 - ResultBackend: redis://localhost:6379
INFO: 2020/10/31 11:32:15 redis.go:100 [*] Waiting for messages. To exit press CTRL+C
DEBUG: 2020/10/31 11:32:16 redis.go:342 Received new message: {"UUID":"task_9f01be1f-3237-49f1-8464-eecca2e50597","Name":"sum","RoutingKey":"asong","ETA":null,"GroupUUID":"","GroupTaskCount":0,"Args":[{"Name":"","Type":"[]int64","Value":[1,2,3,4,5,6,7,8,9,10]}],"Headers":{},"Priority":0,"Immutable":false,"RetryCount":0,"RetryTimeout":0,"OnSuccess":null,"OnError":null,"ChordCallback":null,"BrokerMessageGroupId":"","SQSReceiptHandle":"","StopTaskDeletionOnError":false,"IgnoreWhenTaskNotRegistered":false}
DEBUG: 2020/10/31 11:32:16 worker.go:261 Processed task task_9f01be1f-3237-49f1-8464-eecca2e50597. Results = 55
2020/10/31 11:32:16 get res is 55
複制
好啦,現在我們開始講一講上面的代碼流程,
- 讀取配置檔案,這一步是為了配置
和broker
,這裡我都選擇的是result_backend
,因為電腦正好有這個環境,就直接用了。redis
-
庫必須在使用前執行個體化。實作方法是建立一個Machinery
執行個體。Server
是Server
配置和注冊任務的基本對象。Machinery
- 在你的
能消費一個任務前,你需要将它注冊到伺服器。這是通過給任務配置設定一個唯一的名稱來實作的。workders
- 為了消費任務,你需有有一個或多個worker正在運作。運作worker所需要的隻是一個具有已注冊任務的
執行個體。每個worker将隻使用已注冊的任務。對于隊列中的每個任務,Worker.Process()方法将在一個goroutine中運作。可以使用Server
的第二參數來限制并發運作的worker.Process()調用的數量(每個worker)。server.NewWorker
- 可以通過将
執行個體傳遞給Signature
執行個體來調用任務。Server
- 調用
這個方法可以處理反射值,擷取到最終的結果。HumanReadableResults
多功能
1. 延時任務
上面的代碼隻是一個簡單
machinery
使用示例,其實
machiney
也支援延時任務的,可以通過在任務
signature
上設定ETA時間戳字段來延遲任務。
eta := time.Now().UTC().Add(time.Second * 20)
signature.ETA = &eta
複制
2. 重試任務
在将任務聲明為失敗之前,可以設定多次重試嘗試。斐波那契序列将用于在一段時間内分隔重試請求。這裡可以使用兩種方法,第一種直接對
tsak signature
中的
retryTimeout
和
RetryCount
字段進行設定,就可以,重試時間将按照斐波那契數列進行疊加。
//task signature
signature := &tasks.Signature{
Name: "sum",
Args: []tasks.Arg{
{
Type: "[]int64",
Value: []int64{1,2,3,4,5,6,7,8,9,10},
},
},
RetryTimeout: 100,
RetryCount: 3,
}
複制
或者,你可以使用
return.tasks.ErrRetryTaskLater
傳回任務并指定重試的持續時間。
func Sum(args []int64) (int64, error) {
sum := int64(0)
for _, arg := range args {
sum += arg
}
return sum, tasks.NewErrRetryTaskLater("我說他錯了", 4 * time.Second)
}
複制
3. 工作流
上面我們講的都是運作一個異步任務,但是我們往往做項目時,一個需求是需要多個異步任務以編排好的方式執行的,是以我們就可以使用
machinery
的工作流來完成。
3.1 Groups
Group
是一組任務,它們将互相獨立地并行執行。還是畫個圖吧,這樣看起來更明了:
一起來看一個簡單的例子:
// group
group,err :=tasks.NewGroup(signature1,signature2,signature3)
if err != nil{
log.Println("add group failed",err)
}
asyncResults, err :=server.SendGroupWithContext(context.Background(),group,10)
if err != nil {
log.Println(err)
}
for _, asyncResult := range asyncResults{
results,err := asyncResult.Get(1)
if err != nil{
log.Println(err)
continue
}
log.Printf(
"%v %v %v\n",
asyncResult.Signature.Args[0].Value,
tasks.HumanReadableResults(results),
)
}
複制
group
中的任務是并行執行的。
3.2 chords
我們在做項目時,往往會有一些回調場景,
machiney
也為我們考慮到了這一點,
Chord
允許你定一個回調任務在
groups
中的所有任務執行結束後被執行。
來看一段代碼:
callback := &tasks.Signature{
Name: "call",
}
group, err := tasks.NewGroup(signature1, signature2, signature3)
if err != nil {
log.Printf("Error creating group: %s", err.Error())
return
}
chord, err := tasks.NewChord(group, callback)
if err != nil {
log.Printf("Error creating chord: %s", err)
return
}
chordAsyncResult, err := server.SendChordWithContext(context.Background(), chord, 0)
if err != nil {
log.Printf("Could not send chord: %s", err.Error())
return
}
results, err := chordAsyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
log.Printf("Getting chord result failed with error: %s", err.Error())
return
}
log.Printf("%v\n", tasks.HumanReadableResults(results))
複制
上面的例子并行執行task1、task2、task3,聚合它們的結果并将它們傳遞給callback任務。
3.3 chains
chain
就是一個接一個執行的任務集,每個成功的任務都會觸發
chain
中的下一個任務。
看這樣一段代碼:
//chain
chain,err := tasks.NewChain(signature1,signature2,signature3,callback)
if err != nil {
log.Printf("Error creating group: %s", err.Error())
return
}
chainAsyncResult, err := server.SendChainWithContext(context.Background(), chain)
if err != nil {
log.Printf("Could not send chain: %s", err.Error())
return
}
results, err := chainAsyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
log.Printf("Getting chain result failed with error: %s", err.Error())
}
log.Printf(" %v\n", tasks.HumanReadableResults(results))
複制
上面的例子執行task1,然後是task2,然後是task3。當一個任務成功完成時,結果被附加到
chain
中下一個任務的參數清單的末尾,最終執行
callback
任務。
文中代碼位址:https://github.com/asong2020/Golang_Dream/tree/master/machinery/example
總結
這一篇文章到這裡就結束了,
machinery
還有很多用法,比如定時任務、定時任務組等等,就不在這一篇文章介紹了。更多使用方法解鎖可以觀看
machinery
文檔。因為
machiney
沒有中文文檔,是以我在學習的過程自己翻譯了一篇中文文檔,需要的小夥伴們自取。