import json
import os
import boto3
import pymysql
def resp(status_code, body):
    """Build a response dict with 'statusCode' and a JSON-encoded 'body'.

    Args:
        status_code: Value stored under the 'statusCode' key.
        body: Object to serialize. Non-ASCII text is written as-is and
            values json cannot encode natively are converted with str().

    Returns:
        dict with the keys 'statusCode' and 'body' (a JSON string).
    """
    payload = json.dumps(body, default=str, ensure_ascii=False)
    return dict(statusCode=status_code, body=payload)
def get_secret():
    """Fetch the secret named by the SECRET_NAME environment variable.

    Returns:
        The secret's 'SecretString' value parsed from JSON.
    """
    secrets_client = boto3.client('secretsmanager')
    secret_id = os.environ['SECRET_NAME']
    secret_string = secrets_client.get_secret_value(SecretId=secret_id)['SecretString']
    return json.loads(secret_string)
def get_conn():
    """Open a new PyMySQL connection to the configured database.

    Host, user and database name are read from the DB_HOST, DB_USER and
    DB_NAME environment variables; the password comes from the 'password'
    key of the secret returned by get_secret(). The port is fixed at 3306.

    Returns:
        The connection object returned by pymysql.connect().

    Raises:
        Exception: Any error raised while reading the settings or
            connecting. It is logged and then re-raised.
    """
    secret = get_secret()
    try:
        conn = pymysql.connect(
            host=os.environ['DB_HOST'],
            user=os.environ['DB_USER'],
            password=secret['password'],
            port=3306,
            # `database` is the documented keyword; `db` is only a
            # compatibility alias for it.
            database=os.environ['DB_NAME'],
        )
    except Exception as e:
        print(f"DB 연결 실패: {e}")
        # Re-raise rather than falling through and returning None, which
        # would make callers fail later with an unrelated AttributeError
        # on `conn.cursor()`.
        raise
    print("DB 연결 성공")
    return conn
def insert_data(data):
    """Insert one delivery row into the `deliveries` table.

    Args:
        data: Mapping that must contain the keys delivery_id, region,
            status, weight_kg, cost_krw and delivered_at.

    Returns:
        resp(200, ...) echoing the delivery_id on success,
        resp(400, ...) when `data` is not a mapping or lacks a required key,
        resp(500, ...) when no database connection could be obtained.

    Raises:
        Exception: Any database error raised while executing or committing
            the INSERT; the connection is closed before it propagates.
    """
    fields = ('delivery_id', 'region', 'status', 'weight_kg', 'cost_krw', 'delivered_at')

    # Validate before touching the database so a malformed request is
    # answered with a client error instead of an unhandled KeyError.
    if not isinstance(data, dict):
        return resp(400, {"message": "Request body must be a JSON object"})
    missing = [field for field in fields if field not in data]
    if missing:
        return resp(400, {"message": f"Missing required fields: {', '.join(missing)}"})

    sql = "INSERT INTO deliveries (delivery_id, region, status, weight_kg, cost_krw, delivered_at) VALUES (%s, %s, %s, %s, %s, %s)"

    conn = get_conn()
    # Guard against a get_conn() that signals failure by returning None.
    if conn is None:
        return resp(500, {"message": "Database connection failed"})

    try:
        with conn.cursor() as cur:
            cur.execute(sql, tuple(data[field] for field in fields))
        conn.commit()
    finally:
        # Always release the connection, including when execute/commit fails.
        conn.close()

    return resp(200, {"message": "Delivery inserted successfully", "delivery_id": data['delivery_id']})
def lambda_handler(event, context):
    """Lambda entry point: insert a delivery from a POST request body.

    Args:
        event: Invocation event. The HTTP method is read from
            event['requestContext']['http']['method'] and the JSON payload
            from event['body'].
        context: Lambda context object (unused).

    Returns:
        The response from insert_data() for a valid POST,
        resp(405, ...) for any other method,
        resp(400, ...) when the body is missing, not valid JSON, or not a
        JSON object.
    """
    method = event['requestContext']['http']['method']

    # Check the method first so other methods get an explicit response
    # instead of an implicit None (or a body-parsing error).
    if method != 'POST':
        return resp(405, {"message": f"Method {method} not allowed"})

    try:
        # `or ''` turns a missing/None body into a JSON decode error below.
        data = json.loads(event.get('body') or '')
    except (TypeError, ValueError):
        return resp(400, {"message": "Request body must be valid JSON"})

    if not isinstance(data, dict):
        return resp(400, {"message": "Request body must be a JSON object"})

    return insert_data(data)
import json
import pymysql
import boto3
import os
from urllib.parse import parse_qs
def resp(status_code, body):
    """Wrap a status code and a JSON-serialized payload in a response dict.

    `body` is dumped with ensure_ascii=False (non-ASCII characters are kept
    verbatim) and default=str (objects json cannot encode are stringified).
    """
    response = {'statusCode': status_code}
    response['body'] = json.dumps(body, ensure_ascii=False, default=str)
    return response
def get_secret():
    """Return the JSON-decoded secret identified by the SECRET_NAME env var."""
    sm = boto3.client('secretsmanager')
    return json.loads(
        sm.get_secret_value(SecretId=os.environ['SECRET_NAME'])['SecretString']
    )
def get_conn():
    """Open a new PyMySQL connection to the configured database.

    Host, user and database name are read from the DB_HOST, DB_USER and
    DB_NAME environment variables; the password comes from the 'password'
    key of the secret returned by get_secret(). The port is fixed at 3306.

    Returns:
        The connection object returned by pymysql.connect().

    Raises:
        Exception: Any error raised while reading the settings or
            connecting. It is logged and then re-raised.
    """
    secret = get_secret()
    try:
        conn = pymysql.connect(
            host=os.environ['DB_HOST'],
            user=os.environ['DB_USER'],
            password=secret['password'],
            port=3306,
            # `database` is the documented keyword; `db` is only a
            # compatibility alias for it.
            database=os.environ['DB_NAME'],
        )
    except Exception as e:
        print(f"DB 연결 실패: {e}")
        # Re-raise rather than falling through and returning None, which
        # would make callers fail later with an unrelated AttributeError
        # on `conn.cursor()`.
        raise
    print("DB 연결 성공")
    return conn
def select_all():
    """Return per-region delivery statistics plus the overall row count.

    For every region in `deliveries` the statistics query yields: total
    rows, rows per status ('완료' as completed, '지연' as delayed, '취소' as
    canceled), the rounded average cost_krw and the summed weight_kg.

    Returns:
        resp(200, {"count": <rows in deliveries>, "data": <list of per-region rows>}),
        or resp(500, ...) when no database connection could be obtained.

    Raises:
        Exception: Any database error raised by the queries; the connection
            is closed before it propagates.
    """
    stats_sql = (
        "select region, COUNT(*) as total, "
        "SUM(CASE WHEN status = '완료' THEN 1 ELSE 0 END) AS completed, "
        "SUM(CASE WHEN status = '지연' THEN 1 ELSE 0 END) AS 'delayed', "
        "SUM(CASE WHEN status = '취소' THEN 1 ELSE 0 END) AS canceled, "
        "CAST(ROUND(AVG(cost_krw)) AS SIGNED) as avg_cost_krw, "
        "sum(weight_kg) AS total_weight_kg "
        "from deliveries group by region"
    )

    conn = get_conn()
    # Guard against a get_conn() that signals failure by returning None.
    if conn is None:
        return resp(500, {"message": "Database connection failed"})

    try:
        with conn.cursor(pymysql.cursors.DictCursor) as cur:
            cur.execute(stats_sql)
            stats_result = cur.fetchall()
            cur.execute("SELECT COUNT(*) as total_count FROM deliveries")
            total_count_result = cur.fetchone()
    finally:
        # Always release the connection, including when a query fails.
        conn.close()

    response_body = {
        "count": total_count_result['total_count'],
        "data": stats_result,
    }
    return resp(200, response_body)
def select_region(region):
    """Return delivery statistics and the row count for a single region.

    Args:
        region: Region value bound into the `region = %s` filter of both
            queries.

    Returns:
        resp(200, {"count": <rows for the region>, "data": <statistics rows>}).
        "count" falls back to 0 if the count query returns no row.
        resp(500, ...) when no database connection could be obtained.

    Raises:
        Exception: Any database error raised by the queries; the connection
            is closed before it propagates.
    """
    stats_sql = (
        "select region, COUNT(*) as total, "
        "SUM(CASE WHEN status = '완료' THEN 1 ELSE 0 END) AS completed, "
        "SUM(CASE WHEN status = '지연' THEN 1 ELSE 0 END) AS 'delayed', "
        "SUM(CASE WHEN status = '취소' THEN 1 ELSE 0 END) AS canceled, "
        "CAST(ROUND(AVG(cost_krw)) AS SIGNED) as avg_cost_krw, "
        "sum(weight_kg) AS total_weight_kg "
        "from deliveries where region=%s group by region"
    )
    count_sql = "SELECT COUNT(*) as total_count FROM deliveries WHERE region = %s"

    conn = get_conn()
    # Guard against a get_conn() that signals failure by returning None.
    if conn is None:
        return resp(500, {"message": "Database connection failed"})

    try:
        with conn.cursor(pymysql.cursors.DictCursor) as cur:
            cur.execute(stats_sql, (region,))
            stats_result = cur.fetchall()
            cur.execute(count_sql, (region,))
            total_count_result = cur.fetchone()
    finally:
        # Always release the connection, including when a query fails.
        conn.close()

    response_body = {
        "count": total_count_result['total_count'] if total_count_result else 0,
        "data": stats_result,
    }
    return resp(200, response_body)
def lambda_handler(event, context):
    """Lambda entry point: return delivery statistics, optionally per region.

    Args:
        event: Invocation event; the query string is read from
            event['rawQueryString'] (a missing key is treated as empty).
        context: Lambda context object (unused).

    Returns:
        select_region(region) when the query string carries a non-blank
        `region` parameter (the first value is used), otherwise
        select_all().
    """
    query_string = event.get('rawQueryString') or ''

    # parse_qs drops blank values, so `?region=` and a query string without
    # `region` both produce None here. Fall back to the unfiltered
    # statistics in that case instead of querying for `region = NULL`,
    # which would silently return an empty result.
    region = parse_qs(query_string).get('region', [None])[0]
    if region is None:
        return select_all()
    return select_region(region)
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Statement1",
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject"
],
"Resource": "arn:aws:s3:::cl-067383;642415/home/${aws:username}/*",
"Condition": {
"StringLike": {
"aws:username": "*"
}
}
}
]
}