from sqlalchemy import create_engine, MetaData, Table, Column
from sqlalchemy import Integer, String, Float, DateTime
from sqlalchemy import text
from sqlalchemy.engine import URL
import pandas as pd
import os
from dotenv import load_dotenv
load_dotenv()
[실행 결과]
True
class MySqlClient:
"""
MySQL 데이터베이스와 상호작용하기 위한 클라이언트 클래스입니다.
"""
def __init__(
self,
server_name: str,
database_name: str,
username: str,
password: str,
port: int = 5432,
):
# 데이터베이스 연결을 위한 초기 설정
self.host_name = server_name
self.database_name = database_name
self.username = username
self.password = password
self.port = port
# MySQL 연결 URL 생성
connection_url = URL.create(
drivername="mysql+mysqlconnector",
username=username,
password=password,
host=server_name,
port=port,
database=database_name,
)
# SQLAlchemy 엔진 생성
self.engine = create_engine(connection_url)
def create_table(self, metadata: MetaData) -> None:
"""
주어진 메타데이터 객체를 기반으로 테이블을 생성합니다.
Parameters:
- metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
"""
metadata.create_all(self.engine)
def drop_table(self, table: Table) -> None:
"""
지정된 테이블을 삭제합니다. 테이블이 존재하지 않으면 무시합니다.
Parameters:
- table_name (str): 삭제할 테이블의 이름.
"""
with self.engine.connect() as connection:
connection.execute(text(f"DROP TABLE IF EXISTS {self.database_name}.{table.name}"))
def insert(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
"""
데이터를 테이블에 삽입합니다. 테이블이 없으면 생성 후 추가합니다.
Parameters:
- df (pd.DataFrame): 삽입할 데이터를 포함하는 Pandas DataFrame.
- metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
"""
self.create_table(metadata=metadata)
df.to_sql(name=table.name, con=self.engine, if_exists="append", index=False)
def upsert(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
"""
데이터를 테이블에 삽입하고, 기존 레코드가 있으면 업데이트합니다. 테이블이 없으면 생성 후 추가합니다.
Parameters:
- df (pd.DataFrame): 삽입 또는 갱신할 데이터를 포함하는 Pandas DataFrame.
- table (Table): 업서트 작업을 수행할 SQLAlchemy 테이블 객체.
- metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
"""
self.create_table(metadata=metadata)
# 데이터프레임을 레코드(딕셔너리 목록)으로 변환
data = df.to_dict(orient="records")
# 테이블의 고유 키(Primary Key) 추출
key_columns = [
pk_column.name for pk_column in table.primary_key.columns.values()
]
key_values = [tuple(row[pk] for pk in key_columns) for row in data]
delete_values = ", ".join(
[f"({', '.join(map(repr, values))})" for values in key_values]
)
with self.engine.connect() as connection:
if key_values:
delete_sql = f"""
DELETE FROM {self.database_name}.{table.name}
WHERE ({', '.join(key_columns)}) IN (
{delete_values}
)
"""
connection.execute(text(delete_sql))
connection.commit() # DELETE 문 실행
# 변환된 데이터프레임을 테이블에 추가 (INSERT)
df.to_sql(name=table.name, con=self.engine, if_exists="append", index=False)
def overwrite(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
"""
주어진 DataFrame 데이터를 테이블의 기존 데이터를 모두 대체하도록 삽입합니다.
Parameters:
- df (pd.DataFrame): 데이터를 포함하는 Pandas DataFrame.
- table (Table): 데이터를 대체할 SQLAlchemy Table 객체.
- metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
"""
# 기존 테이블 데이터 삭제
self.drop_table(table=table)
# 테이블이 존재하지 않으면 생성
self.create_table(metadata=metadata)
# 새로운 데이터를 테이블에 삽입: upsert 써도 됨
self.insert(df=df, table=table, metadata=metadata)
🡆 def drop_table
형태가 이전 강의와 달라졌으니 참고!
# 수정 전
def drop_table(self, table_name: str) -> None:
"""
지정된 테이블을 삭제합니다. 테이블이 존재하지 않으면 무시합니다.
Parameters:
- table_name (str): 삭제할 테이블의 이름.
"""
with self.engine.connect() as connection:
connection.execute(text(f"DROP TABLE IF EXISTS {table_name}"))
# 수정 후
def drop_table(self, table: Table) -> None:
"""
지정된 테이블을 삭제합니다. 테이블이 존재하지 않으면 무시합니다.
Parameters:
- table (Table): 삭제할 SQLAlchemy Table 객체.
"""
with self.engine.connect() as connection:
connection.execute(text(f"DROP TABLE IF EXISTS {self.database_name}.{table.name}"))
# 2회차에서 쌓아둔 데이터 불러오기
df = pd.read_csv(
'weather_api.csv',
dtype={
'dt': 'object',
'time': 'object',
'id': 'int64',
'city': 'object',
'temperature': 'float64',
'humidity': 'int64',
'wind_speed': 'float64'
},
parse_dates=['measured_at']
)
table_name = 'daily_weather'
# 테이블에 대한 Metadata 설정
metadata = MetaData()
table = Table(
table_name, metadata,
Column('dt', String(8), nullable=False, primary_key=True), # 'dt'는 문자열이며 기본 키로 설정됨 (고유값)
Column('time', String(6), nullable=False, primary_key=True), # 'time'는 문자열이며 기본 키로 설정됨 (고유값)
Column('measured_at', DateTime, nullable=False), # 'measured_at'은 DateTime 타입이고 null이 허용되지 않음
Column('id', Integer, primary_key=True), # 'id'는 정수 타입이며 기본 키로 설정됨 (고유값)
Column('city', String(100), nullable=True), # 'name'은 문자열이며 null이 허용됨
Column('temperature', Float, nullable=True), # 'temperature'는 부동소수점 타입이며 null이 허용됨
Column('humidity', Integer, nullable=True), # 'humidity'는 정수 타입이며 null이 허용됨
Column('wind_speed', Float, nullable=True) # 'wind_speed'는 부동소수점 타입이며 null이 허용됨
)
DB_SERVER_HOST = os.environ.get('DB_SERVER_HOST')
DB_USERNAME = os.environ.get('DB_USERNAME')
DB_PASSWORD = os.environ.get('DB_PASSWORD')
DB_DATABASE = os.environ.get('DB_DATABASE')
DB_PORT = os.environ.get('DB_PORT')
my_sql_client = MySqlClient(
server_name=DB_SERVER_HOST,
database_name=DB_DATABASE,
username=DB_USERNAME,
password=DB_PASSWORD,
port=DB_PORT
)
my_sql_client.overwrite(df=df, table=table, metadata=metadata)
etl_module/
├── assets/
│ └── __init__.py
│ └── weather.py
├── connectors/
│ └── __init__.py
│ ├── weather_api.py
│ └── mysql_client.py
├── pipeline/
│ └── etl_pipeline.py
├── .env
├── 4_challenge_module.ipynb
└── requirements.txt
VS Code Terminal에 pip install -r requirements.txt
입력하면 실습에 필요한 모든 라이브러리를 설치할 수 있음
etl_module
폴더를 VS Code로 열기: 작업환경 설정
이제 우리는 .ipynb
파일보다 .py
파일에 익숙해져야 함!
connectors
폴더에는 지금까지 만든 class가 모두 들어가 있음
asset
폴더에는 실제로 쓸 거(부를 거) 넣음
__init__.py
assets.weather
하고 부를 수가 없음!weather.py
)__init__.py
mysql.py
)weather_api.py
)etl_pipeline.py
)connectors
)지난번 실습에 만들었던 Class를 모듈 파일로 변환해 보겠습니다.
Docstring 및 주석도 꼭 포함해주세요.
connectors/weather_api.py
import requests
class WeatherApiClient:
"""
OpenWeatherMap API 클라이언트를 사용하여 날씨 데이터를 가져오는 클래스입니다.
"""
def __init__(self, api_key: str):
self.base_url = "http://api.openweathermap.org/data/2.5"
if api_key is None:
raise Exception("API 키는 None으로 설정할 수 없습니다.")
self.api_key = api_key
def get_city(self, city_name: str, temperature_units: str = "metric") -> dict:
"""
지정된 도시의 최신 날씨 데이터를 가져옵니다.
Parameters:
- city_name (str): 날씨 정보를 조회할 도시 이름.
- temperature_units (str): 온도 단위 (기본값은 'metric'으로 섭씨 기준).
'metric'은 섭씨, 'imperial'은 화씨, 'standard'는 켈빈 단위를 의미합니다.
Returns:
- dict: 요청한 도시의 날씨 데이터가 포함된 JSON 응답을 반환합니다.
Raises:
- Exception: API 요청이 실패한 경우 상태 코드와 응답 메시지와 함께 예외가 발생합니다.
"""
params = {"q": city_name, "units": temperature_units, "appid": self.api_key}
response = requests.get(f"{self.base_url}/weather", params=params)
if response.status_code == 200:
return response.json()
else:
raise Exception(
f"Open Weather API에서 데이터를 추출하지 못했습니다. 상태 코드: {response.status_code}. 응답: {response.text}"
)
connectors/mysql.py
from sqlalchemy import create_engine, MetaData, Table, MetaData, Column
from sqlalchemy import text
from sqlalchemy.engine import URL
import pandas as pd
class MySqlClient:
"""
MySQL 데이터베이스와 상호작용하기 위한 클라이언트 클래스입니다.
이 클래스는 SQLAlchemy를 사용하여 MySQL 데이터베이스에 연결하고 테이블을 생성, 삭제, 삽입,
업서트(upsert)와 같은 작업을 지원합니다.
"""
def __init__(
self,
server_name: str,
database_name: str,
username: str,
password: str,
port: int = 3306,
):
# 데이터베이스 연결을 위한 초기 설정
self.host_name = server_name
self.database_name = database_name
self.username = username
self.password = password
self.port = port
# MySQL 연결 URL 생성
connection_url = URL.create(
drivername="mysql+mysqlconnector",
username=username,
password=password,
host=server_name,
port=port,
database=database_name,
)
# SQLAlchemy 엔진 생성
self.engine = create_engine(connection_url)
def create_table(self, metadata: MetaData) -> None:
"""
주어진 메타데이터 객체를 기반으로 테이블을 생성합니다.
Parameters:
- metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
"""
metadata.create_all(self.engine)
def drop_table(self, table: Table) -> None:
"""
지정된 테이블을 삭제합니다. 테이블이 존재하지 않으면 무시합니다.
Parameters:
- table_name (str): 삭제할 테이블의 이름.
"""
with self.engine.connect() as connection:
connection.execute(text(f"DROP TABLE IF EXISTS {table.name}"))
def insert(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
"""
주어진 DataFrame을 테이블에 삽입합니다. 테이블이 존재하지 않으면 생성합니다.
Parameters:
- df (pd.DataFrame): 삽입할 데이터를 포함하는 Pandas DataFrame.
- table (Table): 데이터 삽입을 위한 SQLAlchemy Table 객체.
- metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
"""
self.create_table(metadata=metadata)
df.to_sql(name=table.name, con=self.engine, if_exists="append", index=False)
def upsert(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
"""
주어진 DataFrame 데이터를 테이블에 삽입하고, 기존 레코드가 있으면 갱신합니다.
Parameters:
- df (pd.DataFrame): 삽입 또는 갱신할 데이터를 포함하는 Pandas DataFrame.
- table (Table): 업서트 작업을 수행할 SQLAlchemy Table 객체.
- metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
"""
self.create_table(metadata=metadata)
# 데이터프레임을 레코드(딕셔너리 목록)으로 변환
data = df.to_dict(orient="records")
# 테이블의 고유 키(Primary Key) 추출
key_columns = [
pk_column.name for pk_column in table.primary_key.columns.values()
]
key_values = [tuple(row[pk] for pk in key_columns) for row in data]
delete_values = ", ".join(
[f"({', '.join(map(repr, values))})" for values in key_values]
)
with self.engine.connect() as connection:
if key_values:
delete_sql = f"""
DELETE FROM {self.database_name}.{table.name}
WHERE ({', '.join(key_columns)}) IN (
{delete_values}
)
"""
connection.execute(text(delete_sql))
connection.commit() # DELETE 문 실행
# 변환된 데이터프레임을 테이블에 추가 (INSERT)
df.to_sql(name=table.name, con=self.engine, if_exists="append", index=False)
def overwrite(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
"""
주어진 DataFrame 데이터를 테이블의 기존 데이터를 모두 대체하도록 삽입합니다.
Parameters:
- df (pd.DataFrame): 테이블의 기존 데이터를 대체할 데이터를 포함하는 Pandas DataFrame.
- table (Table): 데이터를 대체할 SQLAlchemy Table 객체.
- metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
"""
# 테이블이 존재하지 않으면 생성
self.create_table(metadata=metadata)
# 기존 테이블 데이터 삭제
self.drop_table(table=table)
# 새로운 데이터를 테이블에 삽입
self.insert(df=df, table=table, metadata=metadata)
from etl_module.connectors.weather_api import WeatherApiClient
from etl_module.connectors.mysql import MySqlClient
import pandas as pd
import os
from dotenv import load_dotenv
load_dotenv()
from etl_module.connectors.weather_api import WeatherApiClient
from connectors.weather_api import WeatherApiClient
로 작성# 환경변수 설정
API_KEY = os.environ.get("API_KEY")
DB_SERVER_HOST = os.environ.get('DB_SERVER_HOST') # 데이터베이스 서버의 호스트 이름 (로컬호스트로 설정)
DB_USERNAME = os.environ.get('DB_USERNAME') # 데이터베이스 아이디
DB_PASSWORD = os.environ.get('DB_PASSWORD') # 데이터베이스 비밀번호
DB_DATABASE = os.environ.get('DB_DATABASE') # 사용할 데이터베이스 이름
DB_PORT = os.environ.get('DB_PORT') # 데이터베이스 연결을 위한 포트 (Default: 3306)
DB_DATABASE
[실행 결과]
'test_db'
.env
파일에 들어가 있는 값 가져오는 것# Client
weather_api_client = WeatherApiClient(api_key = API_KEY)
my_sql_client = MySqlClient(
server_name=DB_SERVER_HOST,
database_name=DB_DATABASE,
username=DB_USERNAME,
password=DB_PASSWORD,
port=DB_PORT
)
# 잘 들어갔는지 확인
weather_api_client.api_key
[실행 결과]
(내 api key 출력)
# 잘 들어갔는지 확인 (2)
my_sql_client.database_name
[실행 결과]
'test_db'
# test
weather_api_client.get_city(city_name='seoul')
assets
)import os
from dotenv import load_dotenv
import pandas as pd
from connectors.weather_api import WeatherApiClient
from connectors.mysql_client import MySqlClient
from sqlalchemy import MetaData, Table, Column, String, DateTime, Integer, Float
load_dotenv()
def extract_weather(api_client: WeatherApiClient) -> pd.DataFrame:
"""
여러 도시의 날씨 데이터를 추출합니다.
Parameters:
- weather_api_client (WeatherApiClient): API에서 날씨 데이터를 가져오기 위한 클라이언트.
Returns:
- pd.DataFrame: 지정된 도시들의 날씨 데이터를 포함하는 DataFrame.
"""
cities = ['seoul', 'busan', 'sejong', 'daegu', 'incheon', 'daejeon', 'ulsan']
weather_data = [api_client.get_city(city_name=city) for city in cities]
df = pd.json_normalize(weather_data)
return df
def transform_weather(df: pd.DataFrame) -> pd.DataFrame:
"""
날씨 데이터를 변환하고 전처리합니다.
Parameters:
- df (pd.DataFrame): 원본 날씨 데이터를 포함하는 DataFrame.
Returns:
- pd.DataFrame: 선택된 컬럼과 이름이 변경된 데이터로 구성된 변환된 DataFrame.
"""
df["measured_at"] = pd.to_datetime(df["dt"], unit="s") + pd.Timedelta(hours=9) # 한국시간
df["dt"] = df["measured_at"].dt.strftime("%Y%m%d") # 기준년월일 (YYYYMMDD)
df["time"] = df["measured_at"].dt.strftime("%H%M%S") # 기준년월일 (HHHHMMSS)
df_selected = df[["dt", "time", "measured_at", "id", "name", "main.temp", "main.humidity", "wind.speed"]]
df_selected = df_selected.rename(columns={
"name": "city",
"main.temp": "temperature",
"main.humidity": "humidity",
"wind.speed": "wind_speed"
})
return df_selected
insert
및 overwrite
함수를 추가def load_weather(df: pd.DataFrame, my_sql_client: MySqlClient, method: str = "upsert",) -> None:
"""
변환된 날씨 데이터를 MySQL에 로드하는 함수.
Parameters:
- df (pd.DataFrame): 변환된 데이터
- my_sql_client (MySqlClient): 데이터베이스 클라이언트
- method (str, optional): 삽입 방법 ('insert', 'upsert', 'overwrite')
"""
metadata = MetaData()
table = Table(
"daily_weather",
metadata,
Column("dt", String(8), nullable=False, primary_key=True),
Column("time", String(6), nullable=False, primary_key=True),
Column("measured_at", DateTime, nullable=False),
Column("id", Integer, primary_key=True),
Column("city", String(100), nullable=True),
Column("temperature", Float, nullable=True),
Column("humidity", Integer, nullable=True),
Column("wind_speed", Float, nullable=True),
)
if method == "insert":
my_sql_client.insert(df=df, table=table, metadata=metadata)
elif method == "upsert":
my_sql_client.upsert(df=df, table=table, metadata=metadata)
elif method == "overwrite":
my_sql_client.overwrite(df=df, table=table, metadata=metadata)
else:
raise Exception("올바른 method를 설정해주세요: [insert, upsert, overwrite]")
# 서울말고도 여러 도시에 대해 날씨 정보를 추출해볼까요?
def extract_weather(weather_api_client: WeatherApiClient) -> pd.DataFrame:
"""
여러 도시의 날씨 데이터를 추출합니다.
Parameters:
- weather_api_client (WeatherApiClient): API에서 날씨 데이터를 가져오기 위한 클라이언트.
Returns:
- pd.DataFrame: 지정된 도시들의 날씨 데이터를 포함하는 DataFrame.
"""
cities = ["seoul", "busan", "sejong", "daegu", "incheon", "daejeon", "ulsan"]
weather_data = []
for city_name in cities:
weather_data.append(weather_api_client.get_city(city_name=city_name))
df = pd.json_normalize(weather_data)
return df
df = extract_weather(weather_api_client=weather_api_client)
df.head()
# 전처리 코드를 함수로 만들어볼게요.
def transform_weather(df: pd.DataFrame) -> pd.DataFrame:
"""
날씨 데이터를 변환하고 전처리합니다.
Parameters:
- df (pd.DataFrame): 원본 날씨 데이터를 포함하는 DataFrame.
Returns:
- pd.DataFrame: 선택된 컬럼과 이름이 변경된 데이터로 구성된 변환된 DataFrame.
"""
df["measured_at"] = pd.to_datetime(df["dt"], unit="s") + pd.Timedelta(hours=9) # 한국시간
df["dt"] = df["measured_at"].dt.strftime("%Y%m%d") # 기준년월일 (YYYYMMDD)
df["time"] = df["measured_at"].dt.strftime("%H%M%S") # 기준년월일 (HHHHMMSS)
df_selected = df[["dt", "time", "measured_at", "id", "name", "main.temp", "main.humidity", "wind.speed"]]
df_selected = df_selected.rename(columns={
"name": "city",
"main.temp": "temperature",
"main.humidity": "humidity",
"wind.speed": "wind_speed"
})
return df_selected
clean_df = transform_weather(df)
clean_df.head()
from sqlalchemy import MetaData, Table, Column, String, DateTime, Integer, Float
def load_weather(df: pd.DataFrame, my_sql_client: MySqlClient, method: str = "upsert",) -> None:
"""
변환된 날씨 데이터를 MySQL 데이터베이스에 로드합니다.
Parameters:
- df (pd.DataFrame): 변환된 날씨 데이터를 포함하는 DataFrame.
- my_sql_client (MySqlClient): MySQL 데이터베이스와 상호작용하는 클라이언트.
- table (Table): 데이터를 로드할 대상 데이터베이스 테이블.
- metadata (MetaData): 테이블 정의에 대한 SQLAlchemy 메타데이터 객체.
- method (str, optional): 데이터 삽입 방법을 지정합니다.
옵션: "insert", "upsert", "overwrite".
기본값은 "upsert"입니다.
"""
metadata = MetaData()
table = Table(
"daily_weather",
metadata,
Column("dt", String(8), nullable=False, primary_key=True),
Column("time", String(6), nullable=False, primary_key=True),
Column("measured_at", DateTime, nullable=False),
Column("id", Integer, primary_key=True),
Column("city", String(100), nullable=True),
Column("temperature", Float, nullable=True),
Column("humidity", Integer, nullable=True),
Column("wind_speed", Float, nullable=True),
)
if method == "insert":
my_sql_client.insert(df=df, table=table, metadata=metadata)
elif method == "upsert":
my_sql_client.upsert(df=df, table=table, metadata=metadata)
elif method == "overwrite":
my_sql_client.overwrite(df=df, table=table, metadata=metadata)
else:
raise Exception("올바른 method를 설정해주세요: [insert, upsert, overwrite]")
load_weather(df=clean_df, my_sql_client=my_sql_client, method="upsert")
assets
)connectors/mysql.py
from etl_module.connectors.weather_api import WeatherApiClient
from etl_module.connectors.mysql import MySqlClient
import pandas as pd
from sqlalchemy import MetaData, Table, Column, String, DateTime, Integer, Float
def extract_weather(weather_api_client: WeatherApiClient) -> pd.DataFrame:
"""
여러 도시의 날씨 데이터를 추출합니다.
Parameters:
- weather_api_client (WeatherApiClient): API에서 날씨 데이터를 가져오기 위한 클라이언트.
Returns:
- pd.DataFrame: 지정된 도시들의 날씨 데이터를 포함하는 DataFrame.
"""
cities = ["seoul", "busan", "sejong", "daegu", "incheon", "daejeon", "ulsan"]
weather_data = []
for city_name in cities:
weather_data.append(weather_api_client.get_city(city_name=city_name))
df = pd.json_normalize(weather_data)
return df
def transform_weather(df: pd.DataFrame) -> pd.DataFrame:
"""
날씨 데이터를 변환하고 전처리합니다.
Parameters:
- df (pd.DataFrame): 원본 날씨 데이터를 포함하는 DataFrame.
Returns:
- pd.DataFrame: 선택된 컬럼과 이름이 변경된 데이터로 구성된 변환된 DataFrame.
"""
df["measured_at"] = pd.to_datetime(df["dt"], unit="s") + pd.Timedelta(hours=9) # 한국시간
df["dt"] = df["measured_at"].dt.strftime("%Y%m%d") # 기준년월일 (YYYYMMDD)
df["time"] = df["measured_at"].dt.strftime("%H%M%S") # 기준년월일 (HHHHMMSS)
df_selected = df[["dt", "time", "measured_at", "id", "name", "main.temp", "main.humidity", "wind.speed"]]
df_selected = df_selected.rename( # 컬럼명 수정
columns={
"name": "city",
"main.temp": "temperature",
"main.humidity": "humidity",
"wind.speed": "wind_speed",
}
)
return df_selected
def load_weather(
df: pd.DataFrame,
my_sql_client: MySqlClient,
method: str = "upsert",
) -> None:
"""
변환된 날씨 데이터를 MySQL 데이터베이스에 로드합니다.
Parameters:
- df (pd.DataFrame): 변환된 날씨 데이터를 포함하는 DataFrame.
- my_sql_client (MySqlClient): MySQL 데이터베이스와 상호작용하는 클라이언트.
- table (Table): 데이터를 로드할 대상 데이터베이스 테이블.
- metadata (MetaData): 테이블 정의에 대한 SQLAlchemy 메타데이터 객체.
- method (str, optional): 데이터 삽입 방법을 지정합니다.
옵션: "insert", "upsert", "overwrite".
기본값은 "upsert"입니다.
"""
metadata = MetaData()
table = Table(
"daily_weather",
metadata,
Column("dt", String(8), nullable=False, primary_key=True),
Column("time", String(6), nullable=False, primary_key=True),
Column("measured_at", DateTime, nullable=False),
Column("id", Integer, primary_key=True),
Column("city", String(100), nullable=True),
Column("temperature", Float, nullable=True),
Column("humidity", Integer, nullable=True),
Column("wind_speed", Float, nullable=True),
)
if method == "insert":
my_sql_client.insert(df=df, table=table, metadata=metadata)
elif method == "upsert":
my_sql_client.upsert(df=df, table=table, metadata=metadata)
elif method == "overwrite":
my_sql_client.overwrite(df=df, table=table, metadata=metadata)
else:
raise Exception("올바른 method를 설정해주세요: [insert, upsert, overwrite]")
pipeline
)import os
from dotenv import load_dotenv
from etl_module.connectors.weather_api import WeatherApiClient
from etl_module.connectors.mysql import MySqlClient
from etl_module.assets.weather import extract_weather, transform_weather, load_weather
def main():
"""
ETL 프로세스를 실행하는 메인 함수.
1. 환경 변수를 로드하여 API 및 데이터베이스 연결 정보를 가져옵니다.
2. WeatherApiClient 및 MySqlClient 객체를 생성합니다.
3. ETL 프로세스를 순차적으로 수행합니다 (데이터 추출 -> 변환 -> 데이터베이스 적재).
"""
load_dotenv()
API_KEY = os.environ.get("API_KEY")
DB_SERVER_HOST = os.environ.get('DB_SERVER_HOST')
DB_USERNAME = os.environ.get('DB_USERNAME')
DB_PASSWORD = os.environ.get('DB_PASSWORD')
DB_DATABASE = os.environ.get('DB_DATABASE')
DB_PORT = os.environ.get('DB_PORT')
weather_api_client = WeatherApiClient(api_key = API_KEY)
my_sql_client = MySqlClient(
server_name=DB_SERVER_HOST,
database_name=DB_DATABASE,
username=DB_USERNAME,
password=DB_PASSWORD,
port=DB_PORT
)
# ETL 실행
df = extract_weather(weather_api_client=weather_api_client)
clean_df = transform_weather(df)
load_weather(df=clean_df, my_sql_client=my_sql_client)
df = extract_weather(weather_api_client=weather_api_client)
clean_df = transform_weather(df)
load_weather(df=clean_df, my_sql_client=my_sql_client, method="overwrite")
import os
from dotenv import load_dotenv
from etl_module.connectors.weather_api import WeatherApiClient
from etl_module.connectors.mysql import MySqlClient
from etl_module.assets.weather import extract_weather, transform_weather, load_weather
def main():
load_dotenv()
API_KEY = os.environ.get("API_KEY")
DB_SERVER_HOST = os.environ.get('DB_SERVER_HOST')
DB_USERNAME = os.environ.get('DB_USERNAME')
DB_PASSWORD = os.environ.get('DB_PASSWORD')
DB_DATABASE = os.environ.get('DB_DATABASE')
DB_PORT = os.environ.get('DB_PORT')
weather_api_client = WeatherApiClient(api_key = API_KEY)
my_sql_client = MySqlClient(
server_name=DB_SERVER_HOST,
database_name=DB_DATABASE,
username=DB_USERNAME,
password=DB_PASSWORD,
port=DB_PORT
)
# ETL 실행
df = extract_weather(weather_api_client=weather_api_client)
clean_df = transform_weather(df)
load_weather(df=clean_df, my_sql_client=my_sql_client)
main()
pipeline
)main
함수를 모듈 파일로 변환pipeline/etl_pipeline.py
import os
from dotenv import load_dotenv
from etl_module.connectors.weather_api import WeatherApiClient
from etl_module.connectors.mysql import MySqlClient
from etl_module.assets.weather import extract_weather, transform_weather, load_weather
def main():
"""
main 함수는 전체 ETL 프로세스를 실행합니다.
1. 환경 변수를 로드하여 Weather API와 MySQL 데이터베이스 연결에 필요한 정보를 가져옵니다.
2. WeatherApiClient와 MySqlClient 객체를 생성합니다.
3. ETL 프로세스를 순차적으로 수행합니다:
- 데이터를 Weather API에서 추출합니다.
- 추출된 데이터를 변환하여 필요한 형태로 가공합니다.
- 가공된 데이터를 MySQL 데이터베이스에 적재합니다.
"""
load_dotenv()
API_KEY = os.environ.get("API_KEY")
DB_SERVER_HOST = os.environ.get("DB_SERVER_HOST")
DB_USERNAME = os.environ.get("DB_USERNAME")
DB_PASSWORD = os.environ.get("DB_PASSWORD")
DB_DATABASE = os.environ.get("DB_DATABASE")
DB_PORT = os.environ.get("DB_PORT")
weather_api_client = WeatherApiClient(api_key=API_KEY)
my_sql_client = MySqlClient(
server_name=DB_SERVER_HOST,
database_name=DB_DATABASE,
username=DB_USERNAME,
password=DB_PASSWORD,
port=DB_PORT,
)
# ETL 실행
df = extract_weather(weather_api_client=weather_api_client)
clean_df = transform_weather(df)
print(clean_df.shape)
load_weather(df=clean_df, my_sql_client=my_sql_client)
if __name__ == "__main__":
main()
if __name__ == "__main__":
# Mac
python3 -m etl_module.pipeline.etl_pipeline
# Window
python -m etl_module.pipeline.etl_pipeline
python -m
명령어는 모듈에서 직접 실행하기 위해 사용AWS
Networking
IP 허용
Inflow Outflow