[Kafka] 3장 카프카 기본개념 설명 (2)

600g (Kim Dong Geun)·2021년 6월 7일
1
post-thumbnail

컨슈머 중요개념

토픽의 파티션으로부터 데이터를 가져가기 위해 컨슈머를 운영하는 방법은 크게 2가지가 존재.

  • 1개 이상의 컨슈머로 이루어진 컨슈머 그룹을 운영하는 것
  • 토픽의 특정 파티션만 구독하는 컨슈머를 운영하는 것
컨슈머 그룹으로 운영하는 방법
  • 컨슈머를 각 컨슈머 그룹으로부터 격리된 환경에 안전하게 운영할 수 있도록 도와주는 방식.
  • 컨슈머 그룹으로 묶인 컨슈머들은 토픽의 1개 이상 파티션들에 할당되어 데이터를 가져갈 수 있다.
  • 컨슈머 그룹으로 묶인 컨슈머가 토픽을 구독해서 데이터를 가져갈때, 1개의 파티션은 최대 1개의 컨슈머에 할당 가능하다.
  • 그리고 1개의 컨슈머는 여러개의 파티션에 할당될 수 있다.
  • 이러한 특징으로 컨슈머 그룹의 컨슈머 개수는 가져가고자 하는 토픽의 파티션 개수보다 같거나 작아야 한다.

예를들어 컨슈머가 파티션 개수보다 많을 경우, 파티션은 컨슈머에 1대1로 할당되고 남는 컨슈머는 데이터를 처리하지 못하고 불필요한 스레드로 남게 된다.

  • 컨슈머 그룹은 다른 컨슈머 그룹과 격리되는 특징을 가지고 있음
    • 따라서 프로듀서가 보낸 데이터를 각기 다른 역할을 하는 컨슈머 그룹끼리 영향을 받지않게 처리할 수 있다는 장점이 있음.

컨슈머 그룹의 컨슈머에 장애가 발생하면?

  • 장애가 발생한 컨슈머에 할당된 파티션은 장애가 발생하지 않은 컨슈머에 소유권이 넘어간다.
  • 이러한 과정을 리밸런싱이라고 한다.
  • 리밸런싱은 크게 2가지 상황에서 일어남
    1. 컨슈머가 추가되는 상황
    2. 컨슈머가 제외되는 상황
  • 리밸런싱이 일어나면 이슈가 발생한 컨슈머에 할당된 파티션은 더는 데이터 처리를 하지 못하고 있으므로 데이터 처리에 지연이 발생할 수 있다.
    • 이를 해소하기 위해 발생한 컨슈머를 컨슈머 그룹에서 제외하여 모든 파티션이 지속적으로 데이터를 처리할 수 있도록 가용성을 높여준다.
  • 리밸런싱은 컨슈머가 데이터를 처리하는 도중에 언제든지 발생할 수 있으므로 데이터 처리 중 발생한 리밸런싱에 대응하는 코드를 작성해둬야한다.
  • 물론 리밸런싱이 자주 일어나면 성능상 이슈가 발생한다.
    • 파티션의 소유권을 다른 컨슈머로 재할당하는 과정에서 컨슈머 그룹의 컨슈머들이 토픽의 데이터를 읽지 않기 때문이다.
  • 그룹 조정자(group coordinator)는 리밸런싱을 발동시키는 역할을 하는데, 컨슈머 그룹의 컨슈머가 추가되고 삭제될 때를 감지
    • 카프카 브로커 중 한 대가 그룹 조정자의 역할을 수행
  • 컨슈머는 카프카 브로커로부터 데이터를 어디까지 가져갔는지 커밋을 통해 기록한다.
    • 특정 토픽의 파티션을 어떤 컨슈머 그룹이 몇 번째 가져갔는지 카프카 브로커 내부에서 사용되는 내부 토픽에 기록된다.
    • 컨슈머 동작 이슈가 발생하여 _consumer_offsets 토픽에 어느 레코드까지 읽어갔는지 오프셋 커밋이 기록되지 못했다면 데이터 처리의 중복이 발생할 수 있다.
    • 그러므로 데이터 처리의 중복이 발생하지 않게 하기 위해서는 컨슈머 어플리케이션이 오프셋 커밋을 정상적으로 처리했는지 검증할 것
  • 오프셋 커밋은 컨슈머 어플리케이션에서 명시적, 비 명시적 으로 수행할 수 있다.
    • 기본 옵션은 Poll()메소드가 수행될 때 일정 간격마다 오프셋을 커밋하도록 enable.auto.commit=true로 설정되어있다.
    • 이렇게 일정 간격마다 자동으로 커밋되는 것비명시 오프셋 커밋이라고 부른다.
    • 이 옵션은 auto.commit.interval.ms에 설정된 값과 함께 사용되는데, poll 메소드가 auto.commit.interval.ms에 설정된 값 이상이 지났을 때 그 시점까지 읽은 레코드의 오프셋을 커밋한다.
    • poll() 메소드를 호출할 때 커밋을 수행하므로 코드상에서 따로 커밋 관련 코드를 작성할 필요없음.
    • 비 명시적 오프셋 커밋 은 컨슈머 강제종료 발생 시 컨슈머가 처리하는 데이터가 중복 또는 유실될 수 있는 가능성이 있는 취약한 구조를 가지고 있다.
    • 그러므로 데이터 중복이나 유실을 허용하지 않는 서비스라면 자동 커밋을 허용해선 안된다.
  • 명시적으로 오프셋을 커밋하려면 poll()메소드 호출 이후에 반환받은 데이터의 처리가 완료되고 commitSync() 메소드를 호출하면 된다.

    • commitSync()메소드는 poll() 메소드를 통해 반환된 레코드의 가장 마지막 오프셋을 기준으로 커밋을 수행한다.
    • commitSync()메소드는 브로커에 커밋 요청을 하고 커밋이 정상적으로 처리되었는지 응답하기까지 기다리는데 이는 컨슈머의 처리량에 영향을 끼친다.
      • Round trip time 때문
  • 이를 해결하기 위해 commitAsync() 메소드를 사용하여 커밋 요청을 전송하고 응답이 오기전까지 데이터 처리를 수행할 수 있다.

    • 하지만 비동기 커밋은 커밋 요청이 실패했을 경우 현재 처리 중인 데이터의 순서를 보장하지 않으며 데이터의 중복 처리가 발생할 수 있음.
  • 컨슈머의 내부구조는 poll()메소드를 통해 레코드를 반환받지만 poll() 메소드를 호출하는 시점에 클러스터에 데이터를 가져오는 것은 아니다.
    • 컨슈머 어플리케이션을 실행하게 되면 내부에서 Fetcher인스턴스가 생성되어 poll()메소드를 호출하기 전에 미리 레코드들을 내부 큐로 가져온다.
    • 이후에 사용자가 명시적으로 poll()메소드를 호출하면 컨슈머는 내부 큐에 있는 레코드들을 반환받아 처리를 수행한다.

컨슈머의 주요옵션

  • 필수옵션
옵션 명설명
bootstrap.servers프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름: 포트를 1개 이상 작성한다. 2개 이상 브로커 정보를 입력하여 일부 브로커에 이슈가 발생하더라도 접속하는 데에 이슈가 없도록 설정가능한다.
key.deserializer레코드의 메시지 키를 역직렬화하는 클래스를 지정한다
value.deserializer레코드의 메시지 값을 역직렬화하는 클래스를 지정한다.
  • 선택옵션
옵션 명설명
group.id컨슈머 그룹 id를 지정한다. subscribe() 메소드로 토픽을 구독하여 사용할 때는 이 옵션을 필수로 넣어야 한다. 기본값은 null 이다.
auto.offset.reset컨슈머 그룹이 특정 파티션을 읽을 때 저장된 컨슈머 오프셋이 없는 경우 어느 오프셋부터 읽을지 선택하는 옵션이다. 이미 컨슈머 오프셋이 있다면 이 옵션 값은 무시된다.
이 옵션은 latest,earliest, none중 설정가능하다.
latest : 가장 최근에 넣은(높은) 오프셋부터 읽기 시작
earilest : 가장 처음에 넣은(낮은) 오프셋부터 읽기 시작
none : 컨슈머 그룹이 커밋한 기록이 있는지 찾아보고 커밋 기록이 없으면 오류 반환, 존재한다면 기존 커밋 기록 이후 오프셋부터 읽기시작한다
enable.auto.commit자동 커밋으로 할지 수동 커밋으로 할지 선택한다. 기본값은 true
auto.commit.interval.ms자동 커밋(enable.auto.commit=true)일 경우 오프셋 커밋 간격을 지정한다. 기본값은 5000(5초)이다.
max.poll.recordsPoll() 메소드를 통해 반환되는 레코드 개수를 지정한다 기본값은 500이다.
session.timeout.ms컨슈머가 브로커와 연결이 끊기는 최대 시간이다. 이 시간 내에 하트 비트를 전송하지 않으면 브로커는 컨슈머에 이슈가 발생했다고 가정하고 리밸런싱을 시작한다. 보통 하트비트 시간 간격의 3배로 설정한다.
heartbeat.interval.ms하트 비트를 전송하는 시간 간격이다. 기본값은 3000(3초)이다.
max.poll.interval.msPoll() 메소드를 호출하는 간격의 최대 시간을 지정한다. poll() 메소드를 호출한 이후에 데이터를 처리하는데에 시간이 너무 많이 걸리는 경우 비정상으로 판단하고 리밸런싱을 시작
isolation.level트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우 사용한다. read_committed, read_uncommited로 설정할 수 있다.
read_commited : 커밋이 완료된 레코드만 읽는다.
read_uncommited : 커밋 여부와 관계없이 파티션에 있는 모든 레코드를 읽는다.
  • 동기 오프셋 커밋
		configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);


		KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
		consumer.subscribe(Collections.singletonList(TOPIC_NAME));

		logger.info("{}","start");
		while(true){
			ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(1));
			for(ConsumerRecord<String,String> record : records){
				logger.info("{}",record.toString());
			}
			consumer.commitSync();
		}
  • poll() 메소드가 호출된 이후에 commitSync() 메소드를 호출하여 오프셋 커밋을 명시적으로 수행할 수 있다.
  • commitSync()는 poll() 메소드로 받은 가장 마지막 레코드의 오프셋을 기준으로 커밋한다.
    • 그렇기 때문에 동기 오프셋 커밋을 사용할 경우에는 poll() 메소드로 받은 모든 레코드의 처리가 끝난 이후 commitSync() 메소드를 호출해야 한다.
    • 동기 커밋의 경우 브로커로 커밋을 요청한 이후에 커밋이 완료될 때 까지 기다린다.
  • 물론 위와같은 특성때문에 자동 커밋이나 비동기 오프셋 커밋 보다는 동일 시간당 데이터 처리량이 적다는 특징이 있다.
  • commitSync()에 파라미터가 들어가지 않으면 poll()로 반환된 가장 마지막 레코드의 오프셋을 기준으로 커밋된다. 만약 개별 레코드 단위로 매번 오프셋을 커밋하고 싶다면 Map<TopicPartition,OffsetAndMetaData> 인스턴스를 파라미터로 넣으면된다.
  • 비동기 오프셋
		configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);


		KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
		consumer.subscribe(Collections.singletonList(TOPIC_NAME));

		logger.info("{}","start");
		while(true){
			ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(1));
			for(ConsumerRecord<String,String> record : records){
				logger.info("{}",record.toString());
			}
			consumer.commitAsync();
		}
  • 비동기 오프셋도 마찬가지로 poll() 메소드로 리턴된 가장 마지막 레코드를 기준으로 오프셋을 커밋한다. 다만, 동기 오프셋 커밋과 다른점은 커밋이 완료될때 까지 응답을 기다리지 않는 다는 것
  • 따라서 처리량이 더많다.

물론 callback 함수를 따로 구현해줘야한다.

consumer.commitAsync(new OffsetCommitcallback() {
  public void onComplete(Map<TopicPartiton, OffsetAndMetadata> offsets, Exception e){
    /// 코드 서술
  }
  
})

리밸런스 리스너를 가진 컨슈머

  • 컨슈머 그룹에서 컨슈머가 추가 또는 제거되면 파티션을 컨슈머에 재할당하는 과정인 리밸런스 가 일어난다.

  • poll() 메소드를 통해 반환받은 데이터를 모두 처리하기 전에 리밸런스가 발생하면 데이터를 중복처리할 수도 있다.

  • 리밸런스 발생시 데이터를 중복 처리하지 않게 하기 위해서는 리밸런스 발생 시 처리한 데이터를 기준으로 커밋을 시도해야한다.

    • 리밸런스 발생을 감지하기 위해 카프카 라이브러리는 ConsumerrebalanceListener인터페이스를 지원한다.
    • ConsumerRebalanceListener 인터페이스로 구현된 클래스는 onPartitionAssigned() 메소드와 onPartitionRevoked() 메소드로 이루어져있다.
    • onPartitionAssigned : 리밸런스가 끝난 뒤에 파티션이 할당 완료되면 호출되는 메소드
    • onPartitionRevoked : 리밸런스가 시작되기 직전에 호출되는 메소드

    따라서 마지막으로 처리한 레코드를 기준으로 커밋을 하기 위해서는 리밸런스가 시작하기 직전에 커밋을 하면되므로 onPartitonRevoked() 메소드에 커밋을 구현하여 처리할 수 있다.

consumer = new KafkaConsumer<String, String>(configs);
		consumer.subscribe(Collections.singletonList(TOPIC_NAME),new RebalanceListener());

		logger.info("{}","start");
		while(true){
			ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(1));
			for(ConsumerRecord<String,String> record : records){
				logger.info("{}",record.toString());
			}
			consumer.commitSync();
		}
	}

	private static class RebalanceListener implements ConsumerRebalanceListener {

		@Override
		public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
			logger.warn("Partitions are assigned");
		}

		@Override
		public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
			logger.warn("Partitions are revoked");
			consumer.commitSync();
		}
	}

파티션 할당 컨슈머

컨슈머를 운영할 때 subscribe() 메소드를 사용하여 구독형태로 사용하는 것 외에도 직접 파티션을 컨슈머에 명시적으로 할당하여 운영할 수 있다.

  • 컨슈머가 어떤 토픽, 파티션을 할당할지 명시적으로 선언할 때는 assign() 메소드를 사용하면 된다.
  • assign() 메소드는 다수의 TopicPartition 인스턴스를 지닌 자바 컬렉션 타입을 파라미터로 받는다.
  • TopicPartition클래스는 카프카 라이브러리 내/외부에서 사용되는 토픽, 파티션의 정보를 담는 객체로 사용된다.
private final static int TOPIC_NUMBER = 0;

public static void main(String ...args){
//...
  
consumer = new KafkaConsumer<String, String>(configs);
//		consumer.subscribe(Collections.singletonList(TOPIC_NAME),new RebalanceListener());

// 컨슈머가 특정 파티션을 바라보도록
consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, PARTITION_NUMBER)));
 
  //...
  
}
  • 이전에 사용하던 subscribe() 메소드 대신 assign() 메소드가 사용된 것을 볼 수 있다.
  • 이제 consumer는 test 토픽의 0번 파티션만을 바라보고 데이터를 처리할 것이다.
  • subscribe() 와는 다르게 직접 컨슈머가 특정 토픽, 특정파티션에 할당되므로 리밸런싱 하는 과정이 없다.

컨슈머에 할당된 파티션 확인 방법

  • 컨슈머에 할당된 토픽과 파티션에 대한 정보는 assignment() 메소드로 확인할 수 있다.
  • assignment()메소드는 Set<TopicPartition> 인스턴스를 반환한다.
    • TopicPartiton 클래스는 토픽이름파티션 번호가 포함된 객체이다.
		// 컨슈머가 특정 파티션을 바라보도록
		consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, PARTITION_NUMBER)));
		Set<TopicPartition> assignedTopicPartition = consumer.assignment();
		logger.info("topicInfo = {}", assignedTopicPartition);

컨슈머의 안전한 종료

  • 컨슈머 애플리케이션은 안전하게 종료되어야 한다.

    • 정상적으로 종료되지 않은 컨슈머는 세션 타임아웃 이 발생할때까지 컨슈머 그룹에 남게된다.
    • 이로인해 실제로 종료되었지만 더는 동작을 하지 않는 컨슈머가 존재하기 때문에 파티션의 데이터는 소모되지 못하고 컨슈머 랙이 늘어나게 된다.
  • 컨슈머 랙이 일어나면 데이터 처리 지연이 발생하게 된다

  • 컨슈머를 안전하게 종료하기 위해 KafkaConsumer클래스는 wakeup() 메소드를 지원한다.

    • wakeup() 메소드가 실행된 이후 poll() 메소드가 호출되면 WakeupException 예외가 발생한다.
    • WakeupException 예외를 받은 뒤에는 데이터 처리를 위해 사용한 자원들을 해제하면 된다.
  • 마지막에는 close()메소드를 호출하여 카프카 클러스터에 컨슈머가 안전하게 종료되었음을 명시적으로 알려주면 종료가 완료되었다고 볼수 있다.

    		try{
    			while (true) {
    				ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    				for (ConsumerRecord<String, String> record : records) {
    					logger.info("{}", record.toString());
    				}
    				consumer.commitSync();
    			}
    		}catch (WakeupException we){
    			logger.warn("Wakeup consumer");
    		} finally{
    			consumer.close();
    		}
  • 그렇다면 wakeup() 메소드는 어디서 호출하면 될까?

    • 자바 어플리케이션의 경우 코드 내부에 셧다운 훅(shotdown hook)을 구현하여 안전한 종료를 명시적으로 구현할 수 있다

      셧다운 훅이란 사용자 또는 운영체제로부터 종료 요청을 받으면 실행하는 스레드를 뜻한다.

public class Consumer {
	private final static Logger logger = LoggerFactory.getLogger(Consumer.class);
//...
  
  public static void main(String ...args){
    //...
    Runtime.getRuntime().addShutdownHook(new ShutdownThread());
    //...
    
  }
  //...
	static class ShutdownThread extends Thread{
		@Override
		public void run() {
			logger.info("Shutdown hook");
			consumer.wakeup();
			
		}
	}
  • 사용자는 안전한 종료를 위해 위 코드로 실행된 어플리케이션 kill -TERM {프로세스 번호}를 호출하여 셧다운 훅을 발생시킬 수 있다.

https://bigsun84.tistory.com/355 보면 kill에 대한 자세한 옵션 나와있음

어드민 API

실제 운영환경에서는 프로듀서와 컨슈머를 통해 데이터를 주고받는 것만큼 카프카에 설정된 내부 옵션을 확인하는 것도 중요함. 내부옵션을 가장확인하기 쉬운방법은 브로커 중 한대에 접속하여 카프카 브로커 옵션을 확인 하는 것이지만 매우 번거로운 작업

  • 카프카 커맨드 라인 인터페이스로 명령을 내려 확인하는 방법도 있지만 일회성 작업에 그침
  • 카프카 클라이언트에서는 내부 옵션들을 조정하거나 조회하기 위해 AdminClient 클래스를 제공
  • AdminClient 클래스를 활용하면 클러스터의 옵션과 관련된 부분을 자동화할 수 있다.
    • 카프카 컨슈머를 멀티 스레드로 생성할때, 구독하는 토픽의 파티션 개수만큼 스레드를 생성하고 싶을때 스레드 생성전에 해당 토픽의 파티션 개수를 어드민 API를 통해 가져올수 있다.
    • AdminClient 클래스로 구현한 웹 대시보드를 통해 ACL(Access Control List)이 적용된 클러스터의 리소스 접근 권한 규칙 추가를 할 수 있다.
    • 특정 토픽의 데이터양이 늘어남을 감지하고 AdminClient클래스로 해당 토픽의 파티션을 늘릴 수 있다.
public class Admin {

	public static void main(String ...args){
		Properties properties = new Properties();
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

		AdminClient admin = AdminClient.create(properties);
		
	}

}
  • 프로듀서 API컨슈머 API와는 다르게 추가설정 없이 클러스터 정보에 대한 설정만 하면 된다.
  • create() 메소드로 KafkaAdminClient 를 반환받는다.
    • KafkaAdminClient는 브로커들의 옵션들을 확인, 설정할 수 있는 유틸 클래스다.
  • KafkaAdminClient의 주요메소드
메소드명설명
describeCluster(DescribeClusterOptions options)브로커의 정보조회
listTopics(ListTopicsOptions options)토픽 리스트 조회
listConsumerGroups(ListConsumerGroupsOptions options)컨슈머 그룹 조회
createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions options )신규 토픽생성
createPartitions(Map<String, NewPartition> newPartitions, CreatePartitionsOptions options)파티션 개수 변경
createAcls(Collection<AclBinding> acls, CreateAclsOptions options)접근 제어 규칙 생성
  • 브로커 정보조회
  public class Admin {
  
  	static final Logger logger = LoggerFactory.getLogger(Admin.class);
  
  	public static void main(String ...args){
  		Properties properties = new Properties();
  		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  
  		AdminClient admin = AdminClient.create(properties);
  
  		logger.info("==GET Broker information");
  		try {
  			for (Node node : admin.describeCluster().nodes().get()){
  				logger.info("log={}",node);
  				ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
  				DescribeConfigsResult describeConfigsResult = admin.describeConfigs(Collections.singleton(configResource));
  				describeConfigsResult.all().get().forEach((broker,config)->{
  					config.entries().forEach(configEntry -> logger.info(configEntry.name() + "=" + configEntry.value()));
  				});
  			}
  		} catch (InterruptedException | ExecutionException e) {
  			e.printStackTrace();
  		}
  
  	}
  
  }
  • 결과

  • 토픽 정보조회
Map<String, TopicDescription> topicInformation = admin.describeTopics(Collections.singleton("test")).all().get();

logger.info("{}",topicInformation);
  • 어드민 API 종료

    • 어드민 API는 사용하고 나면 명시적으로 종료 메소드를 호출하여 리소드가 낭비되지 않도록 한다.
    admin.close();
profile
수동적인 과신과 행운이 아닌, 능동적인 노력과 치열함

0개의 댓글