Transformer from Scratch

Paper: Attention Is All You Need (Vaswani et al., 2017)

The classic Transformer architecture

The Transformer architecture diagram (figure from the original paper) is shown below.

Layer breakdown

  • Input layer
    • Token embedding
    • Positional embedding
  • Encoder layers
    • Multi-head attention
    • Feed-forward network
    • Residual connection + LayerNorm
  • Decoder layers
  • Output layer
    • Fully connected layer
    • Softmax

Code

Version 1

Adapted from DataWhale's "Transformer代码完全解读!" (qq.com)

1. Input Layer

1.1 Embedding

The embedding layer converts input in some raw format, such as text, into vector representations that the model can process and that describe the information contained in the original data.

The core is torch.nn.Embedding.

import math
import torch
from torch import nn
import torch.nn.functional as F

class Embeddings(nn.Module):
    def __init__(self, d_model, vocab):
        """
        d_model -- dimension of the token embeddings
        vocab   -- vocabulary size
        """
        super(Embeddings, self).__init__()
        # nn.Embedding gives us a lookup table self.lut of shape (vocab, d_model)
        self.lut = nn.Embedding(vocab, d_model)
        self.d_model = d_model

    def forward(self, x):
        embedds = self.lut(x)
        # Scale by sqrt(d_model), as in the paper, so the embeddings and the
        # positional encodings (which lie in [-1, 1]) have comparable magnitudes
        return embedds * math.sqrt(self.d_model)
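A quick usage sketch (hypothetical vocabulary and batch sizes) to confirm the output shape:

# Hypothetical sizes: vocabulary of 1000 tokens, 512-dimensional embeddings
emb = Embeddings(d_model=512, vocab=1000)
tokens = torch.tensor([[1, 2, 3, 4]])   # (batch=1, seq_len=4)
print(emb(tokens).shape)                # torch.Size([1, 4, 512])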
1.2 Positional Embedding

The positional encoding provides the model with information about where each time step sits in the sequence. Unlike an RNN, whose recurrent structure gives successive time steps a natural order, the Transformer feeds all time steps in at once and processes them in parallel, so it makes sense to inject positional information into each time step's features. The original paper uses the sinusoidal encoding \[ PE_{(pos,\,2i)}=\sin\!\left(\frac{pos}{10000^{2i/d_{\text{model}}}}\right),\qquad PE_{(pos,\,2i+1)}=\cos\!\left(\frac{pos}{10000^{2i/d_{\text{model}}}}\right) \]

Question: why can this formula serve as a positional encoding?

My understanding: under this definition, the inner product of the encodings at time steps p and p+k is a constant that depends only on k, not on p (try proving it yourself). In other words, any two positional-encoding vectors that are k time steps apart have the same inner product, which implicitly encodes the relative distance between the two time steps. In addition, each time step's encoding is unique. These two properties give the formula a theoretical justification as a positional encoding.
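As a quick numeric check of this claim, here is a minimal standalone sketch that builds the sinusoidal encodings and verifies that the inner product between positions p and p+k does not depend on p (d_model and the offset k are arbitrary choices):

import math
import torch

d_model, max_len = 16, 100
pos = torch.arange(max_len, dtype=torch.float).unsqueeze(1)
div = torch.exp(torch.arange(0, d_model, 2, dtype=torch.float) * -(math.log(10000.0) / d_model))
pe = torch.zeros(max_len, d_model)
pe[:, 0::2] = torch.sin(pos * div)
pe[:, 1::2] = torch.cos(pos * div)

k = 5
# <PE_p, PE_{p+k}> should be (nearly) the same constant for every p
dots = torch.stack([pe[p] @ pe[p + k] for p in range(50)])
print(dots.std())   # ~0: the inner product depends only on the offset k, not on p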

class PositionalEmbedding(nn.Module):
    def __init__(self, d_model, dropout, max_len=5000):
        super(PositionalEmbedding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        # Precompute the sinusoidal position encodings once, for up to max_len positions
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)   # (max_len, 1)
        # 10000^(-2i/d_model) for the even dimensions 2i
        div_term = torch.exp(torch.arange(0, d_model, 2, dtype=torch.float) * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)                                                  # (1, max_len, d_model)
        # register_buffer stores pe on the module (saved in state_dict, moved by .to()/.cuda())
        # without making it a trainable parameter
        self.register_buffer('pe', pe)

    def forward(self, x):
        # Add the encodings for the first x.size(1) positions; pe is a buffer, so no gradient flows into it
        x = x + self.pe[:, :x.size(1)]
        return self.dropout(x)

2. Encoder

2.1 Encoder stack

The encoder extracts features from the input and provides useful semantic information for the decoding stage.
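The MultiHeadAttention in section 2.4 calls a clones helper that is not shown in this excerpt; the Encoder below also needs something like it so that the N stacked layers do not share parameters. A minimal sketch, following the common Annotated Transformer definition:

import copy
from torch import nn

def clones(module, N):
    """Return a ModuleList of N deep copies of module (no parameter sharing)."""
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])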

class Encoder(nn.Module):
    def __init__(self, layer, N):
        '''
        layer -- an EncoderLayer instance
        N     -- number of stacked encoder layers, 6 in the original paper
        '''
        super(Encoder, self).__init__()
        # Deep-copy the layer N times so the stacked layers do not share parameters
        self.layers = clones(layer, N)
        self.ln = nn.LayerNorm(layer.size)

    def forward(self, x, mask):
        for layer in self.layers:
            x = layer(x, mask)
        return self.ln(x)

2.2 EncoderLayer
class EncoderLayer(nn.Module):
    def __init__(self, size, AttentionLayer, FeedForwardLayer, dropout):
        '''
        size             -- embedding dimension (d_model)
        AttentionLayer   -- a MultiHeadAttention instance
        FeedForwardLayer -- a PositionwiseFeedForward instance
        '''
        super(EncoderLayer, self).__init__()
        self.size = size
        self.atten = AttentionLayer
        self.ffc = FeedForwardLayer
        self.LayerNorm = nn.LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        # Self-attention sublayer: dropout on the sublayer output, residual connection, then LayerNorm
        x = x + self.dropout(self.atten(x, x, x, mask))
        x = self.LayerNorm(x)

        # Feed-forward sublayer: same residual + LayerNorm pattern
        x = x + self.dropout(self.ffc(x))
        x = self.LayerNorm(x)
        return x
2.3 Scaled Dot-Product Attention

Hung-yi Lee's Bilibili video: https://www.bilibili.com/video/BV1J441137V6?from=search&seid=3530913447603589730

DataWhale open-source project: https://github.com/datawhalechina/learn-nlp-with-transformer

def attention(query, key, value, mask=None, dropout=None):
    """Scaled dot-product attention."""
    d_k = query.size(-1)   # last dimension of query, i.e. the (per-head) embedding size
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
    if mask is not None:
        # Positions where mask == 0 get a large negative score, i.e. ~0 weight after softmax
        scores = scores.masked_fill(mask == 0, -1e9)
    p_atten = F.softmax(scores, dim=-1)

    if dropout is not None:
        p_atten = dropout(p_atten)
    return torch.matmul(p_atten, value), p_atten
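A shape sketch with arbitrary sizes, showing what the function returns:

# Arbitrary sizes: batch=2, seq_len=5, d_k=64
q = k = v = torch.randn(2, 5, 64)
out, weights = attention(q, k, v)
print(out.shape, weights.shape)   # torch.Size([2, 5, 64]) torch.Size([2, 5, 5])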
2.4 MultiHeadAttention MHA

Why multi-head attention: this design lets each attention head focus on different aspects of each token's features, balancing out the bias a single attention head might introduce and giving each token a richer, more multi-faceted representation; experiments show this improves model performance.

class MultiHeadAttention(nn.Module):
    def __init__(self, h, d_model, dropout=0.1):
        super(MultiHeadAttention, self).__init__()
        assert d_model % h == 0
        self.d_k = d_model // h   # per-head dimension
        self.h = h

        # Four linear layers: the Q, K, V projections plus the final output projection
        self.linears = clones(nn.Linear(d_model, d_model), 4)
        self.atten = None
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, query, key, value, mask=None):
        if mask is not None:
            # The same mask is applied to every head, so add a head dimension
            mask = mask.unsqueeze(1)
        nbatches = query.size(0)
        # 1) Project Q, K, V, then split them into h heads:
        #    (batch, seq, d_model) -> (batch, seq, h, d_k) -> (batch, h, seq, d_k),
        #    so that attention operates on the last two dimensions (seq, d_k) of each head
        query, key, value = [
            l(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
            for l, x in zip(self.linears, (query, key, value))
        ]
        # 2) Apply scaled dot-product attention to every head in parallel
        x, self.atten = attention(query, key, value, mask=mask, dropout=self.dropout)
        # 3) Undo the head split: transpose back and merge the heads so the output
        #    shape matches the input (batch, seq, d_model); contiguous() is required
        #    before view() after a transpose
        x = x.transpose(1, 2).contiguous().view(nbatches, -1, self.h * self.d_k)
        # 4) Final output projection
        return self.linears[-1](x)
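A quick shape check of the module, with arbitrary sizes:

# Arbitrary sizes: 8 heads, d_model=512, batch=2, seq_len=10
mha = MultiHeadAttention(h=8, d_model=512)
x = torch.randn(2, 10, 512)
print(mha(x, x, x).shape)   # torch.Size([2, 10, 512]) -- same shape as the input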


2.5 FFN (Position-wise Feed-Forward Network)

After the attention operation, every layer in both the encoder and the decoder contains a fully connected feed-forward network that applies the same transformation to each position's vector independently: two linear transformations with a ReLU activation in between, \[ \mathrm{FFN}(x)=\max(0,\,xW_1+b_1)W_2+b_2 \] Each time step of the attention module's output already aggregates information from all time steps, whereas the feed-forward layer only further transforms each time step's own features, independently of the other time steps (see the sketch after the code below).

class PositionwiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff, dropout=0.1):
        super(PositionwiseFeedForward, self).__init__()
        self.w1 = nn.Linear(d_model, d_ff)
        self.w2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)
        self.relu = nn.ReLU()

    def forward(self, x):
        # Two linear transformations with a ReLU in between, applied to each position independently
        x = self.w1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.w2(x)
        return x
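A small sketch (arbitrary sizes, dropout disabled via eval()) verifying the position-wise claim: perturbing the input at one position only changes the output at that position.

ffn = PositionwiseFeedForward(d_model=512, d_ff=2048).eval()   # eval() disables dropout
x = torch.randn(1, 10, 512)
y = ffn(x)
x2 = x.clone()
x2[0, 3] += 1.0                  # perturb position 3 only
y2 = ffn(x2)
changed = (y - y2).abs().sum(dim=-1) > 1e-6
print(changed)                   # True only at position 3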
2.6 LayerNorm

Why a normalization layer:

Normalization layers are standard components of deep networks: as the number of layers grows, the outputs after many layers of computation can become too large or too small, which can destabilize learning and make convergence very slow. A normalization layer is therefore inserted after certain layers to keep feature values within a reasonable range.

LayerNorm

class LayerNorm(nn.Module):
    def __init__(self, feature_size, eps=1e-6):
        super(LayerNorm, self).__init__()
        # Learnable gain and bias applied after normalization
        self.a2 = nn.Parameter(torch.ones(feature_size))
        self.b2 = nn.Parameter(torch.zeros(feature_size))
        self.eps = eps   # added to the denominator for numerical stability

    def forward(self, x):
        # Normalize over the last (feature) dimension
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        return self.a2 * (x - mean) / (std + self.eps) + self.b2
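A quick sanity check against PyTorch's built-in nn.LayerNorm. Note that this hand-rolled version uses the unbiased standard deviation and adds eps outside the square root, so the two outputs are close but not identical:

x = torch.randn(2, 5, 16)
ours = LayerNorm(16)(x)
ref = nn.LayerNorm(16, eps=1e-6)(x)
print((ours - ref).abs().max())   # nonzero but small; the gap comes from unbiased vs. biased variance and eps placement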

Version 2

# Transformer from scratch
import torch
from torch import nn
import torch.nn.functional as F
import math

class AttentionHead(nn.Module):
    def __init__(self, embed_dim, head_dim):
        super(AttentionHead, self).__init__()
        # Separate projections of the input into this head's query/key/value space
        self.q = nn.Linear(embed_dim, head_dim)
        self.k = nn.Linear(embed_dim, head_dim)
        self.v = nn.Linear(embed_dim, head_dim)

    def forward(self, query, key, value, mask=None):
        query, key, value = self.q(query), self.k(key), self.v(value)
        # Scaled dot-product attention scores: (batch, seq_q, seq_k)
        scores = torch.bmm(query, key.transpose(1, 2)) / math.sqrt(query.size(-1))
        if mask is not None:
            # Masked positions get -inf so their softmax weight becomes 0
            scores = scores.masked_fill(mask == 0, -float("inf"))
        weights = F.softmax(scores, dim=-1)
        return torch.bmm(weights, value)

class MultiHeadAttention(nn.Module):
    def __init__(self, config):
        super(MultiHeadAttention, self).__init__()
        embed_dim = config.hidden_size
        num_heads = config.num_attention_heads
        head_dim = embed_dim // num_heads   # integer division
        self.heads = nn.ModuleList(
            [AttentionHead(embed_dim, head_dim) for _ in range(num_heads)]
        )
        self.output_linear = nn.Linear(embed_dim, embed_dim)

    def forward(self, query, key, value, mask=None, query_mask=None, key_mask=None):
        if query_mask is not None and key_mask is not None:
            # Combine the two padding masks into a (batch, seq_q, seq_k) attention mask
            mask = torch.bmm(query_mask.unsqueeze(-1), key_mask.unsqueeze(1))
        # Run every head, concatenate along the feature dimension, then project
        x = torch.cat([h(query, key, value, mask) for h in self.heads], dim=-1)
        x = self.output_linear(x)
        return x

class FeedForward(nn.Module):
    def __init__(self, config):
        super(FeedForward, self).__init__()
        self.linear1 = nn.Linear(config.hidden_size, config.intermediate_size)
        self.linear2 = nn.Linear(config.intermediate_size, config.hidden_size)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, x):
        # Two linear layers with a single GELU in between, then dropout
        x = self.linear1(x)
        x = self.gelu(x)
        x = self.linear2(x)
        x = self.dropout(x)
        return x

class TransformerEncoderLayer(nn.Module):
    def __init__(self, config):
        super(TransformerEncoderLayer, self).__init__()
        # One LayerNorm per sublayer, so their affine parameters are not shared
        self.layer_norm_1 = nn.LayerNorm(config.hidden_size)
        self.layer_norm_2 = nn.LayerNorm(config.hidden_size)
        self.attention = MultiHeadAttention(config)
        self.feedforward = FeedForward(config)

    def forward(self, x, mask=None):
        # Pre-layer-norm arrangement: normalize first, apply the sublayer,
        # then add the result back to the unnormalized input (residual connection)
        hidden_state = self.layer_norm_1(x)
        x = x + self.attention(hidden_state, hidden_state, hidden_state, mask=mask)
        x = x + self.feedforward(self.layer_norm_2(x))
        return x

class Embeddings(nn.Module):
    def __init__(self, config):
        super(Embeddings, self).__init__()
        self.token_embeddings = nn.Embedding(config.vocab_size, config.hidden_size)
        self.position_embeddings = nn.Embedding(config.max_position_embeddings, config.hidden_size)
        self.layer_norm = nn.LayerNorm(config.hidden_size, eps=1e-12)
        self.dropout = nn.Dropout()

    def forward(self, input_ids):
        seq_length = input_ids.size(1)
        # Position ids 0..seq_length-1, shared across the batch
        position_ids = torch.arange(seq_length, dtype=torch.long, device=input_ids.device).unsqueeze(0)
        # Learned (BERT-style) positional embeddings, added to the token embeddings
        token_embeddings = self.token_embeddings(input_ids)
        position_embeddings = self.position_embeddings(position_ids)
        embeddings = token_embeddings + position_embeddings
        embeddings = self.layer_norm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings

class TransformerEncoder(nn.Module):
    def __init__(self, config):
        super(TransformerEncoder, self).__init__()
        self.embeddings = Embeddings(config)
        self.layers = nn.ModuleList(
            [TransformerEncoderLayer(config) for _ in range(config.num_hidden_layers)]
        )

    def forward(self, x, mask=None):
        x = self.embeddings(x)
        for layer in self.layers:
            x = layer(x, mask)
        return x

if __name__ == '__main__':
    from transformers import AutoConfig
    from transformers import AutoTokenizer

    model_ckpt = "bert-base-uncased"
    tokenizer = AutoTokenizer.from_pretrained(model_ckpt)
    config = AutoConfig.from_pretrained(model_ckpt)

    text1 = "time flies like an arrow"
    text2 = "I Love you"

    # Tokenize the two sentences as a batch, padding the shorter one
    inputs = tokenizer([text1, text2], return_tensors="pt", padding=True, add_special_tokens=False)

    encoder = TransformerEncoder(config)
    # Expected shape: (batch_size, seq_len, config.hidden_size)
    print(encoder(inputs.input_ids).size())