카프카 커넥트 개념 정리 / Java 카프카 커넥트 개발 방법

카프카 커넥트 개념 정리

카프카 커넥트를 이용하면 외부 시스템과 카프카 토픽 간
반복적인 데이터 수집 및 전송 파이프라인을 자동화할 수 있습니다.
멀티 스레드로 실행되어 병렬 처리를 지원합니다.
데이터 수집량, 처리량, 오류율 등은 JMX를 통해 모니터링 할 수 있습니다.

커넥트 내부 커넥터는 태스크들을 설정 및 관리하고, 태스크는 데이터 처리를 합니다.

MySQL, MongoDB, S3, Elasticsearch 등 외부 시스템용 오픈소스 커넥터들이 많습니다.
Java 커넥트 라이브러리를 이용해 커스텀 커넥트 jar를 개발해야 할 수도 있습니다.
커넥터 플러그인 jar 파일을 플러그인 디렉토리에 추가하여 사용하면 됩니다.

커넥터 내부 기능

컨버터 바이트 배열 메시지를 외부 시스템 포맷으로 직렬화/역직렬화 시 사용
트랜스폼 메시지 단위 전처리/후처리 시 사용 (JSON 메시지 특정 키 삭제, 값 변경 등)

카프카 커넥트 운영

단일 모드 커넥트

커넥트를 단일 JVM 프로세스로 실행하는 방식입니다.
주로 개발 환경이나 중요도가 낮은 파이프라인 운영 시 사용합니다.

단일 모드 커넥트 설정

-- 카프카 브로커 주소 목록 설정
bootstrap.servers=localhost:9092

-- 메시지 카/값 직렬화/역직렬화 컨버터 클래스 설정
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

-- 메시지 키/값 스키마 정보 포함 여부 (스키마 정보 없이 메시지 처리)
key.converter.schemas.enable=false
value.converter.schemas.enable=false

-- 오프셋 파일 저장 경로
offset.storage.file.filename=/tmp/connect.offsets

-- 오프셋 정보 디스크 기록 주기 (10초)
offset.flush.interval.ms=10000

-- 커넥터 플러그인 JAR 파일 경로
plugin.path=/usr/local/share/kafka/plugins

단일 모드 커넥트 설정 파일 (connect-standalone.properties) 수정 예시입니다.

단일 모드 커넥터 설정

-- 커넥터명 설정
name=local-file-source

-- 사용할 커넥터 클래스 (파일 읽는 Source 커넥터)
connector.class=FileStreamSource

-- 동시 실행할 태스크 수 설정 (1 = 단일 스레드)
-- 싱크 커넥터 태스크 수는 토픽 파티션 수와 같게 설정하는 것이 좋음
tasks.max=1

-- 읽을 파일 경로
file=/tmp/test.txt

-- 커넥터가 파일 데이터를 전송할 카프카 토픽 이름
topic=토픽명

단일 모드 커넥터 설정 파일 (connect-file-source.properties) 수정 예시입니다.

단일 모드 커넥트 실행 명령어

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties

단일 모드 커넥트 워커 실행 시 커넥트 설정파일, 커넥터 설정파일들을 파라미터로 넣으면 됩니다.

분산 모드 커넥트

2대 이상의 서버에서 같은 그룹 ID의 여러 커넥트 프로세스가 하나의 클러스터로 동작하는 방식입니다.
데이터 양에 따라 커넥트 프로세스 무중단 스케일 아웃, 스케일 인이 가능합니다.
커넥터 내 스레드(태스크) 개수도 REST API로 조절 가능합니다.

분산 모드 커넥트 설정

-- 카프카 브로커 주소 목록 설정
bootstrap.servers=localhost:9092

-- 메시지 카/값 직렬화/역직렬화 컨버터 클래스 설정
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

-- 메시지 키/값 스키마 정보 포함 여부 (스키마 정보 없이 메시지 처리)
key.converter.schemas.enable=false
value.converter.schemas.enable=false

-- 오프셋 정보 디스크 기록 주기 (10초)
offset.flush.interval.ms=10000

-- 커넥터 플러그인 JAR 파일 경로
plugin.path=/usr/local/share/java/usr/local/share/kafka/plugins

-- 커넥트 클러스터 그룹 ID
group.id=connect-cluster

-- 커넥트가 저장할 오프셋/정보/상태 내부 토픽명 (그룹 ID마다 별도 생성 필요)
-- 브로커 토픽에 모든 상태를 저장하여 장애 복구 가능
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
config.storage.topic=connect-config
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3

분산 모드 커넥트 설정 파일 (connect-distributed.properties) 수정 예시입니다.

분산 모드 커넥트 실행 명령어

bin/connect-distributed.sh config/connect-distributed.properties

분산 모드 커넥트 워커 실행 시 커넥트 설정파일을 파라미터로 넣으면 됩니다.
커넥터 인스턴스는 REST API 요청으로 생성 시 실행됩니다.

커넥트 REST API 사용법

GET / 커넥트 클러스터 정보 확인
GET /connectors 현재 실행중 커넥터 목록 확인
POST /connectors 새 커넥터 생성 및 실행
GET /connectors/커넥터명 특정 커넥터 상세 정보 확인
GET /connectors/커넥터명/config 커넥터 설정 확인
PUT /connectors/커넥터명/config 커넥터 설정 변경
GET /connectors/커넥터명/status 커넥터 상태 확인
DELETE /connectors/커넥터명 커넥터 종료
POST /connectors/커넥터명/restart 커넥터 재시작
GET /connector-plugins 현재 사용할 수 있는 카프카 커넥터 플러그인 목록 조회

싱크 커넥터 생성 요청 예시

curl -X POST http://localhost:8083/connectors
-H 'Content-Type: application/json'
-d '{
  "name": "file-sink-test",
  "config":
  {
    "topics":"test",
    "connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max":1,
    "file":"/파일경로/파일명.확장자"
  }
}'

싱크 커넥터 태스크 수를 파티션 수와 같게 설정하면 병렬 처리가 최대화되어 가장 빠릅니다.
데이터 처리가 느리면 태스크 수, 파티션 수를 늘려서 처리량 확장이 가능합니다.

커넥터 상태 확인

curl http://localhost:8083/connectors/커넥터명/status

아래와 같이 커넥터 실행 상태 및 주소, 태스크 실행 상태 및 주소를 확인할 수 있습니다.

{
  "name":"file-sink-test",
  "connector": {
    "state":"RUNNING",
    "worker_id":"127.0.0.1:8083"
  },
  "tasks": [
    {
      "id":0,
      "state":"RUNNING",
      "worker_id":"127.0.0.1:8083"
    }
  ],
  "type":"sink"
}

커스텀 소스 커넥터 개발 예시

디펜던시 추가

dependencies {
  implementation 'org.apache.kafka:connect-api:2.5.0'
}

소스 커넥터 개발에 필요한 라이브러리를 추가합니다.

파일 소스 커넥터 설정 정의

public class SingleFileSourceConnectorConfig extends AbstractConfig {

  // 읽을 파일 정보
  public static final String DIR_FILE_NAME = "file";
  private static final String DIR_FILE_NAME_DEFAULT_VALUE = "/tmp/kafka.txt";  // file 옵션 미지정 시 사용할 기본 파일 경로
  private static final String DIR_FILE_NAME_DOC = "파일경로/파일명.확장자"; // 문서화용 파일 설명 문구

  // 전송할 토픽 정보
  public static final String TOPIC_NAME = "topic";
  private static final String TOPIC_NAME_DEFAULT_VALUE = "test-topic"; // topic 옵션 미지정 시 사용할 기본 토픽명
  private static final String TOPIC_NAME_DOC = "토픽명";

  // 파일 소스 커넥터 설정 기본값 정의
  public static final ConfigDef CONFIG = new ConfigDef()
      .define(
        DIR_FILE_NAME,
        ConfigDef.Type.STRING,
        DIR_FILE_NAME_DEFAULT_VALUE,
        ConfigDef.Importance.HIGH,
        DIR_FILE_NAME_DOC
      )
      .define(
        TOPIC_NAME,
        ConfigDef.Type.STRING,
        TOPIC_NAME_DEFAULT_VALUE,
        ConfigDef.Importance.HIGH,
        TOPIC_NAME_DOC
      );

  // 파일 소스 커넥터 설정 참조
  public SingleFileSourceConnectorConfig(Map<String, String> props) {
    super(CONFIG, props);
  }
}

파일 소스 커넥터에서 사용할 모든 설정을 정의합니다.

파일 소스 커넥터 개발

public class SingleFileSourceConnector extends SourceConnector {
  private Map<String, String> configProperties;

  @Override
  public String version() {
    // 커넥터 버전 정의
    return "1.0";
  }

  @Override
  public void start(Map<String, String> props) {
    this.configProperties = props;

    try {
      // 파일 소스 커넥터 설정 정의
      new SingleFileSourceConnectorConfig(props);

    } catch (ConfigException e) {
      // 설정 누락 시 에러 발생
      throw new ConnectException(e.getMessage(), e);
    }
  }

  @Override
  public Class<? extends Task> taskClass() {
    // 파일 소스 태스크 클래스 리턴
    return SingleFileSourceTask.class;
  }

  @Override
  public List<Map<String, String>> taskConfigs(int maxTasks) {
    // 태스크 설정 리스트에 파일 소스 커넥터 설정값 추가
    // 파일 읽기 시에는 태스크 1개가 일반적이어서, singletonList 생성
    return Collections.singletonList(new HashMap<>(configProperties));
  }

  @Override
  public ConfigDef config() {
    // 파일 소스 커넥터 설정 정의 반환
    return SingleFileSourceConnectorConfig.CONFIG;
  }

  @Override
  public void stop() {
    // 커넥터 종료 시 리소스 정리
  }
}

소스 태스크 실행 전 커넥터 설정파일 초기화, 태스크 클래스 정의하는 소스 커넥터 예시입니다.

파일 소스 태스크 개발

public class SingleFileSourceTask extends SourceTask {
  private static final Logger logger = LoggerFactory.getLogger(SingleFileSourceTask.class);

  private static final String FILENAME_FIELD = "파일명";
  private static final String POSITION_FIELD = "포지션명";

  private Map<String, String> fileNamePartition;
  private Map<String, Object> offset;
  private String topic;
  private String file;
  private long position = 0L;

  @Override
  public void start(Map<String, String> props) {
    try {
      // 파일 소스 커넥터 설정
      SingleFileSourceConnectorConfig config = new SingleFileSourceConnectorConfig(props);
      topic = config.getString(SingleFileSourceConnectorConfig.TOPIC_NAME); // 토픽명 가져오기
      file = config.getString(SingleFileSourceConnectorConfig.DIR_FILE_NAME); // 파일명 가져오기

      // 파티션 구분 기준 설정
      fileNamePartition = Collections.singletonMap(FILENAME_FIELD, file);

      // 소스 커넥터로 저장된 파일 내부 번호 기록용 오프셋 가져오기
      offset = context.offsetStorageReader().offset(fileNamePartition);

      if (offset != null) {
        Object lastReadFileOffset = offset.get(POSITION_FIELD);

        if (lastReadFileOffset != null) {
          // 마지막으로 읽은 오프셋 번호 세팅
          position = (Long) lastReadFileOffset;
        }
      } else {
        // 저장된 오프셋이 없는 경우, 0 세팅
        position = 0;
      }
    }
  }

  @Override
  public List<SourceRecord> poll() { // 외부 시스템에서 새로운 데이터를 가져오기 위해 주기적으로 호출되는 메서드
    List<SourceRecord> results = new ArrayList<>();

    try {
      // 1초마다 읽기
      Thread.sleep(1000);

      // 오프셋 라인 가져오기
      List<String> lines = getLines(position);

      // 읽을 라인이 있다면
      if (!lines.isEmpty()) {
        lines.forEach(line -> {
          // 오프셋 +1 라인 읽어서 레코드 생성 후 추가
          Map<String, Long> sourceOffset = Collections.singletonMap(POSITION_FIELD, ++position);
          SourceRecord sourceRecord = new SourceRecord(
            fileNamePartition,
            sourceOffset,
            topic,
            Schema.STRING_SCHEMA,
            null,
            Schema.STRING_SCHEMA,
            line
          );
          results.add(sourceRecord);
        });
      }

      // 커넥트 내부에서 토픽으로 send 될 List 반환
      return results;

    } catch (Exception e) {
      logger.error(e.getMessage(), e);
      throw new ConnectException(e.getMessage());
    }
  }

  @Override
  public void stop() {
    // 태스크 종료 시 리소스 정리
  }
}

파일에서 데이터를 가져와서 카프카 토픽으로 전송하는 소스 태스크 개발 예시입니다.
커넥터당 1개의 태스크를 실행하는 것이 일반적이고, 퍼일 읽기는 병렬처리 하지 않습니다.

커넥트에 추가할 커넥터 jar 빌드

jar {
  from {
    configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) }
  }
}

런타임에 필요한 모든 클래스, 의존성을 포함해 빌드하는 우버 jar 스크립트 작성 후 빌드합니다.
빌드하여 jar 파일 생성 후 커넥트 플러그인 디렉토리에 추가하고 사용하면 됩니다.


커스텀 싱크 커넥터 개발 예시

디펜던시 추가

dependencies {
  implementation 'org.apache.kafka:connect-api:2.5.0'
}

싱크 커넥터 개발에 필요한 라이브러리를 추가합니다.

파일 싱크 커넥터 설정 정의

public class SingleFileSinkConnectorConfig extends AbstractConfig {

  public static final String DIR_FILE_NAME = "file";
  private static final String DIR_FILE_NAME_DEFAULT_VALUE = "/tmp/kafka.txt"; // file 옵션 미지정 시 사용할 기본 파일 경로
  private static final String DIR_FILE_NAME_DOC = "파일경로/파일명.확장자"; // 문서화용 파일 설명 문구

  public static ConfigDef CONFIG = new ConfigDef()
    .define(
      DIR_FILE_NAME,
      Type.STRING,
      DIR_FILE_NAME_DEFAULT_VALUE,
      Importance.HIGH,
      DIR_FILE_NAME_DOC
    );

  public SingleFileSinkConnectorConfig(Map<String, String> props) {
      super(CONFIG, props);
  }
}

파일 싱크 커넥터에서 사용할 모든 설정을 정의합니다.
토픽 정보는 싱크 커넥터 실행 시 파라미터로 받아야 합니다.

파일 싱크 커넥터 개발

public class SingleFileSinkConnector extends SinkConnector {

  private Map<String, String> configProperties;

  @Override
  public String version() {
    return "1.0";
  }

  @Override
  public void start(Map<String, String> props) {
    this.configProperties = props;
    try {
      // 파일 싱크 커넥터 설정 정의
      new SingleFileSinkConnectorConfig(props);
    } catch (ConfigException e) {

      // 설정 누락 시 에러 발생
      throw new ConnectException(e.getMessage(), e);
    }
  }

  @Override
  public Class<? extends Task> taskClass() {
    // 파일 싱크 태스크 클래스 리턴
    return SingleFileSinkTask.class;
  }

  @Override
  public List<Map<String, String>> taskConfigs(int maxTasks) {
    List<Map<String, String>> taskConfigs = new ArrayList<>();
    Map<String, String> taskProps = new HashMap<>();
    taskProps.putAll(configProperties);

    // 태스크 설정 리스트에 파일 싱크 커넥터 설정값 추가
    for (int i = 0; i < maxTasks; i++) {
      taskConfigs.add(taskProps);
    }

    return taskConfigs;
  }

  @Override
  public ConfigDef config() {
    // 파일 싱크 커넥터 설정 정의 반환
    return SingleFileSinkConnectorConfig.CONFIG;
  }

  @Override
  public void stop() {
    // 커넥터 종료 시 리소스 정리
  }
}

싱크 태스크 실행 전 커넥터 설정파일 초기화, 태스크 클래스 정의하는 싱크 커넥터 예시입니다.

파일 싱크 태스크 개발

public class SingleFileSinkTask extends SinkTask {
  private SingleFileSinkConnectorConfig config;
  private File file;
  private FileWriter fileWriter;

  @Override
  public String version() {
    return "1.0";
  }

  @Override
  public void start(Map<String, String> props) {
    try {
      // 파일 싱크 커넥터 설정
      config = new SingleFileSinkConnectorConfig(props);

      // 파일 쓰기 객체 초기화
      file = new File(config.getString(config.DIR_FILE_NAME));
      fileWriter = new FileWriter(file, true);

    } catch (Exception e) {
      throw new ConnectException(e.getMessage(), e);
    }
  }

  @Override
  public void put(Collection<SinkRecord> records) { // 토픽에서 읽은 새로운 데이터를 외부 시스템에 전송하기 위해 호출되는 메서드
    try {
      // 토픽 레코드 데이터를 파일에 쓰기 (실시간)
      for (SinkRecord record : records) {
        fileWriter.write(record.value().toString() + "\n");
      }
    } catch (IOException e) {
      throw new ConnectException(e.getMessage(), e);
    }
  }

  @Override
  public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) { // 커밋 시점마다 호출되는 메서드
    try {
      // 버퍼에 전송 대기 중인 데이터 쓰기
      fileWriter.flush();

    } catch (IOException e) {
      throw new ConnectException(e.getMessage(), e);
    }
  }

  @Override
  public void stop() {
    try {
      // 태스크 종료 시 리소스 정리
      fileWriter.close();
    } catch (IOException e) {
      throw new ConnectException(e.getMessage(), e);
    }
  }
}

카프카 토픽에서 데이터를 가져와서 파일에 쓰는 싱크 태스크 개발 예시입니다.