알람을 위한 전체 구조는 아래와 같이 단순하다.
Athena Workgroup의 데이터 사용량이 사전에 설정한 규칙에 따라 임계치에 도달하면, 연결된 SNS 토픽을 통해 알림 로그가 전송된다. 해당 SNS 토픽을 Lambda Function의 트리거로 설정하면, 수신된 메시지를 파싱하여 Slack으로 전달할 수 있다.

그림 출처: LG CNS - 사용자 별 Athena 쿼리 제약을 통한 비용 제어 방안
아래 규칙은 workgroup 생성 단계 또는 workgroup edit 를 통해 수정할 수 있다.

Managed per query data usage control 와 달리 데이터 스캔 사이즈가 임계값을 넘겨도 자체적으로 취소하지는 않는다.Data threshold 에 지정해둔 크기를 초과하면 SNS 토픽을 통해 Lambda Function 을 트리깅할 수 있다.Workgroup data usage alerts 규칙은 workgroup 당 여러개를 등록할 수 있다.
SNS 토픽을 생성하고
Workgroup data usage alerts에서SNS topic selection선택 박스를 통해 연결하면 임계값에 도달했을 때 해당 토픽으로 로그가 전송된다.


Lambda Function 에서
Add trigger를 통해 SNS 토픽과 연동할 수 있다.
lambda_handler함수의event파리미터를 파싱해서 원하는 작업을 하면 된다.
{
"Records": [
{
"EventSource": "aws:sns",
"EventVersion": "1.0",
"EventSubscriptionArn": "arn:aws:sns:ap-northeast-2:xxxx:athena-workgroup-alert-topic:xxxx1234-5678-1234-abcd-*********",
"Sns": {
"Type": "Notification",
"MessageId": "xxxxxxx-1234-5678-abcd-example",
"TopicArn": "arn:aws:sns:ap-northeast-2:xxxx:athena-workgroup-alert-topic",
"Subject": "ALARM: \"xxxxxx....\" in Asia Pacific (Seoul)",
"Message": "{\"AlarmName\":\"AWS_Athena_Workgroup_xxxxxx...\",\"AlarmDescription\":null,\"AWSAccountId\":\"xxxx\",\"AlarmConfigurationUpdatedTimestamp\":\"2025-04-03T10:01:43.287+0000\",\"NewStateValue\":\"ALARM\",\"NewStateReason\":\"Threshold Crossed: 1 out of the last 1 datapoints [6.291456E8 (03/04/25 13:01:00)] was greater than the threshold (1.048576E8) (minimum 1 datapoint for OK -> ALARM transition).\",\"StateChangeTime\":\"2025-04-03T14:01:42.185+0000\",\"Region\":\"Asia Pacific (Seoul)\",\"AlarmArn\":\"arn:aws:cloudwatch:ap-northeast-2:xxxx:alarm:AWS_Athena_Workgroup_xxxxx...\",\"OldStateValue\":\"OK\",\"OKActions\":[],\"AlarmActions\":[\"arn:aws:sns:ap-northeast-2:xxxx:athena-workgroup-alert-topic\"],\"InsufficientDataActions\":[],\"Trigger\":{\"MetricName\":\"ProcessedBytes\",\"Namespace\":\"AWS/Athena\",\"StatisticType\":\"Statistic\",\"Statistic\":\"SUM\",\"Unit\":null,\"Dimensions\":[{\"value\":\"my-work-group\",\"name\":\"WorkGroup\"}],\"Period\":3600,\"EvaluationPeriods\":1,\"DatapointsToAlarm\":1,\"ComparisonOperator\":\"GreaterThanThreshold\",\"Threshold\":1.048576E8,\"TreatMissingData\":\"notBreaching\",\"EvaluateLowSampleCountPercentile\":\"\"}}",
"Timestamp": "2025-04-03T14:01:42.224Z",
"SignatureVersion": "1",
"Signature": "xxxxx....",
"SigningCertUrl": "https://sns.ap-northeast-2.amazonaws.com/SimpleNotificationService-xxxx.pem",
"UnsubscribeUrl": "https://sns.ap-northeast-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:ap-northeast-2:xxxx:athena-workgroup-alert-topic:example-uuid-xxxx...",
"MessageAttributes": {}
}
}
]
}
import boto3
import json
import logging
import os
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
# 환경변수에서 Slack 채널 및 Webhook URL 읽어오기 (CRITICAL 전용)
SLACK_CHANNEL_ID = os.environ['SLACK_CHANNEL_ID']
SLACK_HOOK_URL = os.environ['SLACK_HOOK_URL']
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def send_message(message, hook_url, channel):
"""
지정된 hook_url과 channel로 Slack 메시지 전송.
"""
message["channel"] = channel
req = Request(
hook_url,
data=json.dumps(message).encode('utf-8'),
headers={'Content-Type': 'application/json'}
)
try:
response = urlopen(req)
status_code = response.getcode()
response.read()
logger.info("Message posted to %s with status code: %s", channel, status_code)
except HTTPError as e:
logger.error("Request failed: %d %s", e.code, e.reason)
except URLError as e:
logger.error("Server connection failed: %s", e.reason)
def lambda_handler(event, context):
logger.info("Received event: %s", json.dumps(event))
sns_record = event['Records'][0]['Sns']
raw_message = sns_record.get('Message', '{}')
try:
alarm_data = json.loads(raw_message)
except Exception as e:
logger.error("Failed to parse SNS Message: %s", e)
alarm_data = {}
alarm_name = alarm_data.get("AlarmName", "N/A")
new_state = alarm_data.get("NewStateValue", "N/A")
new_state_reason = alarm_data.get("NewStateReason", "N/A")
state_change_time = alarm_data.get("StateChangeTime", "N/A")
trigger = alarm_data.get("Trigger", {})
# WorkGroup 값 추출
workgroup = "N/A"
dimensions = trigger.get("Dimensions", [])
for d in dimensions:
if d.get("name") == "WorkGroup":
workgroup = d.get("value", "N/A")
break
# Slack 메시지 텍스트 구성
slack_text = (
f"*🚨 AWS Athena Workgroup Alert*\n\n"
f"● *WorkGroup:* {workgroup}\n"
f"● *Reason:* *{workgroup} 워크그룹 데이터 스캔 임계값을 초과하였습니다.*\n{new_state_reason}\n"
f"● *State Change Time:* {state_change_time}"
)
# Slack 메시지 페이로드 구성
slack_message = {
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": "AWS Athena Workgroup Alert",
"emoji": True
}
},
{
"type": "divider"
}
],
"attachments": [
{
"color": "#FF0000" if new_state == "ALARM" else "#36a64f",
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": slack_text
}
}
]
}
]
}
# CRITICAL hook_url 과 channel 로 메시지 전송
send_message(slack_message, SLACK_HOOK_URL, SLACK_CHANNEL_ID)


Q1.
Workgroup data usage alerts을 Slack 으로 연동하는 작업 중Managed per query data usage control로 이미 쿼리당 스캔 사이즈가 제한 되어 있는데 굳이 Workgroup 사용량 제한 규칙을 추가로 만들 필요가 있냐는 질문을 받았다.
A1. 맞는 말인 것 같았다. 한 세션에서 하나의 쿼리에 대한 스캔 가능 사이즈가 제한 되어 있으니 굳이 필요 없을 것 같기도 했다. 그럼에도 연동한 이유는 다음과 같다.
1. 특정 기간동안 쿼리 작업이 몰려 있는 것을 간접적으로 체크하고 작업을 다른 시간대로 분배할 수 있다.
2. 운영 기간이 길어질 수록 예상 했던 동시 쿼리 활성화 수와 데이터 스캔 사이즈를 초과하는 경우가 분명 발생한다. 이 때, 알람을 받고 쿼리 사용량에 대한when?, where?, what?, why?에 대한 의문으로 작업을 리펙토링 하듯 경량화하는 신호탄이 될 수 있다. 그리고 이런 경우는 많이 없겠지만, 누군가 작업 속도를 빠르게 하기위해 비동기 로직으로 Ahtena 쿼리를 제출하도록 만들어 짧은 기간 내 많은 DML 이 제출 되도록 작업할 수도 있다. 이럴 경우 동일한 AWS Region 에서 한 번에 활성화 가능한 Athena 사용량 제한에 걸리기 딱 좋은 작업이라 알람 연동을 통해 쿼리 경찰 역할도 충분히 할 수 있게 된다. 이렇듯, 제한량 설정과 알람 설정은 분명 필요하다 판단했다.
해당 게시글에서는 Athena Workgroup 에서
Workgroup data usage alerts규칙에 설정해둔 데이터 스캔 임계값을 초과할 경우 Slack 으로 알림 받는 방법을 기록했다.
필요한 경우 Lambda Function 에서 Athena Workgroup 을 비활성화하는 등의 여러 방법을 검토해볼 수 있다.