天天看點

開源微服務編排架構:Netflix Conductor

開源微服務編排架構:Netflix Conductor

作者 | 夜陽

來源 | 阿裡技術公衆号

本文主要介紹netflix conductor的基本概念和主要運作機制。

一 簡介

netflix conductor是基于JAVA語言編寫的開源流程引擎,用于架構基于微服務的流程。它具備如下特性:

  • 允許建立複雜的業務流程,流程中每個獨立的任務都是由一個微服務所實作。
  • 基于JSON DSL 建立工作流,對任務的執行進行編排。
  • 工作流在執行的過程中可見、可追溯。
  • 提供暫停、恢複、重新開機等多種控制模型。
  • 提供一種簡單的方式來最大限度重用微服務。
  • 擁有擴充到百萬流程并發運作的服務能力。
  • 通過隊列服務實作用戶端與服務端的分離。
  • 支援 HTTP 或其他RPC協定進行資料傳送

二 基本概念

1 Task

Task是最小執行單元,承載了一段執行邏輯,如發送HTTP請求等。

  • System Task:被conductor服務執行,這些任務的執行與引擎在同一個JVM中。
  • Worker Task:被worker服務執行,執行與引擎隔離開,worker通過隊列擷取任務後,執行并更新結果狀态到引擎。Worker的實作是跨語言的,其使用Http協定與Server通信。

conductor提供了若幹内置SystemTask:

  • 功能性Task:
    • HTTP:發送http請求
    • JSON_JQ_TRANSFORM:jq指令執行,一般使用者json的轉換,具體可見jq官方文檔
    • KAFKA_PUBLISH: 釋出kafka消息
  • 流程控制Task:
    • SWITCH(原Decision):條件判斷分支,類似于代碼中的switch case
    • FORK:啟動并行分支,用于排程并行任務
    • JOIN:彙總并行分支,用于彙總并行任務
    • DO_WHILE:循環,類似于代碼中的do while
    • WAIT:一直在運作中,直到外部時間觸發更新節點狀态,可用于等待外部操作
    • SUB_WORKFLOW:子流程,執行其他的流程
    • TERMINATE:結束流程,以指定輸出提前結束流程,可以與SWITCH節點配合使用,類似代碼中的提前return語句
  • 自定義Task:
    • 對于System Task,Conductor提供了WorkflowSystemTask 抽象類,可以自定義擴充實作。
    • 對于Worker Task,可以實作conductor的client Worker接口實作執行邏輯。

2 Workflow

  • Workflow由一系列需要執行的Task組成,conductor采用json來描述Task的流轉關系。
  • 除基本的順序流程外,借助内置的SWITCH、FORK、JOIN、DO_WIHLE、TERMINATE任務,還能實作分支、并行、循環、提前結束等流程控制。

3 Input&Output

Task的輸入是一種映射,其作為工作流執行個體化的一部分或某些其他Task的輸出。允許将來自工作流或其他Task的輸入/輸出作為随後執行的Task的輸入。

  • Task有自己的輸入和輸出,輸入輸出都是jsonobject類型。
  • Task可以引用其他Task的輸入輸出,使用${taskxxx.output}的方式引用。引用文法為json-path,除最基礎的${taskxxx.output}的值解析方式外,還支援其他複雜操作,如過濾等,具體見json-path文法。
  • 啟動Workflow時可以傳入流程的輸入資料,Task可以通過${workflow.input}的方式引用。

Task實作原子操作的處理以及流程控制操作,Workflow定義描述Task的流轉關系,Task引用Workflow或者其它Task的輸入輸出。通過這些機制,conductor實作了JSON DSL對流程的描述。

三 整體架構

開源微服務編排架構:Netflix Conductor

主要分為幾個部分:

  • Orchestrator: 負責流程的流轉排程工作;
  • Management/Execution Service: 提供流程、任務的管理更新等操作;
  • TaskQueues: 任務隊列,Orchestrator解析出來的待執行Task會放到隊列中;
  • Worker: 任務執行worker,從TaskQueues中擷取任務,通過Execution Service更新任務狀态與結果資料;
  • Database: 中繼資料&運作時資料庫,用于儲存運作時的Workflow、Task等狀态資訊,以及流程任務定義的等原資訊;
  • Index: 索引資料庫,用于存儲執行曆史;

四 運作模型

1 Task狀态轉移

  • SCHEDULED:待排程,task放到隊列中還沒有被poll出來執行時的狀态
  • IN_PROGRESS:執行中,被poll出來執行但還沒有完成時的狀态
  • COMPLETED:執行完成
  • FAILED:執行失敗
  • CANCELLED:被中止時為此狀态,一般出現在兩種情況:
    1. 手動中止流程時,正在運作中的task會被置為此狀态;
    2. 多個fork分支,當某個分支的task失敗時,其它分支中正在運作的task會被置為此狀态;
開源微服務編排架構:Netflix Conductor

2 任務隊列

任務的執行(同步的系統任務除外)都會先添加到任務隊列中,是典型的生産者消費者模式。

  • 任務隊列,是一個帶有延遲、優先級功能的隊列;
  • 每種類型的Task是一個單獨的隊列,此外,如果配置了domain、isolationGroup,還會拆分成多個隊列實作執行隔離;
  • decider service是生産者,其根據流程配置與目前執行情況,解析出可執行的task後,添加到隊列;
  • 任務執行器(SystemTaskWorker、Worker)是消費者,其長輪詢對應的隊列,從隊列中擷取任務執行;

隊列接口可插拔,conductor提供了Dynomite 、MySQL、PostgreSQL的實作。

3 核心功能實作機制

conductor排程的核心是decider service,其根據目前流程運作的狀态,解析出将要執行的任務清單,将任務入隊交給worker執行。

decide主要流程簡化如下,詳細代碼見WorkflowExecutor.java的decide方法:

開源微服務編排架構:Netflix Conductor

其中,排程任務處理流程簡化如下,詳細代碼見WorkflowExecutor.java的scheduleTask方法:

開源微服務編排架構:Netflix Conductor

decide的觸發時機

最主要的觸發時機:

  1. 新啟動執行時,會觸發decide操作
  2. 系統任務執行完成時,會觸發decide操作
  3. Workder任務通過ExecutionService更新任務狀态時,會觸發decide操作

流程控制節點的實作機制

1)Task & TaskMapper

對于每一個Task來說,都有Task和TaskMapper兩部分:

  1. Task:任務的執行邏輯代碼,它的作用是Task的執行
  2. TaskMapper:任務的映射邏輯代碼,它通過Task的定義配置、目前執行個體的執行狀态等資訊,傳回實際需要執行的Task清單

對于一般的任務來說,TaskMapper傳回的是就是Task本身,補充一些執行執行個體的狀态資訊。但是對于控制節點來說,會有不同的邏輯。

2)條件分支(SWITCH)的實作機制

SWITCH用于根據條件判斷,執行不同的分支。

實際上,該節點的Task不做任何操作,TaskMapper根據分支條件,判斷出要走的分之後,傳回對應分支的第一個Task。

SwitchTaskMapper.java getMappedTasks方法關鍵代碼:

// 待排程的Task list,最終傳回結果
List<Task> tasksToBeScheduled = new LinkedList<>();
// evalResult是分支條件變量的值(case)
// decisionCases是一個Map結構,key為分支的case值,value為對應分支的任務定義list(分支内的任務定義會有多個)
// 根據分支變量的實際值,擷取對應分支的任務定義list
List<WorkflowTask> selectedTasks = taskToSchedule.getDecisionCases().get(evalResult);
// default的邏輯:如果擷取不到對應的分支或者分支為空,則用預設的分支
if (selectedTasks == null || selectedTasks.isEmpty()) {
  selectedTasks = taskToSchedule.getDefaultCase();
}
if (selectedTasks != null && !selectedTasks.isEmpty()) {
  // 擷取分支的第一個(下标0)task,傳回給decider service去做排程(decider會把任務添加到隊列裡,交給worker去執行)
  WorkflowTask selectedTask = selectedTasks.get(0);
  // 調用了deciderService的getTasksToBeScheduled方法,此方法裡又擷取到TaskMapper調用了getMappedTasks。這裡采用了遞歸調用的方式,解析嵌套的Task
  List<Task> caseTasks = taskMapperContext.getDeciderService()
    .getTasksToBeScheduled(workflowInstance, selectedTask, retryCount, taskMapperContext.getRetryTaskId());
  tasksToBeScheduled.addAll(caseTasks);
  switchTask.getInputData().put("hasChildren", "true");
}
return tasksToBeScheduled;           

3)并行(FORK)的實作機制

FORK用于開啟多個并行分支。

實際上,該節點的Task不做任何操作,TaskMapper傳回所有并行分支的第一個Task。

ForkJoinTaskMapper.java getMappedTasks關鍵代碼:

// 待排程的Task list,最終傳回結果
List<Task> tasksToBeScheduled = new LinkedList<>();
// 配置中的所有fork分支
List<List<WorkflowTask>> forkTasks = taskToSchedule.getForkTasks();
for (List<WorkflowTask> wfts : forkTasks) {
  // 每個分支取第一個Task
  WorkflowTask wft = wfts.get(0);
  // 調用了deciderService的getTasksToBeScheduled方法,此方法裡又擷取到TaskMapper調用了getMappedTasks。這裡采用了遞歸調用的方式,解析嵌套的Task
  List<Task> tasks2 = taskMapperContext.getDeciderService()
    .getTasksToBeScheduled(workflowInstance, wft, retryCount);
  tasksToBeScheduled.addAll(tasks2);
}
return tasksToBeScheduled;           

總的來說,分支(SWITCH)、并行(FORK)節點本身沒有執行邏輯,其通過TaskMapper傳回到實際要執行的Task,然後交給Decider Service處理。

重試的實作機制

重試和其延遲時間設定,都是借助任務隊列的功能實作的。

重試:将任務重新添加到任務隊列

重試的延遲時間:添加到任務隊列時設定延遲時間,延遲時間過後,任務才能在隊列中被poll出來執行

五 完整性保障機制

由于排程過程中可能會出現因機器重新開機、網絡異常、JVM崩潰等偶發情況,這些會導緻的decide過程意外終止,流程執行不完整,展現出如流程一直運作中(實際已經沒有在排程),或者其它狀态錯誤等異常現象。

1 WorkflowReconciler

針對這種情況,conductor有一個WorkflowReconciler,會定期嘗試decide所有正在運作中的流程,修複流程執行的一緻性。此外,它還有一個作用是校驗流程逾時時間。

2 decideQueue

那麼WorkflowReconciler是如何擷取到目前運作中的流程呢,答案是decideQueue。

decideQueue和任務隊列相同,也是一個具有延遲功能的隊列,其存放的是正在執行中的流程的執行個體id。在任務開始執行時(包括新啟動執行、重試執行、恢複執行、重跑執行等),會将執行個體id push到decideQueue中;在執行結束(成功、失敗)時,會從decideQueue中删除執行個體id。

3 ExecutionLockService

WorkflowReconciler會定期嘗試decide所有正在運作中的流程用于逾時判斷、維護流程一緻性。但是流程本身正常執行也會觸發decide,如果同一個執行同時觸發兩個decide,可能會導緻狀态混亂,執行卡住等問題。

conductor采用了鎖來解決這個問題,其提供了單機LocalOnlyLock(基于信号量實作)、redis分布式鎖(基于redission實作)、zookeeper分布式鎖三種實作。

decide方法中最開始會嘗試擷取鎖,如果擷取失敗則直接傳回。通過鎖來保障不會對同一個流程執行個體并發執行decide。

if (!executionLockService.acquireLock(workflowId)) {
  return false;
}           

由于鎖是可配置的,可能會導緻一個誤區:單台機器的話不用配置鎖。其實單機也是需要配置鎖的,因為WorkflowReconciler和流程正常執行會産生沖突,可能會導緻偶發的流程狀态混亂問題。

參考:

Github:

https://github.com/Netflix/conductor

官方文檔:

https://netflix.github.io/conductor/

WorkflowReconciler:

https://github.com/Netflix/conductor/blob/main/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowReconciler.java

WorkflowSystemTask:

https://github.com/Netflix/conductor/blob/main/core/src/main/java/com/netflix/conductor/core/execution/tasks/WorkflowSystemTask.java?spm=ata.21736010.0.0.2b501a3cYnrSfT&file=WorkflowSystemTask.java

PostgreSQL實戰進階

PostgreSQL被譽為“世界上功能最強大的開源資料庫”,是以加州大學伯克利分校計算機系開發的POSTGRES 4.2為基礎的對象關系型資料庫管理系統。PostgreSQL支援大部分 SQL标準并且提供了許多其他現代特性:複雜查詢、外鍵、觸發器、視圖、事務完整性、MVCC。同樣,PostgreSQL 可以用許多方法擴充,比如,通過增加新的資料類型、函數、操作符、聚集函數、索引。開發者可以免費使用、修改、和分發 PostgreSQL,不管是私用、商用、還是學術研究使用。

本課程由PostgreSQL社群核心成員出品,帶你快速從0-1深入PostgreSQL核心特性。

點選這裡

,檢視詳情。