


KStream
- 레코드의 흐름을 표현
Key + Value 구성
토픽에 존재하는 모든 레코드 가지고 있음
컨슈머로 토픽 구독하는 것과 동일

KTable
- Message Key를 기준으로 묶어서 사용

GlobalKTable
- KTable과 동일한 구조

=> 하지만 Co-Partitioning 을 보장할 수 없다! 파티션 개수가 다를 수 있고 전략이 다를 수 있다. 만약 되어있지 않다면 Topology Exception이 발생한다
고로 Repartitioning을 하여 Co-Partitioning되도록 해야한다
-> 새로운 토픽에 새로운 메세지키를 가지도록 재배열하는 과정

토픽으로 들어온 문자열 데이터 중 문자열의 길이가 5보다 큰 경우만 필터 링하는 스트림즈 애플리케이션을 스트림 프로세서를 사용하여 만들 수 있다. 메시지 키 또는 메시지 값을 필터링하여 특정 조건에 맞는 데이터를 골라낼 때는 filter 메서드를 사용하면 된다. filter 메서드는 스트림즈DSL에서 사용 가능한 필터링 스트림 프로세서

다음으로 KTable과 KStream을 함께 사용하는 경우에 대해 알아보자. KTable과 KStream 은 메시지 키를 기준으로 조인할 수 있다. 대부분의 데이터베이스는 정적으로 저장된 데이터 를 조인하여 사용했지만 카프카에서는 실시간으로 들어오는 데이터들을 조인할 수 있다. 예를 들어, 이름을 메시지 ヲI, 주소를 메시지 값으로 가지고 있는 KTable이 있고 이름을 메시지 키, 주문한 물품을 메시지 값으로 가지고 있는 KStream이 존재한다고 가정하자. 사용자가 물품을 주문하면 이미 토픽에 저장된 이름:주소로 구성된 KTable과 조인하여 물품과 주소가 조합된 데이터를 새로 생성할 수 있다. 사용자의 이벤트 데이터를 데이터베이스에 저장하지 않고도 조인하여 스트리밍 처리할 수 있다는 장점이 있다. 이를통해 이벤트 기반 스트리밍 데이터 파 이프라인을 구성할 수 있는 것이다.


order 토픽과 address 토픽은 코파티셔닝되어 있으므로 각각 KStream과 KTable로 선언해 서 조인을 할수 있었다. 그러나 코파티셔닝되어 있지 않은 토픽을조인해야 할 때는 어떻게 해 야 할까? 코파티셔닝되지 않은 데이터를 조인하는 방법은 두 가지가 있다. 첫 번째는 리파티셔닝을 수행한 이후에 코파티셔닝 이 된 상태로 조인 처 리를 하는 것이고 두 번째는 KTable로 사 용하는 토픽을 GlobalKTable로 선언하여 사용하는 것이다.
여기서는 GlobalKTable로 토픽을 선언하여 사용한다. 파티션 개수가 다른 2개의 토픽 을 조인하는 예제를 GlobalKTable로 선언하여 작성해 볼 것인데, 파티션 2개로 이루어진 address_v2 토픽을 새로 생성한다. address_v2 토픽은 이전 예제에서 진행했던 address 토 픽과 동일하게 이름과 주소로 이루어진 레코드를 저장한다. 다만, 다른 점은 address_v2 토픽 은 파티션이 2개이고 KStream으로 사용하는 order는 파티션이 3개이기 때문에 코파티셔닝 되지 않은 상태라는 것이다.

Apache Kafka는 대용량 데이터 스트리밍을 위한 강력한 메시징 시스템으로, 데이터를 송수신하기 위해서는 프로듀서와 컨슈머 애플리케이션을 사용하는 것이 일반적입니다. 하지만 이러한 작업이 반복되면 비효율적일 수 있습니다. 이 문제를 해결하기 위해 카프카 커넥트(Kafka Connect)는 데이터 파이프라인을 쉽게 구성하고 관리할 수 있는 도구로, 데이터 흐름을 자동화하고 효율적으로 처리할 수 있게 도와줍니다.
카프카 커넥트는 커넥터(Connector)라는 템플릿을 이용하여 데이터를 쉽게 전송하고 처리할 수 있습니다. 이 커넥터는 소스 시스템에서 데이터를 읽어와 Kafka 토픽에 저장하거나, Kafka 토픽의 데이터를 타겟 시스템에 전달할 수 있는 중요한 역할을 합니다.
Kafka Connect는 두 가지 커넥터를 사용하여 데이터를 처리합니다:
예를 들어, 파일 소스 커넥터(File Source Connector)는 파일에서 데이터를 읽어와 Kafka 토픽에 저장할 수 있으며, 파일 싱크 커넥터(File Sink Connector)는 Kafka 토픽의 데이터를 파일로 저장할 수 있습니다. 이 외에도 MySQL, MongoDB, S3 등 다양한 저장소와 연동할 수 있는 커넥터들이 제공됩니다.
Kafka Connect는 데이터를 전송하기 전에 스키마를 변환하거나 데이터를 변환할 수 있는 기능을 제공합니다.
Cast, Drop, ExtractField 등이 있습니다.단일 모드는 단일 서버에서 하나의 프로세스로 실행되며, 주로 개발 환경이나 소규모 데이터 파이프라인에서 사용됩니다. 단일 프로세스로 실행되기 때문에 고가용성 구성은 지원되지 않으며, 단일 장애점(SPOF, Single Point of Failure)이 발생할 수 있습니다. 이 때문에 중요한 상용 환경에는 적합하지 않습니다.
서로 다른 두 개의 카프카 클러스터 간에 토픽을 복제하는 애플리케이션이다.
프로듀서와 컨슈머를 사용해서 직접 미러링하는 애플리케이션을 만 들면 되지만 굳이 미러메이커2를 사용하는 이유는 토픽의 모든 것을 복제할 필요성이 있기 때문이다.
특히 동일한 파티션에 동일한 레코드가 들어가도록 하는 작업은 복제하기 전 클러스터에서 사용하던 파티셔너에 대한 정보 없이는 불가능하다. 또한, 복제하 는 토픽의 파티션 개수가 달라진다면 복제된 데이터를 저장하는 토픽의 파티션도 개수가 달라 져야 하므로 어드민까지 조합한 형태로 개발이 필요하다. 이 모든 기능을 지원하는 것이 미러메이커 2이다.
connect-standalone.properties 파일 설정
이 파일은 카프카 커넥트의 기본 설정을 담고 있습니다. 여기서 Kafka 토픽, 오프셋 저장 위치, 커넥터 플러그인 경로 등을 설정할 수 있습니다. 해당 파일은 Kafka 설치 디렉토리의 config 폴더에 위치합니다.
커넥터 설정 파일 작성
실제로 데이터를 처리할 커넥터 설정 파일을 작성해야 합니다. 예를 들어, 파일 소스 커넥터를 사용하려면 connect-file-source.properties 파일을 작성하여 파일 경로와 Kafka 토픽 이름을 정의합니다.
분산 모드는 여러 서버에서 클러스터 형태로 실행되며, 주로 상용 환경과 대규모 데이터 파이프라인에서 사용됩니다. 다수의 서버로 구성되어 고가용성을 지원하고, 데이터 처리량이 증가하면 서버를 추가하여 스케일 아웃을 할 수 있습니다.
connect-distributed.properties 파일 설정
이 파일은 여러 서버에서 Kafka Connect를 실행하기 위한 클러스터 설정을 정의합니다. 설정 파일은 Kafka 설치 디렉토리의 config 폴더에 위치합니다.
REST API를 통한 관리
분산 모드는 REST API를 통해 상태를 조회하고 커넥터를 생성, 수정, 삭제할 수 있습니다. 기본적으로 REST API는 8083 포트를 사용하며, HTTP 메서드(GET, POST, DELETE, PUT)를 지원합니다.