Source code for paddlespeech.s2t.modules.embedding

# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
# Copyright 2019 Mobvoi Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modified from wenet(https://github.com/wenet-e2e/wenet)
"""Positonal Encoding Module."""
import math
from typing import Tuple

import paddle
from paddle import nn

from paddlespeech.s2t.utils.log import Log

logger = Log(__name__).getlog()

__all__ = [
    "PositionalEncodingInterface", "NoPositionalEncoding", "PositionalEncoding",
    "RelPositionalEncoding"
]


class PositionalEncodingInterface:
    def forward(self, x: paddle.Tensor,
                offset: int=0) -> Tuple[paddle.Tensor, paddle.Tensor]:
        """Compute positional encoding.
        Args:
            x (paddle.Tensor): Input tensor (batch, time, `*`).
        Returns:
            paddle.Tensor: Encoded tensor (batch, time, `*`).
            paddle.Tensor: Positional embedding tensor (1, time, `*`).
        """
        raise NotImplementedError("forward method is not implemented")

    def position_encoding(self, offset: int, size: int) -> paddle.Tensor:
        """For getting encoding in a streaming fashion.
        Args:
            offset (int): start offset
            size (int): required size of position encoding
        Returns:
            paddle.Tensor: Corresponding position encoding
        """
        raise NotImplementedError(
            "position_encoding method is not implemented")

class NoPositionalEncoding(nn.Layer, PositionalEncodingInterface):
    def __init__(self,
                 d_model: int,
                 dropout_rate: float,
                 max_len: int=5000,
                 reverse: bool=False):
        nn.Layer.__init__(self)

    def forward(self, x: paddle.Tensor,
                offset: int=0) -> Tuple[paddle.Tensor, paddle.Tensor]:
        return x, None

    def position_encoding(self, offset: int, size: int) -> paddle.Tensor:
        return None

class PositionalEncoding(nn.Layer, PositionalEncodingInterface):
    def __init__(self,
                 d_model: int,
                 dropout_rate: float,
                 max_len: int=5000,
                 reverse: bool=False):
        """Positional encoding.
            PE(pos, 2i)   = sin(pos/(10000^(2i/dmodel)))
            PE(pos, 2i+1) = cos(pos/(10000^(2i/dmodel)))
        Args:
            d_model (int): embedding dim.
            dropout_rate (float): dropout rate.
            max_len (int, optional): maximum input length. Defaults to 5000.
            reverse (bool, optional): Not used. Defaults to False.
        """
        nn.Layer.__init__(self)
        self.d_model = paddle.to_tensor(d_model)
        self.max_len = max_len
        self.xscale = paddle.to_tensor(math.sqrt(self.d_model))
        self.dropout = nn.Dropout(p=dropout_rate)
        self.base = paddle.to_tensor(10000.0)
        self.pe = paddle.zeros([1, self.max_len, self.d_model])  # [B=1, T, D]

        position = paddle.arange(
            0, self.max_len, dtype=paddle.float32).unsqueeze(1)  # [T, 1]
        # base^{-2(i-1)/d}, i \in (1, 2, ..., d/2)
        div_term = paddle.exp(
            -paddle.arange(0, self.d_model, 2, dtype=paddle.float32) *
            (paddle.log(self.base) / self.d_model))

        # [B=1, T, D]
        self.pe[:, :, 0::2] = paddle.sin(position * div_term)
        self.pe[:, :, 1::2] = paddle.cos(position * div_term)

    def forward(self, x: paddle.Tensor,
                offset: int=0) -> Tuple[paddle.Tensor, paddle.Tensor]:
        """Add positional encoding.
        Args:
            x (paddle.Tensor): Input. Its shape is (batch, time, ...)
            offset (int): position offset
        Returns:
            paddle.Tensor: Encoded tensor. Its shape is (batch, time, ...)
            paddle.Tensor: for compatibility with RelPositionalEncoding, (batch=1, time, ...)
        """
        assert offset + x.shape[1] < self.max_len, (
            "offset: {} + x.shape[1]: {} is larger than the max_len: {}".format(
                offset, x.shape[1], self.max_len))
        pos_emb = self.pe[:, offset:offset + x.shape[1]]
        x = x * self.xscale + pos_emb
        return self.dropout(x), self.dropout(pos_emb)

    def position_encoding(self, offset: int, size: int) -> paddle.Tensor:
        """For getting encoding in a streaming fashion.
        Attention! We apply dropout only once at the whole utterance level
        in a non-streaming way, but this function will be called several
        times with increasing input sizes in a streaming scenario, so the
        dropout will be applied several times.
        Args:
            offset (int): start offset
            size (int): required size of position encoding
        Returns:
            paddle.Tensor: Corresponding position encoding, #[1, T, D].
        """
        assert offset + size < self.max_len
        return self.dropout(self.pe[:, offset:offset + size])

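# A minimal usage sketch of PositionalEncoding, assuming a 256-dim model and a
# 100-frame batch; the variable names and sizes here are illustrative only,
# not part of the module's API.
#
#   pos_enc = PositionalEncoding(d_model=256, dropout_rate=0.1, max_len=5000)
#   xs = paddle.randn([4, 100, 256])             # (batch, time, d_model)
#   xs, pos_emb = pos_enc(xs)                    # xs * sqrt(d_model) + PE, then dropout
#   # streaming: PE slice for frames 100..115 of the same utterance
#   chunk_pe = pos_enc.position_encoding(offset=100, size=16)  # [1, 16, 256]
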
class RelPositionalEncoding(PositionalEncoding):
    """Relative positional encoding module.
    See: Appendix B in https://arxiv.org/abs/1901.02860
    """

    def __init__(self, d_model: int, dropout_rate: float, max_len: int=5000):
        """
        Args:
            d_model (int): Embedding dimension.
            dropout_rate (float): Dropout rate.
            max_len (int, optional): Maximum input length. Defaults to 5000.
        """
        super().__init__(d_model, dropout_rate, max_len, reverse=True)
        logger.info(f"max len: {max_len}")

    def forward(self, x: paddle.Tensor,
                offset: int=0) -> Tuple[paddle.Tensor, paddle.Tensor]:
        """Compute positional encoding.
        Args:
            x (paddle.Tensor): Input tensor (batch, time, `*`).
        Returns:
            paddle.Tensor: Encoded tensor (batch, time, `*`).
            paddle.Tensor: Positional embedding tensor (1, time, `*`).
        """
        assert offset + x.shape[1] < self.max_len, (
            "offset: {} + x.shape[1]: {} is larger than the max_len: {}".format(
                offset, x.shape[1], self.max_len))
        x = x * self.xscale
        pos_emb = self.pe[:, offset:offset + x.shape[1]]
        return self.dropout(x), self.dropout(pos_emb)

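# A minimal usage sketch of RelPositionalEncoding, assuming the same 256-dim
# setup as above; names and sizes are illustrative only. Unlike
# PositionalEncoding, the relative variant does not add pos_emb to the input:
# it only scales the input and returns the embedding separately, for a
# relative-position-aware attention layer to consume.
#
#   rel_pos_enc = RelPositionalEncoding(d_model=256, dropout_rate=0.1)
#   xs = paddle.randn([4, 100, 256])             # (batch, time, d_model)
#   xs, pos_emb = rel_pos_enc(xs)                # xs * sqrt(d_model), pos_emb: [1, 100, 256]
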
# RotaryRelPositionalEncoding is the same as RelPositionalEncoding.
class ScaledRotaryRelPositionalEncoding(RelPositionalEncoding):
    """Scaled Rotary Relative positional encoding module.
    POSITION INTERPOLATION: https://arxiv.org/pdf/2306.15595v2.pdf
    """

    def __init__(self,
                 d_model: int,
                 dropout_rate: float,
                 max_len: int=5000,
                 scale=1):
        """
        Args:
            d_model (int): Embedding dimension.
            dropout_rate (float): Dropout rate.
            max_len (int, optional): Maximum input length. Defaults to 5000.
            scale (int): Interpolate max input length to `scale * max_len` positions.
        """
        super().__init__(d_model, dropout_rate, max_len)
        self.pscale = paddle.to_tensor(scale)
        self.max_len = max_len * scale

    def sinusoidal_embeddings(self,
                              pos: paddle.Tensor,
                              dim: paddle.Tensor,
                              base=10000) -> paddle.Tensor:
        """Compute `dim`-dimensional sinusoidal embeddings for positions `pos`."""
        assert dim % 2 == 0
        # (d/2,)
        indices = paddle.arange(0, dim // 2, dtype=pos.dtype)
        indices = paddle.pow(paddle.cast(base, pos.dtype), -2 * indices / dim)
        # pos (1, T), indices (d/2,) -> (1, T, d/2)
        embeddings = paddle.einsum('...,d->...d', pos, indices)
        # (1, T, d/2, 2)
        embeddings = paddle.stack(
            [paddle.sin(embeddings), paddle.cos(embeddings)], axis=-1)
        # (1, T, d)
        embeddings = paddle.flatten(embeddings, start_axis=-2, stop_axis=-1)
        return embeddings

    def forward(self, x: paddle.Tensor,
                offset: int=0) -> Tuple[paddle.Tensor, paddle.Tensor]:
        """Compute positional encoding.
        Args:
            x (paddle.Tensor): Input tensor (batch, time, `*`).
        Returns:
            paddle.Tensor: Encoded tensor (batch, time, `*`).
            paddle.Tensor: Positional embedding tensor (1, time, `*`).
        """
        x = x * self.xscale

        B, T, D = x.shape
        assert D == self.d_model

        # position interpolation
        start = 0
        end = T * self.pscale
        assert end <= self.max_len
        position = paddle.arange(start, end, dtype=x.dtype).unsqueeze(0)
        position *= 1.0 / self.pscale

        pe = self.sinusoidal_embeddings(position, self.d_model, base=self.base)
        pos_emb = pe[:, offset:offset + x.shape[1]]
        return self.dropout(x), self.dropout(pos_emb)

    def position_encoding(self, offset: int, size: int) -> paddle.Tensor:
        """For getting encoding in a streaming fashion.
        Attention! We apply dropout only once at the whole utterance level
        in a non-streaming way, but this function will be called several
        times with increasing input sizes in a streaming scenario, so the
        dropout will be applied several times.
        Args:
            offset (int): start offset
            size (int): required size of position encoding
        Returns:
            paddle.Tensor: Corresponding position encoding, #[1, T, D].
        """
        # position interpolation
        start = offset
        end = (offset + size) * self.pscale
        assert end <= self.max_len
        position = paddle.arange(
            start, end, dtype=paddle.get_default_dtype()).unsqueeze(0)
        position *= 1.0 / self.pscale

        pe = self.sinusoidal_embeddings(position, self.d_model, base=self.base)
        return self.dropout(pe)

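# A minimal usage sketch of ScaledRotaryRelPositionalEncoding, assuming
# scale=2 and a 256-dim model; names and sizes are illustrative only. With
# scale=2 the positions are generated at a step of 1/2, so the layer emits
# interpolated (fractional) positions that stay within the position range
# covered by `max_len` (position interpolation).
#
#   scaled_pe = ScaledRotaryRelPositionalEncoding(
#       d_model=256, dropout_rate=0.1, max_len=5000, scale=2)
#   xs = paddle.randn([4, 4000, 256])            # (batch, time, d_model)
#   xs, pos_emb = scaled_pe(xs)                  # pos_emb built from fractional positions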