场景假设

继续延用商品展示的例子,在一个商品展示的 Story 中系统需要依赖很多服务,有内部的,也有外部的。内部服务毕竟在自己系统里,可控性相对较强,但不能避免的,也会出现一些耗时较高的操作。外部系统就更不用说了,作为研发一定吃过外部服务延时高可用性差的苦头。一般情况下,我们会引入异步化来尝试优化这些难缠的超时问题,Kstry又是如何做的呢?

# 4.1 超时时间

TIP

Kstry 调用执行 Story 时,存在超时时间的限制,不会允许调用一直等待下去

默认超时时间:

系统默认超时时间是3s,请求开始执行后,如果3s内没有收到结果,就会报超时异常

cn.kstry.framework.core.exception.TaskAsyncException: [K1060002] Asynchronous node task timeout!
    at cn.kstry.framework.core.exception.KstryException.buildException(KstryException.java:88)
    at cn.kstry.framework.core.engine.AsyncTaskCell.get(AsyncTaskCell.java:107)
    at cn.kstry.framework.core.engine.StoryEngine.doFire(StoryEngine.java:184)
    at cn.kstry.framework.core.engine.StoryEngine.fire(StoryEngine.java:90)
    at cn.kstry.demo.web.GoodsController.showGoods(GoodsController.java:49)

Caused by: java.util.concurrent.TimeoutException: Async task timeout! maximum time limit: 3000ms, block task count: 1, block task: [Flow_0rl59u8]
    at cn.kstry.framework.core.engine.AsyncTaskCell.get(AsyncTaskCell.java:98)
    ... 52 common frames omitted
1
2
3
4
5
6
7
8
9
10

Kstry 提供了自定义超时时间的入口:

    🟢 配置全局超时时间:

# application.yml
kstry:
  story:
    timeout: 3000 # 全局超时时间为 3000ms
1
2
3
4

    🟢 显示指定超时时间:

StoryRequest<GoodsDetail> req = ReqBuilder.returnType(GoodsDetail.class).timeout(3000).startId(StartIdEnum.GOODS_SHOW.getId()).request(request).build();
TaskResponse<GoodsDetail> fire = storyEngine.fire(req);
if (fire.isSuccess()) {
  return fire.getResult();
}
1
2
3
4
5

    🟢 timeout(3000) 显示指定当前请求超时时间 3000ms。显示指定会覆盖全局配置的超时时间

# 4.2 开启异步

    首先使用 sleep 模拟几个耗时的服务节点:“送运费险” sleep 100ms,“加载店铺信息” sleep 200ms,“加载评价数” sleep 200ms。调用一次接口,查看追踪日志(后面链路追踪环节会详细讲到,先使用这个功能)中重点需要关注的一些点:

// [http-nio-8080-exec-3][5837f673-15cd-4aef-97cc-6516ff14cb75] INFO  c.k.f.c.m.MonitorTracking - [K1040009] startId: kstry-demo-goods-show, spend 531ms ...

[{
    "methodName": "initBaseInfo",
    "nodeName": "初始化\n基本信息",
    "nodeType": "SERVICE_TASK",
    "spendTime": 1,
    "targetName": "cn.kstry.demo.service.GoodsService",
    "threadId": "kstry-task-thread-pool-1",
}, {
    "methodName": "checkImg",
    "nodeName": "风控服务",
    "nodeType": "SERVICE_TASK",
    "spendTime": 0,
    "targetName": "cn.kstry.demo.service.RiskControlService",
    "threadId": "kstry-task-thread-pool-1",
}, {
    "methodName": "initSku",
    "nodeName": "加载SKU信息",
    "nodeType": "SERVICE_TASK",
    "spendTime": 0,
    "targetName": "cn.kstry.demo.service.GoodsService",
    "threadId": "kstry-task-thread-pool-1",
}, {
    "methodName": "getLogisticInsurance",
    "nodeName": "送运费险",
    "nodeType": "SERVICE_TASK",
    "spendTime": 106,
    "targetName": "cn.kstry.demo.service.LogisticService",
    "threadId": "kstry-task-thread-pool-1",
}, {
    "methodName": "getShopInfoByGoodsId",
    "nodeName": "加载店铺信息",
    "nodeType": "SERVICE_TASK",
    "spendTime": 205,
    "targetName": "cn.kstry.demo.service.ShopService",
    "threadId": "kstry-task-thread-pool-1",
}, {
    "methodName": "getGoodsExtInfo",
    "nodeName": "加载收藏数",
    "nodeType": "SERVICE_TASK",
    "spendTime": 0,
    "targetName": "cn.kstry.demo.service.GoodsService",
    "threadId": "kstry-task-thread-pool-1",
}, {
    "methodName": "getOrderInfo",
    "nodeName": "加载下单数",
    "nodeType": "SERVICE_TASK",
    "spendTime": 0,
    "targetName": "cn.kstry.demo.service.OrderService",
    "threadId": "kstry-task-thread-pool-1",
}, {
    "methodName": "getEvaluationInfo",
    "nodeName": "加载评价数",
    "nodeType": "SERVICE_TASK",
    "spendTime": 205,
    "targetName": "cn.kstry.demo.service.EvaluationService",
    "threadId": "kstry-task-thread-pool-1",
}, {
    "nodeName": "加载广告",
    "nodeType": "SERVICE_TASK",
    "threadId": "kstry-task-thread-pool-1",
}, {
    "methodName": "detailPostProcess",
    "nodeName": "商详后置处理",
    "nodeType": "SERVICE_TASK",
    "spendTime": 1,
    "targetName": "cn.kstry.demo.service.GoodsService",
    "threadId": "kstry-task-thread-pool-1",
}]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70

    🟢 服务节点依次执行,使用到的线程是同一线程:kstry-task-thread-pool-1

    🟢 调用总耗时531ms。“送运费险”:106ms,“加载店铺信息”:205ms,“加载评价数”:205ms,时间基本都花在了这三个节点上,并且是累加计算的

在子任务中,打开商品数据统计的异步化:

image-20211213145202846

在主流程中打开店铺、商品、物流服务调用的异步化:

image-20211213145245630

    🟢 如图所见,开启异步功能就是如此的简单,在网关上配置open-async=true即可

    🟢 目前支持配置异步开启的组件有:并行网关、包含网关两种,排他网关虽然是允许有多个出度,但最终只有一个出度被执行,所以开启异步的效果不大并未支持开启异步

    🟢 和普通的多线程使用方式一样,并发度太高也并非一定是好事。每开启一个新线程都要创建新的计算任务,加上线程间的上下文切换,在一些本来就耗时很短的服务节点间开启异步大多时候会得不偿失

开启异步化后的统计日志:

// [http-nio-8080-exec-8][7900a5a3-3c9c-44e4-914c-ac2d83e2c2f0] INFO  c.k.f.c.m.MonitorTracking - [K1040009] startId: kstry-demo-goods-show, spend 315ms ...

[{
    "methodName": "initBaseInfo",
    "nodeName": "初始化\n基本信息",
    "nodeType": "SERVICE_TASK",
    "spendTime": 2,
    "targetName": "cn.kstry.demo.service.GoodsService",
    "threadId": "kstry-task-thread-pool-3",
}, {
    "methodName": "checkImg",
    "nodeName": "风控服务",
    "nodeType": "SERVICE_TASK",
    "spendTime": 0,
    "targetName": "cn.kstry.demo.service.RiskControlService",
    "threadId": "kstry-task-thread-pool-3",
}, {
    "methodName": "initSku",
    "nodeName": "加载SKU信息",
    "nodeType": "SERVICE_TASK",
    "spendTime": 0,
    "targetName": "cn.kstry.demo.service.GoodsService",
    "threadId": "kstry-task-thread-pool-3",
}, {
    "nodeName": "加载广告",
    "nodeType": "SERVICE_TASK",
    "threadId": "kstry-task-thread-pool-1",
}, {
    "methodName": "getLogisticInsurance",
    "nodeName": "送运费险",
    "nodeType": "SERVICE_TASK",
    "spendTime": 102,
    "targetName": "cn.kstry.demo.service.LogisticService",
    "threadId": "kstry-task-thread-pool-7",
}, {
    "methodName": "getGoodsExtInfo",
    "nodeName": "加载收藏数",
    "nodeType": "SERVICE_TASK",
    "spendTime": 0,
    "targetName": "cn.kstry.demo.service.GoodsService",
    "threadId": "kstry-task-thread-pool-6",
}, {
    "methodName": "getOrderInfo",
    "nodeName": "加载下单数",
    "nodeType": "SERVICE_TASK",
    "spendTime": 0,
    "targetName": "cn.kstry.demo.service.OrderService",
    "threadId": "kstry-task-thread-pool-1",
}, {
    "methodName": "getEvaluationInfo",
    "nodeName": "加载评价数",
    "nodeType": "SERVICE_TASK",
    "spendTime": 201,
    "targetName": "cn.kstry.demo.service.EvaluationService",
    "threadId": "kstry-task-thread-pool-3",
}, {
    "methodName": "getShopInfoByGoodsId",
    "nodeName": "加载店铺信息",
    "nodeType": "SERVICE_TASK",
    "spendTime": 205,
    "targetName": "cn.kstry.demo.service.ShopService",
    "threadId": "kstry-task-thread-pool-7",
}, {
    "methodName": "detailPostProcess",
    "nodeName": "商详后置处理",
    "nodeType": "SERVICE_TASK",
    "spendTime": 1,
    "targetName": "cn.kstry.demo.service.GoodsService",
    "threadId": "kstry-task-thread-pool-7",
}]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70

    🟢 总耗时 315ms,比未开启异步前少了大概200ms,可以分析下原因:

        🔷 耗时的节点有三个:“送运费险” sleep 100ms,“加载店铺信息” sleep 200ms,“加载评价数” sleep 200ms

        🔷 “送运费险” 和 “加载店铺信息” 两个节点是串行执行的,所以耗时是累加的

        🔷 “加载评价数” 虽然在子流程,但是子流程引用与上面两个节点是并行执行的

image-20211213195605737

        🔷 两个红框并行执行,两个绿框串行执行。两个并行流程中取耗时较大值300ms,所以最终耗时300ms左右

    🟢 任务流中出现了不止一个的线程id,开启异步化后,任务创建并提交到线程池中,随机分配线程去执行

# 4.3 异步化生命周期

TIP

在并行网关、包含网关上配置open-async=true属性即可开启异步流程。那么异步的开始是什么时候,结束又是在何处呢?

# 4.3.1 异步的开始

开始流程如下:

    🟢 执行引擎检测到并行网关、包含网关上配置open-async=true属性后,会将网关后面的出度包装成异步任务,并行网关与包含网关执行策略又略显不同:

        🔷 并行网关会将其后的全部出度逐一包装成异步任务,提交至线程池执行

        🔷 包含网关会判断其后的出度是否有条件表达式,如果有会先解析条件表达式,将表达式结果为true和没有表达式的出度逐一包装成异步任务提交到线程池执行

    🟢 提交完异步任务的线程会继续执行任务栈中的其他节点任务,不会再顾及异步网关出度及出度之后的节点,直至任务栈中没有了可执行节点时线程会归还至线程池,等待下一次被调用

    🟢 线程池随机选择线程执行上述流程中创建的异步任务

# 4.3.2 异步的结束

    什么是异步?异步就是同一时间多个线程去做了多个事情,以此来节省需要一个线程做多个事情所花费的时间。这种模式有点像算法里面的空间换取时间的味道。Kstry中什么时候结束开启后的异步任务呢?答案是多个可以同一时间执行的异步任务被聚合网关聚合后异步流程就结束了。比如:

    🟢 一个流程被并行网关拆分成了两个异步任务,这两个异步任务都遇到同一聚合节点后,任务就被聚合节点归拢了,异步任务也就结束了

    🟢 一个流程被并行网关拆分成了两个异步任务,两个异步任务又分别拆分出了两个异步任务,只有四个异步任务全部聚合时,才算真正意义上的异步流程结束,如下的流程定义是允许的:

image-20211215163047312

当前聚合节点的元素有:

    🟢 并行网关

    🟢 包含网关

    🟢 结束事件

image-20211215163704815

聚合节点可以随心所欲的聚合多个流程。除聚合节点外的其他节点元素,只允许接收一个入度

TIP

之所以除聚合节点外的其他节点元素,只允许接收一个入度是因为:存在多个入度时,如果这些入度是异步的就会有多个线程执行到这个聚集点,假设Task节点可以支持多入度,那么Task节点就可能被执行多次。因为Task节点不具备聚合节点的能力,不能让前面的流程停下来等待全部流程都到达后才继续。换句话说只有聚合节点才能支持多入度

# 4.4 Reactor异步模型

TIP

开启了异步后是否程序就足够健壮了,就可以支持高流量请求了?答案是否定的,看下面的例子

    还是上面的商品显示流程,其他节点的 sleep 模拟耗时全部清除,只留下“获取店铺信息” sleep 500ms 来模拟调用店铺接口时的耗时

@TaskService(name = "get-shopInfo-goodsId")
public ShopInfo getShopInfoByGoodsId(@ReqTaskParam("id") Long goodsId) throws InterruptedException {
    TimeUnit.MILLISECONDS.sleep(500L);
    ShopInfo shopInfo = goodsIdShopInfoMapping.get(goodsId);
    log.info("goods id: {}, getShopInfoByGoodsId: {}", goodsId, JSON.toJSONString(shopInfo));
    return shopInfo;
}
1
2
3
4
5
6
7

进行一波小流量压测:

2021-12-15_003756

测试结果:

2021-12-15_003737

报错如下:

cn.kstry.framework.core.exception.TaskAsyncException: [K1060002] Asynchronous node task timeout!
    at cn.kstry.framework.core.exception.KstryException.buildException(KstryException.java:88)
    at cn.kstry.framework.core.engine.AsyncTaskCell.get(AsyncTaskCell.java:107)
    at cn.kstry.framework.core.engine.StoryEngine.doFire(StoryEngine.java:184)
    at cn.kstry.framework.core.engine.StoryEngine.fire(StoryEngine.java:90)
    at cn.kstry.demo.web.GoodsController.showGoods(GoodsController.java:49)

Caused by: java.util.concurrent.TimeoutException: Async task timeout! maximum time limit: 3000ms, block task count: 1, block task: [Flow_0rl59u8]
    at cn.kstry.framework.core.engine.AsyncTaskCell.get(AsyncTaskCell.java:98)
    ... 52 common frames omitted
1
2
3
4
5
6
7
8
9
10

2000个样本,失败率91.1%,服务基本是不可用状态。为什么会如此呢,难道是线程池队列满了?查看线程池日志:

image-20211215170948367

    核心线程数16个,最大线程数32个。整个测试下来,工作线程数一直等于核心线程数,任务队列也没满,所以可以确定不是线程池本身的问题

    其实最根本的原因是出在了 sleep 500ms 的地方,工作线程都被占用了!可以分析下原理:

    🟢 少量请求进来时,创建核心线程处理

    🟢 核心线程达到阈值后,再有请求进来会放入阻塞队列

    🟢 理论上线程池中阻塞队列满后才会再次创建线程。但是少量压测请求并未达到阻塞队列的上限,所以工作线程一直是核心线程16个

    🟢 可以大概将线程处理一个请求的时间看作500ms,一秒之内16个线程可以处理32个请求,其他任务都被放入了队列

    🟢 放入队列的任务等待3s后,会被超时中断,请求失败。这是大量异常出现的原因,并非队列满了而是等待超时了

    🟢 如果我们增大超时时间,不难想象,队列任务数会增加。如果任务量足够大、超时时间足够长时,线程池队列也会溢出

    要解决上述问题,就得从本质出发:工作线程将时间花费在了 sleep 500ms(比如调用外部接口)上,限制了任务的吞吐量。所以耗时的任务就应该交出去,比如在远程调用其他服务时,可以换成使用NIO的方式调用服务端接口,接口返回结果后再通知任务线程。Kstry中这样去做:

// 升级前获取店铺信息的任务
@TaskService(name = "get-shopInfo-goodsId")
public ShopInfo getShopInfoByGoodsId(@ReqTaskParam("id") Long goodsId) throws InterruptedException {
    TimeUnit.MILLISECONDS.sleep(500L);
    ShopInfo shopInfo = goodsIdShopInfoMapping.get(goodsId);
    log.info("goods id: {}, getShopInfoByGoodsId: {}", goodsId, JSON.toJSONString(shopInfo));
    return shopInfo;
}

// 升级后获取店铺信息的任务
@TaskService(name = "get-shopInfo-goodsId", targetType = ShopInfo.class)
public Mono<ShopInfo> getShopInfoByGoodsId(@ReqTaskParam("id") Long goodsId) {
    CompletableFuture<ShopInfo> future = new CompletableFuture<>();
    list.add(future);
    return Mono.fromFuture(future);
}

@Scheduled(fixedDelay = 500)
public void init() {
    List<CompletableFuture<ShopInfo>> completableFutures = Lists.newArrayList(list);
    list.clear();
    for (CompletableFuture<ShopInfo> cf : completableFutures) {
      ShopInfo shopInfo = goodsIdShopInfoMapping.get(1L);
      log.info("goods id: {}, getShopInfoByGoodsId: {}", 1, JSON.toJSONString(shopInfo));
      cf.complete(shopInfo);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

TIP

创建一个每 500ms 执行一次的定时器,拿到获取店铺信息时保存的 CompletableFuture 列表并将其中任务完成,以此来模拟获取店铺信息耗时在500ms以内

原来任务做升级:

    🟢 返回值 ShopInfo 被 Mono 包装

    🟢 @TaskService注解增加属性配置:targetType = ShopInfo.class。之所以如此是因为结果的处理是通过解析方法结果类上的注解来做的,Java编译后泛型会被擦除,找不到返回值类型,无法解析判断处理返回结果的逻辑,所以返回值类型需要显示指定

注意:

Kstry只支持 Mono,暂不支持 Flux

    线程再次调用获取店铺信息节点时会立刻返回Mono。等异步完成任务后再反向通知流程继续

同样的压测参数,重新压测:

2021-12-15_005209

将获取店铺信息的耗时模拟至1000ms再次测试:

2021-12-15_005339

    可见,问题解决了

    Kstry 的 StoryEngine 还提供了异步调用入口,返回Mono,可与支持 Reactor 模型的 web服务器无缝衔接。比如:SpringFlux

public <T> Mono<T> fireAsync(StoryRequest<T> storyRequest) {
    try {
        MDC.put(GlobalProperties.KSTRY_STORY_REQUEST_ID_NAME, GlobalUtil.getOrSetRequestId(storyRequest));
        preProcessing(storyRequest);
        return doFireAsync(storyRequest);
    } finally {
        MDC.clear();
    }
}
1
2
3
4
5
6
7
8
9

# 4.5 线程切换钩子

TIP

框架提供了线程间切换时的回调钩子,用来传递线程切换前后的数据。钩子只对Kstry框架中执行的任务生效

使用线程切换钩子

@Component
public class SwitchHook implements ThreadSwitchHook<String> {

    public static final ThreadLocal<String> ITERATOR_THREAD_LOCAL = new ThreadLocal<>();

    @Override
    public String getPreviousData(ScopeDataQuery scopeDataQuery) {
        return ITERATOR_THREAD_LOCAL.get();
    }

    @Override
    public void usePreviousData(String data, ScopeDataQuery scopeDataQuery) {
        ITERATOR_THREAD_LOCAL.set(data);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

    🟢 用ITERATOR_THREAD_LOCAL.set()在调用engine.fire()前设置一个字符串,在后面的任意节点上都可以用SwitchHook.ITERATOR_THREAD_LOCAL.get()拿到

    🟢 任务执行期间发生线程切换时,实现ThreadSwitchHook接口的Spring组件就会被调用

    🟢 线程切换前调用getPreviousData(ScopeDataQuery scopeDataQuery)获取到要保存的值,线程切换后再调用usePreviousData(String data, ScopeDataQuery scopeDataQuery)将切换前拿到的数据传入,用以后续的使用

    🟢 ThreadSwitchHook组件出现多个时,重写getOrder()方法即可支持排序