펭귄 데이터 세트를 사용한 간단한 TFX 파이프라인 튜토리얼
pip install -U tfx
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
>>>>>>>>>>>>>>>>
TensorFlow version: 2.6.2
TFX version: 1.4.0
import os
PIPELINE_NAME = "penguin-simple"
# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')
# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)
from absl import logging
logging.set_verbosity(logging.INFO) # Set default logging level.
os.path.join
os.path.join 함수는 운영체제에 맞게 폴더 구분자를 다뤄서 경로를 생성
# Windows
import os
os.path.join('a', 'b', 'c')
>>>
a\b\c
→ 사실 그동안 슬래쉬 그으면 되는걸 왜 쓰나 했는데 운영체제에 따라 달라지는 것을 고려한 것!
absl
Google의 내부 코드베이스의 가장 근반이되는 부분으로부터 만들어진 C++ 라이브러리의 오픈소스 모음!
import urllib.request
import tempfile
DATA_ROOT = tempfile.mkdtemp(prefix='tfx-data') # Create a temporary directory.
_data_url = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv'
_data_filepath = os.path.join(DATA_ROOT, "data.csv")
urllib.request.urlretrieve(_data_url, _data_filepath)
[urllib](https://docs.python.org/3/library/urllib.html#module-urllib)
URLs와 함께 작동하기 위해 몇가지 모듈들을 모은 패키지
requests
라이브러리와 큰 차이는 없음
[tempfile](https://docs.python.org/ko/3/library/tempfile.html)
임시 파일과 디렉토리 생성
- 임시 파일이면 만들어졌다가 없어지는건가?
_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}
from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2
TensorFlow로 데이터를 전처리하기 위한 라이브러리
- 다음과 같이 전체 패스가 필요한 데이터에 유용
- 평균 및 표준 편차로 입력 값을 정규화
- 모든 입력 값에 대해 어휘를 생성하여 문자열을 정수로 변환
- 관찰된 데이터 분포를 기반으로 수레를 버킷에 할당하여 부동 소수점을 정수로 변환
- Apache Beam과 Apache Arrow가 종속성으로 필요한 것이 특징
tfx_bsl.public.tfxio
TFXIO는 모든 TFX 라이브러리 및 구성 요소가 공유하는 공통 메모리 내 데이터 표현과 이러한 표현을 생성하기 위한 I/O 추상화 계층을 정의
schema_pb2
Main aliases: tfmd.proto.v0.schema_pb2
_FEATURE_KEYS = [
'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'
_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10
# Since we're not generating or creating a schema, we will instead create
# a feature spec. Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
**{
feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
for feature in _FEATURE_KEYS
},
_LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}
tf.io.FixedLenFeature
고정된 길이의 input feature를 파싱하기 위한 구성요소
def _input_fn(file_pattern: List[str],
data_accessor: tfx.components.DataAccessor,
schema: schema_pb2.Schema,
batch_size: int = 200) -> tf.data.Dataset:
"""Generates features and label for training.
Args:
file_pattern: List of paths or patterns of input tfrecord files.
data_accessor: DataAccessor for converting input to RecordBatch.
schema: schema of the input data.
batch_size: representing the number of consecutive elements of returned
dataset to combine in a single batch
Returns:
A dataset that contains (features, indices) tuple where features is a
dictionary of Tensors, and indices is a single Tensor of label indices.
"""
return data_accessor.tf_dataset_factory(
file_pattern,
tfxio.TensorFlowDatasetOptions(
batch_size=batch_size, label_key=_LABEL_KEY),
schema=schema).repeat()
학습을 위한 특징들과 라벨을 생성함
file_pattern
: 인풋 tfrecord 파일들의 경로들 혹은 패턴의 리스트data_accessor
: input을 RecordBatch로 전환하기 위한 DataAccessorshcema
: 인풋 데이터의 스키마batch_size
: 단일 배치에서 결합하기위해 리턴된 데이터셋의 연이은 요소의 수를 대표 → 우리가 일반적으로 아는 배치사이즈. 말로 푸니까 되게 어렵네return
: 데이터셋 튜플 (features, indices)
def _build_keras_model() -> tf.keras.Model:
"""Creates a DNN Keras model for classifying penguin data.
Returns:
A Keras Model.
"""
# The model below is built with Functional API, please refer to
# https://www.tensorflow.org/guide/keras/overview for all API options.
inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
d = keras.layers.concatenate(inputs)
for _ in range(2):
d = keras.layers.Dense(8, activation='relu')(d)
outputs = keras.layers.Dense(3)(d)
model = keras.Model(inputs=inputs, outputs=outputs)
model.compile(
optimizer=keras.optimizers.Adam(1e-2),
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=[keras.metrics.SparseCategoricalAccuracy()])
model.summary(print_fn=logging.info)
return model
# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
"""Train the model based on given args.
Args:
fn_args: Holds args used to train the model as name/value pairs.
"""
# This schema is usually either an output of SchemaGen or a manually-curated
# version provided by pipeline author. A schema can also derived from TFT
# graph if a Transform component is used. In the case when either is missing,
# `schema_from_feature_spec` could be used to generate schema from very simple
# feature_spec, but the schema returned would be very primitive.
schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)
train_dataset = _input_fn(
fn_args.train_files,
fn_args.data_accessor,
schema,
batch_size=_TRAIN_BATCH_SIZE)
eval_dataset = _input_fn(
fn_args.eval_files,
fn_args.data_accessor,
schema,
batch_size=_EVAL_BATCH_SIZE)
model = _build_keras_model()
model.fit(
train_dataset,
steps_per_epoch=fn_args.train_steps,
validation_data=eval_dataset,
validation_steps=fn_args.eval_steps)
# The result of the training should be saved in `fn_args.serving_model_dir`
# directory.
model.save(fn_args.serving_model_dir, save_format='tf')
schema_from_feature_spec
은 매우 단순한 feature_spce에서부터 스키마를 제공하는데 사용될 수 있음. 하지만 리턴된 스키마는 매우 primitive(원시적)할 것임TFX 파이프라인을 생성하는 함수를 정의.
Pipeline
객체는 하나의 TFX pipeline을 대표함. 이 파이프라인은 TFX가 지원하는 pipeline orchestration system 중 하나를 이용하여 작동시킬 수 있음
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
module_file: str, serving_model_dir: str,
metadata_path: str) -> tfx.dsl.Pipeline:
"""Creates a three component penguin pipeline with TFX."""
# Brings data into the pipeline.
example_gen = tfx.components.CsvExampleGen(input_base=data_root)
# Uses user-provided Python function that trains a model.
trainer = tfx.components.Trainer(
module_file=module_file,
examples=example_gen.outputs['examples'],
train_args=tfx.proto.TrainArgs(num_steps=100),
eval_args=tfx.proto.EvalArgs(num_steps=5))
# Pushes the model to a filesystem destination.
pusher = tfx.components.Pusher(
model=trainer.outputs['model'],
push_destination=tfx.proto.PushDestination(
filesystem=tfx.proto.PushDestination.Filesystem(
base_directory=serving_model_dir)))
# Following three components will be included in the pipeline.
components = [
example_gen,
trainer,
pusher,
]
return tfx.dsl.Pipeline(
pipeline_name=pipeline_name,
pipeline_root=pipeline_root,
metadata_connection_config=tfx.orchestration.metadata
.sqlite_metadata_connection_config(metadata_path),
components=components)
LocalDagRunner
를 사용LocalDagRunner
: 개발 및 디버깅을 위한 빠른 반복을 제공LocalDagRunner
를 생성하고 우리가 이미 정의한 함수에 의해 생성된 Pipeline
객체를 전달함. 파이프라인은 곧장 실행되고 ML model training을 포함하여 pipeline의 진행 로그를 볼 수 있음tfx.orchestration.LocalDagRunner().run(
_create_pipeline(
pipeline_name=PIPELINE_NAME,
pipeline_root=PIPELINE_ROOT,
data_root=DATA_ROOT,
module_file=_trainer_module_file,
serving_model_dir=SERVING_MODEL_DIR,
metadata_path=METADATA_PATH))
INFO:absl:Component Pusher is finished.
를 마지막 로그에서 볼 수 있음. 왜냐하면 Pusher
컴포넌트는 파이프라인의 마지막 구성요소임serving_model/penguin-simple
) 에 푸시함