天天看點

Activiti 5.3:流程活動自動與手工觸發執行

Activiti 5.3支援流程活動自動執行與手工觸發執行。其中,自動執行是指,在啟動流程之前,準備流程所需要的控制流程進度的變量資料,啟動流程之後,無需外部幹預,就能夠按照預定義的流程執行;手工觸發執行是指,執行到流程中某個個結點後流程暫時停止運作,直到收到外部發送的信号以後,才會繼續向前推進,這樣情況可以更加精細地控制流程。

下面主要通過基于Activiti 5.3的<parallelGateway>、<serviceTask>、<receiveTask>、<userTask>元素來看一下。首先,我們在測試的過程中,用到JUnit 3.x,為了友善,這裡給了一層封裝,代碼如下所示:

package org.shirdrn.workflow.activiti;

import junit.framework.TestCase;

import org.activiti.engine.FormService;

import org.activiti.engine.HistoryService;

import org.activiti.engine.IdentityService;

import org.activiti.engine.ManagementService;

import org.activiti.engine.ProcessEngine;

import org.activiti.engine.ProcessEngines;

import org.activiti.engine.RepositoryService;

import org.activiti.engine.RuntimeService;

import org.activiti.engine.TaskService;

/**

* @author shirdrn

*/

public abstract class AbstractTest extends TestCase {

private ProcessEngine processEngine;

protected String deploymentId;

protected RepositoryService repositoryService;

protected RuntimeService runtimeService;

protected TaskService taskService;

protected FormService formService;

protected HistoryService historyService;

protected IdentityService identityService;

protected ManagementService managementService;

@Override

protected void setUp() throws Exception {

super.setUp();

if(processEngine==null) {

processEngine = ProcessEngines.getDefaultProcessEngine();

}

repositoryService = processEngine.getRepositoryService();

runtimeService = processEngine.getRuntimeService();

taskService = processEngine.getTaskService();

formService = processEngine.getFormService();

historyService = processEngine.getHistoryService();

identityService = processEngine.getIdentityService();

managementService = processEngine.getManagementService();

initialize();

}

@Override

protected void tearDown() throws Exception {

super.tearDown();

destroy();

}

protected abstract void initialize() throws Exception;

protected abstract void destroy() throws Exception;

}

這裡面,主要是在測試之前做一些初始化工作,主要包括流程引擎執行個體的建構,及其流程提供的基本服務。下面測試會用到該抽象類。

自動執行

<serviceTask>元素,可以實作自動活動,文法如下所示:

<serviceTask id="serviceTaskId" name="serviceTaskName"

activiti:class="org.shirdrn.workflow.activiti.gateway.ServiceTaskClass"/>

其中,activiti:class屬性為該結點對應的處理類,該類要求實作org.activiti.engine.delegate.JavaDelegate接口,該接口定義如下所示:

package org.activiti.engine.delegate;

public interface JavaDelegate {

void execute(DelegateExecution execution) throws Exception;

}

execute方法的參數DelegateExecution execution可以在流程中各個結點之間傳遞流程變量。

下面給出一個具體的例子:

自動執行的流程,如圖所示:

Activiti 5.3:流程活動自動與手工觸發執行

對應的流程定義檔案為GatewayTest.testAutomaticForkJoin.bpmn20.xml,如下所示:

<?xml version="1.0" encoding="UTF-8"?>

<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:activiti="http://activiti.org/bpmn" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://www.activiti.org/test">

<process id="AutomaticParalellBasedForkJoin" name="AutomaticParalellBasedForkJoin">

<startEvent id="startevent7" name="Start"></startEvent>

<parallelGateway id="parallelgateway12" name="Fork"></parallelGateway>

<serviceTask id="servicetask3" name="Task 1" activiti:class="org.shirdrn.workflow.activiti.gateway.ServiceTask1"></serviceTask>

<serviceTask id="servicetask4" name="Task 2" activiti:class="org.shirdrn.workflow.activiti.gateway.ServiceTask2"></serviceTask>

<serviceTask id="servicetask5" name="Task 3" activiti:class="org.shirdrn.workflow.activiti.gateway.ServiceTask3"></serviceTask>

<parallelGateway id="parallelgateway13" name="First Join"></parallelGateway>

<serviceTask id="servicetask6" name="Task 4" activiti:class="org.shirdrn.workflow.activiti.gateway.ServiceTask4"></serviceTask>

<parallelGateway id="parallelgateway14"></parallelGateway>

<endEvent id="endevent7" name="End"></endEvent>

<sequenceFlow id="flow45" name="" sourceRef="startevent7" targetRef="parallelgateway12"></sequenceFlow>

<sequenceFlow id="flow46" name="" sourceRef="parallelgateway12" targetRef="servicetask3"></sequenceFlow>

<sequenceFlow id="flow47" name="" sourceRef="parallelgateway12" targetRef="servicetask4"></sequenceFlow>

<sequenceFlow id="flow48" name="" sourceRef="parallelgateway12" targetRef="servicetask5"></sequenceFlow>

<sequenceFlow id="flow49" name="" sourceRef="servicetask3" targetRef="parallelgateway13"></sequenceFlow>

<sequenceFlow id="flow50" name="" sourceRef="servicetask4" targetRef="parallelgateway13"></sequenceFlow>

<sequenceFlow id="flow51" name="" sourceRef="servicetask5" targetRef="parallelgateway14"></sequenceFlow>

<sequenceFlow id="flow52" name="" sourceRef="parallelgateway13" targetRef="servicetask6"></sequenceFlow>

<sequenceFlow id="flow53" name="" sourceRef="servicetask6" targetRef="parallelgateway14"></sequenceFlow>

<sequenceFlow id="flow54" name="" sourceRef="parallelgateway14" targetRef="endevent7"></sequenceFlow>

</process>

</definitions>

上述流程定義中,一共定義了4個ServiceTask,模拟實作代碼如下所示:

 package org.shirdrn.workflow.activiti.gateway;

import java.util.logging.Logger;

import org.activiti.engine.delegate.DelegateExecution;

import org.activiti.engine.delegate.JavaDelegate;

public class ServiceTask1 implements JavaDelegate {

private final Logger log = Logger.getLogger(ServiceTask1.class.getName());

@Override

public void execute(DelegateExecution execution) throws Exception {

Thread.sleep(10000);

log.info("variavles=" + execution.getVariables());

execution.setVariable("task1", "I am task 1");

log.info("I am task 1.");

}

}

package org.shirdrn.workflow.activiti.gateway;

import java.util.logging.Logger;

public class ServiceTask2 implements JavaDelegate {

private final Logger log = Logger.getLogger(ServiceTask2.class.getName());

@Override

public void execute(DelegateExecution execution) throws Exception {

Thread.sleep(10000);

log.info("variavles=" + execution.getVariables());

execution.setVariable("task2", "I am task 2");

log.info("I am task 2.");

}

}

package org.shirdrn.workflow.activiti.gateway;

import java.util.logging.Logger;

public class ServiceTask3 implements JavaDelegate {

private final Logger log = Logger.getLogger(ServiceTask3.class.getName());

@Override

public void execute(DelegateExecution execution) throws Exception {

Thread.sleep(10000);

log.info("variavles=" + execution.getVariables());

execution.setVariable("task3", "I am task 3");

log.info("I am task 3.");

}

}

package org.shirdrn.workflow.activiti.gateway;

import java.util.logging.Logger;

public class ServiceTask4 implements JavaDelegate {

private final Logger log = Logger.getLogger(ServiceTask4.class.getName());

@Override

public void execute(DelegateExecution execution) throws Exception {

Thread.sleep(10000);

log.info("variavles=" + execution.getVariables());

execution.setVariable("task4", "I am task 4");

log.info("I am task 4.");

}

}

測試代碼,如下所示:

package org.shirdrn.workflow.activiti.gateway;

import org.activiti.engine.runtime.ProcessInstance;

import org.activiti.engine.test.Deployment;

import org.shirdrn.workflow.activiti.AbstractTest;

/**

* @author shirdrn

*/

public class AutomaticParallelGatewayTest extends AbstractTest {

private String deploymentId;

@Override

protected void initialize() throws Exception {

deploymentId = repositoryService.createDeployment()

.addClasspathResource("diagrams/GatewayTest.testAutomaticForkJoin.bpmn20.xml")

.deploy().getId();

}

@Override

protected void destroy() throws Exception {

repositoryService.deleteDeployment(deploymentId, true);

}

@Deployment

public void testForkJoin() {

ProcessInstance pi = runtimeService.startProcessInstanceByKey("AutomaticParalellBasedForkJoin");

assertEquals(true, pi.isEnded());

}

}

隻需要啟動一個流程執行個體,它會自動執行到結束。這種情況下,你不需要關注流程的執行進度,而隻需要把精力集中在每個結點的處理邏輯(通常是簡單或者複雜的商業邏輯)上,運作結果如下所示:

2011-3-23 11:50:12 org.shirdrn.workflow.activiti.gateway.ServiceTask1 execute

資訊: variavles={}

2011-3-23 11:50:12 org.shirdrn.workflow.activiti.gateway.ServiceTask1 execute

資訊: I am task 1.

2011-3-23 11:50:22 org.shirdrn.workflow.activiti.gateway.ServiceTask2 execute

資訊: variavles={task1=I am task 1}

2011-3-23 11:50:22 org.shirdrn.workflow.activiti.gateway.ServiceTask2 execute

資訊: I am task 2.

2011-3-23 11:50:32 org.shirdrn.workflow.activiti.gateway.ServiceTask4 execute

資訊: variavles={task1=I am task 1, task2=I am task 2}

2011-3-23 11:50:32 org.shirdrn.workflow.activiti.gateway.ServiceTask4 execute

資訊: I am task 4.

2011-3-23 11:50:42 org.shirdrn.workflow.activiti.gateway.ServiceTask3 execute

資訊: variavles={task1=I am task 1, task2=I am task 2, task4=I am task 4}

2011-3-23 11:50:42 org.shirdrn.workflow.activiti.gateway.ServiceTask3 execute

資訊: I am task 3.  

手工觸發執行

通過<receiveTask>和<userTask>元素都可以實作流程的手工觸發執行。

基于<receiveTask>

實作的流程,如圖所示:

Activiti 5.3:流程活動自動與手工觸發執行

對應的流程定義檔案Task.ReceiveTask.bpmn20.xml,如下所示:

<?xml version="1.0" encoding="UTF-8"?>

<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:activiti="http://activiti.org/bpmn" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://www.activiti.org/test">

<process id="MyReceiveTask" name="MyReceiveTask">

<startEvent id="startevent4" name="Start"></startEvent>

<receiveTask id="receivetask1" name="Check bank">

<extensionElements>

<activiti:executionListener event="start" class="org.shirdrn.workflow.activiti.task.CheckBankReceiveTask"></activiti:executionListener>

</extensionElements>

</receiveTask>

<receiveTask id="receivetask2" name="Check merchant">

<extensionElements>

<activiti:executionListener event="start" class="org.shirdrn.workflow.activiti.task.CheckMerchantReceiveTask"></activiti:executionListener>

</extensionElements>

</receiveTask>

<endEvent id="endevent5" name="End"></endEvent>

<sequenceFlow id="flow16" name="" sourceRef="startevent4" targetRef="receivetask1"></sequenceFlow>

<sequenceFlow id="flow17" name="" sourceRef="receivetask1" targetRef="receivetask2"></sequenceFlow>

<sequenceFlow id="flow18" name="" sourceRef="receivetask2" targetRef="endevent5"></sequenceFlow>

</process>

</definitions>

上述流程定義中,對應的兩個處理類,代碼分别如下所示:

package org.shirdrn.workflow.activiti.task;

import java.util.HashMap;

import java.util.logging.Logger;

import org.activiti.engine.delegate.DelegateExecution;

import org.activiti.engine.delegate.JavaDelegate;

public class CheckBankReceiveTask implements JavaDelegate {

private final Logger log = Logger.getLogger(CheckBankReceiveTask.class.getName());

@SuppressWarnings("unchecked")

@Override

public void execute(DelegateExecution execution) throws Exception {

log.info("i am CheckBankReceiveTask.");

System.out.println("in : " + execution.getVariables());

((HashMap<String, Object>)execution.getVariables().get("in")).put("next", "CheckBankTask");

((HashMap<String, Object>)execution.getVariables().get("out")).put("reponse", "subprocess:CheckBankReceiveTask->CheckMerchantReceiveTask");

}

}

package org.shirdrn.workflow.activiti.task;

import java.util.HashMap;

import java.util.logging.Logger;

import org.activiti.engine.delegate.DelegateExecution;

import org.activiti.engine.delegate.JavaDelegate;

public class CheckMerchantReceiveTask implements JavaDelegate {

private final Logger log = Logger.getLogger(CheckMerchantReceiveTask.class.getName());

@SuppressWarnings("unchecked")

@Override

public void execute(DelegateExecution execution) throws Exception {

log.info("i am CheckMerchantReceiveTask.");

System.out.println("in : " + execution.getVariables());

((HashMap<String, Object>)execution.getVariables().get("in")).put("previous", "CheckMerchantReceiveTask");

}

上面還用到一個org.shirdrn.workflow.activiti.subprocess.Merchant類,該類必須支援序列化,如下所示: 

package org.shirdrn.workflow.activiti.subprocess;

import java.io.Serializable;

public class Merchant implements Serializable {

private static final long serialVersionUID = 1L;

public Merchant(String merchantId, int priority, short serviceType, short status) {

super();

this.merchantId = merchantId;

this.priority = priority;

this.serviceType = serviceType;

this.status = status;

}

public Merchant(String merchantId) {

this(merchantId, -1, (short)0, (short)0);

}

private String merchantId;

private int priority = -1;

private short serviceType = 0;

private short status = 0;

public String getMerchantId() {

return merchantId;

}

public void setMerchantId(String merchantId) {

this.merchantId = merchantId;

}

public int getPriority() {

return priority;

}

public void setPriority(int priority) {

this.priority = priority;

}

public short getServiceType() {

return serviceType;

}

public void setServiceType(short serviceType) {

this.serviceType = serviceType;

}

public short getStatus() {

return status;

}

public void setStatus(short status) {

this.status = status;

}

@Override

public String toString() {

return "Merchant[" + merchantId + "]";

}

}

測試用例,代碼如下所示:

 package org.shirdrn.workflow.activiti.task;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import org.activiti.engine.repository.Deployment;

import org.activiti.engine.runtime.Execution;

import org.activiti.engine.runtime.ProcessInstance;

import org.shirdrn.workflow.activiti.AbstractTest;

import org.shirdrn.workflow.activiti.subprocess.Merchant;

/**

* @author shirdrn

*/

public class MyReceiveTaskTest extends AbstractTest {

@Override

protected void initialize() throws Exception {

Deployment deployment = repositoryService

.createDeployment()

.addClasspathResource(

"diagrams/Task.ReceiveTask.bpmn20.xml")

.deploy();

deploymentId = deployment.getId();

}

@Override

protected void destroy() throws Exception {

repositoryService.deleteDeployment(deploymentId, true);

}

public void testSubProcess() {

// prepare data packet

Map<String, Object> variables = new HashMap<String, Object>();

Map<String, Object> subVariables = new HashMap<String, Object>();

variables.put("maxTransCount", 1000000);

variables.put("merchant", new Merchant("ICBC"));

variables.put("protocol", "UM32");

variables.put("repository", "10.10.38.99:/home/shirdrn/repository");

variables.put("in", subVariables);

variables.put("out", new HashMap<String, Object>());

// start process instance

ProcessInstance pi = runtimeService.startProcessInstanceByKey("MyReceiveTask", variables);

List<Execution> executions = runtimeService.createExecutionQuery().list();

assertEquals(1, executions.size());

Execution execution = runtimeService.createExecutionQuery().singleResult();

runtimeService.setVariable(execution.getId(), "type", "receiveTask");

runtimeService.signal(execution.getId());

assertEquals(1, executions.size());

execution = runtimeService.createExecutionQuery().list().get(0);

assertNotNull(execution);

runtimeService.setVariable(execution.getId(), "oper", "shirdrn");

runtimeService.signal(execution.getId());

}

}

運作結果如下所示:

2011-3-23 12:51:35 org.shirdrn.workflow.activiti.task.CheckBankReceiveTask execute

資訊: i am CheckBankReceiveTask.

in : {protocol=UM32, repository=10.10.38.99:/home/shirdrn/repository, merchant=Merchant[ICBC], maxTransCount=1000000, in={}, out={}}

2011-3-23 12:51:35 org.shirdrn.workflow.activiti.task.CheckMerchantReceiveTask execute

資訊: i am CheckMerchantReceiveTask.

in : {protocol=UM32, repository=10.10.38.99:/home/shirdrn/repository, merchant=Merchant[ICBC], maxTransCount=1000000, type=receiveTask, in={}, out={}}

基于<userTask>

實作的流程,如圖所示:

Activiti 5.3:流程活動自動與手工觸發執行

對應的流程定義檔案,如下所示:

<?xml version="1.0" encoding="UTF-8"?>

<definitions id="definitions"

xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:activiti="http://activiti.org/bpmn"

targetNamespace="Umpay">

<process id="ParalellBasedForkJoin">

<startEvent id="theStart" />

<sequenceFlow id="flow1" sourceRef="theStart" targetRef="fork" />

<parallelGateway id="fork" name="Fork" />

<sequenceFlow sourceRef="fork" targetRef="task1" />

<sequenceFlow sourceRef="fork" targetRef="task2" />

<sequenceFlow sourceRef="fork" targetRef="task3" />

<userTask id="task1" name="Task 1">

<extensionElements>

<activiti:taskListener event="complete"

class="org.shirdrn.workflow.activiti.gateway.Task1Listener" />

</extensionElements>

</userTask>

<sequenceFlow sourceRef="task1" targetRef="firstJoin" />

<userTask id="task2" name="Task 2">

<extensionElements>

<activiti:taskListener event="complete"

class="org.shirdrn.workflow.activiti.gateway.Task2Listener" />

</extensionElements>

</userTask>

<sequenceFlow sourceRef="task2" targetRef="firstJoin" />

<userTask id="task3" name="Task 3">

<extensionElements>

<activiti:taskListener event="complete"

class="org.shirdrn.workflow.activiti.gateway.Task3Listener" />

</extensionElements>

</userTask>

<sequenceFlow sourceRef="task3" targetRef="secondJoin" />

<parallelGateway id="firstJoin" name="First Join" />

<sequenceFlow sourceRef="firstJoin" targetRef="task4" />

<userTask id="task4" name="Task 4">

<extensionElements>

<activiti:taskListener event="complete"

class="org.shirdrn.workflow.activiti.gateway.Task4Listener" />

</extensionElements>

</userTask>

<sequenceFlow sourceRef="task4" targetRef="secondJoin" />

<parallelGateway id="secondJoin" />

<sequenceFlow sourceRef="secondJoin" targetRef="theEnd" />

<endEvent id="theEnd" />

</process>

</definitions>  

我們看一下上述定義中,如下配置片段:

<userTask id="task1" name="Task 1">

<extensionElements>

<activiti:taskListener event="complete"

class="org.shirdrn.workflow.activiti.gateway.Task1Listener" />

</extensionElements>

</userTask>

<activiti:taskListener>元素的event屬性,它一共包含三種事件:"create"、"assignment"、"complete",分别表示結點執行處理邏輯的時機為:在處理類執行個體化時、在結點處理邏輯被指派時、在結點處理邏輯執行完成時,可以根據自己的需要進行指定。

上述流程定義中,4個任務結點對應的處理類,代碼分别如下所示:

package org.shirdrn.workflow.activiti.gateway;

import java.util.logging.Logger;

import org.activiti.engine.delegate.DelegateTask;

import org.activiti.engine.impl.pvm.delegate.TaskListener;

public class Task1Listener implements TaskListener {

private final Logger log = Logger.getLogger(Task1Listener.class.getName());

@Override

public void notify(DelegateTask delegateTask) {

try {

Thread.sleep(10000);

} catch (InterruptedException e) {

e.printStackTrace();

}

log.info("I am task 1.");

}

}

package org.shirdrn.workflow.activiti.gateway;

import java.util.logging.Logger;

public class Task2Listener implements TaskListener {

private final Logger log = Logger.getLogger(Task2Listener.class.getName());

@Override

public void notify(DelegateTask delegateTask) {

try {

Thread.sleep(10000);

} catch (InterruptedException e) {

e.printStackTrace();

}

log.info("I am task 2.");

}

}

package org.shirdrn.workflow.activiti.gateway;

import java.util.logging.Logger;

public class Task3Listener implements TaskListener {

private final Logger log = Logger.getLogger(Task3Listener.class.getName());

@Override

public void notify(DelegateTask delegateTask) {

try {

Thread.sleep(5000);

} catch (InterruptedException e) {

e.printStackTrace();

}

log.info("I am task 3.");

}

}

package org.shirdrn.workflow.activiti.gateway;

import java.util.logging.Logger;

public class Task4Listener implements TaskListener {

private final Logger log = Logger.getLogger(Task4Listener.class.getName());

@Override

public void notify(DelegateTask delegateTask) {

try {

Thread.sleep(5000);

} catch (InterruptedException e) {

e.printStackTrace();

}

log.info("I am task 4.");

}

}

測試用例,代碼如下所示:

package org.shirdrn.workflow.activiti.gateway;

import java.util.Date;

import java.util.List;

import org.activiti.engine.runtime.ProcessInstance;

import org.activiti.engine.task.Task;

import org.activiti.engine.task.TaskQuery;

import org.activiti.engine.test.Deployment;

import org.shirdrn.workflow.activiti.AbstractTest;

/**

* @author shirdrn

*/

public class ParallelGatewayTest extends AbstractTest {

private String deploymentId;

private Date start = null;

private Date end = null;

@Override

protected void initialize() throws Exception {

deploymentId = repositoryService.createDeployment()

.addClasspathResource("diagrams/GatewayTest.testForkJoin.bpmn20.xml")

.deploy().getId();

}

@Override

protected void destroy() throws Exception {

repositoryService.deleteDeployment(deploymentId, true);

}

@Deployment

public void testUnbalancedForkJoin() {

ProcessInstance pi = runtimeService.startProcessInstanceByKey("ParalellBasedForkJoin");

TaskQuery query = taskService.createTaskQuery().processInstanceId(pi.getId()).orderByTaskName().asc();

List<Task> tasks = query.list();

assertEquals(3, tasks.size());

start = new Date();

for(Task task : tasks) {

taskService.complete(task.getId());

end = new Date();

System.out.println("" + (end.getTime()-start.getTime()) + "ms.");

}

tasks = query.list();

assertEquals(1, tasks.size());

for(Task task : tasks) {

taskService.complete(task.getId());

end = new Date();

System.out.println("" + (end.getTime()-start.getTime()) + "ms.");

}

end = new Date();

System.out.println("" + (end.getTime()-start.getTime()) + "ms.");

}

}

運作結果如下所示:

2011-3-23 12:50:09 org.shirdrn.workflow.activiti.gateway.Task1Listener notify

資訊: I am task 1.

10031ms.

2011-3-23 12:50:19 org.shirdrn.workflow.activiti.gateway.Task2Listener notify

資訊: I am task 2.

20078ms.

2011-3-23 12:50:24 org.shirdrn.workflow.activiti.gateway.Task3Listener notify

資訊: I am task 3.

25093ms.

2011-3-23 12:50:29 org.shirdrn.workflow.activiti.gateway.Task4Listener notify

資訊: I am task 4.

30172ms.

30172ms.

繼續閱讀