아파치 카프카 설정 방법 / 브로커, 토픽, 프로듀서, 컨슈머, 스트림즈 옵션 정리
카프카 브로커 설정
카프카폴더/config/server.properties 파일에 브로커 실행 시 사용되는 설정들을 정의합니다.
브로커 ID 설정
broker.id=0
같은 클러스터 내에서 현재 실행할 브로커 고유 ID를 설정합니다.
브로커 리스너 설정
-- 브로커가 요청을 수신할 주소 설정 (내부 바인딩)
listeners=PLAINTEXT://localhost:9092
-- 클라이언트가 접속할 브로커 주소 설정 (외부 연결용)
advertised.listeners=PLAINTEXT://localhost:9092
클라이언트는 advertised.listeners에 설정된 브로커 주소:포트를 통해 접속합니다ㅈㄴㅈㄴ.
로그 디렉터리 설정
log.dirs=/로그저장폴더경로
프로듀서가 보낸 메시지 로그를 해당 디렉터리 경로에 파일로 저장합니다.
‘토픽명-파티션번호’ 하위 디렉터리를 자동 생성하여 분류합니다.
카프카 도커 옵션 예시
KAFKA_LOG_DIRS: '/tmp/kafka-logs'
도커에서는 docker-compose.yml 파일에 카프카 로그 세그먼트 저장 경로를 위와 같이 정의합니다.
토픽-파티션별 카프카 로그 파일 예시
00000000000000000000.log | 오프셋, 타임스탬프, 헤더, 메시지 키, 메시지 값 등 실제 메시지 데이터 저장 |
00000000000000000000.index | 오프셋과 로그 파일 내 위치를 매핑한 인덱스 파일 |
00000000000000000000.timeindex | 타임스탬프와 로그 파일 내 위치를 매핑한 인덱스 파일 |
leader-epoch-checkpoint | 리더 에폭 정보 저장 (리더 변경 기록 관리) |
partition.metadata | 파티션 설정 정보 저장 |
파일명이 00000000000000000010.log면, 오프셋 10번부터의 데이터가 저장된 로그 파일입니다.
프로듀서에서 받은 메시지에 브로커가 오프셋을 부여하고, 타임스탬프를 지정하여 저장합니다.
로그 파일 생성주기 설정
-- 로그 파일 (=세그먼트 파일) 최대 크기 지정
log.segment.bytes
또는
-- 세그먼트 신규 생성 후 다음 파일로 넘어가는 주기 (기본값 7일)
log.roll.ms(hosurs)
로그 파일 삭제주기 설정
-- 세그먼트 삭제 주기. 리텐션 기간 (기본값 7일, 일반적으로 3일)
log.retention.hours=168
또는
-- 파티션당 로그 최대 크기 도달 시 삭제 (기본값 -1)
retrntion.bytes
-- 삭제 대상 체크 주기 (기본값 5분)
log.retention.check.interval.ms
카프카 데이터는 세그먼트 단위로 삭제되며, 레코드 단위 삭제 및 수정은 불가합니다.
액티브 세그먼트
쓰기가 일어나고 있는 가장 마지막 세그먼트 파일을 액티브 세그먼트라고 합니다.
액티브 세그먼트는 브로커의 삭제 대상이 되지 않습니다.
일반 세그먼트는 retention 옵션 기간이 지나면 삭제될 수 있습니다.
클린업 정책 설정
cleanup.policy=compact
delete | 일정 시간 경과 또는 기준 크기 초과 시 세그먼트 전체를 삭제합니다. |
compact | 액티브 세그먼트에서 각 메시지 키의 가장 최신 레코드만 남기고, 오래된 레코드들은 삭제하여 압축합니다. |
compact 정책 로그 상태 분류
클린 로그 (테일 영역) | 압축이 완료된 로그 (중복 메시지 키 없음) |
더티 로그 (헤드 영역) | 압축 정책 실행 전 로그 (중복 메시지 키 있음) |
압축 시작 시점 설정
min.cleanable.dirty.ratio
클린 레코드에 비한 더티 레코드 비율에 따라 압축 실행합니다.
0.9 | 압축 효과가 좋으나, 더티 레코드 0.9 비율이 될 때까지 용량 효율이 좋지 않습니다. |
0.1 | 압축이 자주 일어나서 브로커에 부담이 되고, 최신 데이터만 유지할 수 있습니다. |
파티션 수 설정
num.partitions=3
카프카 토픽 생성 시 기본적으로 만들 파티션 수를 정의합니다.
토픽 생성 시 파티션 수를 지정하지 않으면, 이 브로커 설정으로 생성됩니다.
주키퍼 연결 설정
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
브로커에서 연결할 주키퍼 서버주소:포트, 연결 타임아웃 시간을 설정합니다.
카프카 3.0 이하 버전에서는 브로커 실행 전 주키퍼를 필수로 실행해야 합니다.
리더파티션 자동 분배 활성화
auto.leader.rebalance.enable=true
리더파티션이 특정 브로커에 몰린 경우, 자동으로 리더파티션을 재분배하도록 설정합니다.
신규 토픽 자동 생성 비활성화
auto.create.topics.enable=false
프로듀서, 컨슈머가 존재하지 않는 토픽 요청 시 자동으로 토픽이 생성되지 않도록 방지합니다.
불필요한 토픽 생성을 방지하며, 토픽을 CLI로 수동 생성해야 합니다.
카프카 토픽 설정
토픽 단위로 설정할 수 있는 옵션들입니다.
로그 파일 삭제주기 설정
-- 세그먼트 삭제 주기. 리텐션 기간 (기본값 7일, 일반적으로 3일)
retention.ms
또는
retention.minutes
또는
retention.hours
브로커 기본 설정 (log.retention.hours 등) 보다 우선 적용되는, 토픽별 설정 옵션입니다.
파티션 복제 수 설정
replication.factor
토픽 생성 시 파티션 복제 수는 최소 1 (복제 없음) 부터 최대 브로커 개수까지 설정 가능합니다.
운영 브로커가 5개면, 2~3으로 설정하는 것이 일반적입니다.
각 파티션을 복제하여 디스크 용량을 차지하는 대신, 시스템 안정성과 가용성을 높일 수 있습니다.
파티션 복제 수 설정 시 참고
일부 데이터가 유실되어도 되는 GPS 데이터는 복제하지 않고 운영하기도 합니다.
데이터가 유실되면 안 되는 금융 데이터는 복제 수를 3으로 설정하여 운영하기도 합니다.
ISR 최소 개수 설정
min.insync.replicas
프로듀서 데이터 전송이 성공하려면 최소 몇 개의 복제본이 동기화 상태여야 하는지 정의합니다.
프로듀서 설정이 acks=all이고 min.insync.replicas=2인 경우,
ISR이 1개로 줄어들면 해당 토픽에 프로듀서 메시지 전송이 실패합니다.
파티션 리더 승급 설정
-- 데이터 유실을 감수하고, 지속적으로 데이터 처리
-- 복제가 안된 (ISR이 아닌) 팔로워 파티션을 리더로 승급하는 옵션
unclean.leader.election.enable=true
-- 데이터 유실을 감수하지 않고, 리더 파티션이 복구될 때까지 중단
-- 복제가 안된 (ISR이 아닌) 팔로워 파티션을 리더로 승급하지 않는 옵션
unclean.leader.election.enable=false
메시지 타임스탬프 설정
message.timestamp.type
타임스탬프 기본값은 프로듀서 레코드 생성 시간이고, 브로커 적재 시간으로 변경 가능합니다.
프로듀서 옵션 설정
프로듀서 필수 옵션
bootstrap.servers |
프로듀서가 레코드를 전송할 카프카 클러스터 내 브로커 호스트명:포트 목록 작성 2개 이상 작성 시 브로커 장애 대비 가능 |
key.serializer |
메시지 키를 직렬화 할 클래스 지정 String Serializer로 통일하면 컨슈머 역직렬화 클래스 설정 및 디버깅 시 용이 |
value.serializer | 메시지 값을 직렬화 할 클래스 지정 |
프로듀서 선택 옵션
acks ★ |
프로듀서가 보낸 데이터에 대해 브로커로부터 성공 응답을 받을 조건 설정 복제 강도를 높여서 성능 낮추고 신뢰성 높게 저장할지 여부 복제 수가 많을수록 차이가 큼 기본값 : 1 0 : 데이터 전송 후 응답 확인 X (가장 빠르지만 브로커 장애 시 데이터 유실 위험 큼) 1 : 리더 브로커에 레코드 저장 시 성공 응답 all 또는 -1 : 리더 + ISR에 포함된 팔로워들에 레코드 저장 시 성공 응답 (가장 느리고 신뢰성 높음) |
linger.ms |
Accumulator에 모은 배치 데이터 전송 전 기다리는 최소 시간 기본값 : 0 (Send 시 즉시 전송) |
retries |
브로커 전송 실패 시 재전송 시도 횟수 기본값 : Integer.MAX_VALUE (2147483647) |
max.in.flight.requests.per.connection |
한 커넥션에서 동시 전송할 수 있는 최대 요청 수 기본값 : 5 (순서 보장 필요 시 1로 설정) |
partitioner.class |
레코드를 어느 파티션에 보낼지 결정하는 파티셔너 클래스 지정 기본값 : DefaultPartitioner (카프카 2.4 이상 : UniformStickyPartitioner) |
enable.idempotence |
네트워크 오류, 재시도 시에도 중복 없이 한 번만 레코드를 적재하는 멱등성 보장 여부 true 설정 시 acks=all 등으로 자동 변경되어 처리 속도가 느려질 수 있음 멱등성 프로듀서로 동작할 필요가 있는 경우 true 설정 각 레코드에 프로듀서 ID, 파티션별 시퀀스 번호를 부여하여 중복 전송 방지 및 안정성 향상 카프카 3.0 이상 기본값 : true |
transactional.id |
레코드를 트랙잭션 단위로 묶어서 전송 여부 기본값 : null (트랜잭션 프로듀서가 아닌, 기본 프로듀서로 동작) |
metadata.max.age.ms |
클러스터 메타데이터 강제 갱신 요청 주기 기본값 : 5분 카프카 프로듀서는 초기 메시지 전송 전 또는 메시지 전송 중 오류가 발생했을 때 클러스터에 메타데이터를 요청하고 응답 받아서 캐시에 저장합니다. 메타데이터로 각 토픽의 리더 파티션이 어떤 브로커에 있는지 알 수 있습니다. 리더 파티션이 있는 브로커와 메시지를 통신하기 위해 프로듀서, 컨슈머는 메타데이터 갱신이 주기적으로 필요합니다. 컨슈머에서 잘못된 브로커로 데이터 요청 시에는 LEADER_NOT_AVAILABLE 익셉션이 발생합니다. |
metadata.max.idle.ms |
프로듀서 유휴 상태 시 메타데이터 캐시 유지 주기 기본값 : 5분 |
카프카 2.5.0 버전 기준 기본값이며, 사용하는 클라이언트 라이브러리 버전에 따라 기본값이 다를 수 있습니다.
컨슈머 옵션 설정
컨슈머 필수 옵션
bootstrap.servers |
컨슈머가 카프카 클러스터에 연결할 때 사용할 브로커 호스트명:포트 목록 작성 2개 이상 작성 시 브로커 장애 대비 가능 |
key.deserializer |
토픽에서 읽은 메시지 키를 역직렬화 할 클래스 지정 프로듀서에서 직렬화 한 클래스와 동일하게 설정 |
value.deserializer | 토픽에서 읽은 메시지 값을 역직렬화 할 클래스 지정 |
컨슈머 선택 옵션
group.id | 컨슈머가 속할 컨슈머 그룹 ID subscribe()로 토픽 구독 시 필수 값 |
auto.offset.reset ★ |
컨슈머 그룹이 처음 시작되어 컨슈머 오프셋이 없는 경우, 어느 오프셋부터 읽을지 설정 기본값 : latest latest : 가장 최근 오프셋부터 읽기 시작 earliest : 가장 예전 오프셋부터 읽기 시작 none : 커밋이 이후 오프셋부터 읽기 시작, 커밋 기록이 없으면 오류 반환 |
enable.auto.commit |
자동 커밋 여부 기본값 : true |
auto.commit.interval.ms |
자동 오프셋 커밋 주기 기본값 : 5000 (5초) |
max.poll.records |
poll() 시 반환되는 레코드 개수 기본값 : 500 |
session.timeout.ms |
컨슈머-브로커 연결 타임아웃 시간 설정한 시간 내 브로커에 컨슈머의 하트비트가 오지 않으면 리밸런싱 발생 기본값 : 10000 (10초) |
heartbeat.interval.ms |
컨슈머가 브로커에게 하트비트 전송 주기 기본값 : 3000 (3초) |
max.poll.interval.ms |
poll() 호출 간격의 최대 허용 시간 리밸런싱 판단 기준 (시간 내 poll 호출이 없으면 컨슈머 세션 종료 후 리밸런싱) 레코드 처리 속도가 느리면 늘리는게 좋음 기본값 : 300000 (5분) |
isolation.level |
트랜잭션 메시지 처리 방식 설정 트랜잭션 프로듀서가 트랜잭션 단위로 데이터 보낼 경우 사용 기본값 : read_uncommitted read_uncommitted : 커밋 전 메시지도 읽음 read_committed : 커밋 된 메시지만 읽음 |
스트림즈 DSL 옵션 설정
스트림즈 DSL 필수 옵션
bootstrap.servers |
스트림즈가 카프카 클러스터에 연결할 때 사용할 브로커 호스트명:포트 목록 작성 2개 이상 작성 시 브로커 장애 대비 가능 |
application.id |
스트림즈 애플리케이션 그룹 ID 로직이 다른 스트림즈 애플리케이션은 반드시 다른 ID 사용 필요 |
스트림즈 DSL 선택 옵션
default.key.serde |
레코드의 메시지 키 직렬화/역직렬화 클래스 지정 기본값 : Serdes.ByteArray().getClass().getName() |
default.value.serde |
레코드의 메시지 값 직렬화/역직렬화 클래스 지정 기본값 : Serdes.ByteArray().getClass().getName() |
num.stream.threads |
스트림 프로세스 실행 시 스레드 수 지정 한 프로세스에서 병렬 처리할 태스크 수 늘려서 스케일 업 가능 병렬도는 입력 토픽의 파티션 수를 초과할 수 없음 기본값 : 1 |
state.dir |
상태 기반 데이터 처리 시 내장 RocksDB에 상태 데이터를 저장할 디렉토리 지정 /tmp는 재부팅 시 초기화되는 휘발성 디렉토리이므로, 운영에서는 영속적인 디렉토리 사용 권장 기본값 : /tmp/kafka-streams |