Kafka

Kafka Streams 실전 가이드 - 실시간 스트림 처리의 모든 것

백엔드 개발자 김승원 2026. 4. 3. 10:51

들어가며

마이크로서비스 환경에서 실시간 데이터 처리는 더 이상 선택이 아닌 필수가 되었습니다. 주문이 발생하면 즉시 재고를 업데이트하고, 사용자 행동 로그를 실시간으로 분석해 추천 시스템에 반영해야 합니다. Apache Kafka Streams는 이러한 실시간 스트림 처리를 별도의 클러스터 없이 Java 애플리케이션 안에서 수행할 수 있게 해주는 강력한 라이브러리입니다.

이 글에서는 Kafka Streams의 핵심 아키텍처부터 KStream과 KTable의 차이, Topology 구성, Windowed Aggregation, State Store, 장애 복구 메커니즘, 그리고 Spring Boot와의 연동까지 실전 예제와 함께 깊이 있게 다루겠습니다.

Kafka Streams 아키텍처 이해

Kafka Streams란?

Kafka Streams는 Apache Kafka 위에서 동작하는 클라이언트 라이브러리입니다. Spark Streaming이나 Flink 같은 별도의 클러스터가 필요 없고, 일반 Java 애플리케이션에 의존성만 추가하면 됩니다.

  • 별도 클러스터 불필요: JAR 파일로 배포 가능, 운영 부담 최소화
  • Exactly-Once 처리: Kafka 트랜잭션 기반으로 정확히 한 번 처리 보장
  • 수평 확장: 파티션 수만큼 인스턴스를 늘려 자연스러운 스케일아웃
  • 내결함성: State Store의 변경로그를 Kafka 토픽에 백업
  • 이벤트 시간 처리: 이벤트 발생 시점 기준의 윈도우 처리 지원

핵심 개념 비교

개념 설명 비유
KStream 끊임없는 레코드 스트림 (INSERT 의미) 로그 파일의 각 라인
KTable 키별 최신 값 (UPDATE/UPSERT 의미) 데이터베이스 테이블
GlobalKTable 모든 파티션 데이터를 각 인스턴스에 복제 브로드캐스트 조인용 룩업 테이블
Topology 데이터 흐름을 정의하는 DAG(방향성 비순환 그래프) 데이터 파이프라인 청사진
State Store 중간 결과를 저장하는 로컬 저장소 (RocksDB) 인메모리 캐시 + 디스크 백업

KStream vs KTable 상세 비교

KStream - 이벤트 스트림

KStream은 토픽의 각 메시지를 독립적인 이벤트로 취급합니다. 같은 키로 여러 메시지가 오면 모두 별개의 레코드로 처리됩니다.

// KStream 예시: 주문 이벤트 스트림
KStream<String, OrderEvent> orderStream = builder.stream(
    "orders",
    Consumed.with(Serdes.String(), orderEventSerde)
);

// 각 주문 이벤트를 개별 처리
orderStream
    .filter((key, order) -> order.getAmount() > 10000)
    .mapValues(order -> new OrderNotification(order.getUserId(), order.getAmount()))
    .to("high-value-orders", Produced.with(Serdes.String(), notificationSerde));

KTable - 변경 로그 테이블

KTable은 같은 키의 새 메시지가 오면 이전 값을 덮어씁니다. 데이터베이스의 UPDATE 시맨틱과 동일합니다.

// KTable 예시: 사용자 프로필 (항상 최신 상태 유지)
KTable<String, UserProfile> userTable = builder.table(
    "user-profiles",
    Consumed.with(Serdes.String(), userProfileSerde),
    Materialized.<String, UserProfile, KeyValueStore<Bytes, byte[]>>
        as("user-profile-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(userProfileSerde)
);

KStream-KTable 조인

실시간 주문 스트림에 사용자 정보를 조인하는 대표적인 패턴입니다.

// 주문 스트림 + 사용자 테이블 조인
KStream<String, EnrichedOrder> enrichedOrders = orderStream
    .selectKey((key, order) -> order.getUserId())  // 조인 키 변경
    .join(
        userTable,
        (order, user) -> new EnrichedOrder(
            order.getOrderId(),
            order.getAmount(),
            user.getName(),
            user.getGrade()
        ),
        Joined.with(Serdes.String(), orderEventSerde, userProfileSerde)
    );

enrichedOrders.to("enriched-orders");

Topology 구성

StreamsBuilder를 이용한 DSL 방식

Kafka Streams는 High-Level DSL과 Low-Level Processor API 두 가지 방식을 제공합니다. 대부분의 경우 DSL로 충분합니다.

@Configuration
public class KafkaStreamsTopologyConfig {

    @Bean
    public KafkaStreamsConfiguration kafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-aggregation-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams-state");
        return new KafkaStreamsConfiguration(props);
    }
}

Topology 시각화

구성한 Topology는 describe() 메서드로 확인할 수 있습니다. 운영 시 데이터 흐름을 파악하는 데 매우 유용합니다.

Topology topology = builder.build();
System.out.println(topology.describe());

// 출력 예시:
// Topologies:
//    Sub-topology: 0
//     Source: KSTREAM-SOURCE-0000000000 (topics: [orders])
//       --> KSTREAM-FILTER-0000000001
//     Processor: KSTREAM-FILTER-0000000001
//       --> KSTREAM-MAPVALUES-0000000002
//     Processor: KSTREAM-MAPVALUES-0000000002
//       --> KSTREAM-SINK-0000000003
//     Sink: KSTREAM-SINK-0000000003 (topic: high-value-orders)

Windowed Aggregation - 시간 기반 집계

윈도우 종류

윈도우 타입 설명 사용 예시
Tumbling Window 고정 크기, 겹침 없음 매 5분마다 주문 건수 집계
Hopping Window 고정 크기, 겹침 허용 5분 윈도우를 1분 간격으로 슬라이딩
Sliding Window 이벤트 기반, 차이 범위 내 그룹 10초 이내 발생한 이벤트 그룹
Session Window 비활동 간격 기반 사용자 세션 (30분 비활동 시 종료)

실시간 주문 집계 예제

@Component
public class OrderAggregationTopology {

    @Autowired
    public void buildPipeline(StreamsBuilder builder) {

        // JSON Serde 설정
        JsonSerde<OrderEvent> orderSerde = new JsonSerde<>(OrderEvent.class);
        JsonSerde<OrderAggregation> aggregationSerde = new JsonSerde<>(OrderAggregation.class);

        KStream<String, OrderEvent> orderStream = builder.stream(
            "orders",
            Consumed.with(Serdes.String(), orderSerde)
                .withTimestampExtractor(new OrderTimestampExtractor())
        );

        // 5분 텀블링 윈도우로 카테고리별 주문 집계
        KTable<Windowed<String>, OrderAggregation> aggregated = orderStream
            .groupBy(
                (key, order) -> order.getCategory(),
                Grouped.with(Serdes.String(), orderSerde)
            )
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .aggregate(
                // 초기값
                () -> new OrderAggregation(0L, 0L),
                // Aggregator: 새 주문이 올 때마다 누적
                (category, order, agg) -> new OrderAggregation(
                    agg.getCount() + 1,
                    agg.getTotalAmount() + order.getAmount()
                ),
                Materialized.<String, OrderAggregation, WindowStore<Bytes, byte[]>>
                    as("order-aggregation-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(aggregationSerde)
                    .withRetention(Duration.ofHours(24))
            );

        // 집계 결과를 출력 토픽으로 전송
        aggregated.toStream()
            .map((windowedKey, agg) -> KeyValue.pair(
                windowedKey.key(),
                String.format("{\"category\":\"%s\",\"window_start\":\"%s\",\"window_end\":\"%s\",\"count\":%d,\"total_amount\":%d}",
                    windowedKey.key(),
                    windowedKey.window().startTime(),
                    windowedKey.window().endTime(),
                    agg.getCount(),
                    agg.getTotalAmount())
            ))
            .to("order-aggregation-results", Produced.with(Serdes.String(), Serdes.String()));
    }
}

// 커스텀 TimestampExtractor
public class OrderTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        OrderEvent order = (OrderEvent) record.value();
        return order.getOrderTime() != null
            ? order.getOrderTime().toEpochMilli()
            : partitionTime;
    }
}

State Store 깊이 이해하기

State Store란?

Kafka Streams의 집계, 조인 등 상태가 필요한 연산은 State Store에 중간 결과를 저장합니다. 기본 구현체는 RocksDB이며, 각 스트림 태스크마다 독립적인 Store 인스턴스를 가집니다.

  • 로컬 디스크 저장: RocksDB가 로컬 디스크에 상태를 저장, 메모리 제한을 초과하는 대규모 상태도 처리 가능
  • Changelog 토픽: 모든 State Store 변경사항이 Kafka 토픽에 자동 백업
  • 인터랙티브 쿼리: 외부에서 State Store를 직접 조회 가능

인터랙티브 쿼리 - REST API로 State Store 조회

@RestController
@RequestMapping("/api/orders")
public class OrderAggregationController {

    private final StreamsBuilderFactoryBean factoryBean;

    public OrderAggregationController(StreamsBuilderFactoryBean factoryBean) {
        this.factoryBean = factoryBean;
    }

    @GetMapping("/aggregation/{category}")
    public ResponseEntity<OrderAggregation> getAggregation(
            @PathVariable String category) {

        KafkaStreams streams = factoryBean.getKafkaStreams();
        ReadOnlyWindowStore<String, OrderAggregation> store =
            streams.store(
                StoreQueryParameters.fromNameAndType(
                    "order-aggregation-store",
                    QueryableStoreTypes.windowStore()
                )
            );

        // 최근 1시간 내 윈도우 결과 조회
        Instant from = Instant.now().minus(Duration.ofHours(1));
        Instant to = Instant.now();

        WindowStoreIterator<OrderAggregation> iterator =
            store.fetch(category, from, to);

        OrderAggregation latest = null;
        while (iterator.hasNext()) {
            latest = iterator.next().value;
        }
        iterator.close();

        return latest != null
            ? ResponseEntity.ok(latest)
            : ResponseEntity.notFound().build();
    }
}

장애 복구 메커니즘

State Store 복구 과정

Kafka Streams 인스턴스가 다운되었다가 재시작되면 다음 과정으로 복구됩니다.

  • 1단계: 로컬 디스크의 RocksDB 체크포인트 확인
  • 2단계: Changelog 토픽에서 마지막 체크포인트 이후 변경분 재생(replay)
  • 3단계: 최신 상태까지 복구 완료 후 처리 재개

Standby Replica 설정

// 핫 스탠바이 설정 - 장애 시 빠른 복구
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

// State Store 복구 스레드 수
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);

Standby Replica를 1 이상으로 설정하면, 다른 인스턴스가 State Store 사본을 미리 동기화해둡니다. 인스턴스가 죽으면 Standby가 즉시 인계받아 Changelog 전체를 재생하지 않아도 됩니다.

Spring Boot 연동 전체 구성

의존성 설정

<!-- build.gradle -->
dependencies {
    implementation 'org.apache.kafka:kafka-streams'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'org.springframework.boot:spring-boot-starter-web'
}

application.yml 설정

spring:
  kafka:
    streams:
      application-id: order-stream-app
      bootstrap-servers: localhost:9092
      properties:
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        processing.guarantee: exactly_once_v2
        state.dir: /var/kafka-streams/state
        num.standby.replicas: 1
        replication.factor: 3
        commit.interval.ms: 1000

DTO 클래스

@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderEvent {
    private String orderId;
    private String userId;
    private String category;
    private Long amount;
    private Instant orderTime;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderAggregation {
    private Long count;
    private Long totalAmount;
}

에러 핸들링

@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(kafkaStreams -> {
            kafkaStreams.setUncaughtExceptionHandler((thread, exception) -> {
                log.error("Kafka Streams 비정상 종료: thread={}", thread.getName(), exception);
                // REPLACE_THREAD: 해당 스레드만 재시작
                // SHUTDOWN_CLIENT: 인스턴스 종료
                // SHUTDOWN_APPLICATION: 전체 앱 종료
                return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
            });
        });
        factoryBean.setStateListener((newState, oldState) -> {
            log.info("Kafka Streams 상태 변경: {} -> {}", oldState, newState);
        });
    };
}

운영 시 고려사항

성능 튜닝 포인트

설정 기본값 권장값 설명
num.stream.threads 1 CPU 코어 수 병렬 처리 스레드 수
commit.interval.ms 30000 1000~5000 오프셋 커밋 주기
cache.max.bytes.buffering 10MB 50~100MB 레코드 캐시 버퍼 크기
state.dir /tmp SSD 경로 State Store 저장 경로

모니터링 메트릭

// Micrometer를 이용한 Kafka Streams 메트릭 노출
@Bean
public MeterBinder kafkaStreamsMeterBinder(StreamsBuilderFactoryBean factoryBean) {
    return registry -> {
        KafkaStreams streams = factoryBean.getKafkaStreams();
        if (streams != null) {
            streams.metrics().forEach((metricName, metric) -> {
                if (metricName.name().contains("process-rate")
                    || metricName.name().contains("process-latency-avg")
                    || metricName.name().contains("commit-rate")) {
                    Gauge.builder("kafka.streams." + metricName.name(), metric, m -> {
                        Object val = m.metricValue();
                        return val instanceof Double ? (Double) val : 0.0;
                    }).register(registry);
                }
            });
        }
    };
}

마치며

Kafka Streams는 별도의 인프라 없이 Java 애플리케이션 안에서 실시간 스트림 처리를 구현할 수 있는 강력한 라이브러리입니다. 이 글에서 다룬 핵심 내용을 정리하면 다음과 같습니다.

  • KStream은 이벤트 로그, KTable은 최신 상태 뷰로 서로 다른 의미를 가집니다
  • Windowed Aggregation으로 시간 기반 실시간 집계를 구현할 수 있습니다
  • State Store는 RocksDB + Changelog 토픽으로 내결함성을 보장합니다
  • 인터랙티브 쿼리로 외부에서 집계 결과를 REST API로 즉시 조회할 수 있습니다
  • Spring Boot와의 자연스러운 통합으로 프로덕션 레벨의 스트림 처리 애플리케이션을 빠르게 개발할 수 있습니다

다음 글에서는 Kafka Connect와 CDC(Change Data Capture)를 활용한 데이터 파이프라인 구축을 다루겠습니다. Kafka Streams와 Kafka Connect를 함께 활용하면 실시간 데이터 처리 파이프라인의 완성도를 한층 높일 수 있습니다.