Project scenario:
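The KC project works with several ride-hailing suppliers and requests rides through the APIs they provide. Because there are several suppliers, the requirement is to call multiple suppliers at the same time. Calling them one after another would make the wait too long, so ride requests are sent to the suppliers asynchronously on multiple threads. When the KC H5 front end calls the ride-request API, it shows a loading state while the backend waits up to a configured timeout. The backend returns once all suppliers have responded or the timeout has expired. If one or more suppliers accepted the request, the call succeeds. Otherwise it returns a failure and the user has to request a ride again.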
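The fan-out rule above has three parts: call every supplier in parallel, stop waiting at a deadline, and succeed if at least one supplier returned an order. It can be sketched with CompletableFuture.completeOnTimeout (JDK 9+). This is a minimal illustration, not KC's code. The callAll method, the createOrder function and the simulated suppliers are hypothetical. Note that completeOnTimeout only completes the future. The supplier call itself keeps running in the background.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class FanOutCall {

    /**
     * Calls every supplier in parallel and returns the order ids that arrived
     * before the deadline. A failing or too-slow supplier counts as "no order".
     */
    public static List<String> callAll(List<String> supplierCodes,
                                       Function<String, Optional<String>> createOrder,
                                       long timeoutMs,
                                       ExecutorService pool) {
        List<CompletableFuture<Optional<String>>> futures = supplierCodes.stream()
                .map(code -> CompletableFuture
                        .supplyAsync(() -> createOrder.apply(code), pool)
                        // A supplier error is treated as "no order".
                        .exceptionally(ex -> Optional.empty())
                        // Stop waiting after timeoutMs. This completes the future only;
                        // the supplier call itself keeps running.
                        .completeOnTimeout(Optional.empty(), timeoutMs, TimeUnit.MILLISECONDS))
                .toList();
        // Every future is bounded by the deadline, so join() cannot hang.
        return futures.stream()
                .map(CompletableFuture::join)
                .flatMap(Optional::stream)
                .toList();
    }

    /** Sleeps without a checked exception; restores the interrupt flag if interrupted. */
    public static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        // Hypothetical suppliers: DD answers at once, CC is too slow, SQ has no car.
        List<String> orderIds = callAll(List.of("DD", "CC", "SQ"), code -> {
            if (code.equals("CC")) {
                sleep(1_500);
            }
            return code.equals("SQ") ? Optional.empty() : Optional.of(code + "-order");
        }, 300, pool);
        System.out.println(orderIds + ", success = " + !orderIds.isEmpty());
        pool.shutdownNow();
    }
}
```

With a 300 ms deadline, only DD's order is returned and the call counts as a success. CC's order is simply not waited for.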
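After a successful request, a scheduled job polls the suppliers' APIs to update order status, and a set of strategies decides which orders to cancel and re-request. If only one driver accepts, the other orders are cancelled. If several drivers accept, orders are closed according to the strategy (based on supplier, fare and the passenger's choice) until only one remains.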
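The keep-one rule can be sketched as choosing a single winner among the accepted orders and cancelling the rest. The ranking below is an assumption for illustration: the passenger's explicit choice first, then a supplier priority table, then the lower fare. The article only says that the strategy weighs supplier, fare and passenger choice. AcceptedOrder, Decision and the priority map are hypothetical names.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class KeepOneOrder {

    /** An order a driver has already accepted (fields are illustrative). */
    public record AcceptedOrder(String supplierCode, String orderId, long fareCents, boolean chosenByPassenger) {}

    /** The one order to keep and the orders to cancel. */
    public record Decision(AcceptedOrder keep, List<AcceptedOrder> cancel) {}

    /**
     * Keeps exactly one accepted order. Assumed ranking: passenger's choice,
     * then supplier priority (lower number wins), then the lower fare.
     */
    public static Decision decide(List<AcceptedOrder> accepted, Map<String, Integer> supplierPriority) {
        if (accepted.isEmpty()) {
            throw new IllegalArgumentException("no accepted order");
        }
        Comparator<AcceptedOrder> ranking = Comparator
                .comparing((AcceptedOrder o) -> !o.chosenByPassenger())   // false sorts first
                .thenComparing(o -> supplierPriority.getOrDefault(o.supplierCode(), Integer.MAX_VALUE))
                .thenComparingLong(AcceptedOrder::fareCents);
        AcceptedOrder keep = accepted.stream().min(ranking).orElseThrow();
        // Everything except the kept instance must be cancelled at its supplier.
        List<AcceptedOrder> cancel = accepted.stream().filter(o -> o != keep).toList();
        return new Decision(keep, cancel);
    }

    public static void main(String[] args) {
        Map<String, Integer> priority = Map.of("DD", 1, "CC", 2, "SQ", 3);   // hypothetical
        List<AcceptedOrder> accepted = List.of(
                new AcceptedOrder("CC", "cc-1", 3_200, false),
                new AcceptedOrder("DD", "dd-1", 3_500, false),
                new AcceptedOrder("CC", "cc-2", 3_000, false));
        Decision d = decide(accepted, priority);
        System.out.println("keep " + d.keep().orderId() + ", cancel " + d.cancel().size());
        // prints "keep dd-1, cancel 2"
    }
}
```

A rule like this only works on the orders it can see. An order the system does not know about never reaches decide() and is never cancelled.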
That is the ride-request flow of this project.
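Problem description:
One evening in November 2021, after a release, the team lead opened up ride requests in one city at the product team's request. When users then placed orders on the H5 page, several drivers from supplier CC accepted the same request. After those orders were cancelled, some users received abusive phone calls from the drivers, and a few even had their personal safety threatened. The ride-request service was shut down urgently that same night, which shows how serious the incident was.
The next day at my desk, I talked with the product manager about the previous night. They said: "KC still hasn't pinned down where the problem is. We may soon be saying goodbye to the team lead." Hearing that, and having nothing urgent to do right after the release, I started going through the KC project I had worked on before.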
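Root cause analysis:
I had previously integrated the ride-hailing supplier RQ into KC. The code is reasonably structured: adding a supplier only requires implementing the corresponding ride-hailing interface. That work had also given me a rough picture of the overall ride-request flow.
At first I suspected the strategy, because the ride strategy had been changed in the admin console during the previous night's release and the problem appeared right afterwards. But the strategy code looked fine. Only after reading the code together with the logs did I pin down the actual problem.
The ride-request code, cleaned up, is as follows: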
package xianzhan.j17;

import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

/**
 * JDK 17
 *
 * @author xianzhan
 */
public class Main {

    private static final int CORE_NUM = Runtime.getRuntime().availableProcessors();
    private static final int CORE_POOL_SIZE = (CORE_NUM << 4) + (CORE_NUM << 2);
    private static final int MAX_POOL_SIZE = CORE_POOL_SIZE << 2;

    /**
     * Tune these settings to the project's actual needs.
     */
    private static final Executor EXECUTOR = new ThreadPoolExecutor(
            CORE_POOL_SIZE,
            MAX_POOL_SIZE,
            60,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(MAX_POOL_SIZE),
            Thread::new,
            // When saturated, run the task in the caller's thread; re-submitting it
            // with e.execute(r) from the handler could recurse until the stack overflows.
            new ThreadPoolExecutor.CallerRunsPolicy());

    /** Single daemon thread that fires the overall timeout. */
    private static final ScheduledExecutorService SCHEDULED_FUTURE = new ScheduledThreadPoolExecutor(
            1,
            r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                t.setName("Timeout");
                return t;
            });

    public static CompletableFuture<OrderCreateOut> orderCreate(OrderCreateIn in) {
        return CompletableFuture.supplyAsync(() -> {
            int spendTime = (int) (Math.random() * 10);
            System.out.printf("Order - creating. threadId: %s, sleep: %ds, supplierCode: %s%n",
                    Thread.currentThread().getId(), spendTime, in.supplierCode());
            try {
                // Simulate the supplier call
                TimeUnit.SECONDS.sleep(spendTime);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.printf("Order - created. threadId: %s, sleep: %ds, supplierCode: %s%n",
                    Thread.currentThread().getId(), spendTime, in.supplierCode());
            return new OrderCreateOut(in.supplierCode(), LocalDateTime.now().toString());
        }, EXECUTOR);
    }

    /** Returns a future that fails with a TimeoutException after the given delay. */
    public static <T> CompletableFuture<T> timeout(long timeout, TimeUnit unit, String message) {
        CompletableFuture<T> ret = new CompletableFuture<>();
        SCHEDULED_FUTURE.schedule(() -> ret.completeExceptionally(new TimeoutException(message)), timeout, unit);
        return ret;
    }

    public static void main(String[] args) {
        System.out.println("main start: " + LocalDateTime.now());
        CallingStatus status = new CallingStatus();
        List<OrderCreateIn> orderCreateIns = List.of(
                new OrderCreateIn("DD"),
                new OrderCreateIn("CC"),
                new OrderCreateIn("SQ"));
        @SuppressWarnings("unchecked")
        CompletableFuture<OrderCreateOut>[] futureList = orderCreateIns.stream()
                .map(Main::orderCreate)
                // Any supplier that succeeds marks the whole call as successful
                .peek(future -> future.thenAccept(out -> status.setSuccess(true)))
                .toArray(CompletableFuture[]::new);
        CompletableFuture<Void> all = CompletableFuture.allOf(futureList);
        CompletableFuture<Void> timeout = timeout(15, TimeUnit.SECONDS, "Ride request timed out");
        // Completes with whichever finishes first: all suppliers, or the timeout
        CompletableFuture<Void> done = all.applyToEither(timeout, Function.identity());
        try {
            done.get();
        } catch (Exception e) {
            // Log the exception with the logging framework
            e.printStackTrace();
        }
        System.out.printf("Order created successfully for user: %s%n", status.isSuccess());
        System.out.println("main end: " + LocalDateTime.now());
    }

    /**
     * @param supplierCode supplier code
     */
    private static record OrderCreateIn(String supplierCode) {
    }

    /**
     * @param supplierCode supplier code
     * @param orderId      order id returned by the supplier on success
     */
    private static record OrderCreateOut(String supplierCode, String orderId) {
    }

    private static class CallingStatus {
        /**
         * The call succeeds as soon as any one supplier succeeds
         */
        private volatile boolean success;

        public boolean isSuccess() {
            return success;
        }

        public void setSuccess(boolean success) {
            this.success = success;
        }
    }
}
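Because the code targets JDK 17, the hand-rolled timeout(...) helper plus applyToEither can also be expressed with the built-in CompletableFuture.orTimeout, available since JDK 9. The following is a minimal sketch. The helper name allWithin is hypothetical. Like the original, it only stops the wait and does not stop the tasks.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OrTimeoutDemo {

    /** True if every task completes within timeoutMs, false if the deadline passes first. */
    public static boolean allWithin(long timeoutMs, CompletableFuture<?>... tasks) {
        try {
            // orTimeout fails the allOf future itself with a TimeoutException;
            // the individual tasks are not cancelled and keep running.
            CompletableFuture.allOf(tasks)
                    .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
                    .join();
            return true;
        } catch (CompletionException e) {
            if (e.getCause() instanceof TimeoutException) {
                return false;
            }
            throw e;   // a task itself failed
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> "DD");
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1_000);   // simulated slow supplier
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "CC";
        });
        System.out.println("all within 200 ms: " + allWithin(200, fast, slow));
    }
}
```

In the main method above, the lines building all, timeout and done could become CompletableFuture.allOf(futureList).orTimeout(15, TimeUnit.SECONDS). On timeout, get() then throws an ExecutionException whose cause is the TimeoutException.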
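At first glance, the code above looks fine. It calls the supplier APIs asynchronously, sets a timeout, and returns failure if no supplier succeeds in time. The problem is that orders that succeed after the timeout are never handled! The supplier still creates those orders, but the user has already been told that the request failed and requests again. As a result, one user can end up with several live orders at the same supplier.
With multiple threads you cannot reason synchronously. It is tempting to think that once the timeout returns, nothing else runs, but the worker threads keep going. Orders that complete after the timeout must therefore be handled explicitly.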
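This can be checked directly. In the following sketch, the class and method names are hypothetical, and a sleep stands in for the supplier's HTTP call. The caller gives up after a short wait and even calls cancel(true), yet the task still runs to completion. The CompletableFuture documentation states that mayInterruptIfRunning has no effect, because interrupts are not used to control processing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class LateCompletionDemo {

    /**
     * Simulates a supplier call that takes workMs while the caller waits only waitMs.
     * Returns true if the supplier still "created the order" after the caller gave up.
     */
    public static boolean orderCreatedDespiteTimeout(long workMs, long waitMs) {
        AtomicBoolean orderCreated = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        CompletableFuture<Void> call = CompletableFuture.runAsync(() -> {
            started.countDown();
            try {
                Thread.sleep(workMs);       // stands in for the supplier HTTP call
                orderCreated.set(true);     // the supplier has now created an order
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                finished.countDown();
            }
        });
        try {
            started.await();                // make sure the call is really in flight
            call.get(waitMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The caller gives up and cancels; for CompletableFuture, cancel(true)
            // does not interrupt the thread that is running the task.
            call.cancel(true);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        try {
            finished.await();               // let the background work end
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return orderCreated.get();
    }

    public static void main(String[] args) {
        // The caller waits 100 ms; the supplier needs 500 ms.
        System.out.println("order created after timeout: " + orderCreatedDespiteTimeout(500, 100));
        // prints "order created after timeout: true"
    }
}
```

In a real system the remote order is created at the supplier anyway, whatever the local thread does. The only reliable remedy is therefore to record the late order and cancel it afterwards.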
Solution:
package xianzhan.j17;

import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

/**
 * JDK 17
 *
 * @author xianzhan
 */
public class Main {

    private static final int CORE_NUM = Runtime.getRuntime().availableProcessors();
    private static final int CORE_POOL_SIZE = (CORE_NUM << 4) + (CORE_NUM << 2);
    private static final int MAX_POOL_SIZE = CORE_POOL_SIZE << 2;

    /**
     * Tune these settings to the project's actual needs.
     */
    private static final Executor EXECUTOR = new ThreadPoolExecutor(
            CORE_POOL_SIZE,
            MAX_POOL_SIZE,
            60,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(MAX_POOL_SIZE),
            Thread::new,
            // When saturated, run the task in the caller's thread; re-submitting it
            // with e.execute(r) from the handler could recurse until the stack overflows.
            new ThreadPoolExecutor.CallerRunsPolicy());

    /** Single daemon thread that fires the overall timeout. */
    private static final ScheduledExecutorService SCHEDULED_FUTURE = new ScheduledThreadPoolExecutor(
            1,
            r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                t.setName("Timeout");
                return t;
            });

    public static CompletableFuture<OrderCreateOut> orderCreate(OrderCreateIn in) {
        return CompletableFuture.supplyAsync(() -> {
            int spendTime = (int) (Math.random() * 10);
            System.out.printf("Order - creating. threadId: %s, sleep: %ds, supplierCode: %s%n",
                    Thread.currentThread().getId(), spendTime, in.supplierCode());
            try {
                // Simulate the supplier call
                TimeUnit.SECONDS.sleep(spendTime);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.printf("Order - created. threadId: %s, sleep: %ds, supplierCode: %s%n",
                    Thread.currentThread().getId(), spendTime, in.supplierCode());
            return new OrderCreateOut(in.supplierCode(), LocalDateTime.now().toString());
        }, EXECUTOR);
    }

    /** Returns a future that fails with a TimeoutException after the given delay. */
    public static <T> CompletableFuture<T> timeout(long timeout, TimeUnit unit, String message) {
        CompletableFuture<T> ret = new CompletableFuture<>();
        SCHEDULED_FUTURE.schedule(() -> ret.completeExceptionally(new TimeoutException(message)), timeout, unit);
        return ret;
    }

    public static void main(String[] args) {
        System.out.println("main start: " + LocalDateTime.now());
        CallingStatus status = new CallingStatus();
        List<OrderCreateIn> orderCreateIns = List.of(
                new OrderCreateIn("DD"),
                new OrderCreateIn("CC"),
                new OrderCreateIn("SQ"));
        @SuppressWarnings("unchecked")
        CompletableFuture<OrderCreateOut>[] futureList = orderCreateIns.stream()
                // Skip suppliers that timed out earlier, so that another timeout
                // cannot leave several orders at the same supplier.
                // timeoutSupplierCode (not shown): suppliers whose timed-out order is unresolved
                // .filter(in -> !timeoutSupplierCode.contains(in.supplierCode()))
                .map(Main::orderCreate)
                .peek(future -> future.thenAccept(out -> {
                    status.setSuccess(true);
                    if (status.isTimeout()) {
                        /*
                         * Record the timed-out supplier and handle its order:
                         * 1. Do not call this supplier again until its timed-out order is handled
                         * 2. Cancel the timed-out order, since it never took part in strategy selection
                         */
                        System.out.printf("Saving timed-out supplier (%s) and order (%s) for handling%n",
                                out.supplierCode(), out.orderId());
                    }
                }))
                .toArray(CompletableFuture[]::new);
        CompletableFuture<Void> all = CompletableFuture.allOf(futureList);
        CompletableFuture<Void> timeout = timeout(6, TimeUnit.SECONDS, "Ride request timed out");
        // Completes with whichever finishes first: all suppliers, or the timeout
        CompletableFuture<Void> done = all.applyToEither(timeout, Function.identity());
        try {
            done.get();
        } catch (Exception e) {
            // Log the exception with the logging framework
            e.printStackTrace();
            status.setTimeout(true);
        }
        System.out.printf("Order created successfully for user: %s%n", status.isSuccess());
        System.out.println("main end: " + LocalDateTime.now());
    }

    /**
     * @param supplierCode supplier code
     */
    private static record OrderCreateIn(String supplierCode) {
    }

    /**
     * @param supplierCode supplier code
     * @param orderId      order id returned by the supplier on success
     */
    private static record OrderCreateOut(String supplierCode, String orderId) {
    }

    private static class CallingStatus {
        /**
         * The call succeeds as soon as any one supplier succeeds
         */
        private volatile boolean success;
        /**
         * Whether the overall call has timed out
         */
        private volatile boolean timeout;

        public boolean isSuccess() {
            return success;
        }

        public void setSuccess(boolean success) {
            this.success = success;
        }

        public boolean isTimeout() {
            return timeout;
        }

        public void setTimeout(boolean timeout) {
            this.timeout = timeout;
        }
    }
}
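One further tightening is offered here as an assumption, not as the author's fix. In the code above, timeout is set by the main thread only after done.get() throws. A callback that runs in that short window therefore still sees timeout == false. A small lock-guarded window makes the in-time/late decision atomic, so every result lands in exactly one bucket. CallWindow, offer, close and lateResults are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper: classifies each supplier result as "in time" or "late"
 * under one lock, so no result can fall between the two.
 */
public class CallWindow<T> {

    private final List<T> inTime = new ArrayList<>();
    private final List<T> late = new ArrayList<>();
    private boolean closed;

    /** Called from each supplier future's callback; false means "late, compensate". */
    public synchronized boolean offer(T result) {
        if (closed) {
            late.add(result);      // must be recorded and cancelled later
            return false;
        }
        inTime.add(result);
        return true;
    }

    /** Called once by the request thread when it stops waiting (normal or timeout). */
    public synchronized List<T> close() {
        closed = true;
        return List.copyOf(inTime);
    }

    public synchronized List<T> lateResults() {
        return List.copyOf(late);
    }

    public static void main(String[] args) {
        CallWindow<String> window = new CallWindow<>();
        window.offer("DD-order");                 // answered before the deadline
        List<String> forUser = window.close();    // deadline reached
        window.offer("CC-order");                 // answered after the deadline
        System.out.println(forUser + " / late: " + window.lateResults());
        // prints "[DD-order] / late: [CC-order]"
    }
}
```

In the main method above, thenAccept would call window.offer(out) and save the order for cancellation when offer returns false. The request thread would call window.close() once it stops waiting, on both the normal path and the timeout path.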
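There are three main changes:
- CallingStatus: added volatile boolean timeout to mark that the call has timed out.
- Order creation: thenAccept() on each CompletableFuture<OrderCreateOut> future handles orders that complete after the timeout.
- filter() skips suppliers that still have a timed-out order from a previous call, until the strategy has successfully cancelled that order.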
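The filter() step needs a record of which suppliers still hold an unresolved late order. A minimal in-memory sketch follows. TimeoutSupplierRegistry and its methods are hypothetical, and a production system would more likely persist this state (for example, alongside the orders) so that it survives restarts. The registry tracks order ids, not just supplier codes, so a supplier stays blocked until every one of its late orders is cancelled.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class TimeoutSupplierRegistry {

    // supplierCode -> ids of late orders not yet cancelled at that supplier.
    // Each set is only mutated inside compute/computeIfPresent, which run atomically per key.
    private final Map<String, Set<String>> pending = new ConcurrentHashMap<>();

    /** Called when a supplier answers after the deadline. */
    public void markTimedOut(String supplierCode, String orderId) {
        pending.compute(supplierCode, (code, ids) -> {
            Set<String> s = (ids == null) ? new HashSet<>() : ids;
            s.add(orderId);
            return s;
        });
    }

    /** Called once the strategy has successfully cancelled that late order. */
    public void markCancelled(String supplierCode, String orderId) {
        pending.computeIfPresent(supplierCode, (code, ids) -> {
            ids.remove(orderId);
            return ids.isEmpty() ? null : ids;   // returning null removes the entry
        });
    }

    /** The suppliers that may be called for the next request. */
    public List<String> callable(List<String> supplierCodes) {
        return supplierCodes.stream()
                .filter(code -> !pending.containsKey(code))
                .toList();
    }

    public static void main(String[] args) {
        TimeoutSupplierRegistry registry = new TimeoutSupplierRegistry();
        List<String> all = List.of("DD", "CC", "SQ");
        registry.markTimedOut("CC", "cc-1");             // CC answered too late
        System.out.println(registry.callable(all));      // [DD, SQ]
        registry.markCancelled("CC", "cc-1");            // strategy cancelled cc-1
        System.out.println(registry.callable(all));      // [DD, CC, SQ]
    }
}
```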
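Using the logged supplier request and response timestamps as evidence, I finally pinned down the problem and took it, together with the solution, to the team lead.
Summary
Take extra care with multithreaded code and think in terms of concurrency. An unhandled case like this one can cost the company a lot of money!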