Published on

Mastering Structured Concurrency: Building Robust Asynchronous Flows with Java Virtual Threads and Spring Boot 4.0

Authors
  • Maria

Introduction: The Wild West of Unstructured Concurrency

As backend engineers, we frequently encounter scenarios requiring parallel execution to enhance responsiveness or throughput. Whether fetching data from multiple downstream services, processing independent requests concurrently, or performing complex computational tasks, concurrency is a foundational tool. However, the traditional ExecutorService and Future approach, while powerful, often leads to what's been dubbed "unstructured concurrency."

Imagine a scenario where your Spring Boot service needs to aggregate information from three independent microservices (e.g., user profile, order history, and payment methods) to fulfill a single user request. You might spin up three CompletableFuture instances, chain them, and manage their individual lifecycles. But what happens if one fails? How do you reliably cancel the others? What if you need to propagate context (like security principals or trace IDs) across these concurrent tasks? Without explicit structure, these concerns can lead to resource leaks, hard-to-debug errors, and complex cancellation logic that resembles spaghetti code.

Java 21 introduced Structured Concurrency, a revolutionary approach designed to tame this chaos, as a preview API (JEP 453). It has been revised in every release since, reaching its fifth preview in Java 25 (JEP 505), and it is not yet a final feature. It ensures that concurrent operations within a logical unit of work are managed as a single, coherent entity, making concurrent code significantly more readable, maintainable, and robust. When combined with Java Virtual Threads (which we explored in a previous post), Structured Concurrency offers an incredibly powerful and efficient way to build highly concurrent, responsive Spring Boot microservices.

Deep Dive: Bringing Order to Concurrent Operations

Structured Concurrency fundamentally changes how we think about and write concurrent code. Instead of tasks that are submitted to a shared executor and then live independently of the code that started them, it introduces a "task group" or "scope" in which child tasks are bound to their parent, so the children's lifetimes are nested inside the parent's. A scope cannot finish while any of its children are still running: when it closes, unfinished children are cancelled, and the scope waits for them to terminate.

The Problem Structured Concurrency Solves

Consider a user request that involves fetching data from three different services: userService.getUserProfile(), orderService.getRecentOrders(), and paymentService.getPaymentMethods(). Traditionally, you might:

// Traditional unstructured approach
CompletableFuture<UserProfile> userFuture = CompletableFuture.supplyAsync(() -> userService.getUserProfile());
CompletableFuture<List<Order>> ordersFuture = CompletableFuture.supplyAsync(() -> orderService.getRecentOrders());
CompletableFuture<List<PaymentMethod>> paymentsFuture = CompletableFuture.supplyAsync(() -> paymentService.getPaymentMethods());

try {
    // Each get() has its own 5-second timeout, so the worst case is about 15 seconds, not 5
    UserProfile user = userFuture.get(5, TimeUnit.SECONDS);
    List<Order> orders = ordersFuture.get(5, TimeUnit.SECONDS);
    List<PaymentMethod> payments = paymentsFuture.get(5, TimeUnit.SECONDS);
    // Combine results
} catch (Exception e) {
    // Handle error, but what about the other futures? Nothing cancels them automatically,
    // and even cancel(true) on a CompletableFuture does not interrupt the thread running the task.
}

This approach has several drawbacks:

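  1. No Automatic Cancellation: If userFuture fails, ordersFuture and paymentsFuture keep running, wasting resources. Cancelling them is manual boilerplate, and for CompletableFuture even cancel(true) only marks the future as cancelled; it does not interrupt the thread doing the work.
  2. Error Propagation: Failures arrive wrapped in an ExecutionException, and only when you call get() on the failed future. Because the get() calls run in order, a failure in paymentsFuture goes unnoticed until the first two calls have returned.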
  3. Lifecycle Management: The lifecycle of child tasks isn't explicitly tied to the parent's, leading to potential resource leaks if not carefully managed.
  4. Readability: For complex fan-out/fan-in patterns, the CompletableFuture API can become dense.
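The cancellation drawback is easy to check for yourself. This small, self-contained sketch (the class and method names are illustrative) starts a slow task with CompletableFuture.supplyAsync, cancels it with cancel(true), and records whether the task ever observed an interrupt. The CompletableFuture documentation states that mayInterruptIfRunning has no effect in that implementation, so the method should return false: the future reports cancellation, but the work runs to completion anyway.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CompletableFutureCancelDemo {

    public static void main(String[] args) {
        System.out.println("Task saw an interrupt: " + cancelInterruptsTask());
    }

    // Returns true if cancel(true) interrupted the running task, false otherwise.
    static boolean cancelInterruptsTask() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            started.countDown();
            try {
                Thread.sleep(500); // stands in for a slow downstream call
            } catch (InterruptedException e) {
                interrupted.set(true); // reached only if cancel(true) interrupted this thread
            } finally {
                finished.countDown();
            }
            return "done";
        });

        try {
            started.await();
            future.cancel(true);                 // completes the future with a CancellationException...
            finished.await(2, TimeUnit.SECONDS); // ...while the task itself keeps sleeping to the end
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return interrupted.get();
    }
}
```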

StructuredTaskScope: The Heart of Structured Concurrency

The core abstraction for Structured Concurrency is java.util.concurrent.StructuredTaskScope, which is still a preview API. It acts as a supervisory entity for a group of concurrently executing tasks (children, or "subtasks"): the thread that opens the scope owns it, and no subtask may outlive the scope that forked it.

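You typically open a scope in a try-with-resources block and call fork() inside it to start child tasks. In the JDK 21 through 24 previews, which the code in this post targets, StructuredTaskScope comes with two subclasses implementing the most common policies for handling completion and errors (JDK 25 removed both in favor of StructuredTaskScope.open() combined with a Joiner):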

  1. StructuredTaskScope.ShutdownOnFailure: This is the most common policy. If any child task fails (throws an exception), the scope immediately shuts down, interrupting any other running children. The parent then calls throwIfFailed(), which throws an ExecutionException whose cause is the exception of the first child to fail. This is ideal for scenarios where the failure of one sub-task renders the entire operation meaningless.

  2. StructuredTaskScope.ShutdownOnSuccess: If any child task completes successfully, the scope immediately shuts down, interrupting other running children. The parent task will receive the result from the first successfully completed child. This is useful for "race to success" scenarios (e.g., fetching data from multiple mirrored services, taking the first response).
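The race-to-success policy has a rough analogue in final (non-preview) APIs: ExecutorService.invokeAny returns the result of one task that completed successfully and cancels the tasks that have not yet completed. The minimal sketch below assumes three hypothetical mirrored providers, one of which fails outright. It is only a comparison point: an executor lacks the scope's ownership rules, so nothing stops its tasks from being shared with or outliving unrelated code.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class RaceToSuccessDemo {

    public static void main(String[] args) {
        System.out.println("Winner: " + fastestPaymentMethod());
    }

    // Hypothetical stand-ins for mirrored payment-method providers.
    static String fastestPaymentMethod() {
        List<Callable<String>> providers = List.of(
                () -> { throw new IllegalStateException("provider A is down"); }, // failures are skipped
                () -> { Thread.sleep(1_000); return "Primary-Card"; },             // slow provider
                () -> { Thread.sleep(50); return "Secondary-EWallet"; });          // fast provider

        // One virtual thread per task; close() (via try-with-resources) waits for all tasks to end.
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // Returns the first successful result and cancels (interrupts) the tasks still running.
            return executor.invokeAny(providers);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // Thrown only if every provider failed.
            throw new IllegalStateException("no provider succeeded", e);
        }
    }
}
```

With the delays shown, the 50 ms provider should win, and the sleeping 1-second provider is interrupted when invokeAny cancels it.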

After forking tasks, the owner calls join(). This blocks the owner thread until all children have completed (successfully or with an exception) or the scope shuts down due to its policy. After join(), call throwIfFailed() (for ShutdownOnFailure) or result() (for ShutdownOnSuccess) to get the outcome. Importantly, StructuredTaskScope implements AutoCloseable: closing it in a try-with-resources statement shuts the scope down, interrupting any subtasks still running, and waits for their threads to finish. The owner must call join() before the scope is closed; otherwise close() throws an IllegalStateException.
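To see what ShutdownOnFailure, join(), and throwIfFailed() save you from writing, here is a rough hand-rolled equivalent built only on final APIs. An ExecutorCompletionService over a virtual-thread executor hands back futures in completion order, and the first failure cancels (interrupts) the remaining tasks before being rethrown as an ExecutionException. This is an illustrative sketch, not how StructuredTaskScope is implemented; runAll is a hypothetical helper name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FailFastFanOut {

    public static void main(String[] args) throws Exception {
        List<Callable<String>> tasks = List.of(
                () -> { Thread.sleep(50); return "profile"; },
                () -> { Thread.sleep(20); return "orders"; },
                () -> { Thread.sleep(30); return "payments"; });
        System.out.println(runAll(tasks)); // [profile, orders, payments]
    }

    // Runs every task on its own virtual thread. On the first failure, cancels the others
    // and rethrows it, roughly what ShutdownOnFailure + join() + throwIfFailed() provide.
    static <T> List<T> runAll(List<Callable<T>> tasks) throws InterruptedException, ExecutionException {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            ExecutorCompletionService<T> completion = new ExecutorCompletionService<>(executor);
            List<Future<T>> futures = new ArrayList<>();
            for (Callable<T> task : tasks) {
                futures.add(completion.submit(task));
            }
            try {
                for (int i = 0; i < tasks.size(); i++) {
                    completion.take().get(); // throws ExecutionException for the first task to fail
                }
            } catch (ExecutionException | InterruptedException e) {
                futures.forEach(f -> f.cancel(true)); // interrupts siblings that are still running
                throw e; // executor.close() then waits for the interrupted tasks to exit
            }
            // Every task succeeded; collect results in submission order.
            List<T> results = new ArrayList<>();
            for (Future<T> f : futures) {
                results.add(f.resultNow());
            }
            return results;
        }
    }
}
```

Even this simplified version needs explicit cancellation and bookkeeping, which is exactly what the scope-based policy takes off your hands.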

Structured Concurrency and Virtual Threads

This is where it gets really exciting. StructuredTaskScope is designed to work seamlessly with Virtual Threads: unless you pass a different ThreadFactory when creating the scope, each fork() starts its subtask in a new virtual thread:

scope.fork(() -> {
    // This task will run on a Virtual Thread
    return someIoBoundOperation();
});

Because Virtual Threads are cheap and abundant, you can fork hundreds or thousands of them within a StructuredTaskScope without worrying about thread pool exhaustion or significant context switching overhead. This allows you to truly embrace a synchronous-style, imperative coding model for highly concurrent, I/O-bound operations, benefiting from simplified error handling and cancellation provided by Structured Concurrency.
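To make the "cheap and abundant" claim concrete, the sketch below uses the final virtual-thread executor API (Executors.newVirtualThreadPerTaskExecutor(), standard since Java 21) rather than StructuredTaskScope, so it runs without preview flags. It starts 10,000 tasks that each block for 100 ms. With a fixed pool of 10 platform threads the same work would need at least 10,000 × 100 ms / 10 = 100 s, whereas each virtual thread simply unmounts from its carrier while it sleeps. runBlockingTasks is a hypothetical helper name.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ManyVirtualThreads {

    public static void main(String[] args) {
        long start = System.nanoTime();
        int completed = runBlockingTasks(10_000, Duration.ofMillis(100));
        long millis = Duration.ofNanos(System.nanoTime() - start).toMillis();
        System.out.printf("%d tasks finished in %d ms%n", completed, millis);
    }

    // Starts one virtual thread per task; each task blocks in sleep(), standing in for I/O.
    static int runBlockingTasks(int count, Duration blockFor) {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<Integer>> futures = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                futures.add(executor.submit(() -> {
                    Thread.sleep(blockFor); // a blocked virtual thread frees its carrier thread
                    return 1;
                }));
            }
            int sum = 0;
            for (Future<Integer> f : futures) {
                sum += f.get();
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```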

Code Implementation: Aggregating Data with Structure in Spring Boot

Let's implement our user profile aggregation example using StructuredTaskScope and Virtual Threads in a Spring Boot service.

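First, check your JDK. Structured Concurrency is still a preview feature in every release so far, so you must compile and run with --enable-preview (with Maven, pass it to the compiler through the maven-compiler-plugin's compilerArgs and to the JVM at launch), and class files compiled that way run only on the same JDK feature release. The code below uses the StructuredTaskScope API as it shipped in the JDK 21 through 24 previews; JDK 25 (JEP 505) replaced ShutdownOnFailure and ShutdownOnSuccess with StructuredTaskScope.open() and Joiner policies, so this controller will not compile unchanged on Java 25. On the Spring side, Spring Boot 4.0 treats Virtual Threads as first-class citizens via the spring.threads.virtual.enabled property covered below.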

Let's define some dummy services to simulate external calls:

// src/main/java/com/example/structuredconcurrency/service/ExternalUserService.java
package com.example.structuredconcurrency.service;

import org.springframework.stereotype.Service;

import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

@Service
public class ExternalUserService {

    public UserProfile getUserProfile(String userId) throws InterruptedException {
        System.out.printf("[%s] Fetching user profile for %s%n", Thread.currentThread().getName(), userId);
        Thread.sleep(Duration.ofMillis(ThreadLocalRandom.current().nextInt(200, 500))); // Simulate network delay
        if (ThreadLocalRandom.current().nextDouble() < 0.1) { // Simulate 10% failure
            throw new RuntimeException("Failed to fetch user profile for " + userId);
        }
        return new UserProfile(userId, "John Doe", "john.doe@example.com");
    }
}

// src/main/java/com/example/structuredconcurrency/service/ExternalOrderService.java
package com.example.structuredconcurrency.service;

import org.springframework.stereotype.Service;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

@Service
public class ExternalOrderService {

    public List<Order> getRecentOrders(String userId) throws InterruptedException {
        System.out.printf("[%s] Fetching recent orders for %s%n", Thread.currentThread().getName(), userId);
        Thread.sleep(Duration.ofMillis(ThreadLocalRandom.current().nextInt(150, 400))); // Simulate network delay
        if (ThreadLocalRandom.current().nextDouble() < 0.05) { // Simulate 5% failure
            throw new RuntimeException("Failed to fetch recent orders for " + userId);
        }
        return List.of(new Order("ORD-001", 99.99), new Order("ORD-002", 15.50));
    }
}

// src/main/java/com/example/structuredconcurrency/service/ExternalPaymentService.java
package com.example.structuredconcurrency.service;

import org.springframework.stereotype.Service;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

@Service
public class ExternalPaymentService {

    public List<PaymentMethod> getPaymentMethods(String userId) throws InterruptedException {
        System.out.printf("[%s] Fetching payment methods for %s%n", Thread.currentThread().getName(), userId);
        Thread.sleep(Duration.ofMillis(ThreadLocalRandom.current().nextInt(100, 300))); // Simulate network delay
        // No failure for payment methods for this example
        return List.of(new PaymentMethod("Visa", "**** 1234"), new PaymentMethod("PayPal", "john.doe@paypal.com"));
    }
}

And their respective DTOs:

// src/main/java/com/example/structuredconcurrency/service/UserProfile.java
package com.example.structuredconcurrency.service;

public record UserProfile(String id, String name, String email) {}

// src/main/java/com/example/structuredconcurrency/service/Order.java
package com.example.structuredconcurrency.service;

public record Order(String orderId, double amount) {}

// src/main/java/com/example/structuredconcurrency/service/PaymentMethod.java
package com.example.structuredconcurrency.service;

public record PaymentMethod(String type, String details) {}

Now let's build the aggregating endpoint with StructuredTaskScope and Virtual Threads. To keep the example compact, the concurrency logic lives directly in a @RestController.

// src/main/java/com/example/structuredconcurrency/controller/UserController.java
package com.example.structuredconcurrency.controller;

import com.example.structuredconcurrency.service.ExternalOrderService;
import com.example.structuredconcurrency.service.ExternalPaymentService;
import com.example.structuredconcurrency.service.ExternalUserService;
import com.example.structuredconcurrency.service.Order;
import com.example.structuredconcurrency.service.PaymentMethod;
import com.example.structuredconcurrency.service.UserProfile;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.StructuredTaskScope; // Preview API (JDK 21-24 shape): compile and run with --enable-preview
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.concurrent.ThreadLocalRandom;

@RestController
@RequestMapping("/users")
public class UserController {

    private final ExternalUserService userService;
    private final ExternalOrderService orderService;
    private final ExternalPaymentService paymentService;

    public UserController(ExternalUserService userService, ExternalOrderService orderService, ExternalPaymentService paymentService) {
        this.userService = userService;
        this.orderService = orderService;
        this.paymentService = paymentService;
    }

    public record UserDetails(UserProfile userProfile, List<Order> recentOrders, List<PaymentMethod> paymentMethods) {}

    @GetMapping("/{userId}/details")
    public UserDetails getUserDetails(@PathVariable String userId) throws Exception {
        System.out.printf("Parent Thread: [%s] - Starting request for user %s%n", Thread.currentThread().getName(), userId);

        // StructuredTaskScope creates virtual threads by default; passing Thread.ofVirtual().factory()
        // only makes that explicit. spring.threads.virtual.enabled does not change this factory.

        // ShutdownOnFailure policy: if any subtask fails, the scope shuts down and interrupts the others
        try (var scope = new StructuredTaskScope.ShutdownOnFailure("user-details", Thread.ofVirtual().factory())) {
            // In JDK 21+, fork() returns a Subtask handle rather than a Future
            Subtask<UserProfile> userProfileTask = scope.fork(() -> userService.getUserProfile(userId));
            Subtask<List<Order>> ordersTask = scope.fork(() -> orderService.getRecentOrders(userId));
            Subtask<List<PaymentMethod>> paymentsTask = scope.fork(() -> paymentService.getPaymentMethods(userId));

            // Join blocks until all subtasks complete or the scope shuts down due to a failure
            scope.join();
            // If any subtask failed, throwIfFailed() throws an ExecutionException wrapping the first failure
            scope.throwIfFailed();

            // All subtasks succeeded; Subtask.get() returns each result without blocking
            UserProfile userProfile = userProfileTask.get();
            List<Order> orders = ordersTask.get();
            List<PaymentMethod> paymentMethods = paymentsTask.get();

            System.out.printf("Parent Thread: [%s] - Completed request for user %s%n", Thread.currentThread().getName(), userId);
            return new UserDetails(userProfile, orders, paymentMethods);

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Restore interrupt status
            System.err.printf("Parent Thread: [%s] - Request for user %s was interrupted: %s%n", Thread.currentThread().getName(), userId, e.getMessage());
            throw new RuntimeException("Request interrupted", e);
        } catch (Exception e) {
            System.err.printf("Parent Thread: [%s] - Failed to get user details for %s: %s%n", Thread.currentThread().getName(), userId, e.getMessage());
            throw e; // Usually the ExecutionException from throwIfFailed(); getCause() is the subtask's exception
        }
    }

    // Example with ShutdownOnSuccess: Race to find a payment method (e.g., from multiple providers)
    @GetMapping("/{userId}/fastest-payment")
    public PaymentMethod getFastestPaymentMethod(@PathVariable String userId) throws Exception {
        System.out.printf("Parent Thread: [%s] - Starting fastest payment request for user %s%n", Thread.currentThread().getName(), userId);

        // Explicit type argument: with `var`, a diamond (<>) would infer ShutdownOnSuccess<Object>
        try (var scope = new StructuredTaskScope.ShutdownOnSuccess<PaymentMethod>("fast-payment", Thread.ofVirtual().factory())) {
            scope.fork(() -> {
                Thread.sleep(Duration.ofMillis(ThreadLocalRandom.current().nextInt(250, 400)));
                System.out.printf("[%s] Primary payment method service completed.%n", Thread.currentThread().getName());
                return new PaymentMethod("Primary-Card", "**** 5678");
            });

            scope.fork(() -> {
                Thread.sleep(Duration.ofMillis(ThreadLocalRandom.current().nextInt(100, 200)));
                System.out.printf("[%s] Secondary payment method service completed.%n", Thread.currentThread().getName());
                return new PaymentMethod("Secondary-EWallet", "user@ewallet.com");
            });

            scope.join(); // Blocks until one subtask succeeds or all of them fail
            // The first success shut the scope down, which interrupted the slower subtask
            PaymentMethod fastestMethod = scope.result(); // Throws ExecutionException if every subtask failed

            System.out.printf("Parent Thread: [%s] - Completed fastest payment request for user %s%n", Thread.currentThread().getName(), userId);
            return fastestMethod;

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.printf("Parent Thread: [%s] - Fastest payment request for user %s was interrupted: %s%n", Thread.currentThread().getName(), userId, e.getMessage());
            throw new RuntimeException("Request interrupted", e);
        } catch (Exception e) {
            System.err.printf("Parent Thread: [%s] - Failed to get fastest payment method for %s: %s%n", Thread.currentThread().getName(), userId, e.getMessage());
            throw e;
        }
    }
}

Spring Boot 4.0 Configuration (application.properties): To have Spring Boot run HTTP request handling in its embedded server, as well as its auto-configured task executor, on Virtual Threads, enable:

# application.properties
spring.threads.virtual.enabled=true

With spring.threads.virtual.enabled=true, the embedded Tomcat or Jetty server handles each HTTP request on a virtual thread, and Spring Boot's auto-configured task executor (used by @Async, for example) runs tasks on virtual threads. The first point is the one that matters here: scope.join() blocks the request-handling thread, so enabling the property keeps that wait from occupying a platform thread. The property does not change CompletableFuture.supplyAsync called without an explicit executor (it still uses ForkJoinPool.commonPool()), nor does it affect StructuredTaskScope, whose subtasks already run on virtual threads by default; passing Thread.ofVirtual().factory() simply makes that choice explicit.
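Virtual threads created by Thread.ofVirtual().factory(), as in the controller, have empty names, so the services' "[%s]" log prefix prints as "[]". A minimal sketch, using only the final Thread.Builder API, of a factory that numbers its threads instead; namedVirtualFactory is a hypothetical helper name.

```java
import java.util.concurrent.ThreadFactory;

class NamedVirtualThreads {

    public static void main(String[] args) throws InterruptedException {
        ThreadFactory factory = namedVirtualFactory("user-details-");
        Thread t = factory.newThread(() ->
                System.out.printf("[%s] virtual=%b%n",
                        Thread.currentThread().getName(), Thread.currentThread().isVirtual()));
        t.start();
        t.join();
    }

    // Virtual threads are unnamed by default; a name prefix plus a counter yields
    // "user-details-0", "user-details-1", ... for successive threads from this factory.
    static ThreadFactory namedVirtualFactory(String prefix) {
        return Thread.ofVirtual().name(prefix, 0).factory();
    }
}
```

You could then pass namedVirtualFactory("user-details-") to the ShutdownOnFailure constructor in place of Thread.ofVirtual().factory(), so each subtask's log line identifies its thread.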

Explanation of the getUserDetails Method:

  1. try (var scope = new StructuredTaskScope.ShutdownOnFailure(...)): We create a StructuredTaskScope with the ShutdownOnFailure policy. This means if getUserProfile, getRecentOrders, or getPaymentMethods throws an exception, the scope will immediately shut down, interrupting any other tasks still running. We also pass Thread.ofVirtual().factory(), which makes explicit that the forked tasks run on Virtual Threads (already the default for a StructuredTaskScope).
  2. scope.fork(() -> ...): Each external service call is wrapped in a scope.fork() call. This starts the task concurrently within the scope, on its own Virtual Thread, and returns a Subtask handle for its result.
  3. scope.join(): The parent thread waits here until all child tasks complete or the scope shuts down due to its policy (i.e., one task fails in ShutdownOnFailure).
  4. scope.throwIfFailed(): If the scope shut down because a child task failed, this method throws an ExecutionException whose cause is the first failing child's exception. This means you only need one catch block for all potential failures within the scope.
  5. subtask.get(): Once join() and throwIfFailed() indicate success, we can safely retrieve the results from the Subtask handles. Unlike Future.get(), Subtask.get() never blocks; it throws an IllegalStateException if called before join() or on a subtask that did not complete successfully, a case throwIfFailed() has already ruled out.

This example demonstrates how Structured Concurrency, especially with Virtual Threads, allows you to write highly concurrent, I/O-bound code in a sequential, imperative style, without the complexity of manual cancellation, error handling, or managing large thread pools.

Considerations and Trade-offs

While Structured Concurrency, especially with Virtual Threads, offers compelling advantages, it's crucial to understand its nuances:

  • When to Use It: Structured Concurrency excels in situations where you have a clear parent-child relationship between tasks and a well-defined boundary for their execution. The fan-out/fan-in pattern for aggregating data, processing batches of items that logically belong together, or implementing short-lived concurrent workflows are prime candidates. It shines for I/O-bound tasks where Virtual Threads are a natural fit.
  • Error Handling and Cancellation: This is arguably its biggest strength. The ShutdownOnFailure policy provides robust "fail-fast" behavior, simplifying error handling significantly. The automatic cancellation of sibling tasks upon failure prevents wasted computation and resource leaks.
  • Observability: Because tasks are logically grouped, it becomes easier to monitor the health and progress of an entire operation rather than tracking individual Future instances; a JSON thread dump (jcmd <pid> Thread.dump_to_file -format=json <file>) shows subtask threads grouped under their scope. Context propagation (like tracing IDs) across tasks within a scope is also more natural.
  • Performance Implications: Structured Concurrency itself is a programming model, not primarily a performance optimization. The gains for I/O-bound tasks come from its combination with Virtual Threads: a virtual thread that blocks on I/O unmounts from its carrier platform thread, so a small set of carriers can serve a very large number of concurrent blocking calls. (On JDK 21 through 23, blocking inside a synchronized block pins the carrier instead; JDK 24 removed that limitation with JEP 491.) For CPU-bound tasks, platform threads and careful thread pool management are still essential.
  • Alternative Paradigms (Reactive Programming): Structured Concurrency with Virtual Threads offers an imperative alternative to reactive programming frameworks (like Reactor or RxJava) for certain types of asynchronous operations. For complex data streams, backpressure, and highly asynchronous event-driven pipelines, reactive programming might still be a better fit. However, for many common microservice patterns, Structured Concurrency can achieve similar goals with a more traditional, easier-to-reason-about code style.
  • Scope Overhead: While minimal, there's a slight overhead in creating and managing StructuredTaskScope instances compared to raw Thread.start() or ExecutorService.submit(). This is a small price to pay for the significant benefits in reliability and maintainability.
  • Thread Local Limitations: Virtual Threads support ThreadLocals, but heavy use is discouraged: every virtual thread gets its own copy, so per-thread caches that were cheap with a few hundred pooled threads can inflate memory across millions of short-lived ones. For passing context into subtasks, prefer explicit parameters or ScopedValue (final as of JDK 25), whose bindings are inherited by subtasks forked in a StructuredTaskScope; InheritableThreadLocal also works, with care. Don't assume Spring's thread-bound holders, such as RequestContextHolder or SecurityContextHolder in its default mode, are visible inside forked subtasks; pass what the subtask needs explicitly.
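If you do reach for InheritableThreadLocal, it helps to know how virtual threads treat it. The sketch below (the TRACE_ID holder and traceIdSeenByChild helper are hypothetical) shows that a virtual thread built with Thread.ofVirtual() copies the creating thread's inheritable values by default, and that Thread.Builder.inheritInheritableThreadLocals(false) turns this off. The child receives a snapshot taken when the thread is created, so later changes in the parent are not visible to it.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;

class TraceIdInheritance {

    // Hypothetical per-request trace ID holder, for illustration only.
    static final InheritableThreadLocal<String> TRACE_ID = new InheritableThreadLocal<>();

    public static void main(String[] args) {
        System.out.println("inherited:     " + traceIdSeenByChild("trace-42", true));
        System.out.println("not inherited: " + traceIdSeenByChild("trace-42", false));
    }

    // Sets a trace ID on the current thread, then reads it from a virtual thread created afterwards.
    static String traceIdSeenByChild(String traceId, boolean inherit) {
        TRACE_ID.set(traceId);
        try {
            // Virtual threads inherit inheritable thread-locals unless the builder says otherwise.
            ThreadFactory factory = Thread.ofVirtual().inheritInheritableThreadLocals(inherit).factory();
            AtomicReference<String> seen = new AtomicReference<>();
            // The value is copied when the thread is created, not looked up live.
            Thread child = factory.newThread(() -> seen.set(TRACE_ID.get()));
            child.start();
            child.join();
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            TRACE_ID.remove(); // don't leave the value behind on this thread
        }
    }
}
```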

Conclusion

Structured Concurrency, in conjunction with Java Virtual Threads, represents a significant leap forward in how we approach concurrent programming in the Java ecosystem. For Senior Backend Engineers operating in a Spring Boot microservices world, it provides a powerful, elegant, and robust way to build resilient and performant asynchronous flows.

By formalizing the relationship between parent and child tasks, Structured Concurrency eliminates much of the boilerplate associated with manual cancellation and complex error propagation. It allows us to write concurrent code that is not only more efficient (thanks to Virtual Threads) but also dramatically easier to reason about, maintain, and debug. Embracing this pattern will lead to more reliable systems and a clearer understanding of your application's concurrent behavior, freeing you to tackle even deeper architectural challenges.