타 팀과의 데이터를 주고받기 위해서 Message Queue가 필요해서 어떤 것을 사용할까 고민을 하다가 익숙한 AWS 에서 제공해주는 SNS, SQS 를 사용하기로 결정했다.
따로 Kafka를 사용하기에는 리소스도 많이 사용해야하고 AWS SNS, SQS 를 사용하면 전반적인 서비스의 관리를 AWS 에서 해주기 때문에 훨씬 도입하여 사용하기에 편할 것으로 판단했다.
실제 Message Queue 용도로만 사용하기 위해서는 SQS 만을 사용하면 되지만 나중을 고려하여 Topic기능을 사용하기 위해서 SNS를 같이 조합하기로 결정했다.
AWS SNS 란 아마존에서 관리하고 있는 PUB/SUB 메시지 전달 서비스이다. SNS 서비스는 Amazon Kinesis Data Firehose, Amazon SQS, AWS Lambda, HTTP, email, mobile push notifications, and mobile text messages (SMS). 에서 구독하여서 사용할 수 있다.
SQS(Amazon Simple Queue Service)는 분산 애플리케이션 구성 요소 간의 분리된 통신을 가능하게 하는 완전 관리형 메시지 큐 서비스이다. 분산 아키텍처의 구성 요소 간에 메시지를 일시적으로 저장할 수 있는 메시지 버퍼를 제공하여 구성 요소를 분리하고 시스템의 전반적인 안정성과 확장성을 향상시킬 수 있다.
SQS을 사용하면 메시지를 손실하거나 다른 서비스를 사용할 필요 없이 소프트웨어 구성 요소 간에 어떤 규모로든 메시지를 보내고, 저장하고, 받을 수 있다는 장점이 있고 이를 통해 증가하는 수요를 처리할 수 있도록 확장할 수 있는 분리된 시스템을 구축하는 데 좋은 솔루션이 될 수 있다.
SQL은 표준 대기열과 FIFO(First-In-First-Out) 대기열을 모두 지원하며, 다양한 수준의 전송이 보장된다.
sns topic을 만들면 다음과 같은 설정을 하게 된다. 어떤 타입의 topic을 만들지를 선정할 수 있다. message 의 선입선출이 중요하다면 FIFO를 순서는 중요하지 않고 높은 throughput이 중요하다면 Standard를 설정하면 된다.
FIFO를 설정하게 되면 Subscriber 는 SQS로 고정된다.
Topic을 만들고 구독을 추가하게 되면
Topic ARN 에 설정할 수 있는 sqs (fifo) 가 나오고 Protocol은 Amazon SQS만 가능하게 돼있다.
여기서 사용할 수 있는 두 가지 기능이 있는데 하나는 Filter policy 와 Redrive policy 이다.
Filter policy 는 message attribute, message body등에 있는 값을 이용하여 filtering을 제공하는 기능이다. 조건을 만족해야만 subscriber에게 message가 전달되게 된다.
Redrive policy는 message 가 제대로 전달이 되지 못하는 상황이 발생했을 때 message를 전달할 수 있는 다른 sqs 를 설정하는 기능이다. 일종의 dead-letter queue
SNS의 Topic을 구독하는 Subscriber 는 Filter Policy를 적용할 수 있다. 이를 통해서 특정 message 에 대해서만 읽는 등의 설정을 할 수 있다.
Filtering을 설정하는데에 두 가지 방법이 있다.
MessageAttributes: Message attribute 값을 통해서 필터링을 하는 방식으로 default 로 사용되는 방식이다.
MessageBody: Message body 값을 통해서 필터링을 하는 방식이다.
MessageAttributes 는 다음과 같은 형태로 구성된다.
Name, type 그리고 value 값은 모두 빈값이거나 null값이 될 수 없다.
e.g.)
{
"Type": "Notification",
"MessageId": "a1b2c34d-567e-8f90-g1h2-i345j67klmn8",
"TopicArn": "arn:aws:sns:us-east-2:123456789012:MyTopic",
"Message": "message-body-with-transaction-details",
"Timestamp": "2019-11-03T23:28:01.631Z",
"SignatureVersion": "4",
"Signature": "signature",
"UnsubscribeURL": "unsubscribe-url",
"MessageAttributes": {
"customer_interests": {
"Type": "String.Array",
"Value": "[\"soccer\", \"rugby\", \"hockey\"]"
},
"store": {
"Type": "String",
"Value":"example_corp"
},
"event": {
"Type": "String",
"Value": "order_placed"
},
"price_usd": {
"Type": "Number",
"Value": "210.75"
}
}
}
위의 Message sample을 토대로 accept 되는 경우와 reject되는 예시를 보면 좀 더 쉽게 policy 적용 방식을 이해할 수 있다.
Accept policy example
{
"store": ["example_corp"],
"event": "order_placed",
"customer_interests": [
"rugby",
"football",
"baseball"
],
"price_usd": [{"numeric": [">=", 100]}]
}
각 key 값들에 대한 value 값이 모두 통과해야 accept하게 된다. 각각의 key가 and로 이어진다고 생각하면 된다.
Reject policy example
{
"store": ["example_corp"],
"event": "order_cancelled"
"encrypted": [false],
"customer_interests": [
"basketball",
"baseball"
]
}
존재하지 않는 key 값이 있는 경우 또한 reject될 수 있고 값이 다른 경우에도 당연히 reject 된다.
sqs에 대한 polling을 할 때 받을 수 있는 최대 message 는 10개 가 max이다. batch의 크기가 생각보다 작기 때문에 빠르게 처리하기 위해서는 비동기적으로 sqs polling 을 구현하여야 한다.
이 때 동시에 여러 process에서 queue를 polling하게 되면 message의 중복읽기가 발생할 수 있는데 이러한 상황을 방지하기 위해서 VisibilityTimeout 이 있다.
VisibilityTimeout이란 해당 message를 소유할 수 있는 최대 시간이라고 생각하면 된다. 만약 10으로 설정되어 있다면 누군가 message_1을 읽었을 때 다른 사용자들은 message_1에 대한 접근이 10초 이후에 가능하다는 의미이다.