Seq2Seq与注意力机制

深度学习专题 · 序列到序列模型与注意力机制全面解析

专题:深度学习系统学习

关键词:Seq2Seq, 注意力, Attention, 编码器-解码器, Teacher Forcing, Beam Search, BLEU

一、Seq2Seq模型概述

Seq2Seq(Sequence-to-Sequence)模型是一种端到端的深度学习架构,专门用于处理输入序列到输出序列的转换任务。其核心思想是利用两个循环神经网络(RNN)分别作为编码器(Encoder)和解码器(Decoder),通过一个上下文向量(Context Vector)将输入序列的信息传递给解码器,从而生成目标序列。

Seq2Seq模型最早由Sutskever等人于2014年在论文"Sequence to Sequence Learning with Neural Networks"中提出,同年Cho等人在"Learning Phrase Representations using RNN Encoder-Decoder for Statistical Machine Translation"中也独立提出了类似架构。这两个开创性工作奠定了神经机器翻译(Neural Machine Translation, NMT)的基础。

在传统的机器翻译方法中,系统需要依赖复杂的管道:分词、词性标注、句法分析、语义分析、语言模型等多个独立模块。而Seq2Seq模型通过一个统一的神经网络架构,将翻译过程简化为"编码-解码"两个步骤,实现了端到端的训练。这种简洁而强大的范式迅速被应用到机器翻译、文本摘要、对话系统、语音识别、视频字幕生成、代码生成等众多序列转换任务中。

核心思想:利用两个RNN分别作为编码器和解码器,通过上下文向量传递信息,实现任意长度输入到任意长度输出的序列转换。编码器将输入序列压缩为固定长度的上下文向量,解码器基于该向量逐步生成目标序列。

Seq2Seq的应用场景

二、编码器-解码器架构详解

2.1 编码器(Encoder)

编码器负责读取输入序列并将其编码为一个固定长度的上下文向量(Context Vector)。编码器通常是一个RNN网络(可以是LSTM或GRU),按时间步依次读取输入序列中的每个元素。在每个时间步 t,编码器接收当前输入 x_t 和上一个时间步的隐藏状态 h_{t-1},经过RNN单元计算后产生当前隐藏状态 h_t。这一过程本质上是对输入序列的逐步压缩与特征提取。

编码器最终将整个输入序列的信息压缩到最后一个时间步的隐藏状态 h_T(或者所有隐藏状态的某种组合)中,形成上下文向量 c。这个上下文向量需要承载输入序列的全部语义信息,因此其表达能力直接影响后续解码的质量。

import torch import torch.nn as nn class EncoderRNN(nn.Module): """ 编码器:使用GRU将输入序列编码为隐藏状态 """ def __init__(self, input_size, hidden_size, num_layers=1): super(EncoderRNN, self).__init__() self.hidden_size = hidden_size self.num_layers = num_layers # 词嵌入层:将离散的词索引映射为稠密向量 self.embedding = nn.Embedding(input_size, hidden_size) # GRU层:处理序列数据,比LSTM参数更少、训练更快 self.gru = nn.GRU(hidden_size, hidden_size, num_layers, batch_first=True, bidirectional=False) def forward(self, input_seq, hidden=None): """ input_seq: (batch, seq_len) 输入序列的词索引 hidden: (num_layers, batch, hidden_size) 初始隐藏状态 返回: outputs: (batch, seq_len, hidden_size) 每个时间步的隐藏状态 hidden: (num_layers, batch, hidden_size) 最后时间步的隐藏状态 """ # 词嵌入: (batch, seq_len, hidden_size) embedded = self.embedding(input_seq) # GRU前向传播 outputs, hidden = self.gru(embedded, hidden) return outputs, hidden def init_hidden(self, batch_size): """初始化隐藏状态为零向量""" return torch.zeros(self.num_layers, batch_size, self.hidden_size)

2.2 解码器(Decoder)

解码器基于编码器生成的上下文向量 c,逐步生成目标序列。解码器也是一个RNN网络,在每个时间步 t,它接收上一个时间步生成的输出 y_{t-1}(训练时使用真实目标值,即Teacher Forcing;推理时使用自己前一个时间步的输出)和当前的隐藏状态 s_{t-1},计算当前隐藏状态 s_t,然后通过一个线性层(输出投影层)和softmax函数预测当前时间步的输出词概率分布。

class DecoderRNN(nn.Module): """ 解码器(无注意力):基于编码器最终隐藏状态逐步生成目标序列 """ def __init__(self, output_size, hidden_size, num_layers=1): super(DecoderRNN, self).__init__() self.output_size = output_size self.hidden_size = hidden_size # 词嵌入 self.embedding = nn.Embedding(output_size, hidden_size) # GRU层 self.gru = nn.GRU(hidden_size, hidden_size, num_layers, batch_first=True) # 输出投影层:将隐藏状态映射到词汇表大小的向量 self.out = nn.Linear(hidden_size, output_size) # softmax:将logits转换为概率分布 self.softmax = nn.LogSoftmax(dim=1) def forward(self, input_token, hidden): """ input_token: (batch, 1) 当前时间步的输入词索引 hidden: (num_layers, batch, hidden_size) 上一时间步的隐藏状态 返回: output: (batch, output_size) 当前时间步的预测概率分布 hidden: (num_layers, batch, hidden_size) 当前隐藏状态 """ # 词嵌入: (batch, 1, hidden_size) embedded = self.embedding(input_token) # GRU: output (batch, 1, hidden_size), hidden same gru_output, hidden = self.gru(embedded, hidden) # 投影到词汇表空间: (batch, output_size) output = self.out(gru_output.squeeze(1)) # 对数softmax: (batch, output_size) output = self.softmax(output) return output, hidden

2.3 上下文向量(Context Vector)与信息瓶颈

上下文向量是编码器和解码器之间的唯一信息通道。在传统的Seq2Seq模型中,编码器必须将整个输入序列的全部语义信息压缩到一个**固定长度**的向量中。当输入序列较长时(例如超过20-30个词),这个固定长度的向量成为了系统的**信息瓶颈**(Information Bottleneck),难以承载完整的源语言信息。这是早期Seq2Seq模型在处理长句子时性能急剧下降的根本原因。

信息瓶颈问题:固定长度的上下文向量强制模型将不定长输入压缩为定长表示,导致长序列信息丢失。注意力机制的出现正是为了解决这一问题——它允许解码器在每一步直接访问编码器的所有隐藏状态,而不是仅依赖单一上下文向量。

2.4 Teacher Forcing训练策略

在训练过程中,解码器的每个时间步需要输入"上一个词"来预测当前词。如果使用解码器自己生成的词(可能包含错误),错误会逐级累积,导致训练不稳定且收敛缓慢。Teacher Forcing是一种简单而有效的应对策略:在训练时,无论解码器前一步预测是否正确,都强制使用**真实的目标词**作为当前步的输入。这相当于给解码器提供了一个"老师"在手把手地引导,使训练过程更加稳定、收敛更快。

然而,Teacher Forcing也有其弊端。训练时模型过度依赖真实前缀,而推理时只能使用自己生成的词,这种训练与推理之间的输入分布差异被称为**曝光偏差**(Exposure Bias)。为缓解此问题,研究人员提出了Scheduled Sampling策略:训练过程中以一定概率随机选择使用真实词或模型自己生成的词,概率随训练轮次动态调整。

def train_step(input_tensor, target_tensor, encoder, decoder, encoder_optim, decoder_optim, criterion, teacher_forcing_ratio=0.5): """单步训练,支持Teacher Forcing""" encoder_optim.zero_grad() decoder_optim.zero_grad() batch_size = input_tensor.size(0) target_len = target_tensor.size(1) target_vocab_size = decoder.output_size # 初始化损失 loss = 0 # 编码器前向传播 encoder_hidden = encoder.init_hidden(batch_size) encoder_outputs, encoder_hidden = encoder(input_tensor, encoder_hidden) # 解码器初始输入:<SOS>(序列开始标记) decoder_input = torch.full((batch_size, 1), SOS_token, dtype=torch.long) decoder_hidden = encoder_hidden # 编码器最终隐藏状态初始化解码器 # Teacher Forcing决策 use_teacher_forcing = (random.random() < teacher_forcing_ratio) if use_teacher_forcing: # 方式1:全部使用真实目标词作为输入 for t in range(target_len): decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden) loss += criterion(decoder_output, target_tensor[:, t]) decoder_input = target_tensor[:, t].unsqueeze(1) # 用真实值 else: # 方式2:全部使用模型自己的预测作为输入 for t in range(target_len): decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden) loss += criterion(decoder_output, target_tensor[:, t]) # 取概率最大的词作为下一步输入 _, topi = decoder_output.topk(1) decoder_input = topi.detach() # 断开梯度(避免通过采样反向传播) # 反向传播和优化 loss.backward() encoder_optim.step() decoder_optim.step() return loss.item() / target_len

经验之谈:实践中,Teacher Forcing比例通常设为0.5~1.0,并在训练后期逐渐降低以缓解曝光偏差。Scheduled Sampling方法可以进一步弥合训练和推理之间的输入分布差异。

三、注意力机制(Attention Mechanism)

注意力机制的灵感来自人类的视觉注意力机制——当人类观察一幅图像或阅读一段文字时,并非将所有信息等同处理,而是将注意力集中在最相关的部分。同样,在Seq2Seq模型中,注意力机制允许解码器在生成每个目标词时,动态地"关注"输入序列中与当前生成最相关的部分,而不是仅依赖一个固定长度的上下文向量。

注意力机制的引入是神经机器翻译领域最重要的突破之一。Bahdanau等人在2015年的论文"Neural Machine Translation by Jointly Learning to Align and Translate"中首次提出了注意力机制在Seq2Seq中的应用。随后Luong等人在2015年的"Effective Approaches to Attention-based Neural Machine Translation"中提出了更高效的注意力变体。

3.1 注意力机制的核心思想

在带注意力的解码器中,每个时间步 t 生成目标词时,不再是仅依赖编码器的最后一个隐藏状态,而是执行以下三个步骤:

  1. 计算注意力权重:将解码器当前隐藏状态 s_t 与编码器的每个隐藏状态 h_i 比较,计算出每个源位置的相关性分数,经过softmax归一化得到注意力权重 α_{ti}
  2. 计算上下文向量:将编码器所有隐藏状态按照注意力权重加权求和,得到当前时间步特有的上下文向量 c_t
  3. 预测目标词:将上下文向量 c_t 与解码器当前隐藏状态 s_t 结合,通过输出层预测当前目标词

注意力机制的本质:注意力机制实现了软对齐(Soft Alignment)——模型自动学习源语言和目标语言之间的词语对应关系,无需外部对齐工具。这与传统统计机器翻译中的硬对齐(Hard Alignment)有本质区别。

3.2 Bahdanau Attention(加性注意力)

Bahdanau注意力(也称为加性注意力或拼接注意力)是最早被提出的注意力机制之一。其核心思想是使用一个前馈神经网络来计算解码器隐藏状态与编码器隐藏状态之间的相关性分数。具体地,给定解码器隐藏状态 s_t 和编码器隐藏状态 h_i,相关性分数计算为:

e_{ti} = v_a^T · tanh(W_a · [s_t; h_i])

其中 v_a 和 W_a 是可学习的参数,[s_t; h_i] 表示将两个向量拼接。由于使用了tanh激活函数,加性注意力可以捕捉状态之间的非线性交互关系。

class BahdanauAttention(nn.Module): """ Bahdanau Attention(加性注意力) 使用前馈网络计算相关性分数:e = v^T * tanh(W * [decoder_hidden; encoder_output]) """ def __init__(self, hidden_size): super(BahdanauAttention, self).__init__() self.hidden_size = hidden_size # 将解码器隐藏状态映射到注意力空间 self.W_a = nn.Linear(hidden_size, hidden_size, bias=False) # 将编码器输出映射到注意力空间 self.U_a = nn.Linear(hidden_size, hidden_size, bias=False) # 注意力向量 v_a(用于计算标量分数) self.v_a = nn.Linear(hidden_size, 1, bias=False) def forward(self, decoder_hidden, encoder_outputs, mask=None): """ decoder_hidden: (batch, hidden_size) 当前解码器隐藏状态 encoder_outputs: (batch, src_len, hidden_size) 编码器所有时间步输出 mask: (batch, src_len) 可选的填充掩码 返回: context: (batch, hidden_size) 加权求和后的上下文向量 attention_weights: (batch, src_len) 注意力权重分布 """ src_len = encoder_outputs.size(1) # 扩展解码器隐藏状态以匹配编码器长度: (batch, src_len, hidden_size) decoder_hidden_expanded = decoder_hidden.unsqueeze(1).repeat(1, src_len, 1) # 计算相关性分数: # score = v^T * tanh(W * decoder_hidden + U * encoder_outputs) energy = torch.tanh( self.W_a(decoder_hidden_expanded) + self.U_a(encoder_outputs) ) # (batch, src_len, 1) -> (batch, src_len) scores = self.v_a(energy).squeeze(2) # 应用掩码(将填充位置的分数设为很大的负数) if mask is not None: scores = scores.masked_fill(mask == 0, -1e9) # softmax归一化得到注意力权重 attention_weights = torch.softmax(scores, dim=1) # 加权求和得到上下文向量 context = torch.bmm(attention_weights.unsqueeze(1), encoder_outputs) context = context.squeeze(1) # (batch, hidden_size) return context, attention_weights

3.3 Luong Attention(乘性注意力)

Luong注意力(也称为乘性注意力或点积注意力)是Bahdanau注意力的简化版本,由Luong等人于2015年提出。其核心思想是使用更简单的点积运算来计算相关性分数,显著降低了计算复杂度。Luong注意力有三种变体:

  1. 点积(Dot):score(s_t, h_i) = s_t · h_i,最简洁的形式,要求两者维度相同
  2. 通用(General):score(s_t, h_i) = s_t · W_a · h_i,引入可学习权重矩阵
  3. 拼接(Concat):与Bahdanau类似,但使用不同的计算方式
class LuongAttention(nn.Module): """ Luong Attention(乘性注意力) 支持三种分数计算方式:dot, general, concat """ def __init__(self, hidden_size, method='general'): super(LuongAttention, self).__init__() self.method = method self.hidden_size = hidden_size if method == 'general': # 通用注意力:在编码器和解码器之间插入可学习权重 self.W_a = nn.Linear(hidden_size, hidden_size, bias=False) elif method == 'concat': # 拼接注意力:与Bahdanau类似 self.W_a = nn.Linear(hidden_size * 2, hidden_size, bias=False) self.v_a = nn.Linear(hidden_size, 1, bias=False) def _score(self, decoder_hidden, encoder_output): """ 计算单个解码器-编码器对的分数 decoder_hidden: (batch, hidden_size) encoder_output: (batch, src_len, hidden_size) 返回: (batch, src_len) """ if self.method == 'dot': # 点积: decoder_hidden * encoder_output^T # (batch, 1, hidden_size) @ (batch, hidden_size, src_len) return torch.bmm(decoder_hidden.unsqueeze(1), encoder_output.transpose(1, 2)).squeeze(1) elif self.method == 'general': # 通用: decoder_hidden * W * encoder_output^T transformed = self.W_a(decoder_hidden) # (batch, hidden_size) return torch.bmm(transformed.unsqueeze(1), encoder_output.transpose(1, 2)).squeeze(1) elif self.method == 'concat': # 拼接: v^T * tanh(W * [decoder_hidden; encoder_output]) src_len = encoder_output.size(1) decoder_expanded = decoder_hidden.unsqueeze(1).repeat(1, src_len, 1) concat = torch.cat([decoder_expanded, encoder_output], dim=2) energy = torch.tanh(self.W_a(concat)) return self.v_a(energy).squeeze(2) def forward(self, decoder_hidden, encoder_outputs, mask=None): # 计算分数 scores = self._score(decoder_hidden, encoder_outputs) # 掩码处理 if mask is not None: scores = scores.masked_fill(mask == 0, -1e9) # softmax归一化 attention_weights = torch.softmax(scores, dim=1) # 加权求和得到上下文向量 context = torch.bmm(attention_weights.unsqueeze(1), encoder_outputs).squeeze(1) return context, attention_weights

3.4 全局注意力 vs 局部注意力

Luong在其论文中还区分了全局注意力(Global Attention)和局部注意力(Local Attention)。全局注意力考虑编码器的所有隐藏状态,计算全局范围内的注意力分布。虽然表达能力最强,但当输入序列非常长时,计算和存储开销也最大。

局部注意力则是对全局注意力的近似优化。它在每个时间步只关注输入序列的一个子集(一个窗口区域),而非全部位置。局部注意力结合了硬注意力(Hard Attention,只关注一个位置,不可微分)和软注意力(Soft Attention,关注所有位置,可微分)的优点——在保持可微分的同时大幅降低了计算复杂度。

特性全局注意力局部注意力
关注范围所有源位置一个窗口内的源位置
计算复杂度O(T_src) 每个时间步O(window_size) 每个时间步
对齐模型软对齐软对齐(窗口内)
适用场景中等长度输入长序列输入
是否可微分

3.5 注意力可视化:对齐矩阵

注意力机制最吸引人的特性之一是其可解释性。通过将注意力权重 α_{ti} 可视化为热力图(Heatmap),我们可以直观地看到模型在生成目标序列的每个词时,关注了源序列的哪些位置。这种可视化被称为**对齐矩阵**(Alignment Matrix),通常以二维网格的形式呈现,其中行为目标词,列为源词,颜色深浅表示注意力权重大小。

在英法翻译中,对齐矩阵通常呈现出清晰的对角线模式,表明模型学会了合理的词对齐关系。例如,在翻译"She is a student" -> "Elle est etudiante"时,注意力图会显示"Elle"对齐到"She"、"est"对齐到"is"、"etudiante"对齐到"student"。

四、注意力计算的数学框架:Query-Key-Value

在更一般的框架下,注意力机制可以被形式化为 Query-Key-Value(QKV)模型。这个框架由Vaswani等人在"Attention Is All You Need"(2017)中系统地阐述,并成为后续所有注意力变体的理论基础。无论是最初的Bahdanau注意力、Luong注意力,还是Transformer中的自注意力(Self-Attention),都可以统一到QKV框架中理解。

4.1 Query、Key、Value的含义

在注意力机制的QKV框架中,三个角色分别承担不同的功能:

注意力计算的过程可以类比为信息检索:用Query去匹配所有Key,找到最相关的信息,然后从相应的Value中提取实际内容。在大多数情况下,Key和Value是相同的(都是编码器的隐藏状态),但允许它们不同的设计可以带来更灵活的建模能力。

4.2 缩放点积注意力(Scaled Dot-Product Attention)

缩放点积注意力是Transformer中使用的核心计算方式,也是最高效的注意力变体之一。给定维度为 d_k 的Q和K,计算过程为:

Attention(Q, K, V) = softmax(Q · K^T / sqrt(d_k)) · V

其中缩放因子 sqrt(d_k) 至关重要:当 d_k 较大时,点积结果的方差增大,使得softmax函数的梯度处于极端区域(梯度极小),不利于训练。除以 sqrt(d_k) 将方差重新归一化为1,保证了梯度的稳定性。

class ScaledDotProductAttention(nn.Module): """ 缩放点积注意力(Scaled Dot-Product Attention) 这是Transformer中最核心的注意力计算方式 """ def __init__(self, dropout=0.1): super(ScaledDotProductAttention, self).__init__() self.dropout = nn.Dropout(dropout) def forward(self, query, key, value, mask=None): """ query: (batch, ..., seq_len_q, d_k) key: (batch, ..., seq_len_k, d_k) value: (batch, ..., seq_len_k, d_v) mask: (batch, ..., seq_len_q, seq_len_k) 布尔掩码 返回: output: (batch, ..., seq_len_q, d_v) 注意力输出 weights: (batch, ..., seq_len_q, seq_len_k) 注意力权重 """ d_k = query.size(-1) # 计算点积相似度: (batch, ..., seq_len_q, seq_len_k) scores = torch.matmul(query, key.transpose(-2, -1)) # 缩放:防止softmax梯度过小 scores = scores / math.sqrt(d_k) # 应用掩码(将需要屏蔽的位置设为很大的负数) if mask is not None: scores = scores.masked_fill(mask == 0, -1e9) # softmax归一化,得到概率分布 attention_weights = torch.softmax(scores, dim=-1) # dropout(训练时用于正则化) attention_weights = self.dropout(attention_weights) # 加权求和得到输出 output = torch.matmul(attention_weights, value) return output, attention_weights

4.3 从Seq2Seq到Transformer的演变

理解Seq2Seq+Attention的架构是理解Transformer的基础。在Transformer中:

class MultiHeadAttention(nn.Module): """ 多头注意力(Multi-Head Attention) 将Q、K、V投影到h个子空间,独立计算注意力,然后拼接 """ def __init__(self, d_model, num_heads, dropout=0.1): super(MultiHeadAttention, self).__init__() assert d_model % num_heads == 0 self.d_model = d_model self.num_heads = num_heads self.d_k = d_model // num_heads # 线性投影层(保持维度不变,分割为多个头) self.W_q = nn.Linear(d_model, d_model) self.W_k = nn.Linear(d_model, d_model) self.W_v = nn.Linear(d_model, d_model) self.W_o = nn.Linear(d_model, d_model) # 缩放点积注意力 self.attention = ScaledDotProductAttention(dropout) def forward(self, query, key, value, mask=None): batch_size = query.size(0) # 线性投影 + 重塑为多头形状 # (batch, seq_len, d_model) -> (batch, seq_len, num_heads, d_k) # -> (batch, num_heads, seq_len, d_k) Q = self.W_q(query).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2) K = self.W_k(key).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2) V = self.W_v(value).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2) # 对每个头独立计算注意力 # attn_output: (batch, num_heads, seq_len_q, d_k) attn_output, attn_weights = self.attention(Q, K, V, mask) # 拼接所有头的输出 # (batch, num_heads, seq_len_q, d_k) -> (batch, seq_len_q, d_model) attn_output = attn_output.transpose(1, 2).contiguous() attn_output = attn_output.view(batch_size, -1, self.d_model) # 输出投影 output = self.W_o(attn_output) return output

五、注意力可视化

注意力权重提供了模型内部决策过程的可视化窗口。通过将注意力矩阵渲染为热力图,我们可以直观地理解模型的翻译行为——在生成每个目标词时,模型关注了源句子的哪些部分。这对调试模型行为、验证学习效果、以及进行错误分析都非常有价值。

5.1 注意力热力图(Heatmap)解读

注意力热力图以二维矩阵的形式展示注意力权重。行表示目标序列的位置(从<SOS>到<EOS>),列表示源序列的位置。矩阵中每个单元格的颜色深浅表示对应的注意力权重值,颜色越深表示权重越大,越表示该源词对生成当前目标词的贡献越大。

一个训练良好的翻译模型通常会呈现出近似对角线的模式,这是因为自然语言中词语的对应关系往往保持相对的语序一致性。然而,在某些语言对中(如德语-英语,动词位置不同),注意力图中会出现明显的跳跃模式,这正是模型学会了重新排列语序的证据。

import matplotlib.pyplot as plt import seaborn as sns import numpy as np def visualize_attention(input_tokens, output_tokens, attention_weights, title="Attention Heatmap", figsize=(10, 8)): """ 可视化注意力权重矩阵 参数: input_tokens: list[str] 输入序列的分词结果 output_tokens: list[str] 输出序列的分词结果 attention_weights: np.ndarray (target_len, source_len) 注意力权重矩阵 title: str 图标题 figsize: tuple 图像尺寸 """ plt.figure(figsize=figsize) # 绘制热力图 ax = sns.heatmap( attention_weights, xticklabels=input_tokens, yticklabels=output_tokens, cmap='Blues', annot=False, cbar_kws={'label': 'Attention Weight'} ) # 设置标题和轴标签 ax.set_title(title, fontsize=14, fontweight='bold') ax.set_xlabel('Source Tokens (Input)', fontsize=12) ax.set_ylabel('Target Tokens (Output)', fontsize=12) # 旋转x轴标签以避免重叠 plt.xticks(rotation=45, ha='right') plt.yticks(rotation=0) plt.tight_layout() plt.show() plt.close() def visualize_alignment_matrix(source_sentence, target_sentence, alignments, output_path=None): """ 展示源语言和目标语言之间的对齐关系 参数: source_sentence: str 源语句 target_sentence: str 目标语句 alignments: np.ndarray (target_len, source_len) 对齐矩阵 output_path: str 可选,保存图像的路径 """ source_words = source_sentence.split() target_words = target_sentence.split() # 确保矩阵维度匹配 assert alignments.shape == (len(target_words), len(source_words)) visualize_attention( source_words, target_words, alignments, title="Seq2Seq with Attention - Alignment Visualization" ) # 打印每个目标词关注最多的源词 print(f"\n{'Target Word':<15} {'Aligned Source Word':<20} {'Weight':<10}") print("-" * 50) for i, t_word in enumerate(target_words): max_idx = np.argmax(alignments[i]) max_weight = alignments[i][max_idx] print(f"{t_word:<15} {source_words[max_idx]:<20} {max_weight:.4f}") def plot_attention_weights_over_time(time_steps, source_tokens, attention_weights_per_step): """ 在多个解码时间步上可视化注意力分布的变化 参数: time_steps: list[int] 要显示的解码时间步 source_tokens: list[str] 源语言分词 attention_weights_per_step: list[np.ndarray] 每个时间步的注意力分布 """ n_steps = len(time_steps) fig, axes = plt.subplots(n_steps, 1, figsize=(12, 3 * n_steps)) if n_steps == 1: axes = [axes] for idx, (ax, step) in enumerate(zip(axes, time_steps)): weights = attention_weights_per_step[idx] ax.bar(range(len(weights)), weights) ax.set_xticks(range(len(source_tokens))) ax.set_xticklabels(source_tokens, rotation=45, ha='right') ax.set_ylim(0, 1) ax.set_ylabel('Attention Weight') ax.set_title(f'Decoding Step {step}') ax.axhline(y=1.0/len(weights), color='r', linestyle='--', alpha=0.5, label='Uniform Distribution') ax.legend() plt.tight_layout() plt.show()

实际应用:在机器翻译的调试过程中,注意力可视化是诊断模型行为的首选工具。如果某些目标词的注意力分布过于分散(均匀分布),通常表明模型对该词存在翻译困难。如果注意力集中在填充标记(PAD)上,则可能意味着对齐学习出了问题。

六、Seq2Seq + Attention 翻译系统完整实现

下面我们将构建一个完整的基于注意力机制的Seq2Seq神经机器翻译系统。系统使用GRU作为编码器和解码器的基本单元,使用Bahdanau注意力机制实现软对齐,并支持Teacher Forcing训练策略。整个实现在PyTorch框架中完成。

6.1 带注意力的解码器

class AttentionalDecoderRNN(nn.Module): """ 带Bahdanau注意力的解码器 在每个时间步: 1. 使用前一隐藏状态计算注意力权重 2. 加权求和编码器输出得到上下文向量 3. 拼接上下文向量和当前输入,通过GRU生成新的隐藏状态 4. 通过输出层预测当前词 """ def __init__(self, output_size, hidden_size, num_layers=1, dropout=0.1): super(AttentionalDecoderRNN, self).__init__() self.output_size = output_size self.hidden_size = hidden_size # 词嵌入层 self.embedding = nn.Embedding(output_size, hidden_size) # 注意力机制 self.attention = BahdanauAttention(hidden_size) # 将上下文向量和嵌入向量拼接后输入GRU self.gru = nn.GRU(hidden_size * 2, hidden_size, num_layers, batch_first=True) # 输出投影:将GRU输出映射到词汇表空间 self.out = nn.Linear(hidden_size, output_size) # dropout正则化 self.dropout = nn.Dropout(dropout) self.softmax = nn.LogSoftmax(dim=1) def forward(self, input_token, hidden, encoder_outputs, mask=None): """ input_token: (batch, 1) 当前输入词索引 hidden: (num_layers, batch, hidden_size) 上一时间步隐藏状态 encoder_outputs: (batch, src_len, hidden_size) 编码器全部输出 mask: (batch, src_len) 掩码 """ # 词嵌入 + dropout embedded = self.dropout(self.embedding(input_token)) # (batch, 1, hidden_size) # 计算注意力:使用上一时间步的隐藏状态(顶层) # 对于多层GRU,通常使用最后一层的隐藏状态计算注意力 decoder_hidden_top = hidden[-1] # (batch, hidden_size) context, attn_weights = self.attention( decoder_hidden_top, encoder_outputs, mask ) # context: (batch, hidden_size) # attn_weights: (batch, src_len) # 拼接上下文向量和嵌入向量 # (batch, 1, hidden_size * 2) gru_input = torch.cat([embedded, context.unsqueeze(1)], dim=2) # GRU前向 gru_output, hidden = self.gru(gru_input, hidden) # gru_output: (batch, 1, hidden_size) # 输出投影 output = self.out(gru_output.squeeze(1)) # (batch, output_size) # LogSoftmax output = self.softmax(output) return output, hidden, attn_weights

6.2 训练函数

def train_epoch(input_tensor, target_tensor, encoder, decoder, encoder_optim, decoder_optim, criterion, teacher_forcing_ratio=0.5, use_cuda=False): """训练一个epoch""" encoder.train() decoder.train() encoder_optim.zero_grad() decoder_optim.zero_grad() batch_size = input_tensor.size(0) target_len = target_tensor.size(1) target_vocab_size = decoder.output_size loss = 0 # 编码器前向 encoder_hidden = encoder.init_hidden(batch_size) if use_cuda: encoder_hidden = encoder_hidden.cuda() encoder_outputs, encoder_hidden = encoder(input_tensor, encoder_hidden) # 创建掩码(标记哪些位置是有效的词,哪些是填充) src_mask = (input_tensor != PAD_token).float() # 解码器初始输入 decoder_input = torch.LongTensor([[SOS_token]] * batch_size) if use_cuda: decoder_input = decoder_input.cuda() # 解码器初始隐藏状态:使用编码器最终隐藏状态 decoder_hidden = encoder_hidden # Teacher Forcing决策 use_tf = random.random() < teacher_forcing_ratio if use_tf: # Teacher Forcing:使用真实目标词作为输入 for t in range(target_len): decoder_output, decoder_hidden, attn = decoder( decoder_input, decoder_hidden, encoder_outputs, src_mask ) loss += criterion(decoder_output, target_tensor[:, t]) # 使用真实值作为下一步输入 decoder_input = target_tensor[:, t].unsqueeze(1) else: # 非Teacher Forcing:使用模型预测作为输入 for t in range(target_len): decoder_output, decoder_hidden, attn = decoder( decoder_input, decoder_hidden, encoder_outputs, src_mask ) loss += criterion(decoder_output, target_tensor[:, t]) # 取概率最大的词 _, topi = decoder_output.topk(1) decoder_input = topi.detach() # 反向传播 loss.backward() # 梯度裁剪(防止梯度爆炸) torch.nn.utils.clip_grad_norm_(encoder.parameters(), max_norm=5.0) torch.nn.utils.clip_grad_norm_(decoder.parameters(), max_norm=5.0) encoder_optim.step() decoder_optim.step() return loss.item() / target_len

6.3 贪心解码推理

def greedy_decode(encoder, decoder, input_tensor, max_length=50, use_cuda=False): """ 贪心解码:每个时间步选择概率最大的词 参数: input_tensor: (1, src_len) 编码后的输入序列 max_length: int 最大生成长度(防止无限循环) 返回: decoded_indices: list[int] 解码出的词索引序列 attention_weights: list[np.ndarray] 每个时间步的注意力权重 """ encoder.eval() decoder.eval() with torch.no_grad(): # 编码 encoder_hidden = encoder.init_hidden(1) if use_cuda: encoder_hidden = encoder_hidden.cuda() input_tensor = input_tensor.cuda() encoder_outputs, encoder_hidden = encoder(input_tensor, encoder_hidden) # 起始输入 decoder_input = torch.LongTensor([[SOS_token]]) # 使用编码器最终隐藏状态初始化解码器 decoder_hidden = encoder_hidden decoded_indices = [] attention_weights = [] for t in range(max_length): decoder_output, decoder_hidden, attn_weights = decoder( decoder_input, decoder_hidden, encoder_outputs ) # 收集注意力权重 attention_weights.append(attn_weights.squeeze(0).cpu().numpy()) # 选择概率最大的词 _, topi = decoder_output.topk(1) token_idx = topi.item() # 遇到结束标记则停止 if token_idx == EOS_token: break decoded_indices.append(token_idx) # 当前预测作为下一步输入 decoder_input = torch.LongTensor([[token_idx]]) attention_weights = np.array(attention_weights) return decoded_indices, attention_weights

6.4 BLEU评估指标

BLEU(Bilingual Evaluation Understudy)是机器翻译最常用的自动评估指标,由Papineni等人于2002年提出。BLEU的核心思想是计算模型生成的翻译(候选翻译)与一个或多个参考翻译之间的n-gram重叠程度。BLEU分数范围为0~1(通常以百分比表示),分数越高表示翻译质量越好。

BLEU的计算包含四项核心组件:

from collections import Counter import math import numpy as np def compute_bleu(candidate, references, max_n=4, weights=None): """ 计算BLEU分数 参数: candidate: list[str] 候选翻译(模型输出) references: list[list[str]] 参考翻译列表(通常有1-4个参考) max_n: int 最大n-gram阶数(标准为4) weights: list[float] 各阶n-gram的权重(默认均匀分布) 返回: bleu: float BLEU分数 precisions: list[float] 各阶n-gram的精确率 """ if weights is None: weights = [1.0 / max_n] * max_n candidate_len = len(candidate) ref_lens = [len(ref) for ref in references] # 选择最接近候选长度的参考长度(用于简短惩罚) closest_ref_len = min(ref_lens, key=lambda x: abs(x - candidate_len)) # 简短惩罚 if candidate_len == 0: return 0.0, [0.0] * max_n if candidate_len <= closest_ref_len: brevity_penalty = math.exp(1 - closest_ref_len / candidate_len) else: brevity_penalty = 1.0 precisions = [] for n in range(1, max_n + 1): # 统计候选翻译中的n-gram candidate_ngrams = Counter( tuple(candidate[i:i+n]) for i in range(len(candidate) - n + 1) ) # 统计所有参考翻译中的n-gram(合并计数) reference_ngrams = Counter() for ref in references: ref_ngrams = Counter( tuple(ref[i:i+n]) for i in range(len(ref) - n + 1) ) reference_ngrams |= ref_ngrams # 取并集(最大值) # 计算匹配的n-gram数(使用裁剪:不超过参考出现次数) matches = sum( min(count, reference_ngrams.get(ngram, 0)) for ngram, count in candidate_ngrams.items() ) total = max(sum(candidate_ngrams.values()), 1) # 避免除零 precisions.append(matches / total) # 如果任何阶数的精确率为0,整体系数为0 if any(p == 0 for p in precisions): return 0.0, precisions # 加权几何平均 log_bleu = sum(w * math.log(p) for w, p in zip(weights, precisions)) bleu = brevity_penalty * math.exp(log_bleu) return bleu, precisions def evaluate_translation(encoder, decoder, pairs, input_lang, output_lang, max_length=50, use_cuda=False, num_examples=10): """ 评估翻译质量:展示示例翻译 + 计算平均BLEU分数 """ total_bleu = 0.0 num_valid = 0 print(f"{'Source':<30} {'Reference':<30} {'Prediction':<30} {'BLEU':<10}") print("=" * 100) for i in range(min(num_examples, len(pairs))): src_sentence, tgt_sentence = pairs[i] # 将源句子的词索引化 input_tensor = tensor_from_sentence(input_lang, src_sentence) if use_cuda: input_tensor = input_tensor.cuda() # 贪心解码 decoded_indices, attn_weights = greedy_decode( encoder, decoder, input_tensor, max_length, use_cuda ) # 将索引转换回词 predicted_words = [output_lang.index2word[idx] for idx in decoded_indices] predicted_sentence = ' '.join(predicted_words) # 将参考句子分词 reference_words = tgt_sentence.split() # 计算BLEU bleu, _ = compute_bleu(predicted_words, [reference_words]) total_bleu += bleu num_valid += 1 print(f"{src_sentence:<30} {tgt_sentence:<30} " f"{predicted_sentence:<30} {bleu:.4f}") avg_bleu = total_bleu / max(num_valid, 1) print(f"\nAverage BLEU Score: {avg_bleu:.4f}") return avg_bleu

重要提示:虽然BLEU是最广泛使用的自动评估指标,但它并非完美。BLEU主要衡量词汇重叠,无法捕捉语义等价性和语法正确性。高BLEU分数不等同于高质量翻译。在学术研究中,BLEU通常与人工评估、TER(Translation Edit Rate)、METEOR等指标配合使用。

七、Beam Search 束搜索

在推理阶段,贪心解码(Greedy Decoding)在每个时间步只选择概率最大的一个词,这种方法虽然简单高效,但可能错过全局最优序列。因为语言具有**局部最优不等于全局最优**的特性——当前看似概率最大的词,后续可能导致整个序列概率降低。Beam Search(束搜索)在每一步保留 top-k 个候选序列(称为"束"或"beam"),通过维护多个候选路径来探索更广阔的搜索空间,显著提升了生成质量。

7.1 Beam Search的核心概念

def beam_search_decode(encoder, decoder, input_tensor, beam_width=5, max_length=50, length_penalty_alpha=1.0, use_cuda=False): """ Beam Search解码 参数: beam_width: int 束宽度 max_length: int 最大生成长度 length_penalty_alpha: float 长度惩罚系数(α=1时强惩罚,α=0时不惩罚) 返回: best_sequence: list[int] 最优序列(经过长度惩罚排序后) best_score: float 最优序列的归一化分数 all_candidates: list[tuple] 所有完成的候选序列及其分数 """ encoder.eval() decoder.eval() with torch.no_grad(): # --- 编码 --- encoder_hidden = encoder.init_hidden(1) if use_cuda: input_tensor = input_tensor.cuda() encoder_hidden = encoder_hidden.cuda() encoder_outputs, encoder_hidden = encoder(input_tensor, encoder_hidden) # --- 初始化束 --- # 每个候选是一个元组:(sequence, score, decoder_hidden) # score为累积对数概率 decoder_input = torch.LongTensor([[SOS_token]]) if use_cuda: decoder_input = decoder_input.cuda() decoder_hidden = encoder_hidden decoder_output, decoder_hidden, _ = decoder( decoder_input, decoder_hidden, encoder_outputs ) # 取top-k个初始词 log_probs = decoder_output.squeeze(0) # (vocab_size) topk_log_probs, topk_indices = log_probs.topk(beam_width) # 初始化束 beams = [] for i in range(beam_width): token = topk_indices[i].item() score = topk_log_probs[i].item() beams.append(([token], score, decoder_hidden)) # --- 束搜索迭代 --- completed_sequences = [] for step in range(1, max_length): new_beams = [] for seq, score, hidden in beams: # 准备当前输入 decoder_input = torch.LongTensor([[seq[-1]]]) if use_cuda: decoder_input = decoder_input.cuda() # 解码一步 decoder_output, hidden_next, _ = decoder( decoder_input, hidden, encoder_outputs ) # 取top-k扩展 log_probs = decoder_output.squeeze(0) # (vocab_size) topk_log_probs, topk_indices = log_probs.topk(beam_width) for i in range(beam_width): token = topk_indices[i].item() new_score = score + topk_log_probs[i].item() new_seq = seq + [token] if token == EOS_token: # 遇到结束标记,加入完成列表 completed_sequences.append((new_seq, new_score)) else: new_beams.append((new_seq, new_score, hidden_next)) # 如果没有新的候选,停止 if not new_beams: break # 对所有候选按分数排序,保留top-k new_beams.sort(key=lambda x: x[1], reverse=True) beams = new_beams[:beam_width] # 如果已经有了足够多的完成序列,可以提前停止 # (启发式:至少完成beam_width个,或达到2*beam_width) if len(completed_sequences) >= beam_width * 2: break # 将所有未完成的束也加入完成列表(强制终止) for seq, score, _ in beams: completed_sequences.append((seq, score)) # --- 长度惩罚和排序 --- def length_penalty(seq_len): """GNMT风格的长度惩罚:((5 + len)^α) / (5 + 1)^α""" return ((5 + seq_len) ** length_penalty_alpha) / (5 + 1) ** length_penalty_alpha # 对每个序列应用长度惩罚 scored_sequences = [] for seq, score in completed_sequences: # 排除SOS和EOS标记的计数 effective_len = len([t for t in seq if t not in (SOS_token, EOS_token)]) if effective_len == 0: continue # 归一化分数 normalized_score = score / length_penalty(effective_len) scored_sequences.append((seq, normalized_score)) # 按归一化分数排序 scored_sequences.sort(key=lambda x: x[1], reverse=True) if not scored_sequences: return [], float('-inf'), [] best_seq, best_score = scored_sequences[0] return best_seq, best_score, scored_sequences

7.2 Beam Search versus 贪心搜索

对比维度贪心搜索Beam Search
搜索策略局部最优(每个时间步选最佳)全局近似最优(维护多个候选)
候选路径数1k(束宽度)
时间复杂度O(T)O(k · T) 其中k为束宽度
空间复杂度O(T)O(k · T)
生成质量可能陷入局部最优显著优于贪心搜索
多样性差(总是输出相同结果)较好(可输出多个候选)
适用场景实时要求高的应用质量优先的离线生成

实践建议:对于机器翻译,束宽度 k=4 或 k=5 通常能提供较好的质量与速度平衡。当k增大到10以上时,质量提升趋于饱和,而计算成本线性增长。在文本摘要任务中,可以使用更大的束宽度(k=8~12)以获取更好的覆盖率和信息完整性。

7.3 Beam Search的改进变体

标准的Beam Search有几个已知的缺陷,研究人员提出了多种改进方案:

def beam_search_with_coverage_penalty(encoder, decoder, input_tensor, beam_width=5, max_length=50, length_penalty_alpha=0.8, coverage_penalty_beta=0.2): """ 带覆盖惩罚(Coverage Penalty)的Beam Search 覆盖惩罚机制:在计算候选分数时,额外惩罚过多关注同一源词的重复翻译行为。 覆盖向量 cov 记录每个源词被注意的总次数,覆盖惩罚与其成正比。 """ encoder.eval() decoder.eval() with torch.no_grad(): encoder_hidden = encoder.init_hidden(1) encoder_outputs, encoder_hidden = encoder(input_tensor, encoder_hidden) src_len = encoder_outputs.size(1) # 初始化 decoder_input = torch.LongTensor([[SOS_token]]) decoder_hidden = encoder_hidden decoder_output, decoder_hidden, attn_weights = decoder( decoder_input, decoder_hidden, encoder_outputs ) log_probs = decoder_output.squeeze(0) topk_log_probs, topk_indices = log_probs.topk(beam_width) # 每个beam维护:序列、分数、隐藏状态、覆盖向量 beams = [] for i in range(beam_width): token = topk_indices[i].item() score = topk_log_probs[i].item() cov_vector = attn_weights.squeeze(0).cpu().numpy() # (src_len,) beams.append(([token], score, decoder_hidden, cov_vector)) completed_sequences = [] for step in range(1, max_length): new_beams = [] for seq, score, hidden, coverage in beams: decoder_input = torch.LongTensor([[seq[-1]]]) decoder_output, hidden_next, attn_weights = decoder( decoder_input, hidden, encoder_outputs ) # 更新覆盖向量(累加注意力权重) current_attn = attn_weights.squeeze(0).cpu().numpy() updated_coverage = coverage + current_attn # 计算覆盖惩罚:注意力权重的方差(越大表示越不均匀) cov_penalty = coverage_penalty_beta * np.sum( np.minimum(current_attn, updated_coverage) ) log_probs = decoder_output.squeeze(0) topk_log_probs, topk_indices = log_probs.topk(beam_width) for i in range(beam_width): token = topk_indices[i].item() # 分数 = 累积对数概率 - 长度惩罚 + 覆盖惩罚 # 注意:这里覆盖惩罚是负的(鼓励均匀覆盖) new_score = (score + topk_log_probs[i].item() - cov_penalty) new_seq = seq + [token] if token == EOS_token: completed_sequences.append((new_seq, new_score)) else: new_beams.append( (new_seq, new_score, hidden_next, updated_coverage) ) if not new_beams: break # 按分数排序保留top-k new_beams.sort(key=lambda x: x[1], reverse=True) beams = new_beams[:beam_width] if len(completed_sequences) >= beam_width * 2: break for seq, score, _, _ in beams: completed_sequences.append((seq, score)) # 长度归一化 def length_penalty(seq_len): return ((5 + seq_len) ** length_penalty_alpha) / 6 ** length_penalty_alpha scored = [] for seq, score in completed_sequences: effective_len = len([t for t in seq if t not in (SOS_token, EOS_token)]) if effective_len == 0: continue normalized_score = score / length_penalty(effective_len) scored.append((seq, normalized_score)) scored.sort(key=lambda x: x[1], reverse=True) best_seq = scored[0][0] if scored else [] return best_seq, scored

八、核心要点总结

Seq2Seq与注意力机制是现代自然语言处理中最基础、最重要的技术之一,也是理解Transformer架构的基石。以下是本文的核心要点:

九、进一步思考与实践

Seq2Seq与注意力机制是理解现代NLP的重要基石。掌握了这些概念后,可以从以下方向继续深入:

进阶方向:

  • Transformer架构:深入理解自注意力、多头注意力、位置编码、层归一化等核心组件,完全理解"Attention Is All You Need"论文
  • BERT/GPT等预训练模型:理解自监督预训练+下游任务微调的范式,以及双向/单向语言模型的区别
  • 机器翻译前沿:理解Cascaded vs End-to-End方法,以及基于大规模预训练模型的零样本翻译
  • 更高效的注意力变体:Linear Attention、Reformer、Longformer、Performer等旨在降低注意力机制O(n²)计算复杂度的最新研究
  • 多模态注意力:视觉-语言模型(如CLIP、Flamingo)中的跨模态注意力机制

实践建议:理论学习之外,强烈建议动手实现一个完整的Seq2Seq+Attention机器翻译系统。可以从一个小型数据集开始(如英语-法语翻译的TED Talks数据集或IWSLT数据集),逐步增加模型复杂度。实践中注意使用梯度裁剪防止梯度爆炸、使用学习率预热和衰减策略、以及注意处理OOV(Out-of-Vocabulary)和稀有词翻译问题。

Seq2Seq与注意力机制的思想已经超越了自然语言处理的范畴,广泛应用于计算机视觉(图像注意力、视频理解)、语音处理(语音识别、语音合成)、生物信息学(蛋白质结构预测)等众多领域。掌握这两个核心概念,将为理解更广泛的AI技术打下坚实的基础。