1. 背景
上章講到了并行處理相關内容。但遠端分區還是講的不夠細。還差
遠端分區的db輪詢模式
分區器Partitioner 和PartitionHandler的使用,
聚合器的使用。
遠端分區的整體流程
2. 遠端分區的db輪詢模式
- Master使用者端需要實作 outgoIng,outband
@Bean //定義channle
public DirectChannel managerDBPollRequests() {
return new DirectChannel();
}
@Bean // 定義從channel内容經過amqp發送到requests隊列中
public IntegrationFlow managerDBPollOutboundFlow() {
return IntegrationFlows.from(managerDBPollRequests())
.handle(Amqp.outboundAdapter(rabbitTemplate).routingKey("requests"))
.get();
}
// 定義遠端分區step Manager端(master)
@Bean
public Step managerDBPollStep() {
return this.managerStepBuilderFactory.get("managerDBPollStep") // 定義名稱
.partitioner("workerDBPollStep", new SimplePartitioner()) //傳入分區器
.gridSize(GRID_SIZE) // 傳入分區器可以分幾個區
.outputChannel(managerDBPollRequests()) //将分區結果發送到對應的channel
.build();
}
代碼流程大概是上圖的樣子。
- worker使用者端需要實作 incomingRequest,inband
// 定義消息接收channel @Bean public DirectChannel workerDBPollRequests() { return new DirectChannel(); } @Bean public IntegrationFlow workerDBPollInboundFlow(ConnectionFactory rabbitmqConnectionFactory) { return IntegrationFlows .from(Amqp.inboundAdapter( rabbitmqConnectionFactory,"requests"))//從消息隊列request來消息 .channel(workerDBPollRequests()//執行完的消息内容,發送到channel ).get(); } @Bean public Step workerDBPollStep() { return this.workerStepBuilderFactory .get("workerDBPollStep") //step名稱 .inputChannel(workerDBPollRequests()) // 接收channel .tasklet(workerDBPollStepTasklet(null)) .build(); }
3. 分區器Partitioner 和PartitionHandler的使用
- PartitionHandler 分區執行器
這
PartitionHandler
是了解遠端處理或網格環境結構的元件。它能夠向
StepExecution
遠端執行個體發送請求
Step
,以某種特定于結構的格式(如 DTO)進行包裝。它不必知道如何拆分輸入資料或如何聚合多次
Step
執行的結果。一般來說,它可能也不需要了解彈性或故障轉移,因為在許多情況下,這些都是結構的特性。在任何情況下,Spring Batch 始終提供獨立于結構的可重新開機性。失敗的
Job
總是可以重新啟動,隻有失敗
Steps
的才會重新執行。
該
PartitionHandler
接口可以為各種結構類型提供專門的實作,包括簡單的 RMI 遠端處理、EJB 遠端處理、自定義 Web 服務、JMS、Java 空間、共享記憶體網格(如 Terracotta 或 Coherence)和網格執行結構(如 GridGain)。Spring Batch 不包含任何專有網格或遠端結構的實作。
然而,Spring Batch 确實提供了一個有用的實作,它使用Spring 的政策在單獨的執行線程中本地
PartitionHandler
執行
Step
執行個體 。
TaskExecutor
該實作稱為
TaskExecutorPartitionHandler
.
可以在
TaskExecutorPartitionHandler
java 配置中顯式配置,如下例所示:
@Bean
public Step step1Manager() {
return stepBuilderFactory.get("step1.manager")
.partitioner("step1", new SimplePartitioner())
.partitionHandler(partitionHandler())
.build();
}
@Bean
public PartitionHandler partitionHandler() {
TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
retVal.setTaskExecutor(new SimpleAsyncTaskExecutor());
retVal.setStep(step1());
retVal.setGridSize(10);
return retVal;
}
gridSize屬性确定要建立的單獨步驟執行的數量,是以它可以與TaskExecutor中線程池的大小相比對。或者,可以将其設定為大于可用的線程數,這使得工作塊更小。
TaskExecutionPartitionHandler對于IO密集型步驟執行個體非常有用,例如複制大量檔案或将檔案系統複制到内容管理系統中。它還可以通過提供作為遠端調用代理的步驟實作(例如使用Spring遠端處理)來用于遠端執行。
- 分區器Partitioner
有一個更簡單的
Partitioner
職責:僅生成執行上下文作為新步驟執行的輸入參數(無需擔心重新啟動)。它隻有一個方法,如下面的接口定義所示:
public interface Partitioner {
Map<String, ExecutionContext> partition(int gridSize);
}
此方法的傳回值将每個步驟執行的唯一名稱 (Map中的key) ,值為具體的step的ExecutionContext資訊。這些名稱稍後在批進行中繼資料中顯示為分區中的步驟名稱
StepExecutions
。它
ExecutionContext
隻是一個名稱-值對的包,是以它可能包含一系列主鍵、行号或輸入檔案的位置。然後遠端
Step
通常使用占位符綁定到上下文輸入
#{…}
使用stepScope 中的@Value來擷取
{partition2={start=20}, partition1={start=10}, partition0={start=1}}
4. StepExecutionAggregator聚合器的使用
- StepExecutionAggregator 最後的統計分析聚合邏輯
void aggregate(StepExecution result, Collection<StepExecution> executions);
該接口隻有一個方法。一個方法從入參來看是。接受Collection<StepExecution> executions 的集合。也就是和partitioner的反過程。分而治之的思想,彙總聚合分區step完成manager狀态的更新。
預設實作如下 DefaultStepExecutionAggregator.java
這個aggregate是在什麼時候設定進去的呢 PartitionStep#doExecutepublic void aggregate(StepExecution result, Collection<StepExecution> executions) { Assert.notNull(result, "To aggregate into a result it must be non-null."); if (executions == null) { return; } for (StepExecution stepExecution : executions) { BatchStatus status = stepExecution.getStatus(); result.setStatus(BatchStatus.max(result.getStatus(), status)); result.setExitStatus(result.getExitStatus().and(stepExecution.getExitStatus())); result.setFilterCount(result.getFilterCount() + stepExecution.getFilterCount()); result.setProcessSkipCount(result.getProcessSkipCount() + stepExecution.getProcessSkipCount()); result.setCommitCount(result.getCommitCount() + stepExecution.getCommitCount()); result.setRollbackCount(result.getRollbackCount() + stepExecution.getRollbackCount()); result.setReadCount(result.getReadCount() + stepExecution.getReadCount()); result.setReadSkipCount(result.getReadSkipCount() + stepExecution.getReadSkipCount()); result.setWriteCount(result.getWriteCount() + stepExecution.getWriteCount()); result.setWriteSkipCount(result.getWriteSkipCount() + stepExecution.getWriteSkipCount()); } }
protected void doExecute(StepExecution stepExecution) throws Exception { stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName()); // Wait for task completion and then aggregate the results Collection<StepExecution> executions = partitionHandler.handle(stepExecutionSplitter, stepExecution); stepExecution.upgradeStatus(BatchStatus.COMPLETED); stepExecutionAggregator.aggregate(stepExecution, executions); // If anything failed or had a problem we need to crap out if (stepExecution.getStatus().isUnsuccessful()) { throw new JobExecutionException("Partition handler returned an unsuccessful step"); } }
分區step如何設定aggregate
@Bean
public Step step1Manager() {
return stepBuilderFactory.get("step1.manager")
.partitioner("step1", new SimplePartitioner())
.partitionHandler(partitionHandler())
.aggregator(new DefaultStepExecutionAggregator())
.build();
}
5. 整體流程
整體流程就是partitionStep的執行過程
protected void doExecute(StepExecution stepExecution) throws Exception {
stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());
// Wait for task completion and then aggregate the results
// 等待任務結束然後聚合結果。
Collection<StepExecution> executions = partitionHandler.handle(stepExecutionSplitter, stepExecution);
// 不管結果是成功還是失敗,直接更新結果為完成
stepExecution.upgradeStatus(BatchStatus.COMPLETED);
// 調用聚合器,将分區結果聚合
stepExecutionAggregator.aggregate(stepExecution, executions);
// If anything failed or had a problem we need to crap out
// 聚合的結果如果不是成功狀态,那麼直接抛出錯誤。step異常結束,job異常結束
if (stepExecution.getStatus().isUnsuccessful()) {
throw new JobExecutionException("Partition handler returned an unsuccessful step");
}
}
// 狀态為failed 或者 大于FAILED,即 ABANDONED, UNKNOWN都為失敗
public boolean isUnsuccessful() {
return this == FAILED || this.isGreaterThan(FAILED);
}
代碼位置: GitHub - jackssybin/jackssybin_springBatch