slack 메시지 보내는 방법 블로그: https://honglab.tistory.com/212
amazon s3 이벤트 알림 개요: https://docs.aws.amazon.com/ko_kr/AmazonS3/latest/userguide/EventNotifications.html
amazon s3 콘솔 이벤트 알림 설정 방법: https://docs.aws.amazon.com/ko_kr/AmazonS3/latest/userguide/enable-event-notifications.html
객체 키 이름 필터링을 포함하는 유효한 알림 구성 예제:
https://docs.aws.amazon.com/ko_kr/AmazonS3/latest/userguide/notification-how-to-filtering.html#notification-how-to-filtering-example-valid
이벤트 알림 메시지를 게시할 권한 부여: https://docs.aws.amazon.com/ko_kr/AmazonS3/latest/userguide/grant-destinations-permissions-to-s3.html
Event Bridge를 이용하는 방법: https://www.wisen.co.kr/pages/blog/blog-detail.html?idx=12085
import boto3
import json
import logging
import os
import urllib.parse
from base64 import b64decode
from datetime import datetime, timedelta
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
# Get values from Environments variables
SLACK_CHANNEL = os.environ['SLACK_CHANNEL']
HOOK_URL = os.environ['HOOK_URL']
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def generate_s3_url(region, bucket_name, object_key):
# S3 주소 만드는 함수
# URL 인코딩된 `object key`에서 디렉토리 경로만 추출
prefix = "/".join(object_key.split("/")[:-1]) + "/"
# URL 생성
url = (
f"https://{region}.console.aws.amazon.com/s3/buckets/{bucket_name}"
f"?region={region}&bucketType=general&prefix={urllib.parse.quote(prefix)}"
)
return url
def send_message(message):
req = Request(HOOK_URL, data = json.dumps(message).encode('utf-8'))
try:
response = urlopen(req)
response.read()
logger.info("Message posted to %s", SLACK_CHANNEL)
except HTTPError as e:
logger.error("Request failed: %d %s", e.code, e.reason)
except URLError as e:
logger.error("Server connection failed: %s", e.reason)
def lambda_handler(event, context):
# Event detail
records = event["Records"][0]
region = records["awsRegion"]
bucket_name = records["s3"]["bucket"]["name"]
object_key = records["s3"]["object"]["key"]
object_key = urllib.parse.unquote(object_key)
s3_url = generate_s3_url(region, bucket_name, object_key)
slack_message = {
"channel": SLACK_CHANNEL,
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "입력하고 싶은 메시지 제목"
}
},
{
"type": "divider"
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"🏷️ Bucket: `{bucket_name}`"
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"🏷️ Object: `{object_key}`"
}
},
{
"type": "divider"
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "AWS S3 bucket object url"
},
"accessory": {
"type": "button",
"text": {
"type": "plain_text",
"text": "Click",
"emoji": True
},
"value": "click_value",
"url": s3_url,
"action_id": "button-action"
}
}
]
}
send_message(slack_message)