import boto3
import sagemaker
from sagemaker.session import Session
from sagemaker.feature_store.feature_group import FeatureGroup
import pandas as pd
from datetime import datetime
# --- AWS / SageMaker session bootstrap ---------------------------------------
# `sagemaker_session` is a default SageMaker session; it supplies the region
# here and the default bucket further down.
# `feature_store_session` is a second session built on `boto_session` with an
# explicit Feature Store runtime client; the FeatureGroup objects below use it.
boto_session = boto3.Session()
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name

# Execution role; passed as `role_arn` when the feature group is created.
role = sagemaker.get_execution_role()

# Runtime client for record-level calls against the feature store.
featurestore_runtime = boto_session.client(
    "sagemaker-featurestore-runtime",
    region_name=region,
)

feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=boto_session.client(service_name="sagemaker"),
    sagemaker_featurestore_runtime_client=featurestore_runtime,
)
from datetime import datetime, timezone

# Three sample rows, one tuple per user: (user_id, purchase_count, avg_order_value).
data = pd.DataFrame(
    [
        ("u001", 5, 45.2),
        ("u002", 12, 120.5),
        ("u003", 3, 22.0),
    ],
    columns=["user_id", "purchase_count", "avg_order_value"],
)

# Every row gets the same UTC timestamp, captured once and rendered as
# "YYYY-MM-DDTHH:MM:SSZ".
data["event_time"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

# `event_time` is the mandatory event-time column for Feature Store (it is
# passed as `event_time_feature_name` at creation); keep it typed as string.
data = data.astype({"event_time": str})
feature_group_name = "user-stats-test"

# Handle for the feature group, bound to the Feature Store-aware session.
fg = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session)

# Detect the schema automatically from the sample DataFrame.
fg.load_feature_definitions(data_frame=data)

# Create the feature group: offline store on S3 (under the session's default
# bucket) with the online store enabled as well.
fg.create(
    record_identifier_name="user_id",
    event_time_feature_name="event_time",
    s3_uri=f"s3://{sagemaker_session.default_bucket()}/feature-store-test",
    enable_online_store=True,
    role_arn=role,
)
# Wait for feature group creation to finish (roughly 1-2 minutes).
#
# Polls the feature group status every few seconds until it is "Created".
# Unlike a bare `while status != "Created"` loop, this stops when creation
# fails or the deadline passes, so the script cannot spin forever.
#
# Raises:
#     RuntimeError: the status became "CreateFailed".
#     TimeoutError: the status was still not "Created" when the deadline hit.
import time

_CREATE_TIMEOUT_SECONDS = 600
_CREATE_POLL_SECONDS = 10

_create_deadline = time.monotonic() + _CREATE_TIMEOUT_SECONDS
while True:
    _fg_description = fg.describe()
    _fg_status = _fg_description.get("FeatureGroupStatus")
    if _fg_status == "Created":
        break
    if _fg_status == "CreateFailed":
        # Surface the service-reported reason instead of waiting forever.
        raise RuntimeError(
            "Feature group creation failed: "
            f"{_fg_description.get('FailureReason')}"
        )
    if time.monotonic() >= _create_deadline:
        raise TimeoutError(
            f"Feature group not 'Created' after {_CREATE_TIMEOUT_SECONDS}s "
            f"(last status: {_fg_status!r})"
        )
    print("waiting...")
    time.sleep(_CREATE_POLL_SECONDS)
# Load the sample rows into the feature group.
# This is a diagnostic cell: an ingestion failure is reported (exception type,
# message, full traceback) rather than propagated, so the underlying
# ingestion error can be inspected directly.
try:
    fg.ingest(data_frame=data, max_workers=1, wait=True)
except Exception as ingest_error:
    import traceback

    print(type(ingest_error))
    print(ingest_error)
    traceback.print_exc()
- 이미 생성된 feature group 을 이름으로 가져와서 ingest 로 데이터 적재
# Re-attach to the existing feature group by name.
fg = FeatureGroup(name="user-stats-test", sagemaker_session=feature_store_session)

# Show the current status, then ingest the sample rows with a single worker,
# blocking until ingestion completes.
print(fg.describe()["FeatureGroupStatus"])
fg.ingest(
    data_frame=data,
    wait=True,
    max_workers=1,
)
# Read one record back through the Feature Store runtime client, looked up
# by its record identifier ("u001").
record = featurestore_runtime.get_record(
    FeatureGroupName=feature_group_name,
    RecordIdentifierValueAsString="u001",
)
# GetRecord returns an empty result (no "Record" key) when the identifier is
# not found, so check for the key instead of indexing and raising KeyError.
if "Record" in record:
    print(record["Record"])
else:
    print(f"No record found for 'u001' in feature group {feature_group_name!r}")
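- offline store 는 Glue 테이블로 조회하는데, 테이블명에 suffix 가 붙어서 feature group 이름만으로는 바로 가져오기 어려움
- feature group 의 describe() 결과(OfflineStoreConfig > DataCatalogConfig)에서 실제 table 명과 database 를 가져와서 조회해야 제대로 동작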
# Look up the data catalog database and table from the feature group
# description rather than guessing the table name.
desc = fg.describe()
_catalog_config = desc["OfflineStoreConfig"]["DataCatalogConfig"]
actual_table = _catalog_config["TableName"]
database = _catalog_config["Database"]

# Run a full-table SELECT through Athena, writing results under the default
# bucket, then wait for the query and load the result into a DataFrame.
query = fg.athena_query()
query.run(
    output_location=f"s3://{sagemaker_session.default_bucket()}/athena-results/",
    query_string=f'SELECT * FROM "{database}"."{actual_table}"',
)
query.wait()

df = query.as_dataframe()
print(df)