특성 저장소 만들기
feast init my_feature_repo
cd my_feature_repo/feature_repo
피처 정의를 등록하고 피처 스토어를 설정
feast apply
UI 보기
feast ui
project = Project(name="my_feature_repo", description="A project for driver statistics")
driver = Entity(name="driver", join_keys=["driver_id"])
driver_id가 고유 식별자로 사용됩니다.driver_stats_source = FileSource(
name="driver_hourly_stats_source",
path="data\driver_stats.parquet",
timestamp_field="event_timestamp",
created_timestamp_column="created",
)
timestamp_field: 과거 시점 피처 조회 시 기준이 되는 컬럼.created_timestamp_column: 데이터 생성 시점을 추적할 컬럼.driver_stats_fv = FeatureView(
name="driver_hourly_stats",
entities=[driver],
ttl=timedelta(days=1),
schema=[
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int64, description="Average daily trips"),
],
online=True,
source=driver_stats_source,
tags={"team": "driver_performance"},
)
entities: 어떤 엔티티를 기준으로 피처를 조회할지.ttl: 피처의 유효 기간(Time-To-Live).online=True: 실시간 추론 시 사용할 수 있는 피처임을 의미.source: 데이터를 가져올 소스 정의.tags: 사용자 정의 메타데이터.input_request = RequestSource(
name="vals_to_add",
schema=[
Field(name="val_to_add", dtype=Int64),
Field(name="val_to_add_2", dtype=Int64),
],
)
@on_demand_feature_view(
sources=[driver_stats_fv, input_request],
schema=[
Field(name="conv_rate_plus_val1", dtype=Float64),
Field(name="conv_rate_plus_val2", dtype=Float64),
],
)
def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
df = pd.DataFrame()
df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"]
df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"]
return df
inputs로 들어오는 데이터프레임은 sources의 모든 피처가 합쳐진 형태.conv_rate에 실시간 요청값(val_to_add)을 더해 새로운 피처를 생성.driver_activity_v1 = FeatureService(
name="driver_activity_v1",
features=[driver_stats_fv[["conv_rate"]], transformed_conv_rate],
logging_config=LoggingConfig(
destination=FileLoggingDestination(path="data")
),
)
logging_config: 피처 사용 로그를 저장할 위치를 지정.driver_stats_push_source = PushSource(
name="driver_stats_push_source",
batch_source=driver_stats_source,
)
driver_stats_fresh_fv = FeatureView(
name="driver_hourly_stats_fresh",
entities=[driver],
ttl=timedelta(days=1),
schema=[...],
online=True,
source=driver_stats_push_source,
tags={"team": "driver_performance"},
)
@on_demand_feature_view(
sources=[driver_stats_fresh_fv, input_request], # relies on fresh version of FV
schema=[
Field(name="conv_rate_plus_val1", dtype=Float64),
Field(name="conv_rate_plus_val2", dtype=Float64),
],
)
def transformed_conv_rate_fresh(inputs: pd.DataFrame) -> pd.DataFrame:
df = pd.DataFrame()
df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"]
df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"]
return df
from feast import FeatureStore
import pandas as pd
from datetime import datetime
entity_df = pd.DataFrame.from_dict({
"driver_id": [1001, 1002, 1003, 1004],
"event_timestamp": [
datetime(2021, 4, 12, 10, 59, 42),
datetime(2021, 4, 12, 8, 12, 10),
datetime(2021, 4, 12, 16, 40, 26),
datetime(2021, 4, 12, 15, 1 , 12)
]
})
store = FeatureStore(repo_path="repo_path")
training_df = store.get_historical_features(
entity_df=entity_df,
features = [
'driver_hourly_stats:conv_rate',
'driver_hourly_stats:acc_rate',
'driver_hourly_stats:avg_daily_trips'
],
).to_df()
print(training_df.head())
driver_id가 특정 event_timestamp 시점에 어떤 피처 값을 가졌는지driver_hourly_stats)에서 지정한 피처를 각 엔티티 + 시점 기준으로 조회.to_df() → Pandas DataFrame 변환