Published on

Mastering CQRS: Decoupling Read and Write Models for Scalable Microservices with Spring Boot, Kafka, and PostgreSQL

Authors
  • avatar
    Name
    Maria
    Twitter

Introduction: The Monolithic Data Model Bottleneck

As senior backend engineers, we frequently encounter systems where a single, rich domain model serves both write (create, update, delete) and read (query) operations. While initially straightforward, this approach quickly becomes a bottleneck in high-throughput, data-intensive microservices. Complex read queries can contend with write operations, locking resources, degrading performance, and making schema evolution a nightmare. Scaling becomes challenging; optimizing for reads often compromises writes, and vice-versa. Imagine an e-commerce platform where product updates (writes) are infrequent but product catalog lookups (reads) occur thousands of times per second. Using the same model and database for both inevitably leads to compromises, hindering both scalability and responsiveness.

This is where Command Query Responsibility Segregation (CQRS) shines. It offers a powerful architectural pattern to explicitly separate the responsibilities of data manipulation (commands) from data retrieval (queries), leading to more scalable, performant, and maintainable systems.

Deep Dive: Unpacking CQRS

At its core, CQRS proposes using distinct models to update information and to read information. This isn't just about separate methods; it often implies separate data stores, optimized for their specific purpose.

Here's how CQRS breaks down:

  1. Commands: Represent intentions to change the state of the system. They are imperative, immutable messages (e.g., CreateProductCommand, UpdateProductInventoryCommand). Commands are typically handled by a command handler.
  2. Command Model (Write Model): This is the transactional side of your application, optimized for updates. It's often a rich domain model, focusing on business rules and data integrity. It's where your aggregates and JPA entities reside.
  3. Events: When a command successfully processes, it emits one or more domain events. These are past-tense facts about something that has already occurred (e.g., ProductCreatedEvent, ProductInventoryUpdatedEvent). Events are immutable and are published to an event bus.
  4. Event Bus (Apache Kafka): A crucial component for decoupling the write and read sides. Events published by the command model are consumed by various subscribers, including the read model projectors. Kafka's durability, scalability, and pub-sub capabilities make it an ideal choice.
  5. Query Model (Read Model/Projection): This is a denormalized, read-optimized data store built by consuming events from the event bus. It's specifically designed for efficient querying, often sacrificing normalization for performance. It can be a simple SQL view, a materialized view, or even an entirely different type of database (e.g., a NoSQL store, or a separate PostgreSQL database/schema for projections).
  6. Queries: Represent requests for data. They are declarative (e.g., GetProductByIdQuery, GetAllProductsInStockQuery). Queries are handled by query handlers that retrieve data directly from the read model.

By separating concerns, we gain several advantages:

  • Independent Scaling: The write and read models can be scaled independently based on their respective loads.
  • Optimized Data Stores: Each model can use the most appropriate data store technology and schema. The write model might be highly normalized for transactional integrity, while the read model might be denormalized for query performance.
  • Improved Performance: Queries don't contend with write locks, and read models can be highly optimized for specific query patterns.
  • Enhanced Maintainability & Flexibility: Domain complexity is isolated to the write model, while the read model can evolve independently to support new UI requirements or reporting needs without impacting transactional logic.
  • Event-Driven Architecture Synergy: CQRS naturally complements event-driven architectures, using events as the primary mechanism for synchronization.

The primary trade-off is increased complexity and the introduction of eventual consistency – the read model might be slightly out of sync with the write model for a brief period. For many business domains, this is an acceptable compromise for the scalability gains.

Code Implementation: Building a Product Catalog with CQRS

Let's illustrate CQRS with a Product service using Spring Boot 4.0, Java 25, JPA/PostgreSQL for the write model, and a separate PostgreSQL schema for the read model, with Apache Kafka as the event bus.

First, ensure your pom.xml includes dependencies for spring-boot-starter-web, spring-boot-starter-data-jpa, postgresql, spring-kafka, and lombok (optional, for brevity).

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>4.0.0-SNAPSHOT</version> <!-- Assuming Spring Boot 4.0 for demonstration -->
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example.cqrs</groupId>
    <artifactId>product-service</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>product-service</name>
    <description>CQRS Product Service</description>

    <properties>
        <java.version>25</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- Add other testing dependencies as needed -->
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <releases>
                <enabled>false</enabled>
            </releases>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <releases>
                <enabled>false</releases>
            </releases>
        </pluginRepository>
    </pluginRepositories>
</project>

1. Common DTOs and Events

We'll use Java 25 Records for our immutable commands, queries, and events.

// src/main/java/com/example/cqrs/product/common/commands/ProductCommands.java
package com.example.cqrs.product.common.commands;

import java.math.BigDecimal;
import java.util.UUID;

public record CreateProductCommand(String name, String description, BigDecimal price, int quantity) {}
public record UpdateProductPriceCommand(UUID productId, BigDecimal newPrice) {}
public record UpdateProductQuantityCommand(UUID productId, int newQuantity) {}
// src/main/java/com/example/cqrs/product/common/events/ProductEvents.java
package com.example.cqrs.product.common.events;

import java.math.BigDecimal;
import java.time.Instant;
import java.util.UUID;

public record ProductCreatedEvent(UUID productId, String name, String description, BigDecimal price, int quantity, Instant createdAt) {}
public record ProductPriceUpdatedEvent(UUID productId, BigDecimal newPrice, Instant updatedAt) {}
public record ProductQuantityUpdatedEvent(UUID productId, int newQuantity, Instant updatedAt) {}
// src/main/java/com/example/cqrs/product/common/queries/ProductQueries.java
package com.example.cqrs.product.common.queries;

import java.util.UUID;

public record GetProductByIdQuery(UUID productId) {}
public record GetAllProductsQuery() {}

2. Write Model (Command Side)

The write model manages the authoritative state of our products.

// src/main/java/com/example/cqrs/product/write/domain/ProductAggregate.java
package com.example.cqrs.product.write.domain;

import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.UUID;

@Entity
@Table(name = "products", schema = "write_model")
@Data
@NoArgsConstructor
public class ProductAggregate {
    @Id
    private UUID id;
    private String name;
    private String description;
    private BigDecimal price;
    private int quantity;
    private Instant createdAt;
    private Instant lastUpdatedAt;

    public ProductAggregate(UUID id, String name, String description, BigDecimal price, int quantity, Instant createdAt) {
        this.id = id;
        this.name = name;
        this.description = description;
        this.price = price;
        this.quantity = quantity;
        this.createdAt = createdAt;
        this.lastUpdatedAt = createdAt;
    }

    public void updatePrice(BigDecimal newPrice) {
        if (newPrice.compareTo(BigDecimal.ZERO) <= 0) {
            throw new IllegalArgumentException("Price must be positive.");
        }
        this.price = newPrice;
        this.lastUpdatedAt = Instant.now();
    }

    public void updateQuantity(int newQuantity) {
        if (newQuantity < 0) {
            throw new IllegalArgumentException("Quantity cannot be negative.");
        }
        this.quantity = newQuantity;
        this.lastUpdatedAt = Instant.now();
    }
}
// src/main/java/com/example/cqrs/product/write/infrastructure/ProductAggregateRepository.java
package com.example.cqrs.product.write.infrastructure;

import com.example.cqrs.product.write.domain.ProductAggregate;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

import java.util.UUID;

@Repository
public interface ProductAggregateRepository extends JpaRepository<ProductAggregate, UUID> {}
// src/main/java/com/example/cqrs/product/write/application/ProductCommandService.java
package com.example.cqrs.product.write.application;

import com.example.cqrs.product.common.commands.CreateProductCommand;
import com.example.cqrs.product.common.commands.UpdateProductPriceCommand;
import com.example.cqrs.product.common.commands.UpdateProductQuantityCommand;
import com.example.cqrs.product.common.events.ProductCreatedEvent;
import com.example.cqrs.product.common.events.ProductPriceUpdatedEvent;
import com.example.cqrs.product.common.events.ProductQuantityUpdatedEvent;
import com.example.cqrs.product.write.domain.ProductAggregate;
import com.example.cqrs.product.write.infrastructure.ProductAggregateRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.Instant;
import java.util.UUID;

@Service
public class ProductCommandService {

    private final ProductAggregateRepository repository;
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;
    private static final String PRODUCT_EVENTS_TOPIC = "product-events";

    public ProductCommandService(ProductAggregateRepository repository, KafkaTemplate<String, String> kafkaTemplate) {
        this.repository = repository;
        this.kafkaTemplate = kafkaTemplate;
        this.objectMapper = new ObjectMapper();
        this.objectMapper.registerModule(new JavaTimeModule()); // For Instant serialization
    }

    @Transactional
    public UUID handle(CreateProductCommand command) {
        UUID productId = UUID.randomUUID();
        Instant now = Instant.now();
        ProductAggregate product = new ProductAggregate(productId, command.name(), command.description(), command.price(), command.quantity(), now);
        repository.save(product);

        ProductCreatedEvent event = new ProductCreatedEvent(productId, command.name(), command.description(), command.price(), command.quantity(), now);
        publishEvent(event.productId().toString(), event);
        return productId;
    }

    @Transactional
    public void handle(UpdateProductPriceCommand command) {
        ProductAggregate product = repository.findById(command.productId())
                .orElseThrow(() -> new IllegalArgumentException("Product not found with ID: " + command.productId()));
        product.updatePrice(command.newPrice());
        repository.save(product);

        ProductPriceUpdatedEvent event = new ProductPriceUpdatedEvent(command.productId(), command.newPrice(), product.getLastUpdatedAt());
        publishEvent(event.productId().toString(), event);
    }

    @Transactional
    public void handle(UpdateProductQuantityCommand command) {
        ProductAggregate product = repository.findById(command.productId())
                .orElseThrow(() -> new IllegalArgumentException("Product not found with ID: " + command.productId()));
        product.updateQuantity(command.newQuantity());
        repository.save(product);

        ProductQuantityUpdatedEvent event = new ProductQuantityUpdatedEvent(command.productId(), command.newQuantity(), product.getLastUpdatedAt());
        publishEvent(event.productId().toString(), event);
    }

    private void publishEvent(String key, Object event) {
        try {
            String eventPayload = objectMapper.writeValueAsString(event);
            kafkaTemplate.send(PRODUCT_EVENTS_TOPIC, key, eventPayload);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize event: " + event.getClass().getName(), e);
        }
    }
}
// src/main/java/com/example/cqrs/product/write/api/ProductCommandController.java
package com.example.cqrs.product.write.api;

import com.example.cqrs.product.common.commands.CreateProductCommand;
import com.example.cqrs.product.common.commands.UpdateProductPriceCommand;
import com.example.cqrs.product.common.commands.UpdateProductQuantityCommand;
import com.example.cqrs.product.write.application.ProductCommandService;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.UUID;

@RestController
@RequestMapping("/api/products/commands")
public class ProductCommandController {

    private final ProductCommandService productCommandService;

    public ProductCommandController(ProductCommandService productCommandService) {
        this.productCommandService = productCommandService;
    }

    @PostMapping
    public ResponseEntity<UUID> createProduct(@RequestBody CreateProductCommand command) {
        UUID productId = productCommandService.handle(command);
        return new ResponseEntity<>(productId, HttpStatus.CREATED);
    }

    @PatchMapping("/{productId}/price")
    public ResponseEntity<Void> updateProductPrice(@PathVariable UUID productId, @RequestBody UpdateProductPriceCommand command) {
        productCommandService.handle(new UpdateProductPriceCommand(productId, command.newPrice()));
        return ResponseEntity.accepted().build();
    }

    @PatchMapping("/{productId}/quantity")
    public ResponseEntity<Void> updateProductQuantity(@PathVariable UUID productId, @RequestBody UpdateProductQuantityCommand command) {
        productCommandService.handle(new UpdateProductQuantityCommand(productId, command.newQuantity()));
        return ResponseEntity.accepted().build();
    }
}

3. Read Model (Query Side)

The read model is built by consuming events from Kafka. It's optimized for fast lookups.

// src/main/java/com/example/cqrs/product/read/domain/ProductProjection.java
package com.example.cqrs.product.read.domain;

import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.UUID;

@Entity
@Table(name = "product_projections", schema = "read_model")
@Data
@NoArgsConstructor
public class ProductProjection {
    @Id
    private UUID id;
    private String name;
    private String description;
    private BigDecimal price;
    private int quantity;
    @Column(name = "created_at")
    private Instant createdAt;
    @Column(name = "last_updated_at")
    private Instant lastUpdatedAt;

    public ProductProjection(UUID id, String name, String description, BigDecimal price, int quantity, Instant createdAt, Instant lastUpdatedAt) {
        this.id = id;
        this.name = name;
        this.description = description;
        this.price = price;
        this.quantity = quantity;
        this.createdAt = createdAt;
        this.lastUpdatedAt = lastUpdatedAt;
    }
}
// src/main/java/com/example/cqrs/product/read/infrastructure/ProductProjectionRepository.java
package com.example.cqrs.product.read.infrastructure;

import com.example.cqrs.product.read.domain.ProductProjection;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

import java.util.UUID;

@Repository
public interface ProductProjectionRepository extends JpaRepository<ProductProjection, UUID> {}
// src/main/java/com/example/cqrs/product/read/application/ProductQueryService.java
package com.example.cqrs.product.read.application;

import com.example.cqrs.product.common.queries.GetAllProductsQuery;
import com.example.cqrs.product.common.queries.GetProductByIdQuery;
import com.example.cqrs.product.read.domain.ProductProjection;
import com.example.cqrs.product.read.infrastructure.ProductProjectionRepository;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;
import java.util.Optional;

@Service
@Transactional(readOnly = true) // Read-only transactions are typically faster
public class ProductQueryService {

    private final ProductProjectionRepository repository;

    public ProductQueryService(ProductProjectionRepository repository) {
        this.repository = repository;
    }

    public Optional<ProductProjection> handle(GetProductByIdQuery query) {
        return repository.findById(query.productId());
    }

    public List<ProductProjection> handle(GetAllProductsQuery query) {
        return repository.findAll();
    }
}
// src/main/java/com/example/cqrs/product/read/api/ProductQueryController.java
package com.example.cqrs.product.read.api;

import com.example.cqrs.product.common.queries.GetAllProductsQuery;
import com.example.cqrs.product.common.queries.GetProductByIdQuery;
import com.example.cqrs.product.read.application.ProductQueryService;
import com.example.cqrs.product.read.domain.ProductProjection;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.List;
import java.util.UUID;

@RestController
@RequestMapping("/api/products/queries")
public class ProductQueryController {

    private final ProductQueryService productQueryService;

    public ProductQueryController(ProductQueryService productQueryService) {
        this.productQueryService = productQueryService;
    }

    @GetMapping("/{productId}")
    public ResponseEntity<ProductProjection> getProductById(@PathVariable UUID productId) {
        return productQueryService.handle(new GetProductByIdQuery(productId))
                .map(ResponseEntity::ok)
                .orElse(ResponseEntity.notFound().build());
    }

    @GetMapping
    public ResponseEntity<List<ProductProjection>> getAllProducts() {
        List<ProductProjection> products = productQueryService.handle(new GetAllProductsQuery());
        return ResponseEntity.ok(products);
    }
}

4. Event Listener (Projector)

This component consumes events from Kafka and updates the read model.

// src/main/java/com/example/cqrs/product/read/infrastructure/ProductEventProjector.java
package com.example.cqrs.product.read.infrastructure;

import com.example.cqrs.product.common.events.ProductCreatedEvent;
import com.example.cqrs.product.common.events.ProductPriceUpdatedEvent;
import com.example.cqrs.product.common.events.ProductQuantityUpdatedEvent;
import com.example.cqrs.product.read.domain.ProductProjection;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
public class ProductEventProjector {

    private final ProductProjectionRepository repository;
    private final ObjectMapper objectMapper;

    public ProductEventProjector(ProductProjectionRepository repository) {
        this.repository = repository;
        this.objectMapper = new ObjectMapper();
        this.objectMapper.registerModule(new JavaTimeModule());
    }

    @KafkaListener(topics = "product-events", groupId = "product-projection-group")
    @Transactional("readModelTransactionManager") // Use a dedicated transaction manager for the read model
    public void consumeProductEvent(String eventPayload) {
        try {
            // A more robust implementation would use a common event envelope with a 'type' field
            // to dynamically deserialize to the correct event class.
            // For simplicity, we try to parse known event types.
            if (eventPayload.contains("ProductCreatedEvent")) {
                ProductCreatedEvent event = objectMapper.readValue(eventPayload, ProductCreatedEvent.class);
                ProductProjection projection = new ProductProjection(
                        event.productId(), event.name(), event.description(), event.price(), event.quantity(), event.createdAt(), event.createdAt());
                repository.save(projection);
            } else if (eventPayload.contains("ProductPriceUpdatedEvent")) {
                ProductPriceUpdatedEvent event = objectMapper.readValue(eventPayload, ProductPriceUpdatedEvent.class);
                repository.findById(event.productId()).ifPresent(projection -> {
                    projection.setPrice(event.newPrice());
                    projection.setLastUpdatedAt(event.updatedAt());
                    repository.save(projection);
                });
            } else if (eventPayload.contains("ProductQuantityUpdatedEvent")) {
                ProductQuantityUpdatedEvent event = objectMapper.readValue(eventPayload, ProductQuantityUpdatedEvent.class);
                repository.findById(event.productId()).ifPresent(projection -> {
                    projection.setQuantity(event.newQuantity());
                    projection.setLastUpdatedAt(event.updatedAt());
                    repository.save(projection);
                });
            }
        } catch (JsonProcessingException e) {
            System.err.println("Error parsing product event: " + e.getMessage());
            // Log error, potentially send to a dead-letter queue
        }
    }
}

5. Database Configuration

We need two DataSource and EntityManagerFactory beans, one for the write model and one for the read model, targeting different schemas (or even different databases) in PostgreSQL.

// src/main/java/com/example/cqrs/config/WriteModelConfig.java
package com.example.cqrs.config;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
        entityManagerFactoryRef = "writeModelEntityManagerFactory",
        transactionManagerRef = "writeModelTransactionManager",
        basePackages = {"com.example.cqrs.product.write.infrastructure"}
)
@EntityScan(basePackages = {"com.example.cqrs.product.write.domain"})
public class WriteModelConfig {

    @Primary
    @Bean(name = "writeModelDataSourceProperties")
    @ConfigurationProperties("spring.datasource.write-model")
    public DataSourceProperties writeModelDataSourceProperties() {
        return new DataSourceProperties();
    }

    @Primary
    @Bean(name = "writeModelDataSource")
    public DataSource writeModelDataSource(@Qualifier("writeModelDataSourceProperties") DataSourceProperties dataSourceProperties) {
        return dataSourceProperties.initializeDataSourceBuilder().build();
    }

    @Primary
    @Bean(name = "writeModelEntityManagerFactory")
    public LocalContainerEntityManagerFactoryBean writeModelEntityManagerFactory(
            EntityManagerFactoryBuilder builder, @Qualifier("writeModelDataSource") DataSource dataSource) {
        Map<String, String> jpaProperties = new HashMap<>();
        jpaProperties.put("hibernate.hbm2ddl.auto", "update"); // Or validate/none in production
        jpaProperties.put("hibernate.dialect", "org.hibernate.dialect.PostgreSQLDialect");
        jpaProperties.put("hibernate.default_schema", "write_model"); // Specify schema

        return builder
                .dataSource(dataSource)
                .packages("com.example.cqrs.product.write.domain")
                .persistenceUnit("writeModelPU")
                .properties(jpaProperties)
                .build();
    }

    @Primary
    @Bean(name = "writeModelTransactionManager")
    public PlatformTransactionManager writeModelTransactionManager(
            @Qualifier("writeModelEntityManagerFactory") LocalContainerEntityManagerFactoryBean writeModelEntityManagerFactory) {
        return new JpaTransactionManager(writeModelEntityManagerFactory.getObject());
    }
}
// src/main/java/com/example/cqrs/config/ReadModelConfig.java
package com.example.cqrs.config;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
        entityManagerFactoryRef = "readModelEntityManagerFactory",
        transactionManagerRef = "readModelTransactionManager",
        basePackages = {"com.example.cqrs.product.read.infrastructure"}
)
@EntityScan(basePackages = {"com.example.cqrs.product.read.domain"})
public class ReadModelConfig {

    @Bean(name = "readModelDataSourceProperties")
    @ConfigurationProperties("spring.datasource.read-model")
    public DataSourceProperties readModelDataSourceProperties() {
        return new DataSourceProperties();
    }

    @Bean(name = "readModelDataSource")
    public DataSource readModelDataSource(@Qualifier("readModelDataSourceProperties") DataSourceProperties dataSourceProperties) {
        return dataSourceProperties.initializeDataSourceBuilder().build();
    }

    @Bean(name = "readModelEntityManagerFactory")
    public LocalContainerEntityManagerFactoryBean readModelEntityManagerFactory(
            EntityManagerFactoryBuilder builder, @Qualifier("readModelDataSource") DataSource dataSource) {
        Map<String, String> jpaProperties = new HashMap<>();
        jpaProperties.put("hibernate.hbm2ddl.auto", "update"); // Or validate/none in production
        jpaProperties.put("hibernate.dialect", "org.hibernate.dialect.PostgreSQLDialect");
        jpaProperties.put("hibernate.default_schema", "read_model"); // Specify schema

        return builder
                .dataSource(dataSource)
                .packages("com.example.cqrs.product.read.domain")
                .persistenceUnit("readModelPU")
                .properties(jpaProperties)
                .build();
    }

    @Bean(name = "readModelTransactionManager")
    public PlatformTransactionManager readModelTransactionManager(
            @Qualifier("readModelEntityManagerFactory") LocalContainerEntityManagerFactoryBean readModelEntityManagerFactory) {
        return new JpaTransactionManager(readModelEntityManagerFactory.getObject());
    }
}

6. Application Properties

In application.properties (or application.yml):

# Kafka Configuration
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=product-projection-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto-offset-reset=earliest # For development, use latest in production

# Write Model DataSource Configuration
spring.datasource.write-model.url=jdbc:postgresql://localhost:5432/productdb
spring.datasource.write-model.username=user
spring.datasource.write-model.password=password
spring.datasource.write-model.driver-class-name=org.postgresql.Driver

# Read Model DataSource Configuration
spring.datasource.read-model.url=jdbc:postgresql://localhost:5432/productdb
spring.datasource.read-model.username=user
spring.datasource.read-model.password=password
spring.datasource.read-model.driver-class-name=org.postgresql.Driver

# Spring JPA for Write Model - primary
spring.jpa.open-in-view=false
spring.jpa.hibernate.ddl-auto=none # Handled by specific configs

# General Server Port
server.port=8080

Before running, ensure you have PostgreSQL running with a database named productdb and Kafka running. The application will create write_model and read_model schemas and their respective tables.

Considerations and Trade-offs

Implementing CQRS is a significant architectural decision that comes with its own set of complexities:

  1. Eventual Consistency: This is the most crucial consideration. The read model is eventually consistent, meaning there will be a delay (milliseconds to seconds, depending on processing speed) before updates to the write model are reflected in the read model. For many business cases (e.g., product catalog browsing), this is acceptable. For highly consistent, immediate feedback requirements (e.g., bank transfers), CQRS might be unsuitable or require additional patterns like optimistic locking and sagas.
  2. Increased Complexity: You are managing multiple data models, potentially multiple databases, an event bus, and the projection logic. This means more code, more infrastructure, and more moving parts to monitor and maintain.
  3. Data Synchronization Challenges: Ensuring reliable event delivery and idempotent projection processing is critical. If an event is missed or processed multiple times, your read model can become corrupted. Idempotency on Kafka consumers (as discussed in a previous post!) is paramount here.
  4. Debugging and Observability: Tracing a request from command to event publication and then to read model projection requires robust distributed tracing (which we've also covered!). Understanding the state of the system during an eventual consistency delay can be tricky.
  5. Schema Evolution: While CQRS can simplify read model evolution, changes to events themselves can be complex. You need robust strategies for event versioning and schema migration to avoid breaking existing projectors.
  6. Infrastructure Cost: Running multiple databases, Kafka clusters, and potentially separate microservices for command and query sides increases infrastructure cost and operational overhead.

When to use CQRS:

  • When your application experiences high read loads that disproportionately outnumber write loads.
  • When read queries are complex and performance-critical, requiring highly optimized, denormalized data structures.
  • When the write model needs strong transactional consistency and complex business logic, but the read model needs different optimizations (e.g., search, reporting).
  • When you are already operating in an event-driven microservices architecture.

When to reconsider CQRS:

  • For simple CRUD applications where the read and write models are naturally aligned.
  • When eventual consistency is not acceptable for the core business logic.
  • When your team lacks experience with distributed systems, event streams, and complex data synchronization.

Conclusion

CQRS is not a silver bullet, but it is an incredibly powerful architectural pattern for building scalable, high-performance microservices, especially within an event-driven ecosystem. By thoughtfully separating your command and query responsibilities, leveraging Apache Kafka as a resilient event bus, and optimizing your data stores (like PostgreSQL) for their specific roles, you can overcome the limitations of monolithic data models.

While it introduces complexity, the benefits of independent scaling, improved performance, and enhanced domain clarity often outweigh the initial learning curve and operational overhead for the right problems. Embrace CQRS not just as a pattern, but as a paradigm shift in how you design and evolve your data-intensive backend systems. Are you ready to decouple and conquer your data challenges?