Published on

[2026 Deep Dive] Mastering Distributed Caching: Event-Driven Invalidation with Spring Boot 4.0, Apache Kafka, and Redis

Authors
  • avatar
    Name
    Maria
    Twitter

Introduction: The Cache Conundrum in Distributed Systems

Distributed Caching is an indispensable technique for scaling modern microservices, dramatically reducing database load and improving response times. However, the true challenge isn't merely implementing a cache; it's managing its consistency across a distributed system. As data changes in your authoritative source (e.g., PostgreSQL via JPA), ensuring that your cached data reflects these updates in a timely and reliable manner becomes a complex dance. Stale data can lead to incorrect business decisions, frustrated users, and a loss of trust in your system.

Traditional caching strategies, such as time-to-live (TTL) or manual eviction, often fall short in high-throughput, event-driven microservice environments. They either introduce unacceptable levels of staleness or burden developers with intricate manual invalidation logic. This is where Event-Driven Invalidation shines. By leveraging the power of Apache Kafka as an immutable log of changes, we can build a highly consistent, resilient, and scalable mechanism to invalidate or update cached entries across all our Spring Boot 4.0 microservices, running on Java 25.

In this comprehensive deep-dive, we'll architect and implement a robust distributed caching solution. We'll explore how to integrate Spring Boot's powerful caching abstraction with a Redis backend, and crucially, how to use Apache Kafka to propagate data change events, ensuring that your cached data remains as fresh as your morning coffee. Prepare to unlock peak performance while maintaining strict data integrity in your mission-critical applications.

TL;DR: Distributed caching boosts performance but struggles with consistency. We'll leverage Spring Boot 4.0, Kafka, and Redis to build an event-driven invalidation strategy, ensuring cached data is always fresh by reacting to source-of-truth changes. This deep dive provides practical code examples for robust microservice architecture.

The Inevitable Trade-offs: Understanding Cache Consistency

Before diving into the implementation, it's crucial to understand the inherent trade-offs in distributed systems, particularly concerning cache consistency. The CAP theorem (Consistency, Availability, Partition tolerance) reminds us that we can only ever achieve two of these three guarantees simultaneously. For most distributed caches, especially those designed for high availability and partition tolerance, we often trade strong consistency for eventual consistency or accept a degree of data staleness.

  • Strong Consistency: Every read receives the most recent write or an error. This is hard to achieve in a distributed cache without significant performance overhead or reduced availability.
  • Eventual Consistency: All writes will eventually be visible to all reads, provided no new writes occur. This is the sweet spot for many distributed caching scenarios, where short periods of staleness are acceptable for massive performance gains.
  • Stale Data: Reads may receive older data. This is the cost of high cache hit ratios and low latency, which we aim to minimize through effective invalidation.

Our goal with event-driven invalidation is to move from unpredictable staleness to bounded eventual consistency, ensuring that cached data updates propagate quickly and reliably after the source of truth changes.

Common Caching Patterns and Their Limitations

Let's briefly recap common caching patterns and why they often fall short in complex distributed environments:

  1. Cache-Aside: The application directly manages the cache. It checks the cache first for data. If found (cache hit), it returns the data. If not (cache miss), it fetches from the database, stores it in the cache, and then returns it.

    • Limitation: Invalidation is the developer's responsibility. If data changes in the database directly or through another service, the cache becomes stale until explicitly invalidated or its TTL expires.
  2. Write-Through: Data is written synchronously to both the cache and the database.

    • Limitation: Adds latency to write operations because both writes must succeed. Still requires a strategy for other services to know about cache updates/invalidations.
  3. Write-Behind: Data is written to the cache first, and then asynchronously written to the database.

    • Limitation: Offers lower write latency but introduces data loss risk if the cache fails before data is persisted. Like write-through, it doesn't inherently solve cross-service cache consistency.

Our proposed event-driven invalidation primarily enhances the Cache-Aside pattern, providing a robust, distributed mechanism for keeping cached data fresh.

Architecture Overview: Connecting Spring Boot, Kafka, and Redis

Our architecture for event-driven cache invalidation integrates three powerful technologies:

  1. Spring Boot 4.0 (Java 25): Our microservice framework, providing the Spring Cache abstraction for easy cache integration and Spring Data JPA for database interaction with PostgreSQL.
  2. Apache Kafka: The asynchronous messaging backbone. It serves as an immutable, ordered log for all data change events originating from our services.
  3. Redis: Our high-performance, in-memory distributed data store, serving as the actual cache. It offers low-latency reads and writes, and powerful data structures.
  4. PostgreSQL: The authoritative, persistent data store for our application state, managed via JPA/Hibernate.

The Flow of Event-Driven Cache Invalidation

  1. Data Modification: A Spring Boot service modifies data in PostgreSQL (e.g., Product update).
  2. Event Publishing: Immediately after a successful database transaction, the service publishes a CacheInvalidationEvent (or a domain event like ProductUpdatedEvent) to a dedicated Kafka topic. This uses the Transactional Outbox Pattern (refer to our previous post "Mastering Distributed Transactions: The Transactional Outbox Pattern") for guaranteed delivery.
  3. Event Consumption: Other Spring Boot services (which might be caching the same data) consume messages from this Kafka topic.
  4. Cache Invalidation: Upon receiving an event for a specific entity or key, the consumer service evicts the corresponding entry from its local/shared Redis cache.
  5. Subsequent Reads: Any subsequent read requests for that invalidated data will result in a cache miss, prompting the service to fetch the freshest data from PostgreSQL and repopulate the cache.

This ensures that all services eventually converge on the latest data, with the speed of convergence determined by Kafka's low latency and the consumer's processing speed.

Step-by-Step Implementation: Building Our Event-Driven Cache

Let's walk through the implementation details using a Product entity as our example.

1. Project Setup: Spring Boot 4.0 with Java 25

First, ensure your pom.xml (or build.gradle) includes the necessary dependencies for Spring Web, Spring Data JPA, PostgreSQL driver, Spring Kafka, Spring Data Redis, and the Spring Cache abstraction.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>4.0.0-SNAPSHOT</version> <!-- Spring Boot 4.0 -->
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>event-driven-cache</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>event-driven-cache</name>
    <description>Mastering Distributed Caching with Event-Driven Invalidation</description>

    <properties>
        <java.version>25</java.version> <!-- Java 25 -->
        <spring-kafka.version>3.3.0</spring-kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-cache</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
</project>

2. Configure Spring Cache and Redis

Enable caching in your main application class and configure Redis as the caching provider.

// src/main/java/com/example/eventdrivencache/EventDrivenCacheApplication.java
package com.example.eventdrivencache;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching; // 캐싱 활성화 (Enable caching)

@SpringBootApplication
@EnableCaching // Enable Spring's caching abstraction
public class EventDrivenCacheApplication {

    public static void main(String[] args) {
        SpringApplication.run(EventDrivenCacheApplication.class, args);
    }

}

In application.yml, configure Redis and Kafka.

# src/main/resources/application.yml
spring:
  application:
    name: product-service
  datasource:
    url: jdbc:postgresql://localhost:5432/productdb
    username: user
    password: password
    driver-class-name: org.postgresql.Driver
  jpa:
    hibernate:
      ddl-auto: update # For development purposes, use 'none' or 'validate' in production
    show-sql: true
    properties:
      hibernate:
        format_sql: true
  redis:
    host: localhost
    port: 6379
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*" # Trust all packages for deserialization. Be cautious in production.

cache:
  topic: product-cache-invalidation-events # 캐시 무효화 토픽 (Cache invalidation topic)

Configure Redis Cache Manager for object serialization.

// src/main/java/com/example/eventdrivencache/config/RedisCacheConfig.java
package com.example.eventdrivencache.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;

import java.time.Duration; // 캐시 지속 시간 설정 (Configure cache duration)

@Configuration
public class RedisCacheConfig {

    @Bean
    public RedisCacheConfiguration cacheConfiguration() {
        return RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofMinutes(60)) // Default TTL for cache entries
                .disableCachingNullValues()
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));
    }
}

3. The Product Entity and Repository

// src/main/java/com/example/eventdrivencache/model/Product.java
package com.example.eventdrivencache.model;

import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable; // 직렬화 가능하도록 설정 (Make it serializable for caching)

@Entity
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Product implements Serializable { // Serializable for Redis caching
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String name;
    private String description;
    private double price;
    private int stock;
}
// src/main/java/com/example/eventdrivencache/repository/ProductRepository.java
package com.example.eventdrivencache.repository;

import com.example.eventdrivencache.model.Product;
import org.springframework.data.jpa.repository.JpaRepository; // JPA Repository 인터페이스 (JPA Repository interface)

public interface ProductRepository extends JpaRepository<Product, Long> {
}

4. Cache Invalidation Event

We'll define a simple event structure to convey invalidation messages via Kafka.

// src/main/java/com/example/eventdrivencache/event/CacheInvalidationEvent.java
package com.example.eventdrivencache.event;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// 캐시 무효화 이벤트를 위한 DTO (DTO for cache invalidation events)
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CacheInvalidationEvent {
    private String cacheName; // e.g., "products"
    private Object key;       // The key of the entry to invalidate, e.g., product ID
    private long timestamp;   // Event timestamp for potential ordering/deduplication
    private String sourceService; // Which service published the event
}

5. Product Service: Caching and Event Publishing

This service will use @Cacheable for reads and publish invalidation events when data is modified.

// src/main/java/com/example/eventdrivencache/service/ProductService.java
package com.example.eventdrivencache.service;

import com.example.eventdrivencache.event.CacheInvalidationEvent;
import com.example.eventdrivencache.model.Product;
import com.example.eventdrivencache.repository.ProductRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cache.annotation.CacheEvict; // 캐시 제거 어노테이션 (Cache eviction annotation)
import org.springframework.cache.annotation.Cacheable; // 캐시 적용 어노테이션 (Cacheable annotation)
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.beans.factory.annotation.Value;

import java.util.List;
import java.util.Optional;

// 성능 최적화를 위한 서비스 레이어 (Service layer for performance optimization)
@Service
@RequiredArgsConstructor
@Slf4j
public class ProductService {

    private final ProductRepository productRepository;
    private final ApplicationEventPublisher eventPublisher; // Spring의 이벤트 발행자 (Spring's event publisher)

    @Value("${cache.topic}")
    private String cacheInvalidationTopic;

    // productId로 제품을 조회, 캐시에 있으면 캐시에서 가져오고 없으면 DB에서 가져와 캐시에 저장
    // Retrieve product by ID, fetch from cache if available, otherwise from DB and cache it.
    @Cacheable(value = "products", key = "#id")
    public Optional<Product> getProductById(Long id) {
        log.info("Fetching product with ID {} from database...", id); // 데이터베이스 조회 (Database query)
        return productRepository.findById(id);
    }

    // 모든 제품 조회, 캐시에 있으면 캐시에서 가져오고 없으면 DB에서 가져와 캐시에 저장 (all products will be cached under a special key)
    @Cacheable(value = "products", key = "'allProducts'")
    public List<Product> getAllProducts() {
        log.info("Fetching all products from database..."); // 모든 제품 조회 (Fetching all products)
        return productRepository.findAll();
    }

    @Transactional // 트랜잭션 보장 (Ensure transactional integrity)
    @CacheEvict(value = "products", allEntries = true) // 초기화 및 모든 항목 캐시 무효화 (Clear and invalidate all cache entries immediately)
    public Product createProduct(Product product) {
        log.info("Creating new product: {}", product.getName()); // 새로운 제품 생성 (Creating new product)
        Product savedProduct = productRepository.save(product);
        // After successful transaction, publish an internal event.
        // This will trigger the Kafka producer to send the invalidation message.
        eventPublisher.publishEvent(new CacheInvalidationEvent(
                "products", savedProduct.getId(), System.currentTimeMillis(), "product-service"
        ));
        eventPublisher.publishEvent(new CacheInvalidationEvent(
                "products", "allProducts", System.currentTimeMillis(), "product-service"
        ));
        return savedProduct;
    }

    @Transactional
    @CacheEvict(value = "products", key = "#id", allEntries = false) // 특정 ID 캐시만 무효화 (Invalidate only specific ID cache)
    public Optional<Product> updateProduct(Long id, Product productDetails) {
        log.info("Updating product with ID {}: {}", id, productDetails.getName()); // 제품 업데이트 (Updating product)
        return productRepository.findById(id).map(existingProduct -> {
            existingProduct.setName(productDetails.getName());
            existingProduct.setDescription(productDetails.getDescription());
            existingProduct.setPrice(productDetails.getPrice());
            existingProduct.setStock(productDetails.getStock());
            Product updatedProduct = productRepository.save(existingProduct);
            // Publish internal event for specific product update
            eventPublisher.publishEvent(new CacheInvalidationEvent(
                    "products", updatedProduct.getId(), System.currentTimeMillis(), "product-service"
            ));
            // Also invalidate the 'allProducts' cache entry
            eventPublisher.publishEvent(new CacheInvalidationEvent(
                    "products", "allProducts", System.currentTimeMillis(), "product-service"
            ));
            return updatedProduct;
        });
    }

    @Transactional
    @CacheEvict(value = "products", key = "#id", allEntries = false) // 특정 ID 캐시 제거 (Evict specific ID cache)
    public boolean deleteProduct(Long id) {
        log.info("Deleting product with ID {}", id); // 제품 삭제 (Deleting product)
        if (productRepository.existsById(id)) {
            productRepository.deleteById(id);
            // Publish internal event for specific product deletion
            eventPublisher.publishEvent(new CacheInvalidationEvent(
                    "products", id, System.currentTimeMillis(), "product-service"
            ));
            // Invalidate 'allProducts' as well
            eventPublisher.publishEvent(new CacheInvalidationEvent(
                    "products", "allProducts", System.currentTimeMillis(), "product-service"
            ));
            return true;
        }
        return false;
    }
}

Important Note on @CacheEvict: Notice allEntries = true for createProduct and allEntries = false for updateProduct/deleteProduct. While @CacheEvict can remove entries from the local cache or the calling service's shared cache, it does not propagate this invalidation to other services. That's where Kafka comes in. The @CacheEvict here serves as a local cleanup. The actual distributed invalidation happens via Kafka events.

6. Kafka Event Publisher (Outbox Pattern Integration)

To ensure atomicity between database writes and Kafka message publishing, we'll use a simplified Outbox Pattern approach. We'll listen for Spring's internal transaction events.

// src/main/java/com/example/eventdrivencache/publisher/KafkaInvalidationEventPublisher.java
package com.example.eventdrivencache.publisher;

import com.example.eventdrivencache.event.CacheInvalidationEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionalEventListener; // 트랜잭션 이벤트 리스너 (Transactional event listener)
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

// 분산 시스템의 데이터 동기화를 위한 이벤트 발행자 (Event publisher for distributed data synchronization)
@Component
@RequiredArgsConstructor
@Slf4j
public class KafkaInvalidationEventPublisher {

    private final KafkaTemplate<String, CacheInvalidationEvent> kafkaTemplate;

    @Value("${cache.topic}")
    private String cacheInvalidationTopic;

    // Listen to internal Spring events *after* the transaction commits.
    // This is a simplified outbox pattern; a full outbox would persist the event to DB first.
    @TransactionalEventListener // Only publish if the outer transaction succeeds
    public void handleCacheInvalidationEvent(CacheInvalidationEvent event) {
        log.info("Publishing cache invalidation event to Kafka topic {}: {}", cacheInvalidationTopic, event);
        kafkaTemplate.send(cacheInvalidationTopic, event.getCacheName() + ":" + event.getKey(), event)
                .whenComplete((result, ex) -> {
                    if (ex == null) {
                        log.debug("Successfully sent event to Kafka: {}", result);
                    } else {
                        log.error("Failed to send event to Kafka: {}", ex.getMessage()); // 오류 처리 (Error handling)
                        // Consider retry mechanisms or dead-letter queues here for robust systems.
                    }
                });
    }
}

Note on TransactionalEventListener: This annotation ensures that our Kafka message is only sent if the originating database transaction successfully commits. If the transaction rolls back, the event is not published, maintaining strong data consistency between the DB and Kafka. For a truly bulletproof outbox, you'd persist the event to a local database outbox table within the same transaction, and then use a separate process (e.g., Debezium or a polling mechanism) to publish from the outbox table to Kafka. Our setup is a common and robust enough pattern for many use cases.

7. Kafka Event Consumer: Performing Distributed Cache Invalidation

Any Spring Boot service that caches Product data will need to consume from the product-cache-invalidation-events topic.

// src/main/java/com/example/eventdrivencache/consumer/CacheInvalidationConsumer.java
package com.example.eventdrivencache.consumer;

import com.example.eventdrivencache.event.CacheInvalidationEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cache.CacheManager; // 스프링 캐시 매니저 (Spring Cache Manager)
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// 분산 캐시 일관성 유지를 위한 이벤트 컨슈머 (Event consumer for maintaining distributed cache consistency)
@Component
@RequiredArgsConstructor
@Slf4j
public class CacheInvalidationConsumer {

    private final CacheManager cacheManager; // 캐시 매니저를 통해 캐시 제어 (Control cache via CacheManager)

    // Using an arbitrary group ID for demonstration. In production, use meaningful group IDs.
    @KafkaListener(topics = "${cache.topic}", groupId = "cache-invalidation-group", containerFactory = "kafkaListenerContainerFactory")
    public void listen(CacheInvalidationEvent event) {
        log.info("Received cache invalidation event: {}", event); // 이벤트 수신 (Event received)

        if (event.getCacheName() == null || event.getKey() == null) {
            log.warn("Received invalid cache invalidation event with null cacheName or key: {}", event);
            return;
        }

        try {
            // Get the specific cache by name
            org.springframework.cache.Cache cache = cacheManager.getCache(event.getCacheName());

            if (cache != null) {
                // Remove the specific entry from the cache
                cache.evict(event.getKey()); // 캐시에서 항목 제거 (Remove item from cache)
                log.info("Successfully evicted cache entry [cacheName={}, key={}]", event.getCacheName(), event.getKey()); // 캐시 무효화 성공 (Cache invalidation successful)
            } else {
                log.warn("Cache with name '{}' not found for invalidation event.", event.getCacheName());
            }
        } catch (Exception e) {
            log.error("Error processing cache invalidation event for [cacheName={}, key={}]: {}",
                    event.getCacheName(), event.getKey(), e.getMessage()); // 오류 처리 (Error handling)
            // Depending on resilience requirements, consider pushing to a dead-letter topic (DLT)
            // or implementing retry logic here. This is crucial for fault tolerance (내결함성).
        }
    }
}

We need to explicitly configure the KafkaListenerContainerFactory for JSON deserialization, especially when using JsonDeserializer with a trusted packages property.

// src/main/java/com/example/eventdrivencache/config/KafkaConfig.java
package com.example.eventdrivencache.config;

import com.example.eventdrivencache.event.CacheInvalidationEvent;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map; // 카프카 설정 관리 (Kafka configuration management)

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // Producer Configuration
    @Bean
    public ProducerFactory<String, CacheInvalidationEvent> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, CacheInvalidationEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // Consumer Configuration
    @Bean
    public ConsumerFactory<String, CacheInvalidationEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // group.id is typically configured on the @KafkaListener annotation or via global properties.
        // props.put(ConsumerConfig.GROUP_ID_CONFIG, "cache-invalidation-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); // For JSON deserialization (JSON 역직렬화 설정)
        // Auto-commit offset strategy. For robustness, consider manual commit.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(CacheInvalidationEvent.class, false));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, CacheInvalidationEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, CacheInvalidationEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // For better performance (성능 최적화), increase concurrency if your consumers can handle it.
        // factory.setConcurrency(3);
        return factory;
    }
}

8. REST Controller for Product Operations

To interact with our service.

// src/main/java/com/example/eventdrivencache/controller/ProductController.java
package com.example.eventdrivencache.controller;

import com.example.eventdrivencache.model.Product;
import com.example.eventdrivencache.service.ProductService;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.List; // 데이터 구조 관리 (Data structure management)

// RESTful API 엔드포인트 (RESTful API endpoints)
@RestController
@RequestMapping("/products")
@RequiredArgsConstructor
public class ProductController {

    private final ProductService productService;

    @GetMapping("/{id}")
    public ResponseEntity<Product> getProductById(@PathVariable Long id) {
        return productService.getProductById(id)
                .map(ResponseEntity::ok)
                .orElse(ResponseEntity.notFound().build());
    }

    @GetMapping
    public ResponseEntity<List<Product>> getAllProducts() {
        return ResponseEntity.ok(productService.getAllProducts());
    }

    @PostMapping
    public ResponseEntity<Product> createProduct(@RequestBody Product product) {
        Product createdProduct = productService.createProduct(product);
        return ResponseEntity.status(HttpStatus.CREATED).body(createdProduct);
    }

    @PutMapping("/{id}")
    public ResponseEntity<Product> updateProduct(@PathVariable Long id, @RequestBody Product product) {
        return productService.updateProduct(id, product)
                .map(ResponseEntity::ok)
                .orElse(ResponseEntity.notFound().build());
    }

    @DeleteMapping("/{id}")
    public ResponseEntity<Void> deleteProduct(@PathVariable Long id) {
        if (productService.deleteProduct(id)) {
            return ResponseEntity.noContent().build();
        }
        return ResponseEntity.notFound().build();
    }
}

Running the System (Local Setup with Docker)

To test this locally, you'll need Docker installed to run PostgreSQL, Kafka, and Redis.

Docker Compose Configuration

Create a docker-compose.yml file:

# docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
      - "9093:9093"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO,kafka.server.KafkaApis=INFO"
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_HEAP_OPTS: "-Xmx512M -Xms512M"

  redis:
    image: redis:7.2.4-alpine
    hostname: redis
    container_name: redis
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes # 영속성 활성화 (Enable persistence)

  postgres:
    image: postgres:16-alpine
    hostname: postgres
    container_name: postgres
    ports:
      - "5432:5432"
    environment:
      POSTGRES_DB: productdb
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
    volumes:
      - pgdata:/var/lib/postgresql/data # 데이터 볼륨 (Data volume)

volumes:
  pgdata:

Running the Infrastructure

Open your terminal in the directory containing docker-compose.yml and run:

docker-compose up -d

This will start Zookeeper, Kafka, Redis, and PostgreSQL in the background.

Running the Spring Boot Application

Now, run your Spring Boot application from your IDE or via Maven:

# From your project root
./mvnw spring-boot:run

Testing the Cache Invalidation

  1. Read (Cache Miss & Populate):

    • GET /products/1 (Initially, you'll see "Fetching product with ID 1 from database..." in logs, then the product is cached.)
    • Repeat GET /products/1 (Now, you should not see the database message, indicating a cache hit.)
  2. Update (DB Write, Kafka Event, Cache Eviction):

    • PUT /products/1 with new data (e.g., price change).
    • Check logs: The service publishes a CacheInvalidationEvent to Kafka.
    • The CacheInvalidationConsumer in this same service (or another instance of it) picks up the event and evicts the product from Redis.
    • Repeat GET /products/1 (You'll now see "Fetching product with ID 1 from database..." again, retrieving the updated product.)

This demonstrates the end-to-end event-driven invalidation.

Multi-OS Mapping Table: Common Redis & Docker Commands

ActionWindows (PowerShell/CMD)macOS / Linux (Bash)Description
Start Docker Composedocker-compose up -ddocker-compose up -dStarts services defined in docker-compose.yml in detached mode.
Stop Docker Composedocker-compose downdocker-compose downStops and removes containers, networks, and volumes.
View Redis CLI (Container)docker exec -it redis redis-clidocker exec -it redis redis-cliConnects to the Redis CLI inside its Docker container.
Check Cache Entry (Redis)docker exec -it redis redis-cli GET "products::1"docker exec -it redis redis-cli GET "products::1"Gets the value for key "products::1" from Redis. (Note Spring Cache key format)
Delete Cache Entry (Redis)docker exec -it redis redis-cli DEL "products::1"docker exec -it redis redis-cli DEL "products::1"Manually deletes a key from Redis.
Flush All Caches (Redis)docker exec -it redis redis-cli FLUSHALLdocker exec -it redis redis-cli FLUSHALLRemoves all keys from all databases in Redis.
View Kafka Topics (Container)docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --listdocker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --listLists all Kafka topics. (Need to be careful with localhost here from inside container, might need kafka:29092)
Consume Kafka Topic (Container)docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic product-cache-invalidation-events --from-beginningdocker exec -it kafka kafka-console-consumer --bootstrap-server kafka:29092 --topic product-cache-invalidation-events --from-beginningConsumes messages from the Kafka invalidation topic.

Advanced Considerations and Best Practices

1. Cache Key Strategy

The choice of cache key is critical. In Spring Cache, the key attribute in @Cacheable and @CacheEvict annotations allows you to define SpEL expressions.

  • Simple Keys: key="#id" is common for single entity lookups.
  • Complex Keys: For methods with multiple parameters, you might use key="#param1.id + ':' + #param2".
  • All Entries: For collections or "all" operations (like getAllProducts()), a fixed key like key="'allProducts'" is useful, but remember to invalidate it when any data changes. Using allEntries = true in @CacheEvict is a blunt instrument that clears all entries from a given cache name, which might be too aggressive for a granular event-driven strategy. Our event-driven approach allows for more surgical invalidation by sending the specific key.

2. Serialization with Redis

When caching Java objects in Redis, they need to be serialized. Spring Data Redis supports various serializers. GenericJackson2JsonRedisSerializer is a good choice as it uses JSON, making the data readable in Redis and interoperable with other systems. Ensure your cached objects implement Serializable or are compatible with your chosen serializer.

3. Idempotent Consumer for Cache Invalidation

It's possible for Kafka to deliver the same message multiple times (at-least-once delivery). Your cache invalidation consumer should be idempotent, meaning processing the same event multiple times has the same effect as processing it once.

  • Evicting a key multiple times is naturally idempotent.
  • If your invalidation logic involved updating cache entries based on event data, you would need to use a version number or timestamp in your CacheInvalidationEvent to ensure you only apply the latest update. Our timestamp field in CacheInvalidationEvent provides this capability. (Refer to our previous post "Ensuring Robustness: Mastering Idempotent Kafka Consumer Processing" for more details.)

4. Cache Warming and Cold Starts

When a service starts up, its cache is empty (cold start). The first few requests will be cache misses, hitting the database directly. For critical paths, you might consider cache warming:

  • On application startup, pre-load frequently accessed data into the cache.
  • Use a dedicated Kafka topic for full cache refreshes or bulk data synchronization if needed, particularly for materialized views or large datasets.

5. Monitoring and Metrics

Integrate monitoring tools like Micrometer with Prometheus and Grafana. Key metrics to track include:

  • Cache hit ratio: Percentage of requests served from cache. Higher is better.
  • Cache miss rate: Percentage of requests that required a database fetch. Lower is better.
  • Cache size: Number of entries and memory consumption.
  • Kafka consumer lag: How far behind your invalidation consumers are from the latest messages. High lag means stale data.
  • Database load: Confirm that caching reduces database queries.

6. Dealing with Race Conditions and Concurrent Updates

Even with event-driven invalidation, brief windows of inconsistency can occur:

  1. Service A reads item X (from cache).
  2. Service B updates item X in DB.
  3. Service B publishes invalidation event.
  4. Service A processes invalidation, removes X from cache.
  5. Service A still has item X in memory (from step 1), performs an operation.

For scenarios requiring strong consistency (e.g., financial transactions), you might need:

  • Distributed Locks: Not for cache invalidation itself, but for operations on the underlying data.
  • Optimistic Concurrency Control (OCC): Using version numbers on entities to detect and prevent conflicting updates. (Refer to our previous post "Ensuring Aggregate Consistency: Mastering Optimistic Concurrency Control")
  • Read-Through Cache: While we focus on cache-aside, a read-through cache where the cache provider itself fetches from the DB can simplify application logic but adds complexity to the cache layer.

7. Scalability and High Availability (고가용성)

  • Redis Cluster: For high availability and scalability, run Redis in cluster mode. Spring Data Redis integrates seamlessly with it.
  • Kafka Partitions: Increase the number of partitions for your cache-invalidation-events topic to allow for higher throughput and more consumer instances. Each consumer group can have up to N consumers for N partitions.
  • Multiple Microservice Instances: Each instance of your Spring Boot service will have its own Kafka consumer, ensuring that all services receive and process invalidation events.

Troubleshooting / What if it doesn't work?

Building distributed systems can be challenging. Here are common issues and debugging tips:

1. Redis Connection Issues

  • Error: Could not connect to Redis server...
  • Check:
    • Is your Redis Docker container running (docker ps)?
    • Is the port correct (default 6379)?
    • Is the host correct (localhost or redis if running inside another Docker container)?
    • Is there a firewall blocking the connection?

2. Kafka Connection/Messaging Issues

  • Error: KafkaProducer.send() failed..., NoBrokersAvailableException, TimeoutException.
  • Check:
    • Are Kafka and Zookeeper Docker containers running (docker ps)?
    • Is bootstrap-servers configured correctly (e.g., localhost:9092)?
    • Can you produce/consume messages using the kafka-console-producer and kafka-console-consumer from within the Kafka container? (See Multi-OS table above)
    • Is the Kafka topic created? (Spring Kafka often auto-creates, but manual check can help).

3. Cache Not Invalidaing

  • Symptoms: You update data, but subsequent reads still return old data, and no "Evicted cache entry" logs are seen.
  • Check:
    • Kafka Producer: Is the CacheInvalidationEvent being published to Kafka? Check your service logs for "Publishing cache invalidation event...". Is the TransactionalEventListener configured correctly? Is the transaction actually committing?
    • Kafka Topic: Is the event visible in the Kafka topic? Use kafka-console-consumer to inspect.
    • Kafka Consumer: Is the CacheInvalidationConsumer running? Is it subscribed to the correct topic and group ID? Check its logs for "Received cache invalidation event...".
    • Deserialization: Are your Kafka consumer's JsonDeserializer settings correct, especially spring.json.trusted.packages? Serialization/deserialization failures often happen silently or with cryptic errors.
    • Cache Manager: Is the cacheManager.getCache() call returning null? Ensure the cacheName in the event matches the value in your @Cacheable annotation (e.g., "products").
    • Key Mismatch: Is the key in the CacheInvalidationEvent exactly matching the key used by @Cacheable? Remember SpEL expressions in @Cacheable might generate different keys than a simple object ID. Ensure both Product.id (Long) and 'allProducts' (String) are handled correctly.
    • @TransactionalEventListener Timing: Ensure it's not trying to publish before the transaction commits. Default is AFTER_COMMIT.

4. Cache Not Populating

  • Symptoms: Every GET request results in a database hit.
  • Check:
    • Is @EnableCaching present on your main application class?
    • Are your @Cacheable annotations on the methods you expect to cache?
    • Is spring.cache.type (if set) pointing to redis?
    • Is your RedisCacheConfig properly configured with GenericJackson2JsonRedisSerializer?
    • Are the objects being cached Serializable?

5. Performance Issues

  • Symptoms: High CPU, slow responses despite caching.
  • Check:
    • Kafka Consumer Lag: Are your consumers falling behind? Increase spring.kafka.listener.concurrency if needed.
    • Redis Latency: Is Redis overloaded? Check Redis metrics.
    • Serialization Overhead: For very large objects, JSON serialization might be slower. Consider JdkSerializationRedisSerializer (less readable) or a more compact format like Protobuf or Avro (requires schema evolution strategy).
    • Too Many Cache Evictions: Are you allEntries=true too often? Try more granular invalidation.

Remember to leverage logging (slf4j and logback configured in Spring Boot) to trace the flow of execution and events through your application. Docker logs are your best friend for containerized services.

Conclusion: Empowering Your Microservices with Intelligent Caching

Mastering Distributed Caching with Event-Driven Invalidation is a powerful leap forward for any backend engineer working with modern microservice architectures. By intelligently combining the caching capabilities of Spring Boot with the robust messaging guarantees of Apache Kafka and the high-performance data storage of Redis, we can build systems that are not only incredibly fast but also maintain a high degree of data consistency.

This approach moves beyond simple TTL-based caching, offering a proactive, real-time mechanism to keep your cached data fresh and relevant across multiple services. It significantly offloads your primary data stores, reduces latency, and ultimately enhances the user experience. With Java 25 and Spring Boot 4.0 continuing to push the boundaries of performance and developer ergonomics, integrating sophisticated patterns like this is more accessible and crucial than ever. Embrace event-driven cache invalidation, and empower your microservices to achieve peak performance with confidence.


🔍 Deep-Dive Search Index & Tags

Developer Intent & Synonyms: Distributed Caching, Event-Driven Invalidation, Spring Boot 4.0 Caching, Apache Kafka Cache, Redis Microservices, Cache Consistency, Java 25 Backend, JPA PostgreSQL Cache, Microservice Performance, Real-time Cache Update, 캐시 무효화 전략, 분산 캐싱, 이벤트 기반 캐시, 스프링 부트 캐시, 카프카 Redis, 데이터 일관성, 마이크로서비스 성능 최적화, 고가용성 캐시, Cache Architecture