
프로듀서가 전송한 데이터는 카프카 브로커의 토픽에 적재된다.
컨슈머는 적재된 데이터를 사용하기 위해 브로커로부터 데이터를 가져와서 필요한 처리를 한다.
컨슈머는 데이터를 가져와서 데이터베이스에 저장하거나 다른 애플리케이션과 통신하는 역할을 한다.

Fetcher : 리더 파티션으로부터 레코드들을 미리 가져와서 대기poll() : Fetcher에 있는 레코드들을 리턴하는 레코드ConsumerRecords : 처리하고자 하는 레코드들의 모음. 각 레코드에는 오프셋이 포함카프카 클러스터의 리더 파티션이 컨슈머로 레코드를 보내면 Fetcher가 받는다.
데이터를 충분히 받고 나면 내부의 poll() 메소드를 통해 레코드 모음인 ConsumerReocrds라는 객체로 받는다.
이때 poll() 메소드를 사용하기 전에 이미 데이터를 가져온 상태이기 때문에 처리하고자 하는 양만큼 속도를 유지하여 처리할 수 있다.
프로듀서에서 브로커에 레코드를 저장할 때, 각 레코드에 오프셋이 저장된다.
컨슈머에서 레코드를 가져올 때도 이 오프셋 정보를 포함해서 가져오기 때문에 오프셋을 확인할 수 있다.
컨슈머에서 처리가 완료되면 커밋을 수행하여 어느 레코드까지 처리했는지 표시해놓을 수 있다.
컨슈머 그룹으로 운영하는 방법은 컨슈머를 각 컨슈머 그룹으로부터 격리된 환경에서 안전하게 운영할 수 있도록 도와주는 카프카만의 독특한 방식이다.
컨슈머 그룹은 특정 토픽에 대해서 목적에 따라 데이터를 처리하는 컨슈머들을 묶은 그룹이다.
같은 데이터일지라도 목적에 따라 다르게 처리하고 싶을 때 컨슈머 그룹을 나눈다.
동일한 컨슈머 그룹에 속하는 컨슈머 애플리케이션(쓰레드)은 동일한 로직을 가진다.
같은 토픽을 소비하는 컨슈머 그룹이 여러 개가 있다면, 각각의 컨슈머 그룹은 다른 로직을 가지고 있으며, 각 컨슈머 그룹 간은 철저히 격리된 환경에서 데이터를 처리한다.
컨슈머가 데이터를 가져간다고 해서 토픽의 데이터는 삭제되지 않는다.
그러므로 하나의 컨슈머 그룹이 데이터를 이미 가져갔다고 하더라도, 다른 컨슈머 그룹이 새로 데이터를 가져갈 수 있다.

컨슈머 그룹으로 묶인 컨슈머들은 토픽의 1개 이상 파티션들에 할당되어 데이터를 가져갈 수 있다.
컨슈머가 토픽을 구독(subscribe)하면 토픽의 전체 파티션에 대해서 적절히 컨슈머에 할당한다.
컨슈머 그룹으로 묶인 컨슈머가 토픽을 구독해서 데이터를 가져갈 때, 1개의 파티션은 최대 1개의 컨슈머에 할당된다.
그리고 1개 컨슈머는 여러 개의 파티션에 할당될 수 있다.
즉, 컨슈머는 1개 이상의 파티션을 할당할 수 있고, 파티션은 1개의 컨슈머에만 할당되어야 한다.
그러므로 컨슈머 그룹의 컨슈머 개수는 가져가고자 하는 토픽의 파티션 개수보다 같거나 적어야 한다.
파티션 개수와 동일하게 컨슈머 개수를 지정하여 운영하는 것이 성능이 제일 좋다.
컨슈머 개수가 파티션 개수보다 많을 경우는 어떨까?
파티션은 최대 한개의 컨슈머에만 할당될 수 있기 때문에 할당되지 않은 나머지 컨슈머는 유휴(idle)상태가 된다.
파티션이 할당된 컨슈머는 지속적으로 데이터를 처리중이지만, 나머지 파티션은 파티션이 할당되어 있지 않기 때문에 좀비 쓰레드처럼 데이터를 받지도 처리하지도 않는 상태가 된다.
그러므로 파티션 개수보다 컨슈머 개수를 더 많이 띄울 필요는 없다.
컨슈머 그룹은 왜 사용하는 것일까?
예를 들어 운영 서버에는 리소스 상태를 판단하기 위해 주요 리소스인 CPU, 메모리 정보를 대용량으로 수집하는 리소스 수집 에이전트가 필요하고, 이를 적재하는 로직이 필요할 것이다.

데이터를 수집하고 난 후 SYNC 방식으로 엘라스틱서치에 저장하고 하둡에 저장하여 활용하는 경우를 알아보자.
운영서버가 1~10개 정도라면 주요 리소스에 대한 데이터양이 그렇게 많지 않지만, 그 이상의 서버를 갖추고 있다면 그 데이터양이 굉장히 많아진다.
에이전트가 리소스를 수집하고, 네트워크에 연동하여 엘라스틱서치에 저장하고, 또 네트워크에 연동하여 하둡에 저장하는 Sync 방식을 사용하면 문제가 생길 가능성이 있다.
예를 들어 엘라스틱서치에 장애가 발생한다면 그 리소스들을 엘라스틱서치에 쌓지 못한 채 하둡에 저장하거나, 엘라스틱서치에 데이터를 쌓기 위해 대기해야 하는 상황이 발생한다.

앞의 경우와 같은 강한 coupling을 끊기 위해 각기 다른 저장소에 저장하는 컨슈머를 다른 컨슈머 그룹으로 묶음으로써 각 저장소의 장애에 격리되어 운영할 수 있다.
일단은 프로듀서를 통해 데이터를 카프카 토픽으로 보내고, 그 데이터를 각각의 용도에 따라 다른 컨슈머 그룹으로 운영할 수 있다.
이렇게 되면 특정 이벤트(리소스 생성)가 발생에 대해 우선은 카프카에 다 저장을 하고, 각각의 목적을 가진 컨슈머 그룹이 데이터를 가져갈 수 있다.
이 경우 엘라스틱서치에 장애가 발생하더라도 하둡에 데이터를 적재하는 것에는 문제가 없다.
엘라스틱서치에 데이터를 적재하는 것 또한 다시 복구될 때 까지 기다리는 시간이 있기 때문에 데이터가 조금 늦게 적재될 뿐, 복구 완료되면 마지막으로 적재된 데이터부터 다시 적재를 시작하기 때문에 데이터 유실은 막을 수 있다.
또한 컨슈머 그룹마다 컨슈머 개수를 다르게 설정하여 상황에 따라 유연하게 운영할 수 있다는 장점이 있다.
데이터 처리량이 많을 경우 파티션 개수만큼 컨슈머 개수를 늘려 데이터 처리량을 늘릴 수 있다.
데이터 처리량이 적을 경우 컨슈머 개수를 줄일 수 있다는 장점이 있다.
또한 각각의 컨슈머 그룹이 격리되어 있다는 것 또한 컨슈머 그룹을 운영하는 것의 큰 장점이 된다.
MySql DB저장과 같이 또 다른 목적의 데이터 저장이 필요하더라도 컨슈머 그룹만 추가하여 운영하면 되기 때문에 기존의 파이프라인을 건들 필요 없이 새로운 파이프라인을 따기만 하면 된다.
이처럼 데이터 수집, 적재하는 파이프라인이 필요하다면 Coupling이 강한 환경보다는 일단 데이터를 토픽에 넣은 후, 목적에 따라 컨슈머 그룹을 만들어 적재하는 것이 유연하고 처리량을 극대화하는 방식이다.
카프카 컨슈머는 리밸런싱이라는 failover 방식이 있다.
failover는 장애 조치 기능으로 시스템 장애 이벤트 발생 시 예비 백업 시스템로 자동 전환되는 것을 의미하다.
즉, Fail(실패)를 Over(끝낸다) 는 의미로, 시스템 장애 시 준비되어있는 다른 시스템으로 대체되어 운영되는 것이다.

리밸런싱이란 파티션과 컨슈머의 할당을 변경하는 과정이다.
일부 컨슈머에 장애가 발생하면 장애가 발생한 컨슈머에 할당되어 있던 파티션의 데이터를 지속적으로 처리해줘야 한다.
장애 발생으로 인해 해당 파티션의 데이터를 처리해주지 못하면 데이터 처리에 지연이 발생하게 되고, 지연이 지속된다면 retention 기간에 의해 데이터가 삭제되어 데이터 유실이 발생하게 될 것이다.
카프카는 이런 장애 발생 상황에서 장애가 발생한 컨슈머를 제외하고 나머지 컨슈머 중 하나에 파티션을 재할당하여 지속적으로 데이터를 처리하게 한다.
이런 파티션 재할당 과정을 리밸런싱이라고 한다.
리밸런싱은 컨슈머 제외, 컨슈머 추가의 두가지 상황에서 일어난다.

첫번째로 앞서 살펴본 바와 같이 이슈가 발생하여 컨슈머 그룹에서 컨슈머를 제외하는 경우가 있다.
이 경우 장애가 발생한 컨슈머에 매핑되어 있던 파티션을 나머지 컨슈머 중 하나에 재할당해준다.


두번째로 데이터 처리량 향상을 위해 컨슈머를 추가하는 경우가 있다.
이런 경우 다른 컨슈머에 할당되어 있던 파티션을 추가된 컨슈머에 재할당해준다.
장애가 발생하여 컨슈머 그룹에서 제외되었던 컨슈머가 복구되면 해당 컨슈머에 파티션을 재할당해주기도 한다.
리밸런싱을 통해 파티션을 컨슈머에 재할당하는 것은 내부적으로 컴퓨팅 과정을 거쳐서 처리하는 것이다.
파티션 개수가 적다면 그 시간이 짧지만, 파티션 개수가 100개 이상과 같이 많다면 그 시간이 길어질 수 있다.
그러므로 리밸런싱이 자주 발생하지 않도록 하는 것이 가장 좋지만, 리밸런싱은 컨슈머가 데이터를 처리하는 동안 언제든지 발생할 수 있으므로 RebalanceListener를 통해 리밸런싱에 대응하는 코드를 작성해둬야 한다.

컨슈머 애플리케이션에서 poll() 메소드를 통해 리더 파티션의 레코드를 소비한다.
그 후 컨슈머는 카프카 브로커로부터 어느 파티션의 어느 오프셋까지 가져갔는지 커밋(commit) 메소드를 통해 기록한다.
특정 토픽의 파티션을 어떤 컨슈머 그룹이 몇 번째 가져갔는지 카프카 브로커 내부에서 사용되는 내부 토픽인 __consumer_offsets에 기록된다.
레코드 처리 후 commit() 메소드를 호출하지 않거나 컨슈머 동작 이슈가 발생하여 __consumer_offsets 토픽에 어느 레코드까지 읽어갔는지 오프셋 커밋이 기록되지 못한 경우 이미 처리한 레코드를 다시 가져갈 수 있으므로 데이터 처리의 중복이 발생할 수 있다.
그러므로 이런 데이터 처리 중복이 발생을 방지하기 위해서 컨슈머 애플리케이션이 오프셋 커밋을 정상 처리하였는지 검증할 필요가 있다.
컨슈머의 어사이너는 컨슈머와 파티션 할당 정책을 결정한다.
카프카 컨슈머 라이브러리는 RangeAssinor, RoundRobinAssignor, StickyAssignor를 제공한다.
카프카 2.5.0은 RangeAssinor가 기본값으로 설정된다.
RangeAssignor : 각 토픽에서 파티션을 숫자순, 컨슈머를 사전순으로 정렬하여 할당RoundRobinAssignor : 모든 파티션을 컨슈머에서 번갈아가면서 할당StickyAssignor : 최대한 파티션을 균등하게 배분하면서 할당파티션 개수만큼 컨슈머 개수를 늘려서 운영하는 것을 기본 원칙으로만 한다면, 파티션과 컨슈머는 1:1 매핑되어 할당되기 운영하기 때문에 무슨 어사이너를 사용할지는 그다지 중요하지 않다.
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
컨슈머 애플리케이션을 운영할 때 default 값이 없어서 필수로 지정해줘야 하는 옵션들이 있다.
만약 필수 옵션에 대한 정보를 넣어주지 않으면 컨슈머 애플리케이션을 실행할 수 없다.
프로듀서가 데이터를 어느 서버에서 가져올 지에 대한 설정이다.
프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름:포트를 1개 이상 작성한다.
상용환경에서는 일반적으로 2개 이상의 브로커 정보를 작성하여 일부 브로커에 장애가 발생하더라도 접속하는 데 이슈가 없도록 설정해준다.
레코드의 메시지 키를 역직렬화하는 클래스를 지정한다.
레코드의 메시지 값을 역직렬화하는 클래스를 지정한다.
프로듀서에서는 메시지 키와 값을 직렬화(serialize)하여 토픽에 저장하므로, 토픽에 저장된 데이터를 컨슈머가 처리할 때는 역직렬화(deserialize) 해줘야 한다.
프로듀서에서 데이터를 String으로 직렬화하여 토픽에 저장하였다면, 컨슈머에서도 String으로 역직렬화 해줘야 데이터를 안전하게 사용할 수 있다.
만약 직렬화한 타입과 다른 타입으로 역직렬화하게 된다면 원하는 값이 나올 수 없다.
그러므로 데이터가 어떻게 직렬화 되어있는 지 아는 것이 중요하고, 프로듀서와 컨슈머 개발 시 직렬화, 역직렬화 방식에 대한 상호 약속이 되어 있어야 한다.
선택 옵션은 default 값이 있어서 필요할 경우에만 따로 지정해줄 수 있는 옵션이다.
기본값: null
컨슈머 그룹 아이디를 지정한다.
선택 옵션이긴 하지만 subscribe() 메소드로 토픽을 구독하여 사용할 때는 필수로 설정해줘야 하는 옵션이다.
assign() 메소드로 토픽을 할당하여 사용할 때는 선택옵션이다.
기본값: latest
컨슈머 그룹이 특정 파티션을 읽을 때 저장된 컨슈머 오프셋이 없는 경우 어느 오프셋부터 읽을지 선택하는 옵션이다.
여기서 컨슈머 오프셋이 없는 경우란 한번도 커밋한 적이 없는 경우이다.
이미 컨슈머 오프셋이 있다면 이 옵션값은 무시된다.
latest : 가장 높은 (가장 최근에 넣은) 오프셋 부터 읽기 시작한다. 0~50번 오프셋이 있다면 50번 오프셋부터 읽는다.earliest : 가장 낮은 (가장 오래전에 넣은) 오프셋부터 읽기 시작한다. 0~50번 오프셋이 있다면 0번 오프셋부터 읽는다.none : 컨슈머 그룹이 커밋한 기록이 있는지 찾아본다. 만약 커밋 기록이 없으면 오류를 반환하고, 커밋 기록이 있다면 기존 커밋 기록 이후 오프셋부터 읽기 시작한다.커밋 기록이 있다면 이 옵션이 무시되고, 커밋 기록이 없을 때만 이 옵션이 사용된다.
보통 새로 만든 컨슈머 그룹이 운영될 때 처음부터 읽을지(earliest), 마지막 오프셋부터 읽을지(latest)에 대한 설정으로 이용된다.
그 이후로는 커밋 기록이 있을 것이므로 이 옵션은 의미가 없게 된다.
기본값: true
자동 커밋 또는 수동 커밋 여부를 선택한다.
기본값: 500 (5초)
자동 커밋일 경우(enable.auto.commit=true) 오프셋 커밋 간격을 지정한다.
기본값: 500
poll() 메소드를 통해 반환되는 레코드 개수를 지정한다.
배치로 하여 한번에 더 많은 양의 데이터를 처리하고 싶다면 이 옵션값을 높게 설정하면 된다.
기본값: 10000 (10초)
컨슈머가 브로커와 연결이 끊기는 최대 시간이다.
기본값: 3000 (3초)
하트비트를 전송하는 시간 간격이다.
해당 시간 간격은 하트비트가 브로커에 전송되지 않으면 컨슈머에 문제가 있다고 판단하여 리밸런싱을 시작하는 기준이 된다.
session.timeout.ms와 heartbeat.interval.ms 옵션은 카프카 브로커와 컨슈머 사이에서 데이터가 정상적으로 처리되고 있는지 판단하는 데 사용한다.
heartbeat.interval.ms 옵션에 의해 하트비트가 3초마다 전송되는데, session.timeout.ms 설정과 같이 하트비트가 전송된 지 10초가 지나도 오지 않으면 문제가 있다고 판단하게 된다.
기본값: 300000 (5분)
poll() 메소드를 호출하는 간격의 최대 시간이다.
poll() 메소드 호출 후 데이터를 처리하게 되는데, 설정한 시간이 지나도 poll() 메소드를 다시 호출하지 않으면 문제가 있다고 판단하여 리밸런싱을 수행하게 된다.
컨슈머가 레코드를 처리하는 시간이 길 뿐 정상적으로 처리하고 있는데, 이 옵션으로 인해 불필요한 리밸런싱이 일어나는 것을 방지하려면 옵션값을 좀 더 길게 설정할 필요가 있다.
트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우 사용한다.