Steam API를 활용한 사용자 프로필 및 데이터 관리 자동화

2star_·2025년 1월 10일
0

최종 프로젝트

목록 보기
12/32

Steam API를 활용한 사용자 프로필 및 데이터 관리 자동화

DjangoSelenium, 그리고 Steam API를 활용하여 Steam 사용자 프로필, 리뷰, 플레이타임 데이터를 수집하고 이를 데이터베이스에 저장하는 코드를 살펴보겠습니다. 주요 코드는 management command로 작성되었으며, 아래 핵심 과정을 단계별로 설명합니다.


1. 주요 기능 개요

이 코드는 Steam 사용자 정보를 자동으로 수집 및 업데이트하기 위한 스크립트입니다. 다음과 같은 주요 기능을 포함합니다:
1. Steam 프로필 공개 여부 확인
2. 사용자 리뷰 크롤링 (상위 3개 추천 리뷰)
3. API를 활용한 플레이타임 데이터 수집 (상위 2개 게임)
4. 데이터베이스 업데이트 및 기존 데이터 정리


2. 주요 코드 구조

handle 메서드

BaseCommandhandle 메서드는 전체 스크립트의 핵심 흐름을 담당합니다. 주요 단계는 아래와 같습니다:

  1. 환경변수에서 API 키를 불러옴.
  2. Selenium WebDriver 초기화.
  3. steamId가 있는 모든 사용자 계정 가져오기.
  4. 각 계정에 대해:
    • 프로필 공개 여부 확인
    • 리뷰 및 플레이타임 데이터 수집
    • 수집한 데이터를 데이터베이스에 저장
def handle(self, *args, **options):
    # 1) 환경 변수에서 API 키 불러오기
    env = environ.Env()
    api_key = env("STEAM_API_KEY")  # .env에 STEAM_API_KEY=... 설정

    # 2) Selenium WebDriver 준비 (#Headless 예시)
    options = webdriver.ChromeOptions()
    driver = webdriver.Chrome(options=options)

    # 3) steamId가 비어있지 않은 Account를 모두 가져온다
    accounts = Account.objects.exclude(steamId="")

3. Steam 프로필 공개 여부 확인

Steam 사용자 프로필이 공개 상태인지 확인하려면 GetPlayerSummaries API를 사용합니다. communityvisibilitystate 값이 3인 경우, 프로필이 공개 상태입니다.

코드 예시

def check_profile_public(self, api_key, steam_id_str):
    url = "https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v2/"
    params = {"key": api_key, "steamids": steam_id_str}
    resp = requests.get(url, params=params).json()
    players = resp.get("response", {}).get("players", [])
    if not players:
        return False
    player = players[0]
    vis_state = player.get("communityvisibilitystate", 1)
    return (vis_state == 3)

4. 리뷰 크롤링 (Selenium 사용)

Steam 프로필 페이지에서 리뷰 데이터를 가져오기 위해 Selenium을 활용합니다. 추천 리뷰(Recommended) 상위 3개를 수집하며, CSS Selector를 사용해 리뷰 데이터를 추출합니다.

코드 예시

def fetch_top3_reviews(self, driver, steam_id_str):
    base_url = "https://steamcommunity.com/"
    if steam_id_str.isdigit():
        url = f"{base_url}profiles/{steam_id_str}/recommended"
    else:
        url = f"{base_url}id/{steam_id_str}/recommended"

    try:
        driver.get(url)
        WebDriverWait(driver, 10).until(
            EC.presence_of_all_elements_located((By.CSS_SELECTOR, ".review_box"))
        )
        boxes = driver.find_elements(By.CSS_SELECTOR, ".review_box")
    except:
        return []

    recommended = []
    for box in boxes:
        try:
            title_elem = box.find_element(By.CSS_SELECTOR, ".vote_header .title > a")
            if "Recommended" not in title_elem.text:
                continue
            href = title_elem.get_attribute("href")
            app_id = href.split("/recommended/")[1].split("/")[0]
            recommended.append({"app_id": app_id})
            if len(recommended) >= 3:
                break
        except Exception as e:
            continue

    return recommended

5. 플레이타임 데이터 수집

사용자의 상위 2개 게임 플레이타임 데이터를 수집하기 위해 Steam의 GetOwnedGames API를 활용합니다.

코드 예시

def fetch_top2_playtime_api(self, api_key, steam_id_str):
    if not steam_id_str.isdigit():
        steam_id_str = self.resolve_vanity_url(api_key, steam_id_str)
        if not steam_id_str:
            return []

    url = "https://api.steampowered.com/IPlayerService/GetOwnedGames/v1/"
    params = {"key": api_key, "steamid": steam_id_str}
    try:
        resp = requests.get(url, params=params).json()
    except Exception as e:
        return []

    games = resp.get("response", {}).get("games", [])
    game_data = [{"app_id": g.get("appid"), "playtime": round(g.get("playtime_forever", 0) / 60.0, 2)} for g in games]
    sorted_data = sorted(game_data, key=lambda x: x["playtime"], reverse=True)
    return sorted_data[:2]

6. 데이터베이스 저장

수집한 데이터를 Django ORM으로 데이터베이스에 저장합니다. 기존 리뷰 및 플레이타임 데이터는 삭제한 후, 새 데이터를 추가해 중복을 방지합니다.

코드 예시

with transaction.atomic():
    sp, _ = SteamProfile.objects.get_or_create(account=account)
    sp.is_review = is_review
    sp.is_playtime = is_playtime
    sp.save()

    SteamReview.objects.filter(account=account).delete()
    SteamPlaytime.objects.filter(account=account).delete()

    if is_review:
        for rd in review_data:
            SteamReview.objects.create(
                account=account,
                app_id=rd["app_id"]
            )
    if is_playtime:
        for pd in playtime_data:
            SteamPlaytime.objects.create(
                account=account,
                app_id=pd["app_id"],
            )
profile
안녕하세요.

0개의 댓글

관련 채용 정보