import requests
import pandas as pd
from dotenv import load_dotenv
import os
import json
load_dotenv()
class WeatherApiClient:
"""
OpenWeatherMap API 클라이언트를 사용하여 날씨 데이터를 가져오는 클래스
"""
def __init__(self, api_key: str):
self.base_url = "http://api.openweathermap.org/data/2.5"
if api_key is None:
raise Exception("API 키는 None으로 설정할 수 없습니다.")
self.api_key = api_key
def get_city(self, city_name: str, temperature_units: str = "metric") -> dict:
"""
지정된 도시의 최신 날씨 데이터를 가져옵니다.
Parameters:
- city_name (str): 날씨 정보를 조회할 도시 이름.
- temperature_units (str): 온도 단위 (기본값은 'metric'으로 섭씨 기준).
'metric'은 섭씨, 'imperial'은 화씨, 'standard'는 켈빈 단위를 의미합니다.
Returns:
- dict: 요청한 도시의 날씨 데이터가 포함된 JSON 응답을 반환합니다.
Raises:
- Exception: API 요청이 실패한 경우 상태 코드와 응답 메시지와 함께 예외가 발생합니다.
"""
params = {"q": city_name, "units": temperature_units, "appid": self.api_key}
response = requests.get(f"{self.base_url}/weather", params=params)
if response.status_code == 200:
return response.json()
else:
raise Exception(
f"Open Weather API에서 데이터를 추출하지 못했습니다. 상태 코드: {response.status_code}. 응답: {response.text}"
)
weather_api_client = WeatherApiClient(api_key=os.environ.get("API_KEY"))
data = weather_api_client.get_city("Sydney")
df_weather = pd.json_normalize(data) # JSON 데이터를 Pandas 형태로 변환
# 간단한 전처리
df_weather["measured_at"] = pd.to_datetime(df_weather["dt"], unit="s") + pd.Timedelta(hours=9) # 기준시간(한국 기준)
df_weather["dt"] = df_weather["measured_at"].dt.strftime('%Y%m%d') # 기준년월일 (YYYYMMDD)
df_weather["time"] = df_weather["measured_at"].dt.strftime('%H%M%S') # 기준년월일 (HHHHMMSS)
df_selected = df_weather[["dt", "time", "measured_at", "id", "name", "main.temp", "main.humidity", "wind.speed"]]
df_selected = df_selected.rename( # 컬럼명 수정
columns={"name": "city", "main.temp": "temperature", "main.humidity": "humidity", "wind.speed": "wind_speed"}
)
df_selected
# CSV 파일로 저장
df_selected.to_csv("weather_api.csv", index=False)
# 가져 오고 싶은 여러 도시 리스트화
cities = ["Sydney", "London", "New York", "Tokyo", "Toronto", "Seoul"]
all_data = []
# Collect data for each city and store it in a list
for city in cities:
data = weather_api_client.get_city(city)
all_data.append(data)
# normalize all data at once
df_weather = pd.json_normalize(all_data) # JSON 데이터를 Pandas 형태로 변환
df_weather["measured_at"] = pd.to_datetime(df_weather["dt"], unit="s") + pd.Timedelta(hours=9) # 기준시간
df_weather["dt"] = df_weather["measured_at"].dt.strftime('%Y%m%d') # 기준년월일 (YYYYMMDD)
df_weather["time"] = df_weather["measured_at"].dt.strftime('%H%M%S') # 기준년월일 (HHHHMMSS)
df_selected = df_weather[["dt", "time", "measured_at", "id", "name", "main.temp", "main.humidity", "wind.speed"]]
df_selected = df_selected.rename( # 컬럼명 수정
columns={"name": "city", "main.temp": "temperature", "main.humidity": "humidity", "wind.speed": "wind_speed"}
)
df_selected
.env
DB 서버 정보 설정 예시
→ Mac은 3306에다가 port를 못 만들어서 3307로 설정하신 거라고 함
(윈도우는 그냥 3306 쓰면 된다고 하셨음)
먼저 MySQL 데이터베이스와 상호작용하기 위해 필요한 라이브러리를 설치하고 불러오기
.env
파일에 저장된 MySQL 서버 정보로 데이터베이스에 연결
실행 전 설치해야 할 게 두 가지 있음
pip install sqlalchemy
pip install mysql-connector-python
pip3
# 패키지 불러오기
from sqlalchemy import create_engine
from sqlalchemy.engine import URL
import os
from dotenv import load_dotenv
load_dotenv() # .env 파일에 저장된 환경 변수 로드
# MySQL 연결 정보 설정
DB_SERVER_HOST = os.environ.get('DB_SERVER_HOST') # 데이터베이스 서버의 호스트 이름 (로컬호스트로 설정)
DB_USERNAME = os.environ.get('DB_USERNAME') # 데이터베이스 아이디
DB_PASSWORD = os.environ.get('DB_PASSWORD') # 데이터베이스 비밀번호
DB_DATABASE = os.environ.get('DB_DATABASE') # 사용할 데이터베이스 이름
DB_PORT = os.environ.get('DB_PORT') # 데이터베이스 연결을 위한 포트 (Default: 3306)
# 연결 URL 생성
connection_url = URL.create(
drivername="mysql+mysqlconnector",
username=DB_USERNAME,
password=DB_PASSWORD,
host=DB_SERVER_HOST,
port=DB_PORT,
database=DB_DATABASE,
)
# SQLAlchemy 엔진 생성
engine = create_engine(connection_url)
print("MySQL 데이터베이스에 연결되었습니다.")
from sqlalchemy import create_engine, MetaData, Table, Column
from sqlalchemy import text
from sqlalchemy import Integer, String, Float, DateTime
from sqlalchemy.engine import URL
import pandas as pd
import os
from dotenv import load_dotenv
load_dotenv() # .env에 입력한 모든 환경변수를 환경으로 가져옴
[실행 결과]
True
data = {
'dt': ['20241112', '20241112'],
'time': ['120000', '123000'],
'measured_at': pd.to_datetime(['2024-11-12 12:00:00', '2024-11-12 12:30:00']),
'id': [1, 2],
'city': ['Seoul', 'Seoul'],
'temperature': [15.5, 16.0],
'humidity': [45, 50],
'wind_speed': [3.5, 4.0]
}
df = pd.DataFrame(data)
print(df.head())
# MySQL 환경 변수 설정
DB_SERVER_HOST = os.environ.get('DB_SERVER_HOST') # 데이터베이스 서버의 호스트 이름 (로컬호스트로 설정)
DB_USERNAME = os.environ.get('DB_USERNAME') # 데이터베이스 아이디
DB_PASSWORD = os.environ.get('DB_PASSWORD') # 데이터베이스 비밀번호
DB_DATABASE = os.environ.get('DB_DATABASE') # 사용할 데이터베이스 이름
DB_PORT = os.environ.get('DB_PORT') # 데이터베이스 연결을 위한 포트 (Default: 3306)
connection_url = URL.create(
drivername="mysql+mysqlconnector",
username=DB_USERNAME,
password=DB_PASSWORD,
host=DB_SERVER_HOST,
port=DB_PORT,
database=DB_DATABASE,
)
print(connection_url)
print(f"DB_SERVER_HOST: {DB_SERVER_HOST}")
print(f"DB_USERNAME: {DB_USERNAME}")
print(f"DB_PASSWORD: {'*' * len(DB_PASSWORD) if DB_PASSWORD else None}") # Masked for safety
print(f"DB_DATABASE: {DB_DATABASE}")
print(f"DB_PORT: {DB_PORT}")
[실행 결과]
mysql+mysqlconnector://root:***@localhost:3306/test_db
DB_SERVER_HOST: localhost
DB_USERNAME: root
DB_PASSWORD: ****
DB_DATABASE: test_db
DB_PORT: 3306
※ URL
→ DBeaver 연결할 때 입력하는 부분을 만드는 것!
→ sqlalchemy 패키지에서 왔음
from sqlalchemy.engine import URL
디비버가 데이터베이스와 접속을 시도할 때 사용
→ 참고:
python에서 쓰는 driver는 mysql+mysqlconnector:
DBeaver에서 쓰는 driver는 jdbc:mysql:
# SQLAlchemy 엔진 생성
engine = create_engine(connection_url)
table_name = 'daily_weather'
# 테이블에 대한 Metadata 설정
metadata = MetaData()
table = Table(
table_name, metadata,
Column('dt', String(8), nullable=False, primary_key=True), # 'dt'는 문자열이며 기본 키로 설정됨 (고유값)
Column('time', String(6), nullable=False, primary_key=True), # 'time'는 문자열이며 기본 키로 설정됨 (고유값)
Column('measured_at', DateTime, nullable=False), # 'measured_at'은 DateTime 타입이고 null이 허용되지 않음
Column('id', Integer, primary_key=True), # 'id'는 정수 타입이며 기본 키로 설정됨 (고유값)
Column('city', String(100), nullable=True), # 'name'은 문자열이며 null이 허용됨
Column('temperature', Float, nullable=True), # 'temperature'는 부동소수점 타입이며 null이 허용됨
Column('humidity', Integer, nullable=True), # 'humidity'는 정수 타입이며 null이 허용됨
Column('wind_speed', Float, nullable=True) # 'wind_speed'는 부동소수점 타입이며 null이 허용됨
)
# CREATE
metadata.create_all(engine)
🡆 JOIN할 때 사용하는 키
student_id
는 학생 테이블의 student_id
와 일치하는 값만 가질 수 있으므로, 데이터 일관성이 유지됨dt
(기준년월일), time
(기준시간), id
(도시ID)를 활용하여 복합키 만들기from sqlalchemy import MetaData, Table, Column
from sqlalchemy import Integer, String, Float, DateTime
# 테이블 메타데이터 설정
metadata = MetaData()
table_name = 'daily_weather'
# 테이블 정의
table = Table(
table_name, metadata,
Column('dt', String(8), nullable=False, primary_key=True), # 'dt'는 문자열이며 기본 키로 설정됨
Column('time', String(6), nullable=False, primary_key=True), # 'time'은 문자열이며 기본 키로 설정됨
Column('measured_at', DateTime, nullable=False), # 'measured_at'은 DateTime 타입
Column('id', Integer, primary_key=True), # 'id'는 정수 타입이며 기본 키
Column('city', String(100), nullable=True), # 'city'는 문자열
Column('temperature', Float, nullable=True), # 'temperature'는 부동소수점 타입
Column('humidity', Integer, nullable=True), # 'humidity'는 정수
Column('wind_speed', Float, nullable=True) # 'wind_speed'는 부동소수점 타입
)
# 예제 데이터프레임 (API로부터 수집된 데이터가 있다고 가정)
data = {
'dt': ['20241112', '20241112'],
'time': ['120000', '123000'],
'measured_at': pd.to_datetime(['2024-11-12 12:00:00', '2024-11-12 12:30:00']),
'id': [1, 2],
'city': ['Seoul', 'Seoul'],
'temperature': [15.5, 16.0],
'humidity': [45, 50],
'wind_speed': [3.5, 4.0]
}
df = pd.DataFrame(data)
create_all
매서드를 사용하는 경우 테이블이 이미 존재하면 무시됨CREATE IF NOT EXIST
임# 테이블 메타데이터 설정
metadata = MetaData()
table_name = 'daily_weather'
# 테이블 정의
table = Table(
table_name, metadata,
Column('dt', String(8), nullable=False, primary_key=True), # 'dt'는 문자열이며 기본 키로 설정됨
Column('time', String(6), nullable=False, primary_key=True), # 'time'은 문자열이며 기본 키로 설정됨
Column('measured_at', DateTime, nullable=False), # 'measured_at'은 DateTime 타입
Column('id', Integer, primary_key=True), # 'id'는 정수 타입이며 기본 키
Column('city', String(100), nullable=True), # 'city'는 문자열
Column('temperature', Float, nullable=True), # 'temperature'는 부동소수점 타입
Column('humidity', Integer, nullable=True), # 'humidity'는 정수
Column('wind_speed', Float, nullable=True) # 'wind_speed'는 부동소수점 타입
)
# 테이블 생성
metadata.create_all(engine)
print(f"'{table_name}' 테이블이 생성되었습니다.")
# 테이블 삭제 (필요시 사용)
with engine.connect() as connection:
connection.execute(f"DROP TABLE IF EXISTS {table_name}")
print(f"'{table_name}' 테이블이 삭제되었습니다.")
# 존재하는 테이블에 데이터 삽입
df.to_sql(name=table_name, con=engine, if_exists='append', index=False)
print(f"'{table_name}' 테이블에 데이터가 삽입되었습니다.")
새로운 데이터를 데이터베이스에 추가하는 과정
if_exists='append'
옵션
index=False
새로운 데이터를 추가함으로써 테이블의 정보를 최신 상태로 유지할 수 있음
# 데이터프레임을 레코드의 리스트(딕셔너리로 구성된)로 변환
data = df.to_dict(orient='records')
# 테이블에서 기본 키 열을 가져옴
key_columns = [pk_column.name for pk_column in table.primary_key.columns.values()]
key_values = [tuple(row[pk] for pk in key_columns) for row in data]
delete_values = ", ".join([f"({', '.join(map(repr, values))})" for values in key_values])
# DELETE 문 실행 후 재삽입
with engine.connect() as connection:
if key_values:
delete_sql = f"""
DELETE FROM {database_name}.{table_name}
WHERE ({', '.join(key_columns)}) IN (
{delete_values}
)
"""
connection.execute(delete_sql)
connection.commit()
print(f"기존 데이터가 '{table_name}' 테이블에서 삭제되었습니다.")
# 새로운 데이터 삽입
df.to_sql(name=table_name, con=engine, if_exists='append', index=False)
print(f"새로운 데이터가 '{table_name}' 테이블에 삽입되었습니다.")
기존 데이터를 삭제한 후 새로운 데이터를 삽입하는 방식
주로 최신 데이터로 테이블을 업데이트해야 할 때 활용되며, 중복 데이터나 데이터 일관성을 유지하는 데 유용
특정 기준을 기반으로 기존 데이터를 삭제하고 새로운 데이터를 삽입하기 때문에 전체 데이터가 아닌 필요한 데이터만 교체할 수 있는 유연성을 제공
데이터가 중복되거나 업데이트가 필요한 경우, 기존 데이터 삭제 후 새로운 데이터 삽입을 통해 정확하고 최신의 데이터를 유지할 수 있음
🡆 SQL에는 없는 기능인데 파이썬으로는 짤 수 있음!
UPDATE
table_name
SET
column1 = 'new_value'
WHERE
column2 = 'specific_value'
;
DELETE
FROM
table_name
WHERE
column1 = 'value_to_delete'
;
SELECT
column1, column2
FROM
table_name
WHERE
column3 = 'value'
;
ALTER TABLE
table_name
ADD
column4 FLOAT
;
TRUNCATE TABLE
table_name
;
# 데이터 삽입 (INSERT)
df.to_sql(name=table_name, con=engine, if_exists='append', index=False)
[실행 결과]
2
dt
(기준년월일), time
(기준시간), id
(도시ID) 중 하나라도 다르면 오류 안 뜸# 테이블 삭제 (DROP)
with engine.connect() as connection:
connection.execute(text(f"DROP TABLE IF EXISTS {table_name}"))
→ 테이블 자체를 날리는 거임!
# SQLAlchemy 엔진 생성
engine = create_engine(connection_url)
table_name = 'daily_weather'
# 테이블에 대한 Metadata 설정
metadata = MetaData()
table = Table(
table_name, metadata,
Column('dt', String(8), nullable=False, primary_key=True), # 'dt'는 문자열이며 기본 키로 설정됨 (고유값)
Column('time', String(6), nullable=False, primary_key=True), # 'time'는 문자열이며 기본 키로 설정됨 (고유값)
Column('measured_at', DateTime, nullable=False), # 'measured_at'은 DateTime 타입이고 null이 허용되지 않음
Column('id', Integer, primary_key=True), # 'id'는 정수 타입이며 기본 키로 설정됨 (고유값)
Column('city', String(100), nullable=True), # 'name'은 문자열이며 null이 허용됨
Column('temperature', Float, nullable=True), # 'temperature'는 부동소수점 타입이며 null이 허용됨
Column('humidity', Integer, nullable=True), # 'humidity'는 정수 타입이며 null이 허용됨
Column('wind_speed', Float, nullable=True) # 'wind_speed'는 부동소수점 타입이며 null이 허용됨
)
# CREATE
metadata.create_all(engine)
df.to_sql(name=table_name, con=engine, if_exists='append', index=False)
data = {
'dt': ['20241112', '20241112'],
'time': ['120000', '123000'],
'measured_at': pd.to_datetime(['2024-11-12 12:00:00', '2024-11-12 12:30:00']),
'id': [1, 5],
'city': ['Seoul', 'Seoul'],
'temperature': [15.5, 16.0],
'humidity': [45, 50],
'wind_speed': [3.5, 4.0]
}
df = pd.DataFrame(data)
# 데이터 삭제 후 삽입 (DELETE+INSERT)
# 데이터프레임을 레코드의 리스트(딕셔너리로 구성된)로 변환
data = df.to_dict(orient='records')
# 테이블에서 고유 키(Primary Key) 열을 가져옴
key_columns = [pk_column.name for pk_column in table.primary_key.columns.values()]
key_values = [tuple(row[pk] for pk in key_columns) for row in data]
delete_values = ", ".join([f"({', '.join(map(repr, values))})" for values in key_values])
with engine.connect() as connection:
if key_values:
delete_sql = f"""
DELETE FROM {DB_DATABASE}.{table.name}
WHERE ({', '.join(key_columns)}) IN (
{delete_values}
)
"""
connection.execute(text(delete_sql))
connection.commit() # 생성된 DELETE 문 실행
# 변환된 데이터프레임을 테이블에 추가 (INSERT)
df.to_sql(name=table_name, con=engine, if_exists='append', index=False)
from sqlalchemy import create_engine, MetaData, Table, Column
from sqlalchemy import text
from sqlalchemy.engine import URL
import pandas as pd
class MySqlClient:
"""
MySQL 데이터베이스와 상호작용하기 위한 클라이언트 클래스입니다.
이 클래스는 SQLAlchemy를 사용하여 MySQL 데이터베이스에 연결하고 테이블을 생성, 삭제, 삽입,
업서트(upsert)와 같은 작업을 지원합니다.
"""
def __init__(
self,
server_name: str,
database_name: str,
username: str,
password: str,
port: int = 5432,
):
# 데이터베이스 연결을 위한 초기 설정
self.host_name = server_name
self.database_name = database_name
self.username = username
self.password = password
self.port = port
# MySQL 연결 URL 생성
connection_url = URL.create(
drivername="mysql+mysqlconnector",
username=username,
password=password,
host=server_name,
port=port,
database=database_name,
)
# SQLAlchemy 엔진 생성
self.engine = create_engine(connection_url)
def create_table(self, metadata: MetaData) -> None:
"""
주어진 메타데이터 객체를 기반으로 테이블을 생성합니다.
Parameters:
- metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
"""
metadata.create_all(self.engine)
def drop_table(self, table_name: str) -> None:
"""
지정된 테이블을 삭제합니다. 테이블이 존재하지 않으면 무시합니다.
Parameters:
- table_name (str): 삭제할 테이블의 이름.
"""
with self.engine.connect() as connection:
connection.execute(text(f"DROP TABLE IF EXISTS {table_name}"))
def insert(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
"""
주어진 DataFrame을 테이블에 삽입합니다. 테이블이 존재하지 않으면 생성합니다.
Parameters:
- df (pd.DataFrame): 삽입할 데이터를 포함하는 Pandas DataFrame.
- table (Table): 데이터 삽입을 위한 SQLAlchemy Table 객체.
- metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
"""
self.create_table(metadata=metadata)
df.to_sql(name=table.name, con=self.engine, if_exists="append", index=False)
def upsert(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
"""
주어진 DataFrame 데이터를 테이블에 삽입하고, 기존 레코드가 있으면 업데이트합니다.
Parameters:
- df (pd.DataFrame): 삽입 또는 갱신할 데이터를 포함하는 Pandas DataFrame.
- table (Table): 업서트 작업을 수행할 SQLAlchemy Table 객체.
- metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
"""
self.create_table(metadata=metadata)
# 데이터프레임을 레코드(딕셔너리 목록)으로 변환
data = df.to_dict(orient="records")
# 테이블의 고유 키(Primary Key) 추출
key_columns = [
pk_column.name for pk_column in table.primary_key.columns.values()
]
key_values = [tuple(row[pk] for pk in key_columns) for row in data]
delete_values = ", ".join(
[f"({', '.join(map(repr, values))})" for values in key_values]
)
with self.engine.connect() as connection:
if key_values:
delete_sql = f"""
DELETE FROM {self.database_name}.{table.name}
WHERE ({', '.join(key_columns)}) IN (
{delete_values}
)
"""
connection.execute(text(delete_sql))
connection.commit() # DELETE 문 실행
# 변환된 데이터프레임을 테이블에 추가 (INSERT)
df.to_sql(name=table.name, con=self.engine, if_exists="append", index=False)
__init__
create_table
drop_table
insert
upsert
from sqlalchemy import Integer, String, Float, DateTime
from sqlalchemy import MetaData, Table, Column
import pandas as pd
import os
from dotenv import load_dotenv
load_dotenv()
# 2회차에서 쌓아둔 데이터 불러오기
df = pd.read_csv(
'weather_api.csv',
dtype={
'dt': 'object',
'time': 'object',
'id': 'int64',
'city': 'object',
'temperature': 'float64',
'humidity': 'int64',
'wind_speed': 'float64'
},
parse_dates=['measured_at']
)
table_name = 'daily_weather'
# 테이블에 대한 Metadata 설정
metadata = MetaData()
table = Table(
table_name, metadata,
Column('dt', String(8), nullable=False, primary_key=True), # 'dt'는 문자열이며 기본 키로 설정됨 (고유값)
Column('time', String(6), nullable=False, primary_key=True), # 'time'는 문자열이며 기본 키로 설정됨 (고유값)
Column('measured_at', DateTime, nullable=False), # 'measured_at'은 DateTime 타입이고 null이 허용되지 않음
Column('id', Integer, primary_key=True), # 'id'는 정수 타입이며 기본 키로 설정됨 (고유값)
Column('city', String(100), nullable=True), # 'name'은 문자열이며 null이 허용됨
Column('temperature', Float, nullable=True), # 'temperature'는 부동소수점 타입이며 null이 허용됨
Column('humidity', Integer, nullable=True), # 'humidity'는 정수 타입이며 null이 허용됨
Column('wind_speed', Float, nullable=True) # 'wind_speed'는 부동소수점 타입이며 null이 허용됨
)
DB_SERVER_HOST = os.environ.get('DB_SERVER_HOST') # 데이터베이스 서버의 호스트 이름 (로컬호스트로 설정)
DB_USERNAME = os.environ.get('DB_USERNAME') # 데이터베이스 아이디
DB_PASSWORD = os.environ.get('DB_PASSWORD') # 데이터베이스 비밀번호
DB_DATABASE = os.environ.get('DB_DATABASE') # 사용할 데이터베이스 이름
DB_PORT = os.environ.get('DB_PORT') # 데이터베이스 연결을 위한 포트 (Default: 3306)
my_sql_client = MySqlClient(
server_name=DB_SERVER_HOST,
database_name=DB_DATABASE,
username=DB_USERNAME,
password=DB_PASSWORD,
port=DB_PORT
)
my_sql_client.create_table(metadata=metadata)
my_sql_client.insert(df=df, table=table, metadata=metadata)
# my_sql_client.upsert(df=df, table=table, metadata=metadata)
MySqlClient
class 안에overwrite
함수를 생성해주세요!