本文重點為管道模式的抽象與應用,上述示例僅為個人理解。實際應用中,此案例長於應對各種規則冗雜的業務場景,便於規則編排。 ...
本文記錄Pipeline設計模式在業務流程編排中的應用
前言
Pipeline模式意為管道模式,又稱為流水線模式。旨在通過預先設定好的一系列階段來處理輸入的數據,每個階段的輸出即是下一階段的輸入。
本案例通過定義PipelineProduct(管道產品),PipelineJob(管道任務),PipelineNode(管道節點),完成一整條流水線的組裝,並將“原材料”加工為“商品”。其中管道產品負責承載各個階段的產品信息;管道任務負責不同階段對產品的加工;管道節點約束了管道產品及任務的關係,通過信號量定義了任務的執行方式。
依賴
工具依賴如下
<!-- 工具類大全 -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>最新版本</version>
</dependency>
編程示例
1. 管道產品定義
package com.example.demo.pipeline.model;
/**
* 管道產品介面
*
* @param <S> 信號量
* @author
* @date 2023/05/15 11:49
*/
public interface PipelineProduct<S> {
}
2. 管道任務定義
package com.example.demo.pipeline.model;
/**
* 管道任務介面
*
* @param <P> 管道產品
* @author
* @date 2023/05/15 11:52
*/
@FunctionalInterface
public interface PipelineJob<P> {
/**
* 執行任務
*
* @param product 管道產品
* @return {@link P}
*/
P execute(P product);
}
3. 管道節點定義
package com.jd.baoxian.mall.market.service.pipeline.model;
import java.util.function.Predicate;
/**
* 管道節點定義
*
* @param <S> 信號量
* @param <P> 管道產品
* @author
* @date 2023/05/15 11:54
*/
public interface PipelineNode<S, P extends PipelineProduct<S>> {
/**
* 節點組裝,按照上個管道任務傳遞的信號,執行 pipelineJob
*
* @param pipelineJob 管道任務
* @return {@link PipelineNode}<{@link S}, {@link P}>
*/
PipelineNode<S, P> flax(PipelineJob<P> pipelineJob);
/**
* 節點組裝,按照傳遞的信號,判斷當前管道的信號是否相等,執行 pipelineJob
*
* @param signal 信號
* @param pipelineJob 管道任務
* @return {@link PipelineNode}<{@link S}, {@link P}>
*/
PipelineNode<S, P> flax(S signal, PipelineJob<P> pipelineJob);
/**
* 節點組裝,按照傳遞的信號,判斷當前管道的信號是否相等,執行 pipelineJob
*
* @param predicate 信號
* @param pipelineJob 管道任務
* @return {@link PipelineNode}<{@link S}, {@link P}>
*/
PipelineNode<S, P> flax(Predicate<S> predicate, PipelineJob<P> pipelineJob);
/**
* 管道節點-任務執行
*
* @param product 管道產品
* @return {@link P}
*/
P execute(P product);
}
4. 管道產品、任務,節點的實現
4.1 管道產品
package com.example.demo.pipeline.factory;
import com.example.demo.model.request.DemoReq;
import com.example.demo.model.response.DemoResp;
import com.example.demo.pipeline.model.PipelineProduct;
import lombok.*;
/**
* 樣例-管道產品
*
* @author
* @date 2023/05/15 14:04
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DemoPipelineProduct implements PipelineProduct<DemoPipelineProduct.DemoSignalEnum> {
/**
* 信號量
*/
private DemoSignalEnum signal;
/**
* 產品-入參及回參
*/
private DemoProductData productData;
/**
* 異常信息
*/
private Exception exception;
/**
* 流程Id
*/
private String tradeId;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public static class DemoProductData {
/**
* 待驗證入參
*/
private DemoReq userRequestData;
/**
* 待驗證回參
*/
private DemoResp userResponseData;
}
/**
* 產品-信號量
*
* @author
* @date 2023/05/15 13:54
*/
@Getter
public enum DemoSignalEnum {
/**
*
*/
NORMAL(0, "正常"),
/**
*
*/
CHECK_NOT_PASS(1, "校驗不通過"),
/**
*
*/
BUSINESS_ERROR(2, "業務異常"),
/**
*
*/
LOCK_ERROR(3, "鎖處理異常"),
/**
*
*/
DB_ERROR(4, "事務處理異常"),
;
/**
* 枚舉碼值
*/
private final int code;
/**
* 枚舉描述
*/
private final String desc;
/**
* 構造器
*
* @param code
* @param desc
*/
DemoSignalEnum(int code, String desc) {
this.code = code;
this.desc = desc;
}
}
}
4.2 管道任務(抽象類)
package com.example.demo.pipeline.factory.job;
import cn.hutool.core.util.ClassUtil;
import cn.hutool.json.JSONUtil;
import com.example.demo.pipeline.factory.DemoPipelineProduct;
import com.example.demo.pipeline.model.PipelineJob;
import lombok.extern.slf4j.Slf4j;
/**
* 管道任務-抽象層
*
* @author
* @date 2023/05/15 19:48
*/
@Slf4j
public abstract class AbstractDemoJob implements PipelineJob<DemoPipelineProduct> {
/**
* 公共執行邏輯
*
* @param product 產品
* @return
*/
@Override
public DemoPipelineProduct execute(DemoPipelineProduct product) {
DemoPipelineProduct.DemoSignalEnum newSignal;
try {
newSignal = execute(product.getTradeId(), product.getProductData());
} catch (Exception e) {
product.setException(e);
newSignal = DemoPipelineProduct.DemoSignalEnum.BUSINESS_ERROR;
}
product.setSignal(newSignal);
defaultLogPrint(product.getTradeId(), product);
return product;
}
/**
* 子類執行邏輯
*
* @param tradeId 流程Id
* @param productData 請求數據
* @return
* @throws Exception 異常
*/
abstract DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) throws Exception;
/**
* 預設的日誌列印
*/
public void defaultLogPrint(String tradeId, DemoPipelineProduct product) {
if (!DemoPipelineProduct.DemoSignalEnum.NORMAL.equals(product.getSignal())) {
log.info("流水線任務處理異常:流程Id=【{}】,信號量=【{}】,任務=【{}】,參數=【{}】", tradeId, product.getSignal(),
ClassUtil.getClassName(this, true), JSONUtil.toJsonStr(product.getProductData()), product.getException());
}
}
}
4.3 管道節點
package com.example.demo.pipeline.factory;
import cn.hutool.core.util.ClassUtil;
import cn.hutool.json.JSONUtil;
import com.example.demo.pipeline.model.PipelineJob;
import com.example.demo.pipeline.model.PipelineNode;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Predicate;
/**
* 審核-管道節點
*
* @author
* @date 2023/05/15 14:32
*/
@Slf4j
public class DemoPipelineNode implements PipelineNode<DemoPipelineProduct.DemoSignalEnum, DemoPipelineProduct> {
/**
* 下一管道節點
*/
private DemoPipelineNode next;
/**
* 當前管道任務
*/
private PipelineJob<DemoPipelineProduct> job;
/**
* 節點組裝,按照上個管道任務傳遞的信號,執行 pipelineJob
*
* @param pipelineJob 管道任務
* @return {@link DemoPipelineNode}
*/
@Override
public DemoPipelineNode flax(PipelineJob<DemoPipelineProduct> pipelineJob) {
return flax(DemoPipelineProduct.DemoSignalEnum.NORMAL, pipelineJob);
}
/**
* 節點組裝,按照傳遞的信號,判斷當前管道的信號是否相等,執行 pipelineJob
*
* @param signal 信號
* @param pipelineJob 管道任務
* @return {@link DemoPipelineNode}
*/
@Override
public DemoPipelineNode flax(DemoPipelineProduct.DemoSignalEnum signal, PipelineJob<DemoPipelineProduct> pipelineJob) {
return flax(signal::equals, pipelineJob);
}
/**
* 節點組裝,上個管道過來的信號運行 predicate 後是true的話,執行 pipelineJob
*
* @param predicate
* @param pipelineJob
* @return
*/
@Override
public DemoPipelineNode flax(Predicate<DemoPipelineProduct.DemoSignalEnum> predicate,
PipelineJob<DemoPipelineProduct> pipelineJob) {
this.next = new DemoPipelineNode();
this.job = (job) -> {
if (predicate.test(job.getSignal())) {
return pipelineJob.execute(job);
} else {
return job;
}
};
return next;
}
/**
* 管道節點-任務執行
*
* @param product 管道產品
* @return
*/
@Override
public DemoPipelineProduct execute(DemoPipelineProduct product) {
// 執行當前任務
try {
product = job == null ? product : job.execute(product);
return next == null ? product : next.execute(product);
} catch (Exception e) {
log.error("流水線處理異常:流程Id=【{}】,任務=【{}】,參數=【{}】", product.getTradeId(), ClassUtil.getClassName(job, true), JSONUtil.toJsonStr(product.getProductData()), product.getException());
return null;
}
}
}
5. 業務實現
通過之前的定義,我們已經可以通過Pipeline完成流水線的搭建,接下來以“審核信息提交”這一業務場景,完成應用。
5.1 定義Api、入參、回參
package com.example.demo.api;
import com.example.demo.model.request.DemoReq;
import com.example.demo.model.response.DemoResp;
import com.example.demo.pipeline.factory.PipelineForManagerSubmit;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* 演示-API
*
* @author
* @date 2023/08/06 16:27
*/
@Service
public class DemoManagerApi {
/**
* 管道-審核提交
*/
@Resource
private PipelineForManagerSubmit pipelineForManagerSubmit;
/**
* 審核提交
*
* @param requestData 請求數據
* @return {@link DemoResp}
*/
public DemoResp managerSubmit(DemoReq requestData) {
return pipelineForManagerSubmit.managerSubmitCheck(requestData);
}
}
package com.example.demo.model.request;
/**
* 演示入參
*
* @author
* @date 2023/08/06 16:33
*/
public class DemoReq {
}
package com.example.demo.model.response;
import lombok.Data;
/**
* 演示回參
*
* @author
* @date 2023/08/06 16:33
*/
@Data
public class DemoResp {
/**
* 成功標識
*/
private Boolean success = false;
/**
* 結果信息
*/
private String resultMsg;
/**
* 構造方法
*
* @param message 消息
* @return {@link DemoResp}
*/
public static DemoResp buildRes(String message) {
DemoResp response = new DemoResp();
response.setResultMsg(message);
return response;
}
}
5.2 定義具體任務
假定審核提交的流程需要包含:參數驗證、加鎖、解鎖、事務提交
package com.example.demo.pipeline.factory.job;
import cn.hutool.json.JSONUtil;
import com.example.demo.model.request.DemoReq;
import com.example.demo.pipeline.factory.DemoPipelineProduct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* 加鎖-實現層
*
* @author
* @date 2023/05/17 17:00
*/
@Service
@Slf4j
public class CheckRequestLockJob extends AbstractDemoJob {
/**
* 子類執行邏輯
*
* @param tradeId 流程Id
* @param productData 請求數據
* @return
* @throws Exception 異常
*/
@Override
DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) throws Exception {
DemoReq userRequestData = productData.getUserRequestData();
log.info("任務[{}]加鎖,線程號:{}", JSONUtil.toJsonStr(userRequestData), tradeId);
return DemoPipelineProduct.DemoSignalEnum.NORMAL;
}
}
package com.example.demo.pipeline.factory.job;
import cn.hutool.json.JSONUtil;
import com.example.demo.model.request.DemoReq;
import com.example.demo.pipeline.factory.DemoPipelineProduct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* 解鎖-實現層
*
* @author
* @date 2023/05/17 17:00
*/
@Service
@Slf4j
public class CheckRequestUnLockJob extends AbstractDemoJob {
/**
* 子類執行邏輯
*
* @param tradeId 流程Id
* @param productData 請求數據
* @return
* @throws Exception 異常
*/
@Override
DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) throws Exception {
DemoReq userRequestData = productData.getUserRequestData();
log.info("任務[{}]解鎖,線程號:{}", JSONUtil.toJsonStr(userRequestData), tradeId);
return DemoPipelineProduct.DemoSignalEnum.NORMAL;
}
}
package com.example.demo.pipeline.factory.job;
import cn.hutool.json.JSONUtil;
import com.example.demo.model.request.DemoReq;
import com.example.demo.pipeline.factory.DemoPipelineProduct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 審核-參數驗證-實現類
*
* @author
* @date 2023/05/15 19:50
*/
@Slf4j
@Component
public class ManagerCheckParamJob extends AbstractDemoJob {
/**
* 執行基本入參驗證
*
* @param tradeId
* @param productData 請求數據
* @return
*/
@Override
DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) {
/*
* 入參驗證
*/
DemoReq userRequestData = productData.getUserRequestData();
log.info("任務[{}]入參驗證,線程號:{}", JSONUtil.toJsonStr(userRequestData), tradeId);
// 非空驗證
// 有效驗證
// 校驗通過,退出
return DemoPipelineProduct.DemoSignalEnum.NORMAL;
}
}
package com.example.demo.pipeline.factory.job;
import cn.hutool.json.JSONUtil;
import com.example.demo.model.request.DemoReq;
import com.example.demo.model.response.DemoResp;
import com.example.demo.pipeline.factory.DemoPipelineProduct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* 審核-信息提交-業務實現
*
* @author
* @date 2023/05/12 14:36
*/
@Service
@Slf4j
public class ManagerSubmitJob extends AbstractDemoJob {
/**
* 子類執行邏輯
*
* @param tradeId 流程Id
* @param productData 請求數據
* @return
* @throws Exception 異常
*/
@Override
DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) throws Exception {
DemoReq userRequestData = productData.getUserRequestData();
try {
/*
* DB操作
*/
log.info("任務[{}]信息提交,線程號:{}", JSONUtil.toJsonStr(userRequestData), tradeId);
productData.setUserResponseData(DemoResp.buildRes("成功"));
} catch (Exception ex) {
log.error("審核-信息提交-DB操作失敗,入參:{}", JSONUtil.toJsonStr(userRequestData), ex);
throw ex;
}
return DemoPipelineProduct.DemoSignalEnum.NORMAL;
}
}
5.3 完成流水線組裝
針對入回參轉換,管道任務執行順序及執行信號量的構建
package com.example.demo.pipeline.factory;
import com.example.demo.model.request.DemoReq;
import com.example.demo.model.response.DemoResp;
import com.example.demo.pipeline.factory.job.CheckRequestLockJob;
import com.example.demo.pipeline.factory.job.CheckRequestUnLockJob;
import com.example.demo.pipeline.factory.job.ManagerCheckParamJob;
import com.example.demo.pipeline.factory.job.ManagerSubmitJob;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Objects;
import java.util.UUID;
/**
* 管道工廠入口-審核流水線
*
* @author
* @date 2023/05/15 19:52
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class PipelineForManagerSubmit {
/**
* 審核-管道節點
*/
private final DemoPipelineNode managerSubmitNode = new DemoPipelineNode();
/**
* 審核-管道任務-提交-防刷鎖-加鎖
*/
private final CheckRequestLockJob checkRequestLockJob;
/**
* 審核-管道任務-提交-防刷鎖-解鎖
*/
private final CheckRequestUnLockJob checkRequestUnLockJob;
/**
* 審核-管道任務-參數驗證
*/
private final ManagerCheckParamJob managerCheckParamJob;
/**
* 審核-管道任務-事務操作
*/
private final ManagerSubmitJob managerSubmitJob;
/**
* 組裝審核的處理鏈
*/
@PostConstruct
private void assembly() {
assemblyManagerSubmit();
}
/**
* 組裝處理鏈
*/
private void assemblyManagerSubmit() {
managerSubmitNode
// 參數驗證及填充
.flax(managerCheckParamJob)
// 防刷鎖
.flax(checkRequestLockJob)
// 事務操作
.flax(managerSubmitJob)
// 鎖釋放
.flax((ignore) -> true, checkRequestUnLockJob);
}
/**
* 審核-提交處理
*
* @param requestData 入參
* @return
*/
public DemoResp managerSubmitCheck(DemoReq requestData) {
DemoPipelineProduct initialProduct = managerSubmitCheckInitial(requestData);
DemoPipelineProduct finalProduct = managerSubmitNode.execute(initialProduct);
if (Objects.isNull(finalProduct) || Objects.nonNull(finalProduct.getException())) {
return DemoResp.buildRes("未知異常");
}
return finalProduct.getProductData().getUserResponseData();
}
/**
* 審核-初始化申請的流水線數據
*
* @param requestData 入參
* @return 初始的流水線數據
*/
private DemoPipelineProduct managerSubmitCheckInitial(DemoReq requestData) {
// 初始化
return DemoPipelineProduct.builder()
.signal(DemoPipelineProduct.DemoSignalEnum.NORMAL)
.tradeId(UUID.randomUUID().toString())
.productData(DemoPipelineProduct.DemoProductData.builder().userRequestData(requestData).build())
.build();
}
}
總結
本文重點為管道模式的抽象與應用,上述示例僅為個人理解。實際應用中,此案例長於應對各種規則冗雜的業務場景,便於規則編排。
待改進點:
-
各個任務其實隱含了執行的先後順序,此項內容可進一步實現;
-
針對最後“流水線組裝”這一步,可通過配置描述的方式,進一步抽象,從而將變動控制在每個“管道任務”的描述上,針對規則項做到“可插拔”式處理。
作者:京東保險 侯亞東
來源:京東雲開發者社區 轉載請註明來源