Java 25 虚拟线程与结构化并发的结合:并发编程的黄金组合

张开发
2026/4/11 5:41:13 15 分钟阅读

分享文章

Java 25 虚拟线程与结构化并发的结合:并发编程的黄金组合
Java 25 虚拟线程与结构化并发的结合并发编程的黄金组合别叫我大神叫我 Alex 就好。今天我们来聊聊 Java 25 中虚拟线程与结构化并发的结合这是并发编程的黄金组合。一、虚拟线程与结构化并发的关系虚拟线程和结构化并发是 Java 近年来引入的两个革命性特性它们解决了不同但相关的问题虚拟线程解决了传统线程的资源占用高、创建成本高的问题让我们可以轻松创建数百万个线程结构化并发解决了并发任务的管理问题让我们可以更清晰地管理任务的生命周期当这两个特性结合使用时它们形成了一个强大的组合让并发编程变得更加简单、优雅和高效。二、基本用法1. 在结构化并发中使用虚拟线程public class VirtualThreadsWithStructuredConcurrency { public User getUserWithDetails(String userId) { try (var scope new StructuredTaskScope.ShutdownOnFailure()) { // 使用虚拟线程执行任务 FutureUser userFuture scope.fork(() - { return userService.getUser(userId); }); FutureUserProfile profileFuture scope.fork(() - { return profileService.getUserProfile(userId); }); FutureListOrder ordersFuture scope.fork(() - { return orderService.getUserOrders(userId); }); scope.join(); scope.throwIfFailed(); User user userFuture.resultNow(); user.setProfile(profileFuture.resultNow()); user.setOrders(ordersFuture.resultNow()); return user; } } }2. 自定义虚拟线程执行器public class VirtualThreadExecutor { private static final Executor VIRTUAL_THREAD_EXECUTOR Executors.newVirtualThreadPerTaskExecutor(); public static T T executeWithVirtualThread(SupplierT task) { try (var scope new StructuredTaskScope.ShutdownOnFailure()) { FutureT future scope.fork(task::get); scope.join(); scope.throwIfFailed(); return future.resultNow(); } } public static void executeWithVirtualThread(Runnable task) { VIRTUAL_THREAD_EXECUTOR.execute(task); } }三、高级用法1. 嵌套结构化并发public class NestedStructuredConcurrency { public OrderDetails getOrderDetails(String orderId) { try (var scope new StructuredTaskScope.ShutdownOnFailure()) { // 第一层获取订单基本信息 FutureOrder orderFuture scope.fork(() - { return orderService.getOrder(orderId); }); // 第二层获取订单相关信息 FutureOrderRelatedInfo relatedInfoFuture scope.fork(() - { try (var innerScope new StructuredTaskScope.ShutdownOnFailure()) { FutureListOrderItem itemsFuture innerScope.fork(() - { return orderItemService.getOrderItems(orderId); }); FuturePayment paymentFuture innerScope.fork(() - { return paymentService.getOrderPayment(orderId); }); FutureLogistics logisticsFuture innerScope.fork(() - { return logisticsService.getOrderLogistics(orderId); }); innerScope.join(); innerScope.throwIfFailed(); return new OrderRelatedInfo( itemsFuture.resultNow(), paymentFuture.resultNow(), logisticsFuture.resultNow() ); } }); scope.join(); scope.throwIfFailed(); return new OrderDetails( orderFuture.resultNow(), relatedInfoFuture.resultNow() ); } } }2. 超时处理public class TimeoutHandling { public User getUserWithTimeout(String userId, Duration timeout) { try (var scope new StructuredTaskScope.ShutdownOnFailure()) { FutureUser userFuture scope.fork(() - { return userService.getUser(userId); }); // 等待结果或超时 scope.join(timeout); if (scope.isCompletedSuccessfully()) { return userFuture.resultNow(); } else { throw new TimeoutException(Operation timed out); } } } }四、性能优化1. 批量操作public class BatchProcessing { public ListUser getUsersBatch(ListString userIds) { try (var scope new StructuredTaskScope.ShutdownOnFailure()) { ListFutureUser futures userIds.stream() .map(userId - scope.fork(() - userService.getUser(userId))) .collect(Collectors.toList()); scope.join(); scope.throwIfFailed(); return futures.stream() .map(Future::resultNow) .collect(Collectors.toList()); } } }2. 背压控制public class BackpressureControl { public ListUser getUsersWithBackpressure(ListString userIds, int batchSize) { ListUser results new ArrayList(); // 分批处理控制并发度 for (int i 0; i userIds.size(); i batchSize) { int end Math.min(i batchSize, userIds.size()); ListString batchIds userIds.subList(i, end); try (var scope new StructuredTaskScope.ShutdownOnFailure()) { ListFutureUser futures batchIds.stream() .map(userId - scope.fork(() - userService.getUser(userId))) .collect(Collectors.toList()); scope.join(); scope.throwIfFailed(); futures.stream() .map(Future::resultNow) .forEach(results::add); } } return results; } }五、实践案例电商平台订单处理场景描述电商平台需要处理订单创建、库存检查、支付处理等多个并发操作。实现方案Service public class OrderProcessingService { private final InventoryService inventoryService; private final PaymentService paymentService; private final OrderRepository orderRepository; private final EventPublisher eventPublisher; // 构造函数省略 public Order createOrder(CreateOrderRequest request) { try (var scope new StructuredTaskScope.ShutdownOnFailure()) { // 1. 检查库存 FutureBoolean stockCheckFuture scope.fork(() - { return inventoryService.checkStock( request.getProductId(), request.getQuantity()); }); // 2. 验证用户 FutureUser userFuture scope.fork(() - { return userService.getUser(request.getUserId()); }); // 3. 计算价格 FutureBigDecimal priceFuture scope.fork(() - { return pricingService.calculatePrice( request.getProductId(), request.getQuantity()); }); scope.join(); scope.throwIfFailed(); // 检查库存 if (!stockCheckFuture.resultNow()) { throw new InsufficientStockException(Not enough stock); } // 创建订单 Order order Order.builder() .userId(request.getUserId()) .productId(request.getProductId()) .quantity(request.getQuantity()) .totalPrice(priceFuture.resultNow()) .status(OrderStatus.CREATED) .build(); // 保存订单 order orderRepository.save(order); // 4. 处理支付 scope.fork(() - { PaymentRequest paymentRequest PaymentRequest.builder() .orderId(order.getId()) .userId(request.getUserId()) .amount(priceFuture.resultNow()) .build(); return paymentService.processPayment(paymentRequest); }); // 5. 扣减库存 scope.fork(() - { return inventoryService.decreaseStock( request.getProductId(), request.getQuantity()); }); scope.join(); scope.throwIfFailed(); // 发布订单创建事件 eventPublisher.publishEvent(new OrderCreatedEvent(order)); return order; } } }六、常见问题与解决方案1. 异常处理public class ExceptionHandling { public User getUserWithErrorHandling(String userId) { try (var scope new StructuredTaskScope.ShutdownOnFailure()) { FutureUser userFuture scope.fork(() - { try { return userService.getUser(userId); } catch (UserNotFoundException e) { // 处理特定异常 logger.warn(User not found: {}, userId); return new User(userId, Unknown, unknownexample.com); } }); scope.join(); try { scope.throwIfFailed(); } catch (Exception e) { // 处理其他异常 logger.error(Failed to get user: {}, e.getMessage()); return new User(userId, Error, errorexample.com); } return userFuture.resultNow(); } } }2. 资源管理public class ResourceManagement { public ListUser getUsersWithResourceManagement(ListString userIds) { try (var scope new StructuredTaskScope.ShutdownOnFailure(); var connection dataSource.getConnection()) { ListFutureUser futures userIds.stream() .map(userId - scope.fork(() - { try (var statement connection.prepareStatement( SELECT * FROM users WHERE id ?)) { statement.setString(1, userId); try (var resultSet statement.executeQuery()) { if (resultSet.next()) { return new User( resultSet.getString(id), resultSet.getString(name), resultSet.getString(email) ); } else { throw new UserNotFoundException(userId); } } } })) .collect(Collectors.toList()); scope.join(); scope.throwIfFailed(); return futures.stream() .map(Future::resultNow) .collect(Collectors.toList()); } } }七、总结与建议虚拟线程与结构化并发的结合为 Java 并发编程带来了革命性的变化。它们不仅让代码更加清晰、易读还提高了系统的性能和可靠性。这其实可以更优雅一点建议大家在以下场景优先考虑使用虚拟线程与结构化并发的组合IO 密集型任务如数据库查询、HTTP 调用等批量处理需要并行处理多个独立任务复杂业务流程涉及多个步骤的业务流程高并发场景需要处理大量并发请求别叫我大神叫我 Alex 就好。希望这篇文章能帮助你更好地理解和使用 Java 25 的虚拟线程与结构化并发。欢迎在评论区分享你的使用经验

更多文章