一、什么是异步编程

在编程的世界里,我们经常会遇到一些比较耗时的任务,比如网络请求、文件读写等。如果采用同步编程的方式,程序会一直等待这些任务完成,才能继续执行后面的代码,这就好比你在排队买奶茶,前面的人买奶茶特别慢,你只能干等着,啥别的事儿也做不了。而异步编程就不一样了,它允许你在等待这些耗时任务的同时,去做其他的事情,就像你在排队买奶茶的时候,还能顺便刷刷手机、和朋友聊聊天。

在 Java 里,CompletableFuture 就是实现异步编程的一个强大工具。它可以让我们更方便地处理异步任务,并且支持组合式的任务处理,就像搭积木一样,把不同的任务组合起来完成更复杂的功能。

二、CompletableFuture 基本使用

1. 创建 CompletableFuture

我们可以通过 CompletableFuture 的静态方法来创建一个异步任务。下面是一个简单的示例:

// Java 技术栈
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExample {
    public static void main(String[] args) {
        // 创建一个 CompletableFuture 异步任务,使用 supplyAsync 方法,它会返回一个带有返回值的 CompletableFuture
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                // 模拟一个耗时任务,睡 2 秒钟
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务完成";
        });

        try {
            // 获取异步任务的结果
            String result = future.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在这个示例中,我们使用 CompletableFuture.supplyAsync 方法创建了一个异步任务,这个任务会在一个新的线程中执行。supplyAsync 方法接收一个 Supplier 函数式接口,我们在这个接口的实现中模拟了一个耗时任务,最后返回一个字符串。然后我们使用 get 方法来获取异步任务的结果。

2. 无返回值的异步任务

如果我们的异步任务不需要返回值,可以使用 CompletableFuture.runAsync 方法。示例如下:

// Java 技术栈
import java.util.concurrent.CompletableFuture;

public class CompletableFutureRunAsyncExample {
    public static void main(String[] args) {
        // 创建一个无返回值的异步任务
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                // 模拟一个耗时任务,睡 2 秒钟
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("无返回值任务完成");
        });

        try {
            // 等待任务完成
            future.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

这里我们使用 runAsync 方法创建了一个无返回值的异步任务,它接收一个 Runnable 函数式接口。

三、组合式异步任务处理

1. 顺序执行任务

有时候我们需要一个任务完成后再执行另一个任务,CompletableFuture 提供了 thenApplythenAccept 等方法来实现顺序执行。

// Java 技术栈
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureSequentialExample {
    public static void main(String[] args) {
        // 创建第一个异步任务
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                // 模拟一个耗时任务,睡 2 秒钟
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务 1 完成";
        });

        // 第一个任务完成后执行第二个任务
        CompletableFuture<String> future2 = future1.thenApply(result -> {
            System.out.println(result);
            try {
                // 模拟一个耗时任务,睡 1 秒钟
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务 2 完成";
        });

        try {
            // 获取第二个任务的结果
            String result = future2.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在这个示例中,我们先创建了一个异步任务 future1,然后使用 thenApply 方法在 future1 完成后执行另一个任务 future2thenApply 方法接收一个 Function 函数式接口,它会将前一个任务的结果作为参数传递给这个函数。

2. 并行执行任务

CompletableFuture 还支持并行执行多个任务,然后等待所有任务完成。我们可以使用 allOf 方法来实现。

// Java 技术栈
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureParallelExample {
    public static void main(String[] args) {
        // 创建第一个异步任务
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                // 模拟一个耗时任务,睡 2 秒钟
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务 1 完成";
        });

        // 创建第二个异步任务
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                // 模拟一个耗时任务,睡 1 秒钟
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务 2 完成";
        });

        // 等待所有任务完成
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);

        // 当所有任务完成后,获取每个任务的结果
        CompletableFuture<Object[]> allResults = allFutures.thenApply(v -> {
            try {
                return new Object[]{future1.get(), future2.get()};
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
                return null;
            }
        });

        try {
            // 获取最终结果
            Object[] results = allResults.get();
            for (Object result : results) {
                System.out.println(result);
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在这个示例中,我们创建了两个异步任务 future1future2,然后使用 allOf 方法等待这两个任务都完成。最后,我们使用 thenApply 方法获取每个任务的结果。

四、应用场景

1. 网络请求

在开发 Web 应用时,我们经常需要同时发起多个网络请求。使用 CompletableFuture 可以并行地发起这些请求,提高程序的性能。比如,我们要从不同的 API 获取用户信息和订单信息:

// Java 技术栈
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class NetworkRequestExample {
    public static void main(String[] args) {
        // 模拟获取用户信息的网络请求
        CompletableFuture<String> userInfoFuture = CompletableFuture.supplyAsync(() -> {
            try {
                // 模拟网络请求耗时
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "用户信息";
        });

        // 模拟获取订单信息的网络请求
        CompletableFuture<String> orderInfoFuture = CompletableFuture.supplyAsync(() -> {
            try {
                // 模拟网络请求耗时
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "订单信息";
        });

        // 等待两个请求都完成
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(userInfoFuture, orderInfoFuture);

        // 当两个请求都完成后,处理结果
        CompletableFuture<Object[]> allResults = allFutures.thenApply(v -> {
            try {
                return new Object[]{userInfoFuture.get(), orderInfoFuture.get()};
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
                return null;
            }
        });

        try {
            // 获取最终结果
            Object[] results = allResults.get();
            for (Object result : results) {
                System.out.println(result);
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

2. 批量数据处理

在处理大量数据时,我们可以将数据分成多个小块,并行地处理这些小块数据,最后合并结果。比如,我们要对一个大文件进行分词处理:

// Java 技术栈
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class BatchDataProcessingExample {
    public static void main(String[] args) {
        // 模拟大文件的数据
        List<String> data = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            data.add("data" + i);
        }

        // 将数据分成多个小块
        int chunkSize = 100;
        List<List<String>> chunks = new ArrayList<>();
        for (int i = 0; i < data.size(); i += chunkSize) {
            chunks.add(data.subList(i, Math.min(i + chunkSize, data.size())));
        }

        // 并行处理每个小块数据
        List<CompletableFuture<List<String>>> futures = new ArrayList<>();
        for (List<String> chunk : chunks) {
            CompletableFuture<List<String>> future = CompletableFuture.supplyAsync(() -> {
                List<String> result = new ArrayList<>();
                for (String item : chunk) {
                    // 模拟分词处理
                    result.add(item + "_processed");
                }
                return result;
            });
            futures.add(future);
        }

        // 等待所有任务完成
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

        // 合并所有结果
        CompletableFuture<List<String>> allResults = allFutures.thenApply(v -> {
            List<String> finalResult = new ArrayList<>();
            for (CompletableFuture<List<String>> future : futures) {
                try {
                    finalResult.addAll(future.get());
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            return finalResult;
        });

        try {
            // 获取最终结果
            List<String> result = allResults.get();
            System.out.println("处理后的数据数量: " + result.size());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

五、技术优缺点

优点

  • 提高性能:通过并行执行任务,充分利用多核处理器的性能,减少程序的执行时间。就像多个工人同时工作,比一个工人工作要快得多。
  • 代码简洁CompletableFuture 提供了丰富的方法来组合和处理异步任务,使得代码更加简洁易读。
  • 异常处理方便:可以方便地处理异步任务中抛出的异常,避免程序崩溃。

缺点

  • 学习成本较高CompletableFuture 的方法比较多,需要一定的时间来学习和掌握。
  • 调试困难:由于异步任务是在不同的线程中执行的,调试时可能会遇到一些困难。

六、注意事项

1. 异常处理

在使用 CompletableFuture 时,要注意异常处理。可以使用 exceptionally 方法来处理异常。示例如下:

// Java 技术栈
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class ExceptionHandlingExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟抛出异常
            throw new RuntimeException("任务执行出错");
        }).exceptionally(ex -> {
            System.out.println("捕获到异常: " + ex.getMessage());
            return "默认结果";
        });

        try {
            String result = future.get(1, TimeUnit.SECONDS);
            System.out.println(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2. 线程池的使用

CompletableFuture 默认使用 ForkJoinPool.commonPool() 线程池,在高并发场景下,可能需要自定义线程池来避免资源耗尽。示例如下:

// Java 技术栈
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CustomThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个自定义线程池
        ExecutorService executor = Executors.newFixedThreadPool(10);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                // 模拟一个耗时任务,睡 2 秒钟
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "任务完成";
        }, executor);

        try {
            // 获取异步任务的结果
            String result = future.get();
            System.out.println(result);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭线程池
            executor.shutdown();
        }
    }
}

七、文章总结

CompletableFuture 是 Java 中一个非常强大的异步编程工具,它可以让我们更方便地处理异步任务,并且支持组合式的任务处理。通过并行执行任务,我们可以提高程序的性能,减少执行时间。在实际应用中,CompletableFuture 可以用于网络请求、批量数据处理等场景。但是,使用 CompletableFuture 也有一些注意事项,比如异常处理和线程池的使用。我们要充分发挥 CompletableFuture 的优势,同时避免它的缺点,这样才能编写出高效、稳定的程序。