# ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ # This file was automatically generated from src/transformers/models/phimoe/modular_phimoe.py. # Do NOT edit this file manually as any edits will be overwritten by the generation of # the file from the modular. If any change should be done, please apply the change to the # modular_phimoe.py file directly. One of our CI enforces this. # ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ๐Ÿšจ # Copyright 2024 Microsoft and the HuggingFace Inc. team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from collections.abc import Callable from typing import Optional import torch from torch import nn from ... import initialization as init from ...activations import ACT2FN from ...cache_utils import Cache, DynamicCache from ...generation import GenerationMixin from ...integrations import use_experts_implementation, use_kernel_func_from_hub, use_kernelized_func from ...masking_utils import create_causal_mask, create_sliding_window_causal_mask from ...modeling_layers import GenericForSequenceClassification, GradientCheckpointingLayer from ...modeling_outputs import MoeCausalLMOutputWithPast, MoeModelOutputWithPast from ...modeling_rope_utils import ROPE_INIT_FUNCTIONS, dynamic_rope_update from ...modeling_utils import ALL_ATTENTION_FUNCTIONS, PreTrainedModel from ...processing_utils import Unpack from ...utils import TransformersKwargs, auto_docstring, can_return_tuple, is_grouped_mm_available from ...utils.generic import OutputRecorder, check_model_inputs, maybe_autocast from .configuration_phimoe import PhimoeConfig class PhimoeRotaryEmbedding(nn.Module): inv_freq: torch.Tensor # fix linting for `register_buffer` def __init__(self, config: PhimoeConfig, device=None): super().__init__() self.max_seq_len_cached = config.max_position_embeddings self.original_max_seq_len = config.max_position_embeddings self.config = config self.rope_type = self.config.rope_parameters["rope_type"] self.rope_init_fn: Callable = self.compute_default_rope_parameters if self.rope_type != "default": self.rope_init_fn = ROPE_INIT_FUNCTIONS[self.rope_type] inv_freq, self.attention_scaling = self.rope_init_fn(self.config, device) self.register_buffer("inv_freq", inv_freq, persistent=False) self.register_buffer("original_inv_freq", inv_freq.clone(), persistent=False) @staticmethod def compute_default_rope_parameters( config: PhimoeConfig | None = None, device: Optional["torch.device"] = None, seq_len: int | None = None, ) -> tuple["torch.Tensor", float]: """ Computes the inverse frequencies according to the original RoPE implementation Args: config ([`~transformers.PreTrainedConfig`]): The model configuration. device (`torch.device`): The device to use for initialization of the inverse frequencies. seq_len (`int`, *optional*): The current sequence length. Unused for this type of RoPE. Returns: Tuple of (`torch.Tensor`, `float`), containing the inverse frequencies for the RoPE embeddings and the post-processing scaling factor applied to the computed cos/sin (unused in this type of RoPE). """ base = config.rope_parameters["rope_theta"] dim = getattr(config, "head_dim", None) or config.hidden_size // config.num_attention_heads attention_factor = 1.0 # Unused in this type of RoPE # Compute the inverse frequencies inv_freq = 1.0 / ( base ** (torch.arange(0, dim, 2, dtype=torch.int64).to(device=device, dtype=torch.float) / dim) ) return inv_freq, attention_factor @torch.no_grad() @dynamic_rope_update # power user: used with advanced RoPE types (e.g. dynamic rope) def forward(self, x, position_ids=None, layer_type=None): if layer_type is not None: raise ValueError( f"{self.__class__.__name__} does not support layer types, but got `layer_type={layer_type}`" ) mscale = None seq_len = torch.max(position_ids) + 1 if self.config.rope_parameters["rope_type"] != "default" and seq_len: mscale = ( self.config.rope_parameters["long_mscale"] if seq_len > self.config.rope_parameters["original_max_position_embeddings"] else self.config.rope_parameters["short_mscale"] ) inv_freq, attention_scaling = self.rope_init_fn(self.config, x.device, seq_len) mscale = attention_scaling if mscale is None else mscale inv_freq_expanded = inv_freq[None, :, None].float().expand(position_ids.shape[0], -1, 1).to(x.device) position_ids_expanded = position_ids[:, None, :].float() device_type = x.device.type if isinstance(x.device.type, str) and x.device.type != "mps" else "cpu" with maybe_autocast(device_type=device_type, enabled=False): # Force float32 freqs = (inv_freq_expanded.float() @ position_ids_expanded.float()).transpose(1, 2) emb = torch.cat((freqs, freqs), dim=-1) cos = emb.cos() * mscale sin = emb.sin() * mscale return cos.to(x.dtype), sin.to(x.dtype) def rotate_half(x): """Rotates half the hidden dims of the input.""" x1 = x[..., : x.shape[-1] // 2] x2 = x[..., x.shape[-1] // 2 :] return torch.cat((-x2, x1), dim=-1) @use_kernel_func_from_hub("rotary_pos_emb") def apply_rotary_pos_emb(q, k, cos, sin, unsqueeze_dim=1): """Applies Rotary Position Embedding to the query and key tensors. Args: q (`torch.Tensor`): The query tensor. k (`torch.Tensor`): The key tensor. cos (`torch.Tensor`): The cosine part of the rotary embedding. sin (`torch.Tensor`): The sine part of the rotary embedding. unsqueeze_dim (`int`, *optional*, defaults to 1): The 'unsqueeze_dim' argument specifies the dimension along which to unsqueeze cos[position_ids] and sin[position_ids] so that they can be properly broadcasted to the dimensions of q and k. For example, note that cos[position_ids] and sin[position_ids] have the shape [batch_size, seq_len, head_dim]. Then, if q and k have the shape [batch_size, heads, seq_len, head_dim], then setting unsqueeze_dim=1 makes cos[position_ids] and sin[position_ids] broadcastable to the shapes of q and k. Similarly, if q and k have the shape [batch_size, seq_len, heads, head_dim], then set unsqueeze_dim=2. Returns: `tuple(torch.Tensor)` comprising of the query and key tensors rotated using the Rotary Position Embedding. """ cos = cos.unsqueeze(unsqueeze_dim) sin = sin.unsqueeze(unsqueeze_dim) q_embed = (q * cos) + (rotate_half(q) * sin) k_embed = (k * cos) + (rotate_half(k) * sin) return q_embed, k_embed def repeat_kv(hidden_states: torch.Tensor, n_rep: int) -> torch.Tensor: """ This is the equivalent of torch.repeat_interleave(x, dim=1, repeats=n_rep). The hidden states go from (batch, num_key_value_heads, seqlen, head_dim) to (batch, num_attention_heads, seqlen, head_dim) """ batch, num_key_value_heads, slen, head_dim = hidden_states.shape if n_rep == 1: return hidden_states hidden_states = hidden_states[:, :, None, :, :].expand(batch, num_key_value_heads, n_rep, slen, head_dim) return hidden_states.reshape(batch, num_key_value_heads * n_rep, slen, head_dim) def eager_attention_forward( module: nn.Module, query: torch.Tensor, key: torch.Tensor, value: torch.Tensor, attention_mask: torch.Tensor | None, scaling: float, dropout: float = 0.0, **kwargs: Unpack[TransformersKwargs], ): key_states = repeat_kv(key, module.num_key_value_groups) value_states = repeat_kv(value, module.num_key_value_groups) attn_weights = torch.matmul(query, key_states.transpose(2, 3)) * scaling if attention_mask is not None: causal_mask = attention_mask[:, :, :, : key_states.shape[-2]] attn_weights = attn_weights + causal_mask attn_weights = nn.functional.softmax(attn_weights, dim=-1, dtype=torch.float32).to(query.dtype) attn_weights = nn.functional.dropout(attn_weights, p=dropout, training=module.training) attn_output = torch.matmul(attn_weights, value_states) attn_output = attn_output.transpose(1, 2).contiguous() return attn_output, attn_weights @use_kernelized_func(apply_rotary_pos_emb) class PhimoeAttention(nn.Module): """Multi-headed attention from 'Attention Is All You Need' paper""" def __init__(self, config: PhimoeConfig, layer_idx: int): super().__init__() self.config = config self.layer_idx = layer_idx self.head_dim = getattr(config, "head_dim", config.hidden_size // config.num_attention_heads) self.num_key_value_groups = config.num_attention_heads // config.num_key_value_heads self.scaling = self.head_dim**-0.5 self.attention_dropout = config.attention_dropout self.is_causal = True self.q_proj = nn.Linear( config.hidden_size, config.num_attention_heads * self.head_dim, bias=config.attention_bias ) self.k_proj = nn.Linear( config.hidden_size, config.num_key_value_heads * self.head_dim, bias=config.attention_bias ) self.v_proj = nn.Linear( config.hidden_size, config.num_key_value_heads * self.head_dim, bias=config.attention_bias ) self.o_proj = nn.Linear( config.num_attention_heads * self.head_dim, config.hidden_size, bias=config.attention_bias ) def forward( self, hidden_states: torch.Tensor, position_embeddings: tuple[torch.Tensor, torch.Tensor] | None = None, attention_mask: torch.Tensor | None = None, past_key_values: Cache | None = None, cache_position: torch.LongTensor | None = None, **kwargs: Unpack[TransformersKwargs], ) -> tuple[torch.Tensor, torch.Tensor]: input_shape = hidden_states.shape[:-1] hidden_shape = (*input_shape, -1, self.head_dim) query_states = self.q_proj(hidden_states).view(hidden_shape).transpose(1, 2) key_states = self.k_proj(hidden_states).view(hidden_shape).transpose(1, 2) value_states = self.v_proj(hidden_states).view(hidden_shape).transpose(1, 2) cos, sin = position_embeddings query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin) if past_key_values is not None: # sin and cos are specific to RoPE models; cache_position needed for the static cache cache_kwargs = {"sin": sin, "cos": cos, "cache_position": cache_position} key_states, value_states = past_key_values.update(key_states, value_states, self.layer_idx, cache_kwargs) attention_interface: Callable = ALL_ATTENTION_FUNCTIONS.get_interface( self.config._attn_implementation, eager_attention_forward ) attn_output, attn_weights = attention_interface( self, query_states, key_states, value_states, attention_mask, dropout=0.0 if not self.training else self.attention_dropout, scaling=self.scaling, **kwargs, ) attn_output = attn_output.reshape(*input_shape, -1).contiguous() attn_output = self.o_proj(attn_output) return attn_output, attn_weights class PhimoeMultiplier(torch.autograd.Function): @staticmethod def forward( ctx, scores: torch.Tensor, multiplier: torch.Tensor, selected_experts: torch.Tensor, masked_gates: torch.Tensor, mask_for_one: torch.Tensor, ): """ Forward pass for the custom autograd function. Args: ctx: Context object to save information for backward computation. scores (torch.Tensor): Input scores tensor. multiplier (torch.Tensor): Multiplier tensor. selected_experts (torch.Tensor): Tensor of selected experts. masked_gates (torch.Tensor): Masked gates tensor. mask_for_one (torch.Tensor): Mask for one tensor. Returns: torch.Tensor: Result of the forward pass. """ ctx.save_for_backward(multiplier, selected_experts, masked_gates) return multiplier * mask_for_one @staticmethod def backward( ctx, grad_at_output: torch.Tensor, ): """ Backward pass for the custom autograd function. Args: ctx: Context object with saved tensors from the forward pass. grad_at_output (torch.Tensor): Gradient at the output. Returns: tuple[torch.Tensor, None, None, None, None]: Gradients for the inputs. """ multiplier, selected_experts, masked_gates = ctx.saved_tensors grad_at_output = grad_at_output * multiplier grad_at_scores_expanded = masked_gates * grad_at_output.mul(-1) grad_at_scores_expanded.scatter_add_( dim=-1, index=selected_experts, src=grad_at_output, ) return ( grad_at_scores_expanded, None, None, None, None, ) @use_experts_implementation class PhimoeExperts(nn.Module): """Collection of expert weights stored as 3D tensors.""" def __init__(self, config: PhimoeConfig): super().__init__() self.num_experts = config.num_local_experts self.hidden_dim = config.hidden_size self.intermediate_dim = config.intermediate_size self.gate_up_proj = nn.Parameter(torch.empty(self.num_experts, 2 * self.intermediate_dim, self.hidden_dim)) self.down_proj = nn.Parameter(torch.empty(self.num_experts, self.hidden_dim, self.intermediate_dim)) self.act_fn = ACT2FN[config.hidden_act] def forward( self, hidden_states: torch.Tensor, top_k_index: torch.Tensor, top_k_weights: torch.Tensor, ) -> torch.Tensor: final_hidden_states = torch.zeros_like(hidden_states) with torch.no_grad(): expert_mask = torch.nn.functional.one_hot(top_k_index, num_classes=self.num_experts) expert_mask = expert_mask.permute(2, 1, 0) expert_hit = torch.greater(expert_mask.sum(dim=(-1, -2)), 0).nonzero() for expert_idx in expert_hit: expert_idx = expert_idx[0] if expert_idx == self.num_experts: continue top_k_pos, token_idx = torch.where(expert_mask[expert_idx]) current_state = hidden_states[token_idx] gate, up = nn.functional.linear(current_state, self.gate_up_proj[expert_idx]).chunk(2, dim=-1) current_hidden_states = self.act_fn(gate) * up current_hidden_states = nn.functional.linear(current_hidden_states, self.down_proj[expert_idx]) current_hidden_states = current_hidden_states * top_k_weights[token_idx, top_k_pos, None] final_hidden_states.index_add_(0, token_idx, current_hidden_states.to(final_hidden_states.dtype)) return final_hidden_states def sparsemixer(scores, jitter_eps, training, top_k=2): """ Sparse mixer function to select top-k experts and compute multipliers. Based on the paper: https://huggingface.co/papers/2409.12136 We first replace the TopK(ยท) function as random sampling of discrete variables in model training. Then, following Liu et al. (2023a) and Liu et al. (2023b), we apply Heun's third order method to approximate the expert routing gradient and construct a modified back-propagation to give a mathematically sound gradient estimation for expert routing. Args: scores (torch.Tensor): Input scores tensor. jitter_eps (float): Jitter epsilon for numerical stability. training (bool): Flag indicating if the model is in training mode. top_k (int): Number of top experts to select. Returns: tuple[torch.Tensor, torch.Tensor]: Multiplier and selected experts tensors. """ with torch.no_grad(): # Compute mask for sparsity mask_logits_threshold, max_ind = scores.max(dim=-1, keepdim=True) factor = scores.abs().clamp(min=mask_logits_threshold) mask_logits_threshold = ((mask_logits_threshold - scores) / factor) > (2 * jitter_eps) # Apply mask masked_gates = scores.masked_fill(mask_logits_threshold, float("-inf")) if training: selected_experts = ( ( masked_gates - torch.empty_like(masked_gates, memory_format=torch.legacy_contiguous_format).exponential_().log() ) .max(dim=-1)[1] .unsqueeze(-1) ) # Gumbel sampling, more robust than the multinomial method else: selected_experts = max_ind # Compute scores for gradients masked_gates = torch.softmax(masked_gates, dim=-1) multiplier_o = masked_gates.gather(dim=-1, index=selected_experts) if training: # Compute midpoint mask max_scores, max_ind = masked_gates.max(dim=-1, keepdim=True) mask_for_one = torch.logical_or( selected_experts == max_ind, torch.rand_like(max_scores) > 0.75, # Heun's third-order method ) # 1 -> 1.0 & 0 -> 1./3: lambda x: (x + 0.5) / 1.5 mask_for_one = torch.add(0.3333, mask_for_one, alpha=0.6667).type_as(masked_gates) multiplier = PhimoeMultiplier.apply( scores, multiplier_o, selected_experts, masked_gates, mask_for_one, ) else: multiplier = multiplier_o # Masked out first expert masked_scores = torch.scatter( scores, -1, selected_experts, float("-inf"), ) with torch.no_grad(): # Compute mask for sparsity mask_logits_threshold, max_ind = masked_scores.max(dim=-1, keepdim=True) factor = scores.abs().clamp(min=mask_logits_threshold) mask_logits_threshold = ((mask_logits_threshold - scores) / factor) > (2 * jitter_eps) # Apply mask masked_gates_top2 = masked_scores.masked_fill(mask_logits_threshold, float("-inf")) if training: selected_experts_top2 = ( ( masked_gates_top2 - torch.empty_like(masked_gates_top2, memory_format=torch.legacy_contiguous_format) .exponential_() .log() ) .max(dim=-1)[1] .unsqueeze(-1) ) # Gumbel sampling, more robust than the multinomial method else: selected_experts_top2 = max_ind # Compute scores for gradients masked_gates_top2 = torch.softmax(masked_gates_top2, dim=-1) multiplier_top2_o = masked_gates_top2.gather(dim=-1, index=selected_experts_top2) if training: # Compute midpoint mask max_scores, max_ind = masked_gates_top2.max(dim=-1, keepdim=True) mask_for_one_top2 = torch.logical_or( selected_experts_top2 == max_ind, torch.rand_like(max_scores).uniform_() > 0.75, # Heun's third-order method ) # 1 -> 1.0 & 0 -> 1./3: lambda x: (x + 0.5) / 1.5 mask_for_one_top2 = torch.add(0.3333, mask_for_one_top2, alpha=0.6667).type_as(masked_gates_top2) multiplier_top2 = PhimoeMultiplier.apply( scores, multiplier_top2_o, selected_experts_top2, masked_gates_top2, mask_for_one_top2, ) else: multiplier_top2 = multiplier_top2_o multiplier = torch.concat((multiplier, multiplier_top2), dim=-1) selected_experts = torch.concat((selected_experts, selected_experts_top2), dim=-1) return ( multiplier, selected_experts, ) class PhimoeTopKRouter(nn.Linear): def __init__(self, config: PhimoeConfig): super().__init__(config.hidden_size, config.num_local_experts, bias=False) self.router_jitter_noise = config.router_jitter_noise self.input_jitter_noise = config.input_jitter_noise self.top_k = config.num_experts_per_tok def forward(self, hidden_states: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]: if self.training and self.input_jitter_noise > 0: hidden_states *= torch.empty_like(hidden_states).uniform_( 1.0 - self.input_jitter_noise, 1.0 + self.input_jitter_noise ) router_logits = super().forward(hidden_states) routing_weights, selected_experts = sparsemixer( router_logits, jitter_eps=self.router_jitter_noise, training=self.training, top_k=self.top_k ) return routing_weights, selected_experts class PhimoeSparseMoeBlock(nn.Module): """ This implementation is strictly equivalent to standard MoE with full capacity (no dropped tokens). It's faster since it formulates MoE operations in terms of block-sparse operations to accommodate imbalanced assignments of tokens to experts, whereas standard MoE either (1) drop tokens at the cost of reduced performance or (2) set capacity factor to number of experts and thus waste computation and memory on padding. """ def __init__(self, config): super().__init__() self.hidden_dim = config.hidden_size self.ffn_dim = config.intermediate_size self.num_experts = config.num_local_experts self.top_k = config.num_experts_per_tok self.router = PhimoeTopKRouter(config) self.experts = PhimoeExperts(config) self.input_jitter_noise = config.input_jitter_noise def forward(self, hidden_states: torch.Tensor) -> torch.Tensor: batch_size, sequence_length, hidden_dim = hidden_states.shape if self.training and self.input_jitter_noise > 0: hidden_states *= torch.empty_like(hidden_states).uniform_( 1.0 - self.input_jitter_noise, 1.0 + self.input_jitter_noise ) batch_size, sequence_length, hidden_dim = hidden_states.shape hidden_states = hidden_states.reshape(-1, hidden_dim) routing_weights, selected_experts = self.router(hidden_states) final_hidden_states = self.experts(hidden_states, selected_experts, routing_weights) return final_hidden_states.reshape(batch_size, sequence_length, hidden_dim) class PhimoeDecoderLayer(GradientCheckpointingLayer): def __init__(self, config: PhimoeConfig, layer_idx: int): super().__init__() self.hidden_size = config.hidden_size self.self_attn = PhimoeAttention(config, layer_idx) self.mlp = PhimoeSparseMoeBlock(config) # Phimoe uses nn.LayerNorm self.input_layernorm = nn.LayerNorm(config.hidden_size, eps=config.rms_norm_eps, elementwise_affine=True) self.post_attention_layernorm = nn.LayerNorm( config.hidden_size, eps=config.rms_norm_eps, elementwise_affine=True ) def forward( self, hidden_states: torch.Tensor, position_embeddings: tuple[torch.Tensor, torch.Tensor] | None = None, attention_mask: torch.Tensor | None = None, position_ids: torch.LongTensor | None = None, past_key_values: Cache | None = None, cache_position: torch.LongTensor | None = None, **kwargs: Unpack[TransformersKwargs], ) -> torch.Tensor: residual = hidden_states hidden_states = self.input_layernorm(hidden_states) hidden_states, _ = self.self_attn( hidden_states=hidden_states, position_embeddings=position_embeddings, attention_mask=attention_mask, position_ids=position_ids, past_key_values=past_key_values, cache_position=cache_position, **kwargs, ) hidden_states = residual + hidden_states residual = hidden_states hidden_states = self.post_attention_layernorm(hidden_states) hidden_states = self.mlp(hidden_states) hidden_states = residual + hidden_states return hidden_states @auto_docstring class PhimoePreTrainedModel(PreTrainedModel): config: PhimoeConfig base_model_prefix = "model" supports_gradient_checkpointing = True _no_split_modules = ["PhimoeDecoderLayer"] _skip_keys_device_placement = ["past_key_values"] _supports_flash_attn = True _supports_sdpa = True _supports_flex_attn = True _can_compile_fullgraph = ( is_grouped_mm_available() ) # https://huggingface.co/docs/transformers/experts_interface#torchcompile _supports_attention_backend = True _can_record_outputs = { "router_logits": OutputRecorder(PhimoeTopKRouter, layer_name="mlp.router", index=0), "hidden_states": PhimoeDecoderLayer, "attentions": PhimoeAttention, } @torch.no_grad() def _init_weights(self, module): super()._init_weights(module) std = self.config.initializer_range if isinstance(module, PhimoeExperts): init.normal_(module.gate_up_proj, mean=0.0, std=std) init.normal_(module.down_proj, mean=0.0, std=std) elif isinstance(module, PhimoeTopKRouter): init.normal_(module.weight, mean=0.0, std=std) @auto_docstring class PhimoeModel(PhimoePreTrainedModel): def __init__(self, config: PhimoeConfig): super().__init__(config) self.padding_idx = config.pad_token_id self.vocab_size = config.vocab_size self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size, self.padding_idx) self.layers = nn.ModuleList( [PhimoeDecoderLayer(config, layer_idx) for layer_idx in range(config.num_hidden_layers)] ) self.norm = nn.LayerNorm(config.hidden_size, eps=config.rms_norm_eps, elementwise_affine=True) self.rotary_emb = PhimoeRotaryEmbedding(config=config) self.gradient_checkpointing = False # Initialize weights and apply final processing self.post_init() @check_model_inputs @auto_docstring def forward( self, input_ids: torch.LongTensor | None = None, attention_mask: torch.Tensor | None = None, position_ids: torch.LongTensor | None = None, past_key_values: Cache | None = None, inputs_embeds: torch.FloatTensor | None = None, use_cache: bool | None = None, cache_position: torch.LongTensor | None = None, **kwargs: Unpack[TransformersKwargs], ) -> MoeModelOutputWithPast: if (input_ids is None) ^ (inputs_embeds is not None): raise ValueError("You must specify exactly one of input_ids or inputs_embeds") if use_cache and past_key_values is None: past_key_values = DynamicCache(config=self.config) if inputs_embeds is None: inputs_embeds = self.embed_tokens(input_ids) if cache_position is None: past_seen_tokens = past_key_values.get_seq_length() if past_key_values is not None else 0 cache_position = torch.arange( past_seen_tokens, past_seen_tokens + inputs_embeds.shape[1], device=inputs_embeds.device ) if position_ids is None: position_ids = cache_position.unsqueeze(0) mask_function = create_causal_mask if self.config.sliding_window is None else create_sliding_window_causal_mask causal_mask = mask_function( config=self.config, input_embeds=inputs_embeds, attention_mask=attention_mask, cache_position=cache_position, past_key_values=past_key_values, position_ids=position_ids, ) hidden_states = inputs_embeds position_embeddings = self.rotary_emb(hidden_states, position_ids=position_ids) for decoder_layer in self.layers[: self.config.num_hidden_layers]: hidden_states = decoder_layer( hidden_states, attention_mask=causal_mask, position_ids=position_ids, past_key_values=past_key_values, use_cache=use_cache, cache_position=cache_position, position_embeddings=position_embeddings, **kwargs, ) hidden_states = self.norm(hidden_states) return MoeModelOutputWithPast( # only diff with Mistral is the output type, we need MoE last_hidden_state=hidden_states, past_key_values=past_key_values, ) def load_balancing_loss_func( gate_logits: torch.Tensor | tuple[torch.Tensor] | None, num_experts: int | None = None, top_k=2, attention_mask: torch.Tensor | None = None, ) -> torch.Tensor | int: r""" Computes auxiliary load balancing loss as in Switch Transformer - implemented in Pytorch. See Switch Transformer (https://huggingface.co/papers/2101.03961) for more details. This function implements the loss function presented in equations (4) - (6) of the paper. It aims at penalizing cases where the routing between experts is too unbalanced. Args: gate_logits: Logits from the `gate`, should be a tuple of model.config.num_hidden_layers tensors of shape [batch_size X sequence_length, num_experts]. num_experts: Number of experts top_k: The number of experts to route per-token, can be also interpreted as the `top-k` routing parameter. attention_mask (`torch.Tensor`, *optional*): The attention_mask used in forward function shape [batch_size X sequence_length] if not None. Returns: The auxiliary loss. """ if gate_logits is None or not isinstance(gate_logits, tuple): return 0 if isinstance(gate_logits, tuple): compute_device = gate_logits[0].device concatenated_gate_logits = torch.cat([layer_gate.to(compute_device) for layer_gate in gate_logits], dim=0) routing_weights = torch.nn.functional.softmax(concatenated_gate_logits, dim=-1) _, selected_experts = torch.topk(routing_weights, top_k, dim=-1) expert_mask = torch.nn.functional.one_hot(selected_experts, num_experts) if attention_mask is None: # Compute the percentage of tokens routed to each experts tokens_per_expert = torch.mean(expert_mask.float(), dim=0) # Compute the average probability of routing to these experts router_prob_per_expert = torch.mean(routing_weights, dim=0) else: batch_size, sequence_length = attention_mask.shape num_hidden_layers = concatenated_gate_logits.shape[0] // (batch_size * sequence_length) # Compute the mask that masks all padding tokens as 0 with the same shape of expert_mask expert_attention_mask = ( attention_mask[None, :, :, None, None] .expand((num_hidden_layers, batch_size, sequence_length, top_k, num_experts)) .reshape(-1, top_k, num_experts) .to(compute_device) ) # Compute the percentage of tokens routed to each experts tokens_per_expert = torch.sum(expert_mask.float() * expert_attention_mask, dim=0) / torch.sum( expert_attention_mask, dim=0 ) # Compute the mask that masks all padding tokens as 0 with the same shape of tokens_per_expert router_per_expert_attention_mask = ( attention_mask[None, :, :, None] .expand((num_hidden_layers, batch_size, sequence_length, num_experts)) .reshape(-1, num_experts) .to(compute_device) ) # Compute the average probability of routing to these experts router_prob_per_expert = torch.sum(routing_weights * router_per_expert_attention_mask, dim=0) / torch.sum( router_per_expert_attention_mask, dim=0 ) overall_loss = torch.sum(tokens_per_expert * router_prob_per_expert.unsqueeze(0)) return overall_loss * num_experts @auto_docstring class PhimoeForCausalLM(PhimoePreTrainedModel, GenerationMixin): _tied_weights_keys = {"lm_head.weight": "model.embed_tokens.weight"} _tp_plan = {"lm_head": "colwise_gather_output"} _pp_plan = {"lm_head": (["hidden_states"], ["logits"])} def __init__(self, config): super().__init__(config) self.model = PhimoeModel(config) self.vocab_size = config.vocab_size self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=self.config.lm_head_bias) self.router_aux_loss_coef = config.router_aux_loss_coef self.num_experts = config.num_local_experts self.num_experts_per_tok = config.num_experts_per_tok # Initialize weights and apply final processing self.post_init() @can_return_tuple @auto_docstring def forward( self, input_ids: torch.LongTensor | None = None, attention_mask: torch.Tensor | None = None, position_ids: torch.LongTensor | None = None, past_key_values: Cache | None = None, inputs_embeds: torch.FloatTensor | None = None, labels: torch.LongTensor | None = None, use_cache: bool | None = None, output_router_logits: bool | None = None, cache_position: torch.LongTensor | None = None, logits_to_keep: int | torch.Tensor = 0, **kwargs: Unpack[TransformersKwargs], ) -> MoeCausalLMOutputWithPast: r""" labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*): Labels for computing the masked language modeling loss. Indices should either be in `[0, ..., config.vocab_size]` or -100 (see `input_ids` docstring). Tokens with indices set to `-100` are ignored (masked), the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`. Example: ```python >>> from transformers import AutoTokenizer, PhimoeForCausalLM >>> model = PhimoeForCausalLM.from_pretrained("mistralai/Phimoe-8x7B-v0.1") >>> tokenizer = AutoTokenizer.from_pretrained("mistralai/Phimoe-8x7B-v0.1") >>> prompt = "Hey, are you conscious? Can you talk to me?" >>> inputs = tokenizer(prompt, return_tensors="pt") >>> # Generate >>> generate_ids = model.generate(inputs.input_ids, max_length=30) >>> tokenizer.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0] "Hey, are you conscious? Can you talk to me?\nI'm not conscious, but I can talk to you." ```""" output_router_logits = ( output_router_logits if output_router_logits is not None else self.config.output_router_logits ) # decoder outputs consists of (dec_features, layer_state, dec_hidden, dec_attn) outputs: MoeModelOutputWithPast = self.model( input_ids=input_ids, attention_mask=attention_mask, position_ids=position_ids, past_key_values=past_key_values, inputs_embeds=inputs_embeds, use_cache=use_cache, output_router_logits=output_router_logits, cache_position=cache_position, **kwargs, ) hidden_states = outputs.last_hidden_state # Only compute necessary logits, and do not upcast them to float if we are not computing the loss slice_indices = slice(-logits_to_keep, None) if isinstance(logits_to_keep, int) else logits_to_keep logits = self.lm_head(hidden_states[:, slice_indices, :]) loss = None if labels is not None: loss = self.loss_function(logits, labels, self.vocab_size, **kwargs) aux_loss = None if output_router_logits: aux_loss = load_balancing_loss_func( outputs.router_logits, self.num_experts, self.num_experts_per_tok, attention_mask, ) if labels is not None: loss += self.router_aux_loss_coef * aux_loss.to(loss.device) # make sure to reside in the same device return MoeCausalLMOutputWithPast( loss=loss, aux_loss=aux_loss, logits=logits, past_key_values=outputs.past_key_values, hidden_states=outputs.hidden_states, attentions=outputs.attentions, router_logits=outputs.router_logits, ) # Copied from transformers.models.phi3.modeling_phi3.Phi3ForCausalLM.prepare_inputs_for_generation def prepare_inputs_for_generation( self, input_ids, past_key_values=None, attention_mask=None, inputs_embeds=None, cache_position=None, position_ids=None, use_cache=True, logits_to_keep=None, **kwargs, ): # Overwritten -- this model may need to switch between short and long rope, invalidating the cache in the # process # When the first time input length reached long and short factor switching point, enforce re-compute cache # It will cause downside of slower at this single token position, however, better than current failure. if ( past_key_values and hasattr(self.config, "original_max_position_embeddings") and input_ids.shape[1] >= self.config.original_max_position_embeddings + 1 ): past_length = cache_position[0] if past_length <= self.config.original_max_position_embeddings: past_key_values = None model_inputs = super().prepare_inputs_for_generation( input_ids=input_ids, past_key_values=past_key_values, attention_mask=attention_mask, inputs_embeds=inputs_embeds, cache_position=cache_position, position_ids=position_ids, use_cache=use_cache, logits_to_keep=logits_to_keep, **kwargs, ) return model_inputs class PhimoeForSequenceClassification(GenericForSequenceClassification, PhimoePreTrainedModel): ... __all__ = ["PhimoePreTrainedModel", "PhimoeModel", "PhimoeForCausalLM", "PhimoeForSequenceClassification"]