This post is written for readers who have already read the posts below.
- Part 1: Feature Store - why?
- Part 2: Feature store core concepts
- Part 3: Feature store architecture
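AWS Feature Store keeps its offline store in S3 and exposes it for querying through AWS Athena.
When data lands in the offline store, the table schema is also registered in the Glue Data Catalog.
EMR can use that catalog as its Hive metastore, so the same tables can be read from Spark and similar engines.
In practice, once you add a feature group, a new table shows up in Athena.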
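As a rough illustration of the EMR side, the sketch below builds the cluster configuration entry that points Spark's Hive metastore client at the Glue Data Catalog. The function name `glue_catalog_configurations` is made up for this post; the classification and property come from the EMR documentation, and the rest of the cluster setup is left out.

```python
# EMR "Configurations" entry that makes Spark use the Glue Data Catalog
# (where Feature Store registers offline-store tables) as its Hive metastore.
GLUE_METASTORE_FACTORY = (
    "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
)


def glue_catalog_configurations():
    """Return the configuration list to pass when creating an EMR cluster.

    Hypothetical helper name; only the dictionary contents matter.
    """
    return [
        {
            "Classification": "spark-hive-site",
            "Properties": {
                "hive.metastore.client.factory.class": GLUE_METASTORE_FACTORY,
            },
        }
    ]


if __name__ == "__main__":
    print(glue_catalog_configurations())
```

With this in place, a Spark session on the cluster should be able to see the feature group table under the Glue database that Feature Store writes to.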
The code below follows the official introduction notebook: https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store-introduction-notebook.html
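from sagemaker.feature_store.feature_group import FeatureGroup

# Create a handle for the feature group and infer its schema from the DataFrame
customers_feature_group = FeatureGroup(
    name=customers_feature_group_name, sagemaker_session=sagemaker_session
)
customers_feature_group.load_feature_definitions(data_frame=customer_data)

# Create the feature group with an offline store in S3 and the online store enabled
customers_feature_group.create(
    s3_uri=f"s3://{s3_bucket_name}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name="EventTime",
    role_arn=role,
    enable_online_store=True,
)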
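To make the schema inference in the code above less of a black box, here is a minimal sketch of the idea in plain Python. It is not the SDK's implementation: `infer_feature_definitions` is a hypothetical helper that looks at Python value types in one sample row, whereas `load_feature_definitions` works on DataFrame column dtypes, and its exact dtype handling may differ between SDK versions. The sample row values are made up.

```python
def infer_feature_definitions(sample_row):
    """Map each value in a sample row to a Feature Store feature type.

    Hypothetical helper: integers become Integral, floats become Fractional,
    and everything else is treated as String.
    """
    definitions = []
    for name, value in sample_row.items():
        if isinstance(value, int):
            feature_type = "Integral"
        elif isinstance(value, float):
            feature_type = "Fractional"
        else:
            feature_type = "String"
        definitions.append({"FeatureName": name, "FeatureType": feature_type})
    return definitions


if __name__ == "__main__":
    # Made-up sample row; only customer_id appears in the post itself.
    row = {"customer_id": 573291, "avg_basket": 41.5, "city": "Seoul"}
    for definition in infer_feature_definitions(row):
        print(definition)
```

The output has the same FeatureName / FeatureType shape that the CreateFeatureGroup API takes for its feature definitions.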
load_feature_definitions infers the feature definitions from the DataFrame, and create then actually creates the feature group. After that, ingest writes the DataFrame rows into the group.
customers_feature_group.ingest(
    data_frame=customer_data, max_workers=3, wait=True
)
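One thing the snippets do not show: create returns while the feature group is still being provisioned, and ingest is only expected to succeed once the status is Created. A minimal sketch of a wait step to put between create and ingest could look like this. `wait_until_created` and its default polling values are assumptions for illustration; it only relies on describe() returning a FeatureGroupStatus field.

```python
import time


def wait_until_created(feature_group, poll_seconds=5, max_polls=60):
    """Poll describe() until the feature group leaves the 'Creating' state.

    Hypothetical helper; poll_seconds and max_polls are arbitrary defaults.
    """
    for _ in range(max_polls):
        status = feature_group.describe().get("FeatureGroupStatus")
        if status == "Created":
            return status
        if status != "Creating":
            # e.g. CreateFailed: stop instead of polling forever
            raise RuntimeError(f"Feature group creation ended in status {status}")
        time.sleep(poll_seconds)
    raise TimeoutError("Feature group is still in 'Creating' status")


# Usage (needs AWS credentials, so it is left commented out):
# wait_until_created(customers_feature_group)
# customers_feature_group.ingest(data_frame=customer_data, max_workers=3, wait=True)
```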
# We use the boto client to list FeatureGroups
(
    sagemaker_session.boto_session
    .client("sagemaker", region_name=region)
    .list_feature_groups()
)
customers_feature_group.describe()
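The describe() output is also where the Glue database and table names of the offline store live. As a small sketch, assuming the response has the OfflineStoreConfig / DataCatalogConfig layout of the DescribeFeatureGroup API, they can be pulled out like this. `offline_table_location` is a hypothetical helper, and the table name in the example is made up.

```python
def offline_table_location(description):
    """Return (database, table) from a describe() response dictionary.

    Hypothetical helper; assumes the feature group has an offline store.
    """
    catalog = description["OfflineStoreConfig"]["DataCatalogConfig"]
    return catalog["Database"], catalog["TableName"]


if __name__ == "__main__":
    # Trimmed, made-up response; a real one has many more fields.
    sample_description = {
        "OfflineStoreConfig": {
            "DataCatalogConfig": {
                "Catalog": "AwsDataCatalog",
                "Database": "sagemaker_featurestore",
                "TableName": "customers_feature_group_1234567890",
            }
        }
    }
    print(offline_table_location(sample_description))
```

These two values are what the database and table placeholders in the Athena queries further down stand for.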
Calling get_record with the feature group name and the record identifier value returns the feature values of that record.
customer_id = 573291
sample_record = (
    sagemaker_session.boto_session
    .client("sagemaker-featurestore-runtime", region_name=region)
    .get_record(
        FeatureGroupName=customers_feature_group_name,
        RecordIdentifierValueAsString=str(customer_id),
    )
)
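The GetRecord response carries the features as a list of FeatureName / ValueAsString pairs, which is awkward to work with directly. A minimal sketch for flattening it into a dictionary follows; `record_to_dict` is a hypothetical helper, the sample response is made up, and a response without a Record key is treated here as "no such record".

```python
def record_to_dict(get_record_response):
    """Flatten a GetRecord response into {feature_name: value_as_string}."""
    return {
        item["FeatureName"]: item["ValueAsString"]
        for item in get_record_response.get("Record", [])
    }


if __name__ == "__main__":
    # Made-up response in the shape returned by get_record
    sample_response = {
        "Record": [
            {"FeatureName": "customer_id", "ValueAsString": "573291"},
            {"FeatureName": "EventTime", "ValueAsString": "1700000000.0"},
        ]
    }
    print(record_to_dict(sample_response))
```

All values come back as strings, so casting them to numeric types is left to the caller.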
# Fetch several records from more than one feature group in a single call
all_records = (
    sagemaker_session.boto_session
    .client("sagemaker-featurestore-runtime", region_name=region)
    .batch_get_record(
        Identifiers=[
            {
                "FeatureGroupName": customers_feature_group_name,
                "RecordIdentifiersValueAsString": ["573291", "109382", "828400", "124013"],
            },
            {
                "FeatureGroupName": orders_feature_group_name,
                "RecordIdentifiersValueAsString": ["573291", "109382", "828400", "124013"],
            },
        ]
    )
)
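The BatchGetRecord response mixes records from all requested feature groups in one Records list. The sketch below, with the hypothetical helper `group_batch_records` and made-up sample values, indexes them by feature group and record identifier. It ignores the Errors and UnprocessedIdentifiers parts of the response, which real code would probably want to check as well.

```python
from collections import defaultdict


def group_batch_records(batch_response):
    """Index a BatchGetRecord response as {group_name: {record_id: features}}."""
    grouped = defaultdict(dict)
    for entry in batch_response.get("Records", []):
        features = {
            item["FeatureName"]: item["ValueAsString"] for item in entry["Record"]
        }
        record_id = entry["RecordIdentifierValueAsString"]
        grouped[entry["FeatureGroupName"]][record_id] = features
    return dict(grouped)


if __name__ == "__main__":
    # Made-up response in the shape returned by batch_get_record
    sample_response = {
        "Records": [
            {
                "FeatureGroupName": "customers",
                "RecordIdentifierValueAsString": "573291",
                "Record": [{"FeatureName": "customer_id", "ValueAsString": "573291"}],
            }
        ],
        "Errors": [],
        "UnprocessedIdentifiers": [],
    }
    print(group_batch_records(sample_response))
```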
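Every record ingested into a feature group with an offline store ends up in S3, is cataloged in Glue, and can be queried through Athena.
as_hive_ddl returns the Hive DDL of the table that gets registered in Glue.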
print(feature_group.as_hive_ddl())
The table name is available as athena_query().table_name, as shown below.
identity_query = identity_feature_group.athena_query()
transaction_query = transaction_feature_group.athena_query()
identity_table = identity_query.table_name
transaction_table = transaction_query.table_name
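With the table name in hand, the same athena_query object can run SQL against the offline store. The following is a minimal sketch, not the notebook's code: `run_athena_query` is a hypothetical wrapper, and it assumes the object offers run, wait, and as_dataframe as the SageMaker SDK's AthenaQuery does. The output location in the usage comment is an arbitrary choice.

```python
def run_athena_query(athena_query, sql, output_location):
    """Run SQL through an AthenaQuery-like object and return its result."""
    athena_query.run(query_string=sql, output_location=output_location)
    athena_query.wait()  # block until Athena has finished the query
    return athena_query.as_dataframe()


# Usage (needs AWS credentials, so it is left commented out):
# identity_df = run_athena_query(
#     identity_query,
#     f'SELECT * FROM "{identity_table}" LIMIT 1000',
#     f"s3://{s3_bucket_name}/{prefix}/query_results/",
# )
```

Athena writes the query results to the given S3 location first, and as_dataframe then reads them back as a pandas DataFrame.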
-- Sample query: first 1000 rows of the offline store table
SELECT *
FROM <FeatureGroup.DataCatalogConfig.DatabaseName>.<FeatureGroup.DataCatalogConfig.TableName>
LIMIT 1000
-- Latest record per identifier (deleted records are still included)
SELECT *
FROM
    (SELECT *,
            row_number()
            OVER (PARTITION BY <RecordIdentifierFeatureName>
                  ORDER BY <EventTimeFeatureName> DESC, Api_Invocation_Time DESC, write_time DESC) AS row_num
     FROM <FeatureGroup.DataCatalogConfig.DatabaseName>.<FeatureGroup.DataCatalogConfig.TableName>)
WHERE row_num = 1;
-- Latest record per identifier, excluding deleted records
SELECT *
FROM
    (SELECT *,
            row_number()
            OVER (PARTITION BY <RecordIdentifierFeatureName>
                  ORDER BY <EventTimeFeatureName> DESC, Api_Invocation_Time DESC, write_time DESC) AS row_num
     FROM <FeatureGroup.DataCatalogConfig.DatabaseName>.<FeatureGroup.DataCatalogConfig.TableName>)
WHERE row_num = 1 AND NOT is_deleted;
-- Time travel: latest record per identifier as of <timestamp>, excluding deleted records
SELECT *
FROM
    (SELECT *,
            row_number()
            OVER (PARTITION BY <RecordIdentifierFeatureName>
                  ORDER BY <EventTimeFeatureName> DESC, Api_Invocation_Time DESC, write_time DESC) AS row_num
     FROM <FeatureGroup.DataCatalogConfig.DatabaseName>.<FeatureGroup.DataCatalogConfig.TableName>
     WHERE <EventTimeFeatureName> <= timestamp '<timestamp>')
     -- replace timestamp '<timestamp>' with just <timestamp> if EventTimeFeature is of type fractional
WHERE row_num = 1 AND
NOT is_deleted
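Since the queries above differ only in their placeholders and filters, they are easy to generate from Python. The helper below is a sketch written for this post, not part of the SageMaker SDK: `latest_snapshot_sql` and its parameters are made-up names, and it uses plain string formatting, so it should only be fed trusted identifiers.

```python
def latest_snapshot_sql(database, table, record_id_name, event_time_name,
                        include_deleted=False, as_of=None):
    """Build a latest-record-per-identifier query for an offline store table.

    as_of, if given, is a SQL expression such as "timestamp '2021-01-01 00:00:00'",
    or a bare number when the event time feature is of type Fractional.
    """
    time_filter = ""
    if as_of is not None:
        time_filter = f"\n    WHERE {event_time_name} <= {as_of}"
    deleted_filter = "" if include_deleted else " AND NOT is_deleted"
    return (
        "SELECT *\n"
        "FROM (\n"
        "    SELECT *,\n"
        "           row_number() OVER (\n"
        f"               PARTITION BY {record_id_name}\n"
        f"               ORDER BY {event_time_name} DESC, "
        "api_invocation_time DESC, write_time DESC\n"
        "           ) AS row_num\n"
        f'    FROM "{database}"."{table}"'
        f"{time_filter}\n"
        ")\n"
        f"WHERE row_num = 1{deleted_filter}"
    )


if __name__ == "__main__":
    # Made-up table name; the database is the usual Feature Store default.
    print(latest_snapshot_sql(
        "sagemaker_featurestore", "customers", "customer_id", "EventTime",
        as_of="1700000000.0",
    ))
```

Leaving as_of unset gives the latest snapshot, setting it gives the time-travel variant, and include_deleted=True keeps records whose most recent version is marked as deleted.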
[1] https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_featurestore.html
[2] https://aws.amazon.com/ko/sagemaker/feature-store/
[3] https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store-create-feature-group.html
[4] https://aws.amazon.com/ko/sagemaker/pricing