파이프라인이 잘 돌아가는지 pipeline/etl_pipeline.py 중간 중간에 print 를 추가해 여러 step 을 출력하기
(예) [0시 0분] 파이프라인 시작
(예) [0시 0분] 파이프라인 종료, XYZ 테이블에 8건이 추가되었습니다.
etl_pipeline.py
에 print
함수로 내용만 추가해 주면 됩니다.
내가 작성한 코드
import os
from dotenv import load_dotenv
from etl_module.connectors.weather_api import WeatherApiClient
from etl_module.connectors.mysql import MySqlClient
from etl_module.assets.weather import extract_weather, transform_weather, load_weather
import time
def main():
"""
main 함수는 전체 ETL 프로세스를 실행합니다.
1. 환경 변수를 로드하여 Weather API와 MySQL 데이터베이스 연결에 필요한 정보를 가져옵니다.
2. WeatherApiClient와 MySqlClient 객체를 생성합니다.
3. ETL 프로세스를 순차적으로 수행합니다:
- 데이터를 Weather API에서 추출합니다.
- 추출된 데이터를 변환하여 필요한 형태로 가공합니다.
- 가공된 데이터를 MySQL 데이터베이스에 적재합니다.
"""
"""
step 출력해야 할 것
- ETL 파이프라인 시작
- 환경변수(.env) 파일 읽음
- 객체 생성(weather, mysql)
- Extract 시작, 끝 log
- Transform 시작, 끝 log
- Load 시작, 끝 log
- ETL 파이프라인 끝
"""
print(f"[{time.strftime('%X')}] 파이프라인 시작")
print(f"[{time.strftime('%X')}] .env 파일 읽기 시작")
load_dotenv()
API_KEY = os.environ.get("API_KEY")
DB_SERVER_HOST = os.environ.get("DB_SERVER_HOST")
DB_USERNAME = os.environ.get("DB_USERNAME")
DB_PASSWORD = os.environ.get("DB_PASSWORD")
DB_DATABASE = os.environ.get("DB_DATABASE")
DB_PORT = os.environ.get("DB_PORT")
print(f"[{time.strftime('%X')}] .env 파일 읽기 끝")
print(f"[{time.strftime('%X')}] 객체 생성(weather, mysql) 시작")
weather_api_client = WeatherApiClient(api_key=API_KEY)
my_sql_client = MySqlClient(
server_name=DB_SERVER_HOST,
database_name=DB_DATABASE,
username=DB_USERNAME,
password=DB_PASSWORD,
port=DB_PORT,
)
print(f"[{time.strftime('%X')}] 객체 생성(weather, mysql) 끝")
# ETL 실행
print(f"[{time.strftime('%X')}] Extract 시작")
df = extract_weather(weather_api_client=weather_api_client)
print(f"[{time.strftime('%X')}] Extract 끝")
print(f"[{time.strftime('%X')}] Transform 시작")
clean_df = transform_weather(df)
print(clean_df.shape)
print(f"[{time.strftime('%X')}] Transform 끝")
print(f"[{time.strftime('%X')}] Load 시작")
load_weather(df=clean_df, my_sql_client=my_sql_client)
print(f"[{time.strftime('%X')}] Load 끝")
print(f"[{time.strftime('%X')}] 파이프라인 종료, {DB_DATABASE} 테이블에 {clean_df.shape[0]}건이 추가되었습니다.")
if __name__ == "__main__":
main()
```python
import os
from dotenv import load_dotenv
from etl_module.connectors.weather_api import WeatherApiClient
from etl_module.connectors.mysql import MySqlClient
from etl_module.assets.weather import extract_weather, transform_weather, load_weather
import time
def main():
"""
main 함수는 전체 ETL 프로세스를 실행합니다.
1. 환경 변수를 로드하여 Weather API와 MySQL 데이터베이스 연결에 필요한 정보를 가져옵니다.
2. WeatherApiClient와 MySqlClient 객체를 생성합니다.
3. ETL 프로세스를 순차적으로 수행합니다:
- 데이터를 Weather API에서 추출합니다.
- 추출된 데이터를 변환하여 필요한 형태로 가공합니다.
- 가공된 데이터를 MySQL 데이터베이스에 적재합니다.
"""
"""
step 출력해야 할 것
- ETL 시작
- 환경변수(.env) 읽었다
- 객체 생성(weather, mysql) 했다
- extract log 시작, 끝
- transform log 시작, 끝
- load log 시작, 끝
- ETL 파이프라인 끝
"""
print("ETL 시작")
print(".env 파일 읽기 시작")
load_dotenv()
API_KEY = os.environ.get("API_KEY")
DB_SERVER_HOST = os.environ.get("DB_SERVER_HOST")
DB_USERNAME = os.environ.get("DB_USERNAME")
DB_PASSWORD = os.environ.get("DB_PASSWORD")
DB_DATABASE = os.environ.get("DB_DATABASE")
DB_PORT = os.environ.get("DB_PORT")
print(".env 파일 읽기 끝")
print("객체 생성(weather, mysql) 시작")
weather_api_client = WeatherApiClient(api_key=API_KEY)
my_sql_client = MySqlClient(
server_name=DB_SERVER_HOST,
database_name=DB_DATABASE,
username=DB_USERNAME,
password=DB_PASSWORD,
port=DB_PORT,
)
print("객체 생성(weather, mysql) 끝")
# ETL 실행
print("E 시작")
df = extract_weather(weather_api_client=weather_api_client)
print("E 끝")
print("T 시작")
clean_df = transform_weather(df)
print(clean_df.shape)
print("T 끝")
print("L 시작")
load_weather(df=clean_df, my_sql_client=my_sql_client)
print("L 끝")
print("ETL 끝")
if __name__ == "__main__":
main()
loguru
을 사용하여 다양한 로그 레벨로 메시지를 기록하고, 로그 파일로 저장하여 ETL 작업 추적schedule
을 사용하여 주기적으로 실행해야 하는 작업을 예약하고 자동화PyYAML
을 사용하여 YAML 파일을 읽고 쓰며, 애플리케이션 설정을 관리애플리케이션에서 발생하는 다양한 이벤트를 추적하는 중요한 도구
loguru 라이브러리를 활용하여 로그 레벨별로 로그를 기록하는 방법 배우기
로그 심각도 수준에 따른 순서
심각도 | 이름 | 설명 | 예시 |
---|---|---|---|
최하 | TRACE | 세부 정보 기록 | 함수 진입/출력, 변수 값 추척 등 |
하 | DEBUG | 디버깅 및 개발 중 필요한 정보 기록 | 함수 호출, 변수 값 등 |
중하 | INFO | 일반적인 경고 메시지 | 시스템 시작, 중요한 작업 완료 등 |
- | SUCCESS | (이 밑으로는 안 좋은 것들임) | |
중상 | WARNING | 경고 메시지 | 예기치 않은 상황, 잠재적인 오류 발생 가능성 등 |
상 | ERROR | 오류 메시지 | 예외 발생, 시스템 장애 등 |
최상 | CRITICAL | 심각한 오류 메시지 | 시스템 다운 등 |
→ DEBUG는 보통 확인용으로 많이 넣음
e.g. 데이터 클렌징 한 후 데이터프레임에 대한 shape 출력 등
→ 일반적으로 INFO와 ERROR를 자주 씀
pip install loguru
from loguru import logger
# 로그 레벨별 메시지 출력
logger.debug("디버그 메시지입니다.") # 디버깅 메시지
logger.info("정보 메시지입니다.") # 일반적인 정보 메시지
logger.warning("경고 메시지입니다.") # 경고 메시지
logger.error("에러 메시지입니다.") # 에러 메시지
logger.critical("심각한 에러 메시지입니다.") # 심각한 에러 메시지
from datetime import datetime
# 로그 파일 이름 생성
current_datetime = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_filename = f"etl_process_{current_datetime}.log"
# 로그를 파일로 기록
logger.add(log_filename)
# 로그 메시지 출력
logger.info("정보 메시지입니다.")
level="WARNING"
설정하면 WARNING 이상의 중요도를 가진 로그만 기록됨logger.add("logfile_format.log", format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}")
logger.info("정보 메시지")
from loguru import logger
# 다양한 심각도 수준으로 로그 메시지를 기록
logger.debug("디버그 메시지입니다.") # 디버깅 메시지
logger.info("정보 메시지입니다.") # 일반적인 정보 메시지
logger.success("모든 것이 잘 작동하고 있습니다!") # 성공 메시지
logger.warning("경고 메시지입니다.") # 경고 메시지
logger.error("에러 메시지입니다.") # 에러 메시지
logger.critical("심각한 에러 메시지입니다.") # 심각한 에러 메시지
→ 코드가 실행된 시간 | 심각도 | 코드 실행 줄 | 내용
from loguru import logger
from datetime import datetime
# 현재 시간에 기반한 파일명 생성
current_datetime = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
current_datetime
[실행 결과]
'2024-11-21_12-07-53'
log_filename = f"etl_process_{current_datetime}.log"
log_filename
[실행 결과]
'etl_process_2024-11-21_12-07-53.log'
# 파일 생성
logger.add(log_filename) # 파일로 저장히기 위해 사용
logger.debug("디버그 메시지입니다.") # 디버깅 메시지
logger.info("정보 메시지입니다.") # 일반적인 정보 메시지
logger.success("모든 것이 잘 작동하고 있습니다!") # 성공 메시지
logger.warning("경고 메시지입니다.") # 경고 메시지
logger.error("에러 메시지입니다.") # 에러 메시지
logger.critical("심각한 에러 메시지입니다.") # 심각한 에러 메시지
→ logger.add
다음에 나오는 내용들을 모두 파일에 저장함
→ 한 번 더 코드를 돌리면 파일명을 바꾸지 않은 상태이므로 해당 파일에 누적되어 다시 저장됨
: add를 한 번 더 불러왔기 때문에 두 배로 생성이 됨
logger.add("logfile_level.log", level="WARNING")
logger.debug("디버그 메시지입니다.") # 기록되지 않음
logger.info("정보 메시지입니다.") # 기록되지 않음
logger.success("모든 것이 잘 작동하고 있습니다!") # 기록되지 않음
logger.warning("경고 메시지입니다.") # 기록됨
logger.error("에러 메시지입니다.") # 기록됨
logger.critical("심각한 에러 메시지입니다.") # 기록됨
logger.add(log_filename)
은 파일 하나 당(파일 이름 같으면) 한 개만 사용해야 함!(코드 전체에서 한 번만 불러와야 중복 없이 로그 기록됨)logger.add("logfile_all.log")
logger.add("logfile_level.log", level="WARNING")
logger.debug("디버그 메시지입니다.") # level 파일에는 기록되지 않음
logger.info("정보 메시지입니다.") # level 파일에는 기록되지 않음
logger.success("모든 것이 잘 작동하고 있습니다!") # level 파일에는 기록되지 않음
logger.warning("경고 메시지입니다.") # 둘 다 기록됨
logger.error("에러 메시지입니다.") # 둘 다 기록됨
logger.critical("심각한 에러 메시지입니다.") # 둘 다 기록됨
logger.add("logfile_format.log", format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}")
logger.info("정보 메시지")
→ 밀리초를 지우고 시간, 레벨, 메시지만 남기고 싶어서 출력 형식을 조절
cf. format 설정 전 로그
schedule
라이브러리를 사용하여 주기적인 작업을 설정하고 관리pip install schedule
job()
함수가 1초마다 실행되도록 설정schedule.every()
를 사용하여 주기적으로 실행할 시간을 설정do()
를 통해 실행할 작업을 지정합니다. schedule.run_pending()
을 통해 설정한 스케줄을 실행import schedule
import time
# 실행할 함수 정의
def job():
print("작업이 실행되었습니다!")
# 1초마다 작업 실행
schedule.every(1).seconds.do(job)
# 스케줄러 실행
while True:
schedule.run_pending() # 예약된 작업이 있으면 실행
time.sleep(1) # 1초마다 체크
# 10초마다 실행
schedule.every(10).seconds.do(job)
# 1분마다 실행
schedule.every(1).minute.do(job)
# 매일 자정에 실행
schedule.every().day.at("00:00").do(job)
# 매주 월요일 9시에 실행
schedule.every().monday.at("09:00").do(job)
import schedule
import time
# 실행할 함수 정의
def job():
print("작업이 실행되었습니다!")
# 작업을 1초마다 실행하도록 스케줄링
schedule.every(1).seconds.do(job)
# 스케줄러 실행
while True:
schedule.run_pending() # 예약된 작업이 있으면 실행
time.sleep(1) # 1초마다 체크
→ 내가 멈추게 하지 않는 이상 계속 돌아감
→ interrupt로 강제 멈춘 모습
# 10초마다 실행
schedule.every(10).seconds.do(job)
# 1분마다 실행
schedule.every(1).minute.do(job)
# 매일 자정에 실행
schedule.every().day.at("00:00").do(job)
# 매주 월요일 9시에 실행
schedule.every().monday.at("09:00").do(job)
[실행 결과]
Every 1 week at 09:00:00 do job() (last run: [never], next run: 2024-11-25 09:00:00)
def job():
print("작업실행")
schedule.every(1).second.do(job)
while True:
schedule.run_pending()
time.sleep(1)
예약 관련 포인트
- 스케줄 하나 걸 때마다 하나 돌아감
- 자유도가 높아서
schedule.every(1).second.do(job) schedule.every(1).second.do(job) schedule.every(1).second.do(job)
이렇게 하면 3개 켜져서 계속 돌아감
- 스케줄을 한 코드 당 하나만 하는 게 아니라 여러 가지를 할 수 있음 == 자유도가 높음
스레드(컴퓨팅)
- 스레드(thread): 어떠한 프로그램 내에서, 특히 프로세스 내에서 실행되는 흐름의 단위
- 일반적으로 한 프로그램은 하나의 스레드를 가지고 있지만, 프로그램 환경에 따라 둘 이상의 스레드를 동시에 실행할 수 있음 → 멀티스레드(multithread)
,
하고 파라미터 넣으면 됩니다.,
로 구분!.yaml
데이터 파이프라인은 다양한 설정과 구성이 필요하기 때문에 복잡해질 수 있습니다. 이때 YAML 파일을 사용하면 설정을 보다 간편하게 관리할 수 있습니다. 예를 들어, 데이터 소스, 처리 방식, 출력 위치, 스케줄링 정보 등을 YAML 파일로 정의하면 파이프라인을 더 쉽게 설정하고 관리할 수 있습니다.
.env
파일과의 차이점.env
파일은 주로 애플리케이션의 환경 변수를 저장하는 데 사용 → 보안 정보: 다른 사람이 보면 절대 안 됨!
반면, YAML
파일은 애플리케이션의 구성 정보를 저장하는 데 사용 → 다른 사람이 봐도 상관 없음
.env
파일은 주로 환경 변수나 민감한 정보를 저장하는 데 사용YAML
은 자동화 부분의 설정이나 구성을 관리하는 데 적합pip install pyyaml
config.yaml
)log_folder_path: "./etl_module/logs"
cities:
- "New York"
- "Los Angeles"
- "Chicago"
- "Houston"
- "Phoenix"
run_minutes: 5
cities = ["도시1", "도시2", …, "도시n"]
해서 입력했었는데 이제는 그렇게 안 해도 됨
- yaml 파일 cities의 장점
- city 바꾸고 싶을 때 기존 방식은 복잡
- YMAL에 넣으면 관리하기 편함!
import yaml
# YAML 파일 경로
yaml_file_path = 'config.yaml'
# YAML 파일을 읽어 Python 객체로 변환
with open(yaml_file_path, 'r') as yaml_file:
config = yaml.safe_load(yaml_file)
# 읽은 데이터 출력
print(config.get('log_folder_path')) # 로그 폴더 경로 출력
print(config.get('cities')) # 도시 목록 출력
import yaml
yaml_file_path = 'config.yaml'
# YAML 파일을 읽어서 Python 객체로 변환
with open(yaml_file_path, 'r') as yaml_file:
config = yaml.safe_load(yaml_file)
# 읽은 데이터 출력
config
[실행 결과]
{'log_folder_path': './etl_module/logs',
'cities': ['New York', 'Los Angeles', 'Chicago', 'Houston', 'Phoenix'],
'run_minutes': 5}
→ with open()
: 여러 가지 파일을 읽을 때 사용하는 파이썬 내장 함수
→ 'r'는 읽기 전용으로 읽겠다는 의미!
→ as yaml_file
은 읽은 걸 yaml_file 변수에 넣겠다는 의미
→ config는 python dictionray 형태
config.get('log_folder_path')
[실행 결과]
'./etl_module/logs'
config.get('cities')
[실행 결과]
['New York', 'Los Angeles', 'Chicago', 'Houston', 'Phoenix']
config.get('run_minutes')
[실행 결과]
5
: pipeline 폴더에 파일 저장
etl_pipeline.yaml
log_folder_path: "etl_module/logs"
cities:
- "New York"
- "Los Angeles"
- "Chicago"
- "Houston"
- "Phoenix"
run_minutes: 5
etl_pipeline.py
yaml은 line 104부터
__file__
: 현재 etl_pipeline.py
이 있는 경로 정보(파이썬 파일 만들면 기본으로 가지고 있는 정보임)line 111 os.makedirs(log_folder_path, exist_ok=True)
는 뭘 하는 건가요?
→ 코드 돌리면 logs 폴더가 생기는 걸 확인
import os
from datetime import datetime
from dotenv import load_dotenv
from etl_module.connectors.weather_api import WeatherApiClient
from etl_module.connectors.mysql import MySqlClient
from etl_module.assets.weather import extract_weather, transform_weather, load_weather
from loguru import logger
import schedule
import yaml
import time
def main(config):
"""
main 함수는 전체 ETL 프로세스를 실행합니다.
1. 환경 변수를 로드하여 Weather API와 MySQL 데이터베이스 연결에 필요한 정보를 가져옵니다.
2. WeatherApiClient와 MySqlClient 객체를 생성합니다.
3. ETL 프로세스를 순차적으로 수행합니다:
- 데이터를 Weather API에서 추출합니다.
- 추출된 데이터를 변환하여 필요한 형태로 가공합니다.
- 가공된 데이터를 MySQL 데이터베이스에 적재합니다.
"""
# 현재 날짜와 시간을 기반으로 Log 파일명 생성
current_datetime = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_filename = f"{config.get('log_folder_path')}/etl_process_{current_datetime}.log"
logger.add(log_filename)
logger.info("ETL 프로세스를 시작합니다.")
load_dotenv()
try:
API_KEY = os.environ.get("API_KEY")
DB_SERVER_HOST = os.environ.get("DB_SERVER_HOST")
DB_USERNAME = os.environ.get("DB_USERNAME")
DB_PASSWORD = os.environ.get("DB_PASSWORD")
DB_DATABASE = os.environ.get("DB_DATABASE")
DB_PORT = os.environ.get("DB_PORT")
if not all(
[API_KEY, DB_SERVER_HOST, DB_USERNAME, DB_PASSWORD, DB_DATABASE, DB_PORT]
):
# 누락된 변수들 확인
missing_vars = [
var
for var, value in [
("API_KEY", API_KEY),
("DB_SERVER_HOST", DB_SERVER_HOST),
("DB_USERNAME", DB_USERNAME),
("DB_PASSWORD", DB_PASSWORD),
("DB_DATABASE", DB_DATABASE),
("DB_PORT", DB_PORT),
]
if value is None
]
error_message = f"누락된 환경 변수: {', '.join(missing_vars)}"
logger.error(error_message)
raise ValueError(error_message) # 누락된 환경 변수가 있으면 예외를 발생시킴
logger.info("환경 변수를 성공적으로 로드했습니다.")
weather_api_client = WeatherApiClient(api_key=API_KEY)
logger.info("WeatherApiClient가 초기화되었습니다.")
my_sql_client = MySqlClient(
server_name=DB_SERVER_HOST,
database_name=DB_DATABASE,
username=DB_USERNAME,
password=DB_PASSWORD,
port=DB_PORT,
)
logger.info("MySqlClient가 초기화되었습니다.")
# ETL 실행
logger.info("Weather API에서 데이터 추출을 시작합니다.")
df = extract_weather(
weather_api_client=weather_api_client, cities=config.get("cities")
)
logger.info(
f"데이터 추출이 완료되었습니다. 총 {len(df)}개의 레코드가 있습니다."
)
logger.info("데이터 변환을 시작합니다.")
clean_df = transform_weather(df)
logger.info(
f"데이터 변환이 완료되었습니다. 변환된 데이터프레임의 크기: {clean_df.shape}"
)
logger.info("MySQL 데이터베이스로 데이터 적재를 시작합니다.")
load_weather(df=clean_df, my_sql_client=my_sql_client)
logger.info("데이터 적재가 성공적으로 완료되었습니다.")
except Exception as e:
logger.exception(f"ETL 프로세스 중 오류가 발생했습니다. 오류: {e}")
if __name__ == "__main__":
# get config variables
yaml_file_path = __file__.replace(".py", ".yaml")
print(f"YAML 파일 위치: {yaml_file_path}")
with open(yaml_file_path) as yaml_file:
config = yaml.safe_load(yaml_file)
log_folder_path = config.get("log_folder_path")
os.makedirs(log_folder_path, exist_ok=True)
# 스케줄러 생성
schedule.every(config.get("run_minutes")).minutes.do(main, config=config)
while True:
schedule.run_pending()
time.sleep(5)
→ run_minutes가 5분으로 설정되어 있어서 terminal에서 실행하고 5분 지날 때마다 계속 돌아감
5회차 동안 열심히 수강하시느라 고생 많으셨습니다. 👏👏👏
Give yourselves a pat on the back!