Amazon SQS | Apache Kafka | RabbitMQ | |
---|---|---|---|
오픈소스 | O | O | |
브로커 구분 | 메시지 브로커 | 이벤트 브로커 | 메시지 브로커 |
동기/비동기 | 둘 다 가능 | 비동기 | 둘 다 가능 |
메시지 전달 보장 수준 | At least once(Standard) Exactly once (FIFO) | At most once At least once Exactly once | At most once At least once |
메시지 순서 보장 수준 | 최대한 보장 (Standard) 순서 보장 (FIFO) | 하나의 컨슈머 그룹 기준으로 파티션 내 메시지 순서 보장 | 하나의 큐에 하나의 컨슈머 연결시 순서 보장 |
성능 | 300TPS | 100000TPS | 20000TPS |
종류 | 의미 | 재전송 여부 | 중복 삭제 여부 | 비고 |
---|---|---|---|---|
At most once | 1회 전달 시도 | X | X | 메시지는 중복되지 않으나, 상실 가능성 |
At least once | 적어도 1회는 전달 | O | X | 메시지가 중복될 수 있으나, 상실되지는 않음 |
Exactly once | 1회만 전달 | O | O | 중복되거나 상실되지 않고, 확실히 메시지가 도달함. |
https://www.skyer9.pe.kr/wordpress/?p=1550
application.yml
. Bootstrap Server 지정spring:
kafka:
bootstrap-servers: localhost:9092
public class DiaryRequestDto {
@Getter
@NoArgsConstructor
@AllArgsConstructor
@Builder
@ToString
public static class CreateRequest {
private String title;
private String content;
}
}
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DiaryMessageDto {
private Long diaryId;
private String originalContent;
private Long userSeq;
public static DiaryMessageDto of(Diary diary) {
return DiaryMessageDto.builder()
.diaryId(diary.getId())
.originalContent(diary.getOriginalContent())
.userSeq(1L)
.build();
}
}
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<Long, DiaryMessageDto> producerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configs);
}
@Bean
public KafkaTemplate<Long, DiaryMessageDto> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Service
@RequiredArgsConstructor
public class KafkaProducer {
private final static String TOPIC = "hello.kafka";
private final KafkaTemplate<Long, DiaryMessageDto> kafkaTemplate;
public void pubMessage(Diary diary) {
DiaryMessageDto message = DiaryMessageDto.of(diary);
kafkaTemplate.send(TOPIC, message);
}
}
@RequestMapping
@RequiredArgsConstructor
@RestController
public class ProducerController {
private final KafkaProducer kafkaProducer;
@PostMapping("/publish")
public ResponseEntity<?> publishMessage(@RequestBody DiaryRequestDto.CreateRequest requestDto) {
Diary diary = Diary.builder()
.id(1L)
.title(requestDto.getTitle())
.originalContent(requestDto.getContent())
.shortContent("")
.selected(false)
.build();
kafkaProducer.pubMessage(diary);
return ResponseEntity.status(HttpStatus.OK).body("Published");
}
}
kafka-python
패키지 설치
$ pip install kafka-python
Consumer 생성
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
bootstrap_servers = [ 'localhost:9092' ],
value_deserializer = lambda m: json.loads(m.decode('ascii'))
)
consumer.subscribe([ 'hello.kafka' ])
for message in consumer:
print(message.value)
POST
요청다음의 값들은 코드상에 저장되는 등 Public으로 공개되면 안 되는 값들이므로, 실행 시 환경변수 또는 명령행 아규먼트로 전달되어야 한다.
application.yml
. AWS Credential과 리전, 엔드포인트 등을 지정한다.실행 시 환경 변수로 전달한다.
cloud:
aws:
credentials:
access-key: ${SQS_ACCESS_KEY}
secret-key: ${SQS_SECRET_KEY}
region:
static: ap-northeast-2
stack:
auto: false
sqs:
endpoint: ${SQS_ENDPOINT}
public class DiaryRequestDto {
@Getter
@NoArgsConstructor
@AllArgsConstructor
@Builder
@ToString
public static class CreateRequest {
private String title;
private String content;
}
}
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DiaryMessageDto {
private Long diaryId;
private String originalContent;
private Long userSeq;
public static DiaryMessageDto of(Diary diary) {
return DiaryMessageDto.builder()
.diaryId(diary.getId())
.originalContent(diary.getOriginalContent())
.userSeq(1L)
.build();
}
}
@Configuration
public class AmazonSqsConfig {
@Value("${cloud.aws.credentials.access-key}")
private String accessKey;
@Value("${cloud.aws.credentials.secret-key}")
private String secretKey;
@Value(("${cloud.aws.region.static}"))
private String region;
@Primary
@Bean
public AmazonSQSAsync amazonSQSAsync() {
BasicAWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
return AmazonSQSAsyncClientBuilder.standard()
.withRegion(region)
.withCredentials(new AWSStaticCredentialsProvider(credentials))
.build();
}
}
@Service
@RequiredArgsConstructor
public class SqsSenderService {
@Value("${cloud.aws.sqs.endpoint}")
private String url;
private final ObjectMapper objectMapper;
private final AmazonSQS amazonSQS;
public SendMessageResult sendMessage(DiaryMessageDto message) throws JsonProcessingException {
SendMessageRequest sendMessageRequest = new SendMessageRequest(url, objectMapper.writeValueAsString(message));
return amazonSQS.sendMessage(sendMessageRequest);
}
}
@RequestMapping
@RequiredArgsConstructor
@RestController
public class SenderController {
private final SqsSenderService senderService;
@PostMapping("/send")
public ResponseEntity<?> publishMessage(@RequestBody DiaryRequestDto.CreateRequest requestDto) {
Diary diary = Diary.builder()
.id(1L)
.title(requestDto.getTitle())
.originalContent(requestDto.getContent())
.shortContent("")
.selected(false)
.build();
try {
senderService.sendMessage(DiaryMessageDto.of(diary));
} catch (JsonProcessingException jsonProcessingException) {
jsonProcessingException.printStackTrace();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Json Processing Exception");
}
return ResponseEntity.status(HttpStatus.OK).body("Send Success");
}
}
boto3 패키지 설치
$ pip install boto3
Consumer 생성
import boto3
import argparse
import json
region_name = 'ap-northeast-2'
def parse_args():
parser = argparse.ArgumentParser(description='Boto3 Example')
parser.add_argument('--access-key', type=str, help='AWS access key id')
parser.add_argument('--secret-key', type=str, help='AWS secret access key')
parser.add_argument('--queue_url', type=str, help='AWS SQS Endpoint')
return parser.parse_args()
def receive_message(sqs, queue_url):
response = sqs.receive_message(
QueueUrl=queue_url,
AttributeNames=['All'],
MessageAttributeNames=['All'],
MaxNumberOfMessages=1,
WaitTimeSeconds=20
)
if 'Messages' in response:
message = response['Messages'][0]
message_body = json.loads(message['Body'])
print('Received message: ', message_body)
# SQS 메시지 삭제
sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message['ReceiptHandle'])
def main():
args = parse_args()
session = boto3.Session(
aws_access_key_id=args.access_key,
aws_secret_access_key=args.secret_key
)
# boto3 클라이언트 생성
sqs = session.client('sqs', region_name=region_name)
while True:
receive_message(sqs, queue_url=args.queue_url)
if __name__ == '__main__':
main()
Consumer 실행
$ python3 consumer.py --access-key $ACCESS_KEY --secret-key $SECRET_KEY --queue_url $QUEUE_URL
Sender Controller에 POST 요청
Amazon SQS 큐 메시지 확인
Consumer 확인