# Build the Sequential model incrementally: start empty and append each layer in order.
model = keras.Sequential(name='sequential_model')
model.add(layers.Dense(64, activation='relu', name='first_layer'))
model.add(layers.Dense(32, activation='relu'))
model.add(layers.Dense(10, activation='softmax', name='last_layer'))
model.weights나 model.summary() 등은 모델이 build된 이후(build()를 호출하거나 실제 입력으로 모델을 한 번 호출한 이후)에 볼 수 있다. compile()만으로는 가중치가 생성되지 않는다.
# Functional model whose first stage is a nested Sequential block called like a layer.
inputs = keras.Input(shape=(3,), name='input')
hidden = keras.Sequential(
    [
        layers.Dense(32, activation='relu'),
        layers.Dense(64, activation='relu'),
    ]
)(inputs)
features = layers.Dense(64, activation='relu')(hidden)
outputs = layers.Dense(10, activation='softmax')(features)
model = keras.Model(inputs=inputs, outputs=outputs)
위와 같이 쓸 수 있다.
# Sizes of the input encodings and of the department output.
vocab_size = 10000
num_tags = 100
num_departments = 4

# Three inputs.
title = keras.Input(shape=(vocab_size,), name='title')
text_body = keras.Input(shape=(vocab_size,), name='text_body')
tags = keras.Input(shape=(num_tags,), name='tags')

# Merge the inputs into one feature vector, then mix it with a dense layer.
features = layers.Concatenate()([title, text_body, tags])
features = layers.Dense(64, activation='relu')(features)

# Two outputs, both fed by the shared features.
priority = layers.Dense(1, activation='sigmoid', name='priority')(features)
department = layers.Dense(
    num_departments, activation='softmax', name='department'
)(features)

model = keras.Model(
    inputs=[title, text_body, tags],
    outputs=[priority, department],
)
컴파일과 학습은 아래와 같이 할 수 있다.
# Losses and metrics are keyed by output-layer name; data is keyed by input/output name.
model.compile(
    optimizer='rmsprop',
    loss={
        'priority': 'mean_squared_error',
        'department': 'categorical_crossentropy',
    },
    metrics={
        'priority': ['mean_absolute_error'],
        'department': ['accuracy'],
    },
)
model.fit(
    {'title': title_data, 'text_body': text_body_data, 'tags': tags_data},
    {'priority': priority_data, 'department': department_data},
)
priority_pred, department_pred = model.predict(
    {'title': title_data, 'text_body': text_body_data, 'tags': tags_data}
)
# Render the layer graph (with shapes and activations) to an image file.
keras.utils.plot_model(
    model,
    'ticket_classifier.png',
    show_shapes=True,
    show_layer_activations=True,
)

# Reuse an intermediate activation of the existing model: layers[3] is taken here
# as the Concatenate layer, and its output feeds a new classification head.
features = model.layers[3].output
difficulty = layers.Dense(3, activation='softmax', name='difficulty')(features)

# New model sharing the same inputs, now with a third output.
model = keras.Model(
    inputs=[title, text_body, tags],
    outputs=[priority, department, difficulty],
)
keras.utils.plot_model(
    model,
    'ticket_classifier.png',
    show_shapes=True,
    show_layer_activations=True,
)

class CustomerTicketModel(keras.Model):
    """Subclassed ticket model producing a priority score and a department distribution.

    Args:
        num_departments: Number of units in the softmax department head.
    """

    def __init__(self, num_departments):
        super().__init__()
        self.concat_layer = layers.Concatenate()
        self.mixing_layer = layers.Dense(64, activation='relu')
        self.priority_score = layers.Dense(1, activation='sigmoid', name='priority')
        self.department_classifier = layers.Dense(
            num_departments, activation='softmax', name='department'
        )

    def call(self, inputs):
        """Run the forward pass.

        Args:
            inputs: Mapping with the keys 'title', 'text_body' and 'tags'.

        Returns:
            Dict with the keys 'priority' and 'department'.
        """
        # Gather the three input tensors in a fixed order before concatenating.
        merged = self.concat_layer(
            [inputs[key] for key in ('title', 'text_body', 'tags')]
        )
        mixed = self.mixing_layer(merged)
        # Both heads read the same mixed features.
        return {
            'priority': self.priority_score(mixed),
            'department': self.department_classifier(mixed),
        }
컴파일 및 학습, 그리고 예측은 동일하게 가능하다.
# Instantiate the subclassed model, then compile, train and evaluate it.
model = CustomerTicketModel(num_departments=num_departments)
model.compile(
    optimizer='rmsprop',
    loss={
        'priority': 'mean_squared_error',
        'department': 'categorical_crossentropy',
    },
    metrics={
        'priority': ['mean_absolute_error'],
        'department': ['accuracy'],
    },
)
model.fit(
    {'title': title_data, 'text_body': text_body_data, 'tags': tags_data},
    {'priority': priority_data, 'department': department_data},
)
model.evaluate(
    {
        'title': title_test_data,
        'text_body': text_body_test_data,
        'tags': tags_test_data,
    },
    {'priority': priority_test_data, 'department': department_test_data},
)
함수형 모델에 서브클래스나 시퀀셜을 이용할 수 있다.
class Classifier(keras.Model):
    """Single-Dense classification head whose shape depends on the class count.

    Args:
        num_classes: With exactly 2 classes the head is one sigmoid unit;
            otherwise it is a softmax over ``num_classes`` units.
    """

    def __init__(self, num_classes=2):
        super().__init__()
        # Pick (units, activation) in a single expression.
        num_units, activation = (
            (1, "sigmoid") if num_classes == 2 else (num_classes, "softmax")
        )
        self.dense = layers.Dense(num_units, activation=activation)

    def call(self, inputs):
        """Apply the dense head to ``inputs`` and return its output."""
        return self.dense(inputs)
# A subclassed model instance is called like any layer inside a functional graph.
inputs = keras.Input(shape=(3,))
features = layers.Dense(64, activation="relu")(inputs)
outputs = Classifier(num_classes=10)(features)
model = keras.Model(
    inputs=inputs,
    outputs=outputs,
)
또한 서브클래싱 모델의 일부로 함수형 모델을 사용할 수 있다.
# Small functional model: 64 input features -> one sigmoid unit.
inputs = keras.Input(shape=(64,))
outputs = layers.Dense(1, activation="sigmoid")(inputs)
binary_classifier = keras.Model(
    inputs=inputs,
    outputs=outputs,
)
class MyModel(keras.Model):
    """Subclassed model that feeds a Dense(64) layer into ``binary_classifier``.

    Args:
        num_classes: Accepted by the constructor but not read in its body.
    """

    def __init__(self, num_classes=2):
        super().__init__()
        self.dense = layers.Dense(64, activation="relu")
        # Reuse the module-level functional model as a sub-component.
        self.classifier = binary_classifier

    def call(self, inputs):
        """Return the classifier's output for the dense-projected ``inputs``."""
        return self.classifier(self.dense(inputs))
# Instantiate the subclassed model with its default num_classes (2).
model = MyModel()
from numpy import float32
class RootMeanSquaredError(keras.metrics.Metric):
    """Streaming RMSE between class-probability predictions and integer labels.

    The labels are one-hot encoded to the width of ``y_pred`` and the squared
    error is accumulated across batches. ``result()`` returns
    ``sqrt(sum_of_squared_errors / number_of_samples)``.

    Args:
        name: Metric name reported in logs (default ``"rmse"``).
        **kwargs: Forwarded to ``keras.metrics.Metric``.
    """

    def __init__(self, name="rmse", **kwargs):
        super().__init__(name=name, **kwargs)
        # State variables are created with add_weight.
        self.mse_sum = self.add_weight(name='mse_sum', initializer='zeros')
        self.total_samples = self.add_weight(
            name='total_samples', initializer='zeros', dtype='int32'
        )

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate the squared error and the sample count for one batch.

        Args:
            y_true: Integer class indices (one-hot encoded below).
            y_pred: Predictions whose second axis is the class axis.
            sample_weight: Accepted for API compatibility; not used.
        """
        y_true = tf.one_hot(y_true, depth=y_pred.shape[1])
        mse = tf.reduce_sum(tf.square(y_pred - y_true))
        self.mse_sum.assign_add(mse)
        # Count samples along axis 0 (the batch axis), not axis 1 (the class
        # axis). tf.shape returns the runtime shape, so the batch size is a
        # concrete value here even when the static y_pred.shape[0] is None.
        num_samples = tf.shape(y_pred)[0]
        self.total_samples.assign_add(num_samples)

    def result(self):
        """Return the RMSE over every sample seen since the last reset."""
        return tf.sqrt(self.mse_sum / tf.cast(self.total_samples, tf.float32))

    def reset_state(self):
        """Zero the accumulators so the metric instance can be reused."""
        self.mse_sum.assign(0.)
        self.total_samples.assign(0)
컴파일 할 때 추가해주면 사용가능하다.
# A custom metric instance is passed to compile() alongside built-in metric names.
model.compile(optimizer='rmsprop', loss='sparse_categorical_crossentropy', metrics=['accuracy', RootMeanSquaredError()])
Callback의 가장 자주 쓰이는 옵션들은 다음과 같다.
Callbacks - Keras Documentation
이외의 것들은 위의 링크에서 확인할 수 있다.
EarlyStopping과 ModelCheckpoint를 함께 쓰는 것이 가장 흔하다.
callbacks_list = [
    # Watches 'val_accuracy' with a patience of 2 epochs.
    keras.callbacks.EarlyStopping(
        monitor='val_accuracy',
        patience=2,
    ),
    # Writes the checkpoint file, keeping only the best model by 'val_loss'.
    keras.callbacks.ModelCheckpoint(
        filepath='checkpoint_path.keras',
        monitor='val_loss',
        save_best_only=True,
    ),
]
model = get_mnist_model()
model.compile(
    optimizer='rmsprop',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)
# Validation data is supplied because both callbacks monitor validation metrics.
model.fit(
    train_imgs,
    train_labels,
    callbacks=callbacks_list,
    validation_data=(val_imgs, val_labels),
    epochs=3,
)
다음과 같이 여러 지점에서 호출될 메서드를 구현할 수 있다.
on_epoch_begin(epoch, logs) on_epoch_end(epoch, logs) on_batch_begin(batch, logs) on_batch_end(batch, logs) on_train_begin(logs) on_train_end(logs)
import matplotlib.pyplot as plt
class LossHistory(keras.callbacks.Callback):
    """Callback that records the per-batch loss and saves one plot per epoch."""

    def on_train_begin(self, logs):
        """Start training with an empty loss buffer."""
        self.per_batch_loss = []

    def on_batch_end(self, batch, logs):
        """Record the 'loss' entry of the batch that just finished."""
        batch_loss = logs.get('loss')
        self.per_batch_loss.append(batch_loss)

    def on_epoch_end(self, epoch, logs):
        """Plot this epoch's batch losses, save the figure and reset the buffer."""
        losses = self.per_batch_loss
        batch_indices = range(len(losses))

        # Clear the current figure so each epoch gets a fresh plot.
        plt.clf()
        plt.plot(batch_indices, losses, label='Training loss for each batch')
        plt.xlabel(f'Batch (epoch {epoch})')
        plt.ylabel('Loss')
        plt.legend()
        plt.savefig(f'plt_at_epoch_{epoch}')

        # Begin the next epoch with a new, empty list.
        self.per_batch_loss = []
TensorBoard는 훈련하는 동안 모델 내부의 동작을 모니터링하기 좋은 도구이다.
사용법은 아래와 같다.
%load_ext tensorboard
model.compile(
    optimizer='rmsprop',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)
tensorboard = keras.callbacks.TensorBoard(log_dir='./log_dir')
# Add the TensorBoard callback to the callbacks passed to fit().
model.fit(
    train_imgs,
    train_labels,
    callbacks=[tensorboard],
    validation_data=(val_imgs, val_labels),
    epochs=10,
)
%tensorboard --logdir ./log_dir

model.fit() 메서드는 지도 학습(supervised learning)에 맞춰져 있기 때문에, 그 외의 경우에는 훈련 로직을 직접 작성할 수도 있다. @tf.function 데코레이터를 사용하면 코드가 한 줄씩 즉시 실행(eager execution)되는 대신 계산 그래프로 컴파일되므로 전역적인 최적화가 가능하다.
# Module-level objects shared by the custom training loop below.
# Loss function called as loss_fn(targets, predictions).
loss_fn = keras.losses.SparseCategoricalCrossentropy()
# Optimizer that applies the computed gradients.
optimizer = keras.optimizers.RMSprop()
# Metrics updated with (targets, predictions) on every step.
metrics = [keras.metrics.SparseCategoricalAccuracy()]
# Running mean of the per-batch loss values.
loss_tracking_metric = keras.metrics.Mean()
@tf.function
def train_step(inputs, targets):
    """Run one optimization step on a batch and return the current metric values.

    Args:
        inputs: Batch of model inputs.
        targets: Batch of targets passed to the loss and the metrics.

    Returns:
        Dict mapping each metric name, plus "loss", to its current result.
    """
    # Record the forward pass so the loss can be differentiated.
    with tf.GradientTape() as tape:
        predictions = model(inputs, training=True)
        loss = loss_fn(targets, predictions)

    weights = model.trainable_weights
    gradients = tape.gradient(loss, weights)
    # Both are lists, paired element-wise as (gradient, variable).
    optimizer.apply_gradients(zip(gradients, weights))

    # Update every metric first, then read the results in one pass.
    for metric in metrics:
        metric.update_state(targets, predictions)
    logs = {metric.name: metric.result() for metric in metrics}

    loss_tracking_metric.update_state(loss)
    logs["loss"] = loss_tracking_metric.result()
    return logs
def reset_metrics():
    """Reset every metric in ``metrics``, then the loss tracker."""
    for tracker in [*metrics, loss_tracking_metric]:
        tracker.reset_state()
# Slice the training arrays into (image, label) pairs and group them in batches of 32.
training_dataset = tf.data.Dataset.from_tensor_slices(
    (train_images, train_labels)
).batch(32)

epochs = 3
for epoch in range(epochs):
    # Metrics accumulate across batches, so clear them at the start of each epoch.
    reset_metrics()
    for inputs_batch, targets_batch in training_dataset:
        logs = train_step(inputs_batch, targets_batch)
    # Report the values returned by the last step of the epoch.
    print(f"{epoch}번째 에포크 결과")
    for key, value in logs.items():
        print(f"...{key}: {value:.4f}")
keras.Model의 fit메서드에서는 배치 데이터마다 train_step을 호출한다. 또 evaluate메서드는 test_step을 호출한다.
여기서는 프레임워크가 알아서 처리하기 때문에 @tf.function을 쓸 필요는 전혀 없다.
class CustomModel(keras.Model):
    """Keras model with hand-written per-batch logic used by fit() and evaluate().

    Both steps receive one ``data`` argument that is unpacked into
    ``(inputs, targets)``. They rely on the loss, metrics and optimizer
    configured through ``compile()``.
    """

    def train_step(self, data):
        """Run one training step.

        Args:
            data: An ``(inputs, targets)`` pair for the current batch.

        Returns:
            Dict mapping each metric name to its current result.
        """
        inputs, targets = data
        with tf.GradientTape() as tape:
            predictions = self(inputs, training=True)
            loss = self.compiled_loss(targets, predictions)
        gradients = tape.gradient(loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_weights))
        self.compiled_metrics.update_state(targets, predictions)
        return {m.name: m.result() for m in self.metrics}

    def test_step(self, data):
        """Run one evaluation step (no gradient update).

        Takes ``self`` and a single ``data`` argument, mirroring train_step,
        so that it can be invoked as a method with one batch.

        Args:
            data: An ``(inputs, targets)`` pair for the current batch.

        Returns:
            Dict mapping each metric name to its current result.
        """
        inputs, targets = data
        predictions = self(inputs, training=False)
        # The loss value is not needed locally; the call is kept from the
        # original so the compiled loss is still evaluated on this batch.
        self.compiled_loss(targets, predictions)
        self.compiled_metrics.update_state(targets, predictions)
        return {m.name: m.result() for m in self.metrics}
# Functional graph: 784 inputs -> Dense(512) -> Dropout -> 10-way softmax.
inputs = keras.Input(shape=(28 * 28,))
features = layers.Dense(512, activation="relu")(inputs)
features = layers.Dropout(0.5)(features)
outputs = layers.Dense(10, activation="softmax")(features)

# A subclassed Model class can be instantiated from functional-API tensors like this.
model = CustomModel(inputs, outputs)
model.compile(
    optimizer=keras.optimizers.RMSprop(),
    loss=keras.losses.SparseCategoricalCrossentropy(),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
)
history = model.fit(train_images, train_labels, epochs=3)
result = model.evaluate(test_images, test_labels)