管道(執行流)模型由Pipeline(管道)/ Valve(閥門)/ Context(上下文) 組成
概念
我們把特定的業務,比如訂單業務中的臨時訂單、訂單送出以及訂單支付等,抽象成一組Pipeline(管道);
拿生成臨時訂單業務來說,執行流程包括:1參數校驗->2業務資料校驗->3業務處理,這裡的三段子流程是嚴格按照順序執行的,我們用Valve(閥門)定義它們,每一個子流程即一個Valve;
在管道模式中,我們要處理的對象是一組業務資料,即概念中的Context(上下文),Context貫穿于整個執行流程。
意義
管道模式是多步流程業務很好的抽象,内部基于單連結清單實作順序執行,具有強順序性;
管道模式對于整體流程的拆分,使得業務的擴充性大大增強,當業務需求發生變化,隻需要确定需要加入/删除的子流程位置即可,就像從單連結清單中增加/删除一個節點。
實作
子產品結構

PipeLineContext 實作管道上下文的概念,内部記錄一個所處閥門在管道中的索引;使用HashMap存儲業務資料,使用者流程進行時的資料傳遞。
public class PipeLineContext {
private PipeLineContext() {
}
@Getter
private int index;
@Getter
private Map context;
public PipeLineContext(int size) {
this.index = 0;
this.context = new HashMap<>(size);
}
public void put(String key, Object value) {
context.put(key, value);
}
public void get(String key) {
context.get(key);
}
@JSONField(serialize = false)
public int getAndIncrement() {
this.index++;
return index;
}
@Override
public String toString() {
return "{\"index\":\"" + index + "\", \"context\":\"" + JSON.toJSONString(context) + "\"}";
}
}
PipeLine 管道接口,包括添加閥門方法以及開啟管道方法
public interface PipeLine {
void addValve(Valve valve);
FlowResult start(PipeLineContext pipeLineContext);
}
Valve 閥門接口,閥門都需實作該接口或者該接口的擴充接口
public interface Valve {
Valve getNext();
void setNext(Valve valve);
FlowResult invoke(PipeLineContext pipeLineContext);
}
NormalPipeLine PipeLine接口通用實作
@Component
public class NormalPipeLine implements PipeLine {
private Valve head = null;
private Valve next = null;
@Override
public void addValve(Valve valve) {
if (head == null) {
head = valve;
valve.setNext(next);
} else {
Valve current = head;
while (current != null) {
if (current.getNext() == next) {
current.setNext(valve);
valve.setNext(next);
break;
}
current = current.getNext();
}
}
}
@Override
public FlowResult start(PipeLineContext pipeLineContext) {
if (pipeLineContext == null) {
return FlowResult.fail("pipeLineContext should be not null!");
}
if (head == null) {
return FlowResult.fail("there's no valve in current pipeLine!");
}
return head.invoke(pipeLineContext);
}
}
NormalValve Valve接口通用實作
public class NormalValve implements Valve {
protected Valve next = null;
@Override
public Valve getNext() {
return next;
}
@Override
public void setNext(Valve valve) {
this.next = valve;
}
@Override
public FlowResult invoke(PipeLineContext pipeLineContext) {
return processContinue(pipeLineContext);
}
protected FlowResult processContinue(PipeLineContext pipeLineContext) {
return next == null ? FlowResult.ok() : getNext().invoke(pipeLineContext);
}
}
Validator 訂單-臨時訂單前置參數校驗
@Slf4j
@Component
public class Validator extends NormalValve {
@Override
public FlowResult invoke(PipeLineContext pipeLineContext) {
pipeLineContext.put("param", "1");
return processContinue(pipeLineContext);
}
}
使用AOP織入閥門,跟蹤執行流
@Slf4j
@Aspect
@Component
public class PipeLineAspect {
@Pointcut(value = "this(com.nooice.order.common.pipeline.Valve) " +
"&& execution(* invoke(com.nooice.order.common.pipeline.model.PipeLineContext)) && args((pipeLineContext))",
argNames = "pipeLineContext")
public void valveInvokeCutOffPoint(PipeLineContext pipeLineContext) {
}
@Before(value = "valveInvokeCutOffPoint(pipeLineContext)", argNames = "point,pipeLineContext")
public void doBefore(JoinPoint point, PipeLineContext pipeLineContext) {
int currentIndex = pipeLineContext.getAndIncrement();
String className = point.getTarget().getClass().getName();
log.info("管道前置通知-{}号閥門({})進入執行, pipeLineContext={}", currentIndex, className, pipeLineContext.toString());
}
}
測試
@Slf4j
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = Application.class)
public class PipelineTest {
@Autowired
private NormalPipeLine normalPipeLine;
@Autowired
private Validator validator;
@Autowired
private OrderPreviewValidator orderPreviewValidator;
@Autowired
private Processor processor;
@Test
public void testUserController() {
// 定義上下文
PipeLineContext pipeLineContext = new PipeLineContext(0);
pipeLineContext.put("index", "0");
// 增加閥門
normalPipeLine.addValve(validator); // 參數校驗閥門
normalPipeLine.addValve(orderPreviewValidator); // 業務校驗閥門
normalPipeLine.addValve(processor); // 業務處理閥門
// 管道執行
FlowResult flowResult = normalPipeLine.start(pipeLineContext);
log.info(JSON.toJSONString(flowResult));
}
}
管道前置通知-1号閥門(com.nooice.order.common.pipeline.validator.Validator)進入執行, pipeLineContext=管道前置通知-2号閥門(com.nooice.order.common.pipeline.validator.OrderPreviewValidator)進入執行, pipeLineContext={"index":"2", "context":"{"index":"0","param":"1"}"}
管道前置通知-3号閥門(com.nooice.order.common.pipeline.processor.Processor)進入執行, pipeLineContext={"index":"3", "context":"{"index":"0","param":"2"}"}
PipelineTest: {"code":1,"message":"成功"}