4장도 3장과 마찬가지로 실습하는 장이었다. 메이븐을 설치하고 메이븐 프로젝트를 생성했다. 그리고 카프카의 자바 API
를 이용하기 위해서 필요한 라이브러리가 있으므로 pom.xml
에 의존관계를 추가하여 메이븐 빌드 정보를 작성하였다.
프로듀서 애플리케이션의 핵심 부분에 대해서 설명하겠다.
Properties conf= new Properties();
conf
객체의 setProperty
메서드로 KafkaProducer
에 필요한 설정을 실습했다.
bootstrap.servers
: 작성할 KafkaProducer
가 접속하는 브로커의 호스트명과 포트 번호를 지정하고 있다.key.serializer
, value.serializer
: 카프카에서는 모든 메시지가 직렬화된 상태로 전송된다. key.serializer
와 value.serializer
는 이 직렬화 처리에 이용되는 시리얼라이저 클래스를 지정한다.ProducerRecord<Integer, String> record = new ProducerRecord<>(topicName, key, value);
KafkaProducer
객체를 사용하여 메시지를 송신한다. KafkaProducer
를 이용하여 메시지를 보낼 때는 송신 메시지를 위와 같은 ProducerRecord
라는 객체에 저장한다. 이때 메시지의 Key, Value 외에 송신처의 토픽도 함께 등록한다.
작성한 ProducerRecord
객체는 KafkaProducer
객체의 send
메서드로 송신된다. 예제 코드는 다음과 같은 특징이 있다.
Ack
를 받았을 때 실행할 작업(Callback
) 등록Callback
클래스에서 구현하고 있는 onCompletion
메서드에서는 송신을 완료했을 때 실행되어야 할 처리를 하고 있다.KafkaProducer
의 송신 처리는 비동기적으로 이루어지기 때문에 send
메서드를 호출했을 때 발생하지 않는다. send
메서드의 처리는 KafkaProducer
의 송신 큐에 메시지를 넣을 뿐이다.Ack
가 반환된다. Callback
클래스의 메서드는 그 Ack
를 수신했을 때 처리된다.컨슈머 애플리케이션의 핵심 부분에 대해서 설명하겠다.
Properties conf= new Properties();
conf
객체의 setProperty
메서드로 KafkaConsumer
에 필요한 설정을 실습했다.
group.id
: 작성할 KafkaConsumer
가 속한 Consumer Group
을 지정한다.enable.auto.commit
: 오프셋 커밋을 자동으로 실행할지의 여부를 지정한다. 여기에서는 수동으로 오프셋을 커밋하기 때문에 false
로 했다.key.deserializer
, value.deserializer
: 컨슈머의 사용자 처리에 전달되기 전에 실시되는 역직렬화 처리에 이용되는 역직렬화 클래스를 지정한다.KafkaConsumer
객체를 이용하여 메시지를 수신한다. KafkaConsumer
에서는 메시지를 수신하는 토픽을 구독할 필요가 있다. 아래의 예제 코드에서는 subscribe
메서드를 호출함으로써 실시하고 있다. subscribe
메서드에 전달하는 리스트에 여러 토픽을 등록함으로써 여러 토픽을 구독할 수도 있다.
consumer.subscribe(Collections.singletonList(topicName));
conumer
객체의 poll
메서드를 호출하여 메시지를 얻는다. 이때 메시지는 ConsumerRecords
라는 객체로 전달된다. ConsumerRecords
객체에는 수신된 여러 메시지의 Key
, Value
, 타임스탬프 등 메타 데이터가 포함되어 있다.
예제 코드에서는 Manual Offset Commit
을 하고 있기 때문에 애플리케이션에서 적절한 타이밍에 오프셋 커밋을 명시적으로 실행할 필요가 있다. 예제 코드에서는 하나의 메시지 처리가 완료될 때마다 오프셋을 커밋한다. Auto Offset Commit
을 하는 설정의 경우 오프셋 커밋하는 예제 코드가 필요하지 않다.