From 2fe8ffae48fdfb44839894e8a6d589d778d19396 Mon Sep 17 00:00:00 2001 From: Jiani Wang Date: Tue, 17 Jun 2025 17:27:44 -0700 Subject: [PATCH 01/14] remove CODEOWNERS --- .github/CODEOWNERS | 10 ---------- 1 file changed, 10 deletions(-) delete mode 100644 .github/CODEOWNERS diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS deleted file mode 100644 index 000cc1af1..000000000 --- a/.github/CODEOWNERS +++ /dev/null @@ -1,10 +0,0 @@ -# This is a CODEOWNERS file. -# Each line is a file pattern followed by one or more owners. - -# These owners will be the default owners for everything in -# the repo. Unless a later match takes precedence, -# they will be requested for review when someone opens a pull request. -* @tianyu-l @fegin @wwwjn @wconstab - -# Exclude the experiments directory by adding a pattern without owners -/torchtitan/experiments/ From 9ec5113c9ecc54778a1da68227bbd3c3820da1ef Mon Sep 17 00:00:00 2001 From: Jiani Wang <40016222+wwwjn@users.noreply.github.com> Date: Wed, 18 Jun 2025 12:21:42 -0700 Subject: [PATCH 02/14] Implement Deepseek-V3 model skeleton (#1315) ## Contents 1. Attention module 2. MoE module (note: I only implemented the naive routing, not the "node limit routing" strategy) 3. Deepseek-V3 model Reference: 1. Deepseek-ai: https://github.com/deepseek-ai/DeepSeek-V3/blob/main/inference/model.py 4. Huggingface: https://huggingface.co/deepseek-ai/DeepSeek-V3-Base/blob/main/modeling_deepseek.py 5. torchtitan/experiment/deepseek-v3 6. torchtitan/experiment/llama4 ## TODO - [ ] Further clean up the DeepseekV3ModelArgs class, remove unused model args - [ ] Test forward pass w/ torchtitan --- torchtitan/models/deepseek-v3/model/args.py | 106 ++++++ torchtitan/models/deepseek-v3/model/model.py | 337 ++++++++++++++++++ torchtitan/models/deepseek-v3/model/moe.py | 339 +++++++++++++++++++ 3 files changed, 782 insertions(+) create mode 100644 torchtitan/models/deepseek-v3/model/args.py create mode 100644 torchtitan/models/deepseek-v3/model/model.py create mode 100644 torchtitan/models/deepseek-v3/model/moe.py diff --git a/torchtitan/models/deepseek-v3/model/args.py b/torchtitan/models/deepseek-v3/model/args.py new file mode 100644 index 000000000..845d6b83e --- /dev/null +++ b/torchtitan/models/deepseek-v3/model/args.py @@ -0,0 +1,106 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. +# +# Copyright (c) Meta Platforms, Inc. All Rights Reserved. + + +from dataclasses import dataclass +from typing import Literal + +from torch import nn + +from torchtitan.components.tokenizer import Tokenizer +from torchtitan.config_manager import JobConfig +from torchtitan.protocols.train_spec import BaseModelArgs + + +# Reference: https://github.com/deepseek-ai/DeepSeek-V3/blob/main/inference/model.py +@dataclass +class DeepseekV3ModelArgs(BaseModelArgs): + """ + Data class for defining model arguments and hyperparameters. + + Attributes: + max_batch_size (int): Maximum batch size. + max_seq_len (int): Maximum sequence length. + dtype (Literal["bf16", "fp8"]): Data type for computations. + vocab_size (int): Vocabulary size. + dim (int): Model dimension. + inter_dim (int): Intermediate dimension for MLP layers. + moe_inter_dim (int): Intermediate dimension for MoE layers. + n_layers (int): Number of transformer layers. + n_dense_layers (int): Number of dense layers in the model. + n_heads (int): Number of attention heads. 
+ n_routed_experts (int): Number of routed experts for MoE layers. + n_shared_experts (int): Number of shared experts for MoE layers. + n_activated_experts (int): Number of activated experts in MoE layers. + n_expert_groups (int): Number of expert groups. + n_limited_groups (int): Number of limited groups for MoE routing. + score_func (Literal["softmax", "sigmoid"]): Scoring function for MoE routing. + route_scale (float): Scaling factor for routing scores. + use_grouped_mm (bool): Whether to use grouped matrix multiplication for MoE layers. + load_balance_coeff (float | None): Auxiliary-Loss-Free Load balancing coefficient for MoE layers. + q_lora_rank (int): LoRA rank for query projections. + kv_lora_rank (int): LoRA rank for key-value projections. + qk_nope_head_dim (int): Dimension for query-key projections without positional embeddings. + qk_rope_head_dim (int): Dimension for query-key projections with rotary embeddings. + v_head_dim (int): Dimension for value projections. + original_seq_len (int): Original sequence length. + rope_theta (float): Base for rotary positional encoding. + rope_factor (float): Scaling factor for extended sequence lengths. + beta_fast (int): Fast beta correction factor. + beta_slow (int): Slow beta correction factor. + mscale (float): Scaling factor for extended attention. + """ + + max_batch_size: int = 8 + max_seq_len: int = 4096 * 4 + dtype: Literal["bf16", "fp8"] = "bf16" + vocab_size: int = 102400 + dim: int = 2048 + inter_dim: int = 10944 + moe_inter_dim: int = 1408 + n_layers: int = 27 + n_dense_layers: int = 1 + n_heads: int = 16 + norm_eps: float = 1e-5 # eps used for RMSNorm + # MoE + n_routed_experts: int = 64 + n_shared_experts: int = 2 + n_activated_experts: int = 6 + n_expert_groups: int = 1 + n_limited_groups: int = 1 + score_func: Literal["softmax", "sigmoid"] = "softmax" + route_scale: float = 1.0 + use_grouped_mm: bool = False + load_balance_coeff: float | None = 1e-3 + # Multi-Head Latent Attention (MLA) + q_lora_rank: int = 0 + kv_lora_rank: int = 512 + qk_nope_head_dim: int = 128 + qk_rope_head_dim: int = 64 + v_head_dim: int = 128 + use_flex_attn: bool = False + attn_mask_type: str = "causal" + # yarn + original_seq_len: int = 4096 + rope_theta: float = 10000.0 + rope_factor: float = 40 + beta_fast: int = 32 + beta_slow: int = 1 + mscale: float = 1.0 + + def update_from_config(self, job_config: JobConfig, tokenizer: Tokenizer) -> None: + """ + TODO: Placeholder for now + """ + pass + + def get_nparams_and_flops(self, model: nn.Module, seq_len: int) -> tuple[int, int]: + """ + TODO: Placeholder for now + """ + return 0, 0 diff --git a/torchtitan/models/deepseek-v3/model/model.py b/torchtitan/models/deepseek-v3/model/model.py new file mode 100644 index 000000000..dd6c44319 --- /dev/null +++ b/torchtitan/models/deepseek-v3/model/model.py @@ -0,0 +1,337 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. 
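A quick back-of-the-envelope sketch of what the argument defaults above imply for the layer shapes may help orient readers (plain Python; values are copied from the defaults and the arithmetic is illustrative only):

```python
# Values copied from the DeepseekV3ModelArgs defaults above; arithmetic is illustrative only.
qk_nope_head_dim, qk_rope_head_dim, v_head_dim = 128, 64, 128
n_heads, dim, kv_lora_rank = 16, 2048, 512
n_routed_experts, n_shared_experts, n_activated_experts = 64, 2, 6

# Full per-head query/key dimension used by MLA (non-positional part + rotary part).
qk_head_dim = qk_nope_head_dim + qk_rope_head_dim            # 192
print(f"qk_head_dim={qk_head_dim}, wq out_features={n_heads * qk_head_dim}")

# The KV path is first compressed to kv_lora_rank (+ rope part), then expanded per head.
print(f"wkv_a out={kv_lora_rank + qk_rope_head_dim}, "
      f"wkv_b out={n_heads * (qk_nope_head_dim + v_head_dim)}")

# Each token activates the shared experts plus its top-k routed experts.
print(f"experts per token: {n_shared_experts + n_activated_experts} "
      f"of {n_shared_experts + n_routed_experts}")
```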
+ +import math +from typing import Tuple + +import torch +import torch.nn.functional as F +from torch import nn +from torchtitan.models.attention import build_attention +from torchtitan.protocols.train_spec import ModelProtocol + +from .args import DeepseekV3ModelArgs +from .moe import MoE + + +# Adopted from https://github.com/deepseek-ai/DeepSeek-V3/blob/main/inference/model.py#L294 +def precompute_freqs_cis(args: DeepseekV3ModelArgs) -> torch.Tensor: + """ + Precomputes frequency-based complex exponential values for rotary positional embeddings. + + Args: + args (DeepseekV3ModelArgs): Model arguments containing positional embedding parameters. + + Returns: + torch.Tensor: Precomputed complex exponential values for positional embeddings. + """ + dim = args.qk_rope_head_dim + seqlen = args.max_seq_len + beta_fast = args.beta_fast + beta_slow = args.beta_slow + base = args.rope_theta + factor = args.rope_factor + + def find_correction_dim( + num_rotations: float, dim: int, base: float, max_seq_len: int + ) -> float: + """ + Computes the correction dimension for a given number of rotations in the rotary positional embedding. + + Args: + num_rotations (float): Number of rotations to compute the correction for. + dim (int): Dimensionality of the embedding space. + base (float): Base value for the exponential computation. + max_seq_len (int): Maximum sequence length. + + Returns: + float: The correction dimension based on the input parameters. + """ + return ( + dim + * math.log(max_seq_len / (num_rotations * 2 * math.pi)) + / (2 * math.log(base)) + ) + + def find_correction_range( + low_rot: float, high_rot: float, dim: int, base: float, max_seq_len: int + ) -> Tuple[int, int]: + """ + Computes the range of correction dimensions for rotary positional embeddings. + + Args: + low_rot (float): Lower bound for the number of rotations. + high_rot (float): Upper bound for the number of rotations. + dim (int): Dimensionality of the embedding space. + base (float): Base value for the exponential computation. + max_seq_len (int): Maximum sequence length. + + Returns: + Tuple[int, int]: The range of correction dimensions (low, high), clamped to valid indices. + """ + low = math.floor(find_correction_dim(low_rot, dim, base, max_seq_len)) + high = math.ceil(find_correction_dim(high_rot, dim, base, max_seq_len)) + return max(low, 0), min(high, dim - 1) + + def linear_ramp_factor(min: float, max: float, dim: int) -> torch.Tensor: + """ + Computes a linear ramp function used to smooth values between a minimum and maximum range. + + Args: + min (float): Minimum value for the ramp function. + max (float): Maximum value for the ramp function. + dim (int): Dimensionality of the ramp tensor. + + Returns: + torch.Tensor: A tensor of shape (dim,) with values linearly interpolated between 0 and 1, + clamped to the range [0, 1]. 
+ """ + if min == max: + max += 0.001 + linear_func = (torch.arange(dim, dtype=torch.float32) - min) / (max - min) + ramp_func = torch.clamp(linear_func, 0, 1) + return ramp_func + + # Basic RoPE frequency calculation + freqs = 1.0 / (base ** (torch.arange(0, dim, 2, dtype=torch.float32) / dim)) + + # YaRN scaling for extended context + if seqlen > args.original_seq_len: + low, high = find_correction_range( + beta_fast, beta_slow, dim, base, args.original_seq_len + ) + smooth = 1 - linear_ramp_factor(low, high, dim // 2) + freqs = freqs / factor * (1 - smooth) + freqs * smooth + + t = torch.arange(seqlen) + freqs = torch.outer(t, freqs) + freqs_cis = torch.polar(torch.ones_like(freqs), freqs) + return freqs_cis + + +def apply_rotary_emb(x: torch.Tensor, freqs_cis: torch.Tensor) -> torch.Tensor: + """ + Applies rotary positional embeddings to the input tensor. + + Args: + x (torch.Tensor): Input tensor with positional embeddings to be applied. + freqs_cis (torch.Tensor): Precomputed complex exponential values for positional embeddings. + + Returns: + torch.Tensor: Tensor with rotary embeddings applied. + """ + dtype = x.dtype + x = torch.view_as_complex(x.float().view(*x.shape[:-1], -1, 2)) + freqs_cis = freqs_cis.view(1, x.size(1), 1, x.size(-1)) + y = torch.view_as_real(x * freqs_cis).flatten(3) + return y.to(dtype) + + +class Attention(nn.Module): + """ + Multi-head attention (MLA) module. + """ + + def __init__(self, model_args: DeepseekV3ModelArgs): + super().__init__() + self.dim = model_args.dim + self.n_heads = model_args.n_heads + self.q_lora_rank = model_args.q_lora_rank + self.kv_lora_rank = model_args.kv_lora_rank + self.qk_nope_head_dim = model_args.qk_nope_head_dim + self.qk_rope_head_dim = model_args.qk_rope_head_dim + self.qk_head_dim = model_args.qk_nope_head_dim + model_args.qk_rope_head_dim + self.v_head_dim = model_args.v_head_dim + + if self.q_lora_rank == 0: + self.wq = nn.Linear(self.dim, self.n_heads * self.qk_head_dim) + else: + self.wq_a = nn.Linear(self.dim, self.q_lora_rank) + self.q_norm = nn.RMSNorm(self.q_lora_rank, eps=model_args.norm_eps) + self.wq_b = nn.Linear(self.q_lora_rank, self.n_heads * self.qk_head_dim) + self.wkv_a = nn.Linear(self.dim, self.kv_lora_rank + self.qk_rope_head_dim) + self.kv_norm = nn.RMSNorm(self.kv_lora_rank, eps=model_args.norm_eps) + self.wkv_b = nn.Linear( + self.kv_lora_rank, self.n_heads * (self.qk_nope_head_dim + self.v_head_dim) + ) + self.wo = nn.Linear(self.n_heads * self.v_head_dim, self.dim) + self.softmax_scale = self.qk_head_dim**-0.5 + + if model_args.max_seq_len > model_args.original_seq_len: + mscale = 0.1 * model_args.mscale * math.log(model_args.rope_factor) + 1.0 + self.softmax_scale = self.softmax_scale * mscale * mscale + + self.sdpa = build_attention(model_args.use_flex_attn, model_args.attn_mask_type) + + def forward( + self, + x: torch.Tensor, + freqs_cis: torch.Tensor, + ): + """ + Forward pass for the Multi-Head Latent Attention (MLA) Layer. + + Args: + x (torch.Tensor): Input tensor of shape (batch_size, seq_len, dim). + freqs_cis (torch.Tensor): Precomputed complex exponential values for rotary embeddings. + + Returns: + torch.Tensor: Output tensor with the same shape as the input. 
+ """ + bsz, seqlen, _ = x.size() + + # Query projection + if self.q_lora_rank == 0: + q = self.wq(x) # (bsz, seqlen, n_heads * qk_head_dim) + else: + q = self.wq_b(self.q_norm(self.wq_a(x))) + + q = q.view(bsz, seqlen, self.n_heads, self.qk_head_dim) + q_nope, q_pe = torch.split( + q, [self.qk_nope_head_dim, self.qk_rope_head_dim], dim=-1 + ) + q_pe = apply_rotary_emb(q_pe, freqs_cis) + q = torch.cat([q_nope, q_pe], dim=-1) # (bsz, seqlen, n_heads, qk_head_dim) + + # Key-value projection + kv = self.wkv_a(x) # (bsz, seqlen, kv_lora_rank + qk_rope_head_dim) + kv, k_pe = torch.split(kv, [self.kv_lora_rank, self.qk_rope_head_dim], dim=-1) + k_pe = apply_rotary_emb( + k_pe.unsqueeze(2), freqs_cis + ) # (bsz, seqlen, 1, qk_rope_head_dim) + + kv = self.wkv_b( + self.kv_norm(kv) + ) # (bsz, seqlen, n_heads * (qk_nope_head_dim + v_head_dim)) + kv = kv.view(bsz, seqlen, self.n_heads, self.qk_nope_head_dim + self.v_head_dim) + k_nope, v = torch.split(kv, [self.qk_nope_head_dim, self.v_head_dim], dim=-1) + k = torch.cat( + [k_nope, k_pe.expand(-1, -1, self.n_heads, -1)], dim=-1 + ) # (bsz, seqlen, n_heads, qk_head_dim) + + # TODO: Need to pass softmax_scale to sdpa() interface. + # For mask, DeepseekV3 uses causal mask, so we can use the default mask in sdpa + # https://github.com/deepseek-ai/DeepSeek-V3/blob/main/inference/model.py#L17 + output = self.sdpa(q, k, v) + + # Reshape and project output + output = output.transpose(1, 2) # (bsz, seqlen, n_heads, v_head_dim) + output = output.view(bsz, seqlen, -1) # (bsz, seqlen, n_heads * v_head_dim) + return self.wo(output) # (bsz, seqlen, dim) + + +class FeedForward(nn.Module): + """ + FeedForward module + + Args: + dim (int): Input dimension. + hidden_dim (int): Hidden dimension of the feedforward layer. + multiple_of (int): Value to ensure hidden dimension is a multiple of this value. + ffn_dim_multiplier (float | None): Custom multiplier for hidden dimension. Defaults to None. + + Attributes: + w1 (Linear): Linear transformation for the first layer. + w2 (Linear): Linear transformation for the second layer. + w3 (Linear): Linear transformation for the third layer. + + """ + + def __init__( + self, + dim: int, + hidden_dim: int, + ): + super().__init__() + self.w1 = nn.Linear(dim, hidden_dim, bias=False) + self.w2 = nn.Linear(hidden_dim, dim, bias=False) + self.w3 = nn.Linear(dim, hidden_dim, bias=False) + + def forward(self, x: torch.Tensor) -> torch.Tensor: + return self.w2(F.silu(self.w1(x)) * self.w3(x)) + + def init_weights(self, init_std: float): + nn.init.trunc_normal_(self.w1.weight, mean=0.0, std=0.02) + for linear in (self.w2, self.w3): + nn.init.trunc_normal_(linear.weight, mean=0.0, std=init_std) + + +class TransformerBlock(nn.Module): + """ + Transformer block with attention and feed-forward layers. + """ + + def __init__(self, layer_id: int, model_args: DeepseekV3ModelArgs): + + super().__init__() + self.attention = Attention(model_args) + self.attention_norm = nn.RMSNorm(model_args.dim, eps=model_args.norm_eps) + self.ffn_norm = nn.RMSNorm(model_args.dim, eps=model_args.norm_eps) + self.ffn = ( + FeedForward(model_args.dim, model_args.inter_dim) + if layer_id < model_args.n_dense_layers + else MoE(model_args) + ) + + def forward(self, x: torch.Tensor, freqs_cis: torch.Tensor): + """ + Forward pass for the Transformer block. + + Args: + x (torch.Tensor): Input tensor of shape (batch_size, seq_len, dim). + freqs_cis (torch.Tensor): Precomputed complex exponential values for rotary embeddings. 
+ + Returns: + torch.Tensor: Output tensor with the same shape as the input. + """ + x = x + self.attention(self.attention_norm(x), freqs_cis) + x = x + self.ffn(self.ffn_norm(x)) + return x + + +class Transformer(nn.Module, ModelProtocol): + """ + Deepseek-V3 Transformer model with attention and feed-forward layers. + """ + + def __init__(self, model_args: DeepseekV3ModelArgs): + super().__init__() + self.max_seq_len = model_args.max_seq_len + self.tok_embeddings = nn.Embedding(model_args.vocab_size, model_args.dim) + self.register_buffer( + "freqs_cis", precompute_freqs_cis(model_args), persistent=False + ) + + self.layers = torch.nn.ModuleList() + for layer_id in range(model_args.n_layers): + self.layers.append( + TransformerBlock(layer_id=layer_id, model_args=model_args) + ) + self.norm = nn.RMSNorm(model_args.dim) + self.output = nn.Linear( + model_args.dim, model_args.vocab_size, dtype=torch.get_default_dtype() + ) + self.init_weights() + + def forward(self, tokens: torch.Tensor): + """ + Forward pass for the Transformer model. + + Args: + tokens (torch.Tensor): Input tensor of token IDs with shape (batch_size, seq_len). + + Returns: + torch.Tensor: Logits tensor of shape (batch_size, vocab_size). + """ + h = self.tok_embeddings(tokens) + for layer in self.layers: + h = layer(h, self.freqs_cis) + h = self.norm(h)[:, -1] + output = self.output(h) + return output + + def init_weights(self, buffer_device: torch.device | None = None) -> None: + pass diff --git a/torchtitan/models/deepseek-v3/model/moe.py b/torchtitan/models/deepseek-v3/model/moe.py new file mode 100644 index 000000000..b224b1097 --- /dev/null +++ b/torchtitan/models/deepseek-v3/model/moe.py @@ -0,0 +1,339 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. 
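A quick aside on the rotary-embedding helper in model.py above: `apply_rotary_emb` multiplies each complex channel pair by a unit-magnitude phase, so it is a pure rotation and preserves per-head norms. A minimal self-contained sketch of that behavior (toy sizes, plain RoPE frequencies without the YaRN correction; the `toy_*` names are illustrative and not part of the patch):

```python
import torch

def toy_freqs_cis(seqlen: int, head_dim: int, base: float = 10000.0) -> torch.Tensor:
    # Standard RoPE frequencies (no YaRN correction): complex tensor of shape (seqlen, head_dim // 2).
    freqs = 1.0 / (base ** (torch.arange(0, head_dim, 2, dtype=torch.float32) / head_dim))
    t = torch.arange(seqlen, dtype=torch.float32)
    return torch.polar(torch.ones(seqlen, head_dim // 2), torch.outer(t, freqs))

def toy_apply_rotary_emb(x: torch.Tensor, freqs_cis: torch.Tensor) -> torch.Tensor:
    # Mirrors the reshape logic of apply_rotary_emb above; x is (bsz, seqlen, n_heads, head_dim).
    dtype = x.dtype
    xc = torch.view_as_complex(x.float().view(*x.shape[:-1], -1, 2))
    fc = freqs_cis.view(1, xc.size(1), 1, xc.size(-1))
    return torch.view_as_real(xc * fc).flatten(3).to(dtype)

bsz, seqlen, n_heads, head_dim = 2, 4, 3, 8
x = torch.randn(bsz, seqlen, n_heads, head_dim)
y = toy_apply_rotary_emb(x, toy_freqs_cis(seqlen, head_dim))
assert y.shape == x.shape
# A rotation preserves the norm of every (token, head) vector.
assert torch.allclose(x.norm(dim=-1), y.norm(dim=-1), atol=1e-5)
```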
+ +import torch +import torch.nn.functional as F +from torch import nn + +from .args import DeepseekV3ModelArgs + + +# Reference: torchtitan/experiments/llama4/model/ +class GroupedExperts(nn.Module): + def __init__( + self, + dim: int, + hidden_dim: int, + num_experts: int, + use_grouped_mm: bool, + ): + super().__init__() + self.num_experts = num_experts + self.w1 = nn.Parameter(torch.empty(num_experts, dim, hidden_dim)) + self.w2 = nn.Parameter(torch.empty(num_experts, hidden_dim, dim)) + self.w3 = nn.Parameter(torch.empty(num_experts, dim, hidden_dim)) + self.use_grouped_mm = use_grouped_mm + + def forward( + self, + x: torch.Tensor, + num_local_tokens_per_expert: torch.Tensor | list[int] | None = None, + ) -> torch.Tensor: + # TODO: keeping this for loop implementation for comparison + # and readability, will remove later + if not self.use_grouped_mm: + if num_local_tokens_per_expert is not None: + # a tuple of tensors indexed by experts + # each with shape (tokens_per_expert(varying), dim) + x = torch.split( + x, + split_size_or_sections=num_local_tokens_per_expert, + dim=0, + ) + out_experts_splits = [] + for expert_idx, x_expert in enumerate(x): + w1, w2, w3 = ( + self.w1[expert_idx], + self.w2[expert_idx], + self.w3[expert_idx], + ) + h = F.silu(torch.matmul(x_expert, w1)) + h = h * torch.matmul(x_expert, w3) + h = torch.matmul(h, w2) + # h shape (tokens_per_expert(varying), dim) + out_experts_splits.append(h) + out = torch.cat(out_experts_splits, dim=0) + else: + # x shape (num_experts, tokens_per_expert, dim) + h = F.silu(torch.bmm(x, self.w1)) + h = h * torch.bmm(x, self.w3) + # out shape (num_experts, tokens_per_expert, dim) + out = torch.bmm(h, self.w2) + + return out + + # grouped mm implementation + if num_local_tokens_per_expert is not None: + # https://github.com/pytorch/pytorch/pull/150374 + # NOTE: torch._gouped_mm requires bf16 dtypes + # and shapes to be multiple of 8 + offsets = torch.cumsum( + num_local_tokens_per_expert, dim=0, dtype=torch.int32 + ) + # grouped mm between a 2D tensor and a 3D tensor + assert x.dim() == 2 + else: + offsets = None + # fall back to regular bmm between 3D tensors + assert x.dim() == 3 + + assert ( + x.dtype == self.w1.dtype == self.w2.dtype == self.w3.dtype == torch.bfloat16 + ), "torch._grouped_mm only supports bf16 dtypes" + h = F.silu(torch._grouped_mm(x, self.w1, offs=offsets)) + h = h * torch._grouped_mm(x, self.w3, offs=offsets) + out = torch._grouped_mm(h, self.w2, offs=offsets) + + return out + + +class TokenChoiceTopKRouter(nn.Module): + """This class implements token-choice routing. In token-choice top-K routing, each token is + routed to top K experts based on the router scores. + + Args: + gate (nn.Module): Gate module to calculate the scores, typically nn.Linear(dim, num_experts). + num_experts (int): Number of experts in each moe layer. + top_k (int): Number of experts each token will be routed to in token-choice routing. + use_sigmoid (bool): Whether to use sigmoid or softmax for router scores. Default is False. 
+ """ + + def __init__( + self, + num_experts: int, + top_k: int, + use_sigmoid: bool = False, + route_sclaing_factor: float = 1.0, + ): + super().__init__() + + self.num_experts = num_experts + self.top_k = top_k + self.use_sigmoid = use_sigmoid + self.route_sclaing_factor = route_sclaing_factor + + self.weight = nn.Parameter( + torch.empty((self.n_routed_experts, self.gating_dim)) + ) + + def forward( + self, x: torch.Tensor, expert_bias: torch.Tensor = None + ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: + """ + TODO: We haven't implement the group-based routing (node limit routing), + and currently EP is not supporting node limit routing yet. + + Args: + x (torch.Tensor): Input tensor with shape ``(bs*slen, dim)``. + + Returns: + routed_input (torch.Tensor): + Tokens grouped together by experts indices with shape ``(bs*slen*top_k,)``. + token_indices (torch.Tensor): + Token indices for routed_input with shape ``(bs*slen*top_k,)``. + num_local_tokens_per_expert (torch.Tensor): + Number of tokens assigned to each expert with shape ``(num_experts,)``. + """ + # scores shape (bs*slen, num_experts) + scores = F.linear(x.type, self.weight, None) + + # By default, sigmoid or softmax is performed in float32 to avoid loss explosion + if self.use_sigmoid: + scores = torch.sigmoid(scores.to(torch.float32)) + else: + scores = F.softmax(scores.to(torch.float32), dim=1) + + # top scores shape (bs*slen, top_k) + # NOTE: The expert_bias is only used for routing. The gating value + # top_scores is still derived from the original scores. + _, selected_experts_indices = torch.topk( + scores + expert_bias, k=self.top_k, dim=1 + ) + top_scores = scores.gather(dim=1, index=selected_experts_indices) + + # group tokens together by expert indices from 0 to num_experts and pass that to experts forward + num_local_tokens_per_expert = torch.histc( + selected_experts_indices.view(-1), + bins=self.num_experts, + min=0, + max=self.num_experts, + ) + # token_indices_experts_sorted shape (bs*slen*top_k,) + token_indices_experts_sorted = torch.argsort( + selected_experts_indices.view(-1), stable=True + ) + top_scores = top_scores.view(-1)[token_indices_experts_sorted] + token_indices_experts_sorted = token_indices_experts_sorted // self.top_k + + top_scores = ( + top_scores * self.route_sclaing_factor + ) # must multiply the scaling factor + + return top_scores, token_indices_experts_sorted, num_local_tokens_per_expert + + +class MoE(nn.Module): + def __init__(self, model_args: DeepseekV3ModelArgs): + + super().__init__() + dim = model_args.dim + + num_experts = model_args.n_routed_experts + hidden_dim = model_args.moe_inter_dim + top_k = model_args.n_activated_experts + route_scaling_factor = model_args.route_scale + + self.use_grouped_mm = model_args.use_grouped_mm + self.experts = GroupedExperts( + dim=dim, + hidden_dim=hidden_dim, + num_experts=num_experts, + use_grouped_mm=self.use_grouped_mm, + ) + self.router = TokenChoiceTopKRouter( + num_experts=num_experts, + top_k=top_k, + use_sigmoid=model_args.score_func == "sigmoid", + route_sclaing_factor=route_scaling_factor, + ) + self.shared_expert = ( + # Reference: https://huggingface.co/deepseek-ai/DeepSeek-V3-Base/blob/main/modeling_deepseek.py#L517 + GroupedExperts( + dim=dim, + hidden_dim=hidden_dim * model_args.n_shared_experts, + num_experts=1, + use_grouped_mm=self.use_grouped_mm, + ) + if model_args.n_shared_experts > 0 + else None + ) + + # auxiliary-loss-free load balancing + self.load_balance_coeff = model_args.load_balance_coeff + # the fields 
below are defined even when load_balance_coeff is None + # to make initialization and checkpointing code simpler + self.register_buffer( + "expert_bias", + torch.zeros(num_experts, dtype=torch.float32), + persistent=True, + ) + self.register_buffer( + "tokens_per_expert", + torch.zeros(num_experts, dtype=torch.float32), + persistent=True, + ) + + # NOTE: forward hook, forward pre hook, or backward pre hook + # would conflict with activation checkpointing + if self.load_balance_coeff is not None and self.load_balance_coeff > 0: + self.register_full_backward_hook(self._update_expert_bias) + + # TODO: double check the bias update logic. It aligns with the paper. + def _update_expert_bias(self, *_): + expert_bias_delta = self.load_balance_coeff * torch.sign( + self.tokens_per_expert.mean() - self.tokens_per_expert + ) + expert_bias_delta = expert_bias_delta - expert_bias_delta.mean() + self.expert_bias.add_(expert_bias_delta) + + self.tokens_per_expert.zero_() + + def forward(self, x: torch.Tensor) -> torch.Tensor: + """ + Args: + x (torch.Tensor): Input tensor with shape ``(bs, slen, dim)``. + + Returns: + out (torch.Tensor): Output tensor with shape ``(bs, slen, dim)``. + """ + bs, slen, dim = x.shape + + # top_scores and selected_indices shape (bs*slen*top_k,) + # num_local_tokens_per_expert shape (num_experts,) + ( + top_scores, + token_indices, + num_local_tokens_per_expert, + ) = self.router(x.reshape(bs * slen, dim), self.expert_bias) + + # will be used to update the expert bias for load balancing + self.tokens_per_expert += num_local_tokens_per_expert + + # shape (bs*slen*top_k, dim) + token_indices = token_indices.reshape(-1, 1).expand(-1, dim) + + # shape (bs*slen*top_k, dim) + routed_input = torch.gather( + x.view(-1, dim), + dim=0, + index=token_indices, + ) + + if self.use_grouped_mm: + # NOTE: In order to use torch._grouped_mm, we need to make sure + # the number of tokens each expert gets is a multiple of 16. + # The following kernel helps achieve this via padding, without + # incurring synchronization between device and host. 
+ from torchtitan.experiments.kernels.moe.indices import ( + generate_permute_indices, + ) + + ALIGN_SIZE_M = 16 + + with torch.no_grad(): + ( + permuted_indices, + num_local_tokens_per_expert, + _, + ) = generate_permute_indices( + num_local_tokens_per_expert, + self.experts.num_experts, + 1, + ALIGN_SIZE_M, + ) + token_indices = torch.vstack( + (token_indices, token_indices.new_zeros((dim))) + ) + token_indices = token_indices[permuted_indices, :] + routed_input = torch.vstack((routed_input, routed_input.new_zeros((dim)))) + routed_input = routed_input[permuted_indices, :] + else: + # NOTE: this would incur a synchronization between device and host + num_local_tokens_per_expert = num_local_tokens_per_expert.tolist() + + # shape (bs*slen*top_k, dim) + routed_output = self.experts(routed_input, num_local_tokens_per_expert) + routed_output = routed_output * top_scores.unsqueeze(-1) + + # shared expert + if self.shared_expert is not None: + out = self.shared_expert(x.reshape(1, bs * slen, dim)).reshape( + bs * slen, dim + ) + else: + out = torch.zeros_like(x.reshape(bs * slen, dim)) + + out = out.scatter_add(dim=0, index=token_indices, src=routed_output) + out = out.reshape(bs, slen, dim) + return out + + def init_weights( + self, + init_std: float, + buffer_device: torch.device, + ): + self.experts.init_weights(init_std) + self.router.init_weights(init_std) + if self.shared_expert is not None: + self.shared_expert.init_weights(init_std) + + with torch.device(buffer_device): + self.expert_bias = torch.zeros( + self.experts.num_experts, dtype=torch.float32 + ) + self.tokens_per_expert = torch.zeros( + self.experts.num_experts, dtype=torch.float32 + ) From 70f09e59af7c71ec603424690614d8e6ec6442f0 Mon Sep 17 00:00:00 2001 From: Jiani Wang <40016222+wwwjn@users.noreply.github.com> Date: Mon, 23 Jun 2025 10:55:32 -0700 Subject: [PATCH 03/14] [DSV3] Forward and backward pass for single GPU (#1320) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Command to run: `NGPU=1 CONFIG_FILE="./torchtitan/models/deepseek_v3/train_configs/debug_model.toml" ./run_train.sh` ## Context 1. Added model args for 4 model settings, and training config for debug model 2. Debugged the forward pass, and the backward pass works out of pocket. 3. 
Reused c4-test dataset, and tiktokenizer from llama3 model for current testing ![Screenshot 2025-06-20 at 11 52 49 AM](https://github.com/user-attachments/assets/81d938a2-9a85-4e8c-b8e1-7f9510d785c2) --- torchtitan/models/__init__.py | 1 + torchtitan/models/deepseek_v3/__init__.py | 125 ++++++++++++++++++ .../models/deepseek_v3/infra/parallelize.py | 23 ++++ .../model/args.py | 63 ++++++++- .../model/model.py | 36 ++--- .../{deepseek-v3 => deepseek_v3}/model/moe.py | 13 +- .../train_configs/debug_model.toml | 69 ++++++++++ 7 files changed, 303 insertions(+), 27 deletions(-) create mode 100644 torchtitan/models/deepseek_v3/__init__.py create mode 100644 torchtitan/models/deepseek_v3/infra/parallelize.py rename torchtitan/models/{deepseek-v3 => deepseek_v3}/model/args.py (62%) rename torchtitan/models/{deepseek-v3 => deepseek_v3}/model/model.py (92%) rename torchtitan/models/{deepseek-v3 => deepseek_v3}/model/moe.py (97%) create mode 100644 torchtitan/models/deepseek_v3/train_configs/debug_model.toml diff --git a/torchtitan/models/__init__.py b/torchtitan/models/__init__.py index fd5aa42c6..378f88665 100644 --- a/torchtitan/models/__init__.py +++ b/torchtitan/models/__init__.py @@ -7,4 +7,5 @@ # Import the built-in models here so that the corresponding register_model_spec() # will be called. +import torchtitan.models.deepseek_v3 # noqa: F401 import torchtitan.models.llama3 # noqa: F401 diff --git a/torchtitan/models/deepseek_v3/__init__.py b/torchtitan/models/deepseek_v3/__init__.py new file mode 100644 index 000000000..8a21e53dd --- /dev/null +++ b/torchtitan/models/deepseek_v3/__init__.py @@ -0,0 +1,125 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. +# +# Copyright (c) Meta Platforms, Inc. All Rights Reserved. 
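The trickiest part of the MoE code added in the previous patch is the token dispatch: router scores are computed, each token picks its top-k experts, token slots are grouped by expert via `argsort`/`histc`, run through the experts, scaled by their routing scores, and scattered back with `scatter_add`. A small self-contained sketch of that flow (toy sizes, softmax scoring, a scalar multiply standing in for each expert's FFN; it omits the shared experts, the expert-bias term, and the grouped-mm path):

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)
num_tokens, dim, num_experts, top_k = 10, 4, 4, 2
x = torch.randn(num_tokens, dim)          # tokens already flattened to (bs*slen, dim)
gate = torch.randn(num_experts, dim)      # router weight, like TokenChoiceTopKRouter.weight

# --- routing (cf. TokenChoiceTopKRouter.forward) ---
scores = F.softmax(F.linear(x, gate).to(torch.float32), dim=1)   # (num_tokens, num_experts)
top_scores, selected = torch.topk(scores, k=top_k, dim=1)        # (num_tokens, top_k) each
tokens_per_expert = torch.histc(
    selected.float().view(-1), bins=num_experts, min=0, max=num_experts
)
order = torch.argsort(selected.view(-1), stable=True)            # group slots by expert id
top_scores = top_scores.view(-1)[order]
token_idx = order // top_k                                       # source token of each slot

# --- dispatch, per-expert compute, weighted combine (cf. MoE.forward, for-loop path) ---
routed_in = x[token_idx]                                         # (num_tokens * top_k, dim)
chunks = torch.split(routed_in, tokens_per_expert.to(torch.int64).tolist(), dim=0)
routed_out = torch.cat([c * (e + 1) for e, c in enumerate(chunks)], dim=0)  # fake expert FFNs
routed_out = routed_out * top_scores.unsqueeze(-1)

out = torch.zeros_like(x).scatter_add(
    0, token_idx.unsqueeze(1).expand(-1, dim), routed_out
)
print(out.shape)  # torch.Size([10, 4]) -- one combined output per input token
```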
+ +from torchtitan.components.loss import build_cross_entropy_loss +from torchtitan.components.lr_scheduler import build_lr_schedulers +from torchtitan.components.optimizer import build_optimizers +from torchtitan.datasets.hf_datasets import build_hf_dataloader +from torchtitan.datasets.tokenizer.tiktoken import build_tiktoken_tokenizer +from torchtitan.protocols.train_spec import register_train_spec, TrainSpec + +from .infra.parallelize import parallelize_deepseekv3 +from .model.args import DeepSeekV3ModelArgs +from .model.model import DeepSeekV3Model + +__all__ = [ + "parallelize_deepseekv3", + "DeepseekV3ModelArgs", + "DeepseekV3Model", + "deepseekv3_configs", +] + + +deepseekv3_configs = { + "debugmodel": DeepSeekV3ModelArgs( + vocab_size=102400, + dim=256, + inter_dim=10944, + moe_inter_dim=1408, + n_layers=3, + n_dense_layers=1, + n_heads=16, + n_routed_experts=8, + n_shared_experts=2, + n_activated_experts=3, + route_scale=1.0, + q_lora_rank=0, + kv_lora_rank=512, + qk_nope_head_dim=128, + qk_rope_head_dim=64, + v_head_dim=128, + mscale=0.70, + ), + "16B": DeepSeekV3ModelArgs( + vocab_size=102400, + dim=2048, + inter_dim=10944, + moe_inter_dim=1408, + n_layers=27, + n_dense_layers=1, + n_heads=16, + n_routed_experts=64, + n_shared_experts=2, + n_activated_experts=6, + route_scale=1.0, + q_lora_rank=0, + kv_lora_rank=512, + qk_nope_head_dim=128, + qk_rope_head_dim=64, + v_head_dim=128, + mscale=0.70, + ), + "236B": DeepSeekV3ModelArgs( + vocab_size=102400, + dim=5120, + inter_dim=12288, + moe_inter_dim=1536, + n_layers=60, + n_dense_layers=1, + n_heads=128, + n_routed_experts=160, + n_shared_experts=2, + n_activated_experts=6, + n_expert_groups=8, + n_limited_groups=3, + route_scale=16.0, + q_lora_rank=1536, + kv_lora_rank=512, + qk_nope_head_dim=128, + qk_rope_head_dim=64, + v_head_dim=128, + ), + "671B": DeepSeekV3ModelArgs( + vocab_size=129280, + dim=7168, + inter_dim=18432, + moe_inter_dim=2048, + n_layers=61, + n_dense_layers=3, + n_heads=128, + n_routed_experts=256, + n_shared_experts=1, + n_activated_experts=8, + n_expert_groups=8, + n_limited_groups=4, + route_scale=2.5, + score_func="sigmoid", + q_lora_rank=1536, + kv_lora_rank=512, + qk_nope_head_dim=128, + qk_rope_head_dim=64, + v_head_dim=128, + dtype="fp8", + ), +} + + +register_train_spec( + TrainSpec( + name="deepseek_v3", + cls=DeepSeekV3Model, + config=deepseekv3_configs, + parallelize_fn=parallelize_deepseekv3, + pipelining_fn=None, + build_optimizers_fn=build_optimizers, + build_lr_schedulers_fn=build_lr_schedulers, + build_dataloader_fn=build_hf_dataloader, + build_tokenizer_fn=build_tiktoken_tokenizer, + build_loss_fn=build_cross_entropy_loss, + ) +) diff --git a/torchtitan/models/deepseek_v3/infra/parallelize.py b/torchtitan/models/deepseek_v3/infra/parallelize.py new file mode 100644 index 000000000..f8090683c --- /dev/null +++ b/torchtitan/models/deepseek_v3/infra/parallelize.py @@ -0,0 +1,23 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. 
+ + +import torch.nn as nn + +from torch.distributed.device_mesh import DeviceMesh + +from torchtitan.config_manager import JobConfig +from torchtitan.distributed import ParallelDims + + +def parallelize_deepseekv3( + model: nn.Module, + world_mesh: DeviceMesh, + parallel_dims: ParallelDims, + job_config: JobConfig, +): + # TODO: Add support for parallelizing the model, this is a placeholder function for now + return model diff --git a/torchtitan/models/deepseek-v3/model/args.py b/torchtitan/models/deepseek_v3/model/args.py similarity index 62% rename from torchtitan/models/deepseek-v3/model/args.py rename to torchtitan/models/deepseek_v3/model/args.py index 845d6b83e..c0134bf54 100644 --- a/torchtitan/models/deepseek-v3/model/args.py +++ b/torchtitan/models/deepseek_v3/model/args.py @@ -15,11 +15,12 @@ from torchtitan.components.tokenizer import Tokenizer from torchtitan.config_manager import JobConfig from torchtitan.protocols.train_spec import BaseModelArgs +from torchtitan.tools.logging import logger # Reference: https://github.com/deepseek-ai/DeepSeek-V3/blob/main/inference/model.py @dataclass -class DeepseekV3ModelArgs(BaseModelArgs): +class DeepSeekV3ModelArgs(BaseModelArgs): """ Data class for defining model arguments and hyperparameters. @@ -53,7 +54,6 @@ class DeepseekV3ModelArgs(BaseModelArgs): rope_factor (float): Scaling factor for extended sequence lengths. beta_fast (int): Fast beta correction factor. beta_slow (int): Slow beta correction factor. - mscale (float): Scaling factor for extended attention. """ max_batch_size: int = 8 @@ -95,12 +95,63 @@ class DeepseekV3ModelArgs(BaseModelArgs): def update_from_config(self, job_config: JobConfig, tokenizer: Tokenizer) -> None: """ - TODO: Placeholder for now + Update the model_config config from the given job config. """ - pass + self.vocab_size = tokenizer.n_words + self.max_seq_len = job_config.training.seq_len def get_nparams_and_flops(self, model: nn.Module, seq_len: int) -> tuple[int, int]: """ - TODO: Placeholder for now + Adopted from llama4 implementation. """ - return 0, 0 + nparams_embedding = 0 + nparams_moe_router = 0 + nparams_shared_expert = 0 + nparams_experts = 0 + nparams_dense = 0 + + for name, p in model.named_parameters(): + print(name) + if "embedding" in name: + nparams_embedding += p.numel() + nparams_dense += p.numel() + elif "moe.shared_expert" in name: + nparams_shared_expert += p.numel() + elif "moe.router" in name: + nparams_moe_router += p.numel() + elif "moe.experts" in name: + nparams_experts += p.numel() + else: + nparams_dense += p.numel() + + nparams_sparse = nparams_moe_router + nparams_shared_expert + nparams_experts + nparams = nparams_dense + nparams_sparse + nparams_sparse_active = ( + nparams_moe_router + + nparams_shared_expert + + nparams_experts * self.n_activated_experts // self.n_routed_experts + ) + + logger.info( + f"Total parameter count: dense {nparams_dense:,}, " + f"sparse {nparams_sparse:,}, active {nparams_dense + nparams_sparse_active:,}" + ) + + l, h, q, t = ( + self.n_layers, + self.n_heads, + self.dim // self.n_heads, + seq_len, + ) + # Reasoning behind the factor of 12 for the self-attention part of the formula: + # 1. each self-attention has 2 matmul in the forward and 4 in the backward (6) + # 2. the flash attention does 1 more matmul recomputation in the backward + # but recomputation should not be counted in calculating MFU (+0) + # 3. each matmul performs 1 multiplication and 1 addition (*2) + # 4. 
we follow the convention and do not account for sparsity in causal attention + num_flops_per_token = ( + 6 * (nparams_dense - nparams_embedding + nparams_sparse_active) + + 12 * l * h * q * t + ) + + return nparams, num_flops_per_token diff --git a/torchtitan/models/deepseek-v3/model/model.py b/torchtitan/models/deepseek_v3/model/model.py similarity index 92% rename from torchtitan/models/deepseek-v3/model/model.py rename to torchtitan/models/deepseek_v3/model/model.py index dd6c44319..c5ee02327 100644 --- a/torchtitan/models/deepseek-v3/model/model.py +++ b/torchtitan/models/deepseek_v3/model/model.py @@ -13,17 +13,17 @@ from torchtitan.models.attention import build_attention from torchtitan.protocols.train_spec import ModelProtocol -from .args import DeepseekV3ModelArgs +from .args import DeepSeekV3ModelArgs from .moe import MoE -# Adopted from https://github.com/deepseek-ai/DeepSeek-V3/blob/main/inference/model.py#L294 -def precompute_freqs_cis(args: DeepseekV3ModelArgs) -> torch.Tensor: +# Adapted from https://github.com/DeepSeek-ai/DeepSeek-V3/blob/main/inference/model.py#L294 +def precompute_freqs_cis(args: DeepSeekV3ModelArgs) -> torch.Tensor: """ Precomputes frequency-based complex exponential values for rotary positional embeddings. Args: - args (DeepseekV3ModelArgs): Model arguments containing positional embedding parameters. + args (DeepSeekV3ModelArgs): Model arguments containing positional embedding parameters. Returns: torch.Tensor: Precomputed complex exponential values for positional embeddings. @@ -98,7 +98,7 @@ def linear_ramp_factor(min: float, max: float, dim: int) -> torch.Tensor: # Basic RoPE frequency calculation freqs = 1.0 / (base ** (torch.arange(0, dim, 2, dtype=torch.float32) / dim)) - # YaRN scaling for extended context + # YaRN scaling for extended context. YaRN is used to extend the context length after pre-training. if seqlen > args.original_seq_len: low, high = find_correction_range( beta_fast, beta_slow, dim, base, args.original_seq_len @@ -106,8 +106,13 @@ def linear_ramp_factor(min: float, max: float, dim: int) -> torch.Tensor: smooth = 1 - linear_ramp_factor(low, high, dim // 2) freqs = freqs / factor * (1 - smooth) + freqs * smooth + # Create position indices t = torch.arange(seqlen) + + # Outer product: [positions] × [frequencies] freqs = torch.outer(t, freqs) + + # Convert to complex exponentials: e^(i*freq*pos) freqs_cis = torch.polar(torch.ones_like(freqs), freqs) return freqs_cis @@ -135,7 +140,7 @@ class Attention(nn.Module): Multi-head attention (MLA) module. """ - def __init__(self, model_args: DeepseekV3ModelArgs): + def __init__(self, model_args: DeepSeekV3ModelArgs): super().__init__() self.dim = model_args.dim self.n_heads = model_args.n_heads @@ -264,13 +269,13 @@ class TransformerBlock(nn.Module): Transformer block with attention and feed-forward layers. 
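To make the MFU accounting in `get_nparams_and_flops` above concrete, here is the same formula as a tiny standalone function with illustrative numbers plugged in (layer/head/dim values from the 16B flavor in this PR and the 2048-token training sequence length; the active-parameter count is a made-up placeholder, not a value computed from the model):

```python
def flops_per_token(active_params_excl_embedding: int, n_layers: int,
                    n_heads: int, head_dim: int, seq_len: int) -> int:
    # 6 FLOPs per active parameter per token (forward + backward matmuls),
    # plus 12 * l * h * q * t for the attention score/value matmuls.
    return 6 * active_params_excl_embedding + 12 * n_layers * n_heads * head_dim * seq_len

# 16B flavor: dim=2048, n_heads=16 -> head_dim=128, n_layers=27; seq_len=2048 as in the configs.
# 2.4e9 is only a placeholder for (nparams_dense - nparams_embedding + nparams_sparse_active).
flops = flops_per_token(2_400_000_000, 27, 16, 128, 2048)
print(f"~{flops / 1e9:.1f} GFLOPs per token")   # ~15.8
```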
""" - def __init__(self, layer_id: int, model_args: DeepseekV3ModelArgs): + def __init__(self, layer_id: int, model_args: DeepSeekV3ModelArgs): super().__init__() self.attention = Attention(model_args) self.attention_norm = nn.RMSNorm(model_args.dim, eps=model_args.norm_eps) - self.ffn_norm = nn.RMSNorm(model_args.dim, eps=model_args.norm_eps) - self.ffn = ( + self.moe_norm = nn.RMSNorm(model_args.dim, eps=model_args.norm_eps) + self.moe = ( FeedForward(model_args.dim, model_args.inter_dim) if layer_id < model_args.n_dense_layers else MoE(model_args) @@ -288,16 +293,16 @@ def forward(self, x: torch.Tensor, freqs_cis: torch.Tensor): torch.Tensor: Output tensor with the same shape as the input. """ x = x + self.attention(self.attention_norm(x), freqs_cis) - x = x + self.ffn(self.ffn_norm(x)) + x = x + self.moe(self.moe_norm(x)) return x -class Transformer(nn.Module, ModelProtocol): +class DeepSeekV3Model(nn.Module, ModelProtocol): """ - Deepseek-V3 Transformer model with attention and feed-forward layers. + DeepSeek-V3 Transformer model with attention and feed-forward layers. """ - def __init__(self, model_args: DeepseekV3ModelArgs): + def __init__(self, model_args: DeepSeekV3ModelArgs): super().__init__() self.max_seq_len = model_args.max_seq_len self.tok_embeddings = nn.Embedding(model_args.vocab_size, model_args.dim) @@ -327,10 +332,11 @@ def forward(self, tokens: torch.Tensor): torch.Tensor: Logits tensor of shape (batch_size, vocab_size). """ h = self.tok_embeddings(tokens) + for layer in self.layers: h = layer(h, self.freqs_cis) - h = self.norm(h)[:, -1] - output = self.output(h) + h = self.norm(h) + output = self.output(h) # (batch_size, seq_len, dim) return output def init_weights(self, buffer_device: torch.device | None = None) -> None: diff --git a/torchtitan/models/deepseek-v3/model/moe.py b/torchtitan/models/deepseek_v3/model/moe.py similarity index 97% rename from torchtitan/models/deepseek-v3/model/moe.py rename to torchtitan/models/deepseek_v3/model/moe.py index b224b1097..3e17968e1 100644 --- a/torchtitan/models/deepseek-v3/model/moe.py +++ b/torchtitan/models/deepseek_v3/model/moe.py @@ -8,7 +8,7 @@ import torch.nn.functional as F from torch import nn -from .args import DeepseekV3ModelArgs +from .args import DeepSeekV3ModelArgs # Reference: torchtitan/experiments/llama4/model/ @@ -103,6 +103,7 @@ class TokenChoiceTopKRouter(nn.Module): def __init__( self, + dim: int, num_experts: int, top_k: int, use_sigmoid: bool = False, @@ -110,14 +111,13 @@ def __init__( ): super().__init__() + self.dim = dim self.num_experts = num_experts self.top_k = top_k self.use_sigmoid = use_sigmoid self.route_sclaing_factor = route_sclaing_factor - self.weight = nn.Parameter( - torch.empty((self.n_routed_experts, self.gating_dim)) - ) + self.weight = nn.Parameter(torch.empty((self.num_experts, self.dim))) def forward( self, x: torch.Tensor, expert_bias: torch.Tensor = None @@ -138,7 +138,7 @@ def forward( Number of tokens assigned to each expert with shape ``(num_experts,)``. 
""" # scores shape (bs*slen, num_experts) - scores = F.linear(x.type, self.weight, None) + scores = F.linear(x, self.weight, bias=None) # By default, sigmoid or softmax is performed in float32 to avoid loss explosion if self.use_sigmoid: @@ -176,7 +176,7 @@ def forward( class MoE(nn.Module): - def __init__(self, model_args: DeepseekV3ModelArgs): + def __init__(self, model_args: DeepSeekV3ModelArgs): super().__init__() dim = model_args.dim @@ -194,6 +194,7 @@ def __init__(self, model_args: DeepseekV3ModelArgs): use_grouped_mm=self.use_grouped_mm, ) self.router = TokenChoiceTopKRouter( + dim=dim, num_experts=num_experts, top_k=top_k, use_sigmoid=model_args.score_func == "sigmoid", diff --git a/torchtitan/models/deepseek_v3/train_configs/debug_model.toml b/torchtitan/models/deepseek_v3/train_configs/debug_model.toml new file mode 100644 index 000000000..eddca8849 --- /dev/null +++ b/torchtitan/models/deepseek_v3/train_configs/debug_model.toml @@ -0,0 +1,69 @@ +# torchtitan Config.toml + +[job] +dump_folder = "./outputs" +description = "DeepSeek-V3 debug training" +print_args = false +use_for_integration_test = true + +[profiling] +enable_profiling = false +save_traces_folder = "profile_trace" +profile_freq = 10 +enable_memory_snapshot = false +save_memory_snapshot_folder = "memory_snapshot" + +[metrics] +log_freq = 1 +disable_color_printing = false +enable_tensorboard = false +save_tb_folder = "tb" +enable_wandb = false + +[model] +name = "deepseek_v3" +flavor = "debugmodel" +# test tokenizer.model, for debug purpose only +tokenizer_path = "./tests/assets/test_tiktoken.model" +# converters = ["float8"] + +[optimizer] +name = "AdamW" +lr = 8e-4 +eps = 1e-8 + +[lr_scheduler] +warmup_steps = 2 # lr scheduler warm up, normally 20% of the train steps +decay_ratio = 0.8 # lr scheduler decay ratio, 80% of the train steps +decay_type = "linear" +lr_min = 0.0 + +[training] +local_batch_size = 8 +seq_len = 2048 +max_norm = 1.0 # grad norm clipping +steps = 10 +compile = false +dataset = "c4_test" # supported datasets: c4_test (2K), c4 (177M) + +[parallelism] +data_parallel_replicate_degree = 1 +data_parallel_shard_degree = -1 +fsdp_reshard_after_forward = "default" # default / never / always + +[checkpoint] +enable_checkpoint = false +folder = "checkpoint" +interval = 10 +last_save_model_weights_only = false +export_dtype = "float32" +async_mode = "disabled" # ["disabled", "async", "async_with_pinned_mem"] + +[activation_checkpoint] +mode = "none" # ["none", "selective", "full"] +selective_ac_option = '2' # 'int' = ac every positive int layer or 'op', ac based on ops policy + +[float8] +enable_fsdp_float8_all_gather = false +precompute_float8_dynamic_scale_for_fsdp = false +filter_fqns = ["output"] From 830e9f4f806ebc041d3de3fb895a8aa9c4d655f0 Mon Sep 17 00:00:00 2001 From: Jiani Wang <40016222+wwwjn@users.noreply.github.com> Date: Tue, 24 Jun 2025 13:18:16 -0700 Subject: [PATCH 04/14] [DSV3] Adding 16B model training config, Enable FSDP and AC on DSV3-16B model (#1330) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Context 1. Introduced a basic DSV3-16B model training config 2. Enabled FSDP/HSDP on DSV3-16B model training ## Performance Current profiler looks like this: The `to_copy` takes to long and needs to be optimized. 
The copy comes from dtype conversion in class MoE(): ```routed_output = (routed_output.to(torch.float32) * top_scores.unsqueeze(-1)).to(x.dtype)``` With FSDP only: Screenshot 2025-06-23 at 2 10 20 PM --- torchtitan/models/deepseek_v3/__init__.py | 1 + .../models/deepseek_v3/infra/parallelize.py | 35 ++++++++-- torchtitan/models/deepseek_v3/model/args.py | 1 - torchtitan/models/deepseek_v3/model/model.py | 13 ++-- torchtitan/models/deepseek_v3/model/moe.py | 4 +- .../train_configs/deepseek_v3_16b.toml | 67 +++++++++++++++++++ 6 files changed, 110 insertions(+), 11 deletions(-) create mode 100644 torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml diff --git a/torchtitan/models/deepseek_v3/__init__.py b/torchtitan/models/deepseek_v3/__init__.py index 8a21e53dd..7eb16a1f3 100644 --- a/torchtitan/models/deepseek_v3/__init__.py +++ b/torchtitan/models/deepseek_v3/__init__.py @@ -11,6 +11,7 @@ from torchtitan.components.optimizer import build_optimizers from torchtitan.datasets.hf_datasets import build_hf_dataloader from torchtitan.datasets.tokenizer.tiktoken import build_tiktoken_tokenizer + from torchtitan.protocols.train_spec import register_train_spec, TrainSpec from .infra.parallelize import parallelize_deepseekv3 diff --git a/torchtitan/models/deepseek_v3/infra/parallelize.py b/torchtitan/models/deepseek_v3/infra/parallelize.py index f8090683c..99338663f 100644 --- a/torchtitan/models/deepseek_v3/infra/parallelize.py +++ b/torchtitan/models/deepseek_v3/infra/parallelize.py @@ -4,13 +4,13 @@ # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. - import torch.nn as nn - from torch.distributed.device_mesh import DeviceMesh -from torchtitan.config_manager import JobConfig +from torchtitan.config_manager import JobConfig, TORCH_DTYPE_MAP from torchtitan.distributed import ParallelDims +from torchtitan.models.llama3.infra.parallelize import apply_ac, apply_fsdp +from torchtitan.tools.logging import logger def parallelize_deepseekv3( @@ -19,5 +19,32 @@ def parallelize_deepseekv3( parallel_dims: ParallelDims, job_config: JobConfig, ): - # TODO: Add support for parallelizing the model, this is a placeholder function for now + if job_config.activation_checkpoint.mode != "none": + apply_ac(model, job_config.activation_checkpoint) + + dp_mesh: DeviceMesh | None = None + if ( + parallel_dims.dp_shard_enabled + ): # apply FSDP or HSDP, potentially with Context Parallel + if parallel_dims.dp_replicate_enabled: + dp_mesh_dim_names = ("dp_replicate", "dp_shard") + else: + dp_mesh_dim_names = ("dp_shard",) + dp_mesh = world_mesh[tuple(dp_mesh_dim_names)] + + apply_fsdp( + model, + dp_mesh, + param_dtype=TORCH_DTYPE_MAP[job_config.training.mixed_precision_param], + reduce_dtype=TORCH_DTYPE_MAP[job_config.training.mixed_precision_reduce], + pp_enabled=parallel_dims.pp_enabled, + cpu_offload=job_config.training.enable_cpu_offload, + reshard_after_forward_policy=job_config.parallelism.fsdp_reshard_after_forward, + ) + + if parallel_dims.dp_replicate_enabled: + logger.info("Applied HSDP to the model") + else: + logger.info("Applied FSDP to the model") + return model diff --git a/torchtitan/models/deepseek_v3/model/args.py b/torchtitan/models/deepseek_v3/model/args.py index c0134bf54..09e882764 100644 --- a/torchtitan/models/deepseek_v3/model/args.py +++ b/torchtitan/models/deepseek_v3/model/args.py @@ -111,7 +111,6 @@ def get_nparams_and_flops(self, model: nn.Module, seq_len: int) -> tuple[int, in nparams_dense = 0 for 
name, p in model.named_parameters(): - print(name) if "embedding" in name: nparams_embedding += p.numel() nparams_dense += p.numel() diff --git a/torchtitan/models/deepseek_v3/model/model.py b/torchtitan/models/deepseek_v3/model/model.py index c5ee02327..3eb0f2fbc 100644 --- a/torchtitan/models/deepseek_v3/model/model.py +++ b/torchtitan/models/deepseek_v3/model/model.py @@ -217,6 +217,10 @@ def forward( [k_nope, k_pe.expand(-1, -1, self.n_heads, -1)], dim=-1 ) # (bsz, seqlen, n_heads, qk_head_dim) + q = q.transpose(1, 2) # (bsz, n_heads, seqlen, qk_head_dim) + k = k.transpose(1, 2) # (bsz, n_heads, seqlen, qk_head_dim) + v = v.transpose(1, 2) # (bsz, n_heads, seqlen, v_head_dim) + # TODO: Need to pass softmax_scale to sdpa() interface. # For mask, DeepseekV3 uses causal mask, so we can use the default mask in sdpa # https://github.com/deepseek-ai/DeepSeek-V3/blob/main/inference/model.py#L17 @@ -310,11 +314,10 @@ def __init__(self, model_args: DeepSeekV3ModelArgs): "freqs_cis", precompute_freqs_cis(model_args), persistent=False ) - self.layers = torch.nn.ModuleList() + self.layers = torch.nn.ModuleDict() for layer_id in range(model_args.n_layers): - self.layers.append( - TransformerBlock(layer_id=layer_id, model_args=model_args) - ) + self.layers[str(layer_id)] = TransformerBlock(layer_id, model_args) + self.norm = nn.RMSNorm(model_args.dim) self.output = nn.Linear( model_args.dim, model_args.vocab_size, dtype=torch.get_default_dtype() @@ -333,7 +336,7 @@ def forward(self, tokens: torch.Tensor): """ h = self.tok_embeddings(tokens) - for layer in self.layers: + for layer in self.layers.values(): h = layer(h, self.freqs_cis) h = self.norm(h) output = self.output(h) # (batch_size, seq_len, dim) diff --git a/torchtitan/models/deepseek_v3/model/moe.py b/torchtitan/models/deepseek_v3/model/moe.py index 3e17968e1..c9217c8be 100644 --- a/torchtitan/models/deepseek_v3/model/moe.py +++ b/torchtitan/models/deepseek_v3/model/moe.py @@ -307,7 +307,9 @@ def forward(self, x: torch.Tensor) -> torch.Tensor: # shape (bs*slen*top_k, dim) routed_output = self.experts(routed_input, num_local_tokens_per_expert) - routed_output = routed_output * top_scores.unsqueeze(-1) + routed_output = (routed_output.to(torch.float32) * top_scores.unsqueeze(-1)).to( + x.dtype + ) # shared expert if self.shared_expert is not None: diff --git a/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml b/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml new file mode 100644 index 000000000..4f08fb098 --- /dev/null +++ b/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml @@ -0,0 +1,67 @@ +# torchtitan Config.toml + +[job] +dump_folder = "./outputs" +description = "DeepSeek-V3 16B model training" +print_args = false + +[profiling] +enable_profiling = false +save_traces_folder = "profile_trace" +profile_freq = 10 +enable_memory_snapshot = false +save_memory_snapshot_folder = "memory_snapshot" + +[metrics] +log_freq = 1 +disable_color_printing = false +enable_tensorboard = false +save_tb_folder = "tb" +enable_wandb = false + +[model] +name = "deepseek_v3" +flavor = "16B" +# test tokenizer.model, for debug purpose only +tokenizer_path = "./tests/assets/test_tiktoken.model" +# converters = ["float8"] + +[optimizer] +name = "AdamW" +lr = 8e-4 +eps = 1e-8 + +[lr_scheduler] +warmup_steps = 2 # lr scheduler warm up, normally 20% of the train steps +decay_ratio = 0.8 # lr scheduler decay ratio, 80% of the train steps +decay_type = "linear" +lr_min = 0.0 + +[training] +local_batch_size = 32 +seq_len = 2048 
+max_norm = 1.0 # grad norm clipping +steps = 10 +compile = false +dataset = "c4" # supported datasets: c4_test (2K), c4 (177M) + +[parallelism] +data_parallel_replicate_degree = 1 +data_parallel_shard_degree = -1 +fsdp_reshard_after_forward = "default" # default / never / always + +[checkpoint] +enable_checkpoint = false +folder = "checkpoint" +interval = 10 +last_save_model_weights_only = false +export_dtype = "float32" +async_mode = "disabled" # ["disabled", "async", "async_with_pinned_mem]" + +[activation_checkpoint] +mode = "full" # ["none", "selective", "full"] + +[float8] +enable_fsdp_float8_all_gather = false +precompute_float8_dynamic_scale_for_fsdp = false +filter_fqns = ["output"] From 72109b93a3a41e055263ae5e268c16dee4c20d40 Mon Sep 17 00:00:00 2001 From: Jiani Wang <40016222+wwwjn@users.noreply.github.com> Date: Wed, 2 Jul 2025 14:18:26 -0700 Subject: [PATCH 05/14] [DSV3] Apply TP on DSV3 (#1341) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mostly adapted from llama4, change the TP plan based on the difference between deepseek-v3 and llama. Thanks @tianyu-l for the detailed walk through about deepseek-v3 attention model and TP plan! This diff is currently based on #1324 , and we want to extract the MoE model in DSV3 and llama4 in a shared place. Now we have: 1. FSDP 2. Activation Checkpointing 3. TP 4. CP in progress (hang due to some reason) 1. Make CP work There are minor issue with the numerical verification: With deterministic seed, the loss is not identical. I used `AdamW` optimizer. 1. FSDP degree=4 (blue line) 2. FSDP degree=4, TP degree = 2 (orange line) Screenshot 2025-07-01 at 5 38 50 PM With `Adam` optimizer, the loss is **exactly the same**: Screenshot 2025-07-02 at 1 26 32 PM --------- Co-authored-by: Tianyu Liu --- .../llama4/train_configs/debug_model.toml | 3 +- torchtitan/models/deepseek_v3/README.md | 6 + torchtitan/models/deepseek_v3/__init__.py | 8 +- .../models/deepseek_v3/infra/parallelize.py | 127 ++++++++ torchtitan/models/deepseek_v3/model/args.py | 6 +- torchtitan/models/deepseek_v3/model/model.py | 149 +++++---- torchtitan/models/deepseek_v3/model/moe.py | 301 ++++++++++-------- .../train_configs/debug_model.toml | 4 +- .../train_configs/deepseek_v3_16b.toml | 10 +- torchtitan/train.py | 2 +- 10 files changed, 408 insertions(+), 208 deletions(-) create mode 100644 torchtitan/models/deepseek_v3/README.md diff --git a/torchtitan/experiments/llama4/train_configs/debug_model.toml b/torchtitan/experiments/llama4/train_configs/debug_model.toml index d72406d8c..c805c1326 100644 --- a/torchtitan/experiments/llama4/train_configs/debug_model.toml +++ b/torchtitan/experiments/llama4/train_configs/debug_model.toml @@ -26,7 +26,8 @@ tokenizer_path = "./tests/assets/tokenizer" # converters = ["float8"] [optimizer] -name = "AdamW" +# TODO: AdamW has numerical issues when TP is used, need to fix it +name = "Adam" lr = 4e-3 eps = 1e-15 diff --git a/torchtitan/models/deepseek_v3/README.md b/torchtitan/models/deepseek_v3/README.md new file mode 100644 index 000000000..e0271bf82 --- /dev/null +++ b/torchtitan/models/deepseek_v3/README.md @@ -0,0 +1,6 @@ +Download tokenizer: + +``` +# DeepSeek tokenizer (automatically downloads tokenizer.json and tokenizer_config.json) +python scripts/download_tokenizer.py --repo_id deepseek-ai/DeepSeek-V3 +``` diff --git a/torchtitan/models/deepseek_v3/__init__.py b/torchtitan/models/deepseek_v3/__init__.py index 7eb16a1f3..3ab9fc3c2 100644 --- a/torchtitan/models/deepseek_v3/__init__.py +++ 
b/torchtitan/models/deepseek_v3/__init__.py @@ -8,9 +8,9 @@ from torchtitan.components.loss import build_cross_entropy_loss from torchtitan.components.lr_scheduler import build_lr_schedulers -from torchtitan.components.optimizer import build_optimizers +from torchtitan.components.tokenizer import build_hf_tokenizer from torchtitan.datasets.hf_datasets import build_hf_dataloader -from torchtitan.datasets.tokenizer.tiktoken import build_tiktoken_tokenizer +from torchtitan.experiments.llama4.optimizer import build_llama4_optimizers from torchtitan.protocols.train_spec import register_train_spec, TrainSpec @@ -117,10 +117,10 @@ config=deepseekv3_configs, parallelize_fn=parallelize_deepseekv3, pipelining_fn=None, - build_optimizers_fn=build_optimizers, + build_optimizers_fn=build_llama4_optimizers, # use optimizer hooks to update expert weights build_lr_schedulers_fn=build_lr_schedulers, build_dataloader_fn=build_hf_dataloader, - build_tokenizer_fn=build_tiktoken_tokenizer, + build_tokenizer_fn=build_hf_tokenizer, build_loss_fn=build_cross_entropy_loss, ) ) diff --git a/torchtitan/models/deepseek_v3/infra/parallelize.py b/torchtitan/models/deepseek_v3/infra/parallelize.py index 99338663f..d387f1a02 100644 --- a/torchtitan/models/deepseek_v3/infra/parallelize.py +++ b/torchtitan/models/deepseek_v3/infra/parallelize.py @@ -6,9 +6,19 @@ import torch.nn as nn from torch.distributed.device_mesh import DeviceMesh +from torch.distributed.tensor import Replicate, Shard +from torch.distributed.tensor.parallel import ( + ColwiseParallel, + parallelize_module, + PrepareModuleInput, + RowwiseParallel, + SequenceParallel, +) from torchtitan.config_manager import JobConfig, TORCH_DTYPE_MAP from torchtitan.distributed import ParallelDims +from torchtitan.experiments.llama4.infra.expert_parallel import NoParallel +from torchtitan.experiments.llama4.infra.parallelize import apply_moe_ep_tp from torchtitan.models.llama3.infra.parallelize import apply_ac, apply_fsdp from torchtitan.tools.logging import logger @@ -19,6 +29,47 @@ def parallelize_deepseekv3( parallel_dims: ParallelDims, job_config: JobConfig, ): + + if parallel_dims.tp_enabled: + if job_config.parallelism.enable_async_tensor_parallel: + # TODO(jianiw): This branch needs to be tested and enabled + raise NotImplementedError( + "Currently, async TP is not tested for deepseekv3. \ + torch.compile is not supported yet, which is required for async TP." 
+ ) + + enable_float8_linear = "float8" in job_config.model.converters + float8_is_rowwise = job_config.float8.recipe_name in ( + "rowwise", + "rowwise_with_gw_hp", + ) + + enable_float8_tensorwise_tp = enable_float8_linear and not float8_is_rowwise + if enable_float8_tensorwise_tp: + # TODO(jianiw): This branch needs to be tested and enabled + raise NotImplementedError( + "Currently, float8 tensorwise TP is not tested for deepseekv3" + ) + + apply_tp( + model, + world_mesh["tp"], + loss_parallel=parallel_dims.loss_parallel_enabled, + enable_float8_tensorwise_tp=False, + enable_async_tp=False, + ) + + apply_moe_ep_tp( + model, + tp_mesh=world_mesh["tp"] if parallel_dims.tp_enabled else None, + ep_mesh=world_mesh["ep"] if parallel_dims.ep_enabled else None, + ep_tp_mesh=( + world_mesh["ep", "tp"] + if parallel_dims.tp_enabled and parallel_dims.ep_enabled + else None + ), + ) + if job_config.activation_checkpoint.mode != "none": apply_ac(model, job_config.activation_checkpoint) @@ -48,3 +99,79 @@ def parallelize_deepseekv3( logger.info("Applied FSDP to the model") return model + + +def apply_tp( + model: nn.Module, + tp_mesh: DeviceMesh, + loss_parallel: bool, + enable_float8_tensorwise_tp: bool, + enable_async_tp: bool, +): + """Apply tensor parallelism.""" + # 1. Parallelize the embedding and shard its outputs (which are the first + # transformer block's inputs) + # 2. Parallelize the root norm layer over the sequence dim + # 3. Parallelize the final linear output layer + parallelize_module( + model, + tp_mesh, + { + "tok_embeddings": RowwiseParallel( + input_layouts=Replicate(), + output_layouts=Shard(1), + ), + "norm": SequenceParallel(), + "output": ColwiseParallel( + input_layouts=Shard(1), + output_layouts=Shard(-1) if loss_parallel else Replicate(), + use_local_output=not loss_parallel, + ), + }, + ) + + rowwise_parallel, colwise_parallel, prepare_module_input = ( + RowwiseParallel, + ColwiseParallel, + PrepareModuleInput, + ) + + # Apply tensor + sequence parallelism to every transformer block + # NOTE: At the cost of model code change, we can accelerate Sequence Parallel + # by folding (and unfolding) the batch dimension and the sequence dimension. 
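The folding trick mentioned in the NOTE above, in isolation: a minimal sketch (illustrative only, using plain tensors rather than DTensors) of collapsing the batch and sequence dimensions into a single token dimension and restoring them afterwards.

```python
import torch

bsz, seqlen, dim = 2, 8, 16
x = torch.randn(bsz, seqlen, dim)

# Fold: treat (bsz, seqlen) as one flattened token dimension so the
# sequence-parallel region only ever sees a 2D (tokens, dim) activation.
x_folded = x.reshape(bsz * seqlen, dim)
# ... TP/SP-parallelized matmuls would run on x_folded here ...
# Unfold: restore the original layout for the rest of the model.
x_unfolded = x_folded.view(bsz, seqlen, dim)
assert torch.equal(x, x_unfolded)
```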
+ # Examples can be found at https://github.com/pytorch/torchtitan/pull/437 + for transformer_block in model.layers.values(): + layer_plan = { + "attention_norm": SequenceParallel(), + "attention": prepare_module_input( + input_layouts=(Shard(1), None), + desired_input_layouts=(Replicate(), None), + ), + "attention.wkv_a": NoParallel(), + "attention.wkv_b": colwise_parallel(), + "attention.kv_norm": NoParallel(), + "attention.wq_a": NoParallel(), + "attention.wq_b": colwise_parallel(), + "attention.q_norm": NoParallel(), + "attention.wq": colwise_parallel(), # This is only used when q_lora_rank==0 + "attention.wo": rowwise_parallel(output_layouts=Shard(1)), + "ffn_norm": SequenceParallel(), + "feed_forward": prepare_module_input( + input_layouts=(Shard(1),), + desired_input_layouts=(Replicate(),), + ), + "feed_forward.w1": colwise_parallel(), + "feed_forward.w2": rowwise_parallel(output_layouts=Shard(1)), + "feed_forward.w3": colwise_parallel(), + } + + parallelize_module( + module=transformer_block, + device_mesh=tp_mesh, + parallelize_plan=layer_plan, + ) + + logger.info( + f"Applied {'Float8 tensorwise ' if enable_float8_tensorwise_tp else ''}{'Async ' if enable_async_tp else ''}" + "Tensor Parallelism to the model" + ) diff --git a/torchtitan/models/deepseek_v3/model/args.py b/torchtitan/models/deepseek_v3/model/args.py index 09e882764..ea469c672 100644 --- a/torchtitan/models/deepseek_v3/model/args.py +++ b/torchtitan/models/deepseek_v3/model/args.py @@ -75,8 +75,8 @@ class DeepSeekV3ModelArgs(BaseModelArgs): n_limited_groups: int = 1 score_func: Literal["softmax", "sigmoid"] = "softmax" route_scale: float = 1.0 - use_grouped_mm: bool = False - load_balance_coeff: float | None = 1e-3 + use_grouped_mm: bool = True + load_balance_coeff: float = 1e-3 # Multi-Head Latent Attention (MLA) q_lora_rank: int = 0 kv_lora_rank: int = 512 @@ -97,7 +97,7 @@ def update_from_config(self, job_config: JobConfig, tokenizer: Tokenizer) -> Non """ Update the model_config config from the given job config. 
""" - self.vocab_size = tokenizer.n_words + self.vocab_size = tokenizer.vocab_size self.max_seq_len = job_config.training.seq_len def get_nparams_and_flops(self, model: nn.Module, seq_len: int) -> tuple[int, int]: diff --git a/torchtitan/models/deepseek_v3/model/model.py b/torchtitan/models/deepseek_v3/model/model.py index 3eb0f2fbc..3c2bb9e84 100644 --- a/torchtitan/models/deepseek_v3/model/model.py +++ b/torchtitan/models/deepseek_v3/model/model.py @@ -8,13 +8,12 @@ from typing import Tuple import torch -import torch.nn.functional as F from torch import nn from torchtitan.models.attention import build_attention from torchtitan.protocols.train_spec import ModelProtocol from .args import DeepSeekV3ModelArgs -from .moe import MoE +from .moe import FeedForward, MoE # Adapted from https://github.com/DeepSeek-ai/DeepSeek-V3/blob/main/inference/model.py#L294 @@ -152,17 +151,23 @@ def __init__(self, model_args: DeepSeekV3ModelArgs): self.v_head_dim = model_args.v_head_dim if self.q_lora_rank == 0: - self.wq = nn.Linear(self.dim, self.n_heads * self.qk_head_dim) + self.wq = nn.Linear(self.dim, self.n_heads * self.qk_head_dim, bias=False) else: - self.wq_a = nn.Linear(self.dim, self.q_lora_rank) + self.wq_a = nn.Linear(self.dim, self.q_lora_rank, bias=False) self.q_norm = nn.RMSNorm(self.q_lora_rank, eps=model_args.norm_eps) - self.wq_b = nn.Linear(self.q_lora_rank, self.n_heads * self.qk_head_dim) - self.wkv_a = nn.Linear(self.dim, self.kv_lora_rank + self.qk_rope_head_dim) + self.wq_b = nn.Linear( + self.q_lora_rank, self.n_heads * self.qk_head_dim, bias=False + ) + self.wkv_a = nn.Linear( + self.dim, self.kv_lora_rank + self.qk_rope_head_dim, bias=False + ) self.kv_norm = nn.RMSNorm(self.kv_lora_rank, eps=model_args.norm_eps) self.wkv_b = nn.Linear( - self.kv_lora_rank, self.n_heads * (self.qk_nope_head_dim + self.v_head_dim) + self.kv_lora_rank, + self.n_heads * (self.qk_nope_head_dim + self.v_head_dim), + bias=False, ) - self.wo = nn.Linear(self.n_heads * self.v_head_dim, self.dim) + self.wo = nn.Linear(self.n_heads * self.v_head_dim, self.dim, bias=False) self.softmax_scale = self.qk_head_dim**-0.5 if model_args.max_seq_len > model_args.original_seq_len: @@ -192,9 +197,12 @@ def forward( if self.q_lora_rank == 0: q = self.wq(x) # (bsz, seqlen, n_heads * qk_head_dim) else: - q = self.wq_b(self.q_norm(self.wq_a(x))) - - q = q.view(bsz, seqlen, self.n_heads, self.qk_head_dim) + q = self.wq_a(x) + q = self.wq_b(self.q_norm(q)) + # Use -1 instead of `n_heads` (or `n_kv_heads`) to infer the actual + # local heads from sizes of q and kv as TP may have sharded them after + # the above linear ops. 
+ q = q.view(bsz, seqlen, -1, self.qk_head_dim) q_nope, q_pe = torch.split( q, [self.qk_nope_head_dim, self.qk_rope_head_dim], dim=-1 ) @@ -211,10 +219,11 @@ def forward( kv = self.wkv_b( self.kv_norm(kv) ) # (bsz, seqlen, n_heads * (qk_nope_head_dim + v_head_dim)) - kv = kv.view(bsz, seqlen, self.n_heads, self.qk_nope_head_dim + self.v_head_dim) + kv = kv.view(bsz, seqlen, -1, self.qk_nope_head_dim + self.v_head_dim) k_nope, v = torch.split(kv, [self.qk_nope_head_dim, self.v_head_dim], dim=-1) + n_local_heads = k_nope.size(2) k = torch.cat( - [k_nope, k_pe.expand(-1, -1, self.n_heads, -1)], dim=-1 + [k_nope, k_pe.expand(-1, -1, n_local_heads, -1)], dim=-1 ) # (bsz, seqlen, n_heads, qk_head_dim) q = q.transpose(1, 2) # (bsz, n_heads, seqlen, qk_head_dim) @@ -231,41 +240,23 @@ def forward( output = output.view(bsz, seqlen, -1) # (bsz, seqlen, n_heads * v_head_dim) return self.wo(output) # (bsz, seqlen, dim) + def init_weights(self, init_std: float): + linear_list = [ + self.wkv_a, + self.wkv_b, + ] + if self.q_lora_rank > 0: + linear_list.extend([self.wq_a, self.wq_b]) + else: + linear_list.append(self.wq) -class FeedForward(nn.Module): - """ - FeedForward module - - Args: - dim (int): Input dimension. - hidden_dim (int): Hidden dimension of the feedforward layer. - multiple_of (int): Value to ensure hidden dimension is a multiple of this value. - ffn_dim_multiplier (float | None): Custom multiplier for hidden dimension. Defaults to None. - - Attributes: - w1 (Linear): Linear transformation for the first layer. - w2 (Linear): Linear transformation for the second layer. - w3 (Linear): Linear transformation for the third layer. - - """ - - def __init__( - self, - dim: int, - hidden_dim: int, - ): - super().__init__() - self.w1 = nn.Linear(dim, hidden_dim, bias=False) - self.w2 = nn.Linear(hidden_dim, dim, bias=False) - self.w3 = nn.Linear(dim, hidden_dim, bias=False) - - def forward(self, x: torch.Tensor) -> torch.Tensor: - return self.w2(F.silu(self.w1(x)) * self.w3(x)) + for linear in linear_list: + nn.init.trunc_normal_(linear.weight, mean=0.0, std=0.02) + nn.init.trunc_normal_(self.wo.weight, mean=0.0, std=init_std) - def init_weights(self, init_std: float): - nn.init.trunc_normal_(self.w1.weight, mean=0.0, std=0.02) - for linear in (self.w2, self.w3): - nn.init.trunc_normal_(linear.weight, mean=0.0, std=init_std) + self.kv_norm.reset_parameters() + if self.q_lora_rank > 0: + self.q_norm.reset_parameters() class TransformerBlock(nn.Module): @@ -278,12 +269,17 @@ def __init__(self, layer_id: int, model_args: DeepSeekV3ModelArgs): super().__init__() self.attention = Attention(model_args) self.attention_norm = nn.RMSNorm(model_args.dim, eps=model_args.norm_eps) - self.moe_norm = nn.RMSNorm(model_args.dim, eps=model_args.norm_eps) - self.moe = ( - FeedForward(model_args.dim, model_args.inter_dim) - if layer_id < model_args.n_dense_layers - else MoE(model_args) - ) + self.ffn_norm = nn.RMSNorm(model_args.dim, eps=model_args.norm_eps) + self.moe_enabled = layer_id >= model_args.n_dense_layers + + if self.moe_enabled: + self.moe = MoE(model_args) + else: + self.feed_forward = FeedForward(model_args.dim, model_args.inter_dim) + + # TODO: Need to revisit the weight initialization for the TransformerBlock + self.weight_init_std = 0.02 / (2 * (layer_id + 1)) ** 0.5 + self.layer_id = layer_id def forward(self, x: torch.Tensor, freqs_cis: torch.Tensor): """ @@ -297,9 +293,21 @@ def forward(self, x: torch.Tensor, freqs_cis: torch.Tensor): torch.Tensor: Output tensor with the same shape as the input. 
""" x = x + self.attention(self.attention_norm(x), freqs_cis) - x = x + self.moe(self.moe_norm(x)) + if self.moe_enabled: + x = x + self.moe(self.ffn_norm(x)) + else: + x = x + self.feed_forward(self.ffn_norm(x)) return x + def init_weights(self, buffer_device: torch.device): + for norm in (self.attention_norm, self.ffn_norm): + norm.reset_parameters() + self.attention.init_weights(self.weight_init_std) + if self.moe_enabled: + self.moe.init_weights(self.weight_init_std, buffer_device) + else: + self.feed_forward.init_weights(self.weight_init_std) + class DeepSeekV3Model(nn.Module, ModelProtocol): """ @@ -311,7 +319,7 @@ def __init__(self, model_args: DeepSeekV3ModelArgs): self.max_seq_len = model_args.max_seq_len self.tok_embeddings = nn.Embedding(model_args.vocab_size, model_args.dim) self.register_buffer( - "freqs_cis", precompute_freqs_cis(model_args), persistent=False + "freqs_cis", precompute_freqs_cis(model_args), persistent=True ) self.layers = torch.nn.ModuleDict() @@ -320,10 +328,36 @@ def __init__(self, model_args: DeepSeekV3ModelArgs): self.norm = nn.RMSNorm(model_args.dim) self.output = nn.Linear( - model_args.dim, model_args.vocab_size, dtype=torch.get_default_dtype() + model_args.dim, + model_args.vocab_size, + dtype=torch.get_default_dtype(), + bias=False, ) + self.model_args = model_args self.init_weights() + def init_weights(self, buffer_device: torch.device | None = None) -> None: + buffer_device = buffer_device or self.freqs_cis.device + with torch.device(buffer_device): + self.freqs_cis = precompute_freqs_cis(self.model_args) + if self.tok_embeddings is not None: + nn.init.normal_(self.tok_embeddings.weight) + for layer in self.layers.values(): + if layer is not None: + layer.init_weights(buffer_device=buffer_device) + if self.norm is not None: + self.norm.reset_parameters() + final_out_std = self.model_args.dim**-0.5 + cutoff_factor = 3 + if self.output is not None: + nn.init.trunc_normal_( + self.output.weight, + mean=0.0, + std=final_out_std, + a=-cutoff_factor * final_out_std, + b=cutoff_factor * final_out_std, + ) + def forward(self, tokens: torch.Tensor): """ Forward pass for the Transformer model. @@ -339,8 +373,5 @@ def forward(self, tokens: torch.Tensor): for layer in self.layers.values(): h = layer(h, self.freqs_cis) h = self.norm(h) - output = self.output(h) # (batch_size, seq_len, dim) + output = self.output(h) return output - - def init_weights(self, buffer_device: torch.device | None = None) -> None: - pass diff --git a/torchtitan/models/deepseek_v3/model/moe.py b/torchtitan/models/deepseek_v3/model/moe.py index c9217c8be..2554d6131 100644 --- a/torchtitan/models/deepseek_v3/model/moe.py +++ b/torchtitan/models/deepseek_v3/model/moe.py @@ -7,11 +7,47 @@ import torch import torch.nn.functional as F from torch import nn +from torchtitan.experiments.llama4.infra.expert_parallel import expert_parallel from .args import DeepSeekV3ModelArgs -# Reference: torchtitan/experiments/llama4/model/ +class FeedForward(nn.Module): + """ + FeedForward module + + Args: + dim (int): Input dimension. + hidden_dim (int): Hidden dimension of the feedforward layer. + multiple_of (int): Value to ensure hidden dimension is a multiple of this value. + ffn_dim_multiplier (float | None): Custom multiplier for hidden dimension. Defaults to None. + + Attributes: + w1 (Linear): Linear transformation for the first layer. + w2 (Linear): Linear transformation for the second layer. + w3 (Linear): Linear transformation for the third layer. 
+ + """ + + def __init__( + self, + dim: int, + hidden_dim: int, + ): + super().__init__() + self.w1 = nn.Linear(dim, hidden_dim, bias=False) + self.w2 = nn.Linear(hidden_dim, dim, bias=False) + self.w3 = nn.Linear(dim, hidden_dim, bias=False) + + def forward(self, x: torch.Tensor) -> torch.Tensor: + return self.w2(F.silu(self.w1(x)) * self.w3(x)) + + def init_weights(self, init_std: float = 0.02): + nn.init.trunc_normal_(self.w1.weight, mean=0.0, std=0.02) + for linear in (self.w2, self.w3): + nn.init.trunc_normal_(linear.weight, mean=0.0, std=init_std) + + class GroupedExperts(nn.Module): def __init__( self, @@ -30,49 +66,73 @@ def __init__( def forward( self, x: torch.Tensor, - num_local_tokens_per_expert: torch.Tensor | list[int] | None = None, + num_tokens_per_expert: torch.Tensor | None = None, ) -> torch.Tensor: - # TODO: keeping this for loop implementation for comparison - # and readability, will remove later - if not self.use_grouped_mm: - if num_local_tokens_per_expert is not None: - # a tuple of tensors indexed by experts - # each with shape (tokens_per_expert(varying), dim) - x = torch.split( - x, - split_size_or_sections=num_local_tokens_per_expert, - dim=0, - ) - out_experts_splits = [] - for expert_idx, x_expert in enumerate(x): - w1, w2, w3 = ( - self.w1[expert_idx], - self.w2[expert_idx], - self.w3[expert_idx], - ) - h = F.silu(torch.matmul(x_expert, w1)) - h = h * torch.matmul(x_expert, w3) - h = torch.matmul(h, w2) - # h shape (tokens_per_expert(varying), dim) - out_experts_splits.append(h) - out = torch.cat(out_experts_splits, dim=0) - else: - # x shape (num_experts, tokens_per_expert, dim) - h = F.silu(torch.bmm(x, self.w1)) - h = h * torch.bmm(x, self.w3) - # out shape (num_experts, tokens_per_expert, dim) - out = torch.bmm(h, self.w2) - - return out - - # grouped mm implementation - if num_local_tokens_per_expert is not None: - # https://github.com/pytorch/pytorch/pull/150374 - # NOTE: torch._gouped_mm requires bf16 dtypes - # and shapes to be multiple of 8 - offsets = torch.cumsum( - num_local_tokens_per_expert, dim=0, dtype=torch.int32 + if self.use_grouped_mm: + return GroupedExperts._run_experts_grouped_mm( + self.w1, self.w2, self.w3, x, num_tokens_per_expert ) + else: + return GroupedExperts._run_experts_for_loop( + self.w1, self.w2, self.w3, x, num_tokens_per_expert + ) + + # TODO: keeping this for-loop implementation for comparison + # and readability, may remove later + @expert_parallel + @staticmethod + def _run_experts_for_loop( + w1: torch.Tensor, + w2: torch.Tensor, + w3: torch.Tensor, + x: torch.Tensor, + num_tokens_per_expert: torch.Tensor | None = None, + ) -> torch.Tensor: + if num_tokens_per_expert is not None: + # NOTE: this would incur a synchronization between device and host + num_tokens_per_expert = num_tokens_per_expert.tolist() + + # side-effect code due to the usage of generate_permute_indices + num_padding = x.shape[0] - sum(num_tokens_per_expert) + + # a tuple of tensors indexed by experts + # each with shape (tokens_per_expert(varying), dim) + x = torch.split( + x[: sum(num_tokens_per_expert)], + split_size_or_sections=num_tokens_per_expert, + dim=0, + ) + out_experts_splits = [] + for expert_idx, x_expert in enumerate(x): + h = F.silu(torch.matmul(x_expert, w1[expert_idx])) + h = h * torch.matmul(x_expert, w3[expert_idx]) + h = torch.matmul(h, w2[expert_idx]) + # h shape (tokens_per_expert(varying), dim) + out_experts_splits.append(h) + out = torch.cat(out_experts_splits, dim=0) + + # side-effect code due to the usage of 
generate_permute_indices + out = torch.vstack((out, out.new_zeros((num_padding, out.shape[-1])))) + else: + # x shape (num_experts, tokens_per_expert, dim) + h = F.silu(torch.bmm(x, w1)) + h = h * torch.bmm(x, w3) + # out shape (num_experts, tokens_per_expert, dim) + out = torch.bmm(h, w2) + + return out + + @expert_parallel + @staticmethod + def _run_experts_grouped_mm( + w1: torch.Tensor, + w2: torch.Tensor, + w3: torch.Tensor, + x: torch.Tensor, + num_tokens_per_expert: torch.Tensor | None = None, + ) -> torch.Tensor: + if num_tokens_per_expert is not None: + offsets = torch.cumsum(num_tokens_per_expert, dim=0, dtype=torch.int32) # grouped mm between a 2D tensor and a 3D tensor assert x.dim() == 2 else: @@ -80,15 +140,17 @@ def forward( # fall back to regular bmm between 3D tensors assert x.dim() == 3 - assert ( - x.dtype == self.w1.dtype == self.w2.dtype == self.w3.dtype == torch.bfloat16 - ), "torch._grouped_mm only supports bf16 dtypes" - h = F.silu(torch._grouped_mm(x, self.w1, offs=offsets)) - h = h * torch._grouped_mm(x, self.w3, offs=offsets) - out = torch._grouped_mm(h, self.w2, offs=offsets) + h = F.silu(torch._grouped_mm(x.bfloat16(), w1.bfloat16(), offs=offsets)) + h = h * torch._grouped_mm(x.bfloat16(), w3.bfloat16(), offs=offsets) + out = torch._grouped_mm(h, w2.bfloat16(), offs=offsets).type_as(x) return out + def init_weights(self, init_std: float): + nn.init.trunc_normal_(self.w1, mean=0.0, std=0.02) + nn.init.trunc_normal_(self.w2, mean=0.0, std=init_std) + nn.init.trunc_normal_(self.w3, mean=0.0, std=init_std) + class TokenChoiceTopKRouter(nn.Module): """This class implements token-choice routing. In token-choice top-K routing, each token is @@ -116,11 +178,10 @@ def __init__( self.top_k = top_k self.use_sigmoid = use_sigmoid self.route_sclaing_factor = route_sclaing_factor - - self.weight = nn.Parameter(torch.empty((self.num_experts, self.dim))) + self.gate = nn.Linear(self.dim, self.num_experts, bias=False) def forward( - self, x: torch.Tensor, expert_bias: torch.Tensor = None + self, x: torch.Tensor, expert_bias: torch.Tensor | None = None ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: """ TODO: We haven't implement the group-based routing (node limit routing), @@ -134,11 +195,11 @@ def forward( Tokens grouped together by experts indices with shape ``(bs*slen*top_k,)``. token_indices (torch.Tensor): Token indices for routed_input with shape ``(bs*slen*top_k,)``. - num_local_tokens_per_expert (torch.Tensor): + num_tokens_per_expert (torch.Tensor): Number of tokens assigned to each expert with shape ``(num_experts,)``. """ # scores shape (bs*slen, num_experts) - scores = F.linear(x, self.weight, bias=None) + scores = self.gate(x) # By default, sigmoid or softmax is performed in float32 to avoid loss explosion if self.use_sigmoid: @@ -149,30 +210,41 @@ def forward( # top scores shape (bs*slen, top_k) # NOTE: The expert_bias is only used for routing. The gating value # top_scores is still derived from the original scores. 
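A self-contained sketch of the bias-adjusted top-k selection that the NOTE above describes; shapes and values are made up for illustration, and this is not the torchtitan implementation itself.

```python
import torch

num_tokens, num_experts, top_k = 6, 4, 2
scores = torch.rand(num_tokens, num_experts)
expert_bias = torch.zeros(num_experts)
expert_bias[0] = -1.0  # pretend expert 0 has been overloaded recently

# Expert selection uses the biased scores...
_, selected_experts_indices = torch.topk(scores + expert_bias, k=top_k, dim=1)
# ...but the gating values are gathered from the original, unbiased scores,
# so the bias changes which experts fire, not how much weight they get.
top_scores = scores.gather(dim=1, index=selected_experts_indices)
```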
- _, selected_experts_indices = torch.topk( - scores + expert_bias, k=self.top_k, dim=1 - ) - top_scores = scores.gather(dim=1, index=selected_experts_indices) + if expert_bias is not None: + _, selected_experts_indices = torch.topk( + scores + expert_bias, k=self.top_k, dim=1 + ) + top_scores = scores.gather(dim=1, index=selected_experts_indices) + else: + top_scores, selected_experts_indices = torch.topk( + scores, k=self.top_k, dim=1 + ) # group tokens together by expert indices from 0 to num_experts and pass that to experts forward - num_local_tokens_per_expert = torch.histc( + num_tokens_per_expert = torch.histc( selected_experts_indices.view(-1), bins=self.num_experts, min=0, max=self.num_experts, ) + + # Reorder the token indices to match the order of the experts # token_indices_experts_sorted shape (bs*slen*top_k,) token_indices_experts_sorted = torch.argsort( selected_experts_indices.view(-1), stable=True ) + + # reorder the scores to match the order of the token indices top_scores = top_scores.view(-1)[token_indices_experts_sorted] token_indices_experts_sorted = token_indices_experts_sorted // self.top_k top_scores = ( top_scores * self.route_sclaing_factor ) # must multiply the scaling factor + return top_scores, token_indices_experts_sorted, num_tokens_per_expert - return top_scores, token_indices_experts_sorted, num_local_tokens_per_expert + def init_weights(self, init_std: float): + nn.init.trunc_normal_(self.gate.weight, mean=0.0, std=init_std) class MoE(nn.Module): @@ -186,12 +258,11 @@ def __init__(self, model_args: DeepSeekV3ModelArgs): top_k = model_args.n_activated_experts route_scaling_factor = model_args.route_scale - self.use_grouped_mm = model_args.use_grouped_mm self.experts = GroupedExperts( dim=dim, hidden_dim=hidden_dim, num_experts=num_experts, - use_grouped_mm=self.use_grouped_mm, + use_grouped_mm=model_args.use_grouped_mm, ) self.router = TokenChoiceTopKRouter( dim=dim, @@ -205,8 +276,8 @@ def __init__(self, model_args: DeepSeekV3ModelArgs): GroupedExperts( dim=dim, hidden_dim=hidden_dim * model_args.n_shared_experts, - num_experts=1, - use_grouped_mm=self.use_grouped_mm, + num_experts=1, # Here needs to be 1 to make it equivalent to the MLP + use_grouped_mm=model_args.use_grouped_mm, ) if model_args.n_shared_experts > 0 else None @@ -214,33 +285,20 @@ def __init__(self, model_args: DeepSeekV3ModelArgs): # auxiliary-loss-free load balancing self.load_balance_coeff = model_args.load_balance_coeff - # the fields below are defined even when load_balance_coeff is None - # to make initialization and checkpointing code simpler - self.register_buffer( - "expert_bias", - torch.zeros(num_experts, dtype=torch.float32), - persistent=True, - ) - self.register_buffer( - "tokens_per_expert", - torch.zeros(num_experts, dtype=torch.float32), - persistent=True, - ) - - # NOTE: forward hook, forward pre hook, or backward pre hook - # would conflict with activation checkpointing - if self.load_balance_coeff is not None and self.load_balance_coeff > 0: - self.register_full_backward_hook(self._update_expert_bias) - - # TODO: double check the bias update logic. It aligns with the paper. 
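For reference, the update removed from the backward hook in this hunk (the PR moves expert-weight updates into optimizer hooks via build_llama4_optimizers) is a sign-based nudge on expert_bias; a toy single step with made-up counts is shown below.

```python
import torch

load_balance_coeff = 1e-3
tokens_per_expert = torch.tensor([10.0, 2.0, 6.0, 6.0])  # example counts for 4 experts

# Under-used experts get a positive bias (more likely to be picked next step),
# over-used experts get a negative one; the delta is then re-centered.
delta = load_balance_coeff * torch.sign(tokens_per_expert.mean() - tokens_per_expert)
delta = delta - delta.mean()
expert_bias = torch.zeros(4)
expert_bias.add_(delta)  # expert 0 shifts down, expert 1 shifts up
```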
- def _update_expert_bias(self, *_): - expert_bias_delta = self.load_balance_coeff * torch.sign( - self.tokens_per_expert.mean() - self.tokens_per_expert - ) - expert_bias_delta = expert_bias_delta - expert_bias_delta.mean() - self.expert_bias.add_(expert_bias_delta) - - self.tokens_per_expert.zero_() + if self.load_balance_coeff is not None: + assert self.load_balance_coeff > 0.0 + self.register_buffer( + "expert_bias", + torch.zeros(num_experts, dtype=torch.float32), + persistent=True, + ) + self.register_buffer( + "tokens_per_expert", + torch.zeros(num_experts, dtype=torch.float32), + persistent=True, + ) + else: + self.expert_bias = None def forward(self, x: torch.Tensor) -> torch.Tensor: """ @@ -253,16 +311,18 @@ def forward(self, x: torch.Tensor) -> torch.Tensor: bs, slen, dim = x.shape # top_scores and selected_indices shape (bs*slen*top_k,) - # num_local_tokens_per_expert shape (num_experts,) + # num_tokens_per_expert shape (num_experts,) ( top_scores, token_indices, - num_local_tokens_per_expert, + num_tokens_per_expert, ) = self.router(x.reshape(bs * slen, dim), self.expert_bias) - # will be used to update the expert bias for load balancing - self.tokens_per_expert += num_local_tokens_per_expert - + # tokens_per_expert will be used to update the expert bias for load balancing. + # Prevent extra local tokens accumulation on evaluation or activation recomputation. + if self.load_balance_coeff is not None and torch.is_grad_enabled(): + with torch.no_grad(): + self.tokens_per_expert.add_(num_tokens_per_expert) # shape (bs*slen*top_k, dim) token_indices = token_indices.reshape(-1, 1).expand(-1, dim) @@ -273,40 +333,9 @@ def forward(self, x: torch.Tensor) -> torch.Tensor: index=token_indices, ) - if self.use_grouped_mm: - # NOTE: In order to use torch._grouped_mm, we need to make sure - # the number of tokens each expert gets is a multiple of 16. - # The following kernel helps achieve this via padding, without - # incurring synchronization between device and host. 
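The alignment requirement in the NOTE above boils down to rounding each expert's token count up to a multiple of 16; a small sketch of just that arithmetic follows (the real generate_permute_indices kernel additionally builds the permutation and avoids a device-to-host sync, which this sketch does not attempt).

```python
import torch

ALIGN_SIZE_M = 16
num_tokens_per_expert = torch.tensor([5, 19, 0, 40])

# Round each count up to the next multiple of ALIGN_SIZE_M; the extra slots are
# later filled with padding tokens so grouped GEMM group sizes stay aligned.
padded = (num_tokens_per_expert + ALIGN_SIZE_M - 1) // ALIGN_SIZE_M * ALIGN_SIZE_M
print(padded)  # tensor([16, 32,  0, 48])
```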
- from torchtitan.experiments.kernels.moe.indices import ( - generate_permute_indices, - ) - - ALIGN_SIZE_M = 16 - - with torch.no_grad(): - ( - permuted_indices, - num_local_tokens_per_expert, - _, - ) = generate_permute_indices( - num_local_tokens_per_expert, - self.experts.num_experts, - 1, - ALIGN_SIZE_M, - ) - token_indices = torch.vstack( - (token_indices, token_indices.new_zeros((dim))) - ) - token_indices = token_indices[permuted_indices, :] - routed_input = torch.vstack((routed_input, routed_input.new_zeros((dim)))) - routed_input = routed_input[permuted_indices, :] - else: - # NOTE: this would incur a synchronization between device and host - num_local_tokens_per_expert = num_local_tokens_per_expert.tolist() - # shape (bs*slen*top_k, dim) - routed_output = self.experts(routed_input, num_local_tokens_per_expert) + routed_output = self.experts(routed_input, num_tokens_per_expert) + routed_output = (routed_output.to(torch.float32) * top_scores.unsqueeze(-1)).to( x.dtype ) @@ -319,6 +348,7 @@ def forward(self, x: torch.Tensor) -> torch.Tensor: else: out = torch.zeros_like(x.reshape(bs * slen, dim)) + # Accumulate multiple expert results becase each token can be routed to multiple experts out = out.scatter_add(dim=0, index=token_indices, src=routed_output) out = out.reshape(bs, slen, dim) return out @@ -333,10 +363,11 @@ def init_weights( if self.shared_expert is not None: self.shared_expert.init_weights(init_std) - with torch.device(buffer_device): - self.expert_bias = torch.zeros( - self.experts.num_experts, dtype=torch.float32 - ) - self.tokens_per_expert = torch.zeros( - self.experts.num_experts, dtype=torch.float32 - ) + if self.load_balance_coeff is not None: + with torch.device(buffer_device): + self.expert_bias = torch.zeros( + self.experts.num_experts, dtype=torch.float32 + ) + self.tokens_per_expert = torch.zeros( + self.experts.num_experts, dtype=torch.float32 + ) diff --git a/torchtitan/models/deepseek_v3/train_configs/debug_model.toml b/torchtitan/models/deepseek_v3/train_configs/debug_model.toml index eddca8849..d160d02db 100644 --- a/torchtitan/models/deepseek_v3/train_configs/debug_model.toml +++ b/torchtitan/models/deepseek_v3/train_configs/debug_model.toml @@ -24,7 +24,7 @@ enable_wandb = false name = "deepseek_v3" flavor = "debugmodel" # test tokenizer.model, for debug purpose only -tokenizer_path = "./tests/assets/test_tiktoken.model" +tokenizer_path = "./assets/tokenizer/DeepSeek-V3" # converters = ["float8"] [optimizer] @@ -50,6 +50,8 @@ dataset = "c4_test" # supported datasets: c4_test (2K), c4 (177M) data_parallel_replicate_degree = 1 data_parallel_shard_degree = -1 fsdp_reshard_after_forward = "default" # default / never / always +tensor_parallel_degree = 1 +enable_async_tensor_parallel = false [checkpoint] enable_checkpoint = false diff --git a/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml b/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml index 4f08fb098..ad043827b 100644 --- a/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml +++ b/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml @@ -13,7 +13,7 @@ enable_memory_snapshot = false save_memory_snapshot_folder = "memory_snapshot" [metrics] -log_freq = 1 +log_freq = 10 disable_color_printing = false enable_tensorboard = false save_tb_folder = "tb" @@ -23,7 +23,7 @@ enable_wandb = false name = "deepseek_v3" flavor = "16B" # test tokenizer.model, for debug purpose only -tokenizer_path = "./tests/assets/test_tiktoken.model" +tokenizer_path = 
"./assets/tokenizer/DeepSeek-V3" # converters = ["float8"] [optimizer] @@ -38,10 +38,10 @@ decay_type = "linear" lr_min = 0.0 [training] -local_batch_size = 32 +local_batch_size = 16 seq_len = 2048 max_norm = 1.0 # grad norm clipping -steps = 10 +steps = 100 compile = false dataset = "c4" # supported datasets: c4_test (2K), c4 (177M) @@ -49,6 +49,8 @@ dataset = "c4" # supported datasets: c4_test (2K), c4 (177M) data_parallel_replicate_degree = 1 data_parallel_shard_degree = -1 fsdp_reshard_after_forward = "default" # default / never / always +tensor_parallel_degree = 2 +enable_async_tensor_parallel = false [checkpoint] enable_checkpoint = false diff --git a/torchtitan/train.py b/torchtitan/train.py index 3dc8a61b2..2ea9a344f 100644 --- a/torchtitan/train.py +++ b/torchtitan/train.py @@ -128,7 +128,7 @@ def __init__(self, job_config: JobConfig): # build dataloader tokenizer = ( - self.train_spec.build_tokenizer_fn(job_config) + self.train_spec.build_tokenizer_fn(job_config.model.tokenizer_path) if self.train_spec.build_tokenizer_fn is not None else None ) From 55aef3aae9b3cf96b6bd59ff5162177bb353d722 Mon Sep 17 00:00:00 2001 From: Howard Huang Date: Thu, 3 Jul 2025 14:45:05 -0400 Subject: [PATCH 06/14] Fix CI (#1366) Current deepseek-v3 branch has a CI job broken --- torchtitan/components/tokenizer.py | 3 --- torchtitan/train.py | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/torchtitan/components/tokenizer.py b/torchtitan/components/tokenizer.py index 45ecf34f9..31fb24784 100644 --- a/torchtitan/components/tokenizer.py +++ b/torchtitan/components/tokenizer.py @@ -408,14 +408,11 @@ def build_hf_tokenizer( ) -> Union[HuggingFaceTokenizer, BaseTokenizer]: """ Builds a HuggingFaceTokenizer from the specified path. - This function creates a HuggingFaceTokenizer instance that handles BOS/EOS token inference and intelligent encoding. The tokenizer automatically detects and loads from various file formats and infers special token behavior. - Args: JobConfig: A JobConfig object containing the path to the tokenizer directory. 
- Returns: tokenizer (HuggingFaceTokenizer): Loaded tokenizer instance with intelligent BOS/EOS handling """ diff --git a/torchtitan/train.py b/torchtitan/train.py index 2ea9a344f..3dc8a61b2 100644 --- a/torchtitan/train.py +++ b/torchtitan/train.py @@ -128,7 +128,7 @@ def __init__(self, job_config: JobConfig): # build dataloader tokenizer = ( - self.train_spec.build_tokenizer_fn(job_config.model.tokenizer_path) + self.train_spec.build_tokenizer_fn(job_config) if self.train_spec.build_tokenizer_fn is not None else None ) From e61d46ac08f92826fb803f798e28ada92910775c Mon Sep 17 00:00:00 2001 From: Jiani Wang Date: Tue, 8 Jul 2025 11:36:44 -0700 Subject: [PATCH 07/14] rebase onto main branch --- torchtitan/models/deepseek_v3/README.md | 2 + .../models/deepseek_v3/infra/parallelize.py | 48 +++++++++++++++---- .../train_configs/deepseek_v3_16b.toml | 3 +- 3 files changed, 44 insertions(+), 9 deletions(-) diff --git a/torchtitan/models/deepseek_v3/README.md b/torchtitan/models/deepseek_v3/README.md index e0271bf82..d9c6849f9 100644 --- a/torchtitan/models/deepseek_v3/README.md +++ b/torchtitan/models/deepseek_v3/README.md @@ -1,3 +1,5 @@ +# DeepSeek-V3 in torchtitan + Download tokenizer: ``` diff --git a/torchtitan/models/deepseek_v3/infra/parallelize.py b/torchtitan/models/deepseek_v3/infra/parallelize.py index d387f1a02..436ff6ab6 100644 --- a/torchtitan/models/deepseek_v3/infra/parallelize.py +++ b/torchtitan/models/deepseek_v3/infra/parallelize.py @@ -18,18 +18,18 @@ from torchtitan.config_manager import JobConfig, TORCH_DTYPE_MAP from torchtitan.distributed import ParallelDims from torchtitan.experiments.llama4.infra.expert_parallel import NoParallel -from torchtitan.experiments.llama4.infra.parallelize import apply_moe_ep_tp -from torchtitan.models.llama3.infra.parallelize import apply_ac, apply_fsdp +from torchtitan.experiments.llama4.infra.parallelize import apply_fsdp, apply_moe_ep_tp +from torchtitan.models.llama3.infra.parallelize import apply_ac, apply_ddp from torchtitan.tools.logging import logger +# Adapted from llama4/infra/parallelize.py def parallelize_deepseekv3( model: nn.Module, world_mesh: DeviceMesh, parallel_dims: ParallelDims, job_config: JobConfig, ): - if parallel_dims.tp_enabled: if job_config.parallelism.enable_async_tensor_parallel: # TODO(jianiw): This branch needs to be tested and enabled @@ -59,6 +59,7 @@ def parallelize_deepseekv3( enable_async_tp=False, ) + if parallel_dims.tp_enabled or parallel_dims.ep_enabled: apply_moe_ep_tp( model, tp_mesh=world_mesh["tp"] if parallel_dims.tp_enabled else None, @@ -73,16 +74,26 @@ def parallelize_deepseekv3( if job_config.activation_checkpoint.mode != "none": apply_ac(model, job_config.activation_checkpoint) + # turn on per-TransformerBlock compile after AC wrapping and before FSDP + if job_config.training.compile: + raise NotImplementedError("torch.compile is not supported yet for deepseekv3") + dp_mesh: DeviceMesh | None = None - if ( - parallel_dims.dp_shard_enabled - ): # apply FSDP or HSDP, potentially with Context Parallel + if parallel_dims.fsdp_enabled or parallel_dims.ep_enabled: + # apply FSDP or HSDP, potentially with Context Parallel if parallel_dims.dp_replicate_enabled: - dp_mesh_dim_names = ("dp_replicate", "dp_shard") + dp_mesh_dim_names = ("dp_replicate", "dp_shard_cp") else: - dp_mesh_dim_names = ("dp_shard",) + dp_mesh_dim_names = ("dp_shard_cp",) dp_mesh = world_mesh[tuple(dp_mesh_dim_names)] + # the mesh dim names of which the MoE params are sharded on via FSDP/HSDP + dp_mod_ep_mesh_dim_names = [] + 
if parallel_dims.ep_enabled: + if parallel_dims.dp_replicate_enabled: + dp_mod_ep_mesh_dim_names.append("dp_replicate") + dp_mod_ep_mesh_dim_names.append("dp_shard_mod_ep") + apply_fsdp( model, dp_mesh, @@ -91,6 +102,11 @@ def parallelize_deepseekv3( pp_enabled=parallel_dims.pp_enabled, cpu_offload=job_config.training.enable_cpu_offload, reshard_after_forward_policy=job_config.parallelism.fsdp_reshard_after_forward, + dp_mod_ep_mesh=( + world_mesh[tuple(dp_mod_ep_mesh_dim_names)] + if dp_mod_ep_mesh_dim_names + else None + ), ) if parallel_dims.dp_replicate_enabled: @@ -98,6 +114,22 @@ def parallelize_deepseekv3( else: logger.info("Applied FSDP to the model") + if parallel_dims.cp_enabled: + logger.info("Applied Context Parallel to the model") + + if job_config.training.enable_cpu_offload: + logger.info("Applied CPU Offloading to the model") + elif parallel_dims.dp_replicate_enabled: + if world_mesh.ndim > 1: + raise RuntimeError("DDP has not supported > 1D parallelism") + dp_mesh = world_mesh + apply_ddp( + model, + dp_mesh, + enable_compile=job_config.training.compile, + enable_compiled_autograd=job_config.parallelism.enable_compiled_autograd, + ) + return model diff --git a/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml b/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml index ad043827b..54be0be08 100644 --- a/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml +++ b/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml @@ -49,8 +49,9 @@ dataset = "c4" # supported datasets: c4_test (2K), c4 (177M) data_parallel_replicate_degree = 1 data_parallel_shard_degree = -1 fsdp_reshard_after_forward = "default" # default / never / always -tensor_parallel_degree = 2 +tensor_parallel_degree = 1 enable_async_tensor_parallel = false +expert_parallel_degree = 2 [checkpoint] enable_checkpoint = false From b33933c8dbe2907c69881e6026994aa46850568f Mon Sep 17 00:00:00 2001 From: Jiani Wang Date: Tue, 8 Jul 2025 11:51:27 -0700 Subject: [PATCH 08/14] add README --- .github/CODEOWNERS | 10 ++++++ torchtitan/models/deepseek_v3/README.md | 33 +++++++++++++++++-- .../train_configs/debug_model.toml | 3 +- .../train_configs/deepseek_v3_16b.toml | 6 ++-- 4 files changed, 45 insertions(+), 7 deletions(-) create mode 100644 .github/CODEOWNERS diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 000000000..000cc1af1 --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1,10 @@ +# This is a CODEOWNERS file. +# Each line is a file pattern followed by one or more owners. + +# These owners will be the default owners for everything in +# the repo. Unless a later match takes precedence, +# they will be requested for review when someone opens a pull request. +* @tianyu-l @fegin @wwwjn @wconstab + +# Exclude the experiments directory by adding a pattern without owners +/torchtitan/experiments/ diff --git a/torchtitan/models/deepseek_v3/README.md b/torchtitan/models/deepseek_v3/README.md index d9c6849f9..f7e85d5b9 100644 --- a/torchtitan/models/deepseek_v3/README.md +++ b/torchtitan/models/deepseek_v3/README.md @@ -1,8 +1,35 @@ -# DeepSeek-V3 in torchtitan +# DeepSeek-V3 in TorchTitan -Download tokenizer: +DeepSeek-V3 is a Mixture-of-Experts (MoE) transformer model with Multi-head Latent Attention (MLA) architecture. 
-``` +## Setup + +### Download Tokenizer + +```bash # DeepSeek tokenizer (automatically downloads tokenizer.json and tokenizer_config.json) python scripts/download_tokenizer.py --repo_id deepseek-ai/DeepSeek-V3 ``` + +## Training + +### Debug Training + +```bash +# Quick debug run with small model +CONFIG_FILE="./torchtitan/models/deepseek_v3/train_configs/debug_model.toml" ./run_train.sh +``` + +### Full Model Training + +```bash +# 16B parameter model +CONFIG_FILE="./torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml" ./run_train.sh +``` + + +## Supported Features +- FSDP, HSDP +- Activation checkpointing +- Tensor Parallel (TP) +- Expert Parallel (EP) diff --git a/torchtitan/models/deepseek_v3/train_configs/debug_model.toml b/torchtitan/models/deepseek_v3/train_configs/debug_model.toml index d160d02db..54253f94e 100644 --- a/torchtitan/models/deepseek_v3/train_configs/debug_model.toml +++ b/torchtitan/models/deepseek_v3/train_configs/debug_model.toml @@ -40,7 +40,7 @@ lr_min = 0.0 [training] local_batch_size = 8 -seq_len = 2048 +seq_len = 4096 max_norm = 1.0 # grad norm clipping steps = 10 compile = false @@ -52,6 +52,7 @@ data_parallel_shard_degree = -1 fsdp_reshard_after_forward = "default" # default / never / always tensor_parallel_degree = 1 enable_async_tensor_parallel = false +expert_parallel_degree = 1 [checkpoint] enable_checkpoint = false diff --git a/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml b/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml index 54be0be08..53c9ba564 100644 --- a/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml +++ b/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml @@ -38,8 +38,8 @@ decay_type = "linear" lr_min = 0.0 [training] -local_batch_size = 16 -seq_len = 2048 +local_batch_size = 8 +seq_len = 4096 max_norm = 1.0 # grad norm clipping steps = 100 compile = false @@ -51,7 +51,7 @@ data_parallel_shard_degree = -1 fsdp_reshard_after_forward = "default" # default / never / always tensor_parallel_degree = 1 enable_async_tensor_parallel = false -expert_parallel_degree = 2 +expert_parallel_degree = 1 [checkpoint] enable_checkpoint = false From 3674ca8e381b22bd8a2a1f0d563177f087faed20 Mon Sep 17 00:00:00 2001 From: Jiani Wang Date: Tue, 8 Jul 2025 11:56:01 -0700 Subject: [PATCH 09/14] restore unrelated files --- torchtitan/experiments/llama4/train_configs/debug_model.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/torchtitan/experiments/llama4/train_configs/debug_model.toml b/torchtitan/experiments/llama4/train_configs/debug_model.toml index c805c1326..26b0250ba 100644 --- a/torchtitan/experiments/llama4/train_configs/debug_model.toml +++ b/torchtitan/experiments/llama4/train_configs/debug_model.toml @@ -27,7 +27,7 @@ tokenizer_path = "./tests/assets/tokenizer" [optimizer] # TODO: AdamW has numerical issues when TP is used, need to fix it -name = "Adam" +name = "AdamW" lr = 4e-3 eps = 1e-15 From d7bceb060a278523d00779398d63c08e40dd1b29 Mon Sep 17 00:00:00 2001 From: Jiani Wang Date: Tue, 8 Jul 2025 11:58:18 -0700 Subject: [PATCH 10/14] update README --- torchtitan/models/deepseek_v3/README.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/torchtitan/models/deepseek_v3/README.md b/torchtitan/models/deepseek_v3/README.md index f7e85d5b9..2003e3259 100644 --- a/torchtitan/models/deepseek_v3/README.md +++ b/torchtitan/models/deepseek_v3/README.md @@ -33,3 +33,16 @@ 
CONFIG_FILE="./torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml" - Activation checkpointing - Tensor Parallel (TP) - Expert Parallel (EP) + + +## To be added +- Modeling + - Merge DeepSeek-V3 and Llama4 MoE common components +- Parallelism + - Context Parallel support for DeepSeek-V3 + - PP support for DeepSeek-V3 +- torch.compile +- Quantization +- Testing + - perfomance and loss converging tests + - CI integration From 399732323d175d5eb715c73780e84769b244b73d Mon Sep 17 00:00:00 2001 From: Jiani Wang Date: Thu, 10 Jul 2025 13:05:22 -0700 Subject: [PATCH 11/14] add 671B model config --- torchtitan/models/deepseek_v3/README.md | 7 ++ torchtitan/models/deepseek_v3/__init__.py | 4 +- .../models/deepseek_v3/infra/parallelize.py | 43 ++++++++---- .../train_configs/debug_model.toml | 7 +- .../train_configs/deepseek_v3_16b.toml | 12 ++-- .../train_configs/deepseek_v3_671b.toml | 70 +++++++++++++++++++ 6 files changed, 119 insertions(+), 24 deletions(-) create mode 100644 torchtitan/models/deepseek_v3/train_configs/deepseek_v3_671b.toml diff --git a/torchtitan/models/deepseek_v3/README.md b/torchtitan/models/deepseek_v3/README.md index 2003e3259..06c51b9bf 100644 --- a/torchtitan/models/deepseek_v3/README.md +++ b/torchtitan/models/deepseek_v3/README.md @@ -27,6 +27,11 @@ CONFIG_FILE="./torchtitan/models/deepseek_v3/train_configs/debug_model.toml" ./r CONFIG_FILE="./torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml" ./run_train.sh ``` +```bash +# 671B parameter model +CONFIG_FILE="./torchtitan/models/deepseek_v3/train_configs/deepseek_v3_671b.toml" ./run_train.sh +``` + ## Supported Features - FSDP, HSDP @@ -36,6 +41,8 @@ CONFIG_FILE="./torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml" ## To be added +- TP: + - TP has a known numerical issue with DeepSeek-V3 (https://github.com/pytorch/torchtitan/pull/1373#issuecomment-3050249520). - Modeling - Merge DeepSeek-V3 and Llama4 MoE common components - Parallelism diff --git a/torchtitan/models/deepseek_v3/__init__.py b/torchtitan/models/deepseek_v3/__init__.py index 3ab9fc3c2..e86917bbc 100644 --- a/torchtitan/models/deepseek_v3/__init__.py +++ b/torchtitan/models/deepseek_v3/__init__.py @@ -30,8 +30,8 @@ "debugmodel": DeepSeekV3ModelArgs( vocab_size=102400, dim=256, - inter_dim=10944, - moe_inter_dim=1408, + inter_dim=1024, + moe_inter_dim=256, n_layers=3, n_dense_layers=1, n_heads=16, diff --git a/torchtitan/models/deepseek_v3/infra/parallelize.py b/torchtitan/models/deepseek_v3/infra/parallelize.py index 436ff6ab6..3b3669689 100644 --- a/torchtitan/models/deepseek_v3/infra/parallelize.py +++ b/torchtitan/models/deepseek_v3/infra/parallelize.py @@ -11,6 +11,7 @@ ColwiseParallel, parallelize_module, PrepareModuleInput, + PrepareModuleInputOutput, RowwiseParallel, SequenceParallel, ) @@ -51,7 +52,7 @@ def parallelize_deepseekv3( "Currently, float8 tensorwise TP is not tested for deepseekv3" ) - apply_tp( + apply_non_moe_tp( model, world_mesh["tp"], loss_parallel=parallel_dims.loss_parallel_enabled, @@ -133,7 +134,7 @@ def parallelize_deepseekv3( return model -def apply_tp( +def apply_non_moe_tp( model: nn.Module, tp_mesh: DeviceMesh, loss_parallel: bool, @@ -145,6 +146,7 @@ def apply_tp( # transformer block's inputs) # 2. Parallelize the root norm layer over the sequence dim # 3. Parallelize the final linear output layer + logger.warning("There are known issue with TP for deepseekv3. 
Please see details in discussion: https://github.com/pytorch/torchtitan/pull/1373#issuecomment-3050249520.") parallelize_module( model, tp_mesh, @@ -182,21 +184,36 @@ def apply_tp( "attention.wkv_a": NoParallel(), "attention.wkv_b": colwise_parallel(), "attention.kv_norm": NoParallel(), - "attention.wq_a": NoParallel(), - "attention.wq_b": colwise_parallel(), - "attention.q_norm": NoParallel(), - "attention.wq": colwise_parallel(), # This is only used when q_lora_rank==0 "attention.wo": rowwise_parallel(output_layouts=Shard(1)), "ffn_norm": SequenceParallel(), - "feed_forward": prepare_module_input( - input_layouts=(Shard(1),), - desired_input_layouts=(Replicate(),), - ), - "feed_forward.w1": colwise_parallel(), - "feed_forward.w2": rowwise_parallel(output_layouts=Shard(1)), - "feed_forward.w3": colwise_parallel(), } + if transformer_block.attention.q_lora_rank == 0: + layer_plan.update( + { + "attention.wq": colwise_parallel(), # This is only used when q_lora_rank==0 + } + ) + else: + layer_plan.update( + { + "attention.wq_a": NoParallel(), + "attention.wq_b": colwise_parallel(), + "attention.q_norm": NoParallel(), + } + ) + + if not transformer_block.moe_enabled: + layer_plan.update({ + "feed_forward": prepare_module_input( + input_layouts=(Shard(1),), + desired_input_layouts=(Replicate(),), + ), + "feed_forward.w1": colwise_parallel(), + "feed_forward.w2": rowwise_parallel(output_layouts=Shard(1)), + "feed_forward.w3": colwise_parallel(), + }) + parallelize_module( module=transformer_block, device_mesh=tp_mesh, diff --git a/torchtitan/models/deepseek_v3/train_configs/debug_model.toml b/torchtitan/models/deepseek_v3/train_configs/debug_model.toml index 54253f94e..dfa2111a3 100644 --- a/torchtitan/models/deepseek_v3/train_configs/debug_model.toml +++ b/torchtitan/models/deepseek_v3/train_configs/debug_model.toml @@ -24,7 +24,7 @@ enable_wandb = false name = "deepseek_v3" flavor = "debugmodel" # test tokenizer.model, for debug purpose only -tokenizer_path = "./assets/tokenizer/DeepSeek-V3" +tokenizer_path = "./tests/assets/tokenizer" # converters = ["float8"] [optimizer] @@ -40,7 +40,7 @@ lr_min = 0.0 [training] local_batch_size = 8 -seq_len = 4096 +seq_len = 2048 max_norm = 1.0 # grad norm clipping steps = 10 compile = false @@ -69,4 +69,5 @@ selective_ac_option = '2' # 'int' = ac every positive int layer or 'op', ac bas [float8] enable_fsdp_float8_all_gather = false precompute_float8_dynamic_scale_for_fsdp = false -filter_fqns = ["output"] +filter_fqns = ["output", "router.gate"] +moe_fqns = ["experts"] diff --git a/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml b/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml index 53c9ba564..84c6b5f6b 100644 --- a/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml +++ b/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml @@ -22,26 +22,25 @@ enable_wandb = false [model] name = "deepseek_v3" flavor = "16B" -# test tokenizer.model, for debug purpose only tokenizer_path = "./assets/tokenizer/DeepSeek-V3" # converters = ["float8"] [optimizer] name = "AdamW" -lr = 8e-4 +lr = 2.2e-4 eps = 1e-8 [lr_scheduler] -warmup_steps = 2 # lr scheduler warm up, normally 20% of the train steps +warmup_steps = 200 # lr scheduler warm up, normally 20% of the train steps decay_ratio = 0.8 # lr scheduler decay ratio, 80% of the train steps decay_type = "linear" -lr_min = 0.0 +lr_min = 2.2e-5 [training] local_batch_size = 8 seq_len = 4096 max_norm = 1.0 # grad norm clipping -steps = 100 +steps = 1000 compile = 
false dataset = "c4" # supported datasets: c4_test (2K), c4 (177M) @@ -67,4 +66,5 @@ mode = "full" # ["none", "selective", "full"] [float8] enable_fsdp_float8_all_gather = false precompute_float8_dynamic_scale_for_fsdp = false -filter_fqns = ["output"] +filter_fqns = ["output", "router.gate"] +moe_fqns = ["experts"] diff --git a/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_671b.toml b/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_671b.toml new file mode 100644 index 000000000..26cb64fb7 --- /dev/null +++ b/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_671b.toml @@ -0,0 +1,70 @@ +# torchtitan Config.toml + +[job] +dump_folder = "./outputs" +description = "DeepSeek-V3 671B model training" +print_args = false + +[profiling] +enable_profiling = false +save_traces_folder = "profile_trace" +profile_freq = 10 +enable_memory_snapshot = false +save_memory_snapshot_folder = "memory_snapshot" + +[metrics] +log_freq = 10 +disable_color_printing = false +enable_tensorboard = false +save_tb_folder = "tb" +enable_wandb = false + +[model] +name = "deepseek_v3" +flavor = "671B" +tokenizer_path = "./assets/tokenizer/DeepSeek-V3" +# converters = ["float8"] + +[optimizer] +name = "AdamW" +lr = 2.2e-4 +eps = 1e-8 + +[lr_scheduler] +warmup_steps = 2_000 # lr scheduler warm up, normally 20% of the train steps +decay_ratio = 0.8 # lr scheduler decay ratio, 80% of the train steps +decay_type = "linear" +lr_min = 2.2e-5 + +[training] +local_batch_size = 4 +seq_len = 4096 +max_norm = 1.0 # grad norm clipping +steps = 10_000 +compile = false +dataset = "c4" # supported datasets: c4_test (2K), c4 (177M) + +[parallelism] +data_parallel_replicate_degree = 1 +data_parallel_shard_degree = -1 +fsdp_reshard_after_forward = "default" # default / never / always +tensor_parallel_degree = 8 +enable_async_tensor_parallel = false +expert_parallel_degree = 1 + +[checkpoint] +enable_checkpoint = false +folder = "checkpoint" +interval = 500 +last_save_model_weights_only = false +export_dtype = "float32" +async_mode = "disabled" # ["disabled", "async", "async_with_pinned_mem]" + +[activation_checkpoint] +mode = "full" # ["none", "selective", "full"] + +[float8] +enable_fsdp_float8_all_gather = false +precompute_float8_dynamic_scale_for_fsdp = false +filter_fqns = ["output", "router.gate"] +moe_fqns = ["experts"] From d882f49dea0fa0248a83ce8666ec61e4c9fd2939 Mon Sep 17 00:00:00 2001 From: Jiani Wang Date: Thu, 10 Jul 2025 13:34:06 -0700 Subject: [PATCH 12/14] address comments --- torchtitan/components/tokenizer.py | 3 +++ torchtitan/experiments/llama4/train_configs/debug_model.toml | 1 - torchtitan/models/deepseek_v3/README.md | 3 ++- torchtitan/models/deepseek_v3/infra/parallelize.py | 1 - torchtitan/models/deepseek_v3/model/model.py | 1 - 5 files changed, 5 insertions(+), 4 deletions(-) diff --git a/torchtitan/components/tokenizer.py b/torchtitan/components/tokenizer.py index 31fb24784..c45641101 100644 --- a/torchtitan/components/tokenizer.py +++ b/torchtitan/components/tokenizer.py @@ -408,11 +408,14 @@ def build_hf_tokenizer( ) -> Union[HuggingFaceTokenizer, BaseTokenizer]: """ Builds a HuggingFaceTokenizer from the specified path. + This function creates a HuggingFaceTokenizer instance that handles BOS/EOS token inference and intelligent encoding. The tokenizer automatically detects and loads from various file formats and infers special token behavior. + Args: JobConfig: A JobConfig object containing the path to the tokenizer directory. 
+ Returns: tokenizer (HuggingFaceTokenizer): Loaded tokenizer instance with intelligent BOS/EOS handling """ diff --git a/torchtitan/experiments/llama4/train_configs/debug_model.toml b/torchtitan/experiments/llama4/train_configs/debug_model.toml index 26b0250ba..d72406d8c 100644 --- a/torchtitan/experiments/llama4/train_configs/debug_model.toml +++ b/torchtitan/experiments/llama4/train_configs/debug_model.toml @@ -26,7 +26,6 @@ tokenizer_path = "./tests/assets/tokenizer" # converters = ["float8"] [optimizer] -# TODO: AdamW has numerical issues when TP is used, need to fix it name = "AdamW" lr = 4e-3 eps = 1e-15 diff --git a/torchtitan/models/deepseek_v3/README.md b/torchtitan/models/deepseek_v3/README.md index 06c51b9bf..d0b9900fd 100644 --- a/torchtitan/models/deepseek_v3/README.md +++ b/torchtitan/models/deepseek_v3/README.md @@ -23,7 +23,7 @@ CONFIG_FILE="./torchtitan/models/deepseek_v3/train_configs/debug_model.toml" ./r ### Full Model Training ```bash -# 16B parameter model +# 16B parameter model: adapted from older 16B parameter model from https://huggingface.co/deepseek-ai/deepseek-moe-16b-base CONFIG_FILE="./torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml" ./run_train.sh ``` @@ -45,6 +45,7 @@ CONFIG_FILE="./torchtitan/models/deepseek_v3/train_configs/deepseek_v3_671b.toml - TP has a known numerical issue with DeepSeek-V3 (https://github.com/pytorch/torchtitan/pull/1373#issuecomment-3050249520). - Modeling - Merge DeepSeek-V3 and Llama4 MoE common components + - Attention Layer: need to pass softmax_scale to sdpa() to support scaling - Parallelism - Context Parallel support for DeepSeek-V3 - PP support for DeepSeek-V3 diff --git a/torchtitan/models/deepseek_v3/infra/parallelize.py b/torchtitan/models/deepseek_v3/infra/parallelize.py index 3b3669689..d6eeaf567 100644 --- a/torchtitan/models/deepseek_v3/infra/parallelize.py +++ b/torchtitan/models/deepseek_v3/infra/parallelize.py @@ -75,7 +75,6 @@ def parallelize_deepseekv3( if job_config.activation_checkpoint.mode != "none": apply_ac(model, job_config.activation_checkpoint) - # turn on per-TransformerBlock compile after AC wrapping and before FSDP if job_config.training.compile: raise NotImplementedError("torch.compile is not supported yet for deepseekv3") diff --git a/torchtitan/models/deepseek_v3/model/model.py b/torchtitan/models/deepseek_v3/model/model.py index 3c2bb9e84..476c14837 100644 --- a/torchtitan/models/deepseek_v3/model/model.py +++ b/torchtitan/models/deepseek_v3/model/model.py @@ -277,7 +277,6 @@ def __init__(self, layer_id: int, model_args: DeepSeekV3ModelArgs): else: self.feed_forward = FeedForward(model_args.dim, model_args.inter_dim) - # TODO: Need to revisit the weight initialization for the TransformerBlock self.weight_init_std = 0.02 / (2 * (layer_id + 1)) ** 0.5 self.layer_id = layer_id From b787bf2814da2bc045be7b7ddc6656f6493c6f78 Mon Sep 17 00:00:00 2001 From: Jiani Wang Date: Thu, 10 Jul 2025 16:07:02 -0700 Subject: [PATCH 13/14] fix TP --- torchtitan/components/tokenizer.py | 4 +- torchtitan/models/deepseek_v3/README.md | 2 - .../models/deepseek_v3/infra/parallelize.py | 42 ++++++++++--------- torchtitan/models/deepseek_v3/model/model.py | 4 +- .../train_configs/debug_model.toml | 6 +-- 5 files changed, 30 insertions(+), 28 deletions(-) diff --git a/torchtitan/components/tokenizer.py b/torchtitan/components/tokenizer.py index c45641101..45ecf34f9 100644 --- a/torchtitan/components/tokenizer.py +++ b/torchtitan/components/tokenizer.py @@ -412,10 +412,10 @@ def build_hf_tokenizer( This 
     This function creates a HuggingFaceTokenizer instance that handles BOS/EOS token
     inference and intelligent encoding. The tokenizer automatically detects and loads
     from various file formats and infers special token behavior.
-    
+
     Args:
         JobConfig: A JobConfig object containing the path to the tokenizer directory.
-    
+
     Returns:
         tokenizer (HuggingFaceTokenizer): Loaded tokenizer instance with intelligent BOS/EOS handling
     """
diff --git a/torchtitan/models/deepseek_v3/README.md b/torchtitan/models/deepseek_v3/README.md
index d0b9900fd..107bd0481 100644
--- a/torchtitan/models/deepseek_v3/README.md
+++ b/torchtitan/models/deepseek_v3/README.md
@@ -41,8 +41,6 @@ CONFIG_FILE="./torchtitan/models/deepseek_v3/train_configs/deepseek_v3_671b.toml

 ## To be added

-- TP:
-  - TP has a known numerical issue with DeepSeek-V3 (https://github.com/pytorch/torchtitan/pull/1373#issuecomment-3050249520).
 - Modeling
   - Merge DeepSeek-V3 and Llama4 MoE common components
   - Attention Layer: need to pass softmax_scale to sdpa() to support scaling
diff --git a/torchtitan/models/deepseek_v3/infra/parallelize.py b/torchtitan/models/deepseek_v3/infra/parallelize.py
index d6eeaf567..4e103d569 100644
--- a/torchtitan/models/deepseek_v3/infra/parallelize.py
+++ b/torchtitan/models/deepseek_v3/infra/parallelize.py
@@ -145,7 +145,6 @@ def apply_non_moe_tp(
     #   transformer block's inputs)
     # 2. Parallelize the root norm layer over the sequence dim
     # 3. Parallelize the final linear output layer
-    logger.warning("There are known issue with TP for deepseekv3. Please see details in discussion: https://github.com/pytorch/torchtitan/pull/1373#issuecomment-3050249520.")
     parallelize_module(
         model,
         tp_mesh,
@@ -177,12 +176,13 @@ def apply_non_moe_tp(
         layer_plan = {
             "attention_norm": SequenceParallel(),
             "attention": prepare_module_input(
-                input_layouts=(Shard(1), None),
-                desired_input_layouts=(Replicate(), None),
+                input_layouts=(Shard(1), Replicate()),
+                desired_input_layouts=(Replicate(), Replicate()),
             ),
-            "attention.wkv_a": NoParallel(),
-            "attention.wkv_b": colwise_parallel(),
-            "attention.kv_norm": NoParallel(),
+            # use_local_output=False makes the output a DTensor instead of a plain Tensor
+            "attention.wkv_a": NoParallel(use_local_output=False),
+            "attention.wkv_b": colwise_parallel(use_local_output=False),
+            "attention.kv_norm": NoParallel(use_local_output=False),
             "attention.wo": rowwise_parallel(output_layouts=Shard(1)),
             "ffn_norm": SequenceParallel(),
         }
@@ -190,28 +190,32 @@ def apply_non_moe_tp(
         if transformer_block.attention.q_lora_rank == 0:
             layer_plan.update(
                 {
-                    "attention.wq": colwise_parallel(),  # This is only used when q_lora_rank==0
+                    "attention.wq": colwise_parallel(
+                        use_local_output=False
+                    ),  # This is only used when q_lora_rank==0
                 }
             )
         else:
             layer_plan.update(
                 {
-                    "attention.wq_a": NoParallel(),
-                    "attention.wq_b": colwise_parallel(),
-                    "attention.q_norm": NoParallel(),
+                    "attention.wq_a": NoParallel(use_local_output=False),
+                    "attention.wq_b": colwise_parallel(use_local_output=False),
+                    "attention.q_norm": NoParallel(use_local_output=False),
                 }
             )

         if not transformer_block.moe_enabled:
-            layer_plan.update({
-                "feed_forward": prepare_module_input(
-                    input_layouts=(Shard(1),),
-                    desired_input_layouts=(Replicate(),),
-                ),
-                "feed_forward.w1": colwise_parallel(),
-                "feed_forward.w2": rowwise_parallel(output_layouts=Shard(1)),
-                "feed_forward.w3": colwise_parallel(),
-            })
+            layer_plan.update(
+                {
+                    "feed_forward": prepare_module_input(
+                        input_layouts=(Shard(1),),
+                        desired_input_layouts=(Replicate(),),
+                    ),
"feed_forward.w1": colwise_parallel(), + "feed_forward.w2": rowwise_parallel(output_layouts=Shard(1)), + "feed_forward.w3": colwise_parallel(), + } + ) parallelize_module( module=transformer_block, diff --git a/torchtitan/models/deepseek_v3/model/model.py b/torchtitan/models/deepseek_v3/model/model.py index 476c14837..61034e4c7 100644 --- a/torchtitan/models/deepseek_v3/model/model.py +++ b/torchtitan/models/deepseek_v3/model/model.py @@ -212,6 +212,7 @@ def forward( # Key-value projection kv = self.wkv_a(x) # (bsz, seqlen, kv_lora_rank + qk_rope_head_dim) kv, k_pe = torch.split(kv, [self.kv_lora_rank, self.qk_rope_head_dim], dim=-1) + k_pe = apply_rotary_emb( k_pe.unsqueeze(2), freqs_cis ) # (bsz, seqlen, 1, qk_rope_head_dim) @@ -221,9 +222,8 @@ def forward( ) # (bsz, seqlen, n_heads * (qk_nope_head_dim + v_head_dim)) kv = kv.view(bsz, seqlen, -1, self.qk_nope_head_dim + self.v_head_dim) k_nope, v = torch.split(kv, [self.qk_nope_head_dim, self.v_head_dim], dim=-1) - n_local_heads = k_nope.size(2) k = torch.cat( - [k_nope, k_pe.expand(-1, -1, n_local_heads, -1)], dim=-1 + [k_nope, k_pe.expand(-1, -1, self.n_heads, -1)], dim=-1 ) # (bsz, seqlen, n_heads, qk_head_dim) q = q.transpose(1, 2) # (bsz, n_heads, seqlen, qk_head_dim) diff --git a/torchtitan/models/deepseek_v3/train_configs/debug_model.toml b/torchtitan/models/deepseek_v3/train_configs/debug_model.toml index dfa2111a3..905aa0067 100644 --- a/torchtitan/models/deepseek_v3/train_configs/debug_model.toml +++ b/torchtitan/models/deepseek_v3/train_configs/debug_model.toml @@ -23,7 +23,7 @@ enable_wandb = false [model] name = "deepseek_v3" flavor = "debugmodel" -# test tokenizer.model, for debug purpose only +# test tokenizer, for debug purpose only tokenizer_path = "./tests/assets/tokenizer" # converters = ["float8"] @@ -42,7 +42,7 @@ lr_min = 0.0 local_batch_size = 8 seq_len = 2048 max_norm = 1.0 # grad norm clipping -steps = 10 +steps = 1 compile = false dataset = "c4_test" # supported datasets: c4_test (2K), c4 (177M) @@ -50,7 +50,7 @@ dataset = "c4_test" # supported datasets: c4_test (2K), c4 (177M) data_parallel_replicate_degree = 1 data_parallel_shard_degree = -1 fsdp_reshard_after_forward = "default" # default / never / always -tensor_parallel_degree = 1 +tensor_parallel_degree = 2 enable_async_tensor_parallel = false expert_parallel_degree = 1 From df96da9bb0594bedb1a5347966ad273c2ad5dfca Mon Sep 17 00:00:00 2001 From: Jiani Wang Date: Thu, 10 Jul 2025 16:13:41 -0700 Subject: [PATCH 14/14] lint --- torchtitan/models/deepseek_v3/infra/parallelize.py | 1 - 1 file changed, 1 deletion(-) diff --git a/torchtitan/models/deepseek_v3/infra/parallelize.py b/torchtitan/models/deepseek_v3/infra/parallelize.py index 4e103d569..44e0bc6bb 100644 --- a/torchtitan/models/deepseek_v3/infra/parallelize.py +++ b/torchtitan/models/deepseek_v3/infra/parallelize.py @@ -11,7 +11,6 @@ ColwiseParallel, parallelize_module, PrepareModuleInput, - PrepareModuleInputOutput, RowwiseParallel, SequenceParallel, )