Athena workgroup

김재민·2025년 4월 13일

athena

목록 보기
1/1

1. Background


  • 보통 Amazon S3 기반 데이터 분석 쿼리를 위해 Athena 를 많이 사용한다.
  • 이 때, 쿼리 스캔 제한이 없을 경우 비용 폭탄을 맞을 수 있다.
  • 이번에는 Athena 쿼리 그룹을 분리하는 Workgroup 에서 쿼리당 스캔량을 제한하고 Workgroup 의 Data usage alerts 구성을 통해 쿼리 임계값에 도달할 경우 사용량 알림을 연동하는 방법을 기록한다.

2. Architecture


알람을 위한 전체 구조는 아래와 같이 단순하다.
Athena Workgroup의 데이터 사용량이 사전에 설정한 규칙에 따라 임계치에 도달하면, 연결된 SNS 토픽을 통해 알림 로그가 전송된다. 해당 SNS 토픽을 Lambda Function의 트리거로 설정하면, 수신된 메시지를 파싱하여 Slack으로 전달할 수 있다.

그림 출처: LG CNS - 사용자 별 Athena 쿼리 제약을 통한 비용 제어 방안


Athena Workgroup


아래 규칙은 workgroup 생성 단계 또는 workgroup edit 를 통해 수정할 수 있다.

Managed per query data usage control

  • Athena workgroup 에서 쿼리당 스캔 가능한 데이터 사이즈를 지정할 수 있다.
  • 규칙에 설정한 임계치에 도달하면 쿼리가 취소 된다.
  • 아래 이미지와 같이 설정할 경우 하나의 쿼리에서 데이터 스캔 사이즈가 300GB 를 초과할 경우 쿼리가 취소된다.

Workgroup data usage alerts

  • 특정 기간 동안 workgroup의 모든 쿼리에 대해 스캔할 수 있는 총 데이터 사이즈를 지정할 수 있다.
  • Managed per query data usage control 와 달리 데이터 스캔 사이즈가 임계값을 넘겨도 자체적으로 취소하지는 않는다.
  • 예를 들어, 15분동안 해당 workgroup에서 수행된 쿼리들의 총 데이터 스캔 사이즈가 Data threshold 에 지정해둔 크기를 초과하면 SNS 토픽을 통해 Lambda Function 을 트리깅할 수 있다.
  • Workgroup data usage alerts 규칙은 workgroup 당 여러개를 등록할 수 있다.


3. Create SNS TOPIC


SNS 토픽을 생성하고 Workgroup data usage alerts 에서 SNS topic selection 선택 박스를 통해 연결하면 임계값에 도달했을 때 해당 토픽으로 로그가 전송된다.

  • 아래 그림처럼 토픽 타입은 Standard 로 설정해야 Lambda Function 에서 구독할 수 있다.

  • 우선 아래와 같은 옵션은 따로 수정하지 않았다. 필요 시 작업하면 된다.


4. Create Lambda Function


Lambda Function 에서 Add trigger를 통해 SNS 토픽과 연동할 수 있다.
lambda_handler 함수의 event 파리미터를 파싱해서 원하는 작업을 하면 된다.

Event message example

  • event 파라미터로 전달 되는 데이터 예시는 아래와 같다.
{
  "Records": [
    {
      "EventSource": "aws:sns",
      "EventVersion": "1.0",
      "EventSubscriptionArn": "arn:aws:sns:ap-northeast-2:xxxx:athena-workgroup-alert-topic:xxxx1234-5678-1234-abcd-*********",
      "Sns": {
        "Type": "Notification",
        "MessageId": "xxxxxxx-1234-5678-abcd-example",
        "TopicArn": "arn:aws:sns:ap-northeast-2:xxxx:athena-workgroup-alert-topic",
        "Subject": "ALARM: \"xxxxxx....\" in Asia Pacific (Seoul)",
        "Message": "{\"AlarmName\":\"AWS_Athena_Workgroup_xxxxxx...\",\"AlarmDescription\":null,\"AWSAccountId\":\"xxxx\",\"AlarmConfigurationUpdatedTimestamp\":\"2025-04-03T10:01:43.287+0000\",\"NewStateValue\":\"ALARM\",\"NewStateReason\":\"Threshold Crossed: 1 out of the last 1 datapoints [6.291456E8 (03/04/25 13:01:00)] was greater than the threshold (1.048576E8) (minimum 1 datapoint for OK -> ALARM transition).\",\"StateChangeTime\":\"2025-04-03T14:01:42.185+0000\",\"Region\":\"Asia Pacific (Seoul)\",\"AlarmArn\":\"arn:aws:cloudwatch:ap-northeast-2:xxxx:alarm:AWS_Athena_Workgroup_xxxxx...\",\"OldStateValue\":\"OK\",\"OKActions\":[],\"AlarmActions\":[\"arn:aws:sns:ap-northeast-2:xxxx:athena-workgroup-alert-topic\"],\"InsufficientDataActions\":[],\"Trigger\":{\"MetricName\":\"ProcessedBytes\",\"Namespace\":\"AWS/Athena\",\"StatisticType\":\"Statistic\",\"Statistic\":\"SUM\",\"Unit\":null,\"Dimensions\":[{\"value\":\"my-work-group\",\"name\":\"WorkGroup\"}],\"Period\":3600,\"EvaluationPeriods\":1,\"DatapointsToAlarm\":1,\"ComparisonOperator\":\"GreaterThanThreshold\",\"Threshold\":1.048576E8,\"TreatMissingData\":\"notBreaching\",\"EvaluateLowSampleCountPercentile\":\"\"}}",
        "Timestamp": "2025-04-03T14:01:42.224Z",
        "SignatureVersion": "1",
        "Signature": "xxxxx....",
        "SigningCertUrl": "https://sns.ap-northeast-2.amazonaws.com/SimpleNotificationService-xxxx.pem",
        "UnsubscribeUrl": "https://sns.ap-northeast-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:ap-northeast-2:xxxx:athena-workgroup-alert-topic:example-uuid-xxxx...",
        "MessageAttributes": {}
      }
    }
  ]
}

Slack Alerts code

  • 아래와 같이 개발할 경우 환경변수에 등록해둔 Slack webhook 주소를 참조하여 메시지를 전송할 수 있다.
import boto3
import json
import logging
import os

from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError

# 환경변수에서 Slack 채널 및 Webhook URL 읽어오기 (CRITICAL 전용)
SLACK_CHANNEL_ID = os.environ['SLACK_CHANNEL_ID']
SLACK_HOOK_URL = os.environ['SLACK_HOOK_URL']

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def send_message(message, hook_url, channel):
    """
    지정된 hook_url과 channel로 Slack 메시지 전송.
    """
    message["channel"] = channel

    req = Request(
        hook_url,
        data=json.dumps(message).encode('utf-8'),
        headers={'Content-Type': 'application/json'}
    )
    try:
        response = urlopen(req)
        status_code = response.getcode()
        response.read()
        logger.info("Message posted to %s with status code: %s", channel, status_code)
    except HTTPError as e:
        logger.error("Request failed: %d %s", e.code, e.reason)
    except URLError as e:
        logger.error("Server connection failed: %s", e.reason)

def lambda_handler(event, context):
    logger.info("Received event: %s", json.dumps(event))

    sns_record = event['Records'][0]['Sns']
    raw_message = sns_record.get('Message', '{}')
    try:
        alarm_data = json.loads(raw_message)
    except Exception as e:
        logger.error("Failed to parse SNS Message: %s", e)
        alarm_data = {}

    alarm_name = alarm_data.get("AlarmName", "N/A")
    new_state = alarm_data.get("NewStateValue", "N/A")
    new_state_reason = alarm_data.get("NewStateReason", "N/A")
    state_change_time = alarm_data.get("StateChangeTime", "N/A")
    trigger = alarm_data.get("Trigger", {})
    
    # WorkGroup 값 추출
    workgroup = "N/A"
    dimensions = trigger.get("Dimensions", [])
    for d in dimensions:
        if d.get("name") == "WorkGroup":
            workgroup = d.get("value", "N/A")
            break

    # Slack 메시지 텍스트 구성
    slack_text = (
        f"*🚨 AWS Athena Workgroup Alert*\n\n"
        f"● *WorkGroup:* {workgroup}\n"
        f"● *Reason:* *{workgroup} 워크그룹 데이터 스캔 임계값을 초과하였습니다.*\n{new_state_reason}\n"
        f"● *State Change Time:* {state_change_time}"
    )

    # Slack 메시지 페이로드 구성
    slack_message = {
        "blocks": [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": "AWS Athena Workgroup Alert",
                    "emoji": True
                }
            },
            {
                "type": "divider"
            }
        ],
        "attachments": [
            {
                "color": "#FF0000" if new_state == "ALARM" else "#36a64f",
                "blocks": [
                    {
                        "type": "section",
                        "text": {
                            "type": "mrkdwn",
                            "text": slack_text
                        }
                    }
                ]
            }
        ]
    }

    # CRITICAL hook_url 과 channel 로 메시지 전송
    send_message(slack_message, SLACK_HOOK_URL, SLACK_CHANNEL_ID)

Slack message example

  • Lambda function 을 통해 전송된 메시지 예시는 아래와 같다.

Slack - Block Kit Builder


5. Q&A

Q1. Workgroup data usage alerts 을 Slack 으로 연동하는 작업 중 Managed per query data usage control 로 이미 쿼리당 스캔 사이즈가 제한 되어 있는데 굳이 Workgroup 사용량 제한 규칙을 추가로 만들 필요가 있냐는 질문을 받았다.

A1. 맞는 말인 것 같았다. 한 세션에서 하나의 쿼리에 대한 스캔 가능 사이즈가 제한 되어 있으니 굳이 필요 없을 것 같기도 했다. 그럼에도 연동한 이유는 다음과 같다.
1. 특정 기간동안 쿼리 작업이 몰려 있는 것을 간접적으로 체크하고 작업을 다른 시간대로 분배할 수 있다.
2. 운영 기간이 길어질 수록 예상 했던 동시 쿼리 활성화 수와 데이터 스캔 사이즈를 초과하는 경우가 분명 발생한다. 이 때, 알람을 받고 쿼리 사용량에 대한 when?, where?, what?, why? 에 대한 의문으로 작업을 리펙토링 하듯 경량화하는 신호탄이 될 수 있다. 그리고 이런 경우는 많이 없겠지만, 누군가 작업 속도를 빠르게 하기위해 비동기 로직으로 Ahtena 쿼리를 제출하도록 만들어 짧은 기간 내 많은 DML 이 제출 되도록 작업할 수도 있다. 이럴 경우 동일한 AWS Region 에서 한 번에 활성화 가능한 Athena 사용량 제한에 걸리기 딱 좋은 작업이라 알람 연동을 통해 쿼리 경찰 역할도 충분히 할 수 있게 된다. 이렇듯, 제한량 설정과 알람 설정은 분명 필요하다 판단했다.


6. Ref


해당 게시글에서는 Athena Workgroup 에서 Workgroup data usage alerts 규칙에 설정해둔 데이터 스캔 임계값을 초과할 경우 Slack 으로 알림 받는 방법을 기록했다.
필요한 경우 Lambda Function 에서 Athena Workgroup 을 비활성화하는 등의 여러 방법을 검토해볼 수 있다.

profile
안녕하세요. 데이터 엔지니어 김재민 입니다.

0개의 댓글