[Kafka] 기본 프로듀서와 컨슈머

홍건의·2024년 9월 25일
0

카프카

목록 보기
2/2

로컬 환경에서 카프카 구동하기

  1. zookeeper 구동
bin/zookeeper-server-start.sh config/zookeeper.properties
  1. 브로커 구동
bin/kafka-server-start.sh config/server.properties
  1. 카프카 실행 확인
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092

위 명령어로 브로커 정보(버전, 옵션 등)들을 확인할 수 있다.

  1. java 소스코드 작성 시 카프카 관련 라이브러리 당겨오기
    예제에서는 gradle을 이용했다. (Spring 프로젝트 x)

프로듀서

public static void main(String[] args) {

        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        String messageValue = "testMessage";
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
        producer.send(record);
        producer.flush();
        producer.close();
    }

주요한 클래스는 KafkaProducerProducerRecord이다.
KafkaProducer를 생성할 때는

  • 브로커 정보
  • 레코드 키 직렬화 방식
  • 레코드 값 직렬화 방식

이 필수 설정으로 들어가며, Java에서는 Properties 클래스를 사용한다.

KafkaProducer

send()

즉시 전송이 아니라 모아뒀다가 배치로 전송한다.

flush()

버퍼에 존재하는 레코드를 모두 전송

close()

producer 인스턴스 리소스 종료

결과 확인

동기

RecordMetadata metadata = producer.send(record).get();

Future 클래스의 get()을 호출하고 RecordMetadata로 확인할 수 있다.

비동기

public class ProducerCallback implements Callback {
    private final static Logger logger = LoggerFactory.getLogger(ProducerCallback.class);

    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null)
            logger.error(e.getMessage(), e);
        else
            logger.info(recordMetadata.toString());
    }
}

Callback 인터페이스를 구현함으로써 콜백함수를 작성한다.
그리고

producer.send(record, new ProducerCallback());

producer에서 전송할 때, 해당 콜백 클래스를 같이 넘겨준다.

어느 방식이든 보낸 레코드의 파티션 정보오프셋 정보를 알 수 있다.

컨슈머

 public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);

        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                logger.info("record:{}", record);
            }
        }
    }

주요한 클래스는 KafkaConsumerConsumerRecord이다.
KafkaConsumer 생성할 때는

  • 브로커 정보
  • 레코드 키 직렬화 방식
  • 레코드 값 직렬화 방식

이 필수 설정으로 들어가며, Producer와 마찬가지로 정보를 전달할 때 Properties 클래스를 사용한다.

KafkaConsumer

subscribe(토픽 네임)

consumer.subscribe(Arrays.asList(TOPIC_NAME));

컨슈머에 토픽을 할당한다.

poll(시간)

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

데이터를 가져온다. 인자로 들어가는 내용은 컨슈머 버퍼에 데이터를 기다리기 위한 시간.

commitSync()

consumer.commitSync();

poll 메소드로 받은 가장 마지막 레코드의 오프셋을 기준으로 커밋
커밋이 완료될 때까지 기다린다.

commitAsync()

consumer.commitAsync();
consumer.commitAsync(new OffsetCommitCallback() {
	public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
    	// 필요 작업 처리
    }
}	

정상적으로 커밋이 되었을 때, 콜백함수를 사용하여 후처리를 할 수도 있다.

리밸런스 리스너

ConsumerRebalanceListener 인터페이스의 onPartitionAssigned()와 onPartitionRevoked()를 이용

파티션 할당

직접 파티션을 지정하여 할당할 때는 assign()을 이용한다.

안전한 종료

정상적으로 종료되지 않을 경우, 세션 타임아웃이 발생할 때 까지 컨슈머 그룹에 남을 수 있다. 이 경우 컨슈머 랙이 늘어날 수 있다.
wakeup() 메소드를 통해 안전하게 종료할 수 있다.
셧다운 훅을 이용해 kill-TERM {프로세스 번호}를 호출함으로써 셧다운 훅을 발생시킨다.

그리고 이후에 close()를 통해 안전하게 종료되었음을 명시적으로 알려준다.

어드민 API

소스코드에서 AdminClient 클래스를 이용해 브로커 정보를 알아낼 수 있다.

Propertites configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(configs);
profile
Backend Developer

0개의 댓글

관련 채용 정보