RNNs, LSTMs, GRUs, and their variants have long been the default choice for sequence problems.
Although they deliver strong results, they have a clear weakness in computation speed:

  • Recurrent layers process data strictly serially, so they cannot exploit the parallel computing power of modern hardware.
  • Whether a sequence is handled with convolutions or with recurrence, the amount of computation needed to relate two positions grows sharply with the distance between the input and output positions.

To address this, Ashish Vaswani and his co-authors proposed the Transformer, built solely on attention mechanisms and fully connected networks.
The Transformer is not only far more efficient to compute, its results are also at least as good as those of RNN-based seq2seq models.

Paper link: Attention is all you need

Attention Is All You Need

Multi-Head Attention

The Transformer uses Multi-Head Attention, a mechanism proposed by the authors.

Denote the input query vectors and the (key, value) vector pairs by $Q, K, V$, with dimensions $d_Q, d_K, d_V$ respectively.
The classic Dot-Product Attention is

$$\mathrm{Attention}(Q, K, V) = \mathrm{softmax}(QK^T)\,V$$

The other classic form, Additive Attention, replaces the dot product of Dot-Product Attention with a feedforward network with a single hidden layer.

Dot-Product Attention is more efficient to compute, and the two perform similarly when $d_K$ is small.
For larger $d_K$, however, Dot-Product Attention falls behind Additive Attention.
The authors suspect that for large $d_K$ the dot products grow large in magnitude, pushing the softmax into regions where its gradients are extremely small.
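To make this concrete (this is the argument sketched in the paper): if the components of $q$ and $k$ are independent random variables with mean $0$ and variance $1$, then

$$q \cdot k = \sum_{i=1}^{d_K} q_i k_i$$

has mean $0$ and variance $d_K$, so its typical magnitude grows with $d_K$; dividing by $\sqrt{d_K}$ brings the variance back to $1$.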

The authors therefore first propose Scaled Dot-Product Attention:

$$\mathrm{Attention}(Q, K, V) = \mathrm{softmax}\!\left(\frac{QK^T}{\sqrt{d_K}}\right)V$$

Building on this, the authors propose Multi-Head Attention.
First, $Q, K, V$ are each projected $h$ times with $h$ different learned fully connected layers, which also reduce the input dimension $d_{in}$ to $d_Q, d_K, d_V$.
Scaled Dot-Product Attention is then applied to each of the $h$ projected $(Q, K, V)$ triples in parallel.
Finally, the $h$ attention outputs are concatenated and passed through one more fully connected layer, which restores the original dimension.

Putting it together, Multi-Head Attention is

$$\mathrm{MultiHead}(Q, K, V) = \mathrm{Concat}(\mathrm{head}_1, \dots, \mathrm{head}_h)\,W^O, \qquad \mathrm{head}_i = \mathrm{Attention}(QW_i^Q,\; KW_i^K,\; VW_i^V)$$

where the matrices $W_i^Q, W_i^K, W_i^V, W^O$ are the learned projection weights.

(Figure: Multi-Head Attention)
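To make the shapes concrete, here is a minimal sketch (not the code from this post; the tensor sizes are made up) of Scaled Dot-Product Attention written directly in TensorFlow, plus the equivalent multi-head call using keras.layers.MultiHeadAttention:

import tensorflow as tf
from tensorflow.keras.layers import MultiHeadAttention

# Scaled dot-product attention on toy tensors (batch=2, T_Q=5, T_K=7, d_K=64)
Q = tf.random.normal((2, 5, 64))
K = tf.random.normal((2, 7, 64))
V = tf.random.normal((2, 7, 64))

scores = tf.matmul(Q, K, transpose_b=True) / tf.sqrt(64.0)   # (2, 5, 7)
weights = tf.nn.softmax(scores, axis=-1)                     # attention weights
out = tf.matmul(weights, V)                                  # (2, 5, 64)

# Multi-head attention: Keras handles the h projections, the per-head
# scaled dot-product attention, the concatenation and the output projection.
mha = MultiHeadAttention(num_heads=8, key_dim=64)
out_mha = mha(query=Q, value=V, key=K)                       # (2, 5, 64)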

Positional Embedding

The Transformer discards the RNN, but the ordering information in sequence data cannot simply be thrown away.
Consider the following two sentences:

“I don’t like coffee, I like tea”
“I like coffee, I don’t like tea”

They are made of exactly the same words, yet their meanings are opposite.

To recover the ordering information that an RNN would capture, the authors propose Positional Embedding:
embed the position indices 0 to length-1 and add the result to the token embedding of the original sentence.

The positional embedding can be a learnable embedding layer, or it can be hard-coded as in the paper.
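The hard-coded version in the paper uses sine and cosine functions of different frequencies, where $pos$ is the position and $i$ indexes the dimension:

$$PE_{(pos,\,2i)} = \sin\!\left(\frac{pos}{10000^{2i/d_{model}}}\right), \qquad PE_{(pos,\,2i+1)} = \cos\!\left(\frac{pos}{10000^{2i/d_{model}}}\right)$$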

The authors' experiments show the two approaches perform almost identically.
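A minimal sketch of the "embed the indices 0 to length-1 and add" idea, here with a learned position embedding; the vocabulary size, maximum length and token values below are made up for illustration:

import tensorflow as tf
from tensorflow.keras.layers import Embedding

vocab_size, max_len, latent_dim = 10000, 100, 512
token_emb = Embedding(vocab_size, latent_dim)
pos_emb = Embedding(max_len, latent_dim)

tokens = tf.constant([[5, 42, 7, 9]])               # (batch, seq_length)
positions = tf.range(tf.shape(tokens)[-1])          # [0, 1, 2, 3]

x = token_emb(tokens) + pos_emb(positions)          # (batch, seq_length, latent_dim)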

Transformer Architecture

The Transformer still follows the encoder-decoder structure.

The encoder stacks N identical sublayers.
Each sublayer has two parts: a Multi-Head Attention and a two-layer fully connected feedforward network.
Both parts use a residual connection, followed by LayerNormalization.
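In other words, each part computes

$$\mathrm{LayerNorm}\big(x + \mathrm{Sublayer}(x)\big)$$

where $\mathrm{Sublayer}(x)$ is either the Multi-Head Attention or the feedforward network applied to that part's input $x$.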

The decoder is similar to the encoder;
each sublayer just adds one more Multi-Head Attention, which attends over the encoder's output.

The inputs to both the encoder and the decoder are the sum of the token embedding and the positional embedding.

(Figure: Transformer architecture)

Self-Attention

Note that in the first Multi-Head Attention of both the encoder and the decoder, the Q, K, V inputs are the same tensor; this is self-attention.

Self-attention lets the sequence attend to any of its own positions directly.
A convolution would have to be stacked layer after layer to grow its receptive field to the same extent, and an RNN would likewise need more and more cells.

Masked Attention

As mentioned above, self-attention lets the sequence attend to any of its own positions.
At inference time, however, the decoder can only feed each generated word back in as its next input.

So during training the decoder's self-attention clearly must not be allowed to attend to positions after the current one;
the decoder's self-attention therefore needs a mask.

This mask is a lower-triangular tensor of shape $(samples, T_Q, T_K)$, where $T_Q, T_K$ are the sequence lengths of Q and K (for self-attention $T_Q = T_K$).
For a length-3 sequence, each sample's mask looks like

$$\begin{bmatrix} 1 & 0 & 0 \\ 1 & 1 & 0 \\ 1 & 1 & 1 \end{bmatrix}$$

For example, in self-attention, suppose

$$Q = K = \begin{bmatrix} s_1 \\ s_2 \\ s_3 \end{bmatrix}$$

i.e. the sentence contains three words with word vectors $s_1, s_2, s_3$. The attention then computes

$$QK^T = \begin{bmatrix} s_1 s_1^T & s_1 s_2^T & s_1 s_3^T \\ s_2 s_1^T & s_2 s_2^T & s_2 s_3^T \\ s_3 s_1^T & s_3 s_2^T & s_3 s_3^T \end{bmatrix}$$

Since the softmax computed next is exactly what produces the attention weights, the mask is applied at this point: the entries above the diagonal, which correspond to the zeros of the lower-triangular mask, are blocked (in practice replaced by a very large negative value before the softmax),

$$\begin{bmatrix} s_1 s_1^T & -\infty & -\infty \\ s_2 s_1^T & s_2 s_2^T & -\infty \\ s_3 s_1^T & s_3 s_2^T & s_3 s_3^T \end{bmatrix}$$

so after the softmax they receive zero weight and attention only attends to the current and preceding positions.

# the mask for the decoder's self-attention
def getFeatureMask(self, inputs):
    input_shape = tf.shape(inputs)
    batch_size, seq_length = input_shape[0], input_shape[1]

    i = tf.range(seq_length)[:, tf.newaxis]
    j = tf.range(seq_length)

    # build the lower-triangular matrix
    mask = tf.cast(i >= j, dtype="int32")

    # tile batch_size copies
    mask = tf.reshape(mask, (1, seq_length, seq_length))
    mult = tf.concat(
        [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
        axis=0,
    )

    return tf.tile(mask, mult)
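As a quick sanity check, here is the same logic as a standalone function (an illustrative sketch, not part of the model code) together with the mask it produces for a single length-4 sequence. Keras' MultiHeadAttention consumes this tensor through its attention_mask argument, where 1 (or True) marks the key positions each query position is allowed to attend to:

import tensorflow as tf

def future_mask(batch_size, seq_length):
    # standalone version of getFeatureMask above
    i = tf.range(seq_length)[:, tf.newaxis]
    j = tf.range(seq_length)
    mask = tf.cast(i >= j, dtype="int32")                  # (seq_length, seq_length)
    mask = tf.reshape(mask, (1, seq_length, seq_length))
    return tf.tile(mask, [batch_size, 1, 1])               # (batch_size, seq_length, seq_length)

print(future_mask(1, 4)[0].numpy())
# [[1 0 0 0]
#  [1 1 0 0]
#  [1 1 1 0]
#  [1 1 1 1]]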

Keras implementation

Honestly, the Transformer really does run fast, but its hyperparameters are also genuinely hard to tune...

""" Transformer
paper: Attention is all you need
see: https://proceedings.neurips.cc/paper/2017/file/3f5ee243547dee91fbd053c1c4a845aa-Paper.pdf
"""

import tensorflow as tf
import keras
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Layer, Input, Dense, Embedding, Add, MultiHeadAttention, LayerNormalization
import numpy as np
from keras_tf.NLP.preprocessor import TatoebaPreprocessor # 数据预处理


class TransformerEncoderSublayer(Layer):
    """ sublayer of transformer's encoder
    using hyperparameters of the base model described in paper as default
    """

    def __init__(self, latent_dim, num_heads=8, key_dim=64, hidden_dim=2048):
        """
        :param latent_dim: input and output dimensions
        :param num_heads: number of heads for multi-head attention in each sublayer
        :param key_dim: the dimensions in multi-head attention after projection step
        :param hidden_dim: units number of hidden layer in feedforward network in each sublayer
        """
        super().__init__()

        self.multihead_atten = MultiHeadAttention(num_heads=num_heads, key_dim=key_dim)
        self.feedforward = Sequential([
            Dense(hidden_dim, activation='relu'),
            Dense(latent_dim)
        ])
        self.layernorm1 = LayerNormalization()
        self.layernorm2 = LayerNormalization()

    def call(self, inputs):
        x = self.multihead_atten(inputs, inputs)
        x = Add()([x, inputs])
        outputs_atten = self.layernorm1(x)

        x = self.feedforward(outputs_atten)
        x = Add()([x, outputs_atten])
        outputs = self.layernorm2(x)

        return outputs

class TransformerDecoderSublayer(Layer):
    """ sublayer of transformer's decoder
    using hyperparameters of the base model described in paper as default
    """

    def __init__(self, latent_dim, num_heads=8, key_dim=64, hidden_dim=2048):
        """
        :param latent_dim: input and output dimensions
        :param num_heads: number of heads for multi-head attention in each sublayer
        :param key_dim: the dimensions in multi-head attention after projection step
        :param hidden_dim: units number of hidden layer in feedforward network in each sublayer
        """
        super().__init__()

        self.multihead_atten_mask = MultiHeadAttention(num_heads=num_heads, key_dim=key_dim)
        self.multihead_atten = MultiHeadAttention(num_heads=num_heads, key_dim=key_dim)
        self.feedforward = Sequential([
            Dense(hidden_dim, activation='relu'),
            Dense(latent_dim)
        ])
        self.layernorm1 = LayerNormalization()
        self.layernorm2 = LayerNormalization()
        self.layernorm3 = LayerNormalization()

    def call(self, inputs, outputs_enc):
        future_mask = self.getFeatureMask(inputs)

        # masked self-attention over the decoder input
        x = self.multihead_atten_mask(inputs, inputs, attention_mask=future_mask)
        x = Add()([x, inputs])
        outputs_atten_mask = self.layernorm1(x)

        # encoder-decoder attention: queries come from the masked self-attention
        # output, keys/values from the encoder output
        x = self.multihead_atten(outputs_atten_mask, outputs_enc)
        x = Add()([x, outputs_atten_mask])
        outputs_atten = self.layernorm2(x)

        x = self.feedforward(outputs_atten)
        x = Add()([x, outputs_atten])
        outputs = self.layernorm3(x)

        return outputs

    def getFeatureMask(self, inputs):
        """ future mask for self-attention
        return a lower triangular matrix with shape (samples, T_Q, T_K) where T_Q = T_K
        """
        input_shape = tf.shape(inputs)
        batch_size, seq_length = input_shape[0], input_shape[1]

        i = tf.range(seq_length)[:, tf.newaxis]
        j = tf.range(seq_length)

        # generate lower triangular matrix. shape: (length, length)
        mask = tf.cast(i >= j, dtype="int32")

        # tile to shape (batch_size, length, length)
        mask = tf.reshape(mask, (1, seq_length, seq_length))
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )

        return tf.tile(mask, mult)

class TransformerEncoder(Layer):
    def __init__(self, latent_dim, num_sublayer=6):
        super().__init__()
        self.sublayers = [TransformerEncoderSublayer(latent_dim) for _ in range(num_sublayer)]

    def call(self, x):
        for sublayer in self.sublayers:
            x = sublayer(x)

        return x

class TransformerDecoder(Layer):
    def __init__(self, latent_dim, num_sublayer=6):
        super().__init__()
        self.sublayers = [TransformerDecoderSublayer(latent_dim) for _ in range(num_sublayer)]

    def call(self, x, outputs_enc):
        for sublayer in self.sublayers:
            x = sublayer(x, outputs_enc)

        return x

class CosinePositionalEmbedding(Layer):
    """ positional embedding using cosine functions
    """
    def __init__(self, mxlen, latent_dim):
        super().__init__()
        self.trainable = False  # set to True is also allowed

        encoding_matrix = np.array([
            [pos / np.power(10000, 2 * (j // 2) / latent_dim) for j in range(latent_dim)]
            for pos in range(mxlen)
        ])
        encoding_matrix[:, 0::2] = np.sin(encoding_matrix[:, 0::2])  # dim 2i
        encoding_matrix[:, 1::2] = np.cos(encoding_matrix[:, 1::2])  # dim 2i+1

        self.embedding_pos = Embedding(
            mxlen, latent_dim, embeddings_initializer=keras.initializers.constant(encoding_matrix))

    def call(self, inputs):
        seq_length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=seq_length, delta=1)

        return self.embedding_pos(positions)

class LearnedPositionalEmbedding(Layer):
    """ learned positional embedding
    """
    def __init__(self, mxlen, latent_dim):
        super().__init__()

        self.embedding_pos = Embedding(mxlen, latent_dim)

    def call(self, inputs):
        seq_length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=seq_length, delta=1)

        embedded_pos = self.embedding_pos(positions)

        return embedded_pos

class Transformer:
    def __init__(self):
        preprocessor = TatoebaPreprocessor(dataDir='D:\\wallpaper\\datas\\fra-eng\\fra.txt')

        self.text_en, self.text_fra = preprocessor.getOriginalText()
        (self.dict_en, self.dict_en_rev), (self.dict_fra, self.dict_fra_rev) = preprocessor.getVocab()
        num_word_en, num_word_fra = preprocessor.getNumberOfWord()
        self.tensor_input, self.tensor_output = preprocessor.getPaddedSeq()

        mxlen_en = self.tensor_input.shape[-1]
        mxlen_fra = self.tensor_output.shape[-1]

        self.buildNet(
            num_word_en, num_word_fra,
            mxlen_en, mxlen_fra
        )

    def embed(self, inputs, num_word, mxlen, latent_dim):
        embedded_token = Embedding(num_word, latent_dim)(inputs)
        embedded_pos = CosinePositionalEmbedding(mxlen, latent_dim)(inputs)

        # using a learned embedding was proved to produce nearly identical results
        # embedded_pos = LearnedPositionalEmbedding(mxlen, latent_dim)(inputs)

        return embedded_token + embedded_pos

    def buildNet(self, num_word_in, num_word_out, mxlen_in, mxlen_out, latent_dim=512, num_sublayer=6):
        inputs = Input(shape=(None,))
        targets = Input(shape=(None,))

        # input embedding
        embedded_inputs = self.embed(inputs, num_word_in, mxlen_in, latent_dim)
        embedded_targets = self.embed(targets, num_word_out, mxlen_out, latent_dim)

        outputs_enc = TransformerEncoder(latent_dim, num_sublayer=num_sublayer)(embedded_inputs)
        outputs_dec = TransformerDecoder(latent_dim, num_sublayer=num_sublayer)(embedded_targets, outputs_enc)

        prob = Dense(num_word_out, activation='softmax')(outputs_dec)

        self.model = Model([inputs, targets], prob)

        # there's no need to pass one-hot tensor when using sparse_categorical_crossentropy
        self.model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

    def trainModel(self, epochs, batch_size):
        # there's one timestep shift when using teacher forcing
        outputs_shift = np.zeros(self.tensor_output.shape)
        outputs_shift[:, :-1] = self.tensor_output.copy()[:, 1:]

        self.model.fit(
            [self.tensor_input, self.tensor_output], outputs_shift,
            epochs=epochs, batch_size=batch_size, validation_split=0.2,
        )

        self.test()

        self.model.save_weights('./transformer.h5')

    def test(self):
        for idx in range(5):
            input_seq = self.tensor_input[idx: idx + 1]
            translated = self.translate(input_seq)
            print('-')
            print('Input sentence:', self.text_en[idx])
            print('Decoded sentence:', translated)
            print('Ground truth:', self.text_fra[idx][1:])

    def translate(self, input_seq):
        # the current word is <sos>
        output_seq = np.zeros((1, 1))
        output_seq[0, 0] = self.dict_fra['\t']

        max_length = 80
        translated = ''
        for _ in range(max_length):
            pred = self.model.predict([input_seq, output_seq])

            token_idx = np.argmax(pred[0, -1, :])
            token = self.dict_fra_rev[token_idx]

            # stop when <eos> has been generated
            if token == '\n':
                break

            translated += ' ' + token

            output_seq = np.hstack((output_seq, np.zeros((1, 1))))
            output_seq[0, -1] = token_idx

        return translated


transformer = Transformer()
transformer.trainModel(epochs=20, batch_size=64)