4장. 자바 API를 사용하여 애플리케이션 만들기

문법식·2022년 8월 25일
0

4장도 3장과 마찬가지로 실습하는 장이었다. 메이븐을 설치하고 메이븐 프로젝트를 생성했다. 그리고 카프카의 자바 API를 이용하기 위해서 필요한 라이브러리가 있으므로 pom.xml에 의존관계를 추가하여 메이븐 빌드 정보를 작성하였다.

프로듀서

프로듀서 애플리케이션의 핵심 부분에 대해서 설명하겠다.

KafkaProducer 객체

Properties conf= new Properties();

conf객체의 setProperty 메서드로 KafkaProducer에 필요한 설정을 실습했다.

  • bootstrap.servers: 작성할 KafkaProducer가 접속하는 브로커의 호스트명과 포트 번호를 지정하고 있다.
  • key.serializer, value.serializer: 카프카에서는 모든 메시지가 직렬화된 상태로 전송된다. key.serializervalue.serializer는 이 직렬화 처리에 이용되는 시리얼라이저 클래스를 지정한다.

메시지 송신하기

ProducerRecord<Integer, String> record = new ProducerRecord<>(topicName, key, value);

KafkaProducer 객체를 사용하여 메시지를 송신한다. KafkaProducer를 이용하여 메시지를 보낼 때는 송신 메시지를 위와 같은 ProducerRecord라는 객체에 저장한다. 이때 메시지의 Key, Value 외에 송신처의 토픽도 함께 등록한다.

작성한 ProducerRecord 객체는 KafkaProducer 객체의 send 메서드로 송신된다. 예제 코드는 다음과 같은 특징이 있다.

  • 메시지를 송신하고 Ack를 받았을 때 실행할 작업(Callback) 등록
  • Callback 클래스에서 구현하고 있는 onCompletion 메서드에서는 송신을 완료했을 때 실행되어야 할 처리를 하고 있다.
  • KafkaProducer의 송신 처리는 비동기적으로 이루어지기 때문에 send 메서드를 호출했을 때 발생하지 않는다. send 메서드의 처리는 KafkaProducer의 송신 큐에 메시지를 넣을 뿐이다.
  • 송신 큐에 넣은 메시지는 사용자의 애플리케이션과는 다른 별도의 스레드에서 순차적으로 송신된다.
  • 메시지가 송신된 경우 카프카 클러스터에서 Ack가 반환된다. Callback 클래스의 메서드는 그 Ack를 수신했을 때 처리된다.

컨슈머

컨슈머 애플리케이션의 핵심 부분에 대해서 설명하겠다.

KafkaConsumer 객체

Properties conf= new Properties();

conf객체의 setProperty 메서드로 KafkaConsumer에 필요한 설정을 실습했다.

  • group.id: 작성할 KafkaConsumer가 속한 Consumer Group을 지정한다.
  • enable.auto.commit: 오프셋 커밋을 자동으로 실행할지의 여부를 지정한다. 여기에서는 수동으로 오프셋을 커밋하기 때문에 false로 했다.
  • key.deserializer, value.deserializer: 컨슈머의 사용자 처리에 전달되기 전에 실시되는 역직렬화 처리에 이용되는 역직렬화 클래스를 지정한다.

메시지를 수신하기

KafkaConsumer 객체를 이용하여 메시지를 수신한다. KafkaConsumer에서는 메시지를 수신하는 토픽을 구독할 필요가 있다. 아래의 예제 코드에서는 subscribe 메서드를 호출함으로써 실시하고 있다. subscribe 메서드에 전달하는 리스트에 여러 토픽을 등록함으로써 여러 토픽을 구독할 수도 있다.

consumer.subscribe(Collections.singletonList(topicName));

conumer 객체의 poll 메서드를 호출하여 메시지를 얻는다. 이때 메시지는 ConsumerRecords라는 객체로 전달된다. ConsumerRecords 객체에는 수신된 여러 메시지의 Key, Value, 타임스탬프 등 메타 데이터가 포함되어 있다.

예제 코드에서는 Manual Offset Commit을 하고 있기 때문에 애플리케이션에서 적절한 타이밍에 오프셋 커밋을 명시적으로 실행할 필요가 있다. 예제 코드에서는 하나의 메시지 처리가 완료될 때마다 오프셋을 커밋한다. Auto Offset Commit을 하는 설정의 경우 오프셋 커밋하는 예제 코드가 필요하지 않다.

profile
백엔드

0개의 댓글