오렐리앙 제롱, 『Hands-On Machine Learning with Scikit-Learn, Keras & TensorFlow』, 박해선, 한빛미디어-OREILLY(2020), p503-540.
데이터셋 API
TFRecord 포맷
TF 변환
tf.data.Dataset
drop_remainder=True
- 모자란 마지막 배치를 버림
파일 여러 개를 무작위로 선택 → 파일에서 동시에 읽은 레코드를 돌아가면서 반환
n_readers = 5
dataset = filepath_dataset.interleave(
lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
cycle_length=n_readers)
for line in dataset.take(5):
print(line.numpy())
파일을 병렬로 읽고 싶을 때 (num_parallel_calls)
n_readers = 5
dataset = filepath_dataset.interleave(
lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
num_parallel_calls=tf.data.experimental.AUTOTUNE,
cycle_length=n_readers)
for line in dataset.take(5):
print(line.numpy())
n_inputs = 8 # X_train.shape[-1]
@tf.function
def preprocess(line):
defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)]
fields = tf.io.decode_csv(line, record_defaults=defs)
x = tf.stack(fields[:-1])
y = tf.stack(fields[-1:])
return (x - X_mean) / X_std, y
decode_csv
: 스칼라 텐서의 리스트 반환def csv_reader_dataset(filepaths, repeat=1, n_readers=5,
n_read_threads=None, shuffle_buffer_size=10000,
n_parse_threads=5, batch_size=32):
dataset = tf.data.Dataset.list_files(filepaths).repeat(repeat)
dataset = dataset.interleave(
lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
cycle_length=n_readers, num_parallel_calls=n_read_threads)
dataset = dataset.shuffle(shuffle_buffer_size)
dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)
dataset = dataset.batch(batch_size)
return dataset.prefetch(1)
훈련 script로 파일을 다운로드해야 할 때, 시간 단축을 위해서 일반적으로 압축을 활성화함
훈련 script와 파일이 같은 공간에 있다면, 압축을 할 필요 없음 (CPU 자원 낭비)
# 압축 파일 쓰기
options = tf.io.TFRecordOptions(compression_type="GZIP")
with tf.io.TFRecordWriter("my_compressed.tfrecord", options) as f:
f.write(b"This is the first record")
f.write(b"And this is the second record")
# 압축 파일 읽기
dataset = tf.data.TFRecordDataset(["my_compressed.tfrecord"],
compression_type="GZIP")
구글이 개발한 이식성과 확장성이 좋고 효율적인 이진 포맷
일반적으로는 protocol butter를 설치해야됨
from person_pb2 import Person
person = Person(name="Al", id=123, email=["a@b.com"]) # Person 생성
print(person) # Person 출력
# name: "Al"
# id: 123
# email: "a@b.com"
print(person.name) # Al
person.name = "Alice" # 필드 수정
person.email[0] # a@b.com 배열처럼 사용할 수 있는 반복 필드
person.email.append("c@d.com") # 이메일 추가
s = person.SerializeToString() # 바이트 문자열로 직렬화
print(s) # b'\n\x05Alice\x10{\x1a\x07a@b.com\x1a\x07c@d.com'
tensorflow는 tf.train.Example
타입의 protocol buffer를 만들고 파싱할 수 있는 함수를 제공
TFRecord 파일에서 주로 사용
tf.train.Example
protocol buffer 정의 예시
syntax = "proto3";
message BytesList { repeated bytes value = 1; }
message FloatList { repeated float value = 1 [packed = true]; }
message Int64List { repeated int64 value = 1 [packed = true]; }
message Feature {
oneof kind {
BytesList bytes_list = 1;
FloatList float_list = 2;
Int64List int64_list = 3;
}
};
message Features { map<string, Feature> feature = 1; };
message Example { Features features = 1; };
packed = true
: 반복적인 수치 필드에 사용
Features
: 특성 이름과 특성값을 매핑한 dictionary를 가짐
# 적용 예시
from tensorflow.train import BytesList, FloatList, Int64List
from tensorflow.train import Feature, Features, Example
person_example = Example(
features=Features(
feature={
"name": Feature(bytes_list=BytesList(value=[b"Alice"])),
"id": Feature(int64_list=Int64List(value=[123])),
"emails": Feature(bytes_list=BytesList(value=[b"a@b.com", b"c@d.com"]))
}))
with tf.io.TFRecordWriter("my_contacts.tfrecord") as f:
f.write(person_example.SerializeToString()) # 직렬화 후 TFRecord 파일에 저장
feature_description = {
"name": tf.io.FixedLenFeature([], tf.string, default_value=""),
"id": tf.io.FixedLenFeature([], tf.int64, default_value=0),
"emails": tf.io.VarLenFeature(tf.string),
}
for serialized_example in tf.data.TFRecordDataset(["my_contacts.tfrecord"]):
parsed_example = tf.io.parse_single_example(serialized_example,
feature_description)
parse_single_example
: 직렬화된 문자열 스칼라 텐서, description 필요
FixedLenFeature
: 고정 길이 (텐서로 파싱)
VarLenFeature
: 가변 길이 (희소 텐서로 파싱)
이미지 처리 적용 예시
data = tf.io.encode_jpeg(img)
example_with_image = Example(features=Features(feature={
"image": Feature(bytes_list=BytesList(value=[data.numpy()]))}))
serialized_example = example_with_image.SerializeToString()
# then save to TFRecord
feature_description = { "image": tf.io.VarLenFeature(tf.string) }
example_with_image = tf.io.parse_single_example(serialized_example, feature_description)
decoded_img = tf.io.decode_jpeg(example_with_image["image"].values[0])
# decoded_img = tf.io.decode_image(example_with_image["image"].values[0])
tf.io.decode_image
: BMP, GIF, JPEG, PNG 이미지 디코딩 가능SequenceExample
syntax = "proto3";
message FeatureList { repeated Feature feature = 1; };
message FeatureLists { map<string, FeatureList> feature_list = 1; };
message SequenceExample {
Features context = 1;
FeatureLists feature_lists = 2;
};
단일 : tf.io.parse_single_sequence_example()
배치 : tf.io.parse_sequence_example()
⇒ 문맥 특성(dict)과 특성 리스트(dict)를 담은 튜플 반환
tf.RaggedTensor.from_sparse
from tensorflow.train import FeatureList, FeatureLists, SequenceExample
context = Features(feature={
"author_id": Feature(int64_list=Int64List(value=[123])),
"title": Feature(bytes_list=BytesList(value=[b"A", b"desert", b"place", b"."])),
"pub_date": Feature(int64_list=Int64List(value=[1623, 12, 25]))
})
content = [["When", "shall", "we", "three", "meet", "again", "?"],
["In", "thunder", ",", "lightning", ",", "or", "in", "rain", "?"]]
comments = [["When", "the", "hurlyburly", "'s", "done", "."],
["When", "the", "battle", "'s", "lost", "and", "won", "."]]
def words_to_feature(words):
return Feature(bytes_list=BytesList(value=[word.encode("utf-8")
for word in words]))
content_features = [words_to_feature(sentence) for sentence in content]
comments_features = [words_to_feature(comment) for comment in comments]
sequence_example = SequenceExample(
context=context,
feature_lists=FeatureLists(feature_list={
"content": FeatureList(feature=content_features),
"comments": FeatureList(feature=comments_features)
}))
context_feature_descriptions = {
"author_id": tf.io.FixedLenFeature([], tf.int64, default_value=0),
"title": tf.io.VarLenFeature(tf.string),
"pub_date": tf.io.FixedLenFeature([3], tf.int64, default_value=[0, 0, 0]),
}
sequence_feature_descriptions = {
"content": tf.io.VarLenFeature(tf.string),
"comments": tf.io.VarLenFeature(tf.string),
}
parsed_context, parsed_feature_lists = tf.io.parse_single_sequence_example(
serialized_sequence_example, context_feature_descriptions,
sequence_feature_descriptions)
class TextVectorization(keras.layers.Layer):
def __init__(self, max_vocabulary_size=1000, n_oov_buckets=100, dtype=tf.string, **kwargs):
super().__init__(dtype=dtype, **kwargs)
self.max_vocabulary_size = max_vocabulary_size
self.n_oov_buckets = n_oov_buckets
def adapt(self, data_sample):
self.vocab = get_vocabulary(data_sample, self.max_vocabulary_size)
words = tf.constant(self.vocab)
word_ids = tf.range(len(self.vocab), dtype=tf.int64) # 어휘 사전 정의
vocab_init = tf.lookup.KeyValueTensorInitializer(words, word_ids)
self.table = tf.lookup.StaticVocabularyTable(vocab_init, self.n_oov_buckets)
def call(self, inputs):
preprocessed_inputs = preprocess(inputs)
return self.table.lookup(preprocessed_inputs)
tf.lookup.KeyValueTensorInitializer
: 범주 리스트와 인덱스로 룩업 테이블용 초기화 객체tf.lookup.StaticVocabularyTable
: 어휘 사전에 있는 단어 인덱스 조회n_oov_buckets=100
⇒ 어휘 사전에 없는 범주를 찾으면 룩업 테이블이 계산한 범주의 해시값을 이용하여 oov 버킷 중 하나에 할당텐서플로 모델 상품화를 위한 end-to-end 플랫폼인 TFX의 일부분
모든 텐서플로 연산 사용 가능
전처리 연산을 한 번만 정의 가능
배포할 모델에 추가할 수 있도록 동일한 역할을 수행하는 텐서플로 함수를 생성
try:
import tensorflow_transform as tft
def preprocess(inputs): # inputs is a batch of input features
median_age = inputs["housing_median_age"]
ocean_proximity = inputs["ocean_proximity"]
standardized_age = tft.scale_to_z_score(median_age - tft.mean(median_age))
ocean_proximity_id = tft.compute_and_apply_vocabulary(ocean_proximity)
return {
"standardized_median_age": standardized_age,
"ocean_proximity_id": ocean_proximity_id
}
except ImportError:
print("TF Transform is not installed. Try running: pip3 install -U tensorflow-transform")
@tf.function
??tf.data.experimental.prefetch_to_device()
: GPU에서 바로 prefetch 할 수 있음