天天看點

SpringBatch從入門到精通-3.2-并行處理-遠端分區1. 背景2. 遠端分區的db輪詢模式3. 分區器Partitioner 和PartitionHandler的使用4. StepExecutionAggregator聚合器的使用5. 整體流程

1. 背景

上章講到了并行處理相關内容。但遠端分區還是講的不夠細。還差

遠端分區的db輪詢模式

分區器Partitioner 和PartitionHandler的使用,

聚合器的使用。

遠端分區的整體流程

2. 遠端分區的db輪詢模式

SpringBatch從入門到精通-3.2-并行處理-遠端分區1. 背景2. 遠端分區的db輪詢模式3. 分區器Partitioner 和PartitionHandler的使用4. StepExecutionAggregator聚合器的使用5. 整體流程
  • Master使用者端需要實作 outgoIng,outband
@Bean  //定義channle
    public DirectChannel managerDBPollRequests() {
        return new DirectChannel();
    }
    @Bean  // 定義從channel内容經過amqp發送到requests隊列中
    public IntegrationFlow managerDBPollOutboundFlow() {
        return IntegrationFlows.from(managerDBPollRequests())
                .handle(Amqp.outboundAdapter(rabbitTemplate).routingKey("requests"))
                .get();
    }
    
    // 定義遠端分區step Manager端(master)
     @Bean
    public Step managerDBPollStep() {
        return this.managerStepBuilderFactory.get("managerDBPollStep") // 定義名稱
                .partitioner("workerDBPollStep", new SimplePartitioner()) //傳入分區器
                .gridSize(GRID_SIZE) // 傳入分區器可以分幾個區
                .outputChannel(managerDBPollRequests()) //将分區結果發送到對應的channel
                .build();
    }
    
​      
SpringBatch從入門到精通-3.2-并行處理-遠端分區1. 背景2. 遠端分區的db輪詢模式3. 分區器Partitioner 和PartitionHandler的使用4. StepExecutionAggregator聚合器的使用5. 整體流程

代碼流程大概是上圖的樣子。

  • worker使用者端需要實作 incomingRequest,inband
    ​
        // 定義消息接收channel
        @Bean
        public DirectChannel workerDBPollRequests() {
            return new DirectChannel();
        }
    ​
        @Bean
        public IntegrationFlow workerDBPollInboundFlow(ConnectionFactory rabbitmqConnectionFactory) {
            return IntegrationFlows
                    .from(Amqp.inboundAdapter(
                            rabbitmqConnectionFactory,"requests"))//從消息隊列request來消息
                    .channel(workerDBPollRequests()//執行完的消息内容,發送到channel
                    ).get();
        }
        @Bean
        public Step workerDBPollStep() {
            return this.workerStepBuilderFactory
                    .get("workerDBPollStep") //step名稱
                    .inputChannel(workerDBPollRequests()) // 接收channel
                    .tasklet(workerDBPollStepTasklet(null))
                    .build();
        }      
    SpringBatch從入門到精通-3.2-并行處理-遠端分區1. 背景2. 遠端分區的db輪詢模式3. 分區器Partitioner 和PartitionHandler的使用4. StepExecutionAggregator聚合器的使用5. 整體流程

3. 分區器Partitioner 和PartitionHandler的使用

  • PartitionHandler 分區執行器

PartitionHandler

是了解遠端處理或網格環境結構的元件。它能夠向

StepExecution

遠端執行個體發送請求

Step

,以某種特定于結構的格式(如 DTO)進行包裝。它不必知道如何拆分輸入資料或如何聚合多次

Step

執行的結果。一般來說,它可能也不需要了解彈性或故障轉移,因為在許多情況下,這些都是結構的特性。在任何情況下,Spring Batch 始終提供獨立于結構的可重新開機性。失敗的

Job

總是可以重新啟動,隻有失敗

Steps

的才會重新執行。

PartitionHandler

接口可以為各種結構類型提供專門的實作,包括簡單的 RMI 遠端處理、EJB 遠端處理、自定義 Web 服務、JMS、Java 空間、共享記憶體網格(如 Terracotta 或 Coherence)和網格執行結構(如 GridGain)。Spring Batch 不包含任何專有網格或遠端結構的實作。

然而,Spring Batch 确實提供了一個有用的實作,它使用Spring 的政策在單獨的執行線程中本地

PartitionHandler

執行

Step

執行個體 。

TaskExecutor

該實作稱為

TaskExecutorPartitionHandler

.

可以在

TaskExecutorPartitionHandler

java 配置中顯式配置,如下例所示:

@Bean
    public Step step1Manager() {
        return stepBuilderFactory.get("step1.manager")
                .partitioner("step1", new SimplePartitioner())
                .partitionHandler(partitionHandler())
                .build();
    }
​
    @Bean
    public PartitionHandler partitionHandler() {
        TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
        retVal.setTaskExecutor(new SimpleAsyncTaskExecutor());
        retVal.setStep(step1());
        retVal.setGridSize(10);
        return retVal;
    }      

gridSize屬性确定要建立的單獨步驟執行的數量,是以它可以與TaskExecutor中線程池的大小相比對。或者,可以将其設定為大于可用的線程數,這使得工作塊更小。

TaskExecutionPartitionHandler對于IO密集型步驟執行個體非常有用,例如複制大量檔案或将檔案系統複制到内容管理系統中。它還可以通過提供作為遠端調用代理的步驟實作(例如使用Spring遠端處理)來用于遠端執行。

  • 分區器Partitioner

有一個更簡單的

Partitioner

職責:僅生成執行上下文作為新步驟執行的輸入參數(無需擔心重新啟動)。它隻有一個方法,如下面的接口定義所示:

public interface Partitioner {
    Map<String, ExecutionContext> partition(int gridSize);
}      

此方法的傳回值将每個步驟執行的唯一名稱 (Map中的key) ,值為具體的step的ExecutionContext資訊。這些名稱稍後在批進行中繼資料中顯示為分區中的步驟名稱

StepExecutions

。它

ExecutionContext

隻是一個名稱-值對的包,是以它可能包含一系列主鍵、行号或輸入檔案的位置。然後遠端

Step

通常使用占位符綁定到上下文輸入

#{…}

使用stepScope 中的@Value來擷取

{partition2={start=20}, partition1={start=10}, partition0={start=1}}      

4. StepExecutionAggregator聚合器的使用

  • StepExecutionAggregator 最後的統計分析聚合邏輯
    void aggregate(StepExecution result, Collection<StepExecution> executions);      

    該接口隻有一個方法。一個方法從入參來看是。接受Collection<StepExecution> executions 的集合。也就是和partitioner的反過程。分而治之的思想,彙總聚合分區step完成manager狀态的更新。

    預設實作如下 DefaultStepExecutionAggregator.java

    public void aggregate(StepExecution result, Collection<StepExecution> executions) {
            Assert.notNull(result, "To aggregate into a result it must be non-null.");
            if (executions == null) {
                return;
            }
            for (StepExecution stepExecution : executions) {
                BatchStatus status = stepExecution.getStatus();
                result.setStatus(BatchStatus.max(result.getStatus(), status));
                result.setExitStatus(result.getExitStatus().and(stepExecution.getExitStatus()));
                result.setFilterCount(result.getFilterCount() + stepExecution.getFilterCount());
                result.setProcessSkipCount(result.getProcessSkipCount() + stepExecution.getProcessSkipCount());
                result.setCommitCount(result.getCommitCount() + stepExecution.getCommitCount());
                result.setRollbackCount(result.getRollbackCount() + stepExecution.getRollbackCount());
                result.setReadCount(result.getReadCount() + stepExecution.getReadCount());
                result.setReadSkipCount(result.getReadSkipCount() + stepExecution.getReadSkipCount());
                result.setWriteCount(result.getWriteCount() + stepExecution.getWriteCount());
                result.setWriteSkipCount(result.getWriteSkipCount() + stepExecution.getWriteSkipCount());
            }
        }      
    這個aggregate是在什麼時候設定進去的呢 PartitionStep#doExecute
    protected void doExecute(StepExecution stepExecution) throws Exception {
       stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());
    ​
       // Wait for task completion and then aggregate the results
       Collection<StepExecution> executions = partitionHandler.handle(stepExecutionSplitter, stepExecution);
       stepExecution.upgradeStatus(BatchStatus.COMPLETED);
       stepExecutionAggregator.aggregate(stepExecution, executions);
    ​
       // If anything failed or had a problem we need to crap out
       if (stepExecution.getStatus().isUnsuccessful()) {
          throw new JobExecutionException("Partition handler returned an unsuccessful step");
       }
    }      

分區step如何設定aggregate

@Bean
public Step step1Manager() {
    return stepBuilderFactory.get("step1.manager")
            .partitioner("step1", new SimplePartitioner())
            .partitionHandler(partitionHandler())
            .aggregator(new DefaultStepExecutionAggregator())
            .build();
}      

5. 整體流程

整體流程就是partitionStep的執行過程

protected void doExecute(StepExecution stepExecution) throws Exception {
		stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());
		
		// Wait for task completion and then aggregate the results
		// 等待任務結束然後聚合結果。
		Collection<StepExecution> executions = partitionHandler.handle(stepExecutionSplitter, stepExecution);
		// 不管結果是成功還是失敗,直接更新結果為完成
		stepExecution.upgradeStatus(BatchStatus.COMPLETED);
		// 調用聚合器,将分區結果聚合
		stepExecutionAggregator.aggregate(stepExecution, executions);
		
		// If anything failed or had a problem we need to crap out
		// 聚合的結果如果不是成功狀态,那麼直接抛出錯誤。step異常結束,job異常結束
		if (stepExecution.getStatus().isUnsuccessful()) {
			throw new JobExecutionException("Partition handler returned an unsuccessful step");
		}
	}
	// 狀态為failed 或者 大于FAILED,即 ABANDONED, UNKNOWN都為失敗
	public boolean isUnsuccessful() {
		return this == FAILED || this.isGreaterThan(FAILED);
	}      

代碼位置: GitHub - jackssybin/jackssybin_springBatch