Kafka Streams Application with Spring Cloud - 2. 카프카 환경 구성하기

라빈·2022년 10월 2일
0

이전 글에 이어서 Kafka Streams Application을 만드는 과정을 진행해보겠습니다. 이번 장에서는 카프카 스트림즈가 수신할 토픽을 발행하는 카프카 프로듀서 애플리케이션을 구성해보도록 하겠습니다.

이번에 작성한 모듈에 추가한 기술 스택은 아래와 같습니다.

  • spring-boot-starter-web
  • spring-kafka

코드는 아래 깃헙에서 확인해 보실 수 있습니다 :)

로컬에 카프카 구성하기 with Docker

이 예제에서는 가상의 배달 도메인 데이터를 구성해서 해당 데이터를 이용해 현실에 적용 가능한 여러 요구사항을 카프카 스트림즈를 사용해서 구현합니다. 이때 배달 도메인의 특성 때문에 분산 메시징 시스템을 카프카를 사용합니다.

  • 배달 도메인의 경우 이벤트 순서를 보장할 수 있어야 합니다. 하나의 특정 배달건에 대해서 여러 상태의 이벤트가 발생할 수 있는데 비즈니스 로직에 적합한 이벤트 순서대로 처리할 수 있어야 합니다. 만약 이벤트 순서가 달라졌을 경우 이전 상태의 이벤트를 먼저 처리할 수 있도록 재시도를 해야합니다.
  • 특정 시간대에 높은 처리량을 보장할 수 있어야 합니다. 예를 들어, 음식 배달을 수행한다고할 때 점심 시간, 저녁 시간 등에 이벤트 처리량이 더 많아질 것입니다.
  • 공통 속성을 갖는 이벤트끼리 묶어서 비즈니스 요구사항을 처리할 수 있는 새로운 데이터로 구성하거나 특정 지표를 만들 수 있어야 합니다. 배달의 진행 현황 노출, 진행 중인 배달 건수 집계 등의 요구사항이 있을 수 있습니다. 이때 데이터 가공을 위해 특정 조건을 만족하는 이벤트끼리 묶을 수 있어야 합니다.

보통 애플리케이션 환경에 메시징 시스템을 구성할 경우일반적으로 AWS SQS를 떠올리실 겁니다. 하지만 SQS의 경우 수신한 메시지의 순서를 보장할 수 없습니다. 이것을 해결하기 위해 FIFO queue를 사용하게 된다면 처리량에 제한이 있을 수 있습니다. 그래서 이런 문제를 해결하기 위해 카프카를 선택하게 되었습니다. 카프카를 사용하면 키값을 이용해 특정 이벤트를 같은 파티션에 담아서 처리할 수도 있습니다.

위와 같은 이유로 카프카를 선택하게 되었고 이제 로컬에서 사용할 카프카 환경을 docker를 이용해 구성해보도록 하겠습니다.

docker-compose 구성하기

먼저 카프카를 로컬에서 사용하기 위해 아래와 같이 docker-compose를 작성합니다.

version: "3"
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

여기서 카프카가 의존성을 갖고 있는 Zookeeper의 역할에 대해서 간략하게 알아보겠습니다. Zookeeper는 여러 카프카 브로커와 묶여서 클러스터를 형성합니다. Zookeeper는 카프카 브로커들에게 클러스터링 서비스를 제공하고 카프카 브로커는 데이터 스트림과 클라이언트의 연결을 처리하게 됩니다. 실제 운영 환경에는 카프카 브로커를 한 대만 구성하는 것이 아닌 여러 대를 운영하게 됩니다. Zookeeper는 브로커를 관리하는 service registry 역할을 하고 브로커의 리더를 선출하는 역할을 담당합니다. 따라서 카프카 환경을 구성할 때 필수로 같이 구성해야 합니다.

카프카 토픽 생성

다음으로 우리가 사용할 토픽을 생성해보도록 하겠습니다. 먼저 토픽을 생성하기 위해 docker container에 직접 접속해서 토픽을 생성하겠습니다.

아래 명령어를 이용해서 로컬에 구성된 카프카 컨테이너에 접속합니다.

docker exec -it ${kafka container name or container id} bash

다음 명령어를 이용해 카프카에 토픽을 생성하고 확인할 수 있습니다.

# 카프카 토픽 생성
kafka-topics.sh --bootstrap-server localhost:9092 --create \
--partitions 3 \
--replication-factor 1 \
--topic delivery

# 카프카 토픽 확인
kafka-topics.sh --bootstrap-server localhost:9092 --list

# 토픽 데이터 확인
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic delivery \
--property print.key=true \
--property key.separator="-" \
--group console-group \
--from-beginning

# 토픽 옵션 확인 -> topic 이름 / id, replication factor, configs 등을 확인
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topics-with-overrides

위의 내용 중 카프카 토픽 생성 시 replication-factor 1 옵션에 대해서 간략하게 알아보겠습니다. 위 옵션은 토픽을 몇개의 브로커에서 관리할것인가에 대한 옵션입니다. 예를 들어, 카프카 브로커 3대를 구성해 클러스터를 구성하고 토픽에 대해 해당 옵션을 2로 준다면 2대의 브로커가 해당 토픽을 동일하게 저장하게 됩니다. 이번 예제에서는 카프카 브로커를 1대만 구성했기 때문에 해당 옵션을 1로 설정하였습니다.

위와 같이 구성이 끝났다면 카프카 토픽을 발행할 준비가 완료되었습니다. 다음으로는 카프카 토픽을 발행하는 프로듀서 애플리케이션을 구성해보겠습니다.

Kafka Producer Application

간단한 스프링 부트 애플리케이션을 로컬에 구성해서 API 호출을 통해 카프카 이벤트를 발행해보도록 하겠습니다. API에 이벤트에 대한 정보를 요청값으로 보내면 그대로 이벤트로 구성해 카프카로 발행하는 단순한 구조로 설계해보도록 하겠습니다.

API 처리 클래스 작성

아래 처럼 컨트롤러, 서비스, 요청 클래스를 구성해줍니다. 로컬에서 실행하는 단순한 예제 코드여서 카프카로 이벤트 발행 시 예외 처리는 수행하지 않았습니다.

@RequestMapping("/kafka")
@RestController
public class KafkaTopicController {

    private final DeliveryKafkaProducer deliveryKafkaProducer;

    public KafkaTopicController(DeliveryKafkaProducer deliveryKafkaProducer) {
        this.deliveryKafkaProducer = deliveryKafkaProducer;
    }

    @PostMapping("/send")
    public void sendTopic(@RequestBody DeliveryRequest request) throws ExecutionException, InterruptedException {
        deliveryKafkaProducer.send(request);
    }
}

@Service
public class DeliveryKafkaProducer {

    private static final String TOPIC = "delivery";

    private final KafkaTemplate<String, DeliveryEvent> kafkaTemplate;

    public DeliveryKafkaProducer(KafkaTemplate<String, DeliveryEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(DeliveryRequest request) throws ExecutionException, InterruptedException {
        DeliveryEvent event = DeliveryEvent.create(request.getId(), request.getServiceDeliveryType(), request.getServiceDeliveryId());
        kafkaTemplate.send(TOPIC, event.getId(), event)
                .get();
    }
}

@Getter
public class DeliveryRequest {

    private String id;
    private String serviceDeliveryType;
    private String serviceDeliveryId;

    private DeliveryRequest() {
    }
}

이때 DeliveryKafkaProducer.send()를 호출했을 때 kafkatemplate.send().get()을 통해서 이벤트 발행을 동기적으로 수행하고 있습니다.

다음으로 카프카 프로듀서 설정 값을 작성합니다. 이 값은 application.yml에 작성합니다.

spring:
  kafka:
    # https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.2.1/reference/html/spring-cloud-stream-binder-kafka.html#_setting_up_bootstrap_server_configuration
    bootstrap-servers: http://localhost:9092
    producer:
      acks: 1
      batch-size: 1000000
      key-serializer: org.springframework.kafka.support.serializer.StringOrBytesSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        linger.ms: 100

카프카는 메시지를 발행할 때 프로듀서의 linger.ms 시간 만큼 또는 지정한 버퍼 사이즈가 모두 찼을 때 배치로 발행하게 됩니다. 로컬에서 테스트 예제를 위해 linger.ms 값을 100으로 설정해 kafkaTemplate으로 메시지 전송시 버퍼가 다 차지 않아도 0.1초 후에 메시지를 전송하도록 설정했습니다.

이때 DeliveryKafkaProducer 클래스가 KafkaTemplate을 주입받아서 사용하고 있는데 이번에 작성한 모듈에는 KafkaTemplate을 빈으로 등록하지 않았습니다. 그렇다면 KafkaTemplate을 어떻게 빈으로 자동으로 등록할 수 있었는지에 대해서 알아보겠습니다.

KafkaTemplate AutoConfiguration

이번에 작성한 모듈은 spring-boot-starter-web 의존성을 갖고 있습니다. 이 의존성 안에는 spring-boot-starter가 들어있고 여기 안에는 또 spring-boot-autoconfigure 라는 의존성이 들어있습니다. 바로 spring-boot-autoconfigure 의존성이 우리가 사용하는 다양한 외부 기술에 대해 스프링이 제공하는 기술 구현체를 자동으로 설정해주는 라이브러리입니다. 예를 들어, 지금 사용하고 있는 카프카 뿐만 아니라 flyway, GraphQL, thymeleaf 등 정말 다양한 라이브러리에 대한 구현체를 제공해주고 있습니다.

spring-boot-autoconfigure가 어떻게 다양한 @AutoConfiguration 중에 특정 기술에 대해서만 빈으로 등록해주는지에 대해 조금 더 설명을 해보겠습니다. 해당 라이브러리가 제공해주는 패키지 중 이번에 사용하는 kafka 패키지를 살펴보겠습니다.

다른 구현체들도 위와 같이 제공이 되는데요. 여기서 우리가 확인해봐야 할 부분은 ~AutoConfiguration 이라고 이름지어진 클래스들 입니다. 그럼 KafkaAutoConfiguration 클래스를 확인해 보겠습니다.

@AutoConfiguration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {

	private final KafkaProperties properties;

	public KafkaAutoConfiguration(KafkaProperties properties) {
		this.properties = properties;
	}

	@Bean
	@ConditionalOnMissingBean(KafkaTemplate.class)
	public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
			ProducerListener<Object, Object> kafkaProducerListener,
			ObjectProvider<RecordMessageConverter> messageConverter) {
		PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
		KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
		messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
		map.from(kafkaProducerListener).to(kafkaTemplate::setProducerListener);
		map.from(this.properties.getTemplate().getDefaultTopic()).to(kafkaTemplate::setDefaultTopic);
		map.from(this.properties.getTemplate().getTransactionIdPrefix()).to(kafkaTemplate::setTransactionIdPrefix);
		return kafkaTemplate;
	}
    
    // ... 이하 구체적인 내용 생략
}

클래스 레벨에 설정된 어노테이션을 보면 @ConditionalOnClass(KafkaTemplate.class) 라는 어노테이션을 확인할 수 있습니다. 이 어노테이션의 동작 방식은 어노테이션의 value에 지정한 클래스가 classpath에 존재한다면 내부에 작성한 메서드들을 실행합니다. 그리고 내부에서 반환하는 kafkaTemplate에는 @ConditionalOnMissingBean(KafkaTemplate.class) 어노테이션이 설정되어있습니다. 이 어노테이션은 value에 지정한 클래스가 빈으로 등록되지 않았을 때 실행됩니다. 이 두 가지 어노테이션의 조합으로 인해 다음 조건으로 자동으로 KafkaTemplate을 빈으로 등록해주게 됩니다. (위 어노테이션들의 자세한 동작 방식은 링크를 참고해주세요.)

  1. KafkaTemplate 클래스가 classpath에 존재할 때(이 뜻은 우리가 만드는 애플리케이션이 kafka 의존성을 갖는가 입니다. 이번 모듈에 우리가 spring-kafka를 추가했기 때문에 이 조건을 만족합니다.)
  2. 직접 KafkaTemplate을 빈으로 등록하지 않았을 때

따라서 우리는 KafkaTemplate을 직접 빈으로 설정하지 않아도 스프링 컨텍스트가 관리해주는 빈을 주입받을 수 있습니다. 그대신 KafkaTemplate의 설정을 위해 KafkaProperties가 필요한 값들을 application.yml에 지정해주면 됩니다.

정리

지금까지 실습한 내용을 정리해보면 다음과 같습니다.

  • docker를 이용해 로컬 카프카 구성하기
  • 카프카로 이벤트를 전송하는 단순한 서버 구축하기

다음에는 이 프로듀서를 활용해 이벤트를 직접 발행하고 이벤트를 수신하는 카프카 스트림즈 애플리케이션을 본격적으로 구성해보도록 하겠습니다.

참고

profile
작은 개발지식부터 공유해요 :)

0개의 댓글