Apache Kafka with. docker-compose

Sei Kim·2024년 1월 20일
0

Kafka

목록 보기
1/1
post-thumbnail
post-custom-banner

들어가며


현재 급하게 Spring Cloud를 사용하여 MSA 환경을 구성하고 있습니다.
그런데 데이터 동기화를 위한 메시지큐가 필요하다고 느끼게 되었고 도입하는 과정을 그리려고 합니다.

처음에는 설치부터 시작하지만 이후에는 카프카에 대해서 더 자세히 알아보도록 하겠습니다.

1. docker-compose


version: "3" # docker-compose 버전 지정
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 # kafka가 zookeeper에 커넥션하기 위한 대상을 지정
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

먼저 로컬 환경에서 테스트 하기 위해서 localhost로 지정하 두고 작성을 하였습니다. 해당 파일이 있는 경로에서 docker-compose up -d를 사용하여 실행합니다.

⚠️ Warning

현재 mac M1 에서 진행중이여서 주키퍼 이미지와의 OS가 일치하지 않는 현상이 발생하였습니다.
추후 실제 배포 환경에서는 EC2 환경에서 진행할 예정이므로 내용을 추가하도록 하겠습니다.

2. 사용하기


docker-compose 로 설치를 하였습니다. 이번에는 카프카에 접속한 후 토픽을 생성하고 조회와 같은 작업들을 해보도록 하겠습니다.

2.1. 토픽 생성하기


# Kafka 접속
docker exec -it <컨테이너> bash

# Kafka 버전 확인
kafka-topics.sh --version 

## Result ---> 2.8.1 (Commit:839b886f9b732b15)

# 토픽 생성
kafka-topics.sh --create \
--topic sample_topic_1 \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1

## Result ---> Created topic sample_topic_1.

# 토픽 목록 조회
kafka-topics.sh --list --bootstrap-server localhost:9092

## Result ---> sample_topic_1
사진으로 보기

2.2. producer/consumer


이제 위에서 생성한 토픽을 가지고 활용해보도록 하겠습니다.

# Producer
kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic test_topic

# Consumer
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test_topic

실제로 통신을 하는 내용을 확인할 수 있습니다.

3. Spring Boot 연동


이번에는 Spring Boot 와 연동을 해보도록 하겠습니다.

3.1. Dependency


// build.gradle
dependencies {
	...
    // https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka
    implementation 'org.springframework.kafka:spring-kafka:3.1.1'
}

먼저 Dependency(Spring Kafka Support) 를 프로젝트에 추가하도록 하겠습니다.

3.2. Consumer/Producer Config


@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");  // Kafka 주소
        properties.put(GROUP_ID_CONFIG, "consumerGroupId");  // Consumer 들을 그룹으로 묶을 수 있다.
        properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
                = new ConcurrentKafkaListenerContainerFactory<>();
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
        return kafkaListenerContainerFactory;
    }
}

위와 같이 KafkaConsumer 설정들을 지정하였습니다.

  • ConsumerFactory 에서는 "토픽"에 접속하기 위한 설정 파일들을 담아두기 위해 Map을 사용하여 만듭니다.
  • ConcurrentKafkaListenerContainerFactory 에서는 "토픽"의 변경사항을 파악할 수 있는 리스너를 만듭니다.
@EnableKafka
@Configuration
public class KafkaProducerConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");  // Kafka 주소
        properties.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

이번에는 Producer 를 위한 설정 파일을 만들어 보았습니다.

  • producerFactory 에서는 "토픽"에 접속하기 위한 설정 파일들을 담아두기 위해 Map을 사용하여 만듭니다. consumer 와 많이 비슷합니다.
  • kafkaTemplate 에서는 내용을 보내기 위한 빈을 등록하였습니다.

3.3. Consumer/Producer


@RequiredArgsConstructor
@Service
@Slf4j
public class KafkaConsumer {
    private final CatalogRepository repository;

    @Transactional
    @KafkaListener(topics = "example-catalog-topic")
    public void updateQuantity(String kafkaMessage) {
        log.info("Kafka Message : -> [{}]", kafkaMessage);

        Map<Object, Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();
        try {
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {
            });
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }

        CatalogJpaEntity catalogJpaEntity = repository.findByProductId((String) map.get("productId"));

        if (catalogJpaEntity != null) {
            catalogJpaEntity.setStock(catalogJpaEntity.getStock() - (Integer) map.get("quantity"));
        }
    }
}

@KafkaListener를 사용하여 해당 토픽에 구독합니다. 해당 토픽에 메시지가 쌓이면 해당 메서드가 실행됩니다.

@RequiredArgsConstructor
@Service
@Slf4j
public class KafkaProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public OrderVo send(String topic, OrderVo orderVo) {
        ObjectMapper mapper = new ObjectMapper();
        String jsonIsString = "";
        try {
            jsonIsString = mapper.writeValueAsString(orderVo);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        kafkaTemplate.send(topic, jsonIsString);
        log.info("Kafka Producer sent data from the Order microservice: [{}]", orderVo);
        return orderVo;
    }
}

@Bean으로 등록한 KafkaTemplate 를 사용하여 메시지를 전송하는 기능입니다. 객체를 문자열로 변환 한 뒤 해당 토픽으로 전송합니다.

Producer

Consumer

정상적으로 메시지를 구독하고 전송하는 것을 확인할 수 있습니다.

Ref


  1. MSA- 메시지큐 - Kim Hyen Su
post-custom-banner

0개의 댓글