컨슈머그룹에 컨슈머가 추가되거나 삭제되었을 때 카프카 브로커가 어떻게 동작하는지 알아보자.

리벨런싱 테스트

시작

다음과 같은 상황을 가정한다

컨슈머 그룹 이름 : schedule-log-group-test
토픽 이름 : JobLogDev
파티션 개수 : 3개
컨슈머 개수 : 3개

카프카 컨슈머의 특성상 하나의 컨슈머 그룹 안에 컨슈머의 개수가 파티션의 개수보다 많으면 파티션을 할당받지 못하는 컨슈머가 생기게 된다. 그래서 최대의 효율을 위해서는 파티션의 개수와 컨슈머의 개수를 같게 해준다

docker exec -it broker-name kafka-consumer-groups --bootstrap-server localhost:port --describe --group schedule-log-group-test
위 명령어를 통해서 schedule-log-group-test 의 정보를 확인할 수 있다(어떤 컨슈머가 어떤 파티션에 할당되었는지 확인 가능)

파티션 각 파티션의 정보는 아래를 참고 하도록 하자

CURRENT-OFFSET: 342 // [1] 컨슈머 my-app-eb0ddd21-cb31-44f0-b852-00f164e550f8 의 현재 오프셋
LOG-END-OFFSET: 342 // [2] 총 메세지 개수
LAG: 0              // [2] - [1]

만약 총 메세지 개수가 500 개이고 현재 오프셋이 342 이면 LAG는 158(500 - 342) 로 찍힐 것이다

컨슈머 삭제

이제 코드에서 컨슈머 하나를 주석 처리하고 앱을 재실행 해보자.

이제 schedule-log-group-test 그룹에서 JobLogDev 를 읽는 컨슈머는 두개이다. 카프카 브로커는 남은 두개의 컨슈머가 3개의 파티션에 (잘?) 할당하는 작업을 진행한다

컨슈머그룹 정보를 보면 컨슈머 아이디 my-app-1a9aedf4-4c2e-4c41-8ce1-d37641b7dc26 가 파티션 0번과 1번을 할당받은것을 알 수 있다.
그리고 남은 컨슈머 my-app-60f6c0b0-4d44-4df2-af18-8982cd50369c가 파티션 2번을 할당 받았다.

컨슈머 추가

다시 죽었던 컨슈머를 부활시켜서 컨슈머 개수와 JobLogDev 토픽의 파티션 개수(3개) 를 동일하게 해보자

{"level":"INFO","timestamp":"2023-02-04T14:19:50.589Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"schedule-log-group-test","memberId":"my-app-da284b3b-7d2c-426b-8558-2c27f70c4a4f","leaderId":"my-app-b15583a4-bf6f-4260-bda5-25f4f49ce146","isLeader":false,"memberAssignment":{"JobLogDev":[1]},"groupProtocol":"RoundRobinAssigner","duration":23700}
{"level":"INFO","timestamp":"2023-02-04T14:19:50.598Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"schedule-log-group-test","memberId":"my-app-b15583a4-bf6f-4260-bda5-25f4f49ce146","leaderId":"my-app-b15583a4-bf6f-4260-bda5-25f4f49ce146","isLeader":true,"memberAssignment":{"JobLogDev":[2]},"groupProtocol":"RoundRobinAssigner","duration":23707}
{"level":"INFO","timestamp":"2023-02-04T14:19:50.600Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"schedule-log-group-test","memberId":"my-app-a8eb8f69-e988-4d47-a91e-0f67ff3a7656","leaderId":"my-app-b15583a4-bf6f-4260-bda5-25f4f49ce146","isLeader":false,"memberAssignment":{"JobLogDev":[0]},"groupProtocol":"RoundRobinAssigner","duration":23708}

로그를 확인하면 "groupProtocol":"RoundRobinAssigner" 에 의해서 라운드로빈 방식으로 컨슈머에 파티션이 ssign 된 것을 확인할 수 있다

다시 3개의 컨슈머가 3개의 파티션에 랜덤으로 할당 된것을 확인할 수 있다. 공식문서 에서는RoundRobinAssigner 에 대해서 다음과 같이 설명한다.

The round robin assignor lays out all the available partitions and all the available consumers. It then proceeds to do a round robin assignment from partition to consumer. If the subscriptions of all consumer instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts will be within a delta of exactly one across all consumers.)

라운드로빈 방식이 실행되면 기존에 할당되어 있던 파티션과 컨슈머 조합을 전부 폐기하고 파티션을 컨슈머에 랜덤 배치한다는 뜻으로 해석된다.

컨슈머 그룹 프로토콜

위의 로그에서 언급한 "groupProtocol":"RoundRobinAssigner" 에 대해서 좀 더 자세하게 알아보자
카프카 그룹은 어떻게 파티션을 컨슈머에 assign 해 줄것인가는 4가지 프로토콜에 의해서 정해진다

Range

각각의 파티션은 정수 번호를 가진다. Range 방식은 각각의 컨슈머에 파티션의 정수 번호의 범위를 지정해주는 것이다. 예를들어 컨슈머가 2개가 있고 파티션이 10개가 있다면 0~4 파티션까지는 컨슈머1이 담당하고, 5~9까지는 컨슈머2가 담당하도록 할 수 있다

Round Robin

모든 컨슈머를 파티션에 할당해주는 방식이다. 예를들어 파티션이 6개 컨슈머가 3개가 있다면, 컨슈머1-파티션1, 컨슈머2-파티션2, 컨슈머3-파티션3, 컨슈머1-파티션4, 컨슈머2-파티션5, 컨슈머3-파티션6 이런 방식으로 할당이 안된 파티션이 없게 컨슈머를 할당한다.

Sticky

이 프로토콜은 컨슈머가 그룹을 떠났다가 다시 가입한 후에도 이전 상태의 파티션-컨슈머 할당을 가능하게 한다.

Sticky 프로토콜이 동작하는 방식은 다음과 같다.

  1. 컨슈머가 그룹에 가입하면 그룹 코디네이터에게 파티션 할당을 요청한다.

  2. 코디네이터는 컨슈머가 이전에 파티션을 소유한 적이 있는지 확인하고 그렇다면 해당 파티션을 소비자에게 다시 할당한다.

  3. 컨슈머가 이전에 파티션을 소유하지 않은 경우 코디네이터는 라운드 로빈 전략을 사용하여 소비자에게 파티션을 할당한다.

  4. 컨슈머가 그룹을 떠난다면 파티션이 그룹의 다른 컨슈머에게 재할당 된다.

  5. 컨슈머가 그룹에 다시 가입하면 코디네이터에게 파티션 할당을 요청하고 코디네이터는 가능한 경우 이전에 소유한 파티션을 컨슈머에게 다시 할당한다.

이런 방식은 네트워크 오류 또는 서버 다시 시작으로 인해 컨슈머가 일시적으로 오프라인 상태가 될 수 있는 시나리오에서 유용하게 작용한다.
파티션-컨슈머 할당을 유지함으로써 파티션 재할당의 오버헤드를 줄이는 데 도움이 될 수 있으며 보다 원활하고 일관된 메세지 컨슘을 가능하게 한다.

Custom

위 3가지 방식외에 다른 방법을 통해서 컨슈머에 파티션을 할당해주고 싶다면 이 커스텀한 방법을 정의할 수 있다.
카프카 클라이언트에서 커스텀한 구조체를 선언해서 구현 가능한데 go를 통해서 사용법을 알아보자.

type customPartitionAssignor struct {}

func (c *customPartitionAssignor) Name() string {
    return "custom"
}

func (c *customPartitionAssignor) Assign(metadata *kafka.Metadata, groupGeneration int32, members []kafka.GroupMember) (map[string][]int32, error) {
     // Create a map to store the partition assignment for each member
    assignments := make(map[string][]int32)
    
    // Get the list of available partitions for the topic
    partitions := metadata.Partitions[r.Topic]
    
    // Loop through each member and assign partitions to them
    for _, member := range members {
        memberID := member.ID
        for _, partition := range partitions {
            // Assign the partition to the member
            assignments[memberID] = append(assignments[memberID], partition.ID)
        }
    }
    
    return assignments, nil
}

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:   []string{"localhost:9092"},
    Topic:     "my-topic",
    GroupID:   "my-group",
    Partition: 0,
    MinBytes:  10e3, // 10KB
    MaxBytes:  10e6, // 10MB
})
r.SetAssignor(customPartitionAssignor{})


for {
    m, err := r.ReadMessage(context.Background())
    if err != nil {
        panic(err)
    }
    fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}

정리

주목 할 점은 당연한 이야기 이겠지만 "시작" 단계에서 3개의 컨슈머와 3개의 파티션이 할당되었을 때의 컨슈머-파티션 매칭과 "컨슈머 추가" 단계에서의 컨슈머-파티션 매칭은 다르다는 사실이다.

이 말은 "시작" 단계에서 컨슈머1(예를들어)이 읽고 있던 파티션의 CURRENT-OFFSET과 "컨슈머 추가" 단계에서 컨슈머1에 할당된 CURRENT-OFFSET은 다르다는 뜻이다.

즉, 하나의 토픽에 여러개의 컨슈머를 assign 하는 경우에는 모든 컨슈머가 같은 동작을 하도록 만드는것이 좋을 것 같다. 라이브 환경에서는 언제든 컨슈머가 죽었다가 살아날 수 있다는 것을 가정해야 하고, 컨슈머가 죽었다가(삭제 되었다가) 살아난다면(추가된다면) 리벨런싱 되어서 다른 파티션의 오프셋에 있는 데이터를 읽고 있을 것이기 때문이다.

다른 인스턴스에 컨슈머 추가

다음과 같은 상황을 추가 해보자

같은 컨슈머 그룹을 사용하는 다른 인스턴스를 띄운다

이제 두개의 인스턴스에 각각 3개씩, 총 6개의 컨슈머가 schedule-log-group-test 그룹에 등록되어 있다.

첫번째 인스턴스의 컨슈머 등록 로그

{"level":"INFO","timestamp":"2023-02-04T15:38:07.582Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"schedule-log-group-test","memberId":"my-app-1b28ceba-461e-407c-b02c-faec25d3777f","leaderId":"my-app-f92fcaff-d74b-4963-97e8-83a6637bf719","isLeader":false,"memberAssignment":{"JobLogDev":[2]},"groupProtocol":"RoundRobinAssigner","duration":503}
{"level":"INFO","timestamp":"2023-02-04T15:38:07.589Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"schedule-log-group-test","memberId":"my-app-cedbf37f-5f85-4f5b-9c58-84d4d7b9e340","leaderId":"my-app-f92fcaff-d74b-4963-97e8-83a6637bf719","isLeader":false,"memberAssignment":{},"groupProtocol":"RoundRobinAssigner","duration":519}
{"level":"INFO","timestamp":"2023-02-04T15:38:07.600Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"schedule-log-group-test","memberId":"my-app-34c7a811-d801-480c-a32f-c34f155b6795","leaderId":"my-app-f92fcaff-d74b-4963-97e8-83a6637bf719","isLeader":false,"memberAssignment":{"JobLogDev":[1]},"groupProtocol":"RoundRobinAssigner","duration":518}

두번째 인스턴스의 컨슈머 등록 로그

{"level":"INFO","timestamp":"2023-02-04T15:38:07.580Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"schedule-log-group-test","memberId":"my-app-0d3e457a-fdc3-4d4e-88a6-18a4d2435ea0","leaderId":"my-app-f92fcaff-d74b-4963-97e8-83a6637bf719","isLeader":false,"memberAssignment":{"JobLogDev":[0]},"groupProtocol":"RoundRobinAssigner","duration":98}
{"level":"INFO","timestamp":"2023-02-04T15:38:07.584Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"schedule-log-group-test","memberId":"my-app-f92fcaff-d74b-4963-97e8-83a6637bf719","leaderId":"my-app-f92fcaff-d74b-4963-97e8-83a6637bf719","isLeader":true,"memberAssignment":{},"groupProtocol":"RoundRobinAssigner","duration":106}
{"level":"INFO","timestamp":"2023-02-04T15:38:07.587Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"schedule-log-group-test","memberId":"my-app-b4c63877-df55-4f22-a868-f1a56903b053","leaderId":"my-app-f92fcaff-d74b-4963-97e8-83a6637bf719","isLeader":false,"memberAssignment":{},"groupProtocol":"RoundRobinAssigner","duration":103}

로그를 보면 총 6개의 컨슈머 아이디를 할당받은것을 확인 할 수 있다.

파티션이 3개인 상태에서 코오디네이터는 Round Robin 방식으로 컨슈머에 파티션을 할당한다.

이경우는 파티션이 컨슈머의 개수보다 적은 상태이기 때문에 나머지 3개의 컨슈머는 놀게된다

사진을 보면 파티션0에 할당된 호스트와 파티션1, 파티션2에 할당된 호스트가 다른것을 확인할 수 있다

새로운 토픽 추가

이제까지는 기존 컨슈머그룹에 토픽의 변경없이 새로운 컨슈머를 추가하거나 삭제했었다. 그리고 컨슈머 그룹 리벨런식을 통해서 컨슈머와 파티션이 재조합 되는것을 확인했다.

이번에는 컨슈머그룹에 새로운 토픽을 추가했을 때 컨슈머그룹이 어떻게 동작하는지 알아보자

기존컨슈머의 토픽변경

기존에 토픽 A를 컨슘하고 있었다면 이 컨슈머가 토픽 B를 컨슘하도록 바꿔본다.

{"level":"INFO","timestamp":"2023-02-20T10:52:57.615Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"schedule-log-group-test","memberId":"my-app-43543f90-0187-433d-8994-f798b5f5fc01","leaderId":"my-app-43543f90-0187-433d-8994-f798b5f5fc01","isLeader":true,"memberAssignment":{"JobLogDev":[0]},"groupProtocol":"RoundRobinAssigner","duration":20726}
{"level":"INFO","timestamp":"2023-02-20T10:52:57.622Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"schedule-log-group-test","memberId":"my-app-cecbe87e-fbff-4724-ac3e-e1d8bd0f5a0d","leaderId":"my-app-43543f90-0187-433d-8994-f798b5f5fc01","isLeader":false,"memberAssignment":{},"groupProtocol":"RoundRobinAssigner","duration":20724}
{"level":"WARN","timestamp":"2023-02-20T10:52:57.621Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer group received unsubscribed topics","groupId":"schedule-log-group-test","generationId":79,"memberId":"my-app-cecbe87e-fbff-4724-ac3e-e1d8bd0f5a0d","assignedTopics":["JobLogDev"],"topicsSubscribed":["createDeferredIncomeDev"],"topicsNotSubscribed":["JobLogDev"],"helpUrl":"https://kafka.js.org/docs/faq#why-am-i-receiving-messages-for-topics-i-m-not-subscribed-to"}
{"level":"INFO","timestamp":"2023-02-20T10:52:57.635Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"schedule-log-group-test","memberId":"my-app-d5f353b8-d73d-4df9-97e9-beae39ddc1dc","leaderId":"my-app-43543f90-0187-433d-8994-f798b5f5fc01","isLeader":false,"memberAssignment":{"JobLogDev":[1]},"groupProtocol":"RoundRobinAssigner","duration":20734}

토픽 A를 컨슘하던 컨슈머가 토픽B를 컨슘하게 되자 다음과같은 상황이 발생한다

Consumer group received unsubscribed topics

새로운 토픽을 컨슘하는 새로운 컨슈머 추가

이제는 기존의 컨슈머는 건들지 말고 새로운 컨슈머를 추가해서 새로운 토픽을 컨슘하도록 만들어보자

{"level":"INFO","timestamp":"2023-02-20T11:00:47.968Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"schedule-log-group-test","memberId":"my-app-84d037fa-e8c7-4a9a-9254-d9fede53ba3d","leaderId":"my-app-84d037fa-e8c7-4a9a-9254-d9fede53ba3d","isLeader":true,"memberAssignment":{"JobLogDev":[1]},"groupProtocol":"RoundRobinAssigner","duration":20614}
{"level":"INFO","timestamp":"2023-02-20T11:00:47.974Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"schedule-log-group-test","memberId":"my-app-31f6ab5e-60bd-4f25-b11a-37b5c5e2f491","leaderId":"my-app-84d037fa-e8c7-4a9a-9254-d9fede53ba3d","isLeader":false,"memberAssignment":{"JobLogDev":[2]},"groupProtocol":"RoundRobinAssigner","duration":20630}
{"level":"INFO","timestamp":"2023-02-20T11:00:47.975Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"schedule-log-group-test","memberId":"my-app-ed46b2ec-4574-4eda-824b-04a9a160e851","leaderId":"my-app-84d037fa-e8c7-4a9a-9254-d9fede53ba3d","isLeader":false,"memberAssignment":{},"groupProtocol":"RoundRobinAssigner","duration":20618}
{"level":"INFO","timestamp":"2023-02-20T11:00:47.976Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"schedule-log-group-test","memberId":"my-app-1a3d7f7e-cb8f-4405-b6bf-5c52db5afc7b","leaderId":"my-app-84d037fa-e8c7-4a9a-9254-d9fede53ba3d","isLeader":false,"memberAssignment":{"JobLogDev":[0]},"groupProtocol":"RoundRobinAssigner","duration":20628}

기존의 컨슈머들은 토픽 A를 잘 컨슘하도록 할당되었다. 하지만 새로 추가된 컨슈머는 등록은 되었지만(멤버 아이디가 생겼기 때문에 등록은 되었다고 볼수 있음), 토픽 B를 컨슘하는데는 실패했음을 알수 있다

{"level":"INFO","timestamp":"2023-02-20T11:00:47.975Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"schedule-log-group-test","memberId":"my-app-ed46b2ec-4574-4eda-824b-04a9a160e851","leaderId":"my-app-84d037fa-e8c7-4a9a-9254-d9fede53ba3d","isLeader":false,"memberAssignment":{},"groupProtocol":"RoundRobinAssigner","duration":20618}

메세지를 자세히 보면 "memberAssignment":{} 이렇게 컨슈머 아이디 "my-app-ed46b2ec-4574-4eda-824b-04a9a160e851" 에 해당하는 컨슈머는 토픽 할당을 못받았음을 알수있다.

결론

실험 결과 컨슈머그룹은 처음 만들어질 때 부터 어떤 토픽을 컨슘할건지 정해져있다.
만약 컨슈머그룹에 새로운 토픽을 추가하고 싶다면 컨슈머그룹을 새로 만들어주어야 할거같다.

profile
Quit talking, Begin doing

0개의 댓글