一、Fork/Join 框架核心思想
1. 分而治之(Divide and Conquer)
java
// Fork/Join 的核心理念 public class ForkJoinPhilosophy { /* Fork(拆分):将大任务拆分成小任务 Join(合并):将小任务的结果合并成最终结果 工作原理: 1. 任务拆分 → 递归分解直到阈值 2. 并行执行 → 工作窃取(Work-Stealing) 3. 结果合并 → 递归合并计算结果 与普通线程池的区别: - 普通线程池:任务间独立,无依赖 - Fork/Join:任务有父子关系,需要合并结果 */ }2. 核心组件
java
// Fork/Join 框架三大核心 public class ForkJoinComponents { /* 1. ForkJoinPool(线程池) - 特殊的工作窃取线程池 - 默认线程数 = CPU核心数 2. ForkJoinTask(任务基类) - RecursiveAction:无返回值的任务 - RecursiveTask<V>:有返回值的任务 - CountedCompleter:更复杂的任务 3. Work-Stealing(工作窃取算法) - 每个线程有自己的双端队列 - 空闲线程从其他线程队列尾部"窃取"任务 - 减少竞争,提高CPU利用率 */ }二、六大核心使用场景
场景1:大数据集并行计算(最经典)
java
// 大规模数组求和/统计 public class LargeArraySumTask extends RecursiveTask<Long> { private static final int THRESHOLD = 10_000; // 拆分阈值 private final long[] array; private final int start, end; public LargeArraySumTask(long[] array, int start, int end) { this.array = array; this.start = start; this.end = end; } @Override protected Long compute() { int length = end - start; // 如果任务足够小,直接计算 if (length <= THRESHOLD) { long sum = 0; for (int i = start; i < end; i++) { sum += array[i]; } return sum; } // 否则,拆分成两个子任务 int middle = start + (length / 2); LargeArraySumTask leftTask = new LargeArraySumTask(array, start, middle); LargeArraySumTask rightTask = new LargeArraySumTask(array, middle, end); // 异步执行左任务(fork) leftTask.fork(); // 同步执行右任务(compute) long rightResult = rightTask.compute(); // 等待左任务完成并获取结果(join) long leftResult = leftTask.join(); // 合并结果 return leftResult + rightResult; } // 使用示例 public static void main(String[] args) { long[] array = new long[1_000_000]; Arrays.fill(array, 1L); // 填充100万个1 ForkJoinPool pool = ForkJoinPool.commonPool(); LargeArraySumTask task = new LargeArraySumTask(array, 0, array.length); long sum = pool.invoke(task); System.out.println("总和: " + sum); // 输出: 1000000 } }适用数据量:10万+ 元素的数组/集合运算
场景2:复杂递归算法并行化
java
// 并行快速排序 public class ParallelQuickSort extends RecursiveAction { private static final int THRESHOLD = 1000; private final int[] array; private final int left, right; public ParallelQuickSort(int[] array, int left, int right) { this.array = array; this.left = left; this.right = right; } @Override protected void compute() { if (right - left <= THRESHOLD) { // 小数组使用串行排序 Arrays.sort(array, left, right + 1); return; } int pivotIndex = partition(array, left, right); // 创建左右子任务 ParallelQuickSort leftTask = new ParallelQuickSort(array, left, pivotIndex - 1); ParallelQuickSort rightTask = new ParallelQuickSort(array, pivotIndex + 1, right); // 并行执行 invokeAll(leftTask, rightTask); } private int partition(int[] arr, int low, int high) { int pivot = arr[high]; int i = low - 1; for (int j = low; j < high; j++) { if (arr[j] <= pivot) { i++; swap(arr, i, j); } } swap(arr, i + 1, high); return i + 1; } private void swap(int[] arr, int i, int j) { int temp = arr[i]; arr[i] = arr[j]; arr[j] = temp; } // 性能对比 public static void comparePerformance() { int[] data1 = generateRandomArray(1_000_000); int[] data2 = data1.clone(); // 传统快排 long start1 = System.currentTimeMillis(); Arrays.sort(data1); long time1 = System.currentTimeMillis() - start1; // 并行快排 ForkJoinPool pool = new ForkJoinPool(); ParallelQuickSort task = new ParallelQuickSort(data2, 0, data2.length - 1); long start2 = System.currentTimeMillis(); pool.invoke(task); long time2 = System.currentTimeMillis() - start2; System.out.printf("串行快排: %dms, 并行快排: %dms, 加速比: %.2fx%n", time1, time2, (double)time1/time2); // 8核CPU上,通常可达到3-6倍加速 } }适用算法:快速排序、归并排序、矩阵乘法、Strassen算法等
场景3:大规模数据处理与ETL
java
// 并行文件处理:统计目录下所有Java文件的行数 public class FileLineCounter extends RecursiveTask<Long> { private final File file; private static final int FILE_THRESHOLD = 10; // 小文件直接处理 public FileLineCounter(File file) { this.file = file; } @Override protected Long compute() { if (file.isFile()) { if (file.getName().endsWith(".java")) { return countLines(file); } return 0L; } // 如果是目录,拆分成子任务 File[] children = file.listFiles(); if (children == null || children.length == 0) { return 0L; } List<FileLineCounter> tasks = new ArrayList<>(); for (File child : children) { FileLineCounter task = new FileLineCounter(child); if (child.isDirectory() || child.isFile() && !child.getName().endsWith(".java")) { // 目录或非Java文件,直接fork task.fork(); tasks.add(task); } else { // Java文件,如果数量少则直接计算 if (tasks.size() < FILE_THRESHOLD) { tasks.add(task); } else { task.fork(); tasks.add(task); } } } // 合并结果 long total = 0; for (FileLineCounter task : tasks) { total += task.join(); } return total; } private long countLines(File file) { try (BufferedReader reader = new BufferedReader(new FileReader(file))) { return reader.lines().count(); } catch (IOException e) { return 0L; } } // 使用示例 public static void main(String[] args) { File projectDir = new File("/path/to/project"); ForkJoinPool pool = new ForkJoinPool(); FileLineCounter task = new FileLineCounter(projectDir); long totalLines = pool.invoke(task); System.out.println("总代码行数: " + totalLines); } }适用场景:日志分析、数据清洗、文件批量处理、数据迁移
场景4:图像/视频处理
java
// 并行图像处理:高斯模糊 public class GaussianBlurTask extends RecursiveAction { private final int[][] image; private final int[][] result; private final int startRow, endRow; private final int radius; private static final int ROW_THRESHOLD = 100; // 每任务处理行数阈值 public GaussianBlurTask(int[][] image, int[][] result, int startRow, int endRow, int radius) { this.image = image; this.result = result; this.startRow = startRow; this.endRow = endRow; this.radius = radius; } @Override protected void compute() { int height = endRow - startRow; if (height <= ROW_THRESHOLD) { // 直接处理这部分行 applyGaussianBlur(startRow, endRow); return; } // 拆分成上下两部分 int middle = startRow + (height / 2); GaussianBlurTask topTask = new GaussianBlurTask(image, result, startRow, middle, radius); GaussianBlurTask bottomTask = new GaussianBlurTask(image, result, middle, endRow, radius); // 并行执行 invokeAll(topTask, bottomTask); } private void applyGaussianBlur(int start, int end) { int width = image[0].length; double[][] kernel = createGaussianKernel(radius); for (int y = start; y < end; y++) { for (int x = 0; x < width; x++) { double sum = 0; double weightSum = 0; // 应用卷积核 for (int ky = -radius; ky <= radius; ky++) { for (int kx = -radius; kx <= radius; kx++) { int nx = Math.min(Math.max(x + kx, 0), width - 1); int ny = Math.min(Math.max(y + ky, 0), image.length - 1); double weight = kernel[ky + radius][kx + radius]; sum += image[ny][nx] * weight; weightSum += weight; } } result[y][x] = (int)(sum / weightSum); } } } private double[][] createGaussianKernel(int radius) { int size = 2 * radius + 1; double[][] kernel = new double[size][size]; double sigma = radius / 3.0; double sum = 0; for (int y = -radius; y <= radius; y++) { for (int x = -radius; x <= radius; x++) { double value = Math.exp(-(x*x + y*y) / (2 * sigma * sigma)); kernel[y + radius][x + radius] = value; sum += value; } } // 归一化 for (int y = 0; y < size; y++) { for (int x = 0; x < size; x++) { kernel[y][x] /= sum; } } return kernel; } // 使用示例 public static void main(String[] args) { // 假设已有图像数据 int[][] image = loadImage("input.jpg"); int[][] result = new int[image.length][image[0].length]; ForkJoinPool pool = new ForkJoinPool(); GaussianBlurTask task = new GaussianBlurTask( image, result, 0, image.length, 3); pool.invoke(task); saveImage(result, "output.jpg"); } }适用场景:图像滤波、特征提取、视频编码、3D渲染
篇幅限制下面就只能给大家展示小册部分内容了。整理了一份核心面试笔记包括了:Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafc
需要全套面试笔记及答案
【点击此处即可/免费获取】
场景5:机器学习与数据分析
java
// 并行K-Means聚类算法 public class ParallelKMeans extends RecursiveTask<Void> { private final double[][] dataPoints; private final double[][] centroids; private final int[] assignments; private final int start, end; private final int k; private static final int POINT_THRESHOLD = 1000; public ParallelKMeans(double[][] dataPoints, double[][] centroids, int[] assignments, int start, int end, int k) { this.dataPoints = dataPoints; this.centroids = centroids; this.assignments = assignments; this.start = start; this.end = end; this.k = k; } @Override protected Void compute() { int length = end - start; if (length <= POINT_THRESHOLD) { // 分配阶段:为每个数据点找最近的中心 assignPointsToCentroids(); return null; } // 拆分任务 int middle = start + (length / 2); ParallelKMeans leftTask = new ParallelKMeans( dataPoints, centroids, assignments, start, middle, k); ParallelKMeans rightTask = new ParallelKMeans( dataPoints, centroids, assignments, middle, end, k); invokeAll(leftTask, rightTask); return null; } private void assignPointsToCentroids() { for (int i = start; i < end; i++) { double minDist = Double.MAX_VALUE; int bestCluster = 0; for (int j = 0; j < k; j++) { double dist = euclideanDistance(dataPoints[i], centroids[j]); if (dist < minDist) { minDist = dist; bestCluster = j; } } assignments[i] = bestCluster; } } private double euclideanDistance(double[] a, double[] b) { double sum = 0; for (int i = 0; i < a.length; i++) { double diff = a[i] - b[i]; sum += diff * diff; } return Math.sqrt(sum); } // 完整的K-Means流程 public static class KMeansClusterer { public void cluster(double[][] data, int k, int maxIterations) { int n = data.length; int dim = data[0].length; // 初始化中心点 double[][] centroids = initializeCentroids(data, k); int[] assignments = new int[n]; ForkJoinPool pool = new ForkJoinPool(); for (int iter = 0; iter < maxIterations; iter++) { // 并行分配阶段 ParallelKMeans assignTask = new ParallelKMeans( data, centroids, assignments, 0, n, k); pool.invoke(assignTask); // 串行更新中心点(可以进一步并行化) double[][] newCentroids = updateCentroids(data, assignments, k); // 检查收敛 if (centroidsConverged(centroids, newCentroids, 1e-6)) { break; } centroids = newCentroids; } } private double[][] updateCentroids(double[][] data, int[] assignments, int k) { int dim = data[0].length; double[][] newCentroids = new double[k][dim]; int[] counts = new int[k]; // 计算新的中心点 for (int i = 0; i < data.length; i++) { int cluster = assignments[i]; for (int d = 0; d < dim; d++) { newCentroids[cluster][d] += data[i][d]; } counts[cluster]++; } // 求平均值 for (int j = 0; j < k; j++) { if (counts[j] > 0) { for (int d = 0; d < dim; d++) { newCentroids[j][d] /= counts[j]; } } } return newCentroids; } } }适用场景:数据聚类、神经网络训练、特征工程、模型评估
场景6:Web服务器请求处理
java
// 并行处理API聚合请求 public class ParallelApiAggregator extends RecursiveTask<AggregateResult> { private final List<ApiRequest> requests; private final int start, end; private static final int REQUEST_THRESHOLD = 3; // 小批次直接处理 public ParallelApiAggregator(List<ApiRequest> requests, int start, int end) { this.requests = requests; this.start = start; this.end = end; } @Override protected AggregateResult compute() { int batchSize = end - start; if (batchSize <= REQUEST_THRESHOLD) { // 直接串行调用API return callApisSequentially(); } // 拆分成两个批次 int middle = start + (batchSize / 2); ParallelApiAggregator leftTask = new ParallelApiAggregator(requests, start, middle); ParallelApiAggregator rightTask = new ParallelApiAggregator(requests, middle, end); leftTask.fork(); AggregateResult rightResult = rightTask.compute(); AggregateResult leftResult = leftTask.join(); // 合并结果 return mergeResults(leftResult, rightResult); } private AggregateResult callApisSequentially() { AggregateResult result = new AggregateResult(); HttpClient client = HttpClient.newHttpClient(); for (int i = start; i < end; i++) { ApiRequest request = requests.get(i); try { HttpResponse<String> response = client.send( request.toHttpRequest(), HttpResponse.BodyHandlers.ofString() ); result.addResponse(request, response.body()); } catch (Exception e) { result.addError(request, e); } } return result; } // Web层使用示例 @RestController public class ApiController { @PostMapping("/aggregate") public ResponseEntity<AggregateResult> aggregateApis( @RequestBody List<ApiRequest> requests) { ForkJoinPool pool = new ForkJoinPool(); ParallelApiAggregator task = new ParallelApiAggregator(requests, 0, requests.size()); AggregateResult result = pool.invoke(task); return ResponseEntity.ok(result); } } // 性能对比:串行 vs 并行调用多个微服务 public static void performanceComparison() { List<ApiRequest> requests = generateRequests(10); // 串行调用 long start1 = System.currentTimeMillis(); AggregateResult serialResult = callApisSequentially(requests); long time1 = System.currentTimeMillis() - start1; // 并行调用 long start2 = System.currentTimeMillis(); ForkJoinPool pool = new ForkJoinPool(); ParallelApiAggregator task = new ParallelApiAggregator(requests, 0, requests.size()); AggregateResult parallelResult = pool.invoke(task); long time2 = System.currentTimeMillis() - start2; System.out.printf("串行调用: %dms, 并行调用: %dms, 加速比: %.2fx%n", time1, time2, (double)time1/time2); // 假设每个API耗时100ms,10个API: // 串行:~1000ms,并行:~100ms,加速比10x } }适用场景:微服务聚合、数据拼装、API网关、批量处理
三、工作窃取(Work-Stealing)机制详解
1. 工作原理
java
// 工作窃取算法的核心逻辑 public class WorkStealingMechanism { /* 双端队列(Deque)设计: - 每个工作线程有一个自己的双端队列 - 线程从自己队列的头部取任务(LIFO) - 窃取线程从其他队列的尾部偷任务(FIFO) 为什么这样设计? 1. 头部操作(LIFO):最近的任务,缓存友好 2. 尾部窃取(FIFO):最老的任务,减少竞争 优势: - 减少线程空闲时间 - 自动负载均衡 - 减少锁竞争 */ } // ForkJoinPool 内部队列结构示意 public class ForkJoinPoolInternal { /* +---------------------+ | Worker 1's Deque | | [T1][T2][T3][T4] | ← 自己从头部取任务 +---------------------+ ← 别人从尾部偷任务 +---------------------+ | Worker 2's Deque | | [T5][T6] | +---------------------+ +---------------------+ | Worker 3 (空闲) | | 正在偷Worker1的T4 | +---------------------+ */ }2. 性能优势对比
java
// 传统线程池 vs ForkJoinPool public class PerformanceComparison { public static void main(String[] args) { int taskCount = 10000; int threshold = 100; // 测试数据 long[] data = generateData(taskCount * threshold); // 1. ForkJoinPool 测试 ForkJoinPool forkJoinPool = new ForkJoinPool(); long start1 = System.currentTimeMillis(); ForkJoinTask<Long> task = new RecursiveSumTask(data, 0, data.length, threshold); long result1 = forkJoinPool.invoke(task); long time1 = System.currentTimeMillis() - start1; // 2. 固定线程池测试 ExecutorService fixedPool = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors()); long start2 = System.currentTimeMillis(); List<Future<Long>> futures = new ArrayList<>(); int chunkSize = data.length / 8; // 8个固定分区 for (int i = 0; i < 8; i++) { int start = i * chunkSize; int end = (i == 7) ? data.length : (i + 1) * chunkSize; futures.add(fixedPool.submit(() -> { long sum = 0; for (int j = start; j < end; j++) { sum += data[j]; } return sum; })); } long result2 = 0; for (Future<Long> future : futures) { result2 += future.get(); } long time2 = System.currentTimeMillis() - start2; System.out.printf("ForkJoinPool: %dms, FixedPool: %dms, 加速: %.2f%%%n", time1, time2, (time2 - time1) * 100.0 / time2); // 典型结果:ForkJoinPool快15-30% fixedPool.shutdown(); } }四、最佳实践与调优指南
1. 阈值(Threshold)选择策略
java
// 动态阈值调整 public class DynamicThreshold { /* 阈值选择原则: 1. 计算密集型任务:阈值较小(如1000-10000) - CPU密集型,小任务利于负载均衡 2. IO密集型任务:阈值较大(如10000+) - 避免任务拆分过细,减少调度开销 3. 经验公式: threshold = max(任务总数 / (CPU核心数 * 4), 最小阈值) 4. 动态调整: - 根据任务执行时间动态调整 - 监控CPU利用率和任务队列长度 */ // 自适应阈值示例 public class AdaptiveRecursiveTask extends RecursiveTask<Long> { private static final int MIN_THRESHOLD = 1000; private static final int MAX_THRESHOLD = 100000; private static volatile double adaptiveFactor = 1.0; private final long[] data; private final int start, end; @Override protected Long compute() { int size = end - start; // 计算动态阈值 int dynamicThreshold = (int)(MIN_THRESHOLD * adaptiveFactor); dynamicThreshold = Math.min(Math.max(dynamicThreshold, MIN_THRESHOLD), MAX_THRESHOLD); if (size <= dynamicThreshold) { long startTime = System.nanoTime(); long result = computeDirectly(); long duration = System.nanoTime() - startTime; // 根据执行时间调整因子 adjustThresholdFactor(duration, size); return result; } // 拆分逻辑... return null; } private void adjustThresholdFactor(long duration, int size) { // 理想执行时间:100ms - 500ms long idealTime = 100_000_000L; // 100ms in nanoseconds if (duration < idealTime / 2) { // 执行太快,减小阈值 adaptiveFactor *= 0.9; } else if (duration > idealTime * 2) { // 执行太慢,增大阈值 adaptiveFactor *= 1.1; } } } }2. 避免常见陷阱
java
// Fork/Join 使用陷阱与解决方案 public class ForkJoinPitfalls { // 陷阱1:任务拆分过细 public class OverSplittingTask extends RecursiveTask<Long> { // 错误:阈值太小,导致任务太多 private static final int BAD_THRESHOLD = 10; // 太小! // 解决方案:合理设置阈值,监控任务数量 } // 陷阱2:join()使用不当导致死锁 public class DeadlockTask extends RecursiveTask<Long> { @Override protected Long compute() { RecursiveTask<Long> subtask1 = new SubTask(); RecursiveTask<Long> subtask2 = new SubTask(); subtask1.fork(); subtask2.fork(); // 错误:按顺序join可能导致死锁 // long result1 = subtask1.join(); // 可能阻塞 // long result2 = subtask2.join(); // 正确:使用invokeAll或先compute后join invokeAll(subtask1, subtask2); long result1 = subtask1.join(); long result2 = subtask2.join(); return result1 + result2; } } // 陷阱3:共享可变状态 public class SharedStateTask extends RecursiveTask<Void> { private static List<String> sharedList = Collections.synchronizedList(new ArrayList<>()); // 性能瓶颈! // 解决方案:使用线程本地存储或避免共享 private final ThreadLocal<List<String>> localList = ThreadLocal.withInitial(ArrayList::new); } // 陷阱4:异常处理不当 public class ExceptionHandlingTask extends RecursiveTask<Long> { @Override protected Long compute() { try { // 任务逻辑... return doCompute(); } catch (Exception e) { // 记录异常,返回默认值或重新抛出 ForkJoinTask.getPool().getAsyncMode(); // 使用completeExceptionally() completeExceptionally(e); return null; } } } }3. 性能监控与调优
java
// ForkJoinPool 监控工具 public class ForkJoinMonitor { public static void monitorPool(ForkJoinPool pool) { System.out.println("=== ForkJoinPool 监控 ==="); System.out.printf("并行度: %d%n", pool.getParallelism()); System.out.printf("活跃线程数: %d%n", pool.getActiveThreadCount()); System.out.printf("运行线程数: %d%n", pool.getRunningThreadCount()); System.out.printf("窃取次数: %d%n", pool.getStealCount()); System.out.printf("任务队列大小: %d%n", pool.getQueuedTaskCount()); System.out.printf("队列提交数: %d%n", pool.getQueuedSubmissionCount()); // 建议的调优参数 if (pool.getStealCount() / (double)pool.getParallelism() < 10) { System.out.println("建议: 考虑减小任务阈值,增加任务粒度"); } if (pool.getActiveThreadCount() < pool.getParallelism() / 2) { System.out.println("建议: 可能存在锁竞争或IO阻塞"); } } // JVM参数调优 public class JVMTuning { /* 关键JVM参数: 1. 并行度设置 -Djava.util.concurrent.ForkJoinPool.common.parallelism=16 2. 内存设置 -Xmx4g -Xms4g -XX:+UseG1GC # G1对并发友好 3. 偏向锁优化 -XX:+UseBiasedLocking -XX:BiasedLockingStartupDelay=0 4. 自旋锁优化 -XX:+UseSpinning -XX:PreBlockSpin=100 */ } }4. 使用CountedCompleter处理复杂依赖
java
// CountedCompleter 示例:有向无环图任务 public class GraphProcessingTask extends CountedCompleter<Void> { private final Node node; private final Set<Node> visited; public GraphProcessingTask(CountedCompleter<?> parent, Node node, Set<Node> visited) { super(parent); this.node = node; this.visited = visited; } @Override public void compute() { if (visited.contains(node)) { tryComplete(); return; } visited.add(node); // 处理当前节点 processNode(node); // 为每个子节点创建子任务 List<Node> children = node.getChildren(); if (!children.isEmpty()) { // 设置待完成子任务数 setPendingCount(children.size()); for (Node child : children) { // 创建并fork子任务 new GraphProcessingTask(this, child, visited).fork(); } } else { // 没有子节点,直接完成 tryComplete(); } } @Override public void onCompletion(CountedCompleter<?> caller) { // 所有子任务完成后执行 System.out.println("节点 " + node + " 及其子节点处理完成"); } // 使用示例 public static void processGraph(Node root) { ForkJoinPool pool = new ForkJoinPool(); Set<Node> visited = ConcurrentHashMap.newKeySet(); GraphProcessingTask task = new GraphProcessingTask(null, root, visited); pool.invoke(task); } }篇幅限制下面就只能给大家展示小册部分内容了。整理了一份核心面试笔记包括了:Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafc
需要全套面试笔记及答案
【点击此处即可/免费获取】
五、Fork/Join 不适用场景
1. 应该避免使用的情况
java
public class NotSuitableScenarios { /* 1. **简单任务或小数据量** - 任务拆分和合并的开销可能超过收益 - 经验:数据量 < 10000,任务执行时间 < 1ms 2. **强顺序依赖的任务** - 任务间有严格的执行顺序要求 - 如:B必须等A完成后才能开始 3. **IO密集型为主的任务** - Fork/Join适合CPU密集型 - IO阻塞会浪费线程资源 4. **频繁共享可变状态** - 需要大量同步,抵消并发优势 - 考虑使用Actor模型或消息传递 5. **递归深度过深** - 可能导致栈溢出 - 考虑使用迭代或尾递归优化 6. **实时性要求高的任务** - Fork/Join的调度有不确定性 - 考虑使用实时线程或专用线程 */ } // 替代方案推荐 public class AlternativeSolutions { /* 场景 替代方案 1. IO密集型任务 → CompletableFuture + 异步IO 2. 简单并行任务 → 固定线程池 + Future 3. 流式数据处理 → Parallel Stream 4. 事件驱动任务 → Reactor/RxJava 5. 分布式计算 → Spark/Flink 6. 有向无环图任务 → CountedCompleter或Akka */ }六、与Java 8+ Stream API的结合
1. Parallel Stream底层使用Fork/Join
java
// Parallel Stream 的底层实现 public class ParallelStreamUnderTheHood { public static void main(String[] args) { List<Integer> numbers = IntStream.range(0, 1_000_000) .boxed() .collect(Collectors.toList()); // Parallel Stream会自动使用ForkJoinPool.commonPool() long sum = numbers.parallelStream() .filter(n -> n % 2 == 0) .mapToLong(Integer::longValue) .sum(); System.out.println("偶数总和: " + sum); // 自定义ForkJoinPool ForkJoinPool customPool = new ForkJoinPool(4); long customSum = customPool.submit(() -> numbers.parallelStream() .filter(n -> n % 2 == 0) .mapToLong(Integer::longValue) .sum() ).join(); System.out.println("自定义池计算结果: " + customSum); } }2. 性能对比:手写Fork/Join vs Parallel Stream
java
public class ForkJoinVsParallelStream { public static void benchmark() { int size = 10_000_000; List<Integer> data = new ArrayList<>(size); Random random = new Random(); for (int i = 0; i < size; i++) { data.add(random.nextInt(1000)); } // 1. Parallel Stream long start1 = System.currentTimeMillis(); long sum1 = data.parallelStream() .mapToLong(Integer::longValue) .sum(); long time1 = System.currentTimeMillis() - start1; // 2. 手写Fork/Join long start2 = System.currentTimeMillis(); ForkJoinPool pool = new ForkJoinPool(); RecursiveSumTask task = new RecursiveSumTask(data.stream().mapToLong(i -> i).toArray(), 0, size, 10000); long sum2 = pool.invoke(task); long time2 = System.currentTimeMillis() - start2; // 3. 传统for循环 long start3 = System.currentTimeMillis(); long sum3 = 0; for (int num : data) { sum3 += num; } long time3 = System.currentTimeMillis() - start3; System.out.printf("Parallel Stream: %dms%n", time1); System.out.printf("手写Fork/Join: %dms%n", time2); System.out.printf("传统循环: %dms%n", time3); System.out.printf("Stream加速比: %.2fx%n", (double)time3/time1); System.out.printf("Fork/Join加速比: %.2fx%n", (double)time3/time2); } static class RecursiveSumTask extends RecursiveTask<Long> { private final long[] data; private final int start, end; private final int threshold; RecursiveSumTask(long[] data, int start, int end, int threshold) { this.data = data; this.start = start; this.end = end; this.threshold = threshold; } @Override protected Long compute() { int length = end - start; if (length <= threshold) { long sum = 0; for (int i = start; i < end; i++) { sum += data[i]; } return sum; } int middle = start + (length / 2); RecursiveSumTask left = new RecursiveSumTask(data, start, middle, threshold); RecursiveSumTask right = new RecursiveSumTask(data, middle, end, threshold); invokeAll(left, right); return left.join() + right.join(); } } }总结:Fork/Join框架最适合计算密集型的递归可分治问题。关键要掌握任务拆分策略、阈值选择和结果合并模式。对于现代Java开发,如果问题适合,优先考虑使用Parallel Stream,它的语法更简洁且性能足够优秀。对于复杂场景或需要精细控制时,再考虑手写Fork/Join任务。