TIP
流程编排可以让系统提供的服务能力做到可视化。业务逻辑一目了然,支持普通流程图和泳道图
普通流程图:
泳道图:
# 2.1 节点多支路
场景假设:
在上传商品图片时,一般会经过风控系统,对所传图片进行审查,以此防止给用户展示了一些违规图片,所带来的不良社会反映。这里暂且忽略性能问题,将风控审查动作做在了商品获取链路中
BPMN图示如下:
🟢 加载商品基本信息后,如果有图片则进行图片审查,否则结束流程
新增“图片筛查”服务节点:
@Slf4j
@TaskComponent(name = "risk-control")
public class RiskControlService {
@TaskService(name = "check-img")
public void checkImg(CheckInfo checkInfo) {
AssertUtil.notNull(checkInfo);
AssertUtil.notBlank(checkInfo.getImg());
log.info("check img: " + checkInfo.getImg());
}
}
2
3
4
5
6
7
8
9
10
11
12
🟢 节点间的箭头线可以定义执行条件,格式如上图,res.img != null
代表 StoryBus 中的 res 不为空,且 res 的 img 字段不为null
🟢 Kstry 引擎中条件表达式解析器有三个,boolean解析器、角色鉴权解析器、Spel表达式解析器
🔷 如果是直接输入boolean值,比如 true、y、no等会被认定为 boolean 值,使用 boolean 解析器解析判断
🔷 如果符合权限定义的格式,使用角色鉴权解析器解析判断,后面讲到角色权限时会再详细介绍
🔷 前两者都不符合时则使用Spel表达式解析器,解析引擎是 Spring 的 Spel 解析器,表达式格式解析失败时会报错。解析结果一定得是Boolean值。比如上面 res.img != null
如果 res为 null 时,会抛异常结束
🟢 事件(Event)、网关(Gateway)、任务(Task)节点都可以从当前节点引出多个支路(也叫允许有多个出度),但是只有并行网关、包含网关、结束事件可以接收多个入度,其他节点有多个入度时会出现配置文件解析失败的报错
🟢 一个链路图中有且仅有一个开始事件和一个结束事件(子事件中同样有这个限制,外围事件和子流程中的事件是可以共同存在的)
🟢 任务(Task)、事件(Event)节点后面的出度如果没有定义表达式时,默认为true。不同类型的网关(Gateway)节点特点不同,后面介绍
🟢 条件表达式支持o{数字}: 表达式
(比如:o1: sta.name != null
),来定义多支路时的执行顺序,数字越小优先级越高。未指定顺序时表达式默认为最低优先级
# 2.2 并行网关
场景假设:
加载商品基础信息之后,假设需要再加载SKU信息、店铺信息。两个加载过程没有前后依赖关系,所以可以并行进行。加载完所有信息之后再对商详信息进行后置处理
BPMN图示如下:
新增“加载SKU信息”、“加载店铺信息”、“商详后置处理”三个服务节点:
// 初始化 sku信息,GoodsService.java
@TaskService(name = "init-sku")
public InitSkuResponse initSku(@ReqTaskParam("id") Long goodsId) {
SkuInfo sku1 = new SkuInfo();
sku1.set...
SkuInfo sku2 = new SkuInfo();
sku2.set...
return InitSkuResponse.builder().skuInfos(Lists.newArrayList(sku1, sku2)).build();
}
// 商详信息后置处理,GoodsService.java
@TaskService(name = "detail-post-process")
public void detailPostProcess(DetailPostProcessRequest request) {
GoodsDetail goodsDetail = request.getGoodsDetail();
ShopInfo shopInfo = request.getShopInfo();
if (shopInfo != null) {
goodsDetail.setShopInfo(shopInfo);
}
}
// 加载店铺信息,ShopService.java
@TaskService(name = "get-shopInfo-goodsId")
public ShopInfo getShopInfoByGoodsId(@ReqTaskParam("id") Long goodsId) throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(200L);
return goodsIdShopInfoMapping.get(goodsId);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
getShopInfoByGoodsId
中线程 sleep 了 200ms 模拟耗时较长的任务
🟢 并行网关要求所有入度全部执行完才能向下继续,否则将一直等待
🟢 使用并行网关时,一般会有前后两个并行网关节点一起出现,前面将一个分支拆解成多个,后面将多个分支进行聚合
🟢 并行网关支持开启异步流程。未开启异步流程时,并行网关拆分出的多个分支还是一个线程逐一执行,开启异步流程后,每个分支都将逐一创建异步任务并提交到线程池中执行
🟢 并行网关后面的出度如果有表达式,表达式会被忽略,无论设置与否都不会解析,都会默认为true
将风控组件加到流程之后,得到流程图如下:
🟢 此时如果再次执行这个 Story 会报错,报错信息: [K1040008] A process branch that cannot reach the ParallelGateway appears! sequenceFlowId: Flow_0attv25
🟢 报错信息提示链路中存在不能到达并行网关的分支。原因是商品图片只有出现和不出现两种情况,所以“初始化基本信息”服务节点后面的两条链路只能执行一条,而并行网关要求的是所有入度分支都完成时才能继续执行。解决这个问题有两种方式:
🔷 将前一个并行网关改为包含网关,包含网关不要求所有入度分支都必须被执行
🔷 如下图,关闭并行网关的严格模式:strict-mode=false
。关闭严格模式的并行网关,不再限制网关入度必须都被执行。关闭严格模式的并行网关与包含网关也并非是完全等价的。因为并行网关后面出度的条件表达式是被忽略的,但是包含网关后面出度的条件表达式是会被解析执行起到决策作用的
# 2.3 排他网关
场景假设:
为了推广公司app,产品承诺会对app端下单用户免费赠送运费险,其他平台没有此优惠
BPMN图示如下:
新增“送运费险”服务节点:
@Slf4j
@TaskComponent(name = "logistic")
public class LogisticService {
@NoticeSta
@TaskService(name = "get-logistic-insurance")
public LogisticInsurance getLogisticInsurance(GetLogisticInsuranceRequest request) {
log.info("request source:{}", request.getSource());
LogisticInsurance logisticInsurance = new LogisticInsurance();
logisticInsurance.setDesc("运费险描述");
logisticInsurance.setType(1);
return logisticInsurance;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
🟢 图示中为什么要多加一个包含网关呢?是因为之前有提到过:只有并行网关、包含网关、结束事件可以接收归并多个入度,其他节点只能接收一个入度,“送运费险”节点不能直接到“加载店铺信息”节点,因为后者已经有了一个入度,所以增加一个包含网关将多个分支进行合拢
🟢 排他网关入度只能有一个,出度可以多个。出度上面的条件表达式会被解析执行,如果没有条件表达式时会默认是true
🟢 排他网关有多个出度上面表达式被解析成true时,会选择第一个为true的分支继续向下执行,其他的将会被忽略不再执行。图示上出度的前后并不代表程序解析时出度的先后顺序,所以排他网关后面如果多个出度都为true时运行结果是不确定的,应尽量避免这种事情发生
🟢 当全部出度上的表达式都解析为false时会抛出异常并结束流程,异常信息:[K1040008] Match to the next process node as empty! taskId: Gateway_15malyv
🟢 由于排他网关最终执行的只有一条链路,所以排他网关是不支持开启异步的,因为没啥意义
# 2.4 包含网关
场景假设:
假设商品描述中有一些统计信息,比如收藏数、评价数、下单数等,不同的数据统计在不同的系统模块中维护,在商品加载时这些统计参数也需要被加载。但也并非是所有商品都需要加载全部的统计参数,比如未开启评价的商品就不需要获取评价数
BPMN图示如下:
加载商品基础信息时,加上可以评价的属性:
@NoticeResult
@TaskService(name = "init-base-info")
public GoodsDetail initBaseInfo(@ReqTaskParam(reqSelf = true) GoodsDetailRequest request) {
// needEvaluate(true)
return GoodsDetail.builder().id(request.getId()).name("商品").img("https://xxx.png").needEvaluate(true).build();
}
2
3
4
5
6
新增订单信息获取、评价信息获取、商品扩展信息获取服务节点:
@TaskComponent(name = "order")
public class OrderService {
@NoticeSta
@TaskService(name = "get-order-info")
public OrderInfo getOrderInfo(@ReqTaskParam("id") Long goodsId) {
OrderInfo orderInfo = new OrderInfo();
orderInfo.setOrderedCount(10);
log.info("goods id: {}, get OrderInfo: {}", goodsId, JSON.toJSONString(orderInfo));
return orderInfo;
}
}
@TaskComponent(name = "evaluation")
public class EvaluationService {
@NoticeSta
@TaskService(name = "get-evaluation-info")
public EvaluationInfo getEvaluationInfo(@ReqTaskParam("id") Long goodsId) {
EvaluationInfo evaluationInfo = new EvaluationInfo();
evaluationInfo.setEvaluateCount(20);
log.info("goods id: {}, get EvaluationInfo: {}", goodsId, JSON.toJSONString(evaluationInfo));
return evaluationInfo;
}
}
@NoticeSta
@TaskService(name = "get-goods-ext-info")
public GoodsExtInfo getGoodsExtInfo(@ReqTaskParam("id") Long goodsId) {
GoodsExtInfo goodsExtInfo = new GoodsExtInfo();
goodsExtInfo.setCollectCount(30);
log.info("goods id: {}, get GoodsExtInfo: {}", goodsId, JSON.toJSONString(goodsExtInfo));
return goodsExtInfo;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
最终商品信息后置处理时将统计信息汇总:
@TaskService(name = GoodsCompKey.detailPostProcess)
public void detailPostProcess(DetailPostProcessRequest request) {
GoodsDetail goodsDetail = request.getGoodsDetail();
...
GoodsExtInfo goodsExtInfo = request.getGoodsExtInfo();
OrderInfo orderInfo = request.getOrderInfo();
EvaluationInfo evaluationInfo = request.getEvaluationInfo();
goodsDetail.setStatistics(Lists.newArrayList(goodsExtInfo.getCollectCount(), orderInfo.getOrderedCount(), evaluationInfo.getEvaluateCount()));
}
2
3
4
5
6
7
8
9
🟢 包含网关与并行网关一样,支持开启异步流程,支持接收多个入度
🟢 包含网关没有所有入度必须被执行的限制,等待全部入度执行完成或者得知其中可能有部分入度不满足条件不再执行后,会继续向下执行
🟢 包含网关后面出度可以设置条件表达式,表达式解析规则与排他网关出度解析规则相同
# 2.5 子流程
# 2.5.1 子流程使用
场景假设:
上面可以看到,数据统计不仅仅在商详展示时会用到,商品列表可能会用到,订单展示也可能会用到,所以统计逻辑是一个可以复用的模块,可以将其抽离,以便之后进行单独维护升级。这样也使原来比较复杂的流程图得到了简化,理解起来会更加方便
BPMN图示如下:
BPMN配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL">
<bpmn:process id="Process_0zcsieh" isExecutable="true">
<!-- 定义子流程 -->
<bpmn:subProcess id="Activity_0melq36" name="商品数据统计">
<bpmn:startEvent id="Event_1h3q9xl">
<bpmn:outgoing>Flow_0hyd05a</bpmn:outgoing>
</bpmn:startEvent>
...
<bpmn:endEvent id="Event_10avvlg">
<bpmn:incoming>Flow_1ylaoip</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_1ylaoip" sourceRef="Gateway_0ay4c64" targetRef="Event_10avvlg" />
</bpmn:subProcess>
<!-- 引用子流程 -->
<bpmn:callActivity id="Activity_1w3fhiy" name="商品数据统计" calledElement="Activity_0melq36">
<bpmn:incoming>Flow_0hbi1mg</bpmn:incoming>
<bpmn:outgoing>Flow_0lmq6ak</bpmn:outgoing>
</bpmn:callActivity>
</bpmn:process>
</bpmn:definitions>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
🟢 定义bpmn:process
将需要分离的子流程包含在内
🟢 子流程有独立于父流程之外的开始事件、结束事件
🟢 子流程是支持嵌套的,A子流程可以依赖B子流程,但是自身依赖自身是非法的。并且子流程中也支持开启异步模式
🟢 定义bpmn:callActivity
引用子流程,程序运行到此时会跳转至子流程执行,子流程执行完成后会跳转回来继续执行
# 2.5.2 子流程拦截器
场景假设:
业务动作需要被复用、因为父流程太过复杂一些业务动作可以被抽离单独维护、第三方提供的一些服务能力等,都需要定义子流程来进行业务间隔离。被划分出来的子流程可以被看作一个整体被单独维护。框架提供了定义子流程拦截器的能力,可以在子流程执行前、执行后、出现异常(链路中出现异常或者超时)时自定义操作。除此之外,还可以定义无论是否出现异常都一定会执行的最终通知
@Component
public class StatisticsInterceptor implements SubProcessInterceptor {
@Override
public Set<String> pointcut() {
return Sets.newHashSet("Event_1h3q9xl", "Event_08xh60r");
}
@Override
public Set<SubProcessIdentity> getSubProcessIdentity() {
return SubProcessInterceptor.super.getSubProcessIdentity();
}
@Override
public boolean beforeProcessor(ScopeDataOperator dataOperator, Role role) {
// 开始统计
log.info("开始统计...");
return true;
}
@Override
public void afterProcessor(ScopeDataOperator dataOperator, Role role) {
// 统计结束
log.info("统计结束...");
}
@Override
public void errorProcessor(Throwable exception, ScopeDataOperator dataOperator, Role role) {
// 统计异常
log.info("统计异常...");
}
@Override
public void finallyProcessor(ScopeDataOperator dataOperator, Role role) {
// 兜底逻辑
log.info("兜底逻辑...");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
🟢 子流程拦截器需要实现SubProcessInterceptor
接口,并且需要将自定义拦截器交给Spring容器来管理
🟢 pointcut()
方法需要被实现,返回子流程中开始事件Id的Set集合,以此告知框架哪些子流程需要被拦截器处理
🟢 不同流程共同引用一个子流程时,拦截器如果要区分不同的主流程 pointcut()
方法就无法支持了。此时需要实现getSubProcessIdentity()
方法,它返回一个SubProcessIdentity
对象的Set集合。框架用SubProcessIdentity
对象集合来判断当前拦截器是否生效。SubProcessIdentity
有两种构造参数:
🔷 传入子流程的startEventId
,运行过程中拦截器定义的startEventId
与子流程的startEventId
相同时当前拦截器生效
public SubProcessIdentity(String startEventId) {
this(startEventId, null);
}
2
3
🔷 传入子流程的startEventId
和storyId
(最外层流程中的startEventId
),运行过程中拦截器中定义的startEventId
和storyId
都匹配成功时当前拦截器生效
public SubProcessIdentity(String startEventId, String storyId) {
super(TaskServiceUtil.joinName(startEventId, storyId), IdentityTypeEnum.SUB_PROCESS);
}
2
3
🟢 beforeProcessor
是子流程的前置通知方法,拦截器匹配成功后,进入子流程之前被执行。用来实现一些子流程执行之前的业务动作。它返回一个boolean变量。true代表进入子流程,fasle代表跳过子流程继续执行下一节点
🟢 afterProcessor
是子流程的后置通知,拦截器匹配成功,子流程执行完成之后会执行其中定义的业务动作
🟢 errorProcessor
是子流程的异常通知,拦截器匹配成功后,子流程执行中出现异常或超时时会执行其中定义的业务动作
🟢 finallyProcessor
是子流程的最终通知,拦截器匹配成功后,无论子流程是否执行成功,只要进入了子流程最终一定会执行其中定义的业务动作
🟢 getOrder()
可以控制多个拦截器生效的前后顺序。与Spring中的Order相同,返回的变量值越小优先级越高。没有复写该方法时默认返回1000
# 2.5.3 子流程控制
TIP
并非所有子流程都是业务的核心逻辑,也并非所有子流程执行时间都与父流程相同。所以框架提供了给子流程设置严格模式和超时时间的入口
🟢 timeout
以ms为单位。当timeout
存在且大于等于0时,即可控制允许子流程执行的最大超时时间。达到最大超时时间之后,如果子流程还未结束就会抛出超时异常
🟢 strict-mode
未被显示设置时默认为true,子流程出现异常或执行超时时会结束整个流程。strict-mode
被设置成false后,如果子流程出现异常或执行超时,异常将会被忽略,当前子流程和所属当前子流程的子流程将会被中断。之后继续执行子流程后面的服务节点
# 2.6 节点控制
# 2.6.1 允许服务为空
场景假设:
假设商品渲染时需要加载一些外部的商业广告,这时我们就要升级流程图增加获取广告信息的部分。但是这个流程图又在被多个系统解析执行,一部分系统有加载广告的模块,但有的系统不具备这个能力。这时如果不具备加载广告服务节点的系统解析该流程图时就会报错,原因是代码中找不到与配置文件中节点定义相匹配的服务节点
BPMN图示如下:
🟢 这时候会收到错误信息:[K1040004] No available TaskService matched! service task id: Activity_0ctfijm, name: 加载广告
🟢 可通过给节点增加:allow-absent=true
,来解决这个问题,该属性代表:允许配置节点找不到对应的服务节点,找不到时不会报错,会跳过继续执行
# 2.6.2 异常降级
场景假设:
在加载商品时,有着一个步骤是加载运费险信息。假设当下所依赖的物流系统极不稳定,获取运费险信息时经常出错。但是运费险信息又并非商品渲染流程的核心逻辑。所以在获取运费险失败时不应该中断整个流程,此时需要对问题节点降级处理,整体流程要继续向下执行
代码中模拟异常:
@NoticeSta
@TaskService(name = "get-logistic-insurance")
public LogisticInsurance getLogisticInsurance(GetLogisticInsuranceRequest request) {
// 模拟异常
int i = 1/0;
log.info("request source:{}", request.getSource());
LogisticInsurance logisticInsurance = new LogisticInsurance();
logisticInsurance.setDesc("运费险描述");
logisticInsurance.setType(1);
return logisticInsurance;
}
2
3
4
5
6
7
8
9
10
11
得到错误信息:
...
Caused by: java.lang.ArithmeticException: / by zero
at cn.kstry.demo.service.LogisticService.getLogisticInsurance(LogisticService.java:40)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
... 5 common frames omitted
2
3
4
5
6
节点降级处理:
可通过给节点增加:strict-mode=false
属性,来关闭服务节点的严格模式(并行网关也有一个严格模式,要分清两者作用的不同),跳过异常,让流程得以继续向下执行
# 2.6.3 节点调用控制
场景假设:
在调用服务节点获取收藏数时,等待时间随机从0ms~1000ms不等,此时可以设置方法的一个超时时间,当超时后可以进行重试,重试几次都失败时调用降级方法兜底
@NoticeSta
@TaskService(name = "get-goods-ext-info", invoke = @Invoke(retry = 2, demotion = "pr:goods@get-goods-ext-info-demotion", timeout = 500, strictMode = false))
public GoodsExtInfo getGoodsExtInfo(@ReqTaskParam("id") Long goodsId) throws InterruptedException {
GoodsExtInfo goodsExtInfo = new GoodsExtInfo();
goodsExtInfo.setCollectCount(30);
// 模拟超时
int i = Math.abs(new Random().nextInt());
TimeUnit.MILLISECONDS.sleep(i % 1000);
log.info("goods id: {}, get GoodsExtInfo: {}", goodsId, JSON.toJSONString(goodsExtInfo));
return goodsExtInfo;
}
@TaskService(name = "get-goods-ext-info-demotion")
public GoodsExtInfo getGoodsExtInfoDemotion(@ReqTaskParam("id") Long goodsId) {
GoodsExtInfo goodsExtInfo = new GoodsExtInfo();
goodsExtInfo.setCollectCount(0);
log.info("获取收藏数失败,执行降级方法。。。");
return goodsExtInfo;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
🟢 @TaskService
注解中的invole
属性是@Invokle
注解类型,可以用来控制方法的调用策略,具体配置如下:
🔷 retry
:服务节点执行失败时重试次数。默认为 0 代表不重试。举例:为 1 代表执行失败时,将进行一次重试
🔷 timeout
:执行服务节点时允许的最大等待时间,以ms为单位。默认为 -1 代表不设置超时时间,一直等待
🔷 strictMode
:严格模式,控制服务节点执行失败后是否要抛出异常,默认是严格模式,节点抛出异常后结束整个 Story 流程。关闭严格模式后,节点抛出异常时忽略该节点异常继续向下执行
🔷 demotion
:指定服务节点执行失败或超时后的降级处理表达式。异常发生后会根据表达式从容器中找到对应的服务节点执行,执行降级任务的服务节点必须与原服务节点有相同的方法入参和出参。demotion配置格式有问题,或者降级使用的目标服务节点未找到时降级策略将会失效
🔷 retryIncludeExp
:抛出异常包含指定异常时,进行重试。默认情况下列表为空,任何异常都将重试
🔷 retryExcludeExp
:抛出异常不包含指定异常时,进行重试。默认情况下列表为空,任何异常都将重试
降级表达式举例:
🟢 pr:risk-control@check-img
服务组件名是 risk-control
且服务名是 init-base-info
的服务节点
🟢 pr:risk-control@check-img@triple
服务组件名是 risk-control
且服务名是 init-base-info
且能力点名是 triple
的服务能力节点
@Invoke
中的属性具体生效顺序是:
1️⃣ 如果配置了大于-1的超时时间,目标方法调用会存在超时时间的限制,给定时间内未成功返回会抛出超时异常
2️⃣ 流程在出现超时异常或者执行服务节点异常时,会再判断是否设置了重试次数,如果重试次数大于0,会进行相应次数的重试操作
3️⃣ 未指定重试次数或者指定次数的重试都失败后,如果指定了降级方法,则会在此时调用降级方法
4️⃣ 以上调用全部都失败后,会判断当前服务节点是否为严格模式,如果是非严格模式,异常将会被忽略,流程会继续向下执行
# 2.7 流程遍历
场景假设:
在日常研发工作中,不难遇到一些需要对数据集合进行遍历处理的工作。比如集合中的每项数据依次使用给定的计算规则进行遍历计算。再比如电商场景中订单列表、商品列表、SKU列表等,在返回给前端界面渲染展示前都需要对列表的每一项进行相关资源的加载计算和赋值。在以上场景中,都有一个相同的特点,就是虽然计算规则复杂且场景多样,但是作用到集合中每项数据的规则都是一致且统一的
Kstry提供能力:对选定集合中的每一项数据在服务节点、子流程两个维度进行指定规则的加工计算
# 2.7.1 BPMN配置属性
ite-source
: 指定数据集合资源的位置,格式为正常的取值表达式,如:req.numList
,sta.skuList
🟢 只有指定正确格式的ite-source
属性,才会开启遍历动作
🟢 ite-source
指定的资源一定是实现java.lang.Iterable
接口或者数组格式,否则不会开启遍历动作
🟢 如果指定的集合资源未找到,则对应的服务节点或者子任务将直接跳过不再执行
ite-strategy
: 指定遍历策略,有三个取值
🟢 all
:不指定时的默认策略,集合中全部元素在遍历中都需要获取到正确的结果,只要有一个不满足会抛出异常,是否结束流程受strict-mode
影响。
🟢 best
:集合中全部元素进行遍历,尽量多的拿到正确结果,如果有数据项执行期间抛出异常,流程会忽略异常不会结束
🟢 any
:集合中只要有一项数据执行成功,就会结束遍历过程
ite-async
: 遍历动作是否并发进行
ite-stride
: 遍历步长。用来指定每次遍历时从集合或者数组中拿取多少个元素。默认为1代表逐个遍历。最后一次遍历集合中数据项数量小于步长时会全部返回
注意:
ite-stride
大于1时,在服务节点中 dataOperator.iterDataItem()
获取变量实际会得到一个包装有List实例的Optional对象
# 2.7.2 遍历服务节点
对集合中的一组数据求平方
定义服务节点方法
@TaskComponent(name = "calculate-service")
public class CalculateService {
/**
* 求平方再放回
*/
@TaskService(name = "square")
public void square(ScopeDataOperator dataOperator) throws InterruptedException {
// 遍历执行迭代器的每一项数据时,获取当前项数据,详细可看下面 ScopeDataOperator 介绍
Optional<Integer> iterDataItem = dataOperator.iterDataItem();
iterDataItem.ifPresent(i ->
dataOperator.computeIfAbsent("sta.squareResult", Lists::newCopyOnWriteArrayList).ifPresent(list -> {
list.add(i * i);
dataOperator.setResult(list);
})
);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
定义BPMN流程
执行流程
@Test
public void test() {
DataSource dataSource = new DataSource();
dataSource.setNumList(Lists.newArrayList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
List<Integer> list = Lists.newCopyOnWriteArrayList();
StoryRequest<List<Integer>> fireRequest = ReqBuilder.returnType(list)
.timeout(3000).request(dataSource).startId("story-def-iterate-test_001").build();
TaskResponse<List<Integer>> fire = storyEngine.fire(fireRequest);
System.out.println(JSON.toJSONString(fire));
Assert.assertTrue(fire.isSuccess());
for (int i = 0; i < integers.size(); i++) {
Assert.assertEquals(integers.get(i) * integers.get(i), (int) fire.getResult().get(i));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 2.7.3 遍历子流程节点
假设给一组SKU加载名称、图片等资源信息
定义服务节点方法
@TaskComponent(name = "sku-service")
public class SkuService {
@NoticeScope(target = "sku-list", scope = ScopeTypeEnum.STABLE)
@TaskService(name = "get-sku-list")
public List<SkuBo> getSkuList() {
List<SkuBo> collect = LongStream.range(1, 11).mapToObj(i -> {
SkuBo skuBo = new SkuBo();
skuBo.setId(i);
return skuBo;
}).collect(Collectors.toList());
System.out.println(Thread.currentThread().getName() + " - init sku ->" + JSON.toJSONString(collect));
return collect;
}
@NoticeResult
@TaskService(name = "set-sku-name")
public List<SkuBo> setSkuName(@StaTaskParam("sku-list") List<SkuBo> skuList, ScopeDataOperator dataOperator) {
Optional<SkuBo> o = dataOperator.iterDataItem();
o.ifPresent(bo -> {
if (bo.getId() == 4) {
throw new RuntimeException("加载名称失败了,但是无伤大雅,因为设置了best策略!");
}
bo.setName("SKU名称" + bo.getId());
System.out.println(Thread.currentThread().getName() + " - set sku name ->" + JSON.toJSONString(bo));
});
return skuList;
}
@TaskService(name = "set-sku-img")
public void setSkuImg(ScopeDataOperator dataOperator) {
Optional<SkuBo> o = dataOperator.iterDataItem();
o.ifPresent(bo -> {
bo.setImg("SKU图片" + bo.getId());
System.out.println(Thread.currentThread().getName() + " - set sku img ->" + JSON.toJSONString(bo));
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
定义BPMN流程
执行流程
@Test
public void test() {
List<SkuBo> list = Lists.newArrayList();
StoryRequest<List<SkuBo>> fireRequest = ReqBuilder.returnType(list).startId("story-def-iterate-test_100").build();
TaskResponse<List<SkuBo>> fire = storyEngine.fire(fireRequest);
System.out.println(JSON.toJSONString(fire));
Assert.assertTrue(fire.isSuccess());
}
2
3
4
5
6
7
8
查看日志
kstry-task-thread-pool-0 - init sku ->[{"id":1},{"id":2},{"id":3},{"id":4},{"id":5},{"id":6},{"id":7},{"id":8},{"id":9},{"id":10}]
kstry-task-thread-pool-1 - set sku name ->{"id":1,"name":"SKU名称1"}
kstry-task-thread-pool-1 - set sku name ->{"id":2,"name":"SKU名称2"}
kstry-task-thread-pool-1 - set sku name ->{"id":3,"name":"SKU名称3"}
2022-08-28 02:01:45.042 [kstry-task-thread-pool-1] [bc4a9c9e-44d3-4976-89bd-d99e43780feb] WARN cn.kstry.framework.core.engine.BasicTaskCore.doInvokeMethod:181 - [K1050006] Iteration item execution failure! taskName:设置SKU名字
java.lang.RuntimeException: 加载名称失败了,但是无伤大雅,因为设置了best策略!
at cn.kstry.framework.test.iterator.service.SkuService.lambda$setSkuName$1(SkuService.java:57)
...
kstry-task-thread-pool-1 - set sku name ->{"id":5,"name":"SKU名称5"}
kstry-task-thread-pool-1 - set sku name ->{"id":6,"name":"SKU名称6"}
kstry-task-thread-pool-1 - set sku name ->{"id":7,"name":"SKU名称7"}
kstry-task-thread-pool-1 - set sku name ->{"id":8,"name":"SKU名称8"}
kstry-task-thread-pool-1 - set sku name ->{"id":9,"name":"SKU名称9"}
kstry-task-thread-pool-1 - set sku name ->{"id":10,"name":"SKU名称10"}
kstry-iterator-thread-pool-0 - set sku img ->{"id":1,"img":"SKU图片1","name":"SKU名称1"}
kstry-iterator-thread-pool-1 - set sku img ->{"id":2,"img":"SKU图片2","name":"SKU名称2"}
kstry-iterator-thread-pool-2 - set sku img ->{"id":3,"img":"SKU图片3","name":"SKU名称3"}
kstry-iterator-thread-pool-1 - set sku img ->{"id":9,"img":"SKU图片9","name":"SKU名称9"}
kstry-iterator-thread-pool-0 - set sku img ->{"id":10,"img":"SKU图片10","name":"SKU名称10"}
kstry-iterator-thread-pool-3 - set sku img ->{"id":4,"img":"SKU图片4"}
kstry-iterator-thread-pool-4 - set sku img ->{"id":5,"img":"SKU图片5","name":"SKU名称5"}
kstry-iterator-thread-pool-7 - set sku img ->{"id":8,"img":"SKU图片8","name":"SKU名称8"}
kstry-iterator-thread-pool-5 - set sku img ->{"id":6,"img":"SKU图片6","name":"SKU名称6"}
kstry-iterator-thread-pool-6 - set sku img ->{"id":7,"img":"SKU图片7","name":"SKU名称7"}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
🟢 由于加载名称节点设置了best策略,即使执行失败流程也没结束,可以看到只是id=4时,没有成功设置名称
🟢 加载图片的线程id是不一样的,是因为开启了异步,更多的线程参与了任务的执行,并发带来的另一个副作用就是无序
# 2.8 服务节点拦截器
TIP
框架提供了服务节点拦截器,用来对流程中需要执行的服务节点进行拦截和控制
拦截器定义
@Slf4j
@Component
public class ServiceNodeInterceptor implements TaskInterceptor {
/**
* 接口有默认行为,可以不实现该方法
*/
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
/**
* 接口有默认行为,可以不实现该方法
*/
@Override
public boolean match(IterData iterData) {
ScopeDataOperator dataOperator = iterData.getDataOperator();
return true;
}
@Override
public Object invoke(Iter iter) {
ServiceNodeResource serviceNode = iter.getServiceNode();
ScopeDataOperator dataOperator = iter.getDataOperator();
Role role = iter.getRole();
Object data = iter.next();
log.info("ComponentName: {}, ServiceName: {}, AbilityName: {}, data: {}", serviceNode.getComponentName(), serviceNode.getServiceName(), serviceNode.getAbilityName(), data);
return data;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
🟢 创建实现了TaskInterceptor
接口的类实例并放入Spring容器中,服务节点拦截器便会生效
🟢 TaskInterceptor
接口提供以下三个方法:
🔷 getOrder()
:用来指定拦截器列表顺序,接口有默认实现可以不重写,默认返回0
🔷 match(IterData iterData)
:用来判断当前拦截器是否生效。默认情况下,服务节点拦截器对全部服务节点生效,该方法可以过滤掉无需生效的拦截器。接口有默认实现可以不重写,默认返回:true
🔷 invoke(Iter iter)
:拦截器实际生效的方法。提供Iter
入参,可以凭此对象执行下一个拦截器或目标节点,获取Role
、ScopeDataOperator
、ServiceNodeResource
🟢 服务节点拦截器在节点策略动作的最外层。也就是说出现重试、降级、迭代等动作时服务节点会被执行多次,但是拦截器只会执行一次。如果需要在服务节点方法被调用时进行拦截,可以使用Spring AOP,因为类组件本身就是Spring中的实例
# 2.9 自定义指令
TIP
一些场景中需要对数据进行操作比如排序、过滤、计算等。这些操作服务节点可以完成,但它一般是以满足业务动作为目标,用来定义完成这种洗数据的工作太过于繁重,然而数据加工又不得不做。框架提供了自定义指令的功能,可以用来定义通用的工具,在流程中直接使用
自定义指令是在服务节点的基础上创建的,它实际上就是一个服务节点。只是此时的服务节点提供了一套更通用更便捷的使用方式,被定义用来解决一些与业务无关的通用性问题。这种更通用更便捷的使用方式就是自定义指令。
# 2.9.1 定义指令
用来对StoryBus中的集合进行排序
/**
* 简单定义,用来演示自定义指令,不能直接用在生产环境,有很多异常情况未考虑
*/
@TaskComponent(name = "InstructService")
public class InstructService {
@TaskInstruct(name = "sort")
@TaskService(name = "instruct-service-sort")
public void sort(InstructContent instructContent, ScopeDataOperator operator) {
SortContent sortContent = JSON.parseObject(instructContent.getContent(), SortContent.class);
Optional<Object> dataOptional = operator.getData(sortContent.getSource());
if (!dataOptional.isPresent() || !(dataOptional.get() instanceof List)) {
return;
}
List<?> list = (List<?>) dataOptional.get();
if (sortContent.asc) {
list.sort(Comparator.comparing(o -> {
try {
return (Comparable) PropertyUtils.getProperty(o, sortContent.getSortFieldName());
} catch (Exception e) {
return null;
}
}));
} else {
list.sort(Comparator.comparing(o -> {
try {
return (Comparable) PropertyUtils.getProperty(o, sortContent.getSortFieldName());
} catch (Exception e) {
return null;
}
}).reversed());
}
operator.setData(sortContent.getTarget(), list);
}
@Data
public static class SortContent {
/**
* 要排序数据的位置
*/
private String source;
/**
* 根据对象的哪个字段来排序
*/
private String sortFieldName;
/**
* 排序后的集合放在什么地方
*/
private String target;
/**
* 是否是升序
*/
private boolean asc;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
🟢 如果不考虑@TaskInstruct(name = "sort")
,仅仅是定义了一个名字为instruct-service-sort
的服务节点,加上@TaskInstruct
就是自定义指令
🟢 @TaskInstruct
有一个name
属性,用来定义指令的名称。在BPMN配置文件中使用指令时名称前面需要加c-
,比如上面的例子就是c-sort
🟢 InstructContent
参数用来获取指令名称和内容。无需增加任何注解,直接使用即可
# 2.9.2 使用指令
场景假设:
获取到完整的商品信息后,对标签进行排序
指令名称是c-sort
,内容是:
{
"source": "sta.shopInfo.labels", // 要排序数据的位置
"sortFieldName": "index", // 根据对象的哪个字段来排序
"target": "var.labels", // 排序后的集合放在什么地方
"asc": false // 是否是升序
}
2
3
4
5
6
🟢 上面例子中简单定义服务节点的指令名称和值,其他的并未定义
🟢 服务节点支持同时配置指令和服务节点名称,并且同一个服务节点支持同时配置多个不同指令
🟢 c-
开头的指令如果和服务节点名称同时存在,默认会先执行名称对应的服务节点再执行指令。如果执行顺序要反过来需要在c-
前面加^
,例如:^c-sort
🟢 包含网关、排他网关也支持同时配置一个或多个指令,定义的指令会在网关之前执行,使用场景是网关前面定义指令对数据进行加工,网关后面再跟条件表达式进行路由决策
注意:
代码方式定义流程时是不支持增加^
前缀来改变指令和节点方法执行顺序的
# 2.10 脚本指令
TIP
Kstry服务节点支持JS脚本指令,实现方式就是自定义指令。如果当下的脚本引擎或者逻辑不满足现状的话,可以使用自定义指令进行定制化脚本引擎的开发
# 2.10.1 实现脚本指令
@TaskInstruct(name = "jscript")
@TaskService(name = "js-script-instruct")
public void instruct(InstructContent instructContent, ScopeDataOperator scopeDataOperator) {
if (StringUtils.isBlank(instructContent.getContent())) {
return;
}
JsScriptProperty property = null;
if (scopeDataOperator.getTaskProperty().isPresent()) {
try {
property = JSON.parseObject(scopeDataOperator.getTaskProperty().get(), JsScriptProperty.class);
} catch (Exception e) {
LOGGER.warn("[{}] js script property parsing exception. instruct: '{}{}', property: {}", ExceptionEnum.SCRIPT_PROPERTY_PARSER_ERROR.getExceptionCode(),
BpmnElementProperties.SERVICE_TASK_TASK_INSTRUCT, instructContent.getInstruct(), scopeDataOperator.getTaskProperty().orElse(StringUtils.EMPTY), e);
}
}
String invokeMethodName = "invoke";
String script = instructContent.getContent();
try {
if (property != null && StringUtils.isNotBlank(property.getInvokeMethod())) {
invokeMethodName = property.getInvokeMethod();
} else {
script = GlobalUtil.format(DEFAULT_FUNCTION, invokeMethodName, script);
}
LOGGER.debug("invoke js script. instruct: '{}{}', script: {}, property: {}",
BpmnElementProperties.SERVICE_TASK_TASK_INSTRUCT, instructContent.getInstruct(), script, scopeDataOperator.getTaskProperty().orElse(StringUtils.EMPTY));
ScriptEngine jsEngine = new ScriptEngineManager().getEngineByName("javascript");
Bindings bind = jsEngine.createBindings();
bind.put("ksta", scopeDataOperator.getStaScope());
bind.put("kvar", scopeDataOperator.getVarScope());
bind.put("kreq", scopeDataOperator.getReqScope());
bind.put("kres", scopeDataOperator.getResult().orElse(null));
jsEngine.setBindings(bind, ScriptContext.ENGINE_SCOPE);
jsEngine.eval(script);
Object result = ((Invocable) jsEngine).invokeFunction(invokeMethodName);
if (result != null && property != null && StringUtils.isNotBlank(property.getReturnType())) {
result = JSON.parseObject(JSON.toJSONString(result), Class.forName(property.getReturnType()));
}
if (property != null && CollectionUtils.isNotEmpty(property.getReturnTarget())) {
for (String target : property.getReturnTarget()) {
boolean setRes = scopeDataOperator.setData(target, result);
LOGGER.debug("invoke js script set result. instruct: '{}{}', target: {}, set result: {}, result: {}",
BpmnElementProperties.SERVICE_TASK_TASK_INSTRUCT, instructContent.getInstruct(), target, setRes ? "success" : "fail", result);
}
}
LOGGER.debug("invoke js script success. instruct: '{}{}', result: {}", BpmnElementProperties.SERVICE_TASK_TASK_INSTRUCT, instructContent.getInstruct(), result);
} catch (Throwable e) {
throw ExceptionUtil.buildException(e, ExceptionEnum.SCRIPT_EXECUTE_ERROR, GlobalUtil.format("js script execution exception! instruct: '{}{}', property: {}, script: \n{}",
BpmnElementProperties.SERVICE_TASK_TASK_INSTRUCT, instructContent.getInstruct(), scopeDataOperator.getTaskProperty().orElse(StringUtils.EMPTY), script));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
以上为使用自定义指令实现支持的服务节点脚本功能的源码,可直接使用默认脚本能力,也可作为参考实现自己的脚本引擎
# 2.10.2 默认脚本功能
前面提到过包含网关、排他网关也支持配置自定义指令,脚本指令也是指令,所以也是支持的。下图是在排他网关上定义的一个脚本指令:
🟢 脚本指令需要指定脚本名称c-jscript
,此名称是固定的,指令内容不能为空否则脚本不生效
🟢 如果指令和服务节点名称同时存在,并且想要在节点方法之前执行,可以在指令名称前加^
,这也是自定义指令的功能
🟢 在服务节点或者网关上可以使用task-property
来指定脚本属性
{
'invoke-method': 'invoke',
'return-type': 'java.lang.Integer',
'return-target': ["res", "sta.r", "var.r"]
}
2
3
4
5
🔷 invoke-method
:指定脚本方法名称。默认无需设置,此时直接写脚本逻辑即可,如果该字段不为空,脚本内容需定义成方法格式: function inv(){ ksta.v = kreq.a + kvar.num + ksta.num; return ksta.v * 2;}
🔷 return-type
:脚本执行完之后结果的返回类型
🔷 return-target
:脚本执行完之后最终的结果被放到StoryBus的什么位置,支持设置多个
注意:
高版本JDK不支持JS引擎获取,new ScriptEngineManager().getEngineByName("javascript");
会得到空值,此时需要在项目中加入依赖:
<dependency>
<groupId>org.openjdk.nashorn</groupId>
<artifactId>nashorn-core</artifactId>
<version>15.3</version>
</dependency>
2
3
4
5
# 2.11 代码方式定义流程
TIP
BPMN文件虽然有非常优秀的可视化特点,但是也有一些不足之处,比如ide中点击定义在配置文件中的服务节点无法快速定位到具体方法、无法识别自定义协议的流程文件等。Kstry提供代码定义流程的方式来平衡这些问题。
框架提供ProcessLink
作为代码方式定义流程最核心的类,它提供了众多辅助流程节点彼此间拼接的方法:
🟢 nextTask
:指定下一个服务节点,类组件和服务节点名称都需要提供
🟢 nextService
:指定下一个服务节点,仅需要提供服务节点名称即可。在确保服务节点名称全局唯一时使用,可以节省代码量
🟢 nextInstruct
:指定下一个指令。指令是一种特殊的服务节点。
🟢 nextSubProcess
:指定下一个子流程
🟢 nextExclusive
:指定下一个排他网关
🟢 nextInclusive
:指定下一个包含网关
🟢 nextParallel
:指定下一个并行网关
🟢 joinTask
:将多个任务分支进行合并
🟢 end
:指定下一个结束事件
# 2.11.1 主流程定义
代码方式定义此流程图如下:
@Bean
public ProcessLink buildShowGoodsLink() {
StartProcessLink bpmnLink = StartProcessLink.build(SHOW_GOODS_LINK, "展示商品详情");
// 构建一个游离的并行网关,网关标识开启异步,并且是非严格模式的并行网关(因为网关左侧的分支并不能全部到达并行网关,如果不设置非严格模式就会报错,也可用包含网关来替代将不会有这个问题)
ParallelJoinPoint pPoint1 = bpmnLink.parallel().openAsync().notStrictMode().build();
// 初始化基本信息
ProcessLink initTask = bpmnLink.nextTask().component(CSF.C.GOODS).service(CSF.GOODS.S.INIT_BASE_INFO).name("初始化商品信息").customRole("goods-custom-role@goods-detail").build();
// 指向并行网关
initTask.nextParallel("res.img == null", pPoint1);
ProcessLink rCheck = initTask
// res.img != null 时校验图片
.nextTask("res.img != null", CSF.C.RISK_CONTROL, CSF.RISK_CONTROL.S.CHECK_IMG).name("校验图片").build()
// 指向一个判断角色的排他网关
.nextExclusive().build();
// 排他网关指向并行网关
rCheck.nextParallel(KeyUtil.nr(CSF.RISK_CONTROL.S.CHECK_IMG, CSF.F_RISK_CONTROL.F.triple), pPoint1);
rCheck
// 三方服务统计
.nextTask(KeyUtil.r(CSF.RISK_CONTROL.S.CHECK_IMG, CSF.F_RISK_CONTROL.F.triple), CSF.C.RISK_CONTROL, CSF.RISK_CONTROL.S.STATISTICS).name("风控统计").build()
// 之后指向并行网关
.nextParallel(pPoint1);
// 指向排他网关
ProcessLink sourceCheck = pPoint1.nextExclusive().build();
// 构建一个游离的包含网关
InclusiveJoinPoint iPoint1 = bpmnLink.inclusive().build();
// 排他网关指向包含网关
sourceCheck.nextInclusive(KeyUtil.req("source", "!=", "'app'"), iPoint1);
// 排他网关指向送运费险节点
sourceCheck
// 送运费险有可能报错,设置非严格模式,报错会忽略,会继续向下执行
.nextTask(ServiceTask.builder().component(CSF.C.LOGISTIC).service(CSF.LOGISTIC.S.GET_LOGISTIC_INSURANCE).name("运费险").notStrictMode().ins())
.nextInclusive(iPoint1);
bpmnLink.parallel().build()
.joinLinks(
// 并行网关指向广告服务节点,广告服务节点没有对应的TaskService默认会报错,但是加上 allowAbsent() 之后就不会报错了
pPoint1.nextTask("advertising", "get-advertising").name("获取广告").allowAbsent().build(),
// 并行节点指向子流程
pPoint1.nextSubProcess(STATISTICS_SUB_PROCESS).build(),
// 并行网关指向 加载SKU信息 节点
pPoint1.nextService(CSF.GOODS.S.INIT_SKU).name("加载SKU信息").build(),
iPoint1.nextTask(CSF.C.SHOP, CSF.SHOP.S.GET_SHOP_INFO_BY_GOODS_ID).name("获取店铺信息").build()
)
.nextTask(CSF.C.GOODS, CSF.GOODS.S.DETAIL_POST_PROCESS).name("商品信息后置处理").build()
.end();
return bpmnLink;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
🟢 举例说明代码方式也可以定义比较复杂的流程。但是可读性很差
🟢 简单的流程可以用代码来定义,复杂的流程还是推荐使用 bpmn 文件定义,可视化展现
🟢 还有一种方式就是 bpmn 文件可视化定义一些复杂的子流程,在代码中进行引用,这个是完全可行的,配置文件和代码中子流程的相互引用是打通了的
🟢 定义ProcessLink
对象,放入Spring容器中,流程在启动时就会被加载生效,这种生效方式是不支持动态变更的
# 2.11.2 子流程定义
@Bean
public SubProcessLink buildStatisticsSubProcess() {
return SubProcessLink.build(STATISTICS_SUB_PROCESS, link -> {
// 构建一个游离的包含网关,网关标识开启异步
InclusiveJoinPoint inclusive1 = link.inclusive().openAsync().build();
// 开始节点指向包含网关
link.nextInclusive(inclusive1);
// 再次构建一个游离的包含网关
InclusiveJoinPoint inclusive2 = link.inclusive().build();
// 包含网关1分别指向三个 TaskService, 三个 TaskService 再分别指向包含网关2,包含网关2指向结束节点
inclusive2.joinLinks(
inclusive1.nextTask("res.needEvaluate == true", CSF.C.EVALUATION, CSF.EVALUATION.S.GET_EVALUATION_INFO).name("获取评价信息").build(),
inclusive1.nextTask(CSF.C.GOODS, CSF.GOODS.S.GET_GOODS_EXT_INFO).name("获取额外信息").build(),
inclusive1.nextTask(CSF.C.ORDER, CSF.ORDER.S.GET_ORDER_INFO).name("获取订单信息").build()
).end();
});
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
🟢 子流程与主流程有相似之处,定义SubProcessLink
对象放入Spring容器中,子流程在启动时就会被加载生效,同样不支持动态变更
# 2.11.3 解析文件定义流程
代码定义流程看上去比BPMN可视化流程图复杂不少,为什么还要支持呢?
正如前面所说的,少量服务节点组成的流程,代码定义起来会更加方便。而更重要的一个原因则是,用代码方式定义流程可以支持任意协议的流程配置文件,可以按需解析开源或公司内部的前端流程组件,并配合Kstry完成最终的流程编排。
而框架本身,在1.1.x之后的版本里PBMN流程配置文件也是用这套API来解析的,具体可以参考Kstry源码中的实现: CamundaBpmnModelTransfer (opens new window)。在后面的动态流程配置里,这套API也是必不可少的部分
# 2.12 动态流程配置
TIP
定义路径扫描bpmn文件和创建ProcessLink
实例放入Spring容器两种流程的定义方式是静态的,如果要修改只能重启应用。框架在1.1.x版本之后提供了动态获取流程配置的能力,并且动态化配置支持通过http或rpc调用、关系或者非关系型数据库查询、订阅消息队列、订阅注册中心、公司自定义存储介质查询等任意方式获取
# 2.12.1 使用动态流程
@Component
public class DynamicProcessRegister implements DynamicProcess {
@Override
public long version(String key) {
return 0;
}
@Override
public Optional<ProcessLink> getProcessLink(String startId) {
InputStream inputStream = DynamicProcessRegister.class.getClassLoader().getResourceAsStream("dynamic/goods.bpmn");
BpmnProcessParser parser = new BpmnProcessParser("动态获取流程", inputStream);
return parser.getProcessLink(startId);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
🟢 创建实现DynamicProcess
接口的类实例,放入Spring容器中,动态流程组件即会生效
🟢 DynamicProcess
提供两个方法:
🔷 version(String key)
:传入startId返回当前流程的版本号,版本号没有变化并且流程缓存没有失效时,框架不会调用getProcessLink(String startId)
来获取新的流程,反之则会重新获取新的流程。流程缓存的时长是1天。修改流程后控制该方法进行版本号升级,新的流程会即刻生效。
🔷 getProcessLink(String startId)
:根据startId获取流程配置。缓存未失效且版本未改变时该方法不会被调用。
🟢 上面代码演示的是从类路径下加载流程配置文件,通过流程解析器解析生成ProcessLink
对象并返回的过程。可以根据需要从任意存储介质中获取流程配置信息
🟢 如果不需要缓存,每次都要获取最新的流程,将version(String key)
的返回值设置成小于0的值即可
# 2.12.2 使用动态子流程
TIP
框架同样提供了动态获取子流程的能力
@Component
public class DynamicSubProcessRegister implements DynamicSubProcess {
@Override
public List<SubProcessLink> getSubProcessLinks() {
InputStream inputStream = DynamicSubProcessRegister.class.getClassLoader().getResourceAsStream("dynamic/goods.bpmn");
BpmnProcessParser parser = new BpmnProcessParser("动态获取流程", inputStream);
return Lists.newArrayList(parser.getAllSubProcessLink().values());
}
}
2
3
4
5
6
7
8
9
10
🟢 创建实现DynamicSubProcess
接口的类实例,放入Spring容器中,动态子流程组件即会生效
🟢 动态子流程没有版本的概念,只要是获取就是最新的。获取子流程动作是在创建主流程时被触发的,可以理解成主流程和子流程共用一个版本号
# 2.12.3 动态与静态流程关系
只有bpmnPath
一种流程配置途径时,下面这种可视化流程图就是被定义出来的流程,也是凭此来指导程序实际线路的执行。ProcessLink
出现之后,框架提供了自定义流程的能力给使用者,在1.x之后的版本中,BPMN可视化流程图也是被解析成ProcessLink
对象来生效的,此时的BPMN可视化流程就作为了流程定义的一个子集而存在。更多情况下使用者可以通过编码、解析各类配置文件等方式来创建ProcessLink
实例,最终交付给框架的ProcessLink
实例将会作为流程定义来指导程序的执行。
Kstry提供了三种方式来定义流程配置,分别是:
🟢 【基础流程配置】指定@EnableKstry
注解的bpmnPath
属性来扫描指定目录下的bpmn配置文件
🟢 【基础流程配置】创建ProcessLink
实例放入Spring容器中
🟢 【动态流程配置】实现DynamicProcess
接口的getProcessLink
方法
前面两种方式定义的流程是应用启动前就必须要定义出来的属于基础流程配置,基础流程除非应用重启否则是不允许修改的。根据startId
在基础流程配置库里获取不到流程配置时最后一种方式才会生效,其获取流程配置的方式是动态的,传入startId
返回ProcessLink
实例,其中所需的流程配置信息可以通过http或rpc调用、关系或者非关系型数据库查询、订阅消息队列、订阅注册中心、公司自定义存储介质查询等任意方式获取。

之所以没有使用动态改变基础流程配置库的方式来实现流程的动态配置能力,有以下原因:
🟢 【安全性】流程配置可能会分重要级别,核心的流程配置是被严格管控不允许随意变更的,所以这类配置应该放在代码中不允许随意变更。如果在流程配置基础库中找到与startId
相匹配的流程,动态获取部分将失效,以此来保障流程的绝对安全。即便核心流程中有部分需要动态获取的扩展逻辑,也可以用动态子流程的形式来实现。
🟢 【兼容性】动态化配置可能比较适合使用注册中心来存储维护,这样当发生变更时能快速及时通知相关应用做流程变更。但并没有一个注册中心是所有公司都使用的,除了开源产品还有公司自研,如果想要一一支持是不现实的。选用其他中间件产品也是同样的道理。所以解决这个问题的方式就是提供一个动态获取流程配置的入口,至于流程配置在什么地方获取将是因人而异的。
在流程中出现可复用或者复杂的链路片段时,就可以将这些片段抽离出来定义子流程。父流程可以通过子流程Id来引用子流程。同样也有三种方式来定义子流程,分别是:
🟢 【基础子流程配置】指定@EnableKstry
注解的bpmnPath
属性来扫描指定目录下的bpmn配置文件
🟢 【基础子流程配置】创建SubProcessLink
实例放入Spring容器中
🟢 【动态子流程配置】实现DynamicSubProcess
接口的getSubProcessLinks
方法
主流程有基础和动态之分,相对应也有基础和动态子流程的存在。下图为两者结合起来时,相互之间是否允许被引用的说明:

🟢 基础流程仅能引用基础子流程配置,动态流程可以引用基础子流程和动态子流程配置
🟢 两种子流程在各自的区域内可以相互引用,动态子流程中可以引用基础子流程配置,反之则不被允许
警告
子流程相互间引用时,一定不能出现环状依赖,否则会出现异常❗️❗️❗️