Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.
All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool() (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task).
And you can therefore also provide a specific Executor to all the Async methods.
thenRun/thenRunAsync : Returns a new CompletionStage that, when this stage completes normally, executes the given action.
thenAccept/thenAcceptAsync : Returns a new CompletionStage that, when this stage completes normally, is executed with this stage’s result as the argument to the supplied action.
thenApply/thenApplyAsync : Returns a new CompletionStage that, when this stage completes normally, is executed with this stage’s result as the argument to the supplied function.
thenAcceptBoth/thenAcceptBothAsync : like thenCombine but taking a function that returns void. thenAcceptBoth 跟 thenCombine 类似,但是返回 CompletableFuture 类型。
runAfterBoth/runAfterBothAsync : accepts a Runnable for execution after both CompletableFutures complete
applyToEither/applyToEitherAsync : takes a unary function, supplying it with the result of whichever CompletableFuture completes first
acceptEither/acceptEitherAsync : like applyToEither but taking a unary function with void result
runAfterEither/runAfterEitherAsync : accepts a Runnable for execution after either CompletableFuture completes
importjava.util.concurrent.CompletableFuture;importjava.util.concurrent.ExecutionException;publicclassAsynchronousExceptions{publicstaticvoidmain(finalString[]args)throwsInterruptedException{for(finalbooleanfailure:newboolean[]{false,true}){CompletableFuture<Integer>x=CompletableFuture.supplyAsync(()->{if(failure){thrownewRuntimeException("Oops, something went wrong");return42;try{// Blocks (avoid this in production code!), and either returns the promise's value, or...System.out.println(x.get());System.out.println("isCompletedExceptionally = "+x.isCompletedExceptionally());// Output[failure=false]: 42// Output[failure=false]: isCompletedExceptionally = false}catch(ExecutionExceptione){// ... rethrows the RuntimeException wrapped as an ExecutionExceptionSystem.out.println(e.getMessage());System.out.println(e.getCause().getMessage());System.out.println("isCompletedExceptionally = "+x.isCompletedExceptionally());// Output[failure=true]: java.lang.RuntimeException: Oops, something went wrong// Output[failure=true]: Oops, something went wrong// Output[failure=true]: isCompletedExceptionally = true
CompletableFuture 提供了其他几个更优雅的异常处理方式。
1、exceptionally
exceptionally(Function<Throwable,? extends T> fn) : 只有当 CompletableFuture 抛出异常的时候,才会触发这个exceptionally的计算,调用function计算值。非常类似于 try-catch。可以用它来捕获异常并返回一个默认值或者错误代码(handle exception and return a default or error value)。
importjava.util.concurrent.CompletableFuture;importjava.util.concurrent.ExecutionException;publicclassAsynchronousExceptionsHandlingWithExceptionally{publicstaticvoidmain(finalString[]args)throwsInterruptedException,ExecutionException{for(finalbooleanfailure:newboolean[]{false,true}){CompletableFuture<Integer>x=CompletableFuture.supplyAsync(()->{if(failure){thrownewRuntimeException("Oops, something went wrong");return42;
* Returns a new CompletableFuture that is completed when this CompletableFuture completes,
* with the result of the given function of the exception triggering this CompletableFuture's completion
* when it completes exceptionally; otherwise, if this CompletableFuture completes normally,
* then the returned CompletableFuture also completes normally with the same value.
CompletableFuture<Integer>tryX=x.exceptionally(ex->-1);// Note that tryX and x are of same type.// Blocks (avoid this in production code!), and either returns the promise's valueSystem.out.println(tryX.get());System.out.println("isCompletedExceptionally = "+tryX.isCompletedExceptionally());// Output[failure=false]: 42// Output[failure=false]: isCompletedExceptionally = false// Output[failure=true]: -1// Output[failure=true]: isCompletedExceptionally = false
importjava.util.concurrent.CompletableFuture;importjava.util.concurrent.ExecutionException;publicclassAsynchronousExceptionsHandlingWithWhenComplete{publicstaticvoidmain(finalString[]args)throwsInterruptedException,ExecutionException{for(finalbooleanfailure:newboolean[]{false,true}){CompletableFuture<Integer>x=CompletableFuture.supplyAsync(()->{if(failure){thrownewRuntimeException("Oops, something went wrong");return42;
* Returns a new CompletableFuture with the same result or exception as this CompletableFuture,
* that executes the given action when this stage completes.
CompletableFuture<Integer>tryX=x.whenComplete((value,ex)->{// Note that tryX and x are of same type. This CompletableFuture acts as an invisible "decorator".if(value!=null){// We get a chance to transform the result by adding 1...System.out.println("Result: "+value);}else{// ... or return an error value:System.out.println("Error code: -1. Root cause: "+ex.getMessage());try{// Blocks (avoid this in production code!), and either returns the promise's value, or...System.out.println(tryX.get());System.out.println("isCompletedExceptionally = "+tryX.isCompletedExceptionally());// Output[failure=false]: Result: 42// Output[failure=false]: 42// Output[failure=false]: isCompletedExceptionally = false}catch(ExecutionExceptione){// ... rethrows the RuntimeException wrapped as an ExecutionExceptionSystem.out.println(e.getMessage());System.out.println("isCompletedExceptionally = "+tryX.isCompletedExceptionally());// Output[failure=true]: Error code: -1. Root cause: java.lang.RuntimeException: Oops, something went wrong// Output[failure=true]: Oops, something went wrong// Output[failure=true]: isCompletedExceptionally = true
2.2 handle
handle 方法给你一个机会,可以在一个方法里对正常处理结果进行转换或者对异常进行捕获处理:
importjava.util.concurrent.CompletableFuture;importjava.util.concurrent.ExecutionException;importstaticjava.lang.String.format;publicclassAsynchronousExceptionsHandlingWithHandle{publicstaticvoidmain(finalString[]args)throwsInterruptedException,ExecutionException{for(finalbooleanfailure:newboolean[]{false,true}){CompletableFuture<Integer>x=CompletableFuture.supplyAsync(()->{if(failure){thrownewRuntimeException("Oops, something went wrong");return42;
* Returns a new CompletableFuture that, when this CompletableFuture completes either normally or exceptionally,
* is executed with this stage's result and exception as arguments to the supplied function.
CompletableFuture<HttpResponse>tryX=x.handle((value,ex)->{// Note that tryX and x are of different type.if(value!=null){// We get a chance to transform the result...returnnewHttpResponse(200,format("{\"value\": %s}",value));}else{// ... or return details on the error using the ExecutionException's message:returnnewHttpResponse(500,format("{\"error\": \"%s\"}",ex.getMessage()));// Blocks (avoid this in production code!), and either returns the promise's value:System.out.println(tryX.get());System.out.println("isCompletedExceptionally = "+tryX.isCompletedExceptionally());// Output[failure=false]: 200 - {"value": 42}// Output[failure=false]: isCompletedExceptionally = false// Output[failure=true]: 500 - {"error": "java.lang.RuntimeException: Oops, something went wrong"}// Output[failure=true]: isCompletedExceptionally = falseprivatestaticclassHttpResponse{privatefinalintstatus;privatefinalStringbody;publicHttpResponse(finalintstatus,finalStringbody){this.status=status;this.body=body;publicStringtoString(){returnstatus+" - "+body;}
CompletableFuture<Integer>tryX=x.exceptionally(ex->-1);// Note that tryX and x are of same type.// Blocks (avoid this in production code!), and either returns the promise's valueSystem.out.println(tryX.get());// 不会抛异常,返回-1System.out.println("isCompletedExceptionally = "+tryX.isCompletedExceptionally());System.out.println(x.get());// 还是抛异常!System.out.println("isCompletedExceptionally = "+x.isCompletedExceptionally());
* Singleton delay scheduler, used only for starting and
* cancelling tasks.
staticfinalclassDelayer{staticScheduledFuture<?>delay(Runnablecommand,longdelay,TimeUnitunit){returndelayer.schedule(command,delay,unit);staticfinalclassDaemonThreadFactoryimplementsThreadFactory{publicThreadnewThread(Runnabler){Threadt=newThread(r);t.setDaemon(true);t.setName("CompletableFutureDelayScheduler");returnt;staticfinalScheduledThreadPoolExecutordelayer;static{(delayer=newScheduledThreadPoolExecutor(1,newDaemonThreadFactory())).setRemoveOnCancelPolicy(true);
public CompletableFuture orTimeout(long timeout, TimeUnit unit) : completes the CompletableFuture with a TimeoutException after the specified timeout has elapsed.
public CompletableFuture completeOnTimeout(T value, long timeout, TimeUnit unit) : provides a default value in the case that the CompletableFuture pipeline times out.
allAsList(Iterable<ListenableFuture>): Returns a ListenableFuture whose value is a list containing the values of each of the input futures, in order. If any of the input futures fails or is cancelled, this future fails or is cancelled.
可以看到由于 Step_1: computing integers from 1 to 5 和 Step_3: multiplying the sum by 1, 2 and 3 都是异步执行的,整个执行时间只需要 4秒,对比单线程则需要16秒。而且关键是整个代码可读性非常简洁、内聚和优雅( 从Java 8开始 Java再也不是那么冗长了:) :
we summed integers from 1 to 5 using a reducer, combining the neutral element CompletableFuture.completedFuture(0) with other futures as they completed using thenCombine
we waited for all multiplications to complete using allOf and thenApply-ed Integer’s natural order comparator to find the maximum value.
CompletableFuture vs. Java8 Stream vs. RxJava1 & RxJava2