
Import the required packages
Insert here all the packages you require, so in case they are not found an error will be shown before any other operation is performed.
# import the required packages
import os
import time
import math
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import numpy as npSet hyperparameters and options
Set here your hyperparameters (to be used later in the code), so that you can run and compare different experiments operating on these values.
Note
A better alternative would be to use command-line arguments to set hyperparameters and other options (see argparse Python package)
Note
In this small experiment, curve data will be organized into one “mega-batch” with each sample being a set [seq_length … seq_length] of chunks. In a larger (more realistic) experiment, a data loader would fetch chunks directly with no hidden state continuity (random sampling = implicit BPTT) or with hidden state continuity (time-continuous sampling + .detach = truncated BPTT)
# hyperparameters
batch_size = 10
learning_rate = 0.002
epochs = 100
hidden_neurons = 50
num_layers = 2 # number of LSTM layers
seq_length = 20 # length of sequence at training phase
num_samples = 500 # number of (groundtruth) curve points for training
num_samples_to_generate = 500 # number of (generated) curve points for inference
sine_period = 25.0 # sine period in samples (distance between peaks)
# try 100 and see how the RNN learns (hint: it will not learn in this case because the pattern is too long)
# original curve to learn: a sin wave (but it could have been any other curve)
t = np.arange(num_samples, dtype='float32')
curve_original = np.sin(2.0 * math.pi * t / sine_period).astype('float32')
# curve used for trained = original curve + random noise, so that the RNN does not get overconfident
# on the (perfect) input provided
curve = torch.from_numpy(curve_original + 0.05*np.random.randn(num_samples).astype('float32'))
# options
seed = 1.0 # seed (starting point) for generating the curve at test time
device = "cuda:0" # force GPU with ID 0# sanity check: verify the generated training wave before reshaping
# This supports both execution orders: before or after the reshape cell.
curve_noisy_np = curve.detach().cpu().numpy().reshape(-1)
curve_clean_np = curve_original[:curve_noisy_np.shape[0]]
noise = curve_noisy_np - curve_clean_np
print(f"curve shape (raw tensor): {tuple(curve.shape)}")
print(f"curve length (flattened): {curve_noisy_np.shape[0]}")
print(f"curve_original min/max: {curve_clean_np.min():.3f} / {curve_clean_np.max():.3f}")
print(f"curve_noisy min/max: {curve_noisy_np.min():.3f} / {curve_noisy_np.max():.3f}")
print(f"noise mean/std: {noise.mean():.4f} / {noise.std():.4f}")
assert np.isfinite(curve_noisy_np).all(), "Found NaN/Inf in generated training curve."
assert np.max(np.abs(curve_clean_np)) > 0.9, "The clean wave does not look like a sine in [-1, 1]."
fig, axes = plt.subplots(2, 1, figsize=(12, 6), sharex=True)
axes[0].plot(curve_clean_np, label='clean sine', linewidth=2)
axes[0].plot(curve_noisy_np, label='noisy training wave', linewidth=1, alpha=0.8)
axes[0].set_ylabel('amplitude')
axes[0].set_title('Sanity check of generated training signal')
axes[0].legend(loc='upper right')
axes[0].grid(alpha=0.3)
axes[1].plot(noise, color='tab:red', linewidth=1)
axes[1].set_xlabel('sample index')
axes[1].set_ylabel('noise')
axes[1].set_title('Injected noise = noisy - clean')
axes[1].grid(alpha=0.3)
plt.tight_layout()
plt.show()Define the model architecture
Define here your network.
Note
A better alternative would be to have a pool of network architectures defined in a python file (module) that one could import.
class TimeSeriesEstimator(nn.Module):
def __init__(self, hidden_size, num_layers):
super(TimeSeriesEstimator, self).__init__()
# LSTM layer(s)
# using batch_first = true, inputs/outputs will have size (batch_size, sequence_length, <input or hidden>_size)
self.lstm = nn.LSTM(1, hidden_size, num_layers, batch_first=True)
# last fully connected layer to map the hidden neurons of the last LSTM layer to the
# the number of outputs (1 = next curve point)
# this can serve for both many-to-one or many-to-many strategies
self.linear = nn.Linear(hidden_size, 1)
def forward(self, x, h):
# Forward propagate LSTM
# out are the LSTM hidden states for all time steps
# (h, c) are the LSTM hidden and cell/memory states for the last time step only
out, (h, c) = self.lstm(x, h)
# Reshape 3D output to 2D(batch_size*sequence_length, hidden_size)
out = out.reshape(out.size(0)*out.size(1), out.size(2))
# Decode hidden states of all time steps: this is a many-to-many or
# (better) just a parallelized version of one-to-one
out = self.linear(out)
return out, (h, c)Create datasets
In this application, only a training dataset is created. There is no strict need for a testing dataset, since curve points will be generated from scratch
# trim training set to have length multiple of batch size
print(f'original curve points are {len(curve)}')
curve = curve[:int(len(curve) / batch_size)*batch_size]
print(f'trimmed curve points are {len(curve)} (now multiple of batch size {batch_size})')
# reshape curve points (1D tensor) to a 2D tensor of size batch_size x num_batches
print(f'reshape curve points from {curve.shape} to {curve.reshape(batch_size, -1).shape}')
curve = curve.reshape(batch_size, -1)
# since one batch sample is a sequence of curve points
# the actual number of batches is
num_chunks_per_sample = curve.size(1) / seq_length
print(f'no. of chunks per batch sample is {curve.size(1)} / {seq_length} = {num_chunks_per_sample}')
# add a third dimension since RNNs/LSTMs want a 3D tensor (batch_size, sequence_length, input_size)
# where input_size = 1 in this case
curve = curve.unsqueeze(2)
print('reshape curve points so as to be 3D tensor %s' % (curve.shape,))Create the building blocks for training
Create an instance of the network, the loss function, the optimizer, and learning rate scheduler.
# enforce CUDA usage on GPU 0 and verify it works
if not torch.cuda.is_available():
raise RuntimeError("CUDA is not available. This notebook requires GPU ID 0 (cuda:0).")
if device != "cuda:0":
raise ValueError(f"Invalid device '{device}'. Expected 'cuda:0'.")
torch.cuda.set_device(0)
active_gpu = torch.cuda.current_device()
if active_gpu != 0:
raise RuntimeError(f"Wrong active GPU: {active_gpu}. Expected GPU 0.")
# quick CUDA sanity check: tensor allocation and simple op on GPU 0
_cuda_test = torch.tensor([1.0], device=device)
_cuda_test = _cuda_test * 2.0
assert _cuda_test.item() == 2.0, "CUDA sanity check failed on GPU 0."
print(f"CUDA check OK | device={device} | name={torch.cuda.get_device_name(0)}")
# create the network
net = TimeSeriesEstimator(hidden_neurons, num_layers)
# create loss function
criterion = nn.MSELoss()
# create Adam optimizer
optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate)
# create learning rate scheduler
# ...optional for this experiment
# experiment ID
experiment_ID = "%s_%s_%s_%s_bs(%d)lr(%.4f)e(%d)" % ("sin", type(net).__name__, type(criterion).__name__, type(optimizer).__name__, batch_size, learning_rate, epochs)Train
Many-to-many strategy.
# reset performance monitors
losses = []
ticks = []
# move net to device
net.to(device)
net.train()
# start training
for epoch in range(1, epochs+1):
# reset performance measures
loss_sum = 0.0
# measure time elapsed
t0 = time.time()
# differently from traditional ANNs, RNNs take in input also the previous
# states. So we need to initialize the '0th' state in some manner, e.g.
# by sampling from a gaussian/uniform distribution or even by setting all 0s
# in this case we have 2 different states (hidden and cell/memory)
states = (torch.zeros(num_layers, batch_size, hidden_neurons).to(device),
torch.zeros(num_layers, batch_size, hidden_neurons).to(device))
# no dataloader / one mega-batch
# --> we loop over time chunks (of size seq_length)
for i in range(0, curve.size(1) - seq_length, seq_length):
# get batch inputs and targets and send them to device
# targets have +1 sample(point) displacement w.r.t. to inputs
# indeed we want to predict the next point given the previous seq_length points
inputs = curve[:, i:i+seq_length].to(device)
targets = curve[:, (i+1):(i+1)+seq_length].to(device)
# truncated backpropagation
# we do not want to backpropagate towards the previous sequences
# left to the beginning, which would result in gradient vanishing/exploding
# so we 'detach' the states from the computational graph, so that autograd
# will ignore the previous iterations
# NOTE: this is different from .zero_grad(), which only clears the previous gradients
# but DOES NOT modify the computational graph
states = states[0].detach(), states[1].detach()
# zero the parameter gradients
optimizer.zero_grad()
# forward pass
outputs, states = net(inputs, states)
# calculate loss
# outputs is 2D tensor (batch_size*seq_length, 1)
# targets is 2D tensor (batch_size, seq_length)
# need to reshape targets to 2D tensor (batch_size*seq_length, 1)
loss = criterion(outputs, targets.reshape(batch_size*seq_length,-1))
# loss gradient backpropagation
loss.backward()
# accumulate loss
loss_sum += loss.item()
# clip big gradients to avoid overshooting near steep cliffs in the loss hyperspace
nn.utils.clip_grad_norm_(net.parameters(), 0.5)
# net parameters update
optimizer.step()
# update performance history
losses.append(loss_sum / num_chunks_per_sample)
ticks.append(epoch)
# print per-epoch performances
print(("\nEpoch %d\n"
"...TIME: %.1f seconds\n"
"...loss: %g (best %g at epoch %d)\n") % (
epoch,
time.time() - t0,
losses[-1],
min(losses),
ticks[np.argmin(losses)]
))Inference
Generate curves with autoregressive -to strategy (it matches many-to-many strategy used for training, we are just doing it one step at the time).
# inference / curve generation
net.eval()
with torch.no_grad():
# convert seed to 1 x 1 x 1 input
input = torch.from_numpy(np.asarray(seed, dtype='float32')).to(device)
input = input.unsqueeze(0)
input = input.unsqueeze(1)
input = input.unsqueeze(2)
# set intial hidden and cell states
state = (torch.zeros(num_layers, 1, hidden_neurons).to(device),
torch.zeros(num_layers, 1, hidden_neurons).to(device))
# start generating curve
predicted = []
predicted.append(seed)
for i in range(num_samples_to_generate):
# forward
output, state = net(input, state)
# append output to predicted curve
predicted.append(output.item())
# fill next input with current output
input.fill_(output.item())
# draw original and predicted curve
fig = plt.figure(figsize=(10, 5))
plt.plot(predicted, 'b-', linewidth=1.0, aa=True, label='predicted')
plt.plot(curve_original, 'r-', linewidth=1.0, aa=True, label='original')
plt.legend(loc="upper right")
#plt.savefig(experiment_ID + "time_series_forecasting.png", dpi=300)
plt.draw()
plt.show()