
from pathlib import Path
from urllib.request import Request, urlopen
import json
import shutil
import zipfile
# URLs of the JSON manifests describing the downloadable resources.
# The second-to-last URL segment ("books" here) is used as the resource name.
RESOURCE_MANIFESTS = [
    "https://assets.deeplearningnotes.com/code-support-resources/datasets/books/latest.json",
]
def download_file(url, path, timeout=60):
    """Download ``url`` and store its content at ``path``.

    The payload is first streamed to a sibling ``<name>.part`` file and only
    moved to ``path`` once the transfer has completed, so an interrupted
    download never leaves a truncated file at the final location.

    Args:
        url: Address of the resource to fetch.
        path: Destination file (``str`` or ``Path``).
        timeout: Seconds to wait on blocking network operations before
            giving up; without it a stalled connection would hang forever.

    Raises:
        OSError: If the resource cannot be fetched or the file cannot be
            written (``urllib.error.URLError`` is an ``OSError`` subclass).
    """
    destination = Path(path)
    partial = destination.with_name(destination.name + ".part")
    # some servers reject the default urllib user agent
    request = Request(url, headers={"User-Agent": "Mozilla/5.0"})
    try:
        with urlopen(request, timeout=timeout) as response, open(partial, "wb") as file:
            shutil.copyfileobj(response, file)
        partial.replace(destination)
    finally:
        # after a successful replace() the partial file is gone; it only
        # still exists here when the download failed half-way
        if partial.exists():
            partial.unlink()
def extract_zip_safely(zip_path, target_dir="."):
    """Extract a zip archive, refusing members that would land outside ``target_dir``.

    Every member path is resolved against the target directory before anything
    is written; the archive is extracted only if all members stay inside it.

    Args:
        zip_path: Path of the zip archive to extract.
        target_dir: Directory to extract into (defaults to the current one).

    Raises:
        RuntimeError: If a member resolves to a location outside ``target_dir``.
    """
    target_root = Path(target_dir).resolve()
    with zipfile.ZipFile(zip_path, "r") as archive:
        for member in archive.infolist():
            member_path = (target_root / member.filename).resolve()
            # compare path components, not string prefixes: a plain
            # startswith() check would accept "<target>_sibling/file"
            try:
                member_path.relative_to(target_root)
            except ValueError:
                raise RuntimeError(f"Unsafe zip path: {member.filename}") from None
        archive.extractall(target_root)
# For every manifest: download it, and fetch + unpack the archive it points to
# unless all the files it lists are already present locally.
for manifest_url in RESOURCE_MANIFESTS:
    # ".../datasets/books/latest.json" -> "books"
    url_segments = manifest_url.rstrip("/").split("/")
    name = url_segments[-2]
    manifest_path = Path(f"{name}-latest.json")
    archive_path = Path(f"{name}.zip")

    download_file(manifest_url, manifest_path)
    manifest = json.loads(manifest_path.read_text(encoding="utf-8"))

    expected_paths = [Path(entry) for entry in manifest.get("expected_paths", [])]
    if any(not entry.exists() for entry in expected_paths):
        download_file(manifest["archive_url"], archive_path)
        extract_zip_safely(archive_path, manifest.get("extract_to", "."))

    # whatever is still absent at this point could not be provided by the archive
    missing = [str(entry) for entry in expected_paths if not entry.exists()]
    if missing:
        raise FileNotFoundError(f"Missing expected paths: {missing}")
    print(f"{manifest['name']} ready.")

# Download books
Choose your corpus
# use local books from ./datasets/books
import os

books_dir = "./datasets/books"
if not os.path.isdir(books_dir):
    raise FileNotFoundError(f"Books folder not found: {books_dir}")

# keep only regular files with a .txt extension, in alphabetical order
book_files = [
    entry
    for entry in os.listdir(books_dir)
    if entry.endswith(".txt") and os.path.isfile(os.path.join(books_dir, entry))
]
book_files.sort()
if not book_files:
    raise FileNotFoundError(f"No .txt books found in {books_dir}")

print("Books found in ./datasets/books:")
for book in book_files:
    print(f" - {book}")

# Imports
# import the required packages
import os
import time
import torch
import torch.nn as nn
import numpy as np
# Set hyperparameters and options
Set your hyperparameters here (they are used later in the code), so that you can run and compare different experiments simply by changing these values.
Note
A better alternative would be to set hyperparameters and other options through command-line arguments (see Python's argparse module).
# ---- hyperparameters ----
batch_size = 32
learning_rate = 0.002
epochs = 20
hidden_neurons = 1024
embed_size = 128    # embedding size
num_layers = 1      # number of LSTM layers
seq_length = 30     # length of sequence at training phase
num_samples = 1000  # number of words to be sampled at testing phase

# ---- options ----
training_set = os.path.join(books_dir, "grimms_tales.txt")
device = "cuda:0"   # force GPU with ID 0
# enforce CUDA usage on GPU 0 and verify it works
if not torch.cuda.is_available():
    raise RuntimeError("CUDA is not available. This notebook requires GPU ID 0 (cuda:0).")
if device != "cuda:0":
    raise ValueError(f"Invalid device '{device}'. Expected 'cuda:0'.")

torch.cuda.set_device(0)
active_gpu = torch.cuda.current_device()
if active_gpu != 0:
    raise RuntimeError(f"Wrong active GPU: {active_gpu}. Expected GPU 0.")

# quick CUDA sanity check: tensor allocation and simple op on GPU 0
# (an explicit raise is used instead of `assert`, which is stripped when
# Python runs with -O and would silently skip the check)
_cuda_test = torch.tensor([1.0], device=device)
_cuda_test = _cuda_test * 2.0
if _cuda_test.item() != 2.0:
    raise RuntimeError("CUDA sanity check failed on GPU 0.")
print(f"CUDA check OK | device={device} | name={torch.cuda.get_device_name(0)}")

# Define the model architecture
Define your network here.
Note
A better alternative would be to have a pool of network architectures defined in a python file (module) that one could import.
class LanguageModel(nn.Module):
    """Word-level language model: embedding -> LSTM -> linear decoder."""

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers):
        """Build the three layers.

        Args:
            vocab_size: number of distinct word ids (embedding rows and
                decoder outputs).
            embed_size: length of the float vector each word id is mapped to.
            hidden_size: number of hidden neurons of each LSTM layer.
            num_layers: number of stacked LSTM layers.
        """
        super().__init__()
        # word embedding: maps each word id to a 1D float vector of size embed_size
        self.embed = nn.Embedding(vocab_size, embed_size)
        # LSTM layer(s); with batch_first=True inputs/outputs are laid out as
        # (batch_size, seq_length, <input or hidden>_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        # final fully connected layer: maps the hidden neurons of the last LSTM
        # layer to one output per word in the dictionary
        self.linear = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, h):
        """Run one sequence batch through the network.

        Args:
            x: word ids, laid out as (batch_size, seq_length).
            h: initial LSTM state, passed straight to the LSTM (the callers
                in this file pass a (hidden, cell) tuple).

        Returns:
            A tuple ``(out, (h, c))`` where ``out`` has one row of vocab_size
            scores per (sample, time step) pair, and ``(h, c)`` are the LSTM
            hidden and cell/memory states after the last time step.
        """
        # word ids -> float vectors
        embedded = self.embed(x)
        # hidden_seq holds the LSTM hidden states for all time steps;
        # (h, c) are the hidden and cell/memory states of the last time step only
        hidden_seq, (h, c) = self.lstm(embedded, h)
        # flatten the 3D output (batch_size, seq_length, hidden_size) to 2D
        # (batch_size*seq_length, hidden_size) so that every time step becomes
        # one sample for the next linear layer
        n_samples, n_steps, n_hidden = hidden_seq.shape
        flat = hidden_seq.reshape(n_samples * n_steps, n_hidden)
        # decode the hidden states of all time steps
        out = self.linear(flat)
        return out, (h, c)

# Create datasets
In this application, only a training dataset is created. There is no strict need for a testing dataset, since text will be generated from scratch. The training dataset consists of a sequence of identifiers (ids), one for each unique word found in the text file.
# Dictionary class
class Dictionary(object):
    """Word <-> index mapping built on the fly, one word at a time.

    The first time a word is added it receives the next free index;
    adding a word that is already known changes nothing.
    """

    def __init__(self):
        self.word2idx = {}  # word -> index
        self.idx2word = {}  # index -> word
        self.idx = 0        # next index to hand out

    def add_word(self, word):
        """Register ``word`` under a new index unless it is already known."""
        if word in self.word2idx:
            return
        new_index = self.idx
        self.word2idx[word] = new_index
        self.idx2word[new_index] = word
        self.idx = new_index + 1

    def __len__(self):
        """Return the number of distinct words stored."""
        return len(self.word2idx)
# Corpus class
class Corpus(object):
    """Turns a text file into a 2D tensor of word ids.

    Words are the whitespace-separated tokens of each line; the end of every
    line is marked with the extra token '<eos>'. Ids come from the
    ``Dictionary`` owned by this object, which grows as new words are seen.
    """

    def __init__(self):
        self.dictionary = Dictionary()

    def get_data(self, path, batch_size=32, encoding="utf-8"):
        """Tokenize the file at ``path`` and arrange the ids in ``batch_size`` rows.

        Args:
            path: text file to read.
            batch_size: number of rows of the returned tensor.
            encoding: text encoding of the file; given explicitly so that the
                result does not depend on the platform's default encoding.

        Returns:
            A LongTensor of shape (batch_size, tokens // batch_size); trailing
            ids that do not fill a complete column are dropped.
        """
        # A single pass is enough: a word gets its id the first time it is
        # seen and the id never changes afterwards, so looking it up right
        # away yields the same sequence as a separate second scan would.
        token_ids = []
        with open(path, 'r', encoding=encoding) as f:
            for line in f:
                for word in line.split() + ['<eos>']:
                    self.dictionary.add_word(word)
                    token_ids.append(self.dictionary.word2idx[word])
        ids = torch.tensor(token_ids, dtype=torch.long)

        # make ids length multiple of batch_size, and remove
        # a few ids at the end if needed
        print('original ids are %d' % ids.size(0))
        num_batches = ids.size(0) // batch_size
        ids = ids[:num_batches*batch_size]
        print('trimmed ids are %d (now multiple of batch size %d)' % (ids.size(0), batch_size))

        # reshape ids (1D tensor) to a 2D tensor of size batch_size x num_batches
        batched = ids.view(batch_size, -1)
        print('reshape ids from %s to %s' % (ids.shape, batched.shape))
        return batched
# build the vocabulary and the id tensor from the training text
corpus = Corpus()
ids = corpus.get_data(training_set, batch_size)
vocab_size = len(corpus.dictionary)
print('dictionary size (#unique ids) is %d' % vocab_size)

# since one batch sample is a sequence of seq_length words, the number of
# batches is the row length divided by seq_length; integer division is used
# so that num_batches is a whole count rather than a float
num_batches = ids.size(1) // seq_length
print('num_batches is %d / %d = %d since each batch sample is a sequence of %d words' % (ids.size(1), seq_length, num_batches, seq_length))

# Create the building blocks for training
Create an instance of the network, the loss function, and the optimizer. No learning rate scheduler is created here.
# create the network
net = LanguageModel(
    vocab_size=vocab_size,
    embed_size=embed_size,
    hidden_size=hidden_neurons,
    num_layers=num_layers,
)

# create loss function
criterion = nn.CrossEntropyLoss()

# create Adam optimizer
optimizer = torch.optim.Adam(params=net.parameters(), lr=learning_rate)

# no learning rate scheduler is created: this example relies on Adam's own
# adaptive parameter updates

# Train
The code below monitors the training loss: at the end of every epoch it prints the elapsed time, the average loss of that epoch, and the best average loss so far. No validation set is used.
# reset performance monitors
losses = []  # average training loss of each epoch
ticks = []   # epoch number matching each entry of `losses`

# move net to device
net.to(device)
net.train()

# copy the whole id tensor to the device once instead of copying
# a slice of it at every training step
ids_on_device = ids.to(device)

# start training
for epoch in range(1, epochs+1):

    # reset performance measures
    loss_sum = 0.0
    num_steps = 0

    # measure time elapsed
    t0 = time.time()

    # differently from traditional ANNs, RNNs take in input also the previous
    # states. So we need to initialize the '0th' state in some manner, e.g.
    # by sampling from a gaussian/uniform distribution or even by setting all 0s
    # in this case we have 2 different states (hidden and cell/memory)
    states = (torch.zeros(num_layers, batch_size, hidden_neurons, device=device),
              torch.zeros(num_layers, batch_size, hidden_neurons, device=device))

    # for each window of seq_length words; the range stops early enough that
    # the targets (shifted by one word) always have seq_length words too
    for i in range(0, ids.size(1) - seq_length, seq_length):

        # get batch inputs and targets
        # targets have +1 sample(word) displacement w.r.t. inputs:
        # we want to predict the next word given the previous seq_length words
        inputs = ids_on_device[:, i:i+seq_length]
        targets = ids_on_device[:, (i+1):(i+1)+seq_length]

        # truncated backpropagation
        # we do not want to backpropagate towards the previous sequences
        # left to the beginning, which would result in gradient vanishing/exploding
        # so we 'detach' the states from the computational graph, so that autograd
        # will ignore the previous iterations
        # NOTE: this is different from .zero_grad(), which only clears the previous gradients
        # but DOES NOT modify the computational graph
        states = states[0].detach(), states[1].detach()

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward pass
        outputs, states = net(inputs, states)

        # calculate loss
        # outputs is 2D tensor (batch_size*seq_length, vocab_size)
        # targets is 2D tensor (batch_size, seq_length)
        # need to reshape targets to 1D tensor (batch_size*seq_length)
        loss = criterion(outputs, targets.reshape(-1))

        # loss gradient backpropagation
        loss.backward()

        # accumulate loss
        loss_sum += loss.item()
        num_steps += 1

        # clip big gradients to avoid overshooting near steep cliffs in the loss hyperspace
        nn.utils.clip_grad_norm_(net.parameters(), 0.5)

        # net parameters update
        optimizer.step()

    # update performance history
    # average over the steps actually executed in this epoch; max() guards
    # against a division by zero when the corpus is too short for one step
    losses.append(loss_sum / max(num_steps, 1))
    ticks.append(epoch)

    # print per-epoch performances
    print (f"\nEpoch {epoch}\n"
           f"...TIME: {time.time()-t0:.1f} seconds\n"
           f"...loss: {losses[-1]} (best {min(losses)} at epoch {ticks[np.argmin(losses)]})\n")

# Test
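One-to-one autoregression: the network is fed one word at a time, and each word it samples becomes the input of the next step.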
seed = 'The'  # one word only

net.eval()
with torch.no_grad():
    # convert seed word to index
    if seed not in corpus.dictionary.word2idx:
        raise KeyError(f"Seed word {seed!r} is not in the training vocabulary.")
    idx = corpus.dictionary.word2idx[seed]
    # named input_ids (not `input`) so the builtin input() is not shadowed
    input_ids = torch.tensor([[idx]], device=device)  # shape = (1, 1)

    # initial hidden and cell states
    state = (
        torch.zeros(num_layers, 1, hidden_neurons, device=device),
        torch.zeros(num_layers, 1, hidden_neurons, device=device),
    )

    # start generated text with the seed; pieces are joined once at the end
    pieces = [seed + ' ']

    for _ in range(num_samples):
        # forward pass: input shape (1, 1)
        output, state = net(input_ids, state)  # output shape: (1, vocab_size)

        # sample next word index from output distribution (more creative than
        # just picking the max probability); softmax yields the same
        # distribution as normalizing exp(scores) but cannot overflow
        probs = torch.softmax(output.squeeze(0), dim=0)  # shape: (vocab_size,)
        predicted_idx = torch.multinomial(probs, num_samples=1).item()

        # convert index back to word
        word = corpus.dictionary.idx2word[predicted_idx]
        pieces.append('\n' if word == '<eos>' else word + ' ')

        # prepare input for next iteration (shape = (1, 1))
        input_ids.fill_(predicted_idx)

    text = ''.join(pieces)

print(text)