로컬 스케줄러를 활용해 최신 데이터 유지하기 MacOS (crontab)

Kangjik Kim·2024년 3월 8일
post-thumbnail

1. 로컬에서 하려는 이유

# Get memory usage of each column
memory_usage = df.memory_usage(deep=True)

# Sum the memory usage of all columns
total_memory_usage = memory_usage.sum()

# Convert bytes to megabytes for easier interpretation
total_memory_usage_mb = total_memory_usage / (1024 * 1024)

print("Total memory usage of DataFrame:", total_memory_usage_mb, "MB")

이걸로 dataframe이 메모리를 얼마나 사용하는지 확인해보면 3기가가 넘는 메모리를 사용중이다.

현재 회사 인프라 구조에서 t3.small 인스턴스에 docker를 활용해 여러 환경을 실행하고 있기 때문에
실제 운영중인 인스턴스에 추가로 올리기가 불가능한 상황이다.(터질 수 있음)

새로운 인스턴스를 파는 방법이 있지만.. 비용문제로 안됨

2. 스케줄링 방법

매일 최신 데이터 반영하게 스케줄링

  • upload_parquet.py 로컬에서 스케줄링

  • upload_parquet.py 파일

    import pandas as pd
    import os
    from dotenv import load_dotenv
    from sqlalchemy import create_engine
    import boto3
    from datetime import datetime
    import time
    
    start_time = datetime.now()
    print(f"스크립트 시작 시간: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
    
    print("환경 변수 로딩...\n")
    load_dotenv()
    print("환경 변수 로딩 완료.\n")
    
    print("환경 변수에서 연결 세부 정보 검색...\n")
    DB_HOST = os.getenv("DB_HOST")
    DB_NAME = os.getenv("DB_NAME")
    DB_USER = os.getenv("DB_USER")
    DB_PASSWORD = os.getenv("DB_PASSWORD")
    print(f"데이터베이스 세부 정보: 호스트 - {DB_HOST}, 데이터베이스 - {DB_NAME}\n")
    
    print("SQLAlchemy 엔진 생성...\n")
    engine = create_engine(f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}")
    print("SQLAlchemy 엔진 생성 완료.\n")
    
    print("SQL 쿼리 정의...\n")
    sql_query = "SELECT * FROM points" 
    print(f"SQL 쿼리 정의 완료: {sql_query}\n")
    
    print("pandas DataFrame으로 데이터 가져오기...\n")
    points_df = pd.read_sql_query(sql_query, engine)
    current_time = datetime.now()
    print(f"DataFrame으로 데이터 가져오기 완료. 현재 시간: {current_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
    
    print("오늘 날짜 가져오기...\n")
    unix_timestamp = int(time.time())
    today = datetime.today().strftime('%Y-%m-%d')
    print(f"오늘 날짜: {today}\n")
    
    print("필요한 디렉토리 생성...\n")
    script_dir = os.path.dirname(os.path.abspath(__file__))
    data_dir = os.path.join(script_dir, f'data/{today}')
    os.makedirs(data_dir, exist_ok=True)
    print("디렉토리 생성 완료.\n")
    
    print("DataFrame을 parquet로 변환...\n")
    parquet_file_path = os.path.join(data_dir, f"{today}_{unix_timestamp}_points.parquet")
    points_df.to_parquet(parquet_file_path, index=False)
    current_time = datetime.now()
    print(f"DataFrame을 parquet로 변환 완료. 현재 시간: {current_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
    
    print("S3에 파일 업로드...\n")
    s3 = boto3.client('s3')
    bucket_name = 'prod.reviewduck'
    s3_file_path = f"points.parquet"
    s3.upload_file(parquet_file_path, bucket_name, s3_file_path)
    current_time = datetime.now()
    print(f"S3에 파일 업로드 완료. 현재 시간: {current_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
    
    print("스크립트 완료.\n")
  • macos 기준 crontab 사용

     crontab -e

    로 crontab 편집기 실행, 아래의 내용들 추가

  • 테스트용 10분단위 스케줄링

    */10 * * * * /usr/local/anaconda3/bin/python /Users/kangjik/cashdd_parquet/upload_parquet.py >> ~/Desktop/cron.log 2>&1
  • 실제 사용용 매일 새벽 3시 스케줄링(유저가 가장 적은 시간대)

    0 3 * * * /usr/local/anaconda3/bin/python /Users/kangjik/cashdd_parquet/upload_parquet.py >> ~/Desktop/cron.log 2>&1
  • crontab 스케줄링 확인

    crontab -l

3.crontab 상대경로 이슈

상대경로를 사용해 파일을 저장하게 .py 파일을 작성하면

일반적으로 홈 디렉토리에 저장하게 된다.

script_dir = os.path.dirname(os.path.abspath(__file__))
data_dir = os.path.join(script_dir, f'data/{today}')
os.makedirs(data_dir, exist_ok=True)

위에 처럼 절대경로 사용하면 해결된다.

  • crontab 모니터링 방법
tail -f ~/Desktop/cron.log (위에 Crontab에 지정한 Logfile 위치, 파일명)
  • 스케줄링 전에 파일 실행 확인방법
/usr/local/anaconda3/bin/python ~/cashdd_parquet/upload_parquet.py

터미널에서 이렇게 실행해서 실행 확인,
conda를 사용중이지 않다면 python 경로가 달라질 수 있음

which python

으로 파이썬 경로를 확인할 수 있다.

일반적으로 사용되는 경로듣은 아래와 같다.

  • /usr/local/bin/python3
  • /usr/local/anaconda3/bin/python

4. 결과


테스트용 10분 단위로 스케줄링을 하면,
이렇게 매시 10분/20분/30분/40분/50분/60분에 로그파일이 생긴다.
로그파일은 최초 1회 생성되며 그 이후의 내용은 덮어쓰게 된다.

tail -f 옵션을 통해 로깅을 하면?

이렇게 실행 로그를 확인할 수 있다.

이 결과를 통해 나는 매번 통계 자료를 분석할때
rds에 접속해서 데이터를 가져오는 작업(시간이 많이 소요된다..!) 대신,
저장된 parquet을 가지고 바로 통계 자료를 분석할 수 있게 됐다.

0개의 댓글