데브원영 DVWY님 Youtube을 보고 정리한 내용입니다.
카프카에는 다양한 데이터가 들어갈 수 있다. 이 데이터가 들어갈 수 있는 공간을 Topic이라고 한다.
토픽은 여러개 생성할 수 있으며 데이터베이스의 테이블이나 파일시스템의 폴더와 유사한 성질을 갖고 있다. 이 토픽에 Producer가 데이터를 넣게 되고 Consumer는 데이터를 가져가게 된다. 그리고 토픽은 이름을 가질 수 있는데 목적에 따라 clickLog, sendSms, locationLog 등과 같이 무슨 데이터를 담는지 명확히 명시하면 유지보수할때 편리하게 관리가 가능하다.
하나의 토픽은 여러개의 파티션으로 구성될 수 있다. 첫번째 파티션은 0번부터 시작하며 각 파티션은 큐와 같이 내부의 데이터가 파티션끝에서 부터 차곡차곡 쌓이게 된다. click_log 토픽에 kafka consumer가 붙게 되면 데이터를 가장 오래된 순서부터 가져간다. 데이터가 더이상 들어오지 않으면 또 다른 데이터가 들어올 때까지 기다리게 된다. Consumer가 토픽 내부의 데이터를 가져가더라도 해당 데이터는 파티션에서 사라지지 않는다. 이 데이터는 새로운 Consumer가 다시 0번부터 가져가 사용할 수 있다.
이 처럼 사용하게 되면 동일 데이터를 두번 처리할 수 있는데 이는 카프카를 사용하는 아주 중요한 이유기도 하다. 클릭 로그를 시각화하기 위해 엘라스틱 서치에 저장(Consumer #1)하거나 백업하기 위해 하둡에 저장(Consumer #2)할 수도 있다.
여러개의 파티션이 존재할때, 데이터 4을 어느 파티션에 집어 넣어야할까?
먼저 프로듀서는 데이터를 보낼때 키를 지정할 수가 있다.
파티션을 하나더 추가해보았다. 파티션을 늘리면 컨슈머의 개수를 늘려 데이터 처리를 분산시킬 수 있게 된다. 하지만 파티션을 늘리는 것을 조심해야 한다고 한다. 파티션을 늘리는 것은 가능하지만 다시 줄일 수가 없기 때문이다.
그렇다면 파티션의 레코드는 언제 삭제가 될까?
삭제되는 타이밍은 옵션에 따라 다르다. 레코드가 저장되는 최대 시간과 크기를 지정할 수 있다. 이를 지정하여 일정한 기간 또는 용량동안 데이터를 저장할 수 있고 적절하게 데이터가 삭제될 수 있도록 설정할 수 있다.
Producer는 데이터를 생산하는 역할을 맡는다. 즉 데이터를 kafka topic에 생성한다는 뜻이다. 프로듀서는 Topic에 해당하는 메시지를 생성하고 특정 Topic으로 데이터를 publish(전송), 그리고 kafka broker로 데이터를 전송할때 전송의 성공여부를 알 수 있으며 처리 실패시 재시도를 한다.
implementation 'org.springframework.kafka:spring-kafka'
// Producer
spring.kafka.producer.bootstrap-servers=localhost:9092
// key는 메시지를 보내면 토픽의 파티션이 지정될 때 씀
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
key가 null인 데이터를 파티션이 1개인 토픽에 보내면 이렇게 쌓이게 된다. 여기서 파티션이 한개더 생기면 어떻게 될까?
key값이 지정되지 않았으므로 라운드 로빈 방식으로 파티션 0과 1을 번갈아가며 데이터를 넣는다.
key값이 지정되었을 경우 hash값으로 partition에 데이터를 넣는다. 만약 이 상황에서 파티션을 하나 더 추가하게 된다면?
토픽에 새로운 파티션을 추가하는 순간 key와 파티션의 매칭이 깨지기 때문에 key와 파티션 연결은 보장되지 않는다. 따라서 key를 사용할경우 이 점을 유의하여 파티션 개수를 생성하고 추후에 추가로 생성하지 않는걸 권한다.
카프카 브로커는 카프카가 설치되어 있는 서버의 단위를 말하며 보통은 3개이상의 브로커로 구성해 사용하는 것을 권장한다. 만약 파티션이 1개, replication이 1인 토픽이 존재하고 브로커가 3대라면 브로커 3대중 1대에 해당 토픽의 데이터가 저장된다.
여기서 나온 Replication은 partition의 복제를 말한다. 만약 replication이 1이라면 partition은 1개만 존재한다는 뜻이고 replication이 2라면 파티션은 원본 1개와 복제본 1개로 총 2개가 존재한다. replication이 3이라면 원본 1개와 복제본 2개로 총 3개가 존재한다.
브로커 개수에 따라 replication의 개수가 제한된다. 예를 들어 브로커의 개수가 3인데 replication은 4가 될 수 없다는 것이다.
여기서 원본 파티션을 leader partition이라고도 부른다. 그리고 나머지 복제본 파티션은 follower partition이라고 부른다. 이 leader와 follower 파티션을 합쳐 ISR(In Sync Replica)라고 볼 수 있다.
그런데 왜 replication을 사용하는 것일까?🤷🏼♀️ 레플리케이션은 파티션의 고가용성을 위해 사용된다.
만약 브로커가 3개인 카프카에서 replication=1, partition=1인 토픽이 존재한다고 가정해보자. 갑자기 브로커 하나가 어떠한 이유로 사용할 수 없게된다면 더이상 해당 파티션은 복구할 수 없게 된다.
만약 replication=2라면 브로커 1개가 죽더라도 복제본이 존재하므로 복제본으로 복구를 할 수 있게 된다. 나머지 하나의 follower 파티션이 leader 파티션 역할을 승계하게 되는 것이다.
그렇다면 정확히 leader partition과 follower partition의 역할은 무엇일까?🙋🏻♀️
프로듀서가 토픽에 데이터를 전달할 때 전달받는 주체가 바로 leader partition이다. 프로듀서에는 ack라는 상세 옵션이 있는데 ack를 통해 고가용성을 유지할 수 있고 partition의 replication과 관련이 있다. ack는 0, 1 그리고 all 옵션 3개중에 1개를 골라 설정할 수 있다.
ack = 0일 경우 프로듀서는 leader 파티션에 데이터를 전송하고 응답값은 받지 않는다. 따라서 leader 파티션에 데이터가 정상적으로 전송됐는지, 나머지 파티션에 정상정으로 복제되었는지 알 수 없고 보장할 수 없다. 그래서 데이터 유실 가능성이 있긴 하지만 속도는 빠르다는 장점이 있다.
ack = 1일 경우 leader 파티션에 데이터를 전송하고 leader 파티션이 데이터를 정상적으로 받았는지 응답값을 받는다. 하지만 나머지 파티션에 복제가 되었는지 확인할 수는 없다. 만약 leader 파티션이 데이터를 받은 즉시 해당 브로커에 장애가 발생한다면 나머지 파티션에 데이터가 미처 전송되지 못한 상태이므로 마찬가지로 데이터 유실 가능성이 있다.
ack = all일 경우 1 옵션에 추가로 follower 파티션에 복제가 잘 이루어졌는지 응답값을 받는다. 즉 리더 파티션에 데이터를 보낸 후 나머지 팔로워 파티션에도 데이터가 저장되어 있는지 확인되어있는 작업이 추가적으로 발생하여 데이터의 유실은 발생하지 않는다. 하지만 0과 1 옵션에 비해 확인하는 부분이 증가해 속도가 현저히 느리다는 단점이 있다.
레플리케이션이 고가용성을 위해 중요한 역할을 한다는데 그럼 개수가 많을 수록 좋은게 아닌가? 생각이 들 수도 있다. 하지만 개수가 많아지게 되면 그만큼 브로커의 리소스 사용량도 늘어나게 된다. 따라서 카프카에 들어오는 데이터양과 retention date(저장 시간)을 잘 고려하여 레플리케이션 개수를 정하는 것이 좋다. 브로커 개수가 3일 때 레플리케이션 개수는 3으로 설정하는 것을 추천한다고 한다.
카프카에서 컨슈머가 데이터를 가져가더라도 데이터가 사라지지 않는다. 이와 같은 특징은 카프카, 컨슈머를 데이터 파이프라인으로 운영할 수 있도록 핵심적인 역할을 한다. 카프카 컨슈머가 데이터 파이프라인으로 어떻게 동작하는지, 카프카 컨슈머의 역할에 알아보도록 하자.
카프카 컨슈머는 기본적으로 토픽 내부의 파티션에 저장된 데이터를 가져온다. 이렇게 데이터를 가져오는 것을 Polling이라고 한다.
Consumer의 역할
offeset number는 토픽별로, 파티션별로 별개로 지정된다. offset은 컨슈머가 데이터를 어느 지점까지 읽었는지 확인하는 용도로 사용된다. consumer가 데이터를 읽기 시작하면 offset을 commit하게 되는데 이렇게 가져간 내용에 대한 정보는 카프카의 _consumer_offset
토픽에 저장한다. 컨슈머는 파티션이 두개인 click_log 토픽에서 데이터를 가져가게 되고 그럴 때마다 offset 정보가 저장된다.
만약 컨슈머가 갑자기 실행이 중지되었을 경우를 생각해보자. 컨슈머는 파티션0에서 3번 오프셋까지 읽고 파티션1에서 2번 오프셋까지 읽은 상태이다. 이 컨슈머가 어디까지 읽었는지에 대한 정보는 이미 _consumer_offset
에 저장되어 있어 이 컨슈머를 재실행하면 중지되었던 시점을 알고 있으므로 복구를 할 수 있다. → 데이터의 처리 시점을 복구할 수 있어 고가용성 특징을 가진다.
// Consumer
spring.kafka.consumer.bootstrap-servers=localhost:9092 // 여러개의 브로커를 지정하는 것이 좋음
spring.kafka.consumer.group-id=group-id-haha // consumer group, 컨슈머의 묶음
spring.kafka.consumer.auto-offset-reset=earliest
// key, value에 대한 역직렬화 설정
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
@KafkaListener(topics = "haha", groupId = "group-id-haha")
public void consume(String message){
System.out.println("[consume] receive message: " + message);
}
그렇다면 이러한 컨슈머는 몇개까지 생성할 수 있을까?
만약 컨슈머가 1개인 경우, 2개의 파티션에서 데이터를 가져간다.
2개의 컨슈머일 경우 각 컨슈머가 각각의 파티션을 할당해 데이터를 가져가 처리한다.
여기서 하나의 컨슈머가 추가되어 3개가 된다면, 이미 파티션들이 각 컨슈머에 할당되었으므로 더이상 할당될 파티션이 없어서 동작하지 않는다. 이와 같이 여러 파티션을 가진 토픽에 대해서 컨슈머를 병렬 처리하고 앂다면 컨슈머는 파티션의 개수보다 적은 개수로 실행시켜야 한다는 점을 기억하자!
이번에는 컨슈머 그룹이 다른 컨슈머들의 동작에 대해 알아보자. 각기 다른 컨슈머 그룹에 속한 컨슈머들은 다른 컨슈머 그룹에 영향을 미치지 않고 독립적이다.
데이터 실시간 시각화 및 분석을 위해 Elasticsearch에 데이터를 저장하는 역할을 하는 컨슈머 그룹과 데이터 백업용도로 하둡에 데이터를 저장하는 컨슈머 그룹이 있다고 가정해보자.
만약 엘라스틱 서치에 저장하는 컨슈머 그룹이 각 파티션에 특정 offset을 읽고 있어도 하둡에 저장하는 역할을 하는 컨슈머 그룹이 데이터를 읽는 데 영향을 미치지 않는다. _consumer_offset
에는 컨슈머 그룹별로, 토픽별로 offset을 나누어 저장하기 때문이다.
이러한 카프카의 특징으로 하나의 토픽으로 들어온 데이터는 다양한 역할을 하는 컨슈머들이 각자 원하는 데이터로 처리될 수 있게 된다.