[1] import & Utilities
시간에 대한 값을 플로팅할 수 있는 함수와 추세, 계절성, 노이즈를 만드는 유틸리티 함수를 정의한다.
이 함수를 통해서 시계열 데이터 및 모델 예측을 시각화하고 합성 데이터를 생성 한다.
def plot_series(time, series, format="-", start=0, end=None):
"""
Visualizes time series data
Args:
time (array of int) - contains the time steps
series (array of int) - contains the measurements for each time step
format - line style when plotting the graph
label - tag for the line
start - first time step to plot
end - last time step to plot
"""
# Setup dimensions of the graph figure
plt.figure(figsize=(10, 6))
if type(series) is tuple:
for series_num in series:
# Plot the time series data
plt.plot(time[start:end], series_num[start:end], format)
else:
# Plot the time series data
plt.plot(time[start:end], series[start:end], format)
# Label the x-axis
plt.xlabel("Time")
# Label the y-axis
plt.ylabel("Value")
# Overlay a grid on the graph
plt.grid(True)
# Draw the graph on screen
plt.show()
def trend(time, slope=0):
"""
Generates synthetic data that follows a straight line given a slope value.
Args:
time (array of int) - contains the time steps
slope (float) - determines the direction and steepness of the line
Returns:
series (array of float) - measurements that follow a straight line
"""
# Compute the linear series given the slope
series = slope * time
return series
def seasonal_pattern(season_time):
"""
Just an arbitrary pattern, you can change it if you wish
Args:
season_time (array of float) - contains the measurements per time step
Returns:
data_pattern (array of float) - contains revised measurement values according
to the defined pattern
"""
# Generate the values using an arbitrary pattern
data_pattern = np.where(season_time < 0.4,
np.cos(season_time * 2 * np.pi),
1 / np.exp(3 * season_time))
return data_pattern
def seasonality(time, period, amplitude=1, phase=0):
"""
Repeats the same pattern at each period
Args:
time (array of int) - contains the time steps
period (int) - number of time steps before the pattern repeats
amplitude (int) - peak measured value in a period
phase (int) - number of time steps to shift the measured values
Returns:
data_pattern (array of float) - seasonal data scaled by the defined amplitude
"""
# Define the measured values per period
season_time = ((time + phase) % period) / period
# Generates the seasonal data scaled by the defined amplitude
data_pattern = amplitude * seasonal_pattern(season_time)
return data_pattern
def noise(time, noise_level=1, seed=None):
"""Generates a normally distributed noisy signal
Args:
time (array of int) - contains the time steps
noise_level (float) - scaling factor for the generated signal
seed (int) - number generator seed for repeatability
Returns:
noise (array of float) - the noisy signal
"""
# Initialize the random number generator
rnd = np.random.RandomState(seed)
# Generate a random number for each time step and scale by the noise level
noise = rnd.randn(len(time)) * noise_level
return noise
[2] Generate the Synthetic Data
시계열 합성 데이터를 생성한다. 여기에는 추세, 계절성, 노이즈가 포함된 1,461개의 데이터 포인트가 포함된다.
# Parameters
time = np.arange(4 * 365 + 1, dtype="float32")
baseline = 10
amplitude = 40
slope = 0.05
noise_level = 5
# Create the series
series = baseline + trend(time, slope) + seasonality(time, period=365, amplitude=amplitude)
# Update with noise
series += noise(time, noise_level, seed=42)
# Plot the results
plot_series(time, series)
[3] Split the Dataset
split_time =1000
time_train= time[:split_time]
x_train = series[:split_time]
time_valid = time[split_time:]
x_valid = series[split_time:]
print(x_train.shape)
print(x_valid.shape)
# output
(1000,)
(461,)
# Plot the train set
plot_series(time_train, x_train)
# Plot the validation set
plot_series(time_valid, x_valid)
[4] Prepare features and labels
def windowed_dataset(series, window_size, batch_size, shuffle_buffer):
"""Generates dataset windows
Args:
series (array of float) - contains the values of the time series
window_size (int) - the number of time steps to include in the feature
batch_size (int) - the batch size
shuffle_buffer(int) - buffer size to use for the shuffle method
Returns:
dataset (TF Dataset) - TF Dataset containing time windows
"""
# Generate a TF Dataset from the series values
dataset = tf.data.Dataset.from_tensor_slices(series)
# Window the data but only take those with the specified size
dataset = dataset.window(window_size + 1, shift=1, drop_remainder=True)
# Flatten the windows by putting its elements in a single batch
dataset = dataset.flat_map(lambda window: window.batch(window_size + 1))
# Create tuples with features and labels
dataset = dataset.map(lambda window: (window[:-1], window[-1]))
# Shuffle the windows
dataset = dataset.shuffle(shuffle_buffer)
# Create batches of windows
dataset = dataset.batch(batch_size).prefetch(1)
return dataset
# Parameters
window_size = 20
batch_size = 32
shuffle_buffer_size = 1000
dataset = windowed_dataset(x_train, window_size, batch_size, shuffle_buffer_size)
이제 훈련 세트에서 데이터 세트 window를 생성할 수 있다.
# Print properties of a single batch
for windows in dataset.take(1):
print(f'data type: {type(windows)}')
print(f'number of elements in the tuple: {len(windows)}')
print(f'shape of first element: {windows[0].shape}')
print(f'shape of second element: {windows[1].shape}')
# output
data type: <class 'tuple'>
number of elements in the tuple: 2
shape of first element: (32, 20)
shape of second element: (32,)
2024-04-18 10:01:44.540294: W tensorflow/core/framework/local_rendezvous.cc:404] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence
출력을 통해서 함수가 예상대로 작동하는지 확인한다.
tf.data.Dataset API의 take() 메소드를 사해 단일 배치를 가져와서, 해당 요소의 데이터 유형 및 모양과 같은 이 배치의 여러 속성을 프린트해본다.
예상한 대로 2요소 튜플(예: (리처, 레이블))이 있어야 하며 이들의 모양은 이전에 선언한 배치 및 창 크기(기본적으로 각각 32 및 20)와 일치해야 한다.
[5] Build and compile the model
l0 = tf.keras.layers.Dense(units=1, input_shape=[window_size])
model = tf.keras.models.Sequential([l0]
model.compile(loss='mse', optimizer=tf.keras.optimizers.SGD(momentum=0.9, learning_rate=1e-6))
print(f"Layer weights : {l0.get_weights()}")
model.summary()
# output
Layer wieghts : [array([[-0.06430715],
[-0.43933353],
[ 0.0349232 ],
[-0.30867416],
[-0.28906775],
[ 0.16387522],
[ 0.03538144],
[-0.14311546],
[ 0.1772303 ],
[ 0.03114396],
[-0.48953128],
[-0.0481953 ],
[ 0.42053586],
[-0.26347485],
[-0.26953095],
[ 0.42382163],
[ 0.2824847 ],
[-0.16823238],
[ 0.19963837],
[-0.0442909 ]], dtype=float32), array([0.], dtype=float32)]
[6] Train the Model
이제 모델 학습을 진행하기 위해 준비된 데이터로 100 epoch 동안 학습을 진행한다.
history = model.fit(dataset, epochs=100)
get_weights() 메서드를 다시 호출하면 최종 가중치를 볼 수 있다.
print(f"Layer weights -> {l0.get_weights()}")
# output
Layer weights -> [array([[-1.49012934e-02],
[-5.41206449e-02],
[ 7.94099048e-02],
[-3.35199684e-02],
[-3.66221159e-03],
[ 3.79930586e-02],
[-7.20266835e-05],
[-1.97986476e-02],
[ 3.43745314e-02],
[ 3.04847732e-02],
[-8.96111652e-02],
[-2.18748581e-04],
[ 5.89985587e-02],
[-1.62686165e-02],
[-4.22310904e-02],
[ 1.18681304e-01],
[ 7.76400790e-02],
[ 1.20957568e-01],
[ 2.88641304e-01],
[ 4.29465711e-01]], dtype=float32), array([0.0168612], dtype=float32)]
model prediction
series[0:20]
# output
array([52.48357 , 49.35275 , 53.314735, 57.711823, 48.934444, 48.931244,
57.982895, 53.897125, 47.67393 , 52.68371 , 47.591717, 47.506374,
50.959415, 40.086178, 40.919415, 46.612473, 44.228207, 50.720642,
44.454983, 41.76799 ], dtype=float32)
series[0:20][np.newaxis]
# output
array([[52.48357 , 49.35275 , 53.314735, 57.711823, 48.934444, 48.931244,
57.982895, 53.897125, 47.67393 , 52.68371 , 47.591717, 47.506374,
50.959415, 40.086178, 40.919415, 46.612473, 44.228207, 50.720642,
44.454983, 41.76799 ]], dtype=float32)
np.expand_dims(series[0:20], axis=0)
# output
array([[52.48357 , 49.35275 , 53.314735, 57.711823, 48.934444, 48.931244,
57.982895, 53.897125, 47.67393 , 52.68371 , 47.591717, 47.506374,
50.959415, 40.086178, 40.919415, 46.612473, 44.228207, 50.720642,
44.454983, 41.76799 ]], dtype=float32)
# Shape of the first 20 data points slice
print(f'shape of series[0:20]: {series[0:20].shape}')
# Shape after adding a batch dimension
print(f'shape of series[0:20][np.newaxis]: {series[0:20][np.newaxis].shape}')
# Shape after adding a batch dimension (alternate way)
print(f'shape of series[0:20][np.newaxis]: {np.expand_dims(series[0:20], axis=0).shape}')
# Sample model prediction
print(f'model prediction: {model.predict(series[0:20][np.newaxis])}')
# output
shape of series[0:20]: (20,)
shape of series[0:20][np.newaxis]: (1, 20)
shape of series[0:20][np.newaxis]: (1, 20)
1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 28ms/step
model prediction: [[44.937992]]
기본적으로 전체 계열을 한 번에 20포인트씩 모델에 공급하고 모든 결과를 예측 목록에 추가한다. 그런 다음 검증 세트에 해당하는 포인트를 분할한다.
# Initialize a list
forecast = []
# Use the model to predict data points per window size
for time in range(len(series) - window_size):
forecast.append(model.predict(series[time:time + window_size][np.newaxis]))
# Slice the points that are aligned with the validation set
forecast = forecast[split_time - window_size:]
# Compare number of elements in the predictions and the validation set
print(f'length of the forecast list: {len(forecast)}')
print(f'shape of the validation set: {x_valid.shape}')
# output
length of the forecast list: 461
shape of the validation set: (461,)
아래의 슬라이스 인덱스는 Split_time - window_size인데, 예측 목록이 계열보다 20포인트(즉, window 크기)만큼 작기 때문이다. window 크기가 20이므로 예측 목록의 첫 번째 데이터 포인트는 인덱스 20의 시간에 대한 예측에 해당한다. 인덱스 0~19는 창 크기보다 작기 때문에 예측을 할 수 없다. 따라서 Split_time - window_size:로 분할하면 검증 세트의 시간 인덱스와 일치하는 시간 인덱스의 포인트를 얻게 된다.
결과를 시각화하려면 예측을 plot_series() 유틸리티 함수가 허용하는 형식으로 변환한다. 여기에는 배열을 numpy 배열로 변환하고 1차원 축을 삭제하면 된다.
# Preview shapes after using the conversion and squeeze methods
print(f'shape after converting to numpy array: {np.array(forecast).shape}')
print(f'shape after squeezing: {np.array(forecast).squeeze().shape}')
# Convert to a numpy array and drop single dimensional axes
results = np.array(forecast).squeeze()
# Overlay the results with the validation set
plot_series(time_valid, (x_valid, results))
# outout
shape after converting to numpy array: (461, 1, 1)
shape after squeezing: (461,)
이전과 동일한 함수를 호출하여 측정항목을 계산할 수 있다.
현재 MAE는 5에 가깝다.
# Compute the metrics
print(tf.keras.metrics.mean_squared_error(x_valid, results).numpy())
print(tf.keras.metrics.mean_absolute_error(x_valid, results).numpy())
# output
45.70831
5.035314
여기서는 시계열 데이터를 기반으로 단일 계층 신경망을 구축하고 학습했다. window 데이터를 준비하여 모델에 제공하면 최종 예측은 1주차에 수행한 통계 분석과 비슷한 결과를 보여준다. 다음은 더 많은 레이어를 추가하고 다음과 같은 경우에 수행할 수 있는 몇 가지 최적화를 반영하여 모델을 학습시켜보도록 한다.