Java 카프카 프로듀서, 컨슈머 애플리케이션 개발 방법
Java 프로젝트 생성 (공통)
카프카 프로듀서, 컨슈머 애플리케이션은 Java 프로젝트로 개발할 수 있습니다.
Javascript, go, Python 등으로도 가능하지만, 공식 지원되는 라이브러리 언어는 Java입니다.
IntelliJ Java 프로젝트 생성 방법
File > New > Project… > 아래와 같이 입력 후 Create
Name | 프로젝트명 입력 |
Language | Java 1.8 이상 선택 |
Build system | Gradle 선택 |
JDK | 18 이상 선택 |
Gradle DSL | Groovy 선택 |
프로젝트가 생성되면 src/main/java 폴더 안에 패키지 폴더를 만들어야 합니다.
패키지 폴더 내에 프로듀서 또는 컨슈머 애플리케이션 클래스를 작성하고 실행하면 됩니다.
카프카 클라이언트 라이브러리 추가
dependencies {
implementation 'org.apache.kafka:kafka-clients:2.5.0'
implementation 'org.slf4j:slf4j-simple:1.7.30'
}
java 프로젝트 내 build.gradle 파일에 카프카 브로커와 동일한 버전의 클라이언트 라이브러리를 추가합니다.
카프카 클라이언트 로그 출력을 위해 slf4j 라이브러리 의존성도 추가합니다.
카프카 프로듀서 애플리케이션 개발
프로듀서 클래스 예시
public class TestProducer {
private final static Logger LOGGER = LoggerFactory.getLogger(TestProducer.class);
private final static String TOPIC_NAME = "토픽명";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
// 프로듀서 필수 옵션 설정
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); // 요청할 브로커 서버 정의
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 메시지 키 직렬화 클래스 정의
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 메시지 값 직렬화 클래스 정의
// 프로듀서 생성
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
// 메시지 값 프로듀서 레코드 생성
ProducerRecord<String, String> record1 = new ProducerRecord<>(TOPIC_NAME, "메시지 값");
// 메시지 키-값 프로듀서 레코드 생성
ProducerRecord<String, String> record2 = new ProducerRecord<>(TOPIC_NAME, "메시지 키", "메시지 값");
// 파티션 번호를 지정한 프로듀서 레코드 생성
ProducerRecord<String, String> record3 = new ProducerRecord<>(TOPIC_NAME, 0, "메시지 키", "메시지 값");
try {
// 메시지 비동기 전송 (파티셔너 → Accumulator → 브로커 → 파티션에 로그 세그먼트 파일 저장)
// Accumulator에서 배치로 묶이면 일정 크기 및 시간에 도달 시 전송
producer.send(record1);
producer.send(record2);
// 메시지 동기 전송 (send 후 응답 get)
RecordMetadata metadata = producer.send(record3).get();
// 콜백 출력 결과 : 토픽명-파티션번호@오프셋번호
// 오프셋번호가 -1이면, acks 옵션이 0이라 응답이 없는 것
LOGGER.info(metadata.toString());
// 메시지 비동기 전송 + 콜백 처리
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
// 전송 실패
} else {
// 전송 성공
}
}
});
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
} finally {
// accumulator에 전송 대기 중인 메시지 전송
producer.flush();
// 프로듀서 종료 (리소스 해제)
producer.close();
}
}
}
Java 라이브러리로 카프카 프로듀서를 생성하고, 브로커로 메시지를 보내는 예시입니다.
트랜잭션 프로듀서 예시
// 프로듀서별 고유 트랜잭션 ID 설정
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());
// 프로듀서 생성
Producer<String, String> producer = new KafkaProducer<>(configs);
// 프로듀서 트랜잭션 초기화
producer.initTransactions();
try {
// 트랜잭션 시작
producer.beginTransaction();
// 레코드 전송
producer.send(new ProducerRecord<>(TOPIC, "메시지 값1"));
producer.send(new ProducerRecord<>(TOPIC, "메시지 값2"));
// 레코드 트랜잭션 커밋
producer.commitTransaction();
} catch (Exception e) {
// 트랜잭션 롤백
producer.abortTransaction();
} finally {
// 프로듀서 종료
producer.close();
}
레코드들을 트랜잭션으로 묶어서 전송하고 커밋하는 프로듀서 예시입니다.
카프카 컨슈머 애플리케이션 개발
자동 커밋 컨슈머 클래스 예시
public class TestConsumer {
private final static Logger LOGGER = LoggerFactory.getLogger(TestConsumer.class);
private final static String TOPIC_NAME = "토픽명";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private final static String GROUP_ID = "컨슈머그룹ID";
private static KafkaConsumer<String, String> consumer;
public static void main(String[] args) {
// 컨슈머 필수 옵션 설정
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 컨슈머 선택 옵션 설정
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 기본값 : true 이므로, 명시하지 않아도 자동 커밋 됨
configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 60000); // 자동 커밋 주기 : 1분 (이후 poll 호출 시 자동 커밋)
// 컨슈머 생성
consumer = new KafkaConsumer<>(configs);
// 런타임 종료 시 실행되는 스레드 셧다운 훅 등록
Runtime.getRuntime().addShutdownHook(new ShutdownThread());
// 토픽 구독
consumer.subscribe(Arrays.asList(TOPIC_NAME));
try {
// 무한루프
while (true) {
// 최대 1초 대기하며 메시지 요청
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
// 받은 메시지가 있으면 출력
for (ConsumerRecord<String, String> record : records) {
LOGGER.info("record:{}", record);
}
}
} catch (WakeupException e) {
LOGGER.error(e.getMessage(), e);
} finally {
// 컨슈머 종료 (컨슈머 그룹에서 제외)
consumer.close();
}
}
// 런타임 종료 시 실행되는 스레드 클래스 구현
static class ShutdownThread extends Thread {
public void run() {
// 컨슈머 poll 진행 중단 (WakeupException 발생)
// 런타임 스레드가 아닌 다른 스레드에서 호출해야 안전하게 종료 가능
consumer.wakeup();
}
}
}
컨슈머 그룹이 토픽을 구독하고 전체 파티션에 대해서 메시지를 가져가는 예시입니다.
토픽의 파티션 수 : 컨슈머 그룹 내 컨슈머 수는 1:1로 운영하는 것이 좋습니다.
파티션은 1개의 컨슈머만 할당 가능하므로, 파티션 수보다 많은 컨슈머는 유휴 상태가 됩니다.
수동 커밋 컨슈머
자동 커밋 비활성화 옵션 설정
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
수동 커밋은 컨슈머 생성 시 파라미터 configs에 자동 커밋 비활성화 옵션을 넣어야 합니다.
동기 오프셋 커밋 예시
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
LOGGER.info("record:{}", record);
}
// 동기 오프셋 커밋
consumer.commitSync();
}
커밋 응답을 기다리는 동기 오프셋 커밋 예시입니다.
레코드 처리가 끝난 후 커밋해야 합니다.
레코드 단위 동기 오프셋 커밋 예시
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
// 파티션별 현재 오프셋 데이터 맵 생성
Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
LOGGER.info("record:{}", record);
// 파티션별 현재 오프셋 데이터 맵 갱신
currentOffset.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, null)
);
// 레코드 단위 동기 오프셋 커밋
// 이미 커밋된 파티션-오프셋은 무시
consumer.commitSync(currentOffset);
}
}
레코드 단위 커밋은 비효율적이고, 데이터가 많을수록 느려져서 잘 사용되지 않는 방식입니다.
비동기 오프셋 커밋 예시
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
LOGGER.info("record:{}", record);
}
// 비동기 오프셋 커밋
consumer.commitAsync();
}
비동기 오프셋은 커밋 수행 시 응답을 기다리지 않으므로, 더 많은 데이터를 처리할 수 있습니다.
비동기 오프셋 커밋 콜백 예시
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
LOGGER.info("record:{}", record);
}
// 비동기 오프셋 커밋 후 완료 콜백 함수 실행
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (e != null) {
// 커밋 실패
e.printStackTrace();
} else {
// 커밋 성공
LOGGER.info("커밋 성공 오프셋:{}", offsets);
}
}
});
}
파티션 수동 할당 컨슈머 예시
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
// 리밸런싱을 위한 컨슈머 그룹을 사용하지 않고, 컨슈머에 토픽 파티션 직접 할당
consumer.assign((Collections.singleton(new TopicPartition(TOPIC_NAME, PARTITION_NUMBER))));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
LOGGER.info("record:{}", record);
}
}
subscribe를 사용하지 않아서 GroupID 옵션 지정이 필수가 아닙니다.
트랜잭션 컨슈머 예시
// 트랜잭션 레벨 설정
config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
// 컨슈머 생성
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
트랜잭션 프로듀서가 커밋한 메시지만 읽는 컨슈머 설정 예시입니다.
기본값 read_uncommitted는 커밋 전 메시지도 읽습니다.
컨슈머 리밸런스 리스너 설정
리밸런스 리스너 정의
public class RebalanceListener implements ConsumerRebalanceListener {
private static final Logger LOGGER = LoggerFactory.getLogger(RebalanceListener.class);
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 파티션 해제 전 (리밸런싱 발생 전) 동작할 로직
LOGGER.warn("Partitions are revoked : {}", partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 파티션 할당 후 (리밸런싱 발생 후) 동작할 로직
LOGGER.warn("Partitions are assigned : {}", partitions);
}
}
컨슈머에 파티션 할당/해제 시 동작할 로직을 정의합니다.
토픽 구독 시 리밸런스 리스너 전달
consumer.subscribe(Arrays.asList(TOPIC_NAME), new RebalanceListener());
ConsumerRebalanceListener 인터페이스를 구현한 리밸런스 리스너 클래스를 파라미터로 넘기면 됩니다.
파티션 할당 로그 확인
[main] WARN com.example.RebalanceListener - Partitions are assigned : [토픽명-파티션번호]
컨슈머 애플리케이션 실행 시 로그에서 위와 같이 할당된 파티션을 확인할 수 있습니다.