Java 카프카 프로듀서, 컨슈머 애플리케이션 개발 방법

Java 프로젝트 생성 (공통)

카프카 프로듀서, 컨슈머 애플리케이션은 Java 프로젝트로 개발할 수 있습니다.
Javascript, go, Python 등으로도 가능하지만, 공식 지원되는 라이브러리 언어는 Java입니다.

IntelliJ Java 프로젝트 생성 방법

File > New > Project… > 아래와 같이 입력 후 Create

Name 프로젝트명 입력
Language Java 1.8 이상 선택
Build system Gradle 선택
JDK 18 이상 선택
Gradle DSL Groovy 선택

프로젝트가 생성되면 src/main/java 폴더 안에 패키지 폴더를 만들어야 합니다.
패키지 폴더 내에 프로듀서 또는 컨슈머 애플리케이션 클래스를 작성하고 실행하면 됩니다.

카프카 클라이언트 라이브러리 추가

dependencies {
    implementation 'org.apache.kafka:kafka-clients:2.5.0'
    implementation 'org.slf4j:slf4j-simple:1.7.30'
}

java 프로젝트 내 build.gradle 파일에 카프카 브로커와 동일한 버전의 클라이언트 라이브러리를 추가합니다.
카프카 클라이언트 로그 출력을 위해 slf4j 라이브러리 의존성도 추가합니다.


카프카 프로듀서 애플리케이션 개발

프로듀서 클래스 예시

public class TestProducer {
  private final static Logger LOGGER = LoggerFactory.getLogger(TestProducer.class);
  private final static String TOPIC_NAME = "토픽명";
  private final static String BOOTSTRAP_SERVERS = "localhost:9092";

  public static void main(String[] args) {

    // 프로듀서 필수 옵션 설정
    Properties configs = new Properties();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); // 요청할 브로커 서버 정의
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 메시지 키 직렬화 클래스 정의
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 메시지 값 직렬화 클래스 정의

    // 프로듀서 생성
    KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

    // 메시지 값 프로듀서 레코드 생성
    ProducerRecord<String, String> record1 = new ProducerRecord<>(TOPIC_NAME, "메시지 값");

    // 메시지 키-값 프로듀서 레코드 생성
    ProducerRecord<String, String> record2 = new ProducerRecord<>(TOPIC_NAME, "메시지 키", "메시지 값");

    // 파티션 번호를 지정한 프로듀서 레코드 생성
    ProducerRecord<String, String> record3 = new ProducerRecord<>(TOPIC_NAME, 0, "메시지 키", "메시지 값");

    try {
      // 메시지 비동기 전송 (파티셔너 → Accumulator → 브로커 → 파티션에 로그 세그먼트 파일 저장)
      // Accumulator에서 배치로 묶이면 일정 크기 및 시간에 도달 시 전송
      producer.send(record1);
      producer.send(record2);

      // 메시지 동기 전송 (send 후 응답 get)
      RecordMetadata metadata = producer.send(record3).get();

      // 콜백 출력 결과 : 토픽명-파티션번호@오프셋번호
      // 오프셋번호가 -1이면, acks 옵션이 0이라 응답이 없는 것
      LOGGER.info(metadata.toString());

      // 메시지 비동기 전송 + 콜백 처리
      producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
          if (exception != null) {
            // 전송 실패
          } else {
            // 전송 성공
          }
        }
      });

    } catch (Exception e) {
      LOGGER.error(e.getMessage(), e);

    } finally {
      // accumulator에 전송 대기 중인 메시지 전송
      producer.flush();
      
      // 프로듀서 종료 (리소스 해제)
      producer.close();
    }
  }
}

Java 라이브러리로 카프카 프로듀서를 생성하고, 브로커로 메시지를 보내는 예시입니다.

트랜잭션 프로듀서 예시

// 프로듀서별 고유 트랜잭션 ID 설정
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());

// 프로듀서 생성
Producer<String, String> producer = new KafkaProducer<>(configs);

// 프로듀서 트랜잭션 초기화
producer.initTransactions();

try {
  // 트랜잭션 시작
  producer.beginTransaction();

  // 레코드 전송
  producer.send(new ProducerRecord<>(TOPIC, "메시지 값1"));
  producer.send(new ProducerRecord<>(TOPIC, "메시지 값2"));

  // 레코드 트랜잭션 커밋
  producer.commitTransaction();

} catch (Exception e) {
  // 트랜잭션 롤백
  producer.abortTransaction();

} finally {
  // 프로듀서 종료
  producer.close();

}

레코드들을 트랜잭션으로 묶어서 전송하고 커밋하는 프로듀서 예시입니다.


카프카 컨슈머 애플리케이션 개발

자동 커밋 컨슈머 클래스 예시

public class TestConsumer {
  private final static Logger LOGGER = LoggerFactory.getLogger(TestConsumer.class);
  private final static String TOPIC_NAME = "토픽명";
  private final static String BOOTSTRAP_SERVERS = "localhost:9092";
  private final static String GROUP_ID = "컨슈머그룹ID";
  private static KafkaConsumer<String, String> consumer;

  public static void main(String[] args) {

    // 컨슈머 필수 옵션 설정
    Properties configs = new Properties();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    // 컨슈머 선택 옵션 설정
    configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 기본값 : true 이므로, 명시하지 않아도 자동 커밋 됨
    configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 60000); // 자동 커밋 주기 : 1분 (이후 poll 호출 시 자동 커밋)

    // 컨슈머 생성
    consumer = new KafkaConsumer<>(configs);

    // 런타임 종료 시 실행되는 스레드 셧다운 훅 등록
    Runtime.getRuntime().addShutdownHook(new ShutdownThread());

    // 토픽 구독
    consumer.subscribe(Arrays.asList(TOPIC_NAME));

    try {

      // 무한루프
      while (true) {
        // 최대 1초 대기하며 메시지 요청
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

        // 받은 메시지가 있으면 출력
        for (ConsumerRecord<String, String> record : records) {
          LOGGER.info("record:{}", record);
        }
      }

    } catch (WakeupException e) {
      LOGGER.error(e.getMessage(), e);

    } finally {
      // 컨슈머 종료 (컨슈머 그룹에서 제외)
      consumer.close();
    }
  }

  // 런타임 종료 시 실행되는 스레드 클래스 구현
  static class ShutdownThread extends Thread {
    public void run() {
      // 컨슈머 poll 진행 중단 (WakeupException 발생)
      // 런타임 스레드가 아닌 다른 스레드에서 호출해야 안전하게 종료 가능
      consumer.wakeup();
    }
  }
}

컨슈머 그룹이 토픽을 구독하고 전체 파티션에 대해서 메시지를 가져가는 예시입니다.
토픽의 파티션 수 : 컨슈머 그룹 내 컨슈머 수는 1:1로 운영하는 것이 좋습니다.
파티션은 1개의 컨슈머만 할당 가능하므로, 파티션 수보다 많은 컨슈머는 유휴 상태가 됩니다.

수동 커밋 컨슈머

자동 커밋 비활성화 옵션 설정

configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

수동 커밋은 컨슈머 생성 시 파라미터 configs에 자동 커밋 비활성화 옵션을 넣어야 합니다.

동기 오프셋 커밋 예시

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

  for (ConsumerRecord<String, String> record : records) {
    LOGGER.info("record:{}", record);
  }

  // 동기 오프셋 커밋
  consumer.commitSync();
}

커밋 응답을 기다리는 동기 오프셋 커밋 예시입니다.
레코드 처리가 끝난 후 커밋해야 합니다.

레코드 단위 동기 오프셋 커밋 예시

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

  // 파티션별 현재 오프셋 데이터 맵 생성
  Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();

  for (ConsumerRecord<String, String> record : records) {
    LOGGER.info("record:{}", record);

    // 파티션별 현재 오프셋 데이터 맵 갱신
    currentOffset.put(
      new TopicPartition(record.topic(), record.partition()),
      new OffsetAndMetadata(record.offset() + 1, null)
    );

    // 레코드 단위 동기 오프셋 커밋
    // 이미 커밋된 파티션-오프셋은 무시
    consumer.commitSync(currentOffset);
  }
}

레코드 단위 커밋은 비효율적이고, 데이터가 많을수록 느려져서 잘 사용되지 않는 방식입니다.

비동기 오프셋 커밋 예시

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

  for (ConsumerRecord<String, String> record : records) {
    LOGGER.info("record:{}", record);
  }

  // 비동기 오프셋 커밋
  consumer.commitAsync();
}

비동기 오프셋은 커밋 수행 시 응답을 기다리지 않으므로, 더 많은 데이터를 처리할 수 있습니다.

비동기 오프셋 커밋 콜백 예시

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

  for (ConsumerRecord<String, String> record : records) {
    LOGGER.info("record:{}", record);
  }

  // 비동기 오프셋 커밋 후 완료 콜백 함수 실행
  consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
      if (e != null) {
        // 커밋 실패
        e.printStackTrace();
      } else {
        // 커밋 성공
        LOGGER.info("커밋 성공 오프셋:{}", offsets);
      }
    }
  });
}

파티션 수동 할당 컨슈머 예시

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);

// 리밸런싱을 위한 컨슈머 그룹을 사용하지 않고, 컨슈머에 토픽 파티션 직접 할당
consumer.assign((Collections.singleton(new TopicPartition(TOPIC_NAME, PARTITION_NUMBER))));

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

  for (ConsumerRecord<String, String> record : records) {
    LOGGER.info("record:{}", record);
  }
}

subscribe를 사용하지 않아서 GroupID 옵션 지정이 필수가 아닙니다.

트랜잭션 컨슈머 예시

// 트랜잭션 레벨 설정
config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

// 컨슈머 생성
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);

트랜잭션 프로듀서가 커밋한 메시지만 읽는 컨슈머 설정 예시입니다.
기본값 read_uncommitted는 커밋 전 메시지도 읽습니다.


컨슈머 리밸런스 리스너 설정

리밸런스 리스너 정의

public class RebalanceListener implements ConsumerRebalanceListener {
  private static final Logger LOGGER = LoggerFactory.getLogger(RebalanceListener.class);

  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // 파티션 해제 전 (리밸런싱 발생 전) 동작할 로직
    LOGGER.warn("Partitions are revoked : {}", partitions);
  }

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // 파티션 할당 후 (리밸런싱 발생 후) 동작할 로직
    LOGGER.warn("Partitions are assigned : {}", partitions);
  }
}

컨슈머에 파티션 할당/해제 시 동작할 로직을 정의합니다.

토픽 구독 시 리밸런스 리스너 전달

consumer.subscribe(Arrays.asList(TOPIC_NAME), new RebalanceListener());

ConsumerRebalanceListener 인터페이스를 구현한 리밸런스 리스너 클래스를 파라미터로 넘기면 됩니다.

파티션 할당 로그 확인

[main] WARN com.example.RebalanceListener - Partitions are assigned : [토픽명-파티션번호]

컨슈머 애플리케이션 실행 시 로그에서 위와 같이 할당된 파티션을 확인할 수 있습니다.