아파치 카프카 설정 방법 / 브로커, 토픽, 프로듀서, 컨슈머, 스트림즈 옵션 정리

카프카 브로커 설정

카프카폴더/config/server.properties 파일에 브로커 실행 시 사용되는 설정들을 정의합니다.

브로커 ID 설정

broker.id=0

같은 클러스터 내에서 현재 실행할 브로커 고유 ID를 설정합니다.

브로커 리스너 설정

-- 브로커가 요청을 수신할 주소 설정 (내부 바인딩)
listeners=PLAINTEXT://localhost:9092

-- 클라이언트가 접속할 브로커 주소 설정 (외부 연결용)
advertised.listeners=PLAINTEXT://localhost:9092

클라이언트는 advertised.listeners에 설정된 브로커 주소:포트를 통해 접속합니다ㅈㄴㅈㄴ.

로그 디렉터리 설정

log.dirs=/로그저장폴더경로

프로듀서가 보낸 메시지 로그를 해당 디렉터리 경로에 파일로 저장합니다.
‘토픽명-파티션번호’ 하위 디렉터리를 자동 생성하여 분류합니다.

카프카 도커 옵션 예시

KAFKA_LOG_DIRS: '/tmp/kafka-logs'

도커에서는 docker-compose.yml 파일에 카프카 로그 세그먼트 저장 경로를 위와 같이 정의합니다.

토픽-파티션별 카프카 로그 파일 예시

00000000000000000000.log 오프셋, 타임스탬프, 헤더, 메시지 키, 메시지 값 등 실제 메시지 데이터 저장
00000000000000000000.index 오프셋과 로그 파일 내 위치를 매핑한 인덱스 파일
00000000000000000000.timeindex 타임스탬프와 로그 파일 내 위치를 매핑한 인덱스 파일
leader-epoch-checkpoint 리더 에폭 정보 저장 (리더 변경 기록 관리)
partition.metadata 파티션 설정 정보 저장

파일명이 00000000000000000010.log면, 오프셋 10번부터의 데이터가 저장된 로그 파일입니다.
프로듀서에서 받은 메시지에 브로커가 오프셋을 부여하고, 타임스탬프를 지정하여 저장합니다.

로그 파일 생성주기 설정

-- 로그 파일 (=세그먼트 파일) 최대 크기 지정
log.segment.bytes
또는
-- 세그먼트 신규 생성 후 다음 파일로 넘어가는 주기 (기본값 7일)
log.roll.ms(hosurs)

로그 파일 삭제주기 설정

-- 세그먼트 삭제 주기. 리텐션 기간 (기본값 7일, 일반적으로 3일)
log.retention.hours=168
또는
-- 파티션당 로그 최대 크기 도달 시 삭제 (기본값 -1)
retrntion.bytes

-- 삭제 대상 체크 주기 (기본값 5분)
log.retention.check.interval.ms

카프카 데이터는 세그먼트 단위로 삭제되며, 레코드 단위 삭제 및 수정은 불가합니다.

액티브 세그먼트
쓰기가 일어나고 있는 가장 마지막 세그먼트 파일을 액티브 세그먼트라고 합니다.
액티브 세그먼트는 브로커의 삭제 대상이 되지 않습니다.
일반 세그먼트는 retention 옵션 기간이 지나면 삭제될 수 있습니다.

클린업 정책 설정

cleanup.policy=compact
delete 일정 시간 경과 또는 기준 크기 초과 시 세그먼트 전체를 삭제합니다.
compact 액티브 세그먼트에서 각 메시지 키의 가장 최신 레코드만 남기고, 오래된 레코드들은 삭제하여 압축합니다.

compact 정책 로그 상태 분류

클린 로그 (테일 영역) 압축이 완료된 로그 (중복 메시지 키 없음)
더티 로그 (헤드 영역) 압축 정책 실행 전 로그 (중복 메시지 키 있음)

압축 시작 시점 설정

min.cleanable.dirty.ratio

클린 레코드에 비한 더티 레코드 비율에 따라 압축 실행합니다.

0.9 압축 효과가 좋으나, 더티 레코드 0.9 비율이 될 때까지 용량 효율이 좋지 않습니다.
0.1 압축이 자주 일어나서 브로커에 부담이 되고, 최신 데이터만 유지할 수 있습니다.

파티션 수 설정

num.partitions=3

카프카 토픽 생성 시 기본적으로 만들 파티션 수를 정의합니다.
토픽 생성 시 파티션 수를 지정하지 않으면, 이 브로커 설정으로 생성됩니다.

주키퍼 연결 설정

zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000

브로커에서 연결할 주키퍼 서버주소:포트, 연결 타임아웃 시간을 설정합니다.
카프카 3.0 이하 버전에서는 브로커 실행 전 주키퍼를 필수로 실행해야 합니다.

리더파티션 자동 분배 활성화

auto.leader.rebalance.enable=true

리더파티션이 특정 브로커에 몰린 경우, 자동으로 리더파티션을 재분배하도록 설정합니다.

신규 토픽 자동 생성 비활성화

auto.create.topics.enable=false

프로듀서, 컨슈머가 존재하지 않는 토픽 요청 시 자동으로 토픽이 생성되지 않도록 방지합니다.
불필요한 토픽 생성을 방지하며, 토픽을 CLI로 수동 생성해야 합니다.


카프카 토픽 설정

토픽 단위로 설정할 수 있는 옵션들입니다.

로그 파일 삭제주기 설정

-- 세그먼트 삭제 주기. 리텐션 기간 (기본값 7일, 일반적으로 3일)
retention.ms
또는
retention.minutes
또는
retention.hours

브로커 기본 설정 (log.retention.hours 등) 보다 우선 적용되는, 토픽별 설정 옵션입니다.

파티션 복제 수 설정

replication.factor

토픽 생성 시 파티션 복제 수는 최소 1 (복제 없음) 부터 최대 브로커 개수까지 설정 가능합니다.
운영 브로커가 5개면, 2~3으로 설정하는 것이 일반적입니다.
각 파티션을 복제하여 디스크 용량을 차지하는 대신, 시스템 안정성과 가용성을 높일 수 있습니다.

파티션 복제 수 설정 시 참고
일부 데이터가 유실되어도 되는 GPS 데이터는 복제하지 않고 운영하기도 합니다.
데이터가 유실되면 안 되는 금융 데이터는 복제 수를 3으로 설정하여 운영하기도 합니다.

ISR 최소 개수 설정

min.insync.replicas

프로듀서 데이터 전송이 성공하려면 최소 몇 개의 복제본이 동기화 상태여야 하는지 정의합니다.
프로듀서 설정이 acks=all이고 min.insync.replicas=2인 경우,
ISR이 1개로 줄어들면 해당 토픽에 프로듀서 메시지 전송이 실패합니다.

파티션 리더 승급 설정

-- 데이터 유실을 감수하고, 지속적으로 데이터 처리
-- 복제가 안된 (ISR이 아닌) 팔로워 파티션을 리더로 승급하는 옵션
unclean.leader.election.enable=true

-- 데이터 유실을 감수하지 않고, 리더 파티션이 복구될 때까지 중단
-- 복제가 안된 (ISR이 아닌) 팔로워 파티션을 리더로 승급하지 않는 옵션
unclean.leader.election.enable=false

메시지 타임스탬프 설정

message.timestamp.type

타임스탬프 기본값은 프로듀서 레코드 생성 시간이고, 브로커 적재 시간으로 변경 가능합니다.


프로듀서 옵션 설정

프로듀서 필수 옵션

bootstrap.servers 프로듀서가 레코드를 전송할 카프카 클러스터 내 브로커 호스트명:포트 목록 작성
2개 이상 작성 시 브로커 장애 대비 가능
key.serializer 메시지 키를 직렬화 할 클래스 지정
String Serializer로 통일하면 컨슈머 역직렬화 클래스 설정 및 디버깅 시 용이
value.serializer 메시지 값을 직렬화 할 클래스 지정

프로듀서 선택 옵션

acks ★ 프로듀서가 보낸 데이터에 대해 브로커로부터 성공 응답을 받을 조건 설정
복제 강도를 높여서 성능 낮추고 신뢰성 높게 저장할지 여부
복제 수가 많을수록 차이가 큼

기본값 : 1

0 : 데이터 전송 후 응답 확인 X (가장 빠르지만 브로커 장애 시 데이터 유실 위험 큼)
1 : 리더 브로커에 레코드 저장 시 성공 응답
all 또는 -1 : 리더 + ISR에 포함된 팔로워들에 레코드 저장 시 성공 응답 (가장 느리고 신뢰성 높음)
linger.ms Accumulator에 모은 배치 데이터 전송 전 기다리는 최소 시간

기본값 : 0 (Send 시 즉시 전송)
retries 브로커 전송 실패 시 재전송 시도 횟수

기본값 : Integer.MAX_VALUE (2147483647)
max.in.flight.requests.per.connection 한 커넥션에서 동시 전송할 수 있는 최대 요청 수

기본값 : 5 (순서 보장 필요 시 1로 설정)
partitioner.class 레코드를 어느 파티션에 보낼지 결정하는 파티셔너 클래스 지정

기본값 : DefaultPartitioner (카프카 2.4 이상 : UniformStickyPartitioner)
enable.idempotence 네트워크 오류, 재시도 시에도 중복 없이 한 번만 레코드를 적재하는 멱등성 보장 여부
true 설정 시 acks=all 등으로 자동 변경되어 처리 속도가 느려질 수 있음
멱등성 프로듀서로 동작할 필요가 있는 경우 true 설정
각 레코드에 프로듀서 ID, 파티션별 시퀀스 번호를 부여하여 중복 전송 방지 및 안정성 향상

카프카 3.0 이상 기본값 : true
transactional.id 레코드를 트랙잭션 단위로 묶어서 전송 여부

기본값 : null (트랜잭션 프로듀서가 아닌, 기본 프로듀서로 동작)
metadata.max.age.ms 클러스터 메타데이터 강제 갱신 요청 주기

기본값 : 5분

카프카 프로듀서는 초기 메시지 전송 전 또는 메시지 전송 중 오류가 발생했을 때 클러스터에 메타데이터를 요청하고 응답 받아서 캐시에 저장합니다.
메타데이터로 각 토픽의 리더 파티션이 어떤 브로커에 있는지 알 수 있습니다.
리더 파티션이 있는 브로커와 메시지를 통신하기 위해 프로듀서, 컨슈머는 메타데이터 갱신이 주기적으로 필요합니다.
컨슈머에서 잘못된 브로커로 데이터 요청 시에는 LEADER_NOT_AVAILABLE 익셉션이 발생합니다.
metadata.max.idle.ms 프로듀서 유휴 상태 시 메타데이터 캐시 유지 주기

기본값 : 5분

카프카 2.5.0 버전 기준 기본값이며, 사용하는 클라이언트 라이브러리 버전에 따라 기본값이 다를 수 있습니다.


컨슈머 옵션 설정

컨슈머 필수 옵션

bootstrap.servers 컨슈머가 카프카 클러스터에 연결할 때 사용할 브로커 호스트명:포트 목록 작성
2개 이상 작성 시 브로커 장애 대비 가능
key.deserializer 토픽에서 읽은 메시지 키를 역직렬화 할 클래스 지정
프로듀서에서 직렬화 한 클래스와 동일하게 설정
value.deserializer 토픽에서 읽은 메시지 값을 역직렬화 할 클래스 지정

컨슈머 선택 옵션

group.id 컨슈머가 속할 컨슈머 그룹 ID
subscribe()로 토픽 구독 시 필수 값
auto.offset.reset ★ 컨슈머 그룹이 처음 시작되어 컨슈머 오프셋이 없는 경우, 어느 오프셋부터 읽을지 설정

기본값 : latest

latest : 가장 최근 오프셋부터 읽기 시작
earliest : 가장 예전 오프셋부터 읽기 시작
none : 커밋이 이후 오프셋부터 읽기 시작, 커밋 기록이 없으면 오류 반환
enable.auto.commit 자동 커밋 여부

기본값 : true
auto.commit.interval.ms 자동 오프셋 커밋 주기

기본값 : 5000 (5초)
max.poll.records poll() 시 반환되는 레코드 개수

기본값 : 500
session.timeout.ms 컨슈머-브로커 연결 타임아웃 시간
설정한 시간 내 브로커에 컨슈머의 하트비트가 오지 않으면 리밸런싱 발생

기본값 : 10000 (10초)
heartbeat.interval.ms 컨슈머가 브로커에게 하트비트 전송 주기

기본값 : 3000 (3초)
max.poll.interval.ms poll() 호출 간격의 최대 허용 시간
리밸런싱 판단 기준 (시간 내 poll 호출이 없으면 컨슈머 세션 종료 후 리밸런싱)
레코드 처리 속도가 느리면 늘리는게 좋음

기본값 : 300000 (5분)
isolation.level 트랜잭션 메시지 처리 방식 설정
트랜잭션 프로듀서가 트랜잭션 단위로 데이터 보낼 경우 사용

기본값 : read_uncommitted

read_uncommitted : 커밋 전 메시지도 읽음
read_committed : 커밋 된 메시지만 읽음

스트림즈 DSL 옵션 설정

스트림즈 DSL 필수 옵션

bootstrap.servers 스트림즈가 카프카 클러스터에 연결할 때 사용할 브로커 호스트명:포트 목록 작성
2개 이상 작성 시 브로커 장애 대비 가능
application.id 스트림즈 애플리케이션 그룹 ID
로직이 다른 스트림즈 애플리케이션은 반드시 다른 ID 사용 필요

스트림즈 DSL 선택 옵션

default.key.serde 레코드의 메시지 키 직렬화/역직렬화 클래스 지정

기본값 : Serdes.ByteArray().getClass().getName()
default.value.serde 레코드의 메시지 값 직렬화/역직렬화 클래스 지정

기본값 : Serdes.ByteArray().getClass().getName()
num.stream.threads 스트림 프로세스 실행 시 스레드 수 지정
한 프로세스에서 병렬 처리할 태스크 수 늘려서 스케일 업 가능
병렬도는 입력 토픽의 파티션 수를 초과할 수 없음

기본값 : 1
state.dir 상태 기반 데이터 처리 시 내장 RocksDB에 상태 데이터를 저장할 디렉토리 지정
/tmp는 재부팅 시 초기화되는 휘발성 디렉토리이므로, 운영에서는 영속적인 디렉토리 사용 권장

기본값 : /tmp/kafka-streams