Java CompletableFuture 异步超时实现探索

2023-05-27 0 606

译者:天猫信息技术张一峰

序言

JDK 8是一场关键性的回退,追加了十分多的优点,当中众所周知即是 CompletableFuture。从此从 JDK 微观或者说象征意义上的全力支持了如前所述该事件的触发器程式设计本体论,填补了 Future 的瑕疵。

在他们的日常生活强化中,最常见方式即是多处置器博戈达继续执行。这时就会牵涉到 CompletableFuture 的采用。

常见采用形式

上面总括两个常见情景。

倘若他们有三个 RPC 远距初始化服务项目

public static void main(String[] args){ //各项任务 A,费时2 秒 int resultA = compute(1);//各项任务 B,费时2 秒 int resultB = compute(2);//先期销售业务方法论处置 System.out.println(resultA + resultB);}

能估计到,以太网继续执行最多费时4 秒,因此 B 各项任务并不倚赖 A 各项任务结论。

对此种情景,他们一般来说会优先选择博戈达的形式强化,Demo 标识符如下表所示:

public static void main(String[] args){ //仅单纯总括,在制造标识符中可别那么写!//统计数据费时的表达式 time(()->{ CompletableFuture result = Stream.of(1,2)//建立触发器各项任务.map(x -> CompletableFuture.supplyAsync(()-> compute(x), executor))//裂解.reduce(CompletableFuture.completedFuture(0),(x, y)-> x.thenCombineAsync(y, Integer::sum, executor));//等候结论 try { System.out.println(“结果:”+ result.get());} catch (ExecutionException InterruptedException e){ System.err.println(“各项任务继续执行极度”);} });}输入:[async-1]:各项任务继续执行已经开始:1[async-2]:各项任务继续执行已经开始:2[async-1]:各项任务继续执行顺利完成:1[async-2]:各项任务继续执行顺利完成:2结论:3费时:2秒

能看见费时变为了2 秒。

存在的问题

分析

看上去 CompletableFuture 现有功能能满足他们诉求。但当他们引入一些现实常见情况时,一些潜在的不足便暴露出来了。

compute(x)如果是两个根据入参查询用户某类型优惠券列表的各项任务,他们需要查询两种优惠券并组合在一起返回给上游。倘若上游要求他们2 秒内处置完毕并返回结论,但 compute(x)费时却在0.5秒 ~无穷大波动。这时他们就需要把费时过长的 compute(x)各项任务结论放弃,仅处置在指定时间内顺利完成的各项任务,尽可能保证服务项目可用。

那么以上标识符的费时由费时最长的服务项目决定,无法满足现有诉求。一般来说他们会采用 get(long timeout, Time

public static void main(String[] args){ //仅单纯总括,在制造标识符中可别那么写!//统计数据费时的表达式 time(()->{ List

能看见,只要他们能够给 compute(x)设置两个延时时间将各项任务中断,结合 get、getNow 等

那么问题也就转变为了,如何给各项任务设置触发器延时时间呢?

现有做法

当触发器各项任务是两个 RPC 请求时,他们能设置两个 JSF 延时,以达到触发器延时效果。

当请求是两个 R2M 请求时,他们也能控制 R2M 连接的最大延时时间来达到效果。

那么看好像他们都是在倚赖三方中间件的能力来管理各项任务延时时间?那么就存在两个问题,中间件延时控制能力有限,如果触发器各项任务是中间件 IO 操作+ 本地计算操作怎么办?

public V get(long timeout, TimeUnit unit) throws InterruptedException {//配置的延时时间 timeout = unit.toMillis(timeout);//剩余等候时间 long remaintime = timeout .getNow();} this.setDoneTime();//延时抛出极度 throw this.clientTimeoutException(false);}

当这个各项任务刚好卡在延时边缘顺利完成时,这

某些 CPU 采用率高的情况下,就会出现触发器各项任务没能触发抛出极度中断,导致他们无法准确控制延时时间。对上游来说,本次请求全部失败。

解决形式

JDK 9

这类问题十分常见,如大促情景,服务项目器 CPU 瞬间升高就会出现以上问题。

那么如何解决呢?其实 JDK 的开发大佬们早有研究。在 JDK 9,CompletableFuture 正式提供了 orTimeout、completeTimeout 方法,来准确同时实现触发器延时控制。

public CompletableFuture orTimeout(long timeout, TimeUnit unit){ if (unit == null) throw new NullPointerException(); if (result == null) whenComplete(new Canceller(Delayer.delay(new Timeout(this), timeout, unit))); return this;}

JDK 9 orTimeout 其同时实现原理是通过两个定时各项任务,在给定时间之后抛出极度。如果各项任务在指定时间内顺利完成,则取消抛极度的操作。

以上标识符他们按继续执行顺序来看下:

首先继续执行 new Timeout(this)。

static final class Timeout implements Runnable { final CompletableFuture f; Timeout(CompletableFuture f){ this.f = f;} public void run(){ if (f != null &&!f.isDone())//抛出延时极度 f.completeExceptionally(new TimeoutException());}}

通过源码能看见,Timeout 是两个同时实现 Runnable 的类,run()方法负责给传入的触发器各项任务通过 completeExceptionally CAS 赋值极度,将各项任务标记为极度顺利完成。

那么谁来触发这个 run()方法呢?他们看下 Delayer 的同时实现。

static final class Delayer { static ScheduledFuture delay(Runnable command, long delay, TimeUnit unit){ //到时间触发 command 各项任务 return delayer.schedule(command, delay, unit);} static final class DaemonThreadFactory implements ThreadFactory { public Thread newThread(Runnable r){ Thread t = new Thread(r); t.setDaemon(true); t.setName(“CompletableFutureDelayScheduler”); return t;} } static final ScheduledThreadPoolExecutor delayer; static {(delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory())). setRemoveOnCancelPolicy(true);}}

Delayer 其实就是两个单例定时调度器,Delayer.delay(new Timeout(this), timeout, unit)通过 ScheduledThreadPoolExecutor 同时实现指定时间后触发 Timeout 的 run()方法。

到这里就已经同时实现了延时抛出极度的操作。但当各项任务顺利完成时,就没必要触发 Timeout 了。因此他们还需要同时实现两个取消方法论。

static final class Canceller implements BiConsumer{ final Future f; Canceller(Future f){ this.f = f;} public void accept(Object ignore, Throwable ex){ if (ex == null && f != null &&!f.isDone())//3 未触发抛极度各项任务则取消 f.cancel(false);}}

当各项任务继续执行顺利完成,或者各项任务继续执行极度时,他们也就没必要抛出延时极度了。因此他们能把 delayer.schedule(command, delay, unit)返回的定时延时各项任务取消,不再触发 Timeout。当他们的触发器各项任务顺利完成,因此定时延时各项任务未顺利完成的时候,就是他们取消的时机。因此他们能通过 whenComplete(BiConsumer action)来顺利完成。

Canceller 就是两个 BiConsumer 的同时实现。其持有了 delayer.schedule(command, delay, unit)返回的定时延时各项任务,accept(Object ignore, Throwable ex)同时实现了定时延时各项任务未顺利完成后,继续执行 cancel(boolean mayInterruptIfRunning)取消各项任务的操作。

JDK 8

如果他们采用的是 JDK 9或以上,他们能直接用 JDK 的同时实现来顺利完成触发器延时操作。那么 JDK 8怎么办呢?

其实他们也能根据上述方法论单纯同时实现两个工具类来辅助。

以下是他们营销自己的工具类以及用法,贴出来给大家作为参考,大家也能自己写的更优雅一些~

初始化形式:

CompletableFutureExpandUtils.orTimeout(触发器各项任务,延时时间,时间单位);

工具类源码:

package com.jd.jr.market.reduction.util;import com.jdpay.market.common.exception.UncheckedException;import java.util.concurrent.*;import java.util.function.BiConsumer;/*** CompletableFuture 扩展工具* *@author zhangtianci7*/public class CompletableFutureExpandUtils {/*** 如果在给定延时之前未顺利完成,则极度顺利完成此 CompletableFuture 并抛出{@link TimeoutException}。 ** @param timeout 在出现 TimeoutException 极度顺利完成之前等候多长时间,以{@code unit}为单位* @param unit 两个{@link TimeUnit},结合{@code timeout}参数,表示给定粒度单位的持续时间* @return 入参的 CompletableFuture */ public static CompletableFuture orTimeout(CompletableFuture future, long timeout, TimeUnit unit){ if (null == unit){ throw new UncheckedException(“时间的给定粒度不能为空”);} if (null == future){ throw new UncheckedException(“触发器各项任务不能为空”);} if (future.isDone()){ return future;} return future.whenComplete(new Canceller(Delayer.delay(new Timeout(future), timeout, unit)));} /*** 延时时极度顺利完成的操作*/ static final class Timeout implements Runnable { final CompletableFuture future; Timeout(CompletableFuture future){ this.future = future;} public void run(){ if (null != future &&!future.isDone()){ future.completeExceptionally(new TimeoutException());} }} /*** 取消不需要的延时的操作*/ static final class Canceller implements BiConsumer{ final Future future; Canceller(Future future){ this.future = future;} public void accept(Object ignore, Throwable ex){ if (null == ex && null != future &&!future.isDone()){ future.cancel(false);} }} /*** 单例延迟调度器,仅用于启动和取消各项任务,两个线程就足够*/ static final class Delayer { static ScheduledFuture delay(Runnable command, long delay, TimeUnit unit){ return delayer.schedule(command, delay, unit);} static final class DaemonThreadFactory implements ThreadFactory { public Thread newThread(Runnable r){ Thread t = new Thread(r); t.setDaemon(true); t.setName(“CompletableFutureExpandUtilsDelayScheduler”); return t;} } static final ScheduledThreadPoolExecutor delayer; static { delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory()); delayer.setRemoveOnCancelPolicy(true);} }}

参考资料

JEP 266: JDK 9并发包更新提案

Java CompletableFuture 异步超时实现探索

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务