[AWS] SageMaker Feature Store 사용해보기

Woong·2026년 3월 27일

기타

목록 보기
6/6
  • feature store 세션 설정
import boto3
import sagemaker
from sagemaker.session import Session
from sagemaker.feature_store.feature_group import FeatureGroup
import pandas as pd
from datetime import datetime

boto_session = boto3.Session()
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()

featurestore_runtime = boto_session.client(
    service_name="sagemaker-featurestore-runtime",
    region_name=region
)
feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=boto_session.client("sagemaker"),
    sagemaker_featurestore_runtime_client=featurestore_runtime
)
  • 데이터 생성
from datetime import datetime, timezone

data = pd.DataFrame({
    "user_id":         ["u001", "u002", "u003"],
    "purchase_count":  [5, 12, 3],
    "avg_order_value": [45.2, 120.5, 22.0],
    "event_time": [datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")] * 3
})
# Feature Store가 요구하는 필수 컬럼
data["event_time"] = data["event_time"].astype(str)
  • feature group 생성
feature_group_name = "user-stats-test"
fg = FeatureGroup(
    name=feature_group_name,
    sagemaker_session=feature_store_session
)

# 스키마 자동 감지
fg.load_feature_definitions(data_frame=data)

# Feature group 생성 (offline store = S3, online store 활성화)
fg.create(
    s3_uri=f"s3://{sagemaker_session.default_bucket()}/feature-store-test",
    record_identifier_name="user_id",
    event_time_feature_name="event_time",
    role_arn=role,
    enable_online_store=True
)

# 생성 완료 대기 (약 1~2분)
import time
while fg.describe().get("FeatureGroupStatus") != "Created":
    print("waiting...")
    time.sleep(10)

# 데이터 적재
try:
    fg.ingest(data_frame=data, max_workers=1, wait=True)
except Exception as e:
    print(type(e))
    print(e)
    # IngestionManager 내부 에러 직접 확인
    import traceback
    traceback.print_exc()
  • feature group 가져와서 ingest 로 데이터 적재
# create() 호출 없이 기존 fg에 바로 연결
fg = FeatureGroup(
    name="user-stats-test",
    sagemaker_session=feature_store_session
)

# 상태 확인
print(fg.describe()["FeatureGroupStatus"])  # "Created" 여야 함

# ingest
fg.ingest(data_frame=data, max_workers=1, wait=True)
  • online store 조회
record = featurestore_runtime.get_record(
    FeatureGroupName=feature_group_name,
    RecordIdentifierValueAsString="u001"
)
print(record["Record"])
  • offline store 는 glue 인데, suffix 가 붙어서 바로 가져오기 어려움
    • feature group 에서 table 명을 가져와서 처리해야 제대로 동작
# describe()로 테이블명 동적 조회 (권장)
desc = fg.describe()
actual_table = desc["OfflineStoreConfig"]["DataCatalogConfig"]["TableName"]
database = desc["OfflineStoreConfig"]["DataCatalogConfig"]["Database"]

query = fg.athena_query()
query.run(
    query_string=f'SELECT * FROM "{database}"."{actual_table}"',
    output_location=f"s3://{sagemaker_session.default_bucket()}/athena-results/"
)
query.wait()
df = query.as_dataframe()
print(df)

0개의 댓글