카프카 스트림즈 개념 정리 / Java 카프카 스트림즈 애플리케이션 개발 방법
카프카 스트림즈 개념 정리
카프카 스트림즈를 이용하면 토픽 스트림 데이터 처리 후, 토픽 또는 외부 시스템에 저장할 수 있습니다.
처리할 토픽 파티션이 3개면 스트림즈 애플리케이션 내부에 3개의 태스크가 자동으로 할당됩니다.
파티션 수만큼 생성된 태스크들이 병렬로 데이터를 처리하므로 성능이 좋습니다.
같은 application.id로 스트림즈 애플리케이션들을 실행하면,
프로세스-스레드가 분리되어 일부 프로세스에 장애가 발생해도 안정적인 운영이 가능합니다.
스파크 스트리밍은 여러 데이터 소스로부터 데이터를 입력받을 수 있지만,
카프카 스트림즈는 카프카 토픽만 입력 소스로 사용합니다.
스트림즈 프로세서 종류
소스 프로세서 | 토픽에서 데이터를 가져오는 역할 |
스트림즈 프로세서 | 다른 프로세서가 반환한 데이터를 처리하는 역할 |
싱크 프로세서 | 처리가 완료된 데이터를 다른 토픽에 저장하는 역할 |
스트림즈 DSL
스트림즈 애플리케이션 개발 시 주로 사용되는 고수준 추상화 API입니다.
메시지 값을 기반으로 토픽 분기, 지난 10분간 들어온 데이터 수 집계 등이 가능합니다.
KStream 컨슈머처럼 토픽을 실시간으로 구독하고, 레코드를 순차적으로 하나씩 처리하는 비상태 스트림 처리 모델입니다.
KTable
토픽의 메시지 키 기준으로 가장 최신 값만 유지하는 상태 기반 테이블 모델입니다.
각 스트림즈 태스크가 일부 파티션 데이터를 로컬 상태 저장소에 Materialized View (구체화된 뷰) 로 저장합니다.
KTable 조인 시에는 반드시 코파티셔닝이 필요합니다.
KTable 데이터를 keyValueStore로 조회하면, 로컬 캐시처럼 키-값 상태 저장소로 활용 가능합니다.
GlobalKTable
토픽의 메시지 키 기준으로 가장 최신 값만 유지하는 전역 상태 기반 테이블 모델입니다.
각 스트림즈 태스크가 전체 파티션 데이터를 로컬 상태 저장소에 Materialized View (구체화된 뷰) 로 저장합니다.
조인 대상 테이블 파티션 수가 다르거나 코파티셔닝이 어려운 경우 사용하면 좋습니다.
너무 많은 데이터를 가진 토픽으로 GlobalKTable 선언해서 조인하면 부하가 생길 수 있습니다.
프로세서 API
스트림즈 애플리케이션 개발 시 스트림즈 DSL 보다 세밀한 커스텀 로직 구현이 가능한 API입니다.
스트림즈 DSL과 사용하면 KStream, KTable, GlobalKTable 개념도 연동할 수 있습니다.
메시지 값 종류에 따라 동적으로 토픽 전송, 일정한 시간 간격으로 데이터 처리 등이 가능합니다.
Processor 인터페이스
입력 데이터 처리 후 다음 프로세서로 데이터를 자동 전달하지 않고 직접 제어할 때 사용합니다.
ProcessorContext를 이용하면 필요한 시점에만 데이터를 전달할 수 있습니다.
Transformer 인터페이스
입력 데이터 처리 후 다음 프로세서로 데이터를 전달할 때 사용합니다.
카프카 스트림즈 DSL 애플리케이션 개발
Java 프로젝트 생성 후 아래와 같이 구현하고 Java 프로세스를 실행하면 됩니다.
카프카 스트림즈 라이브러리 추가
dependencies {
implementation 'org.apache.kafka:kafka-streams:2.5.0'
implementation 'org.slf4j:slf4j-simple:1.7.30'
}
java 프로젝트 내 build.gradle 파일에 카프카 브로커와 동일한 버전의 스트림즈 라이브러리를 추가합니다.
카프카 스트림즈 로그 출력을 위해 slf4j 라이브러리 의존성도 추가합니다.
KStream 필터링 스트림즈 애플리케이션 예시
public class StreamsFilter {
private final static Logger LOGGER = LoggerFactory.getLogger(StreamsFilter.class);
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private final static String APPLICATION_ID = "스트림즈애플리케이션그룹ID";
public static void main(String[] args) {
// 스트림즈 DSL 필수 옵션 설정
Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
// 스트림즈 DSL 선택 옵션 설정
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 스트림즈 빌더 생성
StreamsBuilder builder = new StreamsBuilder();
// 토픽 KStream 생성
KStream<String, String> streamLog = builder.stream("소스토픽명");
// 소스 토픽 필터링
KStream<String, String> filteredStream = streamLog.filter(
(key, value) -> value != null && value.length() > 5
);
// 필터링 결과 싱크 토픽에 저장
filteredStream.to("싱크토픽명");
// 카프카 스트림즈 실행
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// 카프카 스트림즈 종료 셧다운 훅
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
토픽에 들어온 레코드의 메시지 키 또는 값을 필터링하는 비상태 스트림즈 애플리케이션 예시입니다.
실시간으로 소스 프로세서에서 받은 소스 토픽 레코드를 스트림 프로세서에서 필터링합니다.
조건에 맞는 레코드만 싱크 프로세서로 전달하여 싱크 토픽에 저장합니다.
KStream, KTable 조인 예시
// 스트림즈 빌더 생성
StreamsBuilder builder = new StreamsBuilder();
// 소스 프로세서 이용하여 토픽 KStream, KTable 생성
KStream<String, String> orderStream = builder.stream("주문_소스토픽명");
KTable<String, String> addressTable = builder.table("주소_소스토픽명");
// KStream, KTable 조인 후 싱크토픽으로 전송
orderStream.join(addressTable, (order, adress) -> order + "send to" + address)
.to("싱크토픽명");
// 카프카 스트림즈 실행
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
주문 소스토픽에 데이터가 들어오면 최신 주소만 유지하는 주소 소스토픽 KTable과 조인하고,
조인 결과를 싱크 토픽에 저장하는 스트림즈 애플리케이션 예시입니다.
메시지 키가 동일한 데이터만 실시간으로 조인되며, 메시지 키가 null이면 조인되지 않습니다.
KStream, KTable 조인 시에는 두 토픽이 반드시 코파티셔닝 되어 있어야 합니다.
KStream, GlobalKTable 조인 예시
// 스트림즈 빌더 생성
StreamsBuilder builder = new StreamsBuilder();
// 소스 프로세서 이용하여 토픽 KStream, GlobalKTable 생성
KStream<String, String> orderStream = builder.stream("주문_소스토픽명");
GlobalKTable<String, String> addressGlobalTable = builder.globalTable("주소_소스토픽명");
// KStream, GlobalKTable 조인 후 싱크 토픽으로 전송
orderStream.join(
addressGlobalTable,
(orderKey, orderValue) -> orderKey,
(order, address) -> order + "send to" + address)
).to("싱크토픽명");
// 카프카 스트림즈 실행
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
GlobalKTable과 조인 시에는 조인 기준 메시지 키 지정 함수가 필요합니다.
GlobalKTable은 모든 파티션에 접근할 수 있어서 두 토픽 간 코파티셔닝이 필요하지 않습니다.
KTable 키-값 스토어 활용 예시
// 토픽 KTable 생성 및 상태저장소명 지정
StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> addressTable = builder.table("주소_소스토픽명", Materialized.as("상태저장소명"));
// 카프카 스트림즈 객체 생성
KafkaStreams streams = new KafkaStreams(builder.build(), props);
// 키-값 스토어 생성
ReadOnlyKeyValueStore<String, String> keyValueStore;
// KTable이 저장된 상태 저장소에서 keyValueStore 조회
keyValueStore = streams.store(StoreQueryParameters.fromNameAndType("상태저장소명"), QueryableStoreTypes.keyValueStore());
// keyValueStore에서 모든 키, 값 쌍 가져오기
KeyValueIterator<String, String> address = keyValueStore.all();
// 반복하며 전체 값 출력
address.forEachRemaining(keyValue -> log.info(keyValue.toString()));
스트림즈 DSL 윈도우 프로세싱
윈도우 프로세싱은 시간 기반으로 스트림 데이터를 분석할 때 많이 활용됩니다.
특정 시간 동안의 집계, 평균 계산, 카운트 등을 할 수 있습니다.
모든 프로세싱은 메시지 키를 기준으로 취합됩니다.
커스텀 파티셔너를 사용하면 정확한 취합이 어려울 수 있습니다.
텀블링 윈도우 프로세싱
고정된 시간 간격으로 윈도우를 서로 겹치지 않게 나누어 데이터를 집계합니다.
하나의 이벤트는 하나의 윈도우에만 포함됩니다.
스트림즈 DSL 텀블링 윈도우 예시
// 스트림즈 빌더 생성
StreamsBuilder builder = new StreamsBuilder();
// 토픽 KTable 생성
KStream<String, String> stream = builder.stream("로그_소스토픽명");
// 메시지 키 그룹화 후 텀블링 윈도우로 5초 단위 데이터 집계
KTable<Windowed<String>, Long> countTable = stream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
.count(); // 각 윈도우 내 레코드 수 카운트
// 윈도우 시작 시간, 종료 시간, 카운트 출력
countTable.toStream().foreach((key, value) -> {
log.info("[" + key.window().startTime() + " ~ " + key.window().endTime() + "] " + key.key() + " count : " + value);
});
로그 토픽으로 들어오는 동일 메시지 키 데이터가 5초에 몇 개인지 지속적으로 카운트해서,
KTable로 저장하고 출력하는 텀플링 윈도우 프로세싱 예시입니다.
실제 출력 결과는 아래와 같습니다.
[2025-05-15T00:00:00Z ~ 2025-05-15T00:00:05Z] userA count : 1
[2025-05-15T00:00:00Z ~ 2025-05-15T00:00:05Z] userA count : 2
[2025-05-15T00:00:00Z ~ 2025-05-15T00:00:05Z] userB count : 1
[2025-05-15T00:00:05Z ~ 2025-05-15T00:00:10Z] userA count : 1
[2025-05-15T00:00:05Z ~ 2025-05-15T00:00:10Z] userA count : 2
중간 집계 결과가 출력되므로, 동일 윈도우 데이터는 나중에 출력된 데이터로 덮어써야 합니다.
호핑 윈도우 프로세싱
일정 시간 간격으로 윈도우를 서로 겹치게 나누어 데이터를 처리합니다.
동일 메시지 키를 가진 레코드는 여러 윈도우에 포함될 수 있습니다.
슬라이딩 윈도우 프로세싱
레코드 타임스탬프 기준으로 윈도우를 서로 겹치게 나누어 데이터를 처리합니다.
이벤트의 발생 시점에 따라 윈도우가 겹칠 수 있습니다.
세션 윈도우 프로세싱
동일 메시지 키를 가진 레코드를 한 세션에 묶어 연산할 때 사용합니다.
세션 만료 시간에 따라 윈도우 사이즈가 달라지며, 사용자 행동 분석 등에 적합합니다.
프로세서 API 사용한 스트림즈 애플리케이션 개발
Processor 인터페이스 구현 예시
public class FilterProcessor implements Processor<String, String> { // 메시지 키, 메시지 값 타입 지정
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
// 컨텍스트 초기화
this.context = context;
}
@Override
public void process(String key, String value) {
if (value.length() > 5) {
// 컨텍스트 이용해서 다음 프로세서로 데이터 전달
context.forward(key, value);
}
// 오프셋 커밋 (필요 시 호출)
context.commit();
}
@Override
public void close() {
// 리소스 해제 처리
}
}
메시지 처리 흐름을 제어하는 Processor 인터페이스를 구현합니다.
아래와 같이, 구현한 Processor 클래스를 스트림 Processor 노드로 지정하면 됩니다.
// 토폴로지 생성
Topology topology = new Topology();
// 소스 프로세서, 스트림 프로세서, 싱크 프로세서 지정
topology.addSource("Source", 소스토픽명)
.addProcessor("Process",
() -> new FilterProcessor(),
"Source")
.addSink("Sink", 싱크토픽명, "Process");
// 카프카 스트림즈 인스턴스 생성
KafkaStreams streaming = new KafkaStreams(topology, props);
// 스트림즈 애플리케이션 실행
streaming.start();