Mastering Real-time Projections: Building Materialized Views with Kafka Streams, Spring Boot, and PostgreSQL
- Author: Maria
Introduction: The Challenge of Real-time Read Models in Event-Driven Systems
In the world of scalable microservices, especially those embracing Event Sourcing and CQRS (Command Query Responsibility Segregation), a persistent challenge emerges: how do you efficiently construct and maintain real-time, query-optimized read models (often called projections or materialized views) from a continuous stream of events? While simple event consumers can handle basic updates, the moment your read model requires complex aggregations, joins across different event types, or stateful transformations, traditional approaches often fall short.
Imagine an e-commerce platform. Orders are created, items are added or removed, and order statuses change. To display a customer's current order summary, including total value, item counts, and status, a simple OrderCreatedEvent consumer won't cut it. You need to react to a cascade of events, aggregate their data, and present a coherent, up-to-date view. Building such a system robustly and at scale, ensuring low-latency updates and consistency, can quickly become a significant architectural hurdle.
This is where Kafka Streams shines. It provides a powerful, distributed stream processing library perfectly suited for transforming raw event streams into derived, denormalized datasets ideal for query services. In this deep dive, we'll explore how to harness Kafka Streams within a Spring Boot application to build real-time materialized views, persisting them into PostgreSQL for efficient querying.
Deep Dive: Kafka Streams for Stateful Event Projection
At its core, Kafka Streams is a client-side library for building applications that process and analyze data stored in Kafka. Unlike traditional message consumers, Kafka Streams allows you to treat Kafka topics not just as message queues, but as databases or changelog streams, enabling sophisticated operations like aggregations, joins, windowing, and stateful transformations.
Why is this particularly well-suited for building read models?
- Stateful Processing: Many projections require maintaining a running state (e.g., the current total value of an order). Kafka Streams keeps this state in local state stores (RocksDB by default) that are co-located with the processing tasks for fast access and backed up to changelog topics in Kafka for fault tolerance.
- Exactly-Once Semantics: Crucial for financial or other critical data. With processing.guarantee set to exactly_once_v2 (the default is at_least_once), Kafka Streams guarantees exactly-once processing for its state stores and for the records it writes back to Kafka topics, even in the face of failures. The guarantee does not cover writes to external systems such as PostgreSQL.
- Scalability and Fault Tolerance: Built on Kafka's consumer group mechanism, Kafka Streams applications scale horizontally by adding instances, up to the number of input partitions. If an instance fails, Kafka reassigns its partitions to the remaining instances, which restore the associated state stores from their changelog topics and then resume processing.
- Rich DSL (Domain Specific Language): It offers a high-level API for common stream processing patterns (map, filter, groupBy, aggregate, join, window) and a lower-level Processor API for custom logic, making complex topologies manageable.
- KStream vs. KTable: Understanding the distinction between KStream (a record-by-record stream of events) and KTable (a continuously updated changelog stream representing the state of a table) is fundamental. For materialized views, KTable is often your go-to, as it inherently models a table whose state changes over time, much like a database table.
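To make the KStream/KTable distinction concrete, here is a minimal plain-Java sketch that models the two views without any Kafka dependency. It illustrates the semantics only: the Change record and the StreamTableDuality class are hypothetical names invented for this example, not part of the Kafka Streams API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a keyed record; a null value plays the role of a tombstone.
record Change(String key, String value) {}

class StreamTableDuality {

    // Stream view: every record is an independent fact, so all of them count.
    static int streamView(List<Change> changelog) {
        return changelog.size();
    }

    // Table view: replaying the changelog keeps only the latest value per key.
    static Map<String, String> tableView(List<Change> changelog) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Change change : changelog) {
            if (change.value() == null) {
                table.remove(change.key());              // tombstone deletes the row
            } else {
                table.put(change.key(), change.value()); // upsert by key
            }
        }
        return table;
    }
}
```

Replaying the same changelog always yields the same table, which is why a KTable can be restored after a failure and why a projection can be rebuilt from the event log.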
In our scenario, we'll take raw order events (OrderCreatedEvent, OrderItemAddedEvent, OrderItemRemovedEvent, OrderConfirmedEvent) from various Kafka topics. Our Kafka Streams application will merge these streams, aggregate them into a single CustomerOrderSummaryProjection per order, and continuously update this projection. The changes to this aggregated KTable will then be observed and persisted to our PostgreSQL database, serving as the read model for downstream services or UI.
This pattern centralizes the complex aggregation logic in a dedicated, scalable stream processing application, decoupling it from both the command-side write model and the query-side read model's direct event consumption.
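Before wiring anything into Kafka, it can help to see the aggregation as what it is: a left fold of one order's events into a summary. The following is a simplified sketch under stated assumptions. The Event, Created, ItemAdded, ItemRemoved, Confirmed, and Summary types are hypothetical stand-ins for this illustration (string IDs, no timestamps), not the event classes defined later in this article.

```java
import java.math.BigDecimal;
import java.util.List;

// Hypothetical, simplified event types used only for this illustration.
interface Event {}
record Created(String orderId) implements Event {}
record ItemAdded(String orderId, BigDecimal price, int quantity) implements Event {}
record ItemRemoved(String orderId, BigDecimal price, int quantity) implements Event {}
record Confirmed(String orderId) implements Event {}

// Immutable snapshot of the projection after each event.
record Summary(String orderId, String status, BigDecimal totalValue, int totalItems) {

    static final Summary EMPTY = new Summary(null, null, BigDecimal.ZERO, 0);

    // One aggregation step: current state + event -> next state.
    Summary apply(Event event) {
        if (orderId == null) {
            // Nothing is known about this order yet: only a Created event can initialize it.
            return event instanceof Created c
                    ? new Summary(c.orderId(), "CREATED", BigDecimal.ZERO, 0)
                    : this;
        }
        if (event instanceof ItemAdded a) {
            BigDecimal lineValue = a.price().multiply(BigDecimal.valueOf(a.quantity()));
            return new Summary(orderId, status, totalValue.add(lineValue), totalItems + a.quantity());
        }
        if (event instanceof ItemRemoved r) {
            BigDecimal lineValue = r.price().multiply(BigDecimal.valueOf(r.quantity()));
            return new Summary(orderId, status, totalValue.subtract(lineValue), totalItems - r.quantity());
        }
        if (event instanceof Confirmed) {
            return new Summary(orderId, "CONFIRMED", totalValue, totalItems);
        }
        return this; // duplicate Created or unknown event: leave the state unchanged
    }

    // Left fold over the ordered events of a single order.
    static Summary replay(List<Event> events) {
        Summary summary = EMPTY;
        for (Event event : events) {
            summary = summary.apply(event);
        }
        return summary;
    }
}
```

Keeping the step function free of Kafka types makes it easy to unit test; the stream processing layer then only has to supply the events for each key in order and store the result.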
Code Implementation: Building the Order Summary Projection
Let's walk through a concrete example. We'll define several events, a JPA entity for our materialized view, a Spring Data JPA repository, and finally, the Kafka Streams configuration to stitch it all together.
First, our event definitions. To enable polymorphic deserialization for JsonSerde<OrderEvent>, we use Jackson's @JsonTypeInfo and @JsonSubTypes annotations.
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.UUID;

// Base interface for all order-related events for polymorphic serialization
@JsonTypeInfo(
    use = JsonTypeInfo.Id.NAME,
    include = JsonTypeInfo.As.PROPERTY,
    property = "type"
)
@JsonSubTypes({
    @JsonSubTypes.Type(value = OrderCreatedEvent.class, name = "OrderCreated"),
    @JsonSubTypes.Type(value = OrderItemAddedEvent.class, name = "OrderItemAdded"),
    @JsonSubTypes.Type(value = OrderItemRemovedEvent.class, name = "OrderItemRemoved"),
    @JsonSubTypes.Type(value = OrderConfirmedEvent.class, name = "OrderConfirmed")
})
interface OrderEvent {}

record OrderCreatedEvent(
    UUID orderId,
    UUID customerId,
    Instant timestamp
) implements OrderEvent {}

record OrderItemAddedEvent(
    UUID orderId,
    UUID itemId,
    String productName,
    BigDecimal price,
    int quantity,
    Instant timestamp
) implements OrderEvent {}

record OrderItemRemovedEvent(
    UUID orderId,
    UUID itemId,
    String productName,
    BigDecimal price,
    int quantity,
    Instant timestamp
) implements OrderEvent {}

record OrderConfirmedEvent(
    UUID orderId,
    Instant confirmationTimestamp
) implements OrderEvent {}
Next, our JPA entity for the CustomerOrderSummaryProjection and its corresponding Spring Data JPA repository. This will represent the materialized view stored in PostgreSQL.
import jakarta.persistence.*;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.UUID;
import org.springframework.data.jpa.repository.JpaRepository;

@Entity
@Table(name = "customer_order_summary")
class CustomerOrderSummaryProjection {

    @Id
    private UUID orderId;
    private UUID customerId;
    private String status; // e.g., CREATED, CONFIRMED
    private BigDecimal totalValue;
    private int totalItems;
    private Instant lastUpdated;

    // JPA requires a no-arg constructor; it also serves as the aggregate initializer
    public CustomerOrderSummaryProjection() {
        this.totalValue = BigDecimal.ZERO; // Initialize to avoid null pointer issues
    }

    public CustomerOrderSummaryProjection(UUID orderId, UUID customerId, String status,
                                          BigDecimal totalValue, int totalItems, Instant lastUpdated) {
        this.orderId = orderId;
        this.customerId = customerId;
        this.status = status;
        this.totalValue = totalValue;
        this.totalItems = totalItems;
        this.lastUpdated = lastUpdated;
    }

    // Getters and setters: used by the aggregator and by Jackson when the
    // projection is (de)serialized for the state store
    public UUID getOrderId() { return orderId; }
    public void setOrderId(UUID orderId) { this.orderId = orderId; }
    public UUID getCustomerId() { return customerId; }
    public void setCustomerId(UUID customerId) { this.customerId = customerId; }
    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }
    public BigDecimal getTotalValue() { return totalValue; }
    public void setTotalValue(BigDecimal totalValue) { this.totalValue = totalValue; }
    public int getTotalItems() { return totalItems; }
    public void setTotalItems(int totalItems) { this.totalItems = totalItems; }
    public Instant getLastUpdated() { return lastUpdated; }
    public void setLastUpdated(Instant lastUpdated) { this.lastUpdated = lastUpdated; }

    @Override
    public String toString() {
        return "CustomerOrderSummaryProjection{" +
            "orderId=" + orderId +
            ", customerId=" + customerId +
            ", status='" + status + '\'' +
            ", totalValue=" + totalValue +
            ", totalItems=" + totalItems +
            ", lastUpdated=" + lastUpdated +
            '}';
    }
}

interface CustomerOrderSummaryRepository extends JpaRepository<CustomerOrderSummaryProjection, UUID> {}
Finally, our Spring Boot Kafka Streams configuration. This defines the topology for processing our order events and maintaining the CustomerOrderSummaryProjection.
import java.math.BigDecimal;
import java.util.UUID;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.support.serializer.JsonSerde;

@Configuration
@EnableKafkaStreams
class KafkaStreamsConfig {

    private final CustomerOrderSummaryRepository repository;
    private final ObjectMapper objectMapper;

    public KafkaStreamsConfig(CustomerOrderSummaryRepository repository) {
        this.repository = repository;
        // JavaTimeModule is needed for Instant serialization/deserialization
        this.objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
    }

    // @EnableKafkaStreams supplies the StreamsBuilder. Its factory bean builds and starts the
    // topology, so this method only declares the topology and does not call builder.build().
    @Bean
    public KTable<UUID, CustomerOrderSummaryProjection> orderSummariesTable(StreamsBuilder builder) {
        // Serdes for JSON events and projection
        JsonSerde<OrderEvent> orderEventSerde = new JsonSerde<>(OrderEvent.class, objectMapper);
        JsonSerde<CustomerOrderSummaryProjection> projectionSerde =
                new JsonSerde<>(CustomerOrderSummaryProjection.class, objectMapper);

        // 1. Create a KStream per event type.
        //    Each topic is read with a serde for its concrete type, then widened to OrderEvent.
        KStream<UUID, OrderEvent> createdStream = builder
                .stream("order-created-events",
                        Consumed.with(Serdes.UUID(), new JsonSerde<>(OrderCreatedEvent.class, objectMapper)))
                .mapValues(event -> (OrderEvent) event);
        KStream<UUID, OrderEvent> itemAddedStream = builder
                .stream("order-item-added-events",
                        Consumed.with(Serdes.UUID(), new JsonSerde<>(OrderItemAddedEvent.class, objectMapper)))
                .mapValues(event -> (OrderEvent) event);
        KStream<UUID, OrderEvent> itemRemovedStream = builder
                .stream("order-item-removed-events",
                        Consumed.with(Serdes.UUID(), new JsonSerde<>(OrderItemRemovedEvent.class, objectMapper)))
                .mapValues(event -> (OrderEvent) event);
        KStream<UUID, OrderEvent> confirmedStream = builder
                .stream("order-confirmed-events",
                        Consumed.with(Serdes.UUID(), new JsonSerde<>(OrderConfirmedEvent.class, objectMapper)))
                .mapValues(event -> (OrderEvent) event);

        // 2. Merge all event streams into a single stream for processing.
        //    Note: merge() preserves order within each input stream but gives no ordering
        //    guarantee between records that come from different topics.
        KStream<UUID, OrderEvent> allOrderEvents = createdStream
                .merge(itemAddedStream)
                .merge(itemRemovedStream)
                .merge(confirmedStream);

        // 3. Aggregate into a KTable representing the CustomerOrderSummaryProjection
        KTable<UUID, CustomerOrderSummaryProjection> orderSummariesTable = allOrderEvents
                .groupByKey(Grouped.with(Serdes.UUID(), orderEventSerde))
                .aggregate(
                        // Initializer: creates an empty projection for a new orderId
                        CustomerOrderSummaryProjection::new,
                        // Aggregator: updates the projection based on the incoming event
                        (orderId, event, aggregate) -> {
                            if (aggregate.getOrderId() == null) { // First event for this order (should be OrderCreated)
                                if (event instanceof OrderCreatedEvent created) {
                                    aggregate.setOrderId(created.orderId());
                                    aggregate.setCustomerId(created.customerId());
                                    aggregate.setStatus("CREATED");
                                    aggregate.setTotalValue(BigDecimal.ZERO); // Initial total value
                                    aggregate.setTotalItems(0);               // Initial total items
                                    aggregate.setLastUpdated(created.timestamp());
                                } else {
                                    System.err.println("Warning: Received non-OrderCreatedEvent for new orderId: "
                                            + orderId + ". Event: " + event.getClass().getSimpleName());
                                    // The event is dropped here: the aggregate stays uninitialized until the
                                    // OrderCreatedEvent arrives. In a production system, dead-letter or buffer
                                    // such events, or publish all events of an order to one topic so that
                                    // OrderCreatedEvent is always processed first.
                                }
                            } else { // Subsequent events for an existing order
                                if (event instanceof OrderItemAddedEvent added) {
                                    BigDecimal itemValue = added.price().multiply(BigDecimal.valueOf(added.quantity()));
                                    aggregate.setTotalValue(aggregate.getTotalValue().add(itemValue));
                                    aggregate.setTotalItems(aggregate.getTotalItems() + added.quantity());
                                    aggregate.setLastUpdated(added.timestamp());
                                } else if (event instanceof OrderItemRemovedEvent removed) {
                                    BigDecimal itemValue = removed.price().multiply(BigDecimal.valueOf(removed.quantity()));
                                    aggregate.setTotalValue(aggregate.getTotalValue().subtract(itemValue));
                                    aggregate.setTotalItems(aggregate.getTotalItems() - removed.quantity());
                                    aggregate.setLastUpdated(removed.timestamp());
                                } else if (event instanceof OrderConfirmedEvent confirmed) {
                                    aggregate.setStatus("CONFIRMED");
                                    aggregate.setLastUpdated(confirmed.confirmationTimestamp());
                                }
                            }
                            return aggregate;
                        },
                        // No subtractor: KGroupedStream.aggregate takes only an initializer and an adder.
                        // Subtractors exist only on KGroupedTable, where old values can be retracted.
                        Materialized.<UUID, CustomerOrderSummaryProjection, KeyValueStore<Bytes, byte[]>>as("customer-order-summary-store")
                                .withKeySerde(Serdes.UUID())
                                .withValueSerde(projectionSerde));

        // 4. Convert the KTable to a KStream to observe changes and persist them to PostgreSQL.
        //    With record caching enabled (the default), consecutive updates for the same key
        //    may be coalesced before they are forwarded downstream.
        orderSummariesTable
                .toStream()
                .foreach((orderId, projection) -> {
                    if (projection != null && projection.getOrderId() != null) { // Not a tombstone or uninitialized
                        try {
                            // Persist to PostgreSQL: with an assigned @Id, save() behaves like an upsert
                            repository.save(projection);
                            System.out.println("Saved/Updated projection for orderId " + orderId + ": " + projection);
                        } catch (Exception e) {
                            System.err.println("Error saving projection for orderId " + orderId + ": " + e.getMessage());
                            // IMPORTANT: swallowing the exception means this update is lost until the next
                            // event for the same order. For production, add retry logic or a
                            // Dead-Letter Queue (DLQ) for failed DB writes.
                            // Kafka Streams' exactly-once guarantee does not extend to external systems
                            // like PostgreSQL: the DB write here is at-least-once, so it must be idempotent.
                        }
                    } else if (projection == null) {
                        // Tombstone record (key with null value), used for deletion in KTables.
                        // To delete from the DB, call repository.deleteById(orderId) here.
                        System.out.println("Received tombstone for orderId: " + orderId + ". Not deleting from DB for this example.");
                    }
                });

        return orderSummariesTable;
    }
}
To run this, you'd need a standard Spring Boot application with spring-boot-starter-web, spring-boot-starter-data-jpa, the PostgreSQL JDBC driver (org.postgresql:postgresql), spring-kafka, and kafka-streams on the classpath; spring-boot-starter-json, which brings in Jackson's Java time module, comes transitively with spring-boot-starter-web. Configure your application.yml with Kafka broker details, database connection, and Kafka Streams specific properties:
spring:
  application:
    name: order-projection-service
  kafka:
    bootstrap-servers: localhost:9092
    streams:
      application-id: order-summary-projection-app
      replication-factor: 1
      properties:
        # UUIDSerde is a nested class of Serdes, hence the '$'
        default.key.serde: org.apache.kafka.common.serialization.Serdes$UUIDSerde
        default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
        # Only used when an operation falls back to the default JsonSerde
        spring.json.value.default.type: dev.example.OrderEvent
        # state.dir: /tmp/kafka-streams # Path for RocksDB state stores
  datasource:
    url: jdbc:postgresql://localhost:5432/orderdb
    username: user
    password: password
  jpa:
    hibernate:
      ddl-auto: update # for local development only; use validate with managed migrations in production
    show-sql: true
    properties:
      hibernate:
        dialect: org.hibernate.dialect.PostgreSQLDialect
Note on the default value type: Spring Kafka's JsonDeserializer reads this setting under the key spring.json.value.default.type, and it only matters when an operation falls back to default.value.serde. The topology above passes explicitly constructed serdes such as new JsonSerde<>(OrderCreatedEvent.class, objectMapper) to every operation, so the target type is already known there. The JsonSerde<OrderEvent> passed to Grouped.with still relies on the @JsonTypeInfo and @JsonSubTypes annotations on OrderEvent to resolve the concrete event type.
Considerations and Trade-offs
While Kafka Streams offers a robust solution, it's essential to understand the production implications:
Event Ordering and Idempotency:
- Event Ordering: Kafka guarantees order only within a partition. If events for the same order can land on different partitions (e.g., if you don't key events by orderId), they may be processed out of order. Always key your events by the aggregate ID (orderId in this case) so that all events for a given aggregate are handled by the same Kafka Streams task. Because this example reads one topic per event type, keying alone is not enough: merging streams gives no ordering guarantee between records from different topics, so an OrderItemAddedEvent can be processed before its OrderCreatedEvent. Publishing all events of an aggregate to a single topic avoids this.
- Database Write Idempotency: The repository.save(projection) call behaves like an upsert (insert if the row does not exist, update if it does) because the entity carries an assigned ID. This is crucial because writes from the foreach step to an external system are at-least-once. If the application fails after a database write but before the corresponding offsets are committed, the same projection update is delivered again after the restart. Since each update carries the full projection state rather than a delta, replaying it overwrites the row with the same values and cannot double-count.
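If you would rather make the upsert explicit and atomic instead of relying on how save() resolves an existing row, PostgreSQL's INSERT ... ON CONFLICT can do it in a single statement. The sketch below uses plain JDBC and is an illustration under assumptions: the OrderSummaryUpsert class is hypothetical, and the snake_case column names assume Spring Boot's default naming strategy was used to create the customer_order_summary table, so check them against your actual schema.

```java
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.UUID;

// Hypothetical helper, not part of the article's original code.
class OrderSummaryUpsert {

    // Assumed column names (camelCase fields mapped to snake_case columns).
    static final String UPSERT_SQL = """
            INSERT INTO customer_order_summary
                (order_id, customer_id, status, total_value, total_items, last_updated)
            VALUES (?, ?, ?, ?, ?, ?)
            ON CONFLICT (order_id) DO UPDATE SET
                customer_id  = EXCLUDED.customer_id,
                status       = EXCLUDED.status,
                total_value  = EXCLUDED.total_value,
                total_items  = EXCLUDED.total_items,
                last_updated = EXCLUDED.last_updated
            """;

    // Writes the full projection state; running it twice with the same values
    // leaves the row unchanged, which is what makes replays harmless.
    static int upsert(Connection connection, UUID orderId, UUID customerId, String status,
                      BigDecimal totalValue, int totalItems, Instant lastUpdated) throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement(UPSERT_SQL)) {
            statement.setObject(1, orderId);
            statement.setObject(2, customerId);
            statement.setString(3, status);
            statement.setBigDecimal(4, totalValue);
            statement.setInt(5, totalItems);
            statement.setTimestamp(6, Timestamp.from(lastUpdated));
            return statement.executeUpdate();
        }
    }
}
```

The statement deliberately overwrites unconditionally. Updates for one order reach the sink in the order the aggregate produced them, so a replay after a restart re-applies a suffix of that sequence and converges on the same final row.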
State Store Management:
- RocksDB: Kafka Streams uses embedded RocksDB instances for state stores. These are local to the Kafka Streams application instances. For production, consider using persistent volumes if deploying with Docker/Kubernetes, so that a restarted container does not have to restore its full state from the changelog topics.
- Rebuilding Projections: If your database is lost or you need to change the projection logic, you can rebuild the entire materialized view by resetting the application (for example with the Kafka Streams application reset tool plus a cleanup of the local state directory, or by deploying under a new application ID) so that it re-processes the source topics from the beginning. This only works if the source topics still retain the full event history, but where they do, it is a massive advantage over manual data migration scripts.
Monitoring and Operations:
- Metrics: Kafka Streams applications emit a rich set of metrics (Kafka consumer metrics, processing lag, state store sizes) that are vital for monitoring. Integrate with Prometheus/Grafana or your existing monitoring stack.
- Error Handling: Implement robust error handling for external writes (e.g., repository.save). Consider a dead-letter queue (DLQ) for events that cause persistent failures during projection updates, preventing them from blocking the stream.
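As one possible shape for the retry part of that error handling, here is a small bounded-retry helper with exponential backoff in plain Java. It's a sketch, not a prescription: the Retry class and its withBackoff method are hypothetical names, and a production setup might prefer an existing retry library instead.

```java
import java.util.concurrent.Callable;

// Hypothetical helper for retrying a flaky external write.
class Retry {

    // Runs the action up to maxAttempts times, doubling the pause after each failure.
    // If every attempt fails, the last exception is rethrown to the caller.
    static <T> T withBackoff(Callable<T> action, int maxAttempts, long initialDelayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMillis;
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // never retry an interrupt
                throw e;
            } catch (Exception e) {
                lastFailure = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay);
                    delay *= 2;
                }
            }
        }
        throw lastFailure;
    }
}
```

Inside the foreach step this could wrap the database write, for example Retry.withBackoff(() -> repository.save(projection), 3, 200). Keep the total retry time short: the retries block the stream thread, and a thread that stalls longer than max.poll.interval.ms is removed from the consumer group. Once the attempts are exhausted, decide explicitly between failing the application and routing the update to a DLQ rather than silently dropping it.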
Complexity vs. Simplicity:
- For very simple projections (e.g., direct 1:1 mapping from an event to a database row without aggregation), a standard Spring Kafka consumer might be sufficient and simpler to manage.
- Kafka Streams introduces a learning curve and operational overhead. Reserve it for scenarios where you genuinely need stateful processing, aggregations, joins, or robust fault tolerance at scale.
Schema Evolution: When event schemas change, your Kafka Streams topology needs to be updated. Plan for backward and forward compatibility using tools like Avro or Protobuf for event serialization, combined with a Schema Registry.
Conclusion
Building responsive, query-optimized read models in complex event-driven microservice architectures doesn't have to be an arduous task. By leveraging the power of Kafka Streams within a Spring Boot application, we can elegantly transform disparate event streams into cohesive, real-time materialized views. This approach centralizes sophisticated data aggregation, ensures scalability and fault tolerance, and ultimately provides a performant foundation for your query services.
While the path requires careful consideration of event ordering, idempotency, and operational concerns, the ability to build and rebuild complex projections from the immutable event log empowers developers to evolve their read models dynamically. Embracing Kafka Streams is a strategic move for any Senior Backend Engineer aiming to master the intricacies of modern distributed data processing.