gongstring技术博客
最新文章
源码解读
软件安装
常见问题
大数据
常用工具
鸡汤文
备案号:鄂ICP备15015839号-1
鄂公网安备 42010202001692号
JDK8新特性之并行编程类CompletableFuture示例
2021-03-06 22:31:07
作者: gongstring
源码解读
/
JDK8新特性之并行编程类CompletableFuture示例
# 1. 需求背景 在业务编写过程中,经常会遇到事务较长的任务。例如接口用途是用户登录完成后,需要查询用户信息,其中包括:登录信息、基础信息、权限、账户余额等。如果按照顺序执行方式,则整个接口执行时间总长度等于各个子任务(或方法)时长的总和。而为了提高系统吞吐能力,业务系统一般会添加超时机制(例如:如果5S内接口没有执行完毕,将会自动超时),势必导致接口调用失败率提升,影响用户体验。 为了解决上述问题,除了进行业务拆分或增加缓存等传统方式,异步并行编程不失为重要的解决办法之一,在JAVA的异步编程世界中包含很多可选方案:CompletableFuture、RxJava等。本文将基于JDK8的新特性CompletableFuture讲解如何进行异步编程,对于RxJava的对比及使用不做赘述。 本文不讨论并行和并发的区别,如果读者有兴趣,可以翻阅`《Java并发实现原理》`及`《RxJava反应式编程》`等书。 ## 1.1 场景分类 并行编程场景大致分类如下:  本文假设有一个接口,传入参数userid,需要返回如下用户信息,其中每个字段分别来自于不同方法(可以理解成是分别来自不同的数据源或数据表,需要分开查询)的返回值,使用并行执行的方式提升接口性能,分别基于CompletableFuture实现编写上面四种场景的代码示例。 ```java package com.gongstring.jdk.bean; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import java.io.Serializable; import java.util.List; import java.util.Map; /** * 用户信息 * @author gongstring
* @date 2021-03-06 18:25:22 */ @Data @AllArgsConstructor @NoArgsConstructor @Builder public class UserInfo implements Serializable { /** * 用户姓名 */ private String username; /** * 收件地址列表 */ private List
addresses; /** * 子女基本信息 */ private List
> childrenList; /** * 订阅期刊列表 */ private List
subscribes; } ``` # 2. 代码示例 接下来将分别编写实现上述业务场景的示例代码,为了方便通过日志查看执行流程,在代码中添加sleep方法,用于模拟接口响应时间不一致。 下面是用于模拟任务执行的工具类,为了达到区分差异的效果,分别添加了等待代码,并且等待时间不一致: ```java package com.gongstring.jdk; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.*; /** * 模拟请求用户信息接口 * @author gongstring
* @date 2021-03-06 18:25:22 */ public class CompletableFutureUserUtil { private static final String LOG_TPL = "Tag:[%s],当前时间:[%s]"; /** * 根据userId查询用户姓名 * @param userId * @return */ public static String getUserName(String userId){ if(!"gongstring".equals(userId)) return ""; printLog("getUserName start"); try { //模拟接口请求花费1s Thread.sleep(1 * 1000); }catch (Exception e){ } printLog("getUserName end"); return "野渔"; } /** * 获取用户收件地址 * @param userId * @return */ public static List
getAddresses(String userId){ if(!"gongstring".equals(userId)) return new ArrayList<>(); printLog("getAddresses start"); try { //模拟接口请求花费3s Thread.sleep(3 * 1000); }catch (Exception e){ } printLog("getAddresses end"); return Arrays.asList("武汉市江岸区","武汉市江夏区"); } /** * 获取用户收件地址 * @param userId * @return * @throws Exception */ public static List
getAddressesWithException(String userId) throws Exception{ if(!"gongstring".equals(userId)) return new ArrayList<>(); printLog("getAddressesWithException start"); //模拟接口请求花费2s Thread.sleep(2 * 1000); int a = 0; int b = a/0;//故意抛出异常 //模拟接口请求花费1s Thread.sleep(1 * 1000); printLog("getAddressesWithException end"); return Arrays.asList("武汉市江岸区","武汉市江夏区"); } /** * 查询子女信息 * @param userId * @return */ public static List
> getChildren(String userId){ if(!"gongstring".equals(userId)) return new ArrayList<>(); printLog("getChildren start"); try { //模拟接口请求花费4s Thread.sleep(4 * 1000); }catch (Exception e){ } printLog("getChildren end"); Map
firstChild = new HashMap<>(); firstChild.put("name","李雷"); firstChild.put("sex","男"); Map
secondChild = new HashMap<>(); secondChild.put("name","韩梅梅"); secondChild.put("sex","女"); return Arrays.asList(firstChild,secondChild); } public static List
getSubscribes(String userId){ if(!"gongstring".equals(userId)) return new ArrayList<>(); printLog("getSubscribes start"); try { //模拟接口请求花费6s Thread.sleep(6 * 1000); }catch (Exception e){ } printLog("getSubscribes end"); return Arrays.asList("Chain Daily","Global Times"); } public static void printLog(String tag){ System.out.println(String.format(LOG_TPL,tag, LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")))); } } ``` * username方法耗时:`1s` * addresses方法耗时:`3s` * children方法耗时:`4s` * subscribes方法耗时:`6s` **CompletableFuture常用静态方法介绍:** `supplyAsync`:用于有返回值的任务 `runAsync`:用于没有返回值的任务 `Executor`参数可以手动指定线程池,否则默认使用ForkJoinPool.commonPool()系统级公共线程池(创建的线程都是Daemon线程,主线程结束Daemon线程不结束,只有JVM关闭时,生命周期终止) **取值方法介绍**: ```javascript public T get() //Futrue的方法 阻塞 public T get(long timeout, TimeUnit unit) //Futrue的方法 阻塞。 推荐使用 // 新提供的方法 public T getNow(T valueIfAbsent) //getNow有点特殊,如果结果已经计算完则返回结果或抛异常,否则返回给定的valueIfAbsent的值(此方法有点反人类有木有) public T join() // 返回计算的结果或抛出一个uncheckd异常。 推荐使用 ``` * 注意:为了防止线程不会自动结束,在本文中推荐使用get(long timeout, TimeUnit unit) ## 2.1 所有任务串行执行 下面是模拟串行编程,依次执行方法示例: ```java /** * 同步执行请求 */ @Test public void syncTest(){ final String userId = "gongstring"; long startTime = System.currentTimeMillis(); //定义Future对象 UserInfo userInfo = UserInfo.builder() .username(CompletableFutureUserUtil.getUserName(userId)) .addresses(CompletableFutureUserUtil.getAddresses(userId)) .children(CompletableFutureUserUtil.getChildren(userId)) .subscribes(CompletableFutureUserUtil.getSubscribes(userId)) .build(); CompletableFutureUserUtil.printLog("方法结束"); System.out.println("用户信息:"+ JSON.toJSONString(userInfo)); System.out.println(String.format("串行同步接口执行总耗时:%s毫秒",System.currentTimeMillis() - startTime)); } ``` 执行结果: ``` Tag:[getUserName start],当前时间:[2021-03-06 21:21:15] Tag:[getUserName end],当前时间:[2021-03-06 21:21:16] Tag:[getAddresses start],当前时间:[2021-03-06 21:21:16] Tag:[getAddresses end],当前时间:[2021-03-06 21:21:19] Tag:[getChildren start],当前时间:[2021-03-06 21:21:19] Tag:[getChildren end],当前时间:[2021-03-06 21:21:23] Tag:[getSubscribes start],当前时间:[2021-03-06 21:21:23] Tag:[getSubscribes end],当前时间:[2021-03-06 21:21:29] Tag:[方法结束],当前时间:[2021-03-06 21:21:29] 用户信息:{"addresses":["武汉市江岸区","武汉市江夏区"],"children":[{"sex":"男","name":"李雷"},{"sex":"女","name":"韩梅梅"}],"subscribes":["Chain Daily","Global Times"],"username":"野渔"} 串行同步接口执行总耗时:14097毫秒 ``` 可以看的很明显,代码依次调用各个字段数据对应的方法,总耗时也等于各个方法等待时长之和:`14s`(1+3+4+6) ## 2.2 所有任务并行执行 本示例中,将会分别通过多个方法,同时执行结束后,将结果填充到返回对象中。 ### 2.2.1 分开阻塞写法 ```java /** * 并行执行,并等待所有方法执行结束,返回数据 * @throws Exception */ @Test public void execute() throws Exception{ final String userId = "gongstring"; //超时时间 long timeout = 10; long startTime = System.currentTimeMillis(); //定义带返回值的Future对象 final CompletableFuture
usernameFuture = CompletableFuture.supplyAsync(() -> CompletableFutureUserUtil.getUserName(userId)); final CompletableFuture
> addressesFuture = CompletableFuture.supplyAsync(() -> CompletableFutureUserUtil.getAddresses(userId)); final CompletableFuture
>> childrenFuture = CompletableFuture.supplyAsync(() -> CompletableFutureUserUtil.getChildren(userId)); final CompletableFuture
> subcribesFuture = CompletableFuture.supplyAsync(() -> CompletableFutureUserUtil.getSubscribes(userId)); UserInfo userInfo = UserInfo.builder() .username(usernameFuture.get(timeout, TimeUnit.SECONDS)) .addresses(addressesFuture.get(timeout, TimeUnit.SECONDS)) .children(childrenFuture.get(timeout, TimeUnit.SECONDS)) .subscribes(subcribesFuture.get(timeout, TimeUnit.SECONDS)) .build(); CompletableFutureUserUtil.printLog("方法结束"); System.out.println("用户信息:"+ JSON.toJSONString(userInfo)); System.out.println(String.format("并行执行接口执行总耗时:%s毫秒",System.currentTimeMillis() - startTime)); } ``` 执行结果: ``` Tag:[getChildren start],当前时间:[2021-03-06 21:21:53] Tag:[getSubscribes start],当前时间:[2021-03-06 21:21:53] Tag:[getAddresses start],当前时间:[2021-03-06 21:21:53] Tag:[getUserName start],当前时间:[2021-03-06 21:21:53] Tag:[getUserName end],当前时间:[2021-03-06 21:21:54] Tag:[getAddresses end],当前时间:[2021-03-06 21:21:56] Tag:[getChildren end],当前时间:[2021-03-06 21:21:57] Tag:[getSubscribes end],当前时间:[2021-03-06 21:21:59] Tag:[方法结束],当前时间:[2021-03-06 21:21:59] 用户信息:{"addresses":["武汉市江岸区","武汉市江夏区"],"children":[{"sex":"男","name":"李雷"},{"sex":"女","name":"韩梅梅"}],"subscribes":["Chain Daily","Global Times"],"username":"野渔"} 并行执行接口执行总耗时:6098毫秒 ``` 示例总耗时花费`6.098s`,约等于最长的方法getSubscribes的时长`6s`,说明整个接口的耗时将取决于耗时最长的代码逻辑,并且各个方法是并行执行。 ### 2.2.2 合并阻塞写法 ``` /** * 并行,所有任务执行结束,总任务才结束 * @throws Exception */ @Test public void allEnd() throws Exception{ final String userId = "gongstring"; //超时时间 long timeout = 10; long startTime = System.currentTimeMillis(); //定义Future对象 final CompletableFuture
usernameFuture = CompletableFuture.supplyAsync(() -> CompletableFutureUserUtil.getUserName(userId)); usernameFuture.whenCompleteAsync((username,e) ->{ //todo:执行异步操作结果 }); final CompletableFuture
> addressesFuture = CompletableFuture.supplyAsync(() -> CompletableFutureUserUtil.getAddresses(userId)); addressesFuture.whenCompleteAsync((addresses,e) ->{ //todo:执行异步操作结果 }); CompletableFuture
allOf = CompletableFuture.allOf(usernameFuture,addressesFuture); allOf.get(timeout, TimeUnit.SECONDS); //阻塞等待任务结束 CompletableFutureUserUtil.printLog("方法结束"); System.out.println(String.format("并行执行接口执行总耗时:%s毫秒",System.currentTimeMillis() - startTime)); } ``` 执行结果: ``` Tag:[getAddresses start],当前时间:[2021-03-06 22:11:32] Tag:[getUserName start],当前时间:[2021-03-06 22:11:32] Tag:[getUserName end],当前时间:[2021-03-06 22:11:33] Tag:[getAddresses end],当前时间:[2021-03-06 22:11:35] Tag:[方法结束],当前时间:[2021-03-06 22:11:35] 并行执行接口执行总耗时:3102毫秒 ``` * 备注:allOf方法适用于异步数据订阅消费类的场景,如果用于返回数据给外部(例如2.2.1中的结果),可能会出现数据不完整情况。 ## 2.3 并行且任一结束 本示例演示,并行执行的几个方法中,有任一任务执行完,则认为全部执行结束。有一个典型场景匹配,例如:如果需要查询IP地址所属地区,可以调用互联网公共API结果,为了确保服务的稳定运行,可以查询多个开放接口,不同的api提供方查询性能不一,只要有任意一个有返回值,认为查询结束。 ```java /** * 并行,任一任务结束,总任务结束 * @throws Exception */ @Test public void anyEnd() throws Exception{ final String userId = "gongstring"; //超时时间 long timeout = 10; long startTime = System.currentTimeMillis(); final UserInfo userInfo = new UserInfo(); //定义Future对象 final CompletableFuture
usernameFuture = CompletableFuture.supplyAsync(() -> CompletableFutureUserUtil.getUserName(userId)); usernameFuture.whenCompleteAsync((username,e) ->{ userInfo.setUsername(username); }); final CompletableFuture
> addressesFuture = CompletableFuture.supplyAsync(() -> CompletableFutureUserUtil.getAddresses(userId)); addressesFuture.whenCompleteAsync((addresses,e) ->{ userInfo.setAddresses(addresses); }); CompletableFuture
anyOf = CompletableFuture.anyOf(usernameFuture,addressesFuture); anyOf.get(timeout, TimeUnit.SECONDS); //阻塞等待任务结束 CompletableFutureUserUtil.printLog("方法结束"); System.out.println("用户信息:"+ JSON.toJSONString(userInfo)); System.out.println(String.format("并行执行接口执行总耗时:%s毫秒",System.currentTimeMillis() - startTime)); } ``` 执行结果: ``` Tag:[getAddresses start],当前时间:[2021-03-06 22:09:07] Tag:[getUserName start],当前时间:[2021-03-06 22:09:07] Tag:[getUserName end],当前时间:[2021-03-06 22:09:08] Tag:[方法结束],当前时间:[2021-03-06 22:09:08] 用户信息:{"username":"野渔"} 并行执行接口执行总耗时:1091毫秒 ``` 根据结果,可以看到,第一个方法执行结束时,整个方法执行结束。 ## 2.4 任一任务异常任务全部结束 本示例演示,子任务中,任一任务结束,整个任务直接结束。例如我们在并行读取用户相关信息时,任何一个方法抛出异常,则直接接口抛异常,方法调用结束。 ### 2.4.1 基于2.2.1示例 ```java /** * 并行执行,有任一方法执行完毕,直接完成(添加异常) */ @Test public void applyEitherWithException() { long startTime = System.currentTimeMillis(); try { String result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3 * 1000); // 休息3秒钟 int a = 0; int b= a/0; //故意抛出异常 } catch (InterruptedException e) { e.printStackTrace(); } return " 第一个操作睡眠3秒 "; }).applyToEither(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } return " 第二个操作睡眠5秒 "; }), (r) -> { return r + "---" + " applyToEither end "; }).join(); CompletableFutureUserUtil.printLog("方法结束"); System.out.println("执行结果:"+ result); }catch (Exception e){ System.out.println("抛出异常啦:"+e.getMessage()); } System.out.println(String.format("并行执行接口执行总耗时:%s毫秒",System.currentTimeMillis() - startTime)); } ``` 执行结果: ``` 抛出异常啦:java.lang.ArithmeticException: / by zero 并行执行接口执行总耗时:3006毫秒 ``` ## 2.5 任一异常不影响主任务 在执行子任务过程中,如果有抛出异常,不影响其他子任务及主任务执行逻辑,可以视为忽略局部异常 ```java /** * 并行,任一任务异常,总任务不影响 */ @Test public void anyExceptionIgnoreEnd(){ final String userId = "gongstring"; //超时时间 long timeout = 10; long startTime = System.currentTimeMillis(); final UserInfo userInfo = new UserInfo(); //定义Future对象 final CompletableFuture
usernameFuture = CompletableFuture.supplyAsync(() -> CompletableFutureUserUtil.getUserName(userId)); usernameFuture.whenCompleteAsync((username,e) ->{ userInfo.setUsername(username); }); final CompletableFuture
> subcribesFuture = CompletableFuture.supplyAsync(() -> CompletableFutureUserUtil.getSubscribes(userId)); subcribesFuture.whenCompleteAsync((subcribes,e) ->{ userInfo.setSubscribes(subcribes); }); final CompletableFuture
> addressesFuture = CompletableFuture.supplyAsync(() -> { try { return CompletableFutureUserUtil.getAddressesWithException(userId); } catch (Exception e) { CompletableFutureUserUtil.printLog(String.format("getAddressesWithException抛出异常:%s",e.getMessage())); e.printStackTrace(); } return null; }); addressesFuture.whenCompleteAsync((addresses,e) ->{ userInfo.setAddresses(addresses); }); try { CompletableFuture
allOf = CompletableFuture.allOf(usernameFuture,subcribesFuture,addressesFuture); allOf.get(timeout, TimeUnit.SECONDS); //阻塞等待任务结束 }catch (Exception e){ CompletableFutureUserUtil.printLog(String.format("总任务抛出异常:%s",e.getMessage())); } CompletableFutureUserUtil.printLog("方法结束"); System.out.println("用户信息:"+ JSON.toJSONString(userInfo)); System.out.println(String.format("并行执行接口执行总耗时:%s毫秒",System.currentTimeMillis() - startTime)); } ``` 执行结果 ``` Tag:[getAddressesWithException start],当前时间:[2021-03-06 22:25:22] Tag:[getSubscribes start],当前时间:[2021-03-06 22:25:22] Tag:[getUserName start],当前时间:[2021-03-06 22:25:22] Tag:[getUserName end],当前时间:[2021-03-06 22:25:23] Tag:[getAddressesWithException抛出异常:/ by zero],当前时间:[2021-03-06 22:25:24] java.lang.ArithmeticException: / by zero at com.gongstring.jdk.CompletableFutureUserUtil.getAddressesWithException(CompletableFutureUserUtil.java:70) at com.gongstring.jdk.CompletableFuture1Test.lambda$anyExceptionIgnoreEnd$22(CompletableFuture1Test.java:201) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) Tag:[getSubscribes end],当前时间:[2021-03-06 22:25:28] Tag:[方法结束],当前时间:[2021-03-06 22:25:28] 用户信息:{"subscribes":["Chain Daily","Global Times"],"username":"野渔"} 并行执行接口执行总耗时:6096毫秒 ``` 从结果执行总时间约6s可以看出,虽然`getAddressesWithException`方法抛出异常,但是不影响主任务执行逻辑。同时通过此方法对比,也能看出,直接使用CompletableFuture.allOf时,如果抛出异常,主任务不会直接退出。 ## 2.6 方法抛出异常影响主任务 ``` /** * 并行,任一任务异常,总任务结束 */ @Test public void anyExceptionEnd(){ final String userId = "gongstring"; //超时时间 long timeout = 10; long startTime = System.currentTimeMillis(); final UserInfo userInfo = new UserInfo(); //定义Future对象 final CompletableFuture
> subcribesFuture = CompletableFuture.supplyAsync(() -> CompletableFutureUserUtil.getSubscribes(userId)); subcribesFuture.whenCompleteAsync((subcribes,e) ->{ userInfo.setSubscribes(subcribes); }); final CompletableFuture
> addressesFuture = CompletableFuture.supplyAsync(() -> { try { return CompletableFutureUserUtil.getAddressesWithException(userId); } catch (Exception e) { CompletableFutureUserUtil.printLog(String.format("getAddressesWithException抛出异常:%s",e.getMessage())); e.printStackTrace(); } return null; }); addressesFuture.whenCompleteAsync((addresses,e) ->{ userInfo.setAddresses(addresses); }); try { CompletableFuture
anyOf = CompletableFuture.anyOf(subcribesFuture,addressesFuture); anyOf.get(timeout, TimeUnit.SECONDS); //阻塞等待任务结束 }catch (Exception e){ CompletableFutureUserUtil.printLog(String.format("总任务抛出异常:%s",e.getMessage())); } CompletableFutureUserUtil.printLog("方法结束"); System.out.println("用户信息:"+ JSON.toJSONString(userInfo)); System.out.println(String.format("并行执行接口执行总耗时:%s毫秒",System.currentTimeMillis() - startTime)); } ``` 执行结果: ``` Tag:[getAddressesWithException start],当前时间:[2021-03-06 22:28:32] Tag:[getSubscribes start],当前时间:[2021-03-06 22:28:32] Tag:[getAddressesWithException抛出异常:/ by zero],当前时间:[2021-03-06 22:28:34] java.lang.ArithmeticException: / by zero at com.gongstring.jdk.CompletableFutureUserUtil.getAddressesWithException(CompletableFutureUserUtil.java:70) at com.gongstring.jdk.CompletableFuture1Test.lambda$anyExceptionEnd$26(CompletableFuture1Test.java:240) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) Tag:[方法结束],当前时间:[2021-03-06 22:28:34] 用户信息:{} 并行执行接口执行总耗时:2091毫秒 ``` 由于使用了anyOf,所以异常的抛出时间应在最早执行结束任务之前,否则不生效。 ` 注意:此处的并发编程使用的线程池为系统自带(根据当前服务器核心数计算线程池),如果需要使用自定义线程池,可以在调用anyOf(()-{},线程池实例)或allOf(()-{},线程池实例)添加。`