AWS S3 Event Notification to Lambda Function 설정하는 방법

김재민·2024년 10월 29일
0

대략 플로우

  • Lambda function 생성
  • Lambda function trigger 를 이용해서 s3 에 이벤트 알람 설정
    • 이 때 bucket prefix 지정하면 prefix 하위에 해당하는 Object 이벤트가 발생했을 때 Lambda 를 호출함
  • Lambda 에서 아래 코드를 이용하여 Slack 채널에 알람 메시지를 전송할 수 있음

아래 참고 자료 이용해서 게시글 다시 정리하기


lambda function 코드 예제


import boto3
import json
import logging
import os
import urllib.parse

from base64 import b64decode
from datetime import datetime, timedelta
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError

   
# Get values from Environments variables
SLACK_CHANNEL = os.environ['SLACK_CHANNEL']
HOOK_URL = os.environ['HOOK_URL']
   
logger = logging.getLogger()
logger.setLevel(logging.INFO)


def generate_s3_url(region, bucket_name, object_key):
    # S3 주소 만드는 함수
    
    # URL 인코딩된 `object key`에서 디렉토리 경로만 추출
    prefix = "/".join(object_key.split("/")[:-1]) + "/"
    
    # URL 생성
    url = (
        f"https://{region}.console.aws.amazon.com/s3/buckets/{bucket_name}"
        f"?region={region}&bucketType=general&prefix={urllib.parse.quote(prefix)}"
    )
    return url


def send_message(message):
    
    req = Request(HOOK_URL, data = json.dumps(message).encode('utf-8'))

    try:
        response = urlopen(req)
        response.read()
        logger.info("Message posted to %s", SLACK_CHANNEL)
    except HTTPError as e:
        logger.error("Request failed: %d %s", e.code, e.reason)
    except URLError as e:
        logger.error("Server connection failed: %s", e.reason)

   
def lambda_handler(event, context):
    
    # Event detail
    records = event["Records"][0]
    region = records["awsRegion"]
    bucket_name = records["s3"]["bucket"]["name"]
    object_key = records["s3"]["object"]["key"]
    object_key = urllib.parse.unquote(object_key)
    s3_url = generate_s3_url(region, bucket_name, object_key)

    slack_message = {
        "channel": SLACK_CHANNEL,
        "blocks": [
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": "입력하고 싶은 메시지 제목"
                }
            },
            {
                "type": "divider"
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"🏷️ Bucket: `{bucket_name}`"
                }
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"🏷️ Object: `{object_key}`"
                }
            },
            {
                "type": "divider"
            },
            {
			"type": "section",
			"text": {
				"type": "mrkdwn",
				"text": "AWS S3 bucket object url"
			},
			"accessory": {
				"type": "button",
				"text": {
					"type": "plain_text",
					"text": "Click",
					"emoji": True
				},
				"value": "click_value",
				"url": s3_url,
				"action_id": "button-action"
			}
		}
        ]
    }
    
    
    send_message(slack_message)
profile
안녕하세요. 데이터 엔지니어 김재민 입니다.

0개의 댓글