from pathlib import Path
from urllib.request import Request, urlopen
import json
import shutil
import zipfile
 
# URLs of the JSON manifests describing the downloadable resources.
# The bootstrap loop further down reads the "name", "archive_url",
# "expected_paths" and "extract_to" entries of each manifest.
RESOURCE_MANIFESTS = [
    "https://assets.deeplearningnotes.com/code-support-resources/datasets/books/latest.json",
]
 
def download_file(url, path):
    """Fetch ``url`` and write the response body to the file at ``path``.

    The request is sent with a browser-like ``User-Agent`` header. The body
    is copied from the response to the file with ``shutil.copyfileobj``;
    an existing file at ``path`` is overwritten.
    """
    browser_headers = {"User-Agent": "Mozilla/5.0"}
    with urlopen(Request(url, headers=browser_headers)) as remote:
        with open(path, "wb") as local:
            shutil.copyfileobj(remote, local)
 
def extract_zip_safely(zip_path, target_dir="."):
    """Extract ``zip_path`` into ``target_dir``, refusing path traversal.

    Every member name is resolved against the target directory before
    anything is written; if any member would land outside of it, the whole
    archive is rejected and nothing is extracted.

    Args:
        zip_path: path of the zip archive to read.
        target_dir: directory to extract into (default: current directory).

    Raises:
        RuntimeError: if a member resolves to a location outside ``target_dir``.
    """
    target_dir = Path(target_dir).resolve()

    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        for member in zip_ref.infolist():
            target_path = (target_dir / member.filename).resolve()
            # Compare path components, not string prefixes: a startswith()
            # test would accept a sibling directory such as "<target>_evil"
            # simply because its name begins with the same characters.
            try:
                target_path.relative_to(target_dir)
            except ValueError:
                raise RuntimeError(f"Unsafe zip path: {member.filename}") from None

        zip_ref.extractall(target_dir)
 
# Bootstrap every resource listed in RESOURCE_MANIFESTS: fetch its manifest,
# download and unpack the archive only when some expected path is absent,
# then verify that every expected path exists.
for manifest_url in RESOURCE_MANIFESTS:
    # the resource name is the URL segment right before the manifest file name
    name = manifest_url.rstrip("/").split("/")[-2]
    manifest_path = Path(f"{name}-latest.json")
    archive_path = Path(f"{name}.zip")

    download_file(manifest_url, manifest_path)
    manifest = json.loads(manifest_path.read_text(encoding="utf-8"))

    expected_paths = [Path(entry) for entry in manifest.get("expected_paths", [])]

    # the archive is fetched only if at least one expected path is missing
    if any(not entry.exists() for entry in expected_paths):
        download_file(manifest["archive_url"], archive_path)
        extract_zip_safely(archive_path, manifest.get("extract_to", "."))

    # re-check after the (possible) extraction and fail loudly on leftovers
    missing = [str(entry) for entry in expected_paths if not entry.exists()]
    if missing:
        raise FileNotFoundError(f"Missing expected paths: {missing}")

    print(f"{manifest['name']} ready.")

Download books

Choose your corpus

# discover the plain-text books stored under ./datasets/books
import os

books_dir = "./datasets/books"
if not os.path.isdir(books_dir):
    raise FileNotFoundError(f"Books folder not found: {books_dir}")

# keep regular files with a .txt extension only, in alphabetical order
book_files = [
    entry for entry in os.listdir(books_dir)
    if entry.endswith(".txt") and os.path.isfile(os.path.join(books_dir, entry))
]
book_files.sort()

if not book_files:
    raise FileNotFoundError(f"No .txt books found in {books_dir}")

# report what was found, one book per line
print("Books found in ./datasets/books:")
for f in book_files:
    print(f" - {f}")

Imports

# import the required packages
import os
import time
import torch
import torch.nn as nn
import numpy as np

Set hyperparameters and options

Set here your hyperparameters (to be used later in the code), so that you can run and compare different experiments operating on these values.

Note

A better alternative would be to use command-line arguments to set hyperparameters and other options (see argparse Python package).

# hyperparameters
batch_size = 32        # number of rows the id sequence is reshaped into (parallel word streams)
learning_rate = 0.002  # learning rate passed to the Adam optimizer
epochs = 20            # number of passes over the training ids
hidden_neurons = 1024  # size of the LSTM hidden and cell states
embed_size = 128       # embedding size
num_layers = 1         # number of LSTM layers
seq_length = 30        # length of sequence at training phase
num_samples = 1000     # number of words to be sampled at testing phase

# options
# training text: one of the books found in books_dir
training_set = os.path.join(books_dir, "grimms_tales.txt")
device = "cuda:0"   # force GPU with ID 0

# enforce CUDA usage on GPU 0 and verify it works
# (every failed check below stops the notebook with an explicit error)
if not torch.cuda.is_available():
    raise RuntimeError("CUDA is not available. This notebook requires GPU ID 0 (cuda:0).")

# guards against the device option above being edited to another value
if device != "cuda:0":
    raise ValueError(f"Invalid device '{device}'. Expected 'cuda:0'.")

# select GPU 0 as the current CUDA device, then read the selection back
torch.cuda.set_device(0)
active_gpu = torch.cuda.current_device()
if active_gpu != 0:
    raise RuntimeError(f"Wrong active GPU: {active_gpu}. Expected GPU 0.")

# quick CUDA sanity check: tensor allocation and simple op on GPU 0
# NOTE(review): the assert below is skipped when Python runs with -O
_cuda_test = torch.tensor([1.0], device=device)
_cuda_test = _cuda_test * 2.0
assert _cuda_test.item() == 2.0, "CUDA sanity check failed on GPU 0."
print(f"CUDA check OK | device={device} | name={torch.cuda.get_device_name(0)}")

Define the model architecture

Define here your network.

Note

A better alternative would be to have a pool of network architectures defined in a python file (module) that one could import.

class LanguageModel(nn.Module):
    """Word-level language model: embedding -> LSTM -> linear decoder.

    Args:
        vocab_size: number of distinct word ids (embedding rows and output scores)
        embed_size: length of the float vector each word id is mapped to
        hidden_size: number of hidden units of each LSTM layer
        num_layers: number of stacked LSTM layers
    """

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers):
        super().__init__()

        # 'Word Embedding': maps each word id to a 1D float vector of size embed_size
        self.embed = nn.Embedding(vocab_size, embed_size)

        # LSTM layer(s); with batch_first=True inputs and outputs are laid out as
        # (batch_size, seq_length, <input or hidden>_size)
        self.lstm = nn.LSTM(
            input_size=embed_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )

        # final fully connected layer: maps the hidden neurons of the last LSTM
        # layer to one output per word in the dictionary
        self.linear = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, h):
        """Score the next word for every position of every sequence in ``x``.

        Args:
            x: word ids of size (batch_size, seq_length)
            h: tuple (hidden, cell) with the initial LSTM states

        Returns:
            A pair ``(out, (h, c))`` where ``out`` has size
            (batch_size*seq_length, vocab_size) and ``(h, c)`` are the LSTM
            hidden and cell/memory states after the last time step.
        """
        # word ids -> float vectors
        embedded = self.embed(x)

        # hidden_seq holds the LSTM hidden states for all time steps,
        # (h_last, c_last) the states for the last time step only
        hidden_seq, (h_last, c_last) = self.lstm(embedded, h)

        # flatten 3D (batch_size, seq_length, hidden_size) into
        # 2D (batch_size*seq_length, hidden_size) so that the linear layer
        # decodes every time step as an independent sample
        rows = hidden_seq.size(0) * hidden_seq.size(1)
        flat = hidden_seq.reshape(rows, hidden_seq.size(2))

        # decode hidden states of all time steps
        return self.linear(flat), (h_last, c_last)

Create datasets

In this application, only a training dataset is created. There is no strict need for a testing dataset, since text will be generated from scratch. The training dataset is the text file converted into a sequence of integer identifiers (ids), one per word occurrence, where each unique word found in the file is mapped to its own id.

# Dictionary class
# two-way mapping between words and integer indices, built on-the-fly:
# the first time a word is added it receives the next free index,
# adding a word that is already known changes nothing
class Dictionary(object):
    def __init__(self):
        self.word2idx = {}   # word  -> index
        self.idx2word = {}   # index -> word
        self.idx = 0         # next index to hand out

    def add_word(self, word):
        # known word: nothing to do
        if word in self.word2idx:
            return
        new_idx = self.idx
        self.word2idx[word] = new_idx
        self.idx2word[new_idx] = word
        self.idx = new_idx + 1

    def __len__(self):
        # number of distinct words stored so far
        return len(self.word2idx)
 
# Corpus class
# Scans a given text file once: every word found is added to the Dictionary
# (the end of sentence <eos> is added as a 'word' after each line) and the
# id associated to the word is appended to the sequence of ids
class Corpus(object):
    def __init__(self):
        self.dictionary = Dictionary()

    def get_data(self, path, batch_size=32):
        """Tokenize the text file at ``path`` into a 2D tensor of word ids.

        Args:
            path: text file to read (decoded as UTF-8, split on whitespace)
            batch_size: number of rows of the returned tensor

        Returns:
            LongTensor of size (batch_size, num_tokens // batch_size); the
            trailing ids that do not fill a whole column are dropped.
        """
        dictionary = self.dictionary

        # single scan: grow the dictionary and tokenize at the same time.
        # ids are handed out in first-seen order, so the result is the same as
        # building the dictionary first and tokenizing in a second scan.
        # The encoding is explicit so that the vocabulary does not depend on
        # the platform default encoding.
        token_ids = []
        with open(path, 'r', encoding='utf-8') as f:
            for line in f:
                for word in line.split() + ['<eos>']:
                    dictionary.add_word(word)
                    token_ids.append(dictionary.word2idx[word])
        ids = torch.tensor(token_ids, dtype=torch.long)

        # make ids length multiple of batch_size, and remove
        # a few ids at the end if needed (integer floor division: this is a count)
        print('original ids are %d' % ids.size(0))
        num_batches = ids.size(0) // batch_size
        ids = ids[:num_batches*batch_size]
        print('trimmed  ids are %d (now multiple of batch size %d)' % (ids.size(0), batch_size))

        # reshape ids (1D tensor) to a 2D tensor of size batch_size x num_batches
        batched = ids.view(batch_size, -1)
        print('reshape  ids from %s to %s' % (ids.shape, batched.shape))
        return batched
 
# tokenize the training text into a 2D tensor of word ids with batch_size rows
corpus = Corpus()
ids = corpus.get_data(training_set, batch_size)
vocab_size = len(corpus.dictionary)
print('dictionary size (#unique ids) is %d' % vocab_size)

# since one batch sample is a sequence of words
# the actual number of batches is the number of whole seq_length chunks
# that fit in one row of ids: a count, hence integer (floor) division
# rather than true division, which would yield a float
num_batches = ids.size(1) // seq_length
print('num_batches is %d / %d = %d since each batch sample is a sequence of %d words' % (ids.size(1), seq_length, num_batches, seq_length))

Create the building blocks for training

Create an instance of the network, the loss function, the optimizer, and learning rate scheduler.

# create the network
net = LanguageModel(vocab_size, embed_size, hidden_neurons, num_layers)

# create loss function
# (cross-entropy between the per-word output scores and the target word ids)
criterion = nn.CrossEntropyLoss()

# create Adam optimizer
optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate)

# create learning rate scheduler
# ...none is used here: Adam adapts the per-parameter step sizes on its own,
# while the base learning rate stays at learning_rate for the whole training

Train

The code below reports the training loss and the elapsed time after every epoch. Only the training set is monitored, since no validation set is used in this application.

# reset performance monitors
losses = []
ticks = []

# move net to device
net.to(device)
net.train()

# start offsets (along the word axis of ids) of the training sequences.
# The last offset stops early enough for the targets, which are shifted by
# one word, to still contain seq_length words.
batch_starts = range(0, ids.size(1) - seq_length, seq_length)
num_steps = len(batch_starts)

# start training
for epoch in range(1, epochs+1):

    # reset performance measures
    loss_sum = 0.0

    # measure time elapsed
    t0 = time.time()

    # differently from traditional ANNs, RNNs take in input also the previous
    # states. So we need to initialize the '0th' state in some manner, e.g.
    # by sampling from a gaussian/uniform distribution or even by setting all 0s
    # in this case we have 2 different states (hidden and cell/memory)
    states = (torch.zeros(num_layers, batch_size, hidden_neurons).to(device),
              torch.zeros(num_layers, batch_size, hidden_neurons).to(device))

    # for each batch of seq_length consecutive words per row
    for i in batch_starts:

        # get batch inputs and targets and send them to device
        # targets have +1 sample(word) displacement w.r.t. to inputs
        # indeed we want to predict the next word given the previous seq_length words
        inputs = ids[:, i:i+seq_length].to(device)
        targets = ids[:, (i+1):(i+1)+seq_length].to(device)

        # truncated backpropagation
        # we do not want to backpropagate towards the previous sequences
        # left to the beginning, which would result in gradient vanishing/exploding
        # so we 'detach' the states from the computational graph, so that autograd
        # will ignore the previous iterations
        # NOTE: this is different from .zero_grad(), which only clears the previous gradients
        #       but DOES NOT modify the computational graph
        states = states[0].detach(), states[1].detach()

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward pass
        outputs, states = net(inputs, states)

        # calculate loss
        # outputs is 2D tensor (batch_size*seq_length, vocab_size)
        # targets is 2D tensor (batch_size, seq_length)
        # need to reshape targets to 1D tensor (batch_size*seq_length)
        loss = criterion(outputs, targets.reshape(-1))

        # loss gradient backpropagation
        loss.backward()

        # accumulate loss
        loss_sum += loss.item()

        # clip big gradients to avoid overshooting near steep cliffs in the loss hyperspace
        nn.utils.clip_grad_norm_(net.parameters(), 0.5)

        # net parameters update
        optimizer.step()

    # update performance history
    # average over the optimisation steps actually performed in this epoch,
    # so that the reported value is the true mean of the per-step losses
    # (max(..., 1) avoids a division by zero when the text is too short)
    losses.append(loss_sum / max(num_steps, 1))
    ticks.append(epoch)

    # print per-epoch performances
    print (f"\nEpoch {epoch}\n"
            f"...TIME: {time.time()-t0:.1f} seconds\n"
            f"...loss: {losses[-1]} (best {min(losses)} at epoch {ticks[np.argmin(losses)]})\n")

Test

One-to-one autoregression.

seed = 'The'  # one word only

net.eval()
with torch.no_grad():
    # convert seed word to index, with a readable error for unknown words
    if seed not in corpus.dictionary.word2idx:
        raise KeyError(f"Seed word '{seed}' is not in the training vocabulary")
    idx = corpus.dictionary.word2idx[seed]
    # named input_ids so that the builtin input() is not shadowed
    input_ids = torch.tensor([[idx]], device=device)  # shape = (1, 1)

    # initial hidden and cell states
    state = (
        torch.zeros(num_layers, 1, hidden_neurons).to(device),
        torch.zeros(num_layers, 1, hidden_neurons).to(device)
    )

    # start generated text with the seed; pieces are joined once at the end
    pieces = [seed + ' ']

    for _ in range(num_samples):
        # forward pass: input shape (1, 1)
        output, state = net(input_ids, state)  # output shape: (1, vocab_size)

        # sample next word index from output distribution (more creative than
        # just picking the max probability).
        # softmax gives the same distribution as normalizing exp(scores), but
        # it cannot overflow to inf on large scores, which would make
        # torch.multinomial fail
        probs = torch.softmax(output.squeeze(0), dim=-1)  # shape: (vocab_size,)
        predicted_idx = torch.multinomial(probs, num_samples=1).item()

        # convert index back to word
        word = corpus.dictionary.idx2word[predicted_idx]
        pieces.append('\n' if word == '<eos>' else word + ' ')

        # prepare input for next iteration (shape = (1, 1))
        input_ids.fill_(predicted_idx)

    text = ''.join(pieces)
    print(text)