데이터 파이프라인 구축 - 개인 과제

Suhyeon Lee·2025년 1월 15일
0
post-thumbnail

요구사항(필수 조건)

  1. Python 스크립트 작성
    • Python 3 버전을 사용하며, Linux 환경에서 실행 가능해야 함.
    • requests, pyupbit, pytz, datetime 등의 라이브러리 사용.
  2. 코드 구조
    • 시가총액 기준 Top5 코인을 추출하되, USDT/USDC는 제외할 것.
      1) CoinGecko API로 시총 데이터 받아오기
      2) Upbit(KRW) 마켓에 실제 상장되어 있는 심볼만 매칭
      3) 그중 시총 상위 5개를 선정
      4) USDT, USDC는 제외
      5) https://api.coingecko.com/api/v3/coins/markets
    • 오늘(KST) 날짜를 자동으로 구해서, 오늘 -1일(예: 실행일이 2025-01-12라면 실제 기준은 2025-01-11) 시점을 기준으로 1년치(365일) 데이터를 수집.
      1) Upbit 일봉 캔들 API 사용
      2) 여러 번(최대 200캔들씩) 호출하여 1년치(365개)까지 모두 모으기
      3) 최신→과거 순으로 응답이 오므로, 최종 결과는 과거→최신 순으로 정렬하여 저장
    • 폴더 구조 / CSV 저장
      1) /home/test01/upbit_raw/연-월-일/마켓명/candles.csv
      예: /home/test01/upbit_raw/2025-01-11/KRW-BTC/candles.csv
      2) 일자별 폴더를 만들고, 그 안에 마켓명 폴더를 만든 다음, candles.csv 파일 생성
      3) 하루에 1캔들(1행)씩 기록되도록 저장
      4) “converted_trade_price”는 수집하지 않음 (필드 제외)

코드

import os
import csv
import requests
import datetime
import pytz
import pyupbit
import time

# (1) 결과를 저장할 최상위 폴더
BASE_DIR = "/home/test01/upbit_raw"

# (2)  사용할 API url
COINGECKO_API_URL = "https://api.coingecko.com/api/v3/coins/markets"
UPBIT_CANDLE_API_URL = "https://api.upbit.com/v1/candles/days"

def get_coingecko_top_coins_by_marketcap(n=30):
    """
    (과제 안내)
    - CoinGecko의 "시가총액 순" 코인 목록을 n개 가져와야 함
    - vs_currency=usd, order=market_cap_desc, per_page=n 등 파라미터 사용
    - 응답(JSON)에서 각 코인별 id, symbol, name, market_cap 등을 추출
    - symbol은 대문자로 변환
    - 결과 리스트(딕셔너리 형태)로 반환
    """
    # TODO: CoinGecko API 호출 후 데이터 파싱
    # TODO: 'id', 'symbol'(upper), 'name', 'market_cap' 포함한 리스트로 구성
    params = {
        "vs_currency": "usd"
        , "order": "market_cap_desc"
        , "per_page": n
        , "page": 1
    }
    response = requests.get(COINGECKO_API_URL, params=params)
    data = response.json()
    result = [
        {
            "id": coin["id"]
            , "symbol": coin["symbol"].upper()
            , "name": coin["name"]
            , "market_cap": coin["market_cap"]
        }
        for coin in data
    ]
    return result

def get_upbit_tickers():
    """
    (과제 안내)
    - Upbit에서 'KRW' 마켓에 상장된 티커 목록 가져오기
    - pyupbit.get_tickers(fiat="KRW") 활용 가능
    - 결과 예) ["KRW-BTC", "KRW-ETH", ...]
    """
    # TODO: pyupbit 라이브러리 사용, KRW 마켓 티커 목록 반환
    krw_tickers = pyupbit.get_tickers(fiat="KRW")
    time.sleep(0.1) # 100ms 대기
    return krw_tickers

def get_top5_coins_exclude_usdt_usdc():
    """
    (과제 안내)
    - (1) get_coingecko_top_coins_by_marketcap() 함수로 시가총액 상위 30개 받기
    - (2) get_upbit_tickers() 함수로 업비트 'KRW' 티커 목록 받기
    - (3) 티커 "KRW-BTC" -> "BTC" 형태 추출 → CoinGecko의 symbol과 비교
    - (4) 그 중 USDT, USDC는 제외
    - (5) 상위 5개만 추려서 반환
    """
    # TODO: 두 함수(get_coingecko_top_coins_by_marketcap, get_upbit_tickers) 결과를 매칭
    # TODO: USDT, USDC 제외 후 상위 5개 선택
    coingecko = get_coingecko_top_coins_by_marketcap(30)
    upbit_org = get_upbit_tickers()
    upbit_symbol = set(ticker.split('-')[1] for ticker in upbit_org)
    matched_coin = [
        coin 
        for coin in coingecko 
        if coin["symbol"] in upbit_symbol and coin["symbol"] not in ["USDT", "USDC"]
    ]
    return matched_coin[:5]
    

def get_one_year_daily_candles(market, end_kst_str):
    """
    (과제 안내)
    - Upbit의 일봉 캔들 API를 사용, 시장(market=KRW-BTC 등)의 1년치(365일) 데이터 가져오기
    - end_kst_str (예: "2025-01-11 15:10:25") 는 (오늘 -1일)을 KST로 표현한 값
    - API는 UTC 기준 'to' 파라미터 사용 (최신→과거 순 200개씩)
    - while문 돌면서 1년치가 될 때까지 수집
    - 최종적으로 (과거→최신) 순으로 정렬해서 리턴
    """
    # TODO: (1) end_kst_str → datetime 변환 (KST)
    # TODO: (2) KST를 UTC로 바꿔서 API 'to' 파라미터에 사용
    # TODO: (3) 응답이 없거나 1년 전보다 더 과거로 내려가면 수집 중단
    # TODO: (4) 최신→과거 순으로 받은 데이터 -> 과거→최신 순으로 재정렬
    # TODO: (5) 1년 범위 내의 캔들만 필터링 후 반환
    kst = pytz.timezone("Asia/Seoul")
    end_kst = datetime.datetime.strptime(end_kst_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=kst)
    end_utc = end_kst.astimezone(pytz.utc)

    params = {
        "market": market
        , "count": 200
    }
    headers = {"accept": "application/json"}

    candles = []
    while len(candles) < 365:
        params["to"] = end_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
        response = requests.get(UPBIT_CANDLE_API_URL, params=params, headers=headers)
        time.sleep(0.1) # 100ms 대기
        data = response.json()
        
        if not data:
            break
        
        candles.extend(data)
        oldest_candle_time = datetime.datetime.strptime(data[-1]['candle_date_time_utc'], "%Y-%m-%dT%H:%M:%S")
        end_utc = oldest_candle_time - datetime.timedelta(seconds=1)
    
    candles.reverse()
    one_year_ago = end_kst - datetime.timedelta(days=365)
    result = [
        candle 
        for candle in candles
        if kst.localize(datetime.datetime.strptime(candle['candle_date_time_kst'], "%Y-%m-%dT%H:%M:%S")) >= one_year_ago
    ]
    return result
        

def save_candles_to_csv_by_date(candles, market):
    """
    (과제 안내)
    - 가져온 일봉(candles) 데이터를 '일자별' 폴더 → '마켓명' 폴더에 CSV로 저장
    - 예) /home/test01/upbit_raw/2025-01-11/KRW-BTC/candles.csv
    - 필드: market, candle_date_time_utc, candle_date_time_kst, opening_price, high_price, low_price, trade_price, timestamp, candle_acc_trade_price, candle_acc_trade_volume, prev_closing_price, change_price, change_rate
    - 'converted_trade_price'는 제외
    - 하루에 1캔들이므로 CSV 한 줄씩 append
    """
    # TODO: (1) candles 리스트를 순회하며 KST기준 날짜(YYYY-MM-DD) 추출
    # TODO: (2) 해당 날짜 폴더 + 마켓 폴더 생성
    # TODO: (3) candles.csv 파일에 (헤더 + 데이터) 작성 (이미 있으면 헤더 생략)
    
    fieldnames=[
        "market"
        , "candle_date_time_utc"
        , "candle_date_time_kst"
        , "opening_price"
        , "high_price"
        , "low_price"
        , "trade_price"
        , "timestamp"
        , "candle_acc_trade_price"
        , "candle_acc_trade_volume"
        , "prev_closing_price"
        , "change_price"
        , "change_rate"
    ]
    
    for candle in candles:
        date = candle['candle_date_time_kst'].split("T")[0]
        folder_path = os.path.join(BASE_DIR, date, market)
        os.makedirs(folder_path, exist_ok=True)
        
        file_path = os.path.join(folder_path, "candles.csv")
        file_exists = os.path.exists(file_path)
        
        with open(file_path, 'a', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            if not file_exists:
                writer.writeheader()
            writer.writerow({key: candle[key] for key in writer.fieldnames})


def main():
    """
    (과제 안내)
    - 전체 실행 흐름:
      1) 시총 Top5 (USDT/USDC 제외) 코인 목록 구하기
      2) (오늘 KST -1일) 시점을 end_kst_str로 설정
      3) 위 코인 각각에 대해 1년치 일봉 수집(get_one_year_daily_candles)
      4) 날짜별로 CSV 저장(save_candles_to_csv_by_date)
    - 콘솔에 현재 시각, 기준 시각, 진행 상태, 수집된 캔들 수 등 출력
    """
    # TODO: (1) today_kst = datetime.now(kst), end_kst = today_kst - 1 day
    # TODO: (2) print()로 시작 시각, end_kst_str 찍기
    # TODO: (3) get_top5_coins_exclude_usdt_usdc() -> 코인목록
    # TODO: (4) 반복문 돌며 get_one_year_daily_candles() → save_candles_to_csv_by_date()
    kst = pytz.timezone('Asia/Seoul')
    today_kst = datetime.datetime.now(kst)
    end_kst = today_kst - datetime.timedelta(days=1)
    end_kst_str = end_kst.strftime("%Y-%m-%d %H:%M:%S")
    
    print(f"Start time: {today_kst}")
    print(f"End KST: {end_kst_str}")
    
    top_coins = get_top5_coins_exclude_usdt_usdc()
    
    for coin in top_coins:
        market = f"KRW-{coin['symbol']}"
        print(f"Processing {market}...")
        
        candles = get_one_year_daily_candles(market, end_kst_str)
        print(f"Collected {len(candles)} candles for {market}")
        
        save_candles_to_csv_by_date(candles, market)
        print(f"Saved candles for {market}")

if __name__ == "__main__":
    main()

코드 작성 내용 정리

과제 1

TODO: CoinGecko API 호출 후 데이터 파싱

  • CoinGecko의 "시가총액 순" 코인 목록을 n개 가져와야 함
  • vs_currency=usd, order=market_cap_desc, per_page=n 등 파라미터 사용
  • 응답(JSON)에서 각 코인별 id, symbol, name, market_cap 등을 추출
  • symbol은 대문자로 변환

TODO: 'id', 'symbol'(upper), 'name', 'market_cap' 포함한 리스트로 구성

  • 결과 리스트(딕셔너리 형태)로 반환

과제 2

TODO: pyupbit 라이브러리 사용, KRW 마켓 티커 목록 반환

  • Upbit에서 'KRW' 마켓에 상장된 티커 목록 가져오기
import pyupbit
import time

# 원화 티커를 krw_tikers라는 변수에 저장합니다.
krw_tickers = pyupbit.get_tickers(fiat = "KRW") 

# pyupbit의 모듈을 사용했다면 업비트 사이트에서 자료를 받아오니 시간이 걸린다고 합니다.
# 그래서 이렇게 타임 모듈을 사용해 0.1초 정도 쉬는 구간을 만들어 줘야 합니다.
# 이런 말이 있죠 모르면 외워
time.sleep(0.1)

print(krw_tickers)
['KRW-BTC', 'KRW-ETH', 'KRW-NEO', 'KRW-MTL', 'KRW-LTC', 'KRW-XRP', 'KRW-ETC', 'KRW-OMG', 'KRW-SNT', 'KRW-WAVES', 'KRW-XEM', 'KRW-QTUM', 'KRW-LSK', 'KRW-STEEM', 'KRW-XLM', 'KRW-ARDR', 'KRW-ARK', 'KRW-STORJ', 'KRW-GRS', 'KRW-REP', 'KRW-ADA', 'KRW-SBD', 'KRW-POWR', 'KRW-BTG', 'KRW-ICX', 'KRW-EOS', 'KRW-TRX', 'KRW-SC', 'KRW-ONT', 'KRW-ZIL', 'KRW-POLY', 'KRW-ZRX', 'KRW-LOOM', 'KRW-BCH', 'KRW-BAT', 'KRW-IOST', 'KRW-RFR', 'KRW-CVC', 'KRW-IQ', 'KRW-IOTA', 'KRW-MFT', 'KRW-ONG', 'KRW-GAS', 'KRW-UPP', 'KRW-ELF', 'KRW-KNC', 'KRW-BSV', 'KRW-THETA', 'KRW-QKC', 'KRW-BTT', 'KRW-MOC', 'KRW-ENJ', 'KRW-TFUEL', 'KRW-MANA', 'KRW-ANKR', 'KRW-AERGO', 'KRW-ATOM', 'KRW-TT', 'KRW-CRE', 'KRW-MBL', 'KRW-WAXP', 'KRW-HBAR', 'KRW-MED', 'KRW-MLK', 'KRW-STPT', 'KRW-ORBS', 'KRW-VET', 'KRW-CHZ', 'KRW-STMX', 'KRW-DKA', 'KRW-HIVE', 'KRW-KAVA', 'KRW-AHT', 'KRW-LINK', 'KRW-XTZ', 'KRW-BORA', 'KRW-JST', 'KRW-CRO', 'KRW-TON', 'KRW-SXP', 'KRW-HUNT', 'KRW-PLA', 'KRW-DOT', 'KRW-SRM', 'KRW-MVL', 'KRW-STRAX', 'KRW-AQT', 'KRW-BCHA', 'KRW-GLM', 'KRW-SSX', 'KRW-META', 'KRW-FCT2', 'KRW-CBK', 'KRW-SAND', 'KRW-HUM', 'KRW-DOGE', 'KRW-STRK', 'KRW-PUNDIX', 'KRW-FLOW', 'KRW-DAWN', 'KRW-AXS', 'KRW-STX']

추가: 원하는 티커만 추리기

target_tickers = [] # 자신이 원하는 티커들을 담을 리스트

#원화 티커들을 for문을 통해 하나씩 가져옵니다 
for i in krw_tickers: 

    # 가져온 티커의 현재가를 구합니다.
    cur_price = pyupbit.get_current_price(i)

    time.sleep(0.1)

    # 티커의 현재가격이 1000초과 5000미만이면
    if 1000 < cur_price < 5000:

        #해당티커를 자신이 만든 리스트에 저장합니다.
        target_tickers.append(i)

print(target_tickers)

과제 3

TODO: 두 함수(get_coingecko_top_coins_by_marketcap, get_upbit_tickers) 결과를 매칭

  • get_coingecko_top_coins_by_marketcap() 함수로 시가총액 상위 30개 받기
  • get_upbit_tickers() 함수로 업비트 'KRW' 티커 목록 받기
  • 티커 "KRW-BTC" -> "BTC" 형태 추출 → CoinGecko의 symbol과 비교

TODO: USDT, USDC 제외 후 상위 5개 선택

  • 그 중 USDT, USDC는 제외
  • 상위 5개만 추려서 반환
    • 리스트 컴프리헨션으로 해결
      matched_coin = [coin for coin in coingecko if coin["symbol"] in upbit_symbol and coin["symbol"] not in ["USDT", "USDC"]]

과제 4

  • Upbit의 일봉 캔들 API를 사용, 시장(market=KRW-BTC 등)의 1년치(365일) 데이터 가져오기
    • 일(Day) 캔들
      • https://api.upbit.com/v1/candles/days
    • '일봉 캔들'이 뭘까?
      • 일봉: 봉 하나하나가 하루 거래일의 데이터로 당일 가장 낮은 가격이 얼마인지, 가장 높은 가격이 얼마인지, 현재 가격이 얼마인지 알 수 있습니다.
      • 캔들 == 봉
        봉차트 == 캔들 차트

시가에 장이 시작하고 주삭의 가격이 오르락 내리락하며 종가에 마감하는 걸 일봉이라고 부르고 일봉을 봄으로써 하루의 주가가 어떻게 흘러갔는지 알 수 있다고 함

업비트 일 캔들

# install requests
# python -m pip install requests

import requests

# KRW-BTC 마켓에 2024년 10월 1일(UTC) 이전 일봉 1개를 요청
url = "https://api.upbit.com/v1/candles/days"
params = {  
    'market': 'KRW-BTC',  
    'count': 1,
    'to': '2024-10-01 00:00:00'
}  
headers = {"accept": "application/json"}

response = requests.get(url, params=params, headers=headers)

print(response.text)



TODO: (1) end_kst_str → datetime 변환 (KST)

  • end_kst_str (예: "2025-01-11 15:10:25") 는 오늘 -1일을 KST로 표현한 값을 의미
    • main에서 오늘 -1일 만들어서 함수에 넣을 것

TODO: (2) KST를 UTC로 바꿔서 API 'to' 파라미터에 사용

  • API는 UTC 기준 'to' 파라미터 사용 (최신→과거 순 200개씩)
  • 따라서 입력 받은 end_kst_str → utc로 바꿔야 함!

TODO: (3) 응답이 없거나 1년 전보다 더 과거로 내려가면 수집 중단

  • while문 돌면서 1년치가 될 때까지 수집
    • 365개가 될 때까지 모으고 366개가 되면 나가기

TODO: (4) 최신→과거 순으로 받은 데이터 -> 과거→최신 순으로 재정렬

  • 최종적으로 (과거→최신) 순으로 정렬해서 리턴
    • 인덱싱, .reverse() 활용

TODO: (5) 1년 범위 내의 캔들만 필터링 후 반환

과제 5

TODO: (1) candles 리스트를 순회하며 KST기준 날짜(YYYY-MM-DD) 추출

  • 날짜 포맷이 "2024-08-31T00:00:00"과 같은 ISO 8601 형식임
    • 'T'를 기준으로 split

TODO: (2) 해당 날짜 폴더 + 마켓 폴더 생성

  • 가져온 일봉(candles) 데이터를 '일자별' 폴더 → '마켓명' 폴더에 CSV로 저장
    • 예) /home/test01/upbit_raw/2025-01-11/KRW-BTC/candles.csv
  • 필드: market, candle_date_time_utc, candle_date_time_kst, opening_price, high_price, low_price, trade_price, timestamp, candle_acc_trade_price, candle_acc_trade_volume, prev_closing_price, change_price, change_rate
    • 'converted_trade_price'는 제외

TODO: (3) candles.csv 파일에 (헤더 + 데이터) 작성 (이미 있으면 헤더 생략)

  • 하루에 1캔들이므로 CSV 한 줄씩 append

과제 6

  • 전체 실행 흐름:
    1) 시총 Top5 (USDT/USDC 제외) 코인 목록 구하기
    2) (오늘 KST -1일) 시점을 end_kst_str로 설정
    3) 위 코인 각각에 대해 1년치 일봉 수집(get_one_year_daily_candles)
    4) 날짜별로 CSV 저장(save_candles_to_csv_by_date)
  • 콘솔에 현재 시각, 기준 시각, 진행 상태, 수집된 캔들 수 등 출력

TODO: (1) today_kst = datetime.now(kst), end_kst = today_kst - 1 day

  • get_one_year_daily_candles이 받아야 하는 하루 전 날짜를 만들어야 함

TODO: (2) print()로 시작 시각, end_kst_str 찍기

  • 터미널에 시작 시간, end_kst_str(시작 시간보다 하루 전 날짜) 나타나야 함

TODO: (3) get_top5_coins_exclude_usdt_usdc() -> 코인목록

  • 터미널에 어떤 market 처리 중인지 나타나야 함

TODO: (4) 반복문 돌며 get_one_year_daily_candles() → save_candles_to_csv_by_date()

profile
2 B R 0 2 B

0개의 댓글

관련 채용 정보