프로듀서가 메시지를 생산해서 카프카의 토픽으로 메시지를 보내면 그 토픽의 메시지를 가져와서 소비하는 역할을 하는 애플리케이션, 서버 등을 지칭하여 컨슈머라고 한다.
컨슈머에는 두 종류가 있는데, 올드 컨슈머 / 뉴 컨슈머이다.
올드 컨슈머는 컨슈머의 오프셋을 주키퍼의 지노드에 저장하는 반면
뉴 컨슈머는 __consumer_offset 이라는 토픽을 사용해서 오프셋을 관리한다.
프로듀서/컨슈머에 연동하는 브로커의 호스트 이름. 포트 1개이상. 2개 이상 브로커 정보를 입력하는게 좋다. 일부 브로커에 문제가 생기더라도 이슈 없도록.
레코드의 메시지 키를 역직렬화하는 클래스 지정. 스트링 -> 자바의 클래스로 역직렬화
레코드의 메시지 값을 역직렬화하는 클래스를 지정. 스트링 -> 자바 클래스
한번에 가져올 수 있는 최소 데이터 사이즈이다.
만약에 지정한 사이즈보다 작은 경우 요청에 응답하지 않고, 데이터가 누적될 때까지 기다린다.
한번에 가져올 수 있는 최대 데이터 사이즈이다.
subscribe()를 사용하는 경우에는 필수. 기본값은 null
컨슈머가 속한 컨슈머 그룹을 식별하는 식별자이다.
컨슈머 그룹이 특정 파티션을 읽을 때 저장된 컨슈머 오프셋이 없는 경우 신규 offset을 reset하는 옵션이다.
이미 컨슈머 오프셋이 있다면 이 옵션값은 무시된다. 기본값은 latest이다.
즉 컨슈머 오프셋을 보고 어디까지 데이터를 읽었는지 알고, 그 다음 오프셋에 해당하는 메시지를 읽어야 하는데,
장애 등의 이유로 그 오프셋이 없는 경우에 어떻게 동작할지를 정의하는 옵션이다.
latest로 가장 마지맞 오프셋으로 reset하게 되면 당연히 실제 읽은 오프셋보다 더 높은 오프셋으로 reset하기 때문에
데이터를 생략해서 읽을 가능성이 있다. 따라서 실시간 데이터 처리가 필요한 경우에 적용한다.
earliest같은 경우는 데이터가 유실되지 않음을 보장해야 하는 경우에 사용한다.
무조건적으로 처리해야 하는 데이터...
하지만 중복 처리의 가능성이 있으니 이건 어플리케이션에서 중복 처리에 관한 로직을 만들어줘야 한다.
자동 커밋으로 할지, 수동 커밋으로 할지 true가 디폴트이고, true면 자동 커밋이다.
즉 true로 하면 백그라운드로 주기적으로 오프셋을 커밋하게 된다.
이건 좀 조심해야 한다.
3가지의 단점이 있는데,
1. 데이터 손실 : 데이터를 processing하다가 에러가 발생해도 try catch로 잡는 과정만 있다면 문제는 전파되지 않고 다음 메시지를 처리하게 될것이다.
2. 데이터 중복 처리 : 데이터를 처리하다가 오류나 장애가 발생하게 되면 자동 커밋을 못하게 되고 이는 재처리를 하게 된다.
3. 정확한 처리를 보장하지 않음.
자동 커밋일 경우 오프셋 커밋 간격을 지정. 기본값은 5000ms이다.
요청에 대해 응답을 기다리는 최대 시간이다.
즉 컨슈머가 브로커에 데이터를 요청했을때, 그 응답 타임아웃 기준이 된다.
컨슈머가 브로커와 연결이 끊기는 세션 타임아웃 기준값이다.
기본값은 10000ms이다.
컨슈머는 그룹 코디네이터에게 주기적으로 하트비트를 보내는데, 보내지 않고 이 시간이 지나게 되면
해당 컨슈머는 장애가 발생했다고 판단하여 그룹 코디네이터는 컨슈머 그룹에서 해당 컨슈머를 제외시킨다.
참고로 하트비트는 별도의 백그라운드 쓰레드를 통해서 브로커에게 보낸다.
즉 하트비트가 session.timeout.ms만큼 전송되지 않으면 리밸런싱이 유도된다.
이 값은 데이터 처리의 속도와 연관이 있다.
만약에 이 값이 너무 크다면 실제로 컨슈머가 죽어있지만 컨슈머 그룹에 아직 있는 시간이 길어진다.
따라서 그 시간동안은 컨슈머가 담당하고 있던 파티션은 컨슈밍이 안될것이다.
또 이 값이 너무 짧다면 컨슈머가 정상이지만 네트워크 상황이나 가비지 컬렉션, poll 루프 완료하는 시간이 길어짐으로 인해서 하트비트를 보내는게 조금 늦어질 수도 있다.
그러면 정상 컨슈머여도 강제로 리밸런싱이 일어날거고
이 때문에 리밸런싱이 일어나는 동안에는 데이터 처리가 안되니 데이터 처리 속도가 느려진다. (강제 리밸런싱이 자주 발생할것이다.)
그래서 이건 네트워크나 전송 거리를 신경써야 하는데 네트워크가 불안정하고, 전송 거리가 멀다면 이 값을 좀 크게 두고, 안정적인 네트워크 상황이라면 짧게 두어도 무방하다.
하트비트를 전송하는 시간 간격이다. 기본값은 3000ms이다.
그룹 코디네이터에게 얼마나 자주 poll() 메소드로 하트비트를 보낼 것인지 조정한다.
당연히 session.timeout.ms보다는 작아야 한다. 일반적으로 1/3로 설정한다.
poll메소드를 통해서 반환되는 레코드의 개수를 지정한다.
기본값은 500이다.
즉 한번 poll에서 최대 500개 까지의 데이터를 읽는것.
poll()메소드를 호출하는 간격의 최대 시간. 기본값은 300000ms이다.
예를 들어서 컨슈머가 하트비트만 보내고, 실제로 데이터 처리를 안하고 있을 수도 있다.
이때 poll을 주기적으로 하는지 체크하게 되는데, 이 시간 간격을 지정한다. 만약에 해당 시간 내에 poll을 하지 않으면 리밸런싱이 유도된다.
트랜잭션과 관련된 옵션인데,
fetch.min.bytes에 의해 설정된 데이터보다 적은 경우 요청에 응답을 기다리는 최대 시간이다.
자바로 컨슈머 개발을 해보자.
public class KafkaTestConsumer {
public static void main(String[] args) {
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", "my-kafka:9092");
prop.setProperty("group.id", "sunfish-consumer-group");
prop.setProperty("enable.auto.commit", "true");
prop.setProperty("auto.offset.reset", "latest");
prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
consumer.subscribe(Collections.singletonList("hello-sunfish"));
Set<String> subscription = consumer.subscription();
System.out.println("subscription = " + subscription);
try {
while (true) {
ConsumerRecords<String, String> polledData = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> data : polledData) {
System.out.println("data = " + data);
}
}
} catch (Exception e) {
consumer.close();
}
}
}
코드를 설명해보면,
1. enable.auto.commit을 사용해서 주기적으로 자동 커밋을 한다. 이때 메시지 처리에서 exception이 발생해도 커밋이 되기 때문에, 데이터 손실이 있을 수도 있다.
2. auto.offset.reset은 offset이 유실된경우 어디부터 데이터를 읽을지 정한다. latest이기 때문에 가장 최신 데이터부터 읽기 시작한다.
3. consumer.subscribe()
를 사용해서 특정한 토픽을 구독한다.
4. while(true)를 통해서 무한반복으로 데이터를 poll해온다. 이때 poll메소드에는 timeout ms를 지정할 수 있다. 데이터를 요청했는데 이 시간을 넘어서 받지 못하면 타임아웃으로 간주된다.
5. exception이 발생하면 consumer를 close한다.
consumer.poll을 하게 되면 다음과 같은 순서로 데이터를 가져온다.
한개 이상의 컨슈머가 컨슈머 그룹으로 묶여있는것을 의미한다.
카프카는 그 개념상 데이터를 프로세싱해도 데이터가 삭제되지 않는다.
그렇기 때문에 하나의 메시지를 여러번 프로세싱할 수 있다.
그 여러번 프로세싱하는데 중요한 개념이 컨슈머 그룹이다.
하나의 토픽에서 여러 컨슈머 그룹이 동시에 접속해서 데이터를 읽는다.
하나의 컨슈머가 있어서 그 컨슈머가 3개의 파티션을 구독하고 있는 구조인데,
메시지가 많이 쌓여서 컨슈머를 확장해야 한다고 가정해보자.
그 상황에서 컨슈머 그룹 내의 컨슈머의 수를 늘려주면 된다.
그러면 각각 1개의 파티션을 담당하게 된다.
이런 상황을 리밸런스라고 한다.
하지만 리밸런스 상황에서는 컨슈머 그룹은 일시적으로 컨슈밍이 불가능하다.
파티션을 담당하는 컨슈머를 재배치하기 때문이다.
만약에 이렇게 컨슈머 그룹내의 컨슈머의 수를 늘려줬는데됴 메시지가 쌓이고 있다면 어떻게 해야할까 ?
그러면 파티션의 수를 늘려줘야 한다
또한 컨슈머 그룹내의 하나의 컨슈머가 다운되었을때도 리밸런스가 발생하게 된다.
그러면 다른 컨슈머가 다운된 컨슈머가 담당하던 파티션을 담당하게 된다.
왜 컨슈머 그룹을 활용할까 ?
리소스 메트릭을 실시간으로 수집해서 동기 방식으로 엘라스틱 서치와 하둡에 보내는 하나의 컨슈머가 있다고 하자.
여기서 엘라스틱 서치에 장애가 발생한다면 컨슈머는 이후 로직인 하둡에 보내는 로직에 실패할것이다.
즉 각 컨슈머간의 각 기능들은 강결합이 발생하게 된다..
그래서 컨슈머 그룹을 활용해서 엘라스틱 서치 용 컨슈머 그룹, 하둡 용 컨슈머 그룹으로 관리한다.
두 컨슈머 그룹은 각 그룹이 어디까지 오프셋을 커밋했는지 상관하지 않고 별도로 데이터를 읽는다.
이런 구조는 loosley coupled 되어 있다.
즉 하나에 장애가 발생해도 데이터가 유실되지 않는다.
이렇게 컨슈머 그룹을 분리하면 각 컨슈머 그룹은 다른 컨슈머 그룹의 상태와 무관하게 동작할 수 있다.
즉 서로 격리된 환경을 만드는데 용이하다.
격리된 환경이니까, 추가 기능을 위한 컨슈머 그룹을 새롭게 만들어서 확장에 용이하다.
몽고 디비를 추가한다면, 몽고디비용 컨슈머 그룹을 새롭게 만들어서 운영하면 된다.
즉 기존의 엘라스틱 서치 / 하둡 컨슈머 그룹과는 무관하게 추가할 수 있다.
하나의 데이터를 다양한 목적으로 이용하기에 좋은 구조이다.
기존에 하나의 토픽이 3개의 파티션으로 나눠져 있다고 가정.
3개의 리더파티션을 하나의 컨슈머가 담당.
그런데 프로듀싱이 너무 빨라짐.
그러면 하나의 컨슈머의 처리속도는 한정되어 있어서, 래그가 발생하기 시작함.
이때 추가 컨슈머2,3을 동일한 컨슈머 그룹에 추가함.
기존 컨슈머1이 담당하던 3개의 파티션을 2,3이 나눠서 가져감.
이 과정 자체를 리밸런싱이라고 하는데, 가용성, 확장성 증가가 목적임.
카프카만의 독특한 failover 방식이다.
컨슈머 그룹 내부의 하나의 컨슈머에 장애가 발생했다고 하자.
그러면 해당 장애가 발생한 컨슈머는 컨슈머 그룹에서 제외된다.
그리고 장애가 발생한 컨슈머가 담당하고 있던 파티션은 컨슈머 그룹내부의 다른 컨슈머가 담당한다.
이러한 과정을 리밸런싱이라고 한다.
크게 두가지 상황에서 발생한다.
1. 컨슈머가 컨슈머 그룹에 추가되는 상황
2. 컨슈머가 컨슈머 그룹에서 제외되는 상황 -> 이슈가 발생한 컨슈머를 제외함으로써 지속적으로 데이터 처리 가능 -> 가용성 증가
리밸랜스은 컨슈머가 데이터를 처리하는 과정에서도 발생할 수 있기때문에 리밸랜싱에 대응하는 코드를 작성해야 한다.
RebalanceListener
를 구현해야 함.
토픽의 갯수가 많아질수록, 리밸런싱의 시간이 길어진다.
-> 리밸런싱이 자주 발생하지 않도록 하는게 중요하다.
-> 따라서 리밸런싱과정에서 발생하는 문제들을 잘 해결할 수 있도록 해야한다.