


我们将使用PyTorch和TorchText构建一个机器学习模型,从一个序列到另一个序列。 将德语到英语翻译成英语

该模型是《Sequence to Sequence Learning with Neural Networks》这篇论文的Pytorch实现









import torch
import torch.nn as nn
import torch.optim as optim

from torchtext.datasets import TranslationDataset, Multi30k
from torchtext.data import Field, BucketIterator

import spacy
import random
import math
import time

#1、preparing data
SEED = 1234
torch.backends.cudnn.deterministic = True  #使用cuda保证每次结果一样。将这个 flag 置为True的话,每次返回的卷积算法将是确定的,即默认算法。如果配合上设置 Torch 的随机种子为固定值的话,应该可以保证每次运行网络的时候相同输入的输出是固定的

spacy_de = spacy.load('de')
spacy_en = spacy.load('en')

#在原论文中,作者发现颠倒源语言的输入的顺序可以取得不错的翻译效果,例如,一句话为“good morning!”,颠倒顺序分词后变为"!", “morning”, 和"good"。

def tokenize_de(text):
    return [tok.text for tok in spacy_de.tokenizer(text)][::-1]
def tokenize_en(text):
    return [tok.text for tok in spacy_en.tokenizer(text)]


#sos是start of sequence, eos是end of sequence
SRC = Field(tokenize = tokenize_de,
            init_token = '<sos>',
            eos_token = '<eos>',
            lower = True)

TRG = Field(tokenize = tokenize_en,
            init_token = '<sos>',
            eos_token = '<eos>',
            lower = True)

train_data, valid_data, test_data = Multi30k.splits(exts = ('.de', '.en'),
                                                    fields = (SRC, TRG))

print(f"Number of training examples: {len(train_data.examples)}")
print(f"Number of validation examples: {len(valid_data.examples)}")
print(f"Number of testing examples: {len(test_data.examples)}")


SRC.build_vocab(train_data, min_freq = 2) #设置最小词频为2,当一个单词在数据集中出现次数小于2时会被转换为<unk>字符。
TRG.build_vocab(train_data, min_freq = 2)

print(f"Unique tokens in source (de) vocabulary: {len(SRC.vocab)}")
print(f"Unique tokens in target (en) vocabulary: {len(TRG.vocab)}")

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size = BATCH_SIZE,
    device = device)

batch = next(iter(train_iterator))
# [torchtext.data.batch.Batch of size 128 from MULTI30K]
# 	[.src]:[torch.cuda.LongTensor of size 23x128 (GPU 0)]
# 	[.trg]:[torch.cuda.LongTensor of size 21x128 (GPU 0)]





  • input_dim:输入编码器的one-hot向量的维度,等于源语言词汇表的大小。
  • emb_dim:embedding层的维度
  • hid_dim:隐藏层h和c的维度
  • n_layers:LSTM网络的层数
  • dropout:如果非零的话,将会在LSTM的输出上加个dropout,最后一层除外。


  • output_dim:输入解码器的one-hot向量维度,等于目标语言词汇表的大小。
  • 添加了Linear层,用于预测最终输出。





  • 使用源语言句子作为输入
  • 使用Encoder生成上下文向量
  • 使用Decoder预测目标语言句子
  • device :把张量放到GPU上。新版的Pytorch使用to方法可以容易地将对象移动到不同的设备(代替以前的cpu()或cuda()方法)。
  • outputs:存储Decoder所有输出的张量
  • teacher_forcing_ratio:该参数的作用是,当使用teacher force时,decoder网络的下一个input是目标语言的下一个字符,当不使用时,网络的下一个input是其预测出的那个字符。
class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hid_dim, n_layers, dropout):

        self.input_dim = input_dim
        self.emb_dim = emb_dim
        self.hid_dim = hid_dim
        self.n_layers = n_layers
        self.dropout = dropout

        self.embedding = nn.Embedding(input_dim, emb_dim)

        self.rnn = nn.LSTM(emb_dim, hid_dim, n_layers, dropout=dropout)

        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        #src:(sent_len, batch_size)

        embedded = self.dropout(self.embedding(src))

        #embedded:(sent_len, batch_size, emb_dim)
        outputs, (hidden, cell) = self.rnn(embedded)
        #outputs:(sent_len, batch_size, hid_dim)
        #hidden:(n_layers, batch_size, hid_dim)
        #cell:(n_layers, batch_size, hid_dim)
        return hidden, cell

class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hid_dim, n_layers, dropout):

        self.output_dim = output_dim
        self.emb_dim = emb_dim
        self.hid_dim = hid_dim
        self.n_layers = n_layers
        self.dropout = dropout

        self.embedding = nn.Embedding(output_dim, emb_dim)

        self.rnn = nn.LSTM(emb_dim, hid_dim, n_layers, dropout=dropout)

        self.out = nn.Linear(hid_dim, output_dim)

        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, cell):
        # input: (batch_size) -> input: (1, batch_size)
        input = input.unsqueeze(0)
        # embedded: (1, batch_size, emb_dim)
        embedded = self.dropout(self.embedding(input))
        # hidden: (n_layers, batch size, hid_dim)
        # cell: (n_layers, batch size, hid_dim)
        # output(1, batch_size, hid_dim)
        output, (hidden, cell) = self.rnn(embedded, (hidden, cell))
        # prediction: (batch_size, output_dim)
        prediction = self.out(output.squeeze(0))

        return prediction, hidden, cell

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):

        self.encoder = encoder
        self.decoder = decoder
        self.device = device

        assert encoder.hid_dim == decoder.hid_dim, \
            "Hidden dimensions of encoder and decoder must be equal!"
        assert encoder.n_layers == decoder.n_layers, \
            "Encoder and decoder must have equal number of layers!"

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        # src = [src sent len, batch size]
        # trg = [trg sent len, batch size]
        # teacher_forcing_ratio is probability to use teacher forcing
        # e.g. if teacher_forcing_ratio is 0.75 we use ground-truth inputs 75% of the time

        batch_size = trg.shape[1]
        max_len = trg.shape[0]
        trg_vocab_size = self.decoder.output_dim

        # tensor to store decoder outputs
        outputs = torch.zeros(max_len, batch_size, trg_vocab_size).to(self.device)

        # last hidden state of the encoder is used as the initial hidden state of the decoder
        hidden, cell = self.encoder(src)

        input = trg[0, :]

        for t in range(1, max_len):
            # insert input token embedding, previous hidden and previous cell states
            # receive output tensor (predictions) and new hidden and cell states
            output, hidden, cell = self.decoder(input, hidden, cell)

            # place predictions in a tensor holding predictions for each token
            outputs[t] = output

            # decide if we are going to use teacher forcing or not
            teacher_force = random.random() < teacher_forcing_ratio

            # get the highest predicted token from our predictions
            top1 = output.argmax(1)

            # if teacher forcing, use actual next token as next input
            # if not, use predicted token
            input = trg[t] if teacher_force else top1

        return outputs





  • model.train() : 让model变为训练模式,启用 batch normalization(本模型未使用)和 Dropout。
  • clip_grad_norm: 进行梯度裁剪,防止梯度爆炸。clip:梯度阈值
  • view函数: 减少output和trg的维度以便进行loss计算。由于trg每句话的开头都是标记符sos,为了提高准确度,output和trg的第一列将不参与计算损失。
  • model.eval(): 开启测试模式,关闭batch normalization(本模型未使用)和 dropout。
  • torch.no_grad():关闭autograd 引擎(不会进行反向传播计算),这样的好处是减少内存的使用并且加速计算。
  • teacher_forcing_ratio = 0:在测试阶段须要关闭teacher forcing,保证模型使用预测的结果作为下一步的输入。
