|
| 1 | +import os |
| 2 | +import numpy as np |
| 3 | +import mindspore as ms |
| 4 | +from mindspore import context, nn, Tensor, Parameter |
| 5 | +from mindnlp.transformers import BlenderbotTokenizer, BlenderbotForConditionalGeneration |
| 6 | +from datasets import load_dataset as hf_load_dataset |
| 7 | +from mindspore.dataset import GeneratorDataset |
| 8 | + |
| 9 | +# 环境配置 |
| 10 | +context.set_context( |
| 11 | + mode=context.PYNATIVE_MODE, |
| 12 | + device_target="Ascend", |
| 13 | + device_id=0, |
| 14 | + enable_graph_kernel=False, |
| 15 | + max_call_depth=3000, |
| 16 | + pynative_synchronize=True |
| 17 | +) |
| 18 | +ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.STAND_ALONE) |
| 19 | +ms.set_context(reserve_class_name_in_scope=False) |
| 20 | + |
| 21 | +# 数据加载和过滤 |
| 22 | +def load_and_process_data(): |
| 23 | + print("加载数据集...") |
| 24 | + dataset = hf_load_dataset("databricks/databricks-dolly-15k", split="train") |
| 25 | + print(f"原始数据集大小: {len(dataset)}") |
| 26 | + filtered_dataset = dataset.filter( |
| 27 | + lambda x: x["instruction"] is not None and len(x["instruction"]) > 10 |
| 28 | + ) |
| 29 | + print(f"过滤后数据集大小: {len(filtered_dataset)}") |
| 30 | + return filtered_dataset |
| 31 | + |
| 32 | +# 数据预处理 |
| 33 | +def preprocess_data(tokenizer, dataset): |
| 34 | + print("开始数据预处理...") |
| 35 | + def process(examples): |
| 36 | + inputs = [str(text) for text in examples["instruction"]] |
| 37 | + targets = [str(text) for text in examples["response"]] |
| 38 | + model_inputs = tokenizer( |
| 39 | + inputs, |
| 40 | + max_length=128, |
| 41 | + truncation=True, |
| 42 | + padding="max_length", |
| 43 | + return_tensors="np" |
| 44 | + ) |
| 45 | + with tokenizer.as_target_tokenizer(): |
| 46 | + labels = tokenizer( |
| 47 | + targets, |
| 48 | + max_length=128, |
| 49 | + truncation=True, |
| 50 | + padding="max_length", |
| 51 | + return_tensors="np" |
| 52 | + ) |
| 53 | + input_ids = model_inputs["input_ids"].astype(np.int32) |
| 54 | + attention_mask = model_inputs["attention_mask"].astype(np.int32) |
| 55 | + labels_ids = labels["input_ids"].astype(np.int32) |
| 56 | + print(f"预处理中 input_ids 类型: {input_ids.dtype}, 示例值: {input_ids[0][:5]}") |
| 57 | + return { |
| 58 | + "input_ids": input_ids, |
| 59 | + "attention_mask": attention_mask, |
| 60 | + "labels": labels_ids |
| 61 | + } |
| 62 | + |
| 63 | + processed_dataset = dataset.map( |
| 64 | + process, |
| 65 | + batched=True, |
| 66 | + batch_size=64, |
| 67 | + remove_columns=dataset.column_names |
| 68 | + ) |
| 69 | + print(f"预处理后数据集大小: {len(processed_dataset)}") |
| 70 | + return processed_dataset |
| 71 | + |
| 72 | +# 创建 MindSpore 数据集 |
| 73 | +def create_dynamic_dataset(tokenized_dataset): |
| 74 | + print("创建 MindSpore 数据集...") |
| 75 | + def generator(): |
| 76 | + for item in tokenized_dataset: |
| 77 | + yield ( |
| 78 | + ms.Tensor(item["input_ids"], dtype=ms.int32), |
| 79 | + ms.Tensor(item["attention_mask"], dtype=ms.int32), |
| 80 | + ms.Tensor(item["labels"], dtype=ms.int32) |
| 81 | + ) |
| 82 | + |
| 83 | + dataset = GeneratorDataset( |
| 84 | + source=generator, |
| 85 | + column_names=["input_ids", "attention_mask", "labels"], |
| 86 | + shuffle=False |
| 87 | + ).batch(32, drop_remainder=True) |
| 88 | + print("数据集创建完成") |
| 89 | + return dataset |
| 90 | + |
| 91 | +# 模型定义 |
| 92 | +class DynamicBlenderbot(nn.Cell): |
| 93 | + def __init__(self, model_name="facebook/blenderbot-400M-distill"): |
| 94 | + super().__init__() |
| 95 | + print(f"加载模型和分词器: {model_name}") |
| 96 | + self.tokenizer = BlenderbotTokenizer.from_pretrained(model_name) |
| 97 | + self.model = BlenderbotForConditionalGeneration.from_pretrained(model_name) |
| 98 | + self.model.set_train(True) |
| 99 | + |
| 100 | + # 冻结底层 Transformer 层:冻结 encoder 和 decoder 中前 3 层 |
| 101 | + num_layers_to_freeze = 3 |
| 102 | + for name, param in self.model.parameters_and_names(): |
| 103 | + if "encoder.layers" in name: |
| 104 | + try: |
| 105 | + layer_num = int(name.split("encoder.layers.")[1].split(".")[0]) |
| 106 | + if layer_num < num_layers_to_freeze: |
| 107 | + param.requires_grad = False |
| 108 | + except Exception as e: |
| 109 | + pass |
| 110 | + if "decoder.layers" in name: |
| 111 | + try: |
| 112 | + layer_num = int(name.split("decoder.layers.")[1].split(".")[0]) |
| 113 | + if layer_num < num_layers_to_freeze: |
| 114 | + param.requires_grad = False |
| 115 | + except Exception as e: |
| 116 | + pass |
| 117 | + |
| 118 | + # 显式注册模型参数 |
| 119 | + print("显式注册模型参数...") |
| 120 | + for name, param in self.model.parameters_and_names(): |
| 121 | + # 根据冻结标志注册对应参数 |
| 122 | + setattr(self, f"param_{name.replace('.', '_')}", Parameter(param, requires_grad=param.requires_grad)) |
| 123 | + |
| 124 | + # 检查参数加载情况 |
| 125 | + trainable_params = self.trainable_params() |
| 126 | + print("检查模型参数...") |
| 127 | + for idx, param in enumerate(trainable_params): |
| 128 | + # 这里只会包含可训练参数(未冻结部分) |
| 129 | + param.requires_grad = True |
| 130 | + if idx < 5: |
| 131 | + print(f"Parameter {idx} shape: {param.shape}, requires_grad: {param.requires_grad}") |
| 132 | + |
| 133 | + total_params = len(trainable_params) |
| 134 | + print(f"模型总参数数量: {total_params}") |
| 135 | + if not trainable_params: |
| 136 | + raise ValueError(f"模型 {model_name} 初始化后没有可训练参数!请检查模型是否正确加载或兼容 MindSpore。") |
| 137 | + |
| 138 | + def construct(self, input_ids, attention_mask, labels): |
| 139 | + input_ids = input_ids.astype(ms.int32) |
| 140 | + return self.model( |
| 141 | + input_ids=input_ids, |
| 142 | + attention_mask=attention_mask, |
| 143 | + labels=labels |
| 144 | + ).loss |
| 145 | + |
| 146 | +# 定义训练单步网络 |
| 147 | +class TrainOneStepCell(nn.Cell): |
| 148 | + def __init__(self, network, optimizer, grad_clip_value=1.0): |
| 149 | + super(TrainOneStepCell, self).__init__() |
| 150 | + self.network = network |
| 151 | + self.optimizer = optimizer |
| 152 | + self.weights = ms.ParameterTuple(network.trainable_params()) |
| 153 | + self.grad = ms.ops.GradOperation(get_by_list=True) |
| 154 | + self.grad_clip_value = grad_clip_value |
| 155 | + self.clip_by_value = ms.ops.clip_by_value |
| 156 | + |
| 157 | + def construct(self, *inputs): |
| 158 | + loss = self.network(*inputs) |
| 159 | + grads = self.grad(self.network, self.weights)(*inputs) |
| 160 | + # 手动裁剪梯度 |
| 161 | + grads = tuple(self.clip_by_value(g, -self.grad_clip_value, self.grad_clip_value) for g in grads) |
| 162 | + self.optimizer(grads) |
| 163 | + return loss |
| 164 | + |
| 165 | +# 训练循环 |
| 166 | +def dynamic_train(model_name="facebook/blenderbot-400M-distill"): |
| 167 | + # 创建检查点目录 |
| 168 | + checkpoint_dir = "./checkpoints" |
| 169 | + os.makedirs(checkpoint_dir, exist_ok=True) |
| 170 | + print(f"检查点将保存至: {os.path.abspath(checkpoint_dir)}") |
| 171 | + |
| 172 | + dataset = load_and_process_data() |
| 173 | + tokenizer = DynamicBlenderbot(model_name).tokenizer |
| 174 | + processed_data = preprocess_data(tokenizer, dataset) |
| 175 | + train_dataset = create_dynamic_dataset(processed_data) |
| 176 | + |
| 177 | + print("初始化模型...") |
| 178 | + net = DynamicBlenderbot(model_name) |
| 179 | + |
| 180 | + # 执行一次虚拟前向传播以初始化参数 |
| 181 | + print("执行虚拟前向传播...") |
| 182 | + dummy_input_ids = Tensor(np.zeros((16, 128)), dtype=ms.int32) |
| 183 | + dummy_attention_mask = Tensor(np.ones((16, 128)), dtype=ms.int32) |
| 184 | + dummy_labels = Tensor(np.zeros((16, 128)), dtype=ms.int32) |
| 185 | + net(dummy_input_ids, dummy_attention_mask, dummy_labels) |
| 186 | + print("虚拟前向传播完成") |
| 187 | + |
| 188 | + # 获取可训练参数 |
| 189 | + params = net.trainable_params() |
| 190 | + print(f"优化器可训练参数数量: {len(params)}") |
| 191 | + if not params: |
| 192 | + raise ValueError(f"前向传播后无可训练参数!请检查模型 {model_name} 兼容性或 MindSpore 配置。") |
| 193 | + |
| 194 | + # 创建学习率调度器和优化器 |
| 195 | + total_steps = len(train_dataset) * 3 # 3个epoch的总步数 |
| 196 | + lr_scheduler = nn.CosineDecayLR(min_lr=1e-6, max_lr=2e-5, decay_steps=total_steps) |
| 197 | + optimizer = nn.Adam(params, learning_rate=lr_scheduler) |
| 198 | + |
| 199 | + # 包装网络为单步训练网络 |
| 200 | + train_net = TrainOneStepCell(net, optimizer, grad_clip_value=1.0) |
| 201 | + train_net.set_train(True) |
| 202 | + |
| 203 | + step = 0 |
| 204 | + for epoch in range(3): |
| 205 | + print(f"开始第 {epoch + 1} 个 epoch...") |
| 206 | + for batch in train_dataset: |
| 207 | + loss = train_net(*batch) |
| 208 | + current_lr = optimizer.learning_rate(step).asnumpy() # 获取当前学习率 |
| 209 | + if step % 10 == 0: |
| 210 | + print(f"Step {step} Loss: {loss.asnumpy()}, Learning Rate: {current_lr}") |
| 211 | + if step % 100 == 0: |
| 212 | + ms.save_checkpoint(net, f"{checkpoint_dir}/step_{step}.ckpt") |
| 213 | + step += 1 |
| 214 | + |
| 215 | +if __name__ == "__main__": |
| 216 | + assert context.get_context("mode") == context.PYNATIVE_MODE, "必须使用动态图模式" |
| 217 | + dynamic_train(model_name="facebook/blenderbot-400M-distill") |
| 218 | + # 如果需要尝试 3B 模型,取消注释以下行 |
| 219 | + # dynamic_train(model_name="facebook/blenderbot-3B") |
0 commit comments