개인적인 토이 프로젝트로 Spring Webflux & Reactive Kafka를 활용하여 API를 통한 프로듀서와 컨슈머를 구성해보려 한다.
Bloking IO를 사용할때의 개발과는 전혀 달라서 익숙해지는데 꽤나 걸릴 듯 하다. Webflux의 라우팅 방식이 아닌 RestController를 사용하여 구성했다.
기본적인 구성도는 매우 간단하다. 단순히 RestController를 통해 들어온 메시지를 카프카로 전송하고 컨슈머는 카프카에서 메시지를 가져오기만 하는 굉장히 단순한 흐름이다. 단지 이 모든 과정을 논블로킹 I/O로 처리하는것이 핵심이다.
전체 소스 코드 : Github 링크
로컬환경이 아닌 실무환경과 비슷하게 외부에 도커를 이용하여 구성하였다. 주키퍼와 브로커는 각 1대씩으로 우선 구성하였고 2대 이상 클러스터는 차후 시간날때 구성하려 한다.
개인 홈서버에 구성하였고 파티션은 1개로 설정했다.
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://deogicorgi.home:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Controller를 통해 들어온 메시지를 카프카로 전송하기 위한 Bean 설정 및 옵션 설정이다. 현재 프로듀서와 컨슈머가 독립적인 프로젝트로 구성되어있기에 해당 설정에는 프로듀서용 설정만 해놓은 상태이다.
Spring Kafka의 설정과는 좀 달라진 부분들이 많다. 매우 기본적인 설정만 구성한 상태이고 이 외에도 레퍼런스를 찾아보면 다양하고 복잡한 설정들을 찾을 수 있다.
공식 레퍼런스 : 공식 레퍼런스 링크
/**
* Kafka 설정
*/
@Configuration
@RequiredArgsConstructor
public class KafkaConfig {
private final KafkaProperties properties;
/******************************************************************
************************ Producer Options ************************
******************************************************************/
// 기본 설정들로 구성
@Bean("kafkaSender")
public KafkaSender<String, Object> kafkaSender() {
SenderOptions<String, Object> senderOptions = SenderOptions.create(getProducerProps());
senderOptions.scheduler(Schedulers.parallel());
senderOptions.closeTimeout(Duration.ofSeconds(5));
return KafkaSender.create(senderOptions);
}
// 프로듀서 옵션
private Map<String, Object> getProducerProps() {
return new HashMap<>() {{
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getHosts());
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000); // 전송 시간 제한을 1000ms로 설정
}};
}
}
KafkaService 클래스에 넘어온 AbstractKafkaMessage 클래스는 Controller를 통하여 요청받은 @RequestBody 데이터이다. 내부적인 5XX 서버 에러를 리턴해주고 싶지 않기 때문에 KafkaProduceResult 라는 클래스를 만들어 자체적으로 처리하도록 구성하려 한다. 즉 요청 측에서는 전송 결과의 StatusCode는 무조건 2XX로 받게 될것이고 내부적으로는 메시지 전송에 실패할 경우 NOSQL이나 기타 방법등을 활용하여 재전송 처리를 할수 있도록 하려 한다.
/**
* 프로듀싱 서비스
* Kafka 프로듀싱 전 로직 처리
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class ProduceService {
private final KafkaService kafkaService;
private final FailureMessageService failureMessageService;
public Mono<KafkaProduceResult> produceMessage(AbstractKafkaMessage message) {
return kafkaService.send(message)
.map(produceResult -> {
log.info("Kafka Sender result : Topic >> [{}], message >> [{}]", produceResult.getTopic(), produceResult.getRequestedMessage());
if (produceResult.hasError()) {
failureMessageService.produceFailure();
// TODO 카프카 프로듀싱 실패일 경우 처리
// ex ) 처리하지못한 요청을 몽고등에 저장 후 재시도, 로깅 등등
log.error("Kafka produce error : {}", produceResult.getErrorMessage());
}
return produceResult;
});
}
}
실제 요청받은 메시지를 카프카로 보내는 코드이다. 실행해보면 100건, 1000건, 10000건이건 간에 싱글스레드로 처리되는데 이 부분을 멀티쓰레드로 돌리고 싶어 구글링을 열심히 해본 결과 Sender의 경우 애초에 싱글스레드로 돌아가도록 구현되어있다고 한다. 옵션에 스케쥴러도 다르게 지정해보고 삽질이란 삽질은 다해봤는데...
/**
* 카프카 서비스
* 실제 카프카로 메시지 프로듀싱
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaService {
private final KafkaSender<String, Object> producer;
public Mono<KafkaProduceResult> send(AbstractKafkaMessage message) {
return producer.createOutbound()
// 지정된 토픽으로 메시지 전송
.send(Mono.just(new ProducerRecord<>(message.getTopic(), null, message.getRequestedMessage())))
.then()
// 에러 없이 전송이 완료 되었을 경우
.thenReturn(new KafkaProduceResult(message))
// 에러가 발생했을 경우
.onErrorResume(e -> Mono.just(new KafkaProduceResult(message, e)));
}
}
요청받은 메시지 매핑 및 전송 처리 결과를 리턴하기 위한 모델 클래스들이다. 위 Kafka관련 클래스들과는 연관 없는 클래스이다. 대충 토이프로젝트의 의도를 보여주기 위함이다.
대충 어노테이션을 보면 @RequstBody 를 이용해 매핑되는 클래스로 KafkaUriMessage 타입과 KafkaBodyMessage 타입 있다는 것을 알 수있다. 이는 혹시 전송실패할 경우 두개의 타입을 다르게 처리하려고 나눠놓은 것이다.
/**
* 카프카 메시지 베이스
* 프로듀서 내 에러 발생시 처리를 쉽게하기 위해 URI 형태와 Message 형태로 나눔
*/
@Getter
@Setter
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
property = "type",
defaultImpl = KafkaUriProduceMessage.class)
@JsonSubTypes({
@JsonSubTypes.Type(value = KafkaUriMessage.class, names = {"uri", "Uri", "URI"}),
@JsonSubTypes.Type(value = KafkaBodyMessage.class, names = {"message", "Message", "MESSAGE"})
})
public abstract class AbstractKafkaMessage {
// 요청 토픽
protected String topic;
// 메시지 타입 (uri , message)
protected ProduceMessageType type;
// 요청 시간
protected LocalDateTime requestedAt;
public abstract String getRequestedMessage();
}
마지막으로 전송결과가 매핑될 클래스이다. 요청 측에서는 해당 클래스의 내용에 따라 전송 성공 & 실패를 확인할 수 있다.
/**
* 카프카 메시지 전송결과 클래스
*/
@Getter
public class KafkaProduceResult {
// 메시지 전송 상태 - true : 전송완료, false : 전송실패
private Boolean status = true;
// 메시지 전송 토픽
private String topic;
// 요청받은 메시지 타입 (uri, message)
private ProduceMessageType messageType;
// 요청받은 메시지 - URI 또는 JSON String
private String requestedMessage;
// 에러 - 전송과정 중 발생된 에러, 전송완료 일 경우 null
@JsonIgnore
private Throwable error = null;
// 에러 메시지 - 전송과정 중 발생된 에러, 전송완료 일 경우 null
private String errorMessage = null;
// 메시지를 요청받은 시간
private LocalDateTime requestedAt;
// 메시지를 처리한 시간
private LocalDateTime producedAt;
public KafkaProduceResult(AbstractKafkaMessage message) {
this.setRequestedMessage(message);
}
public KafkaProduceResult(AbstractKafkaMessage message, Throwable e) {
this.setRequestedMessage(message);
this.status = false;
this.error = e;
this.errorMessage = e.getMessage();
this.producedAt = null;
}
public Boolean hasError() {
return error != null;
}
private void setRequestedMessage(AbstractKafkaMessage requestedMessage) {
this.topic = requestedMessage.getTopic();
this.messageType = requestedMessage.getType();
this.requestedMessage = requestedMessage.getRequestedMessage();
this.producedAt = LocalDateTime.now();
this.requestedAt = requestedMessage.getRequestedAt();
}
}