import json
import pymysql
import boto3
import os
def resp(status_code, body):
    """Build the response dict returned by the Lambda handler.

    Args:
        status_code: HTTP status code placed under ``statusCode``.
        body: Object to serialize as the JSON response body.

    Returns:
        A dict with ``statusCode`` and a JSON string ``body``. Non-ASCII
        characters are written as-is, and values ``json`` cannot encode
        natively are converted with ``str()``.
    """
    payload = json.dumps(body, default=str, ensure_ascii=False)
    return dict(statusCode=status_code, body=payload)
def get_secret():
    """Fetch the ``ws/mysql/password`` secret from AWS Secrets Manager.

    Returns:
        The secret's ``SecretString`` parsed as JSON.
    """
    secrets_manager = boto3.client('secretsmanager')
    response = secrets_manager.get_secret_value(SecretId='ws/mysql/password')
    secret_string = response['SecretString']
    return json.loads(secret_string)
def get_conn():
    """Open a PyMySQL connection to the database.

    Host, user and database name are read from the ``DB_HOST``, ``DB_USER``
    and ``DB_NAME`` environment variables; the password is the ``password``
    entry of the secret returned by ``get_secret()``. The port is fixed at
    3306.

    Returns:
        An open connection. The caller is responsible for closing it.

    Raises:
        Exception: Any error raised while reading the configuration or
            connecting is logged and then re-raised unchanged.
    """
    secret = get_secret()
    try:
        conn = pymysql.connect(
            host=os.environ['DB_HOST'],
            user=os.environ['DB_USER'],
            password=secret['password'],
            port=3306,
            db=os.environ['DB_NAME'],
        )
    except Exception as e:
        print(f"DB 연결 실패: {e}")
        # Propagate the real failure instead of silently returning None,
        # which would only surface later as an unrelated AttributeError
        # when the caller tries to use the connection.
        raise
    print("DB 연결 성공")
    return conn
def insert_data(data):
    """Insert one row into the ``orders`` table.

    Args:
        data: Mapping that must contain ``order_id``, ``item_name``,
            ``user_name``, ``quantity``, ``total_price`` and ``ordered_at``.

    Returns:
        ``resp(201, ...)`` with the created ``order_id`` on success,
        ``resp(400, ...)`` when ``data`` is not a mapping or lacks a required
        field, or ``resp(500, ...)`` when no database connection is available.

    Raises:
        Exception: Errors raised while executing or committing the insert
            are re-raised after the transaction has been rolled back.
    """
    fields = ('order_id', 'item_name', 'user_name', 'quantity', 'total_price', 'ordered_at')

    # Validate before touching the database so a bad request never opens a
    # connection and never fails with a bare KeyError.
    if not isinstance(data, dict):
        return resp(400, {"message": "Request body must be a JSON object"})
    missing = [name for name in fields if name not in data]
    if missing:
        return resp(400, {"message": f"Missing required fields: {', '.join(missing)}"})

    sql = "insert into orders (order_id,item_name,user_name,quantity,total_price,ordered_at) values (%s,%s,%s,%s,%s,%s)"
    params = tuple(data[name] for name in fields)

    conn = get_conn()
    if conn is None:
        # get_conn() can yield None when connecting failed; answer with an
        # explicit error instead of crashing on conn.cursor().
        return resp(500, {"message": "Database connection failed"})

    try:
        with conn.cursor() as cur:
            cur.execute(sql, params)
        conn.commit()
    except Exception:
        # Leave no half-applied transaction behind before propagating.
        conn.rollback()
        raise
    finally:
        # Always release the connection; Lambda containers are reused and
        # leaked connections accumulate on the database server.
        conn.close()

    return resp(201, {"message": "Order created successfully", "order_id": data['order_id']})
def lambda_handler(event, context):
    """Lambda entry point for the order-creation endpoint.

    Args:
        event: Invocation event; the HTTP method is read from
            ``event['requestContext']['http']['method']`` and the JSON
            payload from ``event['body']``.
        context: Lambda context object (unused).

    Returns:
        The result of ``insert_data`` for a POST request, ``resp(405, ...)``
        for any other method, or ``resp(400, ...)`` when the body is missing
        or is not valid JSON.
    """
    method = event['requestContext']['http']['method']

    # Check the method first: requests without a body must not crash while
    # parsing, and every path must return an explicit response.
    if method != 'POST':
        return resp(405, {"message": f"Method {method} not allowed"})

    try:
        data = json.loads(event.get('body') or '')
    except (TypeError, ValueError):
        return resp(400, {"message": "Request body must be valid JSON"})

    return insert_data(data)
import json
import pymysql
import boto3
import os
def resp(status_code, body):
    """Build the response dict returned by the Lambda handler.

    Args:
        status_code: HTTP status code placed under ``statusCode``.
        body: Object to serialize as the JSON response body.

    Returns:
        A dict with ``statusCode`` and a JSON string ``body``. Non-ASCII
        characters are written as-is, and values ``json`` cannot encode
        natively are converted with ``str()``.
    """
    encoded_body = json.dumps(body, default=str, ensure_ascii=False)
    response = {}
    response['statusCode'] = status_code
    response['body'] = encoded_body
    return response
def get_secret():
    """Fetch the ``ws/mysql/password`` secret from AWS Secrets Manager.

    Returns:
        The secret's ``SecretString`` parsed as JSON.
    """
    secrets_manager = boto3.client('secretsmanager')
    response = secrets_manager.get_secret_value(SecretId='ws/mysql/password')
    secret_string = response['SecretString']
    return json.loads(secret_string)
def get_conn():
    """Open a PyMySQL connection to the database.

    Host, user and database name are read from the ``DB_HOST``, ``DB_USER``
    and ``DB_NAME`` environment variables; the password is the ``password``
    entry of the secret returned by ``get_secret()``. The port is fixed at
    3306.

    Returns:
        An open connection. The caller is responsible for closing it.

    Raises:
        Exception: Any error raised while reading the configuration or
            connecting is logged and then re-raised unchanged.
    """
    secret = get_secret()
    try:
        conn = pymysql.connect(
            host=os.environ['DB_HOST'],
            user=os.environ['DB_USER'],
            password=secret['password'],
            port=3306,
            db=os.environ['DB_NAME'],
        )
    except Exception as e:
        print(f"DB 연결 실패: {e}")
        # Propagate the real failure instead of silently returning None,
        # which would only surface later as an unrelated AttributeError
        # when the caller tries to use the connection.
        raise
    print("DB 연결 성공")
    return conn
def select_all():
    """Return every order together with the number of orders.

    Returns:
        ``resp(200, {"count": ..., "data": [...]})`` where ``data`` holds one
        dict per row of ``orders``, or ``resp(500, ...)`` when no database
        connection is available.
    """
    sql = "select order_id, item_name, user_name, quantity, total_price, status, ordered_at from orders"

    conn = get_conn()
    if conn is None:
        # get_conn() can yield None when connecting failed; answer with an
        # explicit error instead of crashing on conn.cursor().
        return resp(500, {"message": "Database connection failed"})

    try:
        with conn.cursor(pymysql.cursors.DictCursor) as cur:
            cur.execute(sql)
            rows = cur.fetchall()
    finally:
        # Always release the connection, including when the query fails.
        conn.close()

    # The count is the number of fetched rows; a separate COUNT(*) query
    # would only repeat the same table scan.
    return resp(200, {"count": len(rows), "data": rows})
def select_user(user_name):
    """Return the orders placed by one user together with their count.

    Args:
        user_name: Value matched exactly against ``orders.user_name``.

    Returns:
        ``resp(200, {"count": ..., "data": [...]})`` where ``data`` holds one
        dict per matching row, or ``resp(500, ...)`` when no database
        connection is available.
    """
    sql = "select order_id, item_name, user_name, quantity, total_price, status, ordered_at from orders where user_name=(%s)"

    conn = get_conn()
    if conn is None:
        # get_conn() can yield None when connecting failed; answer with an
        # explicit error instead of crashing on conn.cursor().
        return resp(500, {"message": "Database connection failed"})

    try:
        with conn.cursor(pymysql.cursors.DictCursor) as cur:
            # The name is passed as a bound parameter, never interpolated.
            cur.execute(sql, (user_name,))
            rows = cur.fetchall()
    finally:
        # Always release the connection, including when the query fails.
        conn.close()

    # The count is the number of fetched rows; a separate COUNT(*) query
    # would only repeat the same filtered scan.
    return resp(200, {"count": len(rows), "data": rows})
def lambda_handler(event, context):
    """Lambda entry point for the order-query endpoint.

    Args:
        event: Invocation event; the filter is read from
            ``event['rawQueryString']``.
        context: Lambda context object (unused).

    Returns:
        ``select_all()`` when the query string is absent or empty, otherwise
        ``select_user(value)`` where ``value`` is the URL-decoded value of
        the first query parameter.
    """
    from urllib.parse import parse_qsl

    raw_query = event.get('rawQueryString') or ''

    # Test for "no filter" before extracting a value; indexing the split
    # result of an empty string raises IndexError.
    if not raw_query:
        return select_all()

    # rawQueryString is still percent-encoded, so decode it properly; this
    # also stops a second parameter from leaking into the value.
    pairs = parse_qsl(raw_query, keep_blank_values=True)
    if not pairs:
        return select_all()

    user_name = pairs[0][1]
    print(user_name)
    return select_user(user_name)
-- Per-category sales summary for orders whose status is '완료' (Korean: "completed").
-- For each product category: the number of orders and the revenue, computed as
-- product price x ordered quantity. Categories are listed from highest to
-- lowest revenue.
-- NOTE(review): the double-quoted "schema"."table" identifiers suggest a
-- Presto/Athena-style engine — confirm before running elsewhere.
SELECT p.category, COUNT(o.order_id) as order_count, SUM(p.price * o.quantity) as total_revenue
FROM "ws_mall_db"."orders" o
JOIN "ws_mall_db"."products" p
ON p.product_id = o.product_id
WHERE o.status = '완료'
GROUP BY p.category
ORDER BY total_revenue DESC;
-- Total amount spent per user, counting only orders whose status is '완료'
-- (Korean: "completed"). Spend is the sum of the orders' total_price; users
-- are listed from highest to lowest spend.
SELECT user_name, sum(total_price) as total_spent FROM "ws_mall_db"."orders"
where status = '완료'
group by user_name
order by total_spent desc;
-- Order statistics per `month` value: total number of orders, one count per
-- status ('완료' -> completed, '지연' -> delayed, '취소' -> cancelled) and the
-- revenue summed over completed and delayed orders only (all other orders
-- contribute 0).
-- NOTE(review): `month` is read as a column of orders — presumably a derived
-- or partition column; confirm against the table schema.
-- No ORDER BY is given, so the order of the result rows is not guaranteed.
SELECT month,
COUNT(*) AS total_orders,
SUM(CASE when status='완료' then 1 else 0 end) as completed,
SUM(CASE when status='지연' then 1 else 0 end) as delayed,
SUM(CASE when status='취소' then 1 else 0 end) as cancelled,
SUM(CASE when status in ('완료', '지연') then total_price else 0 end) as total_revenue
FROM "ws_mall_db"."orders"
group by month;