오늘도 Service로 기존 코드 기능 분리를 하고 재사용 가능한 함수가 있는 부분을 처리해주었다.
from app.db import get_connection, close_connection
import traceback
import json, os
from itertools import combinations
# config.json 불러오기
CONFIG_PATH = os.path.join(os.path.dirname(__file__), "config.json")
with open(CONFIG_PATH) as f:
config = json.load(f)
# 공통 로직 함수
def _init_power(battery, power, channel_config=None, duration_minutes=None):
# 공통 로직 처리
# 에러 처리
if not 0 <= battery <= 100:
raise ValueError("battery 값이 잘못되었습니다.")
if power < 0:
raise ValueError("power 값이 잘못되었습니다.")
# 채널별 소비 전력값 설정 (W)
if channel_config is None:
channel_config = config["channel_config"]
# 유지 시간 설정 (minute)
if duration_minutes is None:
duration_minutes = config["duration_minutes"]
# 배터리 용량 설정
battery_capacity_wh = config["battery_capacity_wh"]
# 배터리 W 계산
battery_wh = battery_capacity_wh * (battery / 100)
battery_w = battery_wh / (duration_minutes / 60)
# 총 가용 전력
available_power = battery_w + power
print(f"배터리 잔량 : {battery} % \n실시간 생산량 : {power} W \n총 가용 전력({duration_minutes}분 기준) : {available_power} W\n")
return available_power, channel_config, duration_minutes
# 최적 판매 조합 추천 함수
#TODO config 파일 업데이트 (현재는 테스트용 값 적용)
def get_optimal_combination(battery, power, channel_config=None, duration_minutes=None):
"""
매개변수
battery : 배터리 저장 전력 %
power : 실시간 생산 전력
channel_config : 각 채널별 소비 전력
duration_minutes : 유지 시간
"""
try:
# 공통 로직 함수 불러오기
available_power, channel_config, _ = _init_power(battery, power, channel_config, duration_minutes)
# 모든 조합 생성
channels = ["A", "B", "C", "D"]
all_combinations = []
# 1개 ~ 4개 조합 생성
for i in range(1, len(channels) + 1):
for combi in combinations(channels, i):
all_combinations.append(list(combi))
# 유효 조합
valid_combinations = []
for combi in all_combinations:
# 조합의 총 소비 전력 계산
total_power = sum(channel_config[ch] for ch in combi)
# 현재 가용 전력으로 사용 가능한지 확인
if total_power <= available_power:
valid_combinations.append({
"channels": combi,
"power": total_power
})
print(f"유효 조합 수 : {len(valid_combinations)} 개\n")
# 최적 조합 선택 (가장 많은 채널 활성화 -> 더 큰 전력 소비 기준)
if not valid_combinations:
print("판매 가능한 채널이 없습니다.")
return [], None, 200
best = max(valid_combinations, key=lambda x: (len(x["channels"]), x["power"]))
print(f"최적 판매 조합 : {best['channels']}")
return best["channels"], None, 200
except ValueError as e:
print(f"최적 조합 추천 : 입력값 오류 - {e}")
return None, str(e), 400
except Exception as e:
print(f"최적 조합 추천 : 오류 - {e}")
traceback.print_exc()
return None, "최적 조합 추천 : 서버 오류 발생", 500
# 실시간 선택 가능 채널 확인 및 배터리 보호 함수
def get_available_channels(battery, power, selected_channels=[], channel_config=None, duration_minutes=None, battery_protection_threshold=None):
"""
매개변수
battery : 배터리 저장 전력 %
power : 실시간 생산 전력
selected_channels : 이미 선택된 채널 리스트
channel_config : 각 채널별 소비 전력
duration_minutes : 유지 시간
battery_protection_threshold : 배터리 보호 임계값 %
"""
try:
# 공통 로직 함수 불러오기
available_power, channel_config, duration_minutes = _init_power(battery, power, channel_config, duration_minutes)
# 배터리 보호 로직
if battery_protection_threshold is None:
battery_protection_threshold = config["battery_protection_threshold"]
if battery < battery_protection_threshold:
print(f"배터리 잔량이 {battery_protection_threshold} % 미만이므로 판매가 불가합니다.")
return {
"A": True,
"B": False,
"C": False,
"D": False
}, None, 200
# 이미 선택된 채널 전력 계산
selected_power = sum(channel_config[ch] for ch in selected_channels)
if selected_channels:
print(f"현재 판매중인 채널 : {selected_channels}\n소비중인 전력 : {selected_power} W\n")
# 남은 전력
remaining_power = available_power - selected_power
print(f"현재 가용 전력({duration_minutes}분 기준) : {remaining_power} W\n")
# 결과 딕셔너리 반환
result = {}
for key, value in channel_config.items():
if key in selected_channels:
result[key] = False
else:
result[key] = value <= remaining_power
print(f"판매 가능 채널 : {[ch for ch, val in result.items() if val]}")
return result, None, 200
except ValueError as e:
print(f"판매 가능 채널 조회 : 입력값 오류 - {e}")
return None, str(e), 400
except Exception as e:
print(f"판매 가능 채널 조회 : 오류 - {e}")
traceback.print_exc()
return None, "판매 가능 채널 조회 : 서버 오류 발생", 500
from app.db import get_connection, close_connection
import traceback
import requests
# DB에 릴레이 상태 저장 함수
def update_relay_status_in_db(data):
"""
릴레이 상태를 DB에 업데이트
"""
# DB에서 데이터 조회
conn, cursor = None, None
try:
# DB 연결
conn, cursor = get_connection()
if conn is None:
print("릴레이 상태 저장 : DB 연결 실패")
return False, "DB 연결 실패", 503
# 상태 저장할 빈 리스트 생성
relays_to_update = []
for relay_ch, ch_status in data.items():
# True -> "on", False -> "off"
status_string = "on" if ch_status else "off"
relays_to_update.append((status_string, relay_ch))
print(f"릴레이 상태 저장 : 업데이트할 데이터 - {relays_to_update}")
# DB UPDATE
sql = "UPDATE relay_status SET status = %s WHERE relay_name = %s"
cursor.executemany(sql, relays_to_update)
conn.commit()
print("릴레이 상태 저장 : 커밋 완료")
return True, "DB 저장 성공", 201
except Exception as e:
print(f"릴레이 상태 저장 : 오류 - {e}")
traceback.print_exc()
if conn:
conn.rollback() # 실패하면 롤백
return False, "서버 오류 발생", 500
finally:
close_connection(conn, cursor)
# DB에 거래 내역 저장
def insert_trade_history(buyer_id, amount):
"""
거래 내역을 DB에 저장
"""
# DB에서 데이터 조회
conn, cursor = None, None
try:
# DB 연결
conn, cursor = get_connection()
if conn is None:
print("거래 내역 저장 : DB 연결 실패")
return False, "DB 연결 실패", 503
# DB INSERT
sql = "INSERT INTO trade_history (buyer_id, amount) VALUES (%s, %s)"
cursor.execute(sql, (buyer_id, amount))
conn.commit()
print("거래 내역 저장 : 커밋 완료")
return True, "DB 저장 성공", 201
except Exception as e:
print(f"거래 내역 저장 : 오류 - {e}")
traceback.print_exc()
if conn:
conn.rollback() # 실패하면 롤백
return False, "서버 오류 발생", 500
finally:
close_connection(conn, cursor)
# 거래 내역 조회
def get_trade_history(user_id=None, start_date=None, end_date=None, date=None):
"""
거래 내역 조회 (필터 옵션)
"""
conn, cursor = None, None
try:
conn, cursor = get_connection()
if conn is None:
print("거래 내역 조회 : DB 연결 실패")
return None, "거래 내역 조회 : DB 연결 실패", 503
sql = "SELECT * FROM trade_history WHERE 1=1"
params = []
# 쿼리 파라미터 조건 추가
if user_id:
sql += " AND buyer_id = %s"
params.append(user_id)
if start_date:
sql += " AND timestamp >= %s"
params.append(start_date)
if end_date:
sql += " AND DATE(timestamp) <= %s"
params.append(end_date)
if date:
sql += " AND DATE(timestamp) = %s"
params.append(date)
sql += " ORDER BY timestamp DESC"
cursor.execute(sql, params)
result = cursor.fetchall()
return result, None, 200
except Exception as e:
print(f"거래 내역 조회 : 오류 - {e}")
traceback.print_exc()
return None, "거래 내역 조회 : 서버 오류 발생", 500
finally:
close_connection(conn, cursor)
# 현재 릴레이 상태 조회
def get_current_relay_status():
"""
현재 릴레이 상태 조회
"""
conn, cursor = None, None
try:
conn, cursor = get_connection()
if conn is None:
print("현재 상태 조회 : DB 연결 실패")
return None, "DB 연결 실패", 503
sql = "SELECT relay_name, status FROM relay_status WHERE relay_name IN ('A','B','C','D')"
cursor.execute(sql)
current_status = {row['relay_name']: row['status'] for row in cursor.fetchall()}
return current_status, None, 200
except Exception as e:
print(f"현재 상태 조회 : 오류 - {e}")
traceback.print_exc()
return None, "서버 오류 발생", 500
finally:
close_connection(conn, cursor)
# 아두이노에 릴레이 제어 전송
def send_to_arduino(data, arduino_url):
"""
아두이노에 릴레이 제어 전송
"""
try:
if not arduino_url:
print("아두이노 제어 : URL 미설정")
return False, "아두이노 URL 미설정", 503
response = requests.post(arduino_url, json=data, timeout=5)
if response.status_code != 200:
print(f"아두이노 제어 : 응답 오류 (status: {response.status_code})")
return False, "아두이노 응답 오류", 502
except requests.exceptions.Timeout:
print("아두이노 제어 : 응답 시간 초과")
return False, "아두이노 응답 시간 초과", 504
except requests.exceptions.ConnectionError:
print("아두이노 제어 : 연결 실패")
return False, "아두이노 연결 실패", 503
except Exception as e:
print(f"아두이노 제어 : 통신 오류 - {e}")
return False, "아두이노 통신 오류", 500
print("아두이노 제어 : 성공")
return True, "success", 200
# 현재 사용중인 채널 확인
def get_active_relay_list():
"""
활성화된 릴레이 채널 반환
"""
current_status, message, status_code = get_current_relay_status()
if current_status is None:
return None, message, status_code
# 활성화 채널만 필터링
active_list = [name for name, status in current_status.items() if status == "on"]
print(f"현재 사용중인 채널 : {active_list}")
return active_list, None, 200
from app.db import get_connection, close_connection
import traceback
from datetime import datetime
# 최신 센서 데이터 조회
def get_latest_sensor_data():
"""
최신 센서 데이터 조회
"""
# DB에서 데이터 조회
conn, cursor = None, None
try:
# DB 연결
conn, cursor = get_connection()
if conn is None:
print("최신 센서 데이터 조회 : DB 연결 실패")
return None, "최신 센서 데이터 조회 : DB 연결 실패", 503
# 데이터 조회
sql = "SELECT * FROM sun_data ORDER BY timestamp DESC LIMIT 1"
cursor.execute(sql)
sensor_data = cursor.fetchone()
if not sensor_data:
print("최신 센서 데이터 조회 : 데이터가 존재하지 않습니다.")
return None, "최신 센서 데이터 조회 : 데이터가 존재하지 않습니다.", 404
return sensor_data, None, 200
except Exception as e:
print(f"최신 센서 데이터 조회 : 오류 - {e}")
traceback.print_exc()
return None, "최신 센서 데이터 조회 : 서버 오류 발생", 500
finally:
close_connection(conn, cursor)
# 센서 데이터 DB에 저장
def save_sensor_data(soc, solar_w, lux):
"""
센서 데이터 DB에 저장
"""
conn, cursor = None, None
try:
conn, cursor = get_connection()
if conn is None:
print("센서 데이터 저장 : DB 연결 실패")
return False, "센서 데이터 저장 : DB 연결 실패", 503
# 서버 현재 시간을 timestamp로 사용
now = datetime.now()
# 데이터 저장
sql = "INSERT INTO sun_data (soc, solar_w, lux, timestamp) VALUES (%s, %s, %s, %s)"
cursor.execute(sql, (soc, solar_w, lux, now))
conn.commit()
print("센서 데이터 저장 : 성공")
return True, "센서 데이터 저장 : DB 저장 성공", 201
except Exception as e:
print(f"센서 데이터 저장 : 오류 - {e}")
traceback.print_exc()
if conn:
conn.rollback()
return False, "센서 데이터 저장 : 서버 오류 발생", 500
finally:
close_connection(conn, cursor)
이제 어느 정도 기능들은 Service로 분리해두어서 이후 새로운 엔드포인트를 구현할 때 재사용을 하면서 구현이 가능해질 것 같다.