들어가며
마이크로서비스 환경에서 실시간 데이터 처리는 더 이상 선택이 아닌 필수가 되었습니다. 주문이 발생하면 즉시 재고를 업데이트하고, 사용자 행동 로그를 실시간으로 분석해 추천 시스템에 반영해야 합니다. Apache Kafka Streams는 이러한 실시간 스트림 처리를 별도의 클러스터 없이 Java 애플리케이션 안에서 수행할 수 있게 해주는 강력한 라이브러리입니다.
이 글에서는 Kafka Streams의 핵심 아키텍처부터 KStream과 KTable의 차이, Topology 구성, Windowed Aggregation, State Store, 장애 복구 메커니즘, 그리고 Spring Boot와의 연동까지 실전 예제와 함께 깊이 있게 다루겠습니다.
Kafka Streams 아키텍처 이해
Kafka Streams란?
Kafka Streams는 Apache Kafka 위에서 동작하는 클라이언트 라이브러리입니다. Spark Streaming이나 Flink 같은 별도의 클러스터가 필요 없고, 일반 Java 애플리케이션에 의존성만 추가하면 됩니다.
- 별도 클러스터 불필요: JAR 파일로 배포 가능, 운영 부담 최소화
- Exactly-Once 처리: Kafka 트랜잭션 기반으로 정확히 한 번 처리 보장
- 수평 확장: 파티션 수만큼 인스턴스를 늘려 자연스러운 스케일아웃
- 내결함성: State Store의 변경로그를 Kafka 토픽에 백업
- 이벤트 시간 처리: 이벤트 발생 시점 기준의 윈도우 처리 지원
핵심 개념 비교
| 개념 | 설명 | 비유 |
|---|---|---|
| KStream | 끊임없는 레코드 스트림 (INSERT 의미) | 로그 파일의 각 라인 |
| KTable | 키별 최신 값 (UPDATE/UPSERT 의미) | 데이터베이스 테이블 |
| GlobalKTable | 모든 파티션 데이터를 각 인스턴스에 복제 | 브로드캐스트 조인용 룩업 테이블 |
| Topology | 데이터 흐름을 정의하는 DAG(방향성 비순환 그래프) | 데이터 파이프라인 청사진 |
| State Store | 중간 결과를 저장하는 로컬 저장소 (RocksDB) | 인메모리 캐시 + 디스크 백업 |
KStream vs KTable 상세 비교
KStream - 이벤트 스트림
KStream은 토픽의 각 메시지를 독립적인 이벤트로 취급합니다. 같은 키로 여러 메시지가 오면 모두 별개의 레코드로 처리됩니다.
// KStream 예시: 주문 이벤트 스트림
KStream<String, OrderEvent> orderStream = builder.stream(
"orders",
Consumed.with(Serdes.String(), orderEventSerde)
);
// 각 주문 이벤트를 개별 처리
orderStream
.filter((key, order) -> order.getAmount() > 10000)
.mapValues(order -> new OrderNotification(order.getUserId(), order.getAmount()))
.to("high-value-orders", Produced.with(Serdes.String(), notificationSerde));
KTable - 변경 로그 테이블
KTable은 같은 키의 새 메시지가 오면 이전 값을 덮어씁니다. 데이터베이스의 UPDATE 시맨틱과 동일합니다.
// KTable 예시: 사용자 프로필 (항상 최신 상태 유지)
KTable<String, UserProfile> userTable = builder.table(
"user-profiles",
Consumed.with(Serdes.String(), userProfileSerde),
Materialized.<String, UserProfile, KeyValueStore<Bytes, byte[]>>
as("user-profile-store")
.withKeySerde(Serdes.String())
.withValueSerde(userProfileSerde)
);
KStream-KTable 조인
실시간 주문 스트림에 사용자 정보를 조인하는 대표적인 패턴입니다.
// 주문 스트림 + 사용자 테이블 조인
KStream<String, EnrichedOrder> enrichedOrders = orderStream
.selectKey((key, order) -> order.getUserId()) // 조인 키 변경
.join(
userTable,
(order, user) -> new EnrichedOrder(
order.getOrderId(),
order.getAmount(),
user.getName(),
user.getGrade()
),
Joined.with(Serdes.String(), orderEventSerde, userProfileSerde)
);
enrichedOrders.to("enriched-orders");
Topology 구성
StreamsBuilder를 이용한 DSL 방식
Kafka Streams는 High-Level DSL과 Low-Level Processor API 두 가지 방식을 제공합니다. 대부분의 경우 DSL로 충분합니다.
@Configuration
public class KafkaStreamsTopologyConfig {
@Bean
public KafkaStreamsConfiguration kafkaStreamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-aggregation-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams-state");
return new KafkaStreamsConfiguration(props);
}
}
Topology 시각화
구성한 Topology는 describe() 메서드로 확인할 수 있습니다. 운영 시 데이터 흐름을 파악하는 데 매우 유용합니다.
Topology topology = builder.build();
System.out.println(topology.describe());
// 출력 예시:
// Topologies:
// Sub-topology: 0
// Source: KSTREAM-SOURCE-0000000000 (topics: [orders])
// --> KSTREAM-FILTER-0000000001
// Processor: KSTREAM-FILTER-0000000001
// --> KSTREAM-MAPVALUES-0000000002
// Processor: KSTREAM-MAPVALUES-0000000002
// --> KSTREAM-SINK-0000000003
// Sink: KSTREAM-SINK-0000000003 (topic: high-value-orders)
Windowed Aggregation - 시간 기반 집계
윈도우 종류
| 윈도우 타입 | 설명 | 사용 예시 |
|---|---|---|
| Tumbling Window | 고정 크기, 겹침 없음 | 매 5분마다 주문 건수 집계 |
| Hopping Window | 고정 크기, 겹침 허용 | 5분 윈도우를 1분 간격으로 슬라이딩 |
| Sliding Window | 이벤트 기반, 차이 범위 내 그룹 | 10초 이내 발생한 이벤트 그룹 |
| Session Window | 비활동 간격 기반 | 사용자 세션 (30분 비활동 시 종료) |
실시간 주문 집계 예제
@Component
public class OrderAggregationTopology {
@Autowired
public void buildPipeline(StreamsBuilder builder) {
// JSON Serde 설정
JsonSerde<OrderEvent> orderSerde = new JsonSerde<>(OrderEvent.class);
JsonSerde<OrderAggregation> aggregationSerde = new JsonSerde<>(OrderAggregation.class);
KStream<String, OrderEvent> orderStream = builder.stream(
"orders",
Consumed.with(Serdes.String(), orderSerde)
.withTimestampExtractor(new OrderTimestampExtractor())
);
// 5분 텀블링 윈도우로 카테고리별 주문 집계
KTable<Windowed<String>, OrderAggregation> aggregated = orderStream
.groupBy(
(key, order) -> order.getCategory(),
Grouped.with(Serdes.String(), orderSerde)
)
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.aggregate(
// 초기값
() -> new OrderAggregation(0L, 0L),
// Aggregator: 새 주문이 올 때마다 누적
(category, order, agg) -> new OrderAggregation(
agg.getCount() + 1,
agg.getTotalAmount() + order.getAmount()
),
Materialized.<String, OrderAggregation, WindowStore<Bytes, byte[]>>
as("order-aggregation-store")
.withKeySerde(Serdes.String())
.withValueSerde(aggregationSerde)
.withRetention(Duration.ofHours(24))
);
// 집계 결과를 출력 토픽으로 전송
aggregated.toStream()
.map((windowedKey, agg) -> KeyValue.pair(
windowedKey.key(),
String.format("{\"category\":\"%s\",\"window_start\":\"%s\",\"window_end\":\"%s\",\"count\":%d,\"total_amount\":%d}",
windowedKey.key(),
windowedKey.window().startTime(),
windowedKey.window().endTime(),
agg.getCount(),
agg.getTotalAmount())
))
.to("order-aggregation-results", Produced.with(Serdes.String(), Serdes.String()));
}
}
// 커스텀 TimestampExtractor
public class OrderTimestampExtractor implements TimestampExtractor {
@Override
public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
OrderEvent order = (OrderEvent) record.value();
return order.getOrderTime() != null
? order.getOrderTime().toEpochMilli()
: partitionTime;
}
}
State Store 깊이 이해하기
State Store란?
Kafka Streams의 집계, 조인 등 상태가 필요한 연산은 State Store에 중간 결과를 저장합니다. 기본 구현체는 RocksDB이며, 각 스트림 태스크마다 독립적인 Store 인스턴스를 가집니다.
- 로컬 디스크 저장: RocksDB가 로컬 디스크에 상태를 저장, 메모리 제한을 초과하는 대규모 상태도 처리 가능
- Changelog 토픽: 모든 State Store 변경사항이 Kafka 토픽에 자동 백업
- 인터랙티브 쿼리: 외부에서 State Store를 직접 조회 가능
인터랙티브 쿼리 - REST API로 State Store 조회
@RestController
@RequestMapping("/api/orders")
public class OrderAggregationController {
private final StreamsBuilderFactoryBean factoryBean;
public OrderAggregationController(StreamsBuilderFactoryBean factoryBean) {
this.factoryBean = factoryBean;
}
@GetMapping("/aggregation/{category}")
public ResponseEntity<OrderAggregation> getAggregation(
@PathVariable String category) {
KafkaStreams streams = factoryBean.getKafkaStreams();
ReadOnlyWindowStore<String, OrderAggregation> store =
streams.store(
StoreQueryParameters.fromNameAndType(
"order-aggregation-store",
QueryableStoreTypes.windowStore()
)
);
// 최근 1시간 내 윈도우 결과 조회
Instant from = Instant.now().minus(Duration.ofHours(1));
Instant to = Instant.now();
WindowStoreIterator<OrderAggregation> iterator =
store.fetch(category, from, to);
OrderAggregation latest = null;
while (iterator.hasNext()) {
latest = iterator.next().value;
}
iterator.close();
return latest != null
? ResponseEntity.ok(latest)
: ResponseEntity.notFound().build();
}
}
장애 복구 메커니즘
State Store 복구 과정
Kafka Streams 인스턴스가 다운되었다가 재시작되면 다음 과정으로 복구됩니다.
- 1단계: 로컬 디스크의 RocksDB 체크포인트 확인
- 2단계: Changelog 토픽에서 마지막 체크포인트 이후 변경분 재생(replay)
- 3단계: 최신 상태까지 복구 완료 후 처리 재개
Standby Replica 설정
// 핫 스탠바이 설정 - 장애 시 빠른 복구
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
// State Store 복구 스레드 수
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
Standby Replica를 1 이상으로 설정하면, 다른 인스턴스가 State Store 사본을 미리 동기화해둡니다. 인스턴스가 죽으면 Standby가 즉시 인계받아 Changelog 전체를 재생하지 않아도 됩니다.
Spring Boot 연동 전체 구성
의존성 설정
<!-- build.gradle -->
dependencies {
implementation 'org.apache.kafka:kafka-streams'
implementation 'org.springframework.kafka:spring-kafka'
implementation 'org.springframework.boot:spring-boot-starter-web'
}
application.yml 설정
spring:
kafka:
streams:
application-id: order-stream-app
bootstrap-servers: localhost:9092
properties:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
processing.guarantee: exactly_once_v2
state.dir: /var/kafka-streams/state
num.standby.replicas: 1
replication.factor: 3
commit.interval.ms: 1000
DTO 클래스
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderEvent {
private String orderId;
private String userId;
private String category;
private Long amount;
private Instant orderTime;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderAggregation {
private Long count;
private Long totalAmount;
}
에러 핸들링
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(kafkaStreams -> {
kafkaStreams.setUncaughtExceptionHandler((thread, exception) -> {
log.error("Kafka Streams 비정상 종료: thread={}", thread.getName(), exception);
// REPLACE_THREAD: 해당 스레드만 재시작
// SHUTDOWN_CLIENT: 인스턴스 종료
// SHUTDOWN_APPLICATION: 전체 앱 종료
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
});
factoryBean.setStateListener((newState, oldState) -> {
log.info("Kafka Streams 상태 변경: {} -> {}", oldState, newState);
});
};
}
운영 시 고려사항
성능 튜닝 포인트
| 설정 | 기본값 | 권장값 | 설명 |
|---|---|---|---|
| num.stream.threads | 1 | CPU 코어 수 | 병렬 처리 스레드 수 |
| commit.interval.ms | 30000 | 1000~5000 | 오프셋 커밋 주기 |
| cache.max.bytes.buffering | 10MB | 50~100MB | 레코드 캐시 버퍼 크기 |
| state.dir | /tmp | SSD 경로 | State Store 저장 경로 |
모니터링 메트릭
// Micrometer를 이용한 Kafka Streams 메트릭 노출
@Bean
public MeterBinder kafkaStreamsMeterBinder(StreamsBuilderFactoryBean factoryBean) {
return registry -> {
KafkaStreams streams = factoryBean.getKafkaStreams();
if (streams != null) {
streams.metrics().forEach((metricName, metric) -> {
if (metricName.name().contains("process-rate")
|| metricName.name().contains("process-latency-avg")
|| metricName.name().contains("commit-rate")) {
Gauge.builder("kafka.streams." + metricName.name(), metric, m -> {
Object val = m.metricValue();
return val instanceof Double ? (Double) val : 0.0;
}).register(registry);
}
});
}
};
}
마치며
Kafka Streams는 별도의 인프라 없이 Java 애플리케이션 안에서 실시간 스트림 처리를 구현할 수 있는 강력한 라이브러리입니다. 이 글에서 다룬 핵심 내용을 정리하면 다음과 같습니다.
- KStream은 이벤트 로그, KTable은 최신 상태 뷰로 서로 다른 의미를 가집니다
- Windowed Aggregation으로 시간 기반 실시간 집계를 구현할 수 있습니다
- State Store는 RocksDB + Changelog 토픽으로 내결함성을 보장합니다
- 인터랙티브 쿼리로 외부에서 집계 결과를 REST API로 즉시 조회할 수 있습니다
- Spring Boot와의 자연스러운 통합으로 프로덕션 레벨의 스트림 처리 애플리케이션을 빠르게 개발할 수 있습니다
다음 글에서는 Kafka Connect와 CDC(Change Data Capture)를 활용한 데이터 파이프라인 구축을 다루겠습니다. Kafka Streams와 Kafka Connect를 함께 활용하면 실시간 데이터 처리 파이프라인의 완성도를 한층 높일 수 있습니다.
'Kafka' 카테고리의 다른 글
| Kafka Connect와 CDC - 데이터 파이프라인 구축 완벽 가이드 (0) | 2026.04.03 |
|---|---|
| Kafka 운영 가이드 - 모니터링부터 장애 대응까지 (0) | 2026.03.31 |
| Kafka Consumer Group 리밸런싱 완벽 가이드 (0) | 2026.03.27 |
| Apache Kafka 핵심 개념 - 백엔드 개발자를 위한 완벽 가이드 (0) | 2026.03.25 |