Message Queue

Kim, Beomgoo·2023년 4월 30일
0

Message Queue 비교

SQS vs. Kafka vs. RabbitMQ

Amazon SQSApache KafkaRabbitMQ
오픈소스OO
브로커 구분메시지 브로커이벤트 브로커메시지 브로커
동기/비동기둘 다 가능비동기둘 다 가능
메시지 전달 보장 수준At least once(Standard)
Exactly once (FIFO)
At most once
At least once
Exactly once
At most once
At least once
메시지 순서 보장 수준최대한 보장 (Standard)
순서 보장 (FIFO)
하나의 컨슈머 그룹 기준으로 파티션 내 메시지 순서 보장하나의 큐에 하나의 컨슈머 연결시 순서 보장
성능300TPS100000TPS20000TPS

메시지 전달 보장 수준

종류의미재전송 여부중복 삭제 여부비고
At most once1회 전달 시도XX메시지는 중복되지 않으나, 상실 가능성
At least once적어도 1회는 전달OX메시지가 중복될 수 있으나, 상실되지는 않음
Exactly once1회만 전달OO중복되거나 상실되지 않고, 확실히 메시지가 도달함.

Apache Kafka

예시 코드

  • Producer. Java Application
  • Consumer. Python Application
  • JSON 메시지를 Pub/Sub 한다.

Producer

https://www.skyer9.pe.kr/wordpress/?p=1550

  • application.yml. Bootstrap Server 지정
    spring:
      kafka:
        bootstrap-servers: localhost:9092
  • DTO
    • Request DTO. Controller에 요청 시 Request Body의 내용.
      public class DiaryRequestDto {
      
          @Getter
          @NoArgsConstructor
          @AllArgsConstructor
          @Builder
          @ToString
          public static class CreateRequest {
      
              private String title;
              private String content;
          }
      }
    • Message DTO. Publish할 때 JSON Serialize하여 전송된다.
      @Getter
      @Builder
      @NoArgsConstructor
      @AllArgsConstructor
      public class DiaryMessageDto {
      
          private Long diaryId;
          private String originalContent;
          private Long userSeq;
      
          public static DiaryMessageDto of(Diary diary) {
              return DiaryMessageDto.builder()
                      .diaryId(diary.getId())
                      .originalContent(diary.getOriginalContent())
                      .userSeq(1L)
                      .build();
          }
      }
  • Kafka Producer Configuration
    @Configuration
    public class KafkaProducerConfig {
    
        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;
    
        @Bean
        public ProducerFactory<Long, DiaryMessageDto> producerFactory() {
            Map<String, Object> configs = new HashMap<>();
            configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
            configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    
            return new DefaultKafkaProducerFactory<>(configs);
        }
    
        @Bean
        public KafkaTemplate<Long, DiaryMessageDto> kafkaTemplate() {
    
            return new KafkaTemplate<>(producerFactory());
        }
    }
  • Producer Service
    @Service
    @RequiredArgsConstructor
    public class KafkaProducer {
    
        private final static String TOPIC = "hello.kafka";
        private final KafkaTemplate<Long, DiaryMessageDto> kafkaTemplate;
    
        public void pubMessage(Diary diary) {
            DiaryMessageDto message = DiaryMessageDto.of(diary);
    
            kafkaTemplate.send(TOPIC, message);
        }
    }
  • Rest Controller
    @RequestMapping
    @RequiredArgsConstructor
    @RestController
    public class ProducerController {
    
        private final KafkaProducer kafkaProducer;
    
        @PostMapping("/publish")
        public ResponseEntity<?> publishMessage(@RequestBody DiaryRequestDto.CreateRequest requestDto) {
            Diary diary = Diary.builder()
                    .id(1L)
                    .title(requestDto.getTitle())
                    .originalContent(requestDto.getContent())
                    .shortContent("")
                    .selected(false)
                    .build();
    
            kafkaProducer.pubMessage(diary);
    
            return ResponseEntity.status(HttpStatus.OK).body("Published");
        }
    }

Consumer

  1. kafka-python 패키지 설치

    $ pip install kafka-python
  2. Consumer 생성

    from kafka import KafkaConsumer
    import json
    
    consumer = KafkaConsumer(
        bootstrap_servers = [ 'localhost:9092' ],
        value_deserializer = lambda m: json.loads(m.decode('ascii'))
    )
    
    consumer.subscribe([ 'hello.kafka' ])
    
    for message in consumer:
        print(message.value)

실행 확인

  1. Producer Controller에 POST 요청
    image
  2. Consumer 확인
    image

Amazon SQS

예시 코드

다음의 값들은 코드상에 저장되는 등 Public으로 공개되면 안 되는 값들이므로, 실행 시 환경변수 또는 명령행 아규먼트로 전달되어야 한다.

  • AWS Access Key
  • AWS Secret Key
  • Amazon SQS Queue Endpoint

Producer

  • application.yml. AWS Credential과 리전, 엔드포인트 등을 지정한다.
    • 실행 시 환경 변수로 전달한다.

      cloud:
        aws:
          credentials:
            access-key: ${SQS_ACCESS_KEY}
            secret-key: ${SQS_SECRET_KEY}
          region:
            static: ap-northeast-2
          stack:
            auto: false
          sqs:
            endpoint: ${SQS_ENDPOINT}
  • DTO
    • Request DTO. Controller에 요청 시 Request Body의 내용.
      public class DiaryRequestDto {
      
          @Getter
          @NoArgsConstructor
          @AllArgsConstructor
          @Builder
          @ToString
          public static class CreateRequest {
      
              private String title;
              private String content;
          }
      }
    • Message DTO. Publish할 때 JSON Serialize하여 전송된다.
      @Getter
      @Builder
      @NoArgsConstructor
      @AllArgsConstructor
      public class DiaryMessageDto {
      
          private Long diaryId;
          private String originalContent;
          private Long userSeq;
      
          public static DiaryMessageDto of(Diary diary) {
              return DiaryMessageDto.builder()
                      .diaryId(diary.getId())
                      .originalContent(diary.getOriginalContent())
                      .userSeq(1L)
                      .build();
          }
      }
  • Amazon SQS Config.
    @Configuration
    public class AmazonSqsConfig {
    
        @Value("${cloud.aws.credentials.access-key}")
        private String accessKey;
    
        @Value("${cloud.aws.credentials.secret-key}")
        private String secretKey;
    
        @Value(("${cloud.aws.region.static}"))
        private String region;
    
        @Primary
        @Bean
        public AmazonSQSAsync amazonSQSAsync() {
            BasicAWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
    
            return AmazonSQSAsyncClientBuilder.standard()
                    .withRegion(region)
                    .withCredentials(new AWSStaticCredentialsProvider(credentials))
                    .build();
        }
    }
  • Sender Service.
    @Service
    @RequiredArgsConstructor
    public class SqsSenderService {
    
        @Value("${cloud.aws.sqs.endpoint}")
        private String url;
    
        private final ObjectMapper objectMapper;
        private final AmazonSQS amazonSQS;
    
        public SendMessageResult sendMessage(DiaryMessageDto message) throws JsonProcessingException {
            SendMessageRequest sendMessageRequest = new SendMessageRequest(url, objectMapper.writeValueAsString(message));
    
            return amazonSQS.sendMessage(sendMessageRequest);
        }
    }
  • Rest Controller.
    @RequestMapping
    @RequiredArgsConstructor
    @RestController
    public class SenderController {
    
        private final SqsSenderService senderService;
    
        @PostMapping("/send")
        public ResponseEntity<?> publishMessage(@RequestBody DiaryRequestDto.CreateRequest requestDto) {
            Diary diary = Diary.builder()
                    .id(1L)
                    .title(requestDto.getTitle())
                    .originalContent(requestDto.getContent())
                    .shortContent("")
                    .selected(false)
                    .build();
    
            try {
                senderService.sendMessage(DiaryMessageDto.of(diary));
            } catch (JsonProcessingException jsonProcessingException) {
                jsonProcessingException.printStackTrace();
    
                return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Json Processing Exception");
            }
    
            return ResponseEntity.status(HttpStatus.OK).body("Send Success");
        }
    }

Consumer

  1. boto3 패키지 설치

    $ pip install boto3
  2. Consumer 생성

    import boto3
    import argparse
    import json
    
    region_name = 'ap-northeast-2'
    
    def parse_args():
        parser = argparse.ArgumentParser(description='Boto3 Example')
        parser.add_argument('--access-key', type=str, help='AWS access key id')
        parser.add_argument('--secret-key', type=str, help='AWS secret access key')
        parser.add_argument('--queue_url', type=str, help='AWS SQS Endpoint')
    
        return parser.parse_args()
    
    def receive_message(sqs, queue_url):
        response = sqs.receive_message(
            QueueUrl=queue_url,
            AttributeNames=['All'],
            MessageAttributeNames=['All'],
            MaxNumberOfMessages=1,
            WaitTimeSeconds=20
        )
        if 'Messages' in response:
            message = response['Messages'][0]
            message_body = json.loads(message['Body'])
            print('Received message: ', message_body)
            # SQS 메시지 삭제
            sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message['ReceiptHandle'])
    
    def main():
        args = parse_args()
    
        session = boto3.Session(
            aws_access_key_id=args.access_key,
            aws_secret_access_key=args.secret_key
        )
    
        # boto3 클라이언트 생성
        sqs = session.client('sqs', region_name=region_name)
    
        while True:
            receive_message(sqs, queue_url=args.queue_url)
    
    if __name__ == '__main__':
        main()

실행 확인

  1. Consumer 실행

    $ python3 consumer.py --access-key $ACCESS_KEY --secret-key $SECRET_KEY --queue_url $QUEUE_URL
  2. Sender Controller에 POST 요청
    image

  3. Amazon SQS 큐 메시지 확인
    image

  4. Consumer 확인
    image

profile
하나에 하나를 보탠다

0개의 댓글