Commit f6b60e06 authored by marvnsch

Add copy cat notebook

parent 17adcf9e
%% Cell type:code id:initial_id tags:
``` python
import torch
import torch.nn as nn
import torch.optim as optim
import random
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import Whitespace
from tokenizers.processors import TemplateProcessing
```
%% Cell type:markdown id:2b9477923b668978 tags:
# Data Preparation
%% Cell type:code id:dbc5f26f27746098 tags:
``` python
def load_data() -> tuple[list[str], list[str]]:
    with open("data/training-data/eup/europarl-v7.de-en.de", "r", encoding="utf8") as f:
        data_de = [line.rstrip("\n") for line in f]
    with open("data/training-data/eup/europarl-v7.de-en.en", "r", encoding="utf8") as f:
        data_en = [line.rstrip("\n") for line in f]
    ltd = set()  # indices of lines to delete later
    for i in range(max(len(data_de), len(data_en))):
        # If a line is empty in one language, merge the other language's
        # sentence into the following line to keep the files aligned
        if data_de[i] == "":
            data_en[i + 1] = data_en[i] + " " + data_en[i + 1]
            ltd.add(i)
        if data_en[i] == "":
            data_de[i + 1] = data_de[i] + " " + data_de[i + 1]
            ltd.add(i)
        # Remove lines whose word counts differ by more than 40%
        if abs(count_words(data_de[i]) - count_words(data_en[i])) / (max(count_words(data_de[i]), count_words(data_en[i])) + 1) > 0.4:
            ltd.add(i)
        # Remove lines with fewer than 3 or more than 10 words
        if max(count_words(data_de[i]), count_words(data_en[i])) < 3 or max(count_words(data_de[i]), count_words(data_en[i])) > 10:
            ltd.add(i)
    data_de = [l for i, l in enumerate(data_de) if i not in ltd]
    data_en = [l for i, l in enumerate(data_en) if i not in ltd]
    print(len(data_de), len(data_en))
    # Print 3 random sentence pairs
    ix = torch.randint(low=0, high=max(len(data_de), len(data_en)), size=(3,))
    for i in ix:
        print(f"Line: {i}\nGerman: {data_de[i]}\nEnglish: {data_en[i]}\n")
    print(f"\nNumber of lines: {len(data_de), len(data_en)}")
    return data_de, data_en


def count_words(string: str) -> int:
    return len(string.split())
de, en = load_data()
# set the unknown token (e.g. for emojis and other out-of-vocabulary characters)
tokenizer_en = Tokenizer(BPE(unk_token="[UNK]"))
tokenizer_de = Tokenizer(BPE(unk_token="[UNK]"))

# special tokens used by the trainer:
# [UNK] : unknown word/token
# [SOS] : start-of-sequence token
# [EOS] : end-of-sequence token
# [PAD] : padding needed for batched encoder input
trainer = BpeTrainer(vocab_size=10000,
                     special_tokens=["[UNK]", "[SOS]", "[EOS]", "[PAD]"])
tokenizer_en.pre_tokenizer = Whitespace()
tokenizer_de.pre_tokenizer = Whitespace()
tokenizer_en.train(["data/training-data/eup/europarl-v7.de-en.en"], trainer)
tokenizer_de.train(["data/training-data/eup/europarl-v7.de-en.de"], trainer)
# configure post processing
tokenizer_en.post_processor = TemplateProcessing(
    single="[SOS] $A [EOS]",
    special_tokens=[
        ("[SOS]", tokenizer_en.token_to_id("[SOS]")),
        ("[EOS]", tokenizer_en.token_to_id("[EOS]")),
    ],
)
tokenizer_de.post_processor = TemplateProcessing(
    single="[SOS] $A [EOS]",
    special_tokens=[
        ("[SOS]", tokenizer_de.token_to_id("[SOS]")),
        ("[EOS]", tokenizer_de.token_to_id("[EOS]")),
    ],
)
target_vocab_size = tokenizer_de.get_vocab_size()
source_vocab_size = tokenizer_en.get_vocab_size()
```
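%% Cell type:markdown tags:
As a quick sanity check, the cell below (a small illustrative sketch; the two sentences are arbitrary examples, not taken from the corpus) encodes one sentence with each trained tokenizer and prints the resulting tokens and ids, which should show the [SOS]/[EOS] template added by the post-processor.
%% Cell type:code tags:
``` python
# Illustrative check of the trained tokenizers (example sentences are assumptions)
sample_en = "This is a short test sentence."
sample_de = "Das ist ein kurzer Testsatz."

enc_en = tokenizer_en.encode(sample_en)
enc_de = tokenizer_de.encode(sample_de)

print(enc_en.tokens)  # should start with [SOS] and end with [EOS]
print(enc_en.ids)
print(enc_de.tokens)
print(enc_de.ids)
```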
%% Cell type:code id:8edfacb67dc8c527 tags:
``` python
def training_data(source: list[str],
                  target: list[str],
                  batch_size: int = 64,
                  sort: bool = True) -> tuple[torch.Tensor, torch.Tensor]:
    tokenizer_de.no_padding()
    tokenizer_en.no_padding()
    # sort the sentence pairs by combined length so padding per batch stays small
    if sort:
        temp = [list(a) for a in zip(source, target)]
        temp.sort(key=lambda s: len(s[0]) + len(s[1]))
        source, target = list(zip(*temp))
    # yield one batch of sentence pairs at a time
    for i in range(0, len(source) - batch_size, batch_size):
        x_training_data = source[i:i + batch_size]
        y_training_data = target[i:i + batch_size]
        # tokenize the batch, padding to the longest sequence in the batch
        tokenizer_en.enable_padding(pad_id=3)
        x_training_data = tokenizer_en.encode_batch(x_training_data)
        tokenizer_de.enable_padding(pad_id=3)
        y_training_data = tokenizer_de.encode_batch(y_training_data)
        # extract the ids of every sequence
        for j in range(batch_size):
            x_training_data[j] = x_training_data[j].ids
            y_training_data[j] = y_training_data[j].ids
        # put the data into tensors
        x_training_data = torch.tensor(x_training_data)
        y_training_data = torch.tensor(y_training_data)
        # transpose to (sequence_len, batch_size) to match the LSTM input layout
        x_training_data = torch.transpose(x_training_data, 0, 1)
        y_training_data = torch.transpose(y_training_data, 0, 1)
        yield x_training_data, y_training_data
```
%% Cell type:code id:524195fe40653308 tags:
``` python
# data test cell
print(len(de) / 64)  # approximate number of batches per epoch
for idx, _ in enumerate(training_data(source=de, target=en, batch_size=64)):
    print(idx)
```
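%% Cell type:markdown tags:
To verify the tensor layout produced by the generator, the small sketch below (illustrative only) pulls a single batch and prints its shape, which should be (sequence_len, batch_size).
%% Cell type:code tags:
``` python
# Illustrative check: inspect one batch from the generator
x_batch, y_batch = next(training_data(source=de, target=en, batch_size=64))
print(x_batch.shape)  # expected: (sequence_len, 64)
print(y_batch.shape)  # expected: (sequence_len, 64)
```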
%% Cell type:markdown id:ca6d3d436fd31e33 tags:
# Model Definition
%% Cell type:code id:3b2c4dbc74a1f144 tags:
``` python
# Prepare model
class Encoder(nn.Module):
    def __init__(self, input_size: int, embedding_size: int,
                 hidden_size: int, num_layers: int, dropout_prob: float):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.dropout = nn.Dropout(dropout_prob)
        self.embedding = nn.Embedding(input_size, embedding_size)
        self.rnn = nn.LSTM(input_size=embedding_size, hidden_size=hidden_size,
                           num_layers=num_layers, dropout=dropout_prob)

    def forward(self, x):
        # shape x : (sequence_len, batch_size)
        embedding = self.dropout(self.embedding(x))
        # shape embedding : (sequence_len, batch_size, embedding_size)
        output, (hidden, cell) = self.rnn(embedding)
        return hidden, cell


class Decoder(nn.Module):
    def __init__(self, input_size: int, embedding_size: int,
                 hidden_size: int, num_layers: int, output_size: int,
                 dropout_prob: float):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.dropout = nn.Dropout(dropout_prob)
        self.embedding = nn.Embedding(input_size, embedding_size)
        self.rnn = nn.LSTM(input_size=embedding_size, hidden_size=hidden_size,
                           num_layers=num_layers, dropout=dropout_prob)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden, cell):
        x = x.view(1, -1)
        # shape x : (1, batch_size)
        embedding = self.dropout(self.embedding(x))
        # shape embedding : (1, batch_size, embedding_size)
        output, (hidden, cell) = self.rnn(embedding, (hidden, cell))
        # shape output : (1, batch_size, hidden_size)
        predictions = self.fc(output)
        # shape predictions : (1, batch_size, vocab_len)
        predictions = predictions.squeeze(0)  # drop the length-1 time dimension
        # shape predictions : (batch_size, vocab_len)
        return predictions, hidden, cell


class Seq2Seq(nn.Module):
    def __init__(self, encoder: Encoder, decoder: Decoder):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, source, target, teacher_forcing_ratio: float = 0.5):
        batch_size = source.shape[1]
        target_len = target.shape[0]
        outputs = torch.zeros(target_len, batch_size, target_vocab_size)
        hidden, cell = self.encoder(source)
        x = target[0]
        for t in range(1, target_len):
            output, hidden, cell = self.decoder(x, hidden, cell)
            outputs[t] = output
            best_guess = output.argmax(1)
            # teacher forcing: feed the ground-truth token with the given probability
            x = target[t] if random.random() < teacher_forcing_ratio else best_guess
        return outputs
```
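%% Cell type:markdown tags:
Before training, a quick forward pass with random token ids can confirm that the encoder, decoder, and Seq2Seq wrapper fit together. The cell below is only a sketch: the small hyperparameters, the variable names (`enc_test`, `dec_test`, `seq2seq_test`), and the dummy tensors are assumptions for illustration, not part of the original notebook.
%% Cell type:code tags:
``` python
# Illustrative shape check with small dummy models and random token ids
enc_test = Encoder(input_size=source_vocab_size, embedding_size=32,
                   hidden_size=64, num_layers=2, dropout_prob=0.0)
dec_test = Decoder(input_size=target_vocab_size, embedding_size=32,
                   hidden_size=64, num_layers=2, output_size=target_vocab_size,
                   dropout_prob=0.0)
seq2seq_test = Seq2Seq(encoder=enc_test, decoder=dec_test)

src = torch.randint(0, source_vocab_size, (12, 4))  # (sequence_len, batch_size)
tgt = torch.randint(0, target_vocab_size, (10, 4))
out = seq2seq_test(src, tgt)
print(out.shape)  # expected: (10, 4, target_vocab_size)
```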
%% Cell type:markdown id:9854eaee8392caa1 tags:
# Model Training
%% Cell type:code id:ee166d65b3b975d tags:
``` python
# training hyperparameters
num_epochs = 20
learning_rate = 0.001
batch_size = 64

# model hyperparameters
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
input_size_encoder = source_vocab_size
input_size_decoder = target_vocab_size
output_size_decoder = target_vocab_size
encoder_embedding_size = 300
decoder_embedding_size = 300
hidden_size = 1024
num_layers = 2
encoder_dropout = 0.5
decoder_dropout = 0.5

encoder_net = Encoder(input_size=input_size_encoder,
                      embedding_size=encoder_embedding_size,
                      hidden_size=hidden_size,
                      num_layers=num_layers,
                      dropout_prob=encoder_dropout)
decoder_net = Decoder(input_size=input_size_decoder,
                      embedding_size=decoder_embedding_size,
                      hidden_size=hidden_size,
                      num_layers=num_layers,
                      dropout_prob=decoder_dropout,
                      output_size=output_size_decoder)
model = Seq2Seq(encoder=encoder_net, decoder=decoder_net)

criterion = nn.CrossEntropyLoss(ignore_index=3)  # ignore the [PAD] token (id 3)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

for epoch in range(num_epochs):
    print('Epoch {}/{}'.format(epoch + 1, num_epochs))
    # copy-cat setup: English is used as both source and target
    for batch_idx, (x_train, y_train) in enumerate(training_data(source=en, target=en)):
        output = model(x_train, y_train)
        # flatten (sequence_len, batch_size, vocab) to (N, vocab) and skip the [SOS] position
        output = output[1:].reshape(-1, output.shape[2])
        y_train = y_train[1:].reshape(-1)
        optimizer.zero_grad()
        loss = criterion(output, y_train)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)
        optimizer.step()
        print(batch_idx)
        print("loss: " + str(loss.item()))
```
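%% Cell type:markdown tags:
After training, a greedy decoding routine can be used to check whether the model has learned to reproduce its input. The cell below is a sketch of how such a check could look here: the helper name `copy_sentence`, the `max_len` limit, and the example sentence are illustrative assumptions, not part of the original notebook.
%% Cell type:code tags:
``` python
# Illustrative greedy decoding sketch (assumed helper, not from the original notebook)
def copy_sentence(sentence: str, max_len: int = 20) -> str:
    model.eval()
    with torch.no_grad():
        tokenizer_en.no_padding()
        # encode the source sentence as a (sequence_len, 1) batch
        src_ids = torch.tensor(tokenizer_en.encode(sentence).ids).unsqueeze(1)
        hidden, cell = model.encoder(src_ids)
        # start decoding from the target tokenizer's [SOS] token
        x = torch.tensor([tokenizer_de.token_to_id("[SOS]")])
        out_ids = []
        for _ in range(max_len):
            output, hidden, cell = model.decoder(x, hidden, cell)
            best = output.argmax(1)
            if best.item() == tokenizer_de.token_to_id("[EOS]"):
                break
            out_ids.append(best.item())
            x = best
    return tokenizer_de.decode(out_ids)

print(copy_sentence("The debate is closed."))
```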