Building Stateful Microservices: Rehydrating Domain Aggregates from Kafka Event Streams with Spring Boot and JPA

Author: Maria

Introduction: The Challenge of Distributed State in Event-Driven Systems

In a landscape dominated by microservices and event-driven architectures, direct database coupling between services is a strict anti-pattern. While this promotes autonomy and scalability, it introduces a critical challenge: how does a service maintain a consistent and up-to-date view of the domain data it depends on, if that data is owned by another service? For instance, an Order service needs to know the current price and stock level of a Product, but the Product service owns the authoritative source of this data. Constantly querying the Product service introduces tight coupling, latency, and potential points of failure.

The solution often lies in consuming events. But simply consuming events isn't enough; a service must also rehydrate, or update, its own local representation of the relevant domain aggregate and maintain that state autonomously. This post will guide you through building stateful microservices with Spring Boot 4.0 and JPA, leveraging Apache Kafka to keep your local copies of domain aggregates current without synchronous service-to-service calls.

Deep Dive: Rehydrating Domain Aggregates from Event Streams

The core idea is to transform transient events, which represent facts about what has happened, into the current state of a domain aggregate within a specific microservice. Instead of querying another service's API for the current Product details, our Order service will subscribe to Product-related events (e.g., ProductPriceUpdated, ProductStockChanged) and use these events to update its own local Product aggregate. This local aggregate then becomes what the Order service reads and reasons about within its own bounded context, while the Product service remains the authoritative owner of the data.

This pattern is distinct from Event Sourcing, where the sequence of events is the aggregate state. Here, events are merely the input to update a traditional, mutable aggregate persisted in a relational database. It also differs from CQRS materialized views, which are typically read-only projections. Our goal is to build an aggregate that is part of the service's write model and domain logic.
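
To make the distinction concrete, here is a minimal sketch in plain Java (no Kafka or JPA). The names StateStylesSketch, PriceChanged, replay, and StoredProduct are invented for illustration and are not part of the example service.

```java
import java.util.List;

final class StateStylesSketch {

    // Hypothetical event type, reduced to the fields this comparison needs.
    record PriceChanged(String productId, double newPrice) {}

    // Event Sourcing style: the log is the state. The current price is
    // derived by replaying every event from the start.
    static double replay(List<PriceChanged> log, double initialPrice) {
        double price = initialPrice;
        for (PriceChanged event : log) {
            price = event.newPrice();
        }
        return price;
    }

    // State-stored style (the approach in this post): keep one mutable value,
    // overwrite it per event, and do not retain the event afterwards.
    static final class StoredProduct {
        private double price;

        StoredProduct(double initialPrice) { this.price = initialPrice; }

        void on(PriceChanged event) { this.price = event.newPrice(); }

        double price() { return price; }
    }

    public static void main(String[] args) {
        List<PriceChanged> log = List.of(
            new PriceChanged("p-1", 10.0),
            new PriceChanged("p-1", 12.5));

        StoredProduct stored = new StoredProduct(9.0);
        log.forEach(stored::on);

        System.out.println(replay(log, 9.0)); // 12.5
        System.out.println(stored.price());   // 12.5
    }
}
```

Both styles reach the same value; the difference is what gets persisted. The event-sourced version must keep the whole log, while the state-stored version only needs the latest row.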

Key Principles:

  1. Event-Driven Updates: Services emit domain events (e.g., ProductPriceUpdatedEvent) whenever their authoritative data changes.
  2. Autonomous State: Each service maintains its own local copy of the aggregates it depends on, derived from these events.
  3. Eventually Consistent: The local aggregate state will eventually reflect the source service's state as events are processed. Until then it may be stale, and domain logic has to tolerate that.
  4. Idempotency: Event consumers must be idempotent, meaning processing the same event multiple times has the same effect as processing it once. This matters because Kafka consumers typically run with at-least-once delivery, so the same record can be delivered more than once.
  5. Clear Ownership: While a service maintains a local copy of an aggregate, it does not become the authoritative owner. It is merely reacting to changes owned by another service.
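
The idempotency principle can be illustrated with a small plain-Java sketch. It assumes every event carries a unique eventId and keeps the set of applied IDs in memory; the class and method names (IdempotencySketch, StockChanged, LocalProduct, apply) are hypothetical, and a real service would persist the applied IDs in the same database transaction as the state change.

```java
import java.util.HashSet;
import java.util.Set;

final class IdempotencySketch {

    // Hypothetical event shape with a unique ID per event.
    record StockChanged(String eventId, String productId, int newStock) {}

    // Local copy of a product that remembers which event IDs it has applied.
    static final class LocalProduct {
        private int stock;
        private int appliedCount;
        private final Set<String> processedEventIds = new HashSet<>();

        // Returns true if the event changed state, false if it was a duplicate.
        boolean apply(StockChanged event) {
            if (!processedEventIds.add(event.eventId())) {
                return false; // already applied: redelivery is a no-op
            }
            stock = event.newStock();
            appliedCount++;
            return true;
        }

        int stock() { return stock; }

        int appliedCount() { return appliedCount; }
    }

    public static void main(String[] args) {
        LocalProduct product = new LocalProduct();
        StockChanged event = new StockChanged("evt-1", "p-1", 40);

        System.out.println(product.apply(event)); // true
        System.out.println(product.apply(event)); // false (duplicate delivery)
        System.out.println(product.stock());      // 40
    }
}
```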

The Flow:

  1. Source Service (e.g., ProductService): Publishes domain events to Kafka (e.g., ProductPriceUpdatedEvent, ProductStockChangedEvent) when a product's state changes.
  2. Target Service (e.g., OrderService): Contains a Kafka consumer that listens for these Product events.
  3. Event Processing: Upon receiving an event, the OrderService maps the event data to its local Product aggregate, updates its state, and persists it using JPA.

This approach provides strong decoupling, resilience (if the ProductService is down, OrderService can still operate on its last known product state), and often improves performance by eliminating remote calls for frequently accessed data.
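
As a rough illustration of the read path this enables, the sketch below uses an in-memory map as a stand-in for the JPA-backed local store. LocalCatalogSketch, LocalCatalog, ProductSnapshot, and quote are hypothetical names chosen for this example; the point is only that order logic reads the local copy and never calls the ProductService.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

final class LocalCatalogSketch {

    // The subset of product data the order logic needs.
    record ProductSnapshot(String productId, double price, int stock) {}

    // In-memory stand-in for the local product table.
    static final class LocalCatalog {
        private final Map<String, ProductSnapshot> products = new ConcurrentHashMap<>();

        // Called by the event consumer when a product is first seen.
        void onCreated(String productId, double price, int stock) {
            products.putIfAbsent(productId, new ProductSnapshot(productId, price, stock));
        }

        // Called by the event consumer when the price changes.
        void onPriceUpdated(String productId, double newPrice) {
            products.computeIfPresent(productId,
                (id, p) -> new ProductSnapshot(id, newPrice, p.stock()));
        }

        // Called by order logic: answers from local state, no remote call.
        Optional<Double> quote(String productId, int quantity) {
            ProductSnapshot p = products.get(productId);
            if (p == null || p.stock() < quantity) {
                return Optional.empty();
            }
            return Optional.of(p.price() * quantity);
        }
    }

    public static void main(String[] args) {
        LocalCatalog catalog = new LocalCatalog();
        catalog.onCreated("p-1", 10.0, 5);
        catalog.onPriceUpdated("p-1", 12.5);

        System.out.println(catalog.quote("p-1", 2)); // Optional[25.0]
        System.out.println(catalog.quote("p-1", 9)); // Optional.empty (not enough stock)
    }
}
```

If the ProductService is unavailable, quote keeps answering from the last state the consumer applied, which is the resilience property described above.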

Code Implementation: Rehydrating Product Aggregates in an Order Service

Let's illustrate this with a concrete Spring Boot example. Imagine an OrderService that needs to know about Product details. It will consume ProductEvents published by a ProductService.

First, define our event contracts. We'll use simple records for clarity.

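// Event contracts. In this example they live in com.example.orderservice.product.domain,
// matching the imports and type mappings used later; they could also come from a shared library.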
public record ProductPriceUpdatedEvent(
    String productId,
    double newPrice,
    long timestamp,
    String eventId // For idempotency
) {}

public record ProductStockChangedEvent(
    String productId,
    int newStock,
    long timestamp,
    String eventId // For idempotency
) {}

public record ProductCreatedEvent(
    String productId,
    String name,
    String description,
    double price,
    int stock,
    long timestamp,
    String eventId // For idempotency
) {}

Now, the OrderService will maintain its own Product aggregate. Note that this Product entity is distinct from the Product entity owned by the ProductService. It only contains the data the OrderService needs.

// Order Service - Local Product Aggregate
package com.example.orderservice.product.domain;

import jakarta.persistence.*;
import java.time.Instant;

@Entity
@Table(name = "order_service_products") // Differentiate from actual Product service table
public class Product {

    @Id
    private String id;
    private String name;
    private String description;
    private double price;
    private int stock;
    private Instant lastUpdated;
    private String lastProcessedEventId; // For idempotency

    public Product() {}

    public Product(String id, String name, String description, double price, int stock, Instant lastUpdated, String lastProcessedEventId) {
        this.id = id;
        this.name = name;
        this.description = description;
        this.price = price;
        this.stock = stock;
        this.lastUpdated = lastUpdated;
        this.lastProcessedEventId = lastProcessedEventId;
    }

    // Getters
    public String getId() { return id; }
    public String getName() { return name; }
    public String getDescription() { return description; }
    public double getPrice() { return price; }
    public int getStock() { return stock; }
    public Instant getLastUpdated() { return lastUpdated; }
    public String getLastProcessedEventId() { return lastProcessedEventId; }

    // Setters - Used by internal service logic or event processing
    public void setName(String name) { this.name = name; }
    public void setDescription(String description) { this.description = description; }
    public void setPrice(double price) { this.price = price; }
    public void setStock(int stock) { this.stock = stock; }
    public void setLastUpdated(Instant lastUpdated) { this.lastUpdated = lastUpdated; }
    public void setLastProcessedEventId(String lastProcessedEventId) { this.lastProcessedEventId = lastProcessedEventId; }

    // Business logic methods might go here
    public void updatePrice(double newPrice, Instant updateTime, String eventId) {
        this.price = newPrice;
        this.lastUpdated = updateTime;
        this.lastProcessedEventId = eventId;
    }

    public void updateStock(int newStock, Instant updateTime, String eventId) {
        this.stock = newStock;
        this.lastUpdated = updateTime;
        this.lastProcessedEventId = eventId;
    }

    // Builder pattern or more sophisticated constructors could be used
}

Next, our JPA repository for this local Product aggregate:

// Order Service - Product Repository
package com.example.orderservice.product.infrastructure;

import com.example.orderservice.product.domain.Product;
import org.springframework.data.jpa.repository.JpaRepository;

// findById(String) and existsById(String) are inherited from JpaRepository,
// so no extra method declarations are needed here.
public interface ProductRepository extends JpaRepository<Product, String> {
}

Now, the core event consumer logic. We will create a ProductEventHandler that listens to Kafka topics and updates our local Product aggregate. Idempotency is crucial here; the handler compares each event's ID with lastProcessedEventId and its timestamp with lastUpdated before applying a change.

// Order Service - Product Event Handler
package com.example.orderservice.product.application;

import com.example.orderservice.product.domain.Product;
import com.example.orderservice.product.infrastructure.ProductRepository;
import com.example.orderservice.product.domain.ProductCreatedEvent;
import com.example.orderservice.product.domain.ProductPriceUpdatedEvent;
import com.example.orderservice.product.domain.ProductStockChangedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.Instant;
import java.util.Optional;

@Service
public class ProductEventHandler {

    private static final Logger log = LoggerFactory.getLogger(ProductEventHandler.class);
    private final ProductRepository productRepository;

    public ProductEventHandler(ProductRepository productRepository) {
        this.productRepository = productRepository;
    }

    @KafkaListener(topics = "${kafka.topics.product-created}", groupId = "${spring.kafka.consumer.group-id}")
    @Transactional
    public void handleProductCreated(ProductCreatedEvent event) {
        log.info("Received ProductCreatedEvent for product ID: {}", event.productId());

        // Idempotency check: if an event with this ID was already processed, skip.
        // For creation, we check if the product already exists based on ID.
        if (productRepository.existsById(event.productId())) {
            log.warn("Product with ID {} already exists. Skipping creation event {}.", event.productId(), event.eventId());
            return;
        }

        Product newProduct = new Product(
            event.productId(),
            event.name(),
            event.description(),
            event.price(),
            event.stock(),
            Instant.ofEpochMilli(event.timestamp()),
            event.eventId()
        );
        productRepository.save(newProduct);
        log.info("Product {} created in Order Service's local store.", newProduct.getId());
    }

    @KafkaListener(topics = "${kafka.topics.product-price-updated}", groupId = "${spring.kafka.consumer.group-id}")
    @Transactional
    public void handleProductPriceUpdated(ProductPriceUpdatedEvent event) {
        log.info("Received ProductPriceUpdatedEvent for product ID: {}", event.productId());
        processProductUpdate(event.productId(), event.eventId(), product -> {
            // Skip exact redeliveries (same event ID as the last one applied),
            // then skip events whose timestamp is older than the last applied update.
            if (event.eventId().equals(product.getLastProcessedEventId())) {
                log.warn("Duplicate ProductPriceUpdatedEvent for product {}. Event ID {}. Skipping.", event.productId(), event.eventId());
                return false; // Indicate no change
            }
            if (Instant.ofEpochMilli(event.timestamp()).isBefore(product.getLastUpdated())) {
                log.warn("Out-of-order ProductPriceUpdatedEvent for product {}. Event timestamp {} is older than last update {}. Skipping.",
                        event.productId(), event.timestamp(), product.getLastUpdated());
                return false; // Indicate no change
            }
            product.updatePrice(event.newPrice(), Instant.ofEpochMilli(event.timestamp()), event.eventId());
            log.info("Product {} price updated to {}.", product.getId(), product.getPrice());
            return true; // Indicate change
        });
    }

    @KafkaListener(topics = "${kafka.topics.product-stock-changed}", groupId = "${spring.kafka.consumer.group-id}")
    @Transactional
    public void handleProductStockChanged(ProductStockChangedEvent event) {
        log.info("Received ProductStockChangedEvent for product ID: {}", event.productId());
        processProductUpdate(event.productId(), event.eventId(), product -> {
            if (event.eventId().equals(product.getLastProcessedEventId())) {
                log.warn("Duplicate ProductStockChangedEvent for product {}. Event ID {}. Skipping.", event.productId(), event.eventId());
                return false; // Indicate no change
            }
            if (Instant.ofEpochMilli(event.timestamp()).isBefore(product.getLastUpdated())) {
                log.warn("Out-of-order ProductStockChangedEvent for product {}. Event timestamp {} is older than last update {}. Skipping.",
                        event.productId(), event.timestamp(), product.getLastUpdated());
                return false; // Indicate no change
            }
            product.updateStock(event.newStock(), Instant.ofEpochMilli(event.timestamp()), event.eventId());
            log.info("Product {} stock updated to {}.", product.getId(), product.getStock());
            return true; // Indicate change
        });
    }

    // Helper method for common update logic and idempotency
    private void processProductUpdate(String productId, String eventId, java.util.function.Function<Product, Boolean> updateFunction) {
        Optional<Product> productOpt = productRepository.findById(productId);
        if (productOpt.isPresent()) {
            Product product = productOpt.get();
            if (updateFunction.apply(product)) { // Apply the specific update function and check if a change occurred
                productRepository.save(product);
            }
        } else {
            log.warn("Product with ID {} not found in local store. This could indicate an out-of-order event (update before creation) or a missing initial creation event. Event ID {}.", productId, eventId);
            // Depending on business logic, you might want to create a placeholder or trigger a re-sync.
            // For simplicity, we'll log and ignore for this example.
        }
    }
}
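
The handler above logs and drops an update that arrives before its creation event. One possible alternative, sketched here in plain Java with in-memory maps, is to park such updates and replay them once the product is created. PendingUpdatesSketch, Store, parkedCount, and the simplified PriceUpdated record are hypothetical; in a real service the parked events would need to live in a database table so they survive restarts.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

final class PendingUpdatesSketch {

    // Simplified event shape; only the fields this sketch needs.
    record PriceUpdated(String productId, double newPrice, long timestamp) {}

    static final class LocalProduct {
        private double price;
        private long lastUpdated;

        LocalProduct(double price, long lastUpdated) {
            this.price = price;
            this.lastUpdated = lastUpdated;
        }

        // Same rule as the handler: skip events older than the last applied update.
        void applyIfNotOlder(PriceUpdated event) {
            if (event.timestamp() >= lastUpdated) {
                price = event.newPrice();
                lastUpdated = event.timestamp();
            }
        }

        double price() { return price; }
    }

    // In-memory stand-in for the local store plus a "parked events" table.
    static final class Store {
        private final Map<String, LocalProduct> products = new HashMap<>();
        private final Map<String, Deque<PriceUpdated>> parked = new HashMap<>();

        void onPriceUpdated(PriceUpdated event) {
            LocalProduct product = products.get(event.productId());
            if (product == null) {
                // Product not known yet: keep the event instead of dropping it.
                parked.computeIfAbsent(event.productId(), id -> new ArrayDeque<>()).add(event);
                return;
            }
            product.applyIfNotOlder(event);
        }

        void onCreated(String productId, double price, long timestamp) {
            if (products.containsKey(productId)) {
                return; // duplicate creation event
            }
            LocalProduct product = new LocalProduct(price, timestamp);
            products.put(productId, product);
            // Replay anything that arrived before the creation event.
            Deque<PriceUpdated> waiting = parked.remove(productId);
            if (waiting != null) {
                waiting.forEach(product::applyIfNotOlder);
            }
        }

        LocalProduct get(String productId) { return products.get(productId); }

        int parkedCount() {
            return parked.values().stream().mapToInt(Deque::size).sum();
        }
    }

    public static void main(String[] args) {
        Store store = new Store();
        store.onPriceUpdated(new PriceUpdated("p-1", 12.5, 200L)); // update arrives first
        System.out.println(store.parkedCount());                   // 1
        store.onCreated("p-1", 10.0, 100L);                        // creation arrives late
        System.out.println(store.get("p-1").price());              // 12.5
        System.out.println(store.parkedCount());                   // 0
    }
}
```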

Configuration (application.yml):

spring:
  application:
    name: order-service
  datasource:
    url: jdbc:postgresql://localhost:5432/order_db
    username: user
    password: password
  jpa:
    hibernate:
      ddl-auto: update # Convenient for a demo; prefer versioned migrations (e.g., Flyway or Liquibase) in production
    show-sql: true
    properties:
      hibernate:
        format_sql: true
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: order-service-product-consumer-group
      auto-offset-reset: earliest # Used only when the group has no committed offset; lets a new group start from the oldest retained record
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: 'com.example.orderservice.product.domain' # Specify package for event deserialization
        spring.json.type.mapping: 'productCreatedEvent:com.example.orderservice.product.domain.ProductCreatedEvent,productPriceUpdatedEvent:com.example.orderservice.product.domain.ProductPriceUpdatedEvent,productStockChangedEvent:com.example.orderservice.product.domain.ProductStockChangedEvent'

kafka:
  topics:
    product-created: product-created-events
    product-price-updated: product-price-updated-events
    product-stock-changed: product-stock-changed-events

Note on Deserialization: JsonDeserializer needs to know which class to instantiate for each message. By default it reads the type from the __TypeId__ header that Spring's JsonSerializer adds on the producer side (the lookup is done by DefaultJackson2JavaTypeMapper), and spring.json.type.mapping translates the token in that header (for example productCreatedEvent) into a local class, so producer and consumer do not have to share fully qualified class names. The mapping above therefore only works if the ProductService publishes with the same tokens. If the producer sends no type headers, a header-based mapping has nothing to read; a consumer that handles a single event type can set spring.json.value.default.type instead. Because this example uses one topic per event type, each listener always receives a single, known type. A topic carrying mixed event types would have to rely on the type header to choose the target class.

This setup ensures that:

  • Each event type is handled by a dedicated listener.
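  • Each listener runs in a database transaction, so the changes for one event are committed atomically. The Kafka offset commit is not part of that transaction, which is why redelivery remains possible and the idempotency checks are needed.
  • The lastProcessedEventId and timestamp checks filter immediate redeliveries and events older than the last applied update. Because price and stock updates share a single lastUpdated field, an older event of one type that arrives after a newer event of the other type is also skipped, which can drop a legitimate change.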
  • The OrderService now has a local, up-to-date Product aggregate that its own domain logic can directly query, eliminating remote calls.
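
One caveat with the handler as written: updatePrice and updateStock both write the same lastUpdated field, and the two event types travel on separate topics, so their relative arrival order is not guaranteed. The plain-Java sketch below shows how a shared timestamp can reject a valid price change, and one possible alternative that tracks freshness per field. PerFieldFreshnessSketch and its nested classes are hypothetical and are not part of the example service.

```java
final class PerFieldFreshnessSketch {

    // Mirrors the handler's check: one timestamp shared by price and stock.
    static final class SharedTimestampProduct {
        private double price;
        private int stock;
        private long lastUpdated;

        boolean updatePrice(double newPrice, long timestamp) {
            if (timestamp < lastUpdated) return false;
            price = newPrice;
            lastUpdated = timestamp;
            return true;
        }

        boolean updateStock(int newStock, long timestamp) {
            if (timestamp < lastUpdated) return false;
            stock = newStock;
            lastUpdated = timestamp;
            return true;
        }

        double price() { return price; }
        int stock() { return stock; }
    }

    // Alternative: each field remembers when it was last changed.
    static final class PerFieldProduct {
        private double price;
        private int stock;
        private long priceUpdatedAt;
        private long stockUpdatedAt;

        boolean updatePrice(double newPrice, long timestamp) {
            if (timestamp < priceUpdatedAt) return false;
            price = newPrice;
            priceUpdatedAt = timestamp;
            return true;
        }

        boolean updateStock(int newStock, long timestamp) {
            if (timestamp < stockUpdatedAt) return false;
            stock = newStock;
            stockUpdatedAt = timestamp;
            return true;
        }

        double price() { return price; }
        int stock() { return stock; }
    }

    public static void main(String[] args) {
        // A stock event stamped t=200 arrives before a price event stamped t=100.
        SharedTimestampProduct shared = new SharedTimestampProduct();
        shared.updateStock(40, 200L);
        System.out.println(shared.updatePrice(12.5, 100L));   // false: price change is lost

        PerFieldProduct perField = new PerFieldProduct();
        perField.updateStock(40, 200L);
        System.out.println(perField.updatePrice(12.5, 100L)); // true: price change is applied
        System.out.println(perField.price());                 // 12.5
    }
}
```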

Considerations and Trade-offs

While powerful, this pattern comes with its own set of considerations:

  1. Eventual Consistency: The local aggregate will only be eventually consistent with the source. This means there might be a short delay where the OrderService has slightly stale Product data. Domain logic must account for this.
  2. Idempotency is Paramount: As demonstrated, robust idempotency checks are non-negotiable. Without them, duplicate events (which at-least-once delivery makes possible, for example after a consumer restart or rebalance) can corrupt your local state.
  3. Initial State Bootstrap: When a new OrderService instance starts or an existing one is brought online, how does it get its initial Product data?
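    • Kafka earliest offset: Setting auto-offset-reset: earliest makes a consumer group with no committed offsets start from the oldest retained record. It does not rewind a group that already has offsets, and it rebuilds full state only if the topic still retains every relevant event (long retention or log compaction). This is common for smaller, non-critical datasets.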
    • Snapshotting + Events: For very large datasets, the ProductService could provide an initial snapshot via a dedicated API, followed by event stream processing for subsequent updates.
    • Dedicated "Snapshot" Topic: The ProductService could publish a complete state snapshot periodically to a specific Kafka topic, which the OrderService consumes.
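  4. Out-of-Order Events: Kafka guarantees order only within a single partition. Events for the same product can still arrive out of order when they travel on different topics (as the three event types do in this example), when they are produced with different keys, or when the partition count changes and a key starts mapping to a different partition. Comparing timestamps and event IDs, as shown in the example, is a common mitigation, but it depends on producer clocks; a per-aggregate version number assigned by the source service is more robust when it is available.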
  5. Schema Evolution: As event schemas evolve, consumers need to be robust enough to handle older versions or migrate data. Using schema registries (like Confluent Schema Registry) with Avro or Protobuf is highly recommended for production systems to manage compatibility.
  6. Data Duplication: This pattern involves duplicating data across services. This is a deliberate trade-off for autonomy and reduced coupling. It means more storage and potentially more complex data migration strategies.
  7. Error Handling & Dead-Letter Queues (DLQ): What happens if an event cannot be processed (e.g., deserialization error, business validation failure)? Integrating a DLQ mechanism is crucial to capture and reprocess failed messages, preventing consumer blockages. Spring for Apache Kafka supports this with DefaultErrorHandler combined with DeadLetterPublishingRecoverer, which publishes a record to a dead-letter topic once retries are exhausted.
  8. Resource Consumption: Running numerous Kafka consumers can consume network and CPU resources. Careful sizing and scaling of consumer groups are necessary.
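
The "Snapshotting + Events" bootstrap option and the out-of-order concern can be handled by the same check if events and snapshots carry a per-product version. That version field is an assumption made for this sketch; the event records shown earlier only have timestamp and eventId. SnapshotBootstrapSketch, ProductState, LocalStore, and loadSnapshot are likewise hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SnapshotBootstrapSketch {

    // State as delivered by a snapshot, including the version it reflects.
    record ProductState(String productId, double price, long version) {}

    // Hypothetical event shape with a per-product, increasing version.
    record PriceUpdated(String productId, double newPrice, long version) {}

    static final class LocalStore {
        private final Map<String, ProductState> products = new HashMap<>();

        // Step 1: seed the local store from a snapshot.
        void loadSnapshot(List<ProductState> snapshot) {
            for (ProductState state : snapshot) {
                products.put(state.productId(), state);
            }
        }

        // Step 2: apply stream events, skipping anything the snapshot already
        // covers and anything delivered twice or late.
        boolean apply(PriceUpdated event) {
            ProductState current = products.get(event.productId());
            if (current == null || event.version() <= current.version()) {
                return false;
            }
            products.put(event.productId(),
                new ProductState(event.productId(), event.newPrice(), event.version()));
            return true;
        }

        ProductState get(String productId) { return products.get(productId); }
    }

    public static void main(String[] args) {
        LocalStore store = new LocalStore();
        store.loadSnapshot(List.of(new ProductState("p-1", 10.0, 5L)));

        System.out.println(store.apply(new PriceUpdated("p-1", 9.0, 4L)));  // false: older than snapshot
        System.out.println(store.apply(new PriceUpdated("p-1", 12.5, 6L))); // true
        System.out.println(store.apply(new PriceUpdated("p-1", 12.5, 6L))); // false: duplicate
        System.out.println(store.get("p-1").price());                       // 12.5
    }
}
```

The single version comparison covers three cases at once: events already included in the snapshot, redelivered events, and events that arrive late.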

Conclusion

Building stateful microservices by rehydrating domain aggregates from Kafka event streams is a powerful pattern that fosters autonomy, resilience, and scalability in complex event-driven architectures. By embracing eventual consistency and meticulously implementing idempotency, you can decouple services, reduce chatty network calls, and give each service a local, current copy of the data it needs within its own bounded context, while ownership stays with the source service. The pattern introduces challenges such as initial state synchronization, out-of-order events, and schema evolution, but the gains in architectural flexibility and system robustness make it a valuable technique for backend development with Spring Boot, JPA, and Apache Kafka.