请选择 进入手机版 | 继续访问电脑版
MSIPO技术圈 首页 IT技术 查看内容

Transformer架构完整代码示例

2023-07-13

Transformer架构完整代码

#!/usr/bin/python3.9
# -*- coding: utf-8 -*-
# @Time    : 2023/6/29 10:48
# @File    : abd_transformer_cyd.py
# @Software: PyCharm

import math
import torch
import collections
import numpy as np
import torch.nn as nn
from copy import deepcopy
import torch.nn.functional as F
from torch.autograd import Variable

# 让Hypothesis拥有可访问的属性,即Hypothesis.value
Hypothesis = collections.namedtuple('Hypothesis', ['value', 'score'])

def clone_module_to_modulelist(module, module_num):
    """
    克隆n个Module类放入ModuleList中,并返回ModuleList,这个ModuleList中的每个Module都是一模一样的
    nn.ModuleList,它是一个储存不同 module,并自动将每个 module 的 parameters 添加到网络之中的容器。
    你可以把任意 nn.Module 的子类 (比如 nn.Conv2d, nn.Linear 之类的) 加到这个 list 里面,
    加入到 nn.ModuleList 里面的 module 是会自动注册到整个网络上的,
    同时 module 的 parameters 也会自动添加到整个网络中。
    :param module: 被克隆的module
    :param module_num: 被克隆的module数
    :return: 装有module_num个相同module的ModuleList
    """
    return nn.ModuleList([deepcopy(module) for _ in range(module_num)])


class LayerNorm(nn.Module):
    """
    构建一个LayerNorm Module
    LayerNorm的作用:对x归一化,使x的均值为0,方差为1
    LayerNorm计算公式:x-mean(x)/\sqrt{var(x)+\epsilon} = x-mean(x)/std(x)+\epsilon
    """

    def __init__(self, x_size, eps=1e-6):
        """
        :param x_size: 特征的维度
        :param eps: eps是一个平滑的过程,取值通常在(10^-4~10^-8 之间)
        其含义是,对于每个参数,随着其更新的总距离增多,其学习速率也随之变慢。
        防止出现除以0的情况。

        nn.Parameter将一个不可训练的类型Tensor转换成可以训练的类型parameter,
        并将这个parameter绑定到这个module里面。
        使用这个函数的目的也是想让某些变量在学习的过程中不断的修改其值以达到最优化。
        """
        super(LayerNorm, self).__init__()
        self.ones_tensor = nn.Parameter(torch.ones(x_size))  # 按照特征向量大小返回一个全1的张量,并且转换成可训练的parameter类型
        self.zeros_tensor = nn.Parameter(torch.zeros(x_size))
        self.eps = eps

    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)  # 求标准差
        return self.ones_tensor * (x - mean) / (std + self.eps) + self.zeros_tensor  # LayerNorm的计算公式


class FeatEmbedding(nn.Module):
    """
    视频特征向量生成器
    """

    def __init__(self, d_feat, d_model, dropout):
        """
        FeatEmbedding的初始化
        :param d_feat: per frame dimension(每帧的维度),作为Linear层输入的维度
        :param d_model: 作为Linear层输出的维度
        :param dropout: Dropout层的比率

        nn.Sequential:这是一个有顺序的容器,将特定神经网络模块按照在传入构造器的顺序依次被添加到计算图中
        在这里构造的容器是:LayerNorm --> Dropout --> Linear
        """
        super(FeatEmbedding, self).__init__()
        self.video_embeddings = nn.Sequential(
            # TODO:为什么这里对视频做处理,即图片做处理,不使用BatchNorm
            # nn.BatchNorm2d(d_feat)
            # nn.LayerNorm(d_feat),
            LayerNorm(d_feat),
            nn.Dropout(p=dropout),
            nn.Linear(d_feat, d_model)
        )

    def forward(self, x):
        return self.video_embeddings(x)  # 返回被处理后的视频特征向量


class WordEmbedding(nn.Module):
    """
    把向量构造成d_model维度的词向量,以便后续送入编码器
    """

    def __init__(self, vocab_size, d_model):
        """
        :param vocab_size: 字典长度
        :param d_model: 词向量维度
        """
        super(WordEmbedding, self).__init__()
        self.d_model = d_model
        # 字典中有vocab_size个词,词向量维度是d_model,每个词将会被映射成d_model维度的向量
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.embed = self.embedding

    def forward(self, x):
        # TODO:为什么要乘以一个sqrt,Transformer中的?
        return self.embed(x) * math.sqrt(self.d_model)


class PositionalEncoding(nn.Module):
    """
    正弦位置编码,即通过三角函数构建位置编码

    Implementation based on "Attention Is All You Need"
    :cite:`DBLP:journals/corr/VaswaniSPUJGKP17`
    """

    def __init__(self, dim: int, dropout: float, max_len=5000):
        """
        :param dim: 位置向量的向量维度,一般与词向量维度相同,即d_model
        :param dropout: Dropout层的比率
        :param max_len: 句子的最大长度
        """
        # 判断能够构建位置向量
        if dim % 2 != 0:
            raise ValueError(f"不能使用 sin/cos 位置编码,得到了奇数的维度{dim:d},应该使用偶数维度")

        """
        构建位置编码pe
        pe公式为:
        PE(pos,2i/2i+1) = sin/cos(pos/10000^{2i/d_{model}})
        """
        pe = torch.zeros(max_len, dim)  # 初始化pe
        position = torch.arange(0, max_len).unsqueeze(1)  # 构建pos,为句子的长度,相当于pos
        div_term = torch.exp((torch.arange(0, dim, 2, dtype=torch.float) * torch.tensor(
            -(math.log(10000.0) / dim))))  # 复现位置编码sin/cos中的公式
        pe[:, 0::2] = torch.sin(position.float() * div_term)  # 偶数使用sin函数
        pe[:, 1::2] = torch.cos(position.float() * div_term)  # 奇数使用cos函数
        pe = pe.unsqueeze(1)  # 扁平化成一维向量

        super(PositionalEncoding, self).__init__()
        self.register_buffer('pe', pe)  # pe不是模型的一个参数,通过register_buffer把pe写入内存缓冲区,当做一个内存中的常量
        self.drop_out = nn.Dropout(p=dropout)
        self.dim = dim

    def forward(self, emb, step=None):
        """
        词向量和位置编码拼接并输出
        :param emb: 词向量序列(FloatTensor),``(seq_len, batch_size, self.dim)``
        :param step: 如果 stepwise("seq_len=1"),则用此位置的编码
        :return: 词向量和位置编码的拼接
        """
        emb = emb * math.sqrt(self.dim)
        if step is None:
            emb = emb + self.pe[:emb.size(0)]  # 拼接词向量和位置编码
        else:
            emb = emb + self.pe[step]
        emb = self.drop_out(emb)
        return emb


def self_attention(query, key, value, dropout=None, mask=None):
    """
    自注意力计算
    :param query: Q
    :param key: K
    :param value: V
    :param dropout: drop比率
    :param mask: 是否mask
    :return: 经自注意力机制计算后的值
    """
    d_k = query.size(-1)  # 防止softmax未来求梯度消失时的d_k
    # Q,K相似度计算公式:\frac{Q^TK}{\sqrt{d_k}}
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)  # Q,K相似度计算
    # 判断是否要mask,注:mask的操作在QK之后,softmax之前
    if mask is not None:
        """
        scores.masked_fill默认是按照传入的mask中为1的元素所在的索引,
        在scores中相同的的索引处替换为value,替换值为-1e9,即-(10^9)
        """
        # mask.cuda()
        # 进行mask操作,由于参数mask==0,因此替换上述mask中为0的元素所在的索引

    scores = scores.masked_fill(mask == 0, -1e9)

    self_attn_softmax = F.softmax(scores, dim=-1)  # 进行softmax
    # 判断是否要对相似概率分布进行dropout操作
    if dropout is not None:
        self_attn_softmax = dropout(self_attn_softmax)

    # 注意:返回经自注意力计算后的值,以及进行softmax后的相似度(即相似概率分布)
    return torch.matmul(self_attn_softmax, value), self_attn_softmax


class MultiHeadAttention(nn.Module):
    """
    多头注意力计算
    """

    def __init__(self, head, d_model, dropout=0.1):
        """
        :param head: 头数
        :param d_model: 词向量的维度,必须是head的整数倍
        :param dropout: drop比率
        """
        super(MultiHeadAttention, self).__init__()
        assert (d_model % head == 0)  # 确保词向量维度是头数的整数倍
        self.d_k = d_model // head  # 被拆分为多头后的某一头词向量的维度
        self.head = head
        self.d_model = d_model

        """
        由于多头注意力机制是针对多组Q、K、V,因此有了下面这四行代码,具体作用是,
        针对未来每一次输入的Q、K、V,都给予参数进行构建
        其中linear_out是针对多头汇总时给予的参数
        """
        self.linear_query = nn.Linear(d_model, d_model)  # 进行一个普通的全连接层变化,但不修改维度
        self.linear_key = nn.Linear(d_model, d_model)
        self.linear_value = nn.Linear(d_model, d_model)
        self.linear_out = nn.Linear(d_model, d_model)

        self.dropout = nn.Dropout(p=dropout)
        self.attn_softmax = None  # attn_softmax是能量分数, 即句子中某一个词与所有词的相关性分数, softmax(QK^T)

    def forward(self, query, key, value, mask=None):
        if mask is not None:
            """
            多头注意力机制的线性变换层是4维,是把query[batch, frame_num, d_model]变成[batch, -1, head, d_k]
            再1,2维交换变成[batch, head, -1, d_k], 所以mask要在第二维(head维)添加一维,与后面的self_attention计算维度一样
            具体点将,就是:
            因为mask的作用是未来传入self_attention这个函数的时候,作为masked_fill需要mask哪些信息的依据
            针对多head的数据,Q、K、V的形状维度中,只有head是通过view计算出来的,是多余的,为了保证mask和
            view变换之后的Q、K、V的形状一直,mask就得在head这个维度添加一个维度出来,进而做到对正确信息的mask
            """
            mask = mask.unsqueeze(1)

        n_batch = query.size(0)  # batch_size大小,假设query的维度是:[10, 32, 512],其中10是batch_size的大小

        """
        下列三行代码都在做类似的事情,对Q、K、V三个矩阵做处理
        其中view函数是对Linear层的输出做一个形状的重构,其中-1是自适应(自主计算)
        从这种重构中,可以看出,虽然增加了头数,但是数据的总维度是没有变化的,也就是说多头是对数据内部进行了一次拆分
        transopose(1,2)是对前形状的两个维度(索引从0开始)做一个交换,例如(2,3,4,5)会变成(2,4,3,5)
        因此通过transpose可以让view的第二维度参数变成n_head
        假设Linear成的输出维度是:[10, 32, 512],其中10是batch_size的大小
        注:这里解释了为什么d_model // head == d_k,如若不是,则view函数做形状重构的时候会出现异常
        """
        query = self.linear_query(query).view(n_batch, -1, self.head, self.d_k).transpose(1, 2)  # [b, 8, 32, 64],head=8
        key = self.linear_key(key).view(n_batch, -1, self.head, self.d_k).transpose(1, 2)  # [b, 8, 28, 64]
        value = self.linear_value(value).view(n_batch, -1, self.head, self.d_k).transpose(1, 2)  # [b, 8, 28, 64]

        # x是通过自注意力机制计算出来的值, self.attn_softmax是相似概率分布
        x, self.attn_softmax = self_attention(query, key, value, dropout=self.dropout, mask=mask)

        """
        下面的代码是汇总各个头的信息,拼接后形成一个新的x
        其中self.head * self.d_k,可以看出x的形状是按照head数拼接成了一个大矩阵,然后输入到linear_out层添加参数
        contiguous()是重新开辟一块内存后存储x,然后才可以使用.view方法,否则直接使用.view方法会报错
        """
        x = x.transpose(1, 2).contiguous().view(n_batch, -1, self.head * self.d_k)
        return self.linear_out(x)


class FeedForward(nn.Module):
    """
    两层具有残差网络的前馈神经网络,FNN网络
    """

    def __init__(self, d_model: int, d_ff: int, dropout=0.1):
        """
        :param d_model: FFN第一层输入的维度
        :param d_ff: FNN第二层隐藏层输入的维度
        :param dropout: drop比率
        """
        super(FeedForward, self).__init__()
        self.w_1 = nn.Linear(d_model, d_ff)
        self.w_2 = nn.Linear(d_ff, d_model)
        self.layer_norm = nn.LayerNorm(d_model, eps=1e-6)
        self.dropout_1 = nn.Dropout(dropout)
        self.relu = nn.ReLU()
        self.dropout_2 = nn.Dropout(dropout)

    def forward(self, x):
        """
        :param x: 输入数据,形状为(batch_size, input_len, model_dim)
        :return: 输出数据(FloatTensor),形状为(batch_size, input_len, model_dim)
        """
        inter = self.dropout_1(self.relu(self.w_1(self.layer_norm(x))))
        output = self.dropout_2(self.w_2(inter))
        # return output + x,即为残差网络
        return output  # + x


class SublayerConnection(nn.Module):
    """
    子层的连接: layer_norm(x + sublayer(x))
    上述可以理解为一个残差网络加上一个LayerNorm归一化
    """

    def __init__(self, size, dropout=0.1):
        """
        :param size: d_model
        :param dropout: drop比率
        """
        super(SublayerConnection, self).__init__()
        self.layer_norm = LayerNorm(size)
        # TODO:在SublayerConnection中LayerNorm可以换成nn.BatchNorm2d
        # self.layer_norm = nn.BatchNorm2d()
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, sublayer):
        return self.dropout(self.layer_norm(x + sublayer(x)))


class EncoderLayer(nn.Module):
    """
    一层编码Encoder层
    MultiHeadAttention -> Add & Norm -> Feed Forward -> Add & Norm
    """

    def __init__(self, size, attn, feed_forward, dropout=0.1):
        """
        :param size: d_model
        :param attn: 已经初始化的Multi-Head Attention层
        :param feed_forward: 已经初始化的Feed Forward层
        :param dropout: drop比率
        """
        super(EncoderLayer, self).__init__()
        self.attn = attn
        self.feed_forward = feed_forward

        """
        下面一行的作用是因为一个Encoder层具有两个残差结构的网络
        因此构建一个ModuleList存储两个SublayerConnection,以便未来对数据进行残差处理
        """
        self.sublayer_connection_list = clone_module_to_modulelist(SublayerConnection(size, dropout), 2)

    def forward(self, x, mask):
        """
        :param x: Encoder层的输入
        :param mask: mask标志
        :return: 经过一个Encoder层处理后的输出
        """
        """
        编码层第一层子层
        self.attn 应该是一个已经初始化的Multi-Head Attention层
        把Encoder的输入数据x和经过一个Multi-Head Attention处理后的x_attn送入第一个残差网络进行处理得到first_x
        """
        first_x = self.sublayer_connection_list[0](x, lambda x_attn: self.attn(x, x, x, mask))

        """
        编码层第二层子层
        把经过第一层子层处理后的数据first_x与前馈神经网络送入第二个残差网络进行处理得到Encoder层的输出
        """
        return self.sublayer_connection_list[1](first_x, self.feed_forward)


class DecoderLayer(nn.Module):
    """
    一层解码Decoder层
    Mask MultiHeadAttention -> Add & Norm -> Multi-Head Attention -> Add & Norm
    -> Feed Forward -> Add & Norm
    """

    def __init__(self, d_model, attn, feed_forward, sublayer_num, dropout=0.1):
        """
        :param d_model: d_model
        :param attn: 已经初始化的Multi-Head Attention层
        :param feed_forward: 已经初始化的Feed Forward层
        :param sublayer_num: 解码器内部子层数,如果未来r2l_memory传入有值,则为4层,否则为普通的3层
        :param dropout: drop比率
        """
        super(DecoderLayer, self).__init__()
        self.attn = attn
        self.feed_forward = feed_forward
        self.sublayer_connection_list = clone_module_to_modulelist(SublayerConnection(d_model, dropout), sublayer_num)

    def forward(self, x, l2r_memory, src_mask, trg_mask, r2l_memory=None, r2l_trg_mask=None):
        """
        :param x: Decoder的输入(captioning)
        :param l2r_memory: Encoder的输出,作为Multi-Head Attention的K,V值,为从左到右的Encoder的输出
        :param src_mask: 编码器输入的填充掩码
        :param trg_mask: 解码器输入的填充掩码和序列掩码,即对后面单词的掩码
        :param r2l_memory: 从右到左解码器的输出
        :param r2l_trg_mask: 从右到左解码器的输出的填充掩码和序列掩码
        :return: Encoder的输出
        """
        """
        解码器第一层子层
        把Decoder的输入数据x和经过一个Masked Multi-Head Attention处理后的first_x_attn送入第一个残差网络进行处理得到first_x
        """
        first_x = self.sublayer_connection_list[0](x, lambda first_x_attn: self.attn(x, x, x, trg_mask))

        """
        解码器第二层子层
        把第一层子层得到的first_x和
        经过一个Multi-Head Attention处理后的second_x_attn(由first_x和Encoder的输出进行自注意力计算)
        送入第二个残差网络进行处理
        """
        second_x = self.sublayer_connection_list[1](first_x,
                                                    lambda second_x_attn: self.attn(first_x, l2r_memory, l2r_memory,
                                                                                    src_mask))

        """
        解码器第三层子层
        把经过第二层子层处理后的数据second_x与前馈神经网络送入第三个残差网络进行处理得到Decoder层的输出
        
        如果有r2l_memory数据,则还需要经过一层多头注意力计算,也就是说会有四个残差网络
        r2l_memory是让Decoder层多了一层双向编码中从右到左的编码层
        而只要三个残差网络的Decoder层只有从左到右的编码
        """
        if not r2l_memory:
            # 进行从右到左的编码,增加语义信息
            third_x = self.sublayer_connection_list[-2](second_x,
                                                        lambda third_x_attn: self.attn(second_x, r2l_memory, r2l_memory,
                                                                                       r2l_trg_mask))
            return self.sublayer_connection_list[-1](third_x, self.feed_forward)
        else:
            return self.sublayer_connection_list[-1](second_x, self.feed_forward)


class Encoder(nn.Module):
    """
    构建n层编码层
    """

    def __init__(self, n, encoder_layer):
        """
        :param n: Encoder层的层数
        :param encoder_layer: 初始化的Encoder层
        """
        super(Encoder, self).__init__()
        self.encoder_layer_list = clone_module_to_modulelist(encoder_layer, n)

    def forward(self, x, src_mask):
        """
        :param x: 输入数据
        :param src_mask: mask标志
        :return: 经过n层Encoder处理后的数据
        """
        for encoder_layer in self.encoder_layer_list:
            x = encoder_layer(x, src_mask)
        return x


class 

相关阅读

热门文章

    手机版|MSIPO技术圈 皖ICP备19022944号-2

    Copyright © 2024, msipo.com

    返回顶部