diff --git a/docs/api/api_label b/docs/api/api_label
index fb622f16792..6e3be163a2c 100644
--- a/docs/api/api_label
+++ b/docs/api/api_label
@@ -41,6 +41,9 @@ paddle.distributed.all_reduce .. _api_paddle_distributed_all_reduce:
 paddle.distributed.scatter .. _api_paddle_distributed_scatter:
 paddle.distributed.alltoall .. _api_paddle_distributed_alltoall:
 paddle.distributed.send .. _api_paddle_distributed_send:
+paddle.distributed.isend .. _api_paddle_distributed_isend:
+paddle.distributed.irecv .. _api_paddle_distributed_irecv:
+paddle.distributed.reduce_scatter .. _api_paddle_distributed_reduce_scatter:
 paddle.distributed.QueueDataset .. _api_paddle_distributed_QueueDataset:
 paddle.distributed.barrier .. _api_paddle_distributed_barrier:
 paddle.distributed.CountFilterEntry .. _api_paddle_distributed_CountFilterEntry:
diff --git a/docs/api/paddle/distributed/irecv_cn.rst b/docs/api/paddle/distributed/irecv_cn.rst
new file mode 100644
index 00000000000..62663348311
--- /dev/null
+++ b/docs/api/paddle/distributed/irecv_cn.rst
@@ -0,0 +1,28 @@
+.. _cn_api_paddle_distributed_irecv:
+
+irecv
+-------------------------------
+
+
+.. py:function:: paddle.distributed.irecv(tensor, src=None, group=None)
+
+Asynchronously receive a tensor sent from another process.
+
+Parameters
+:::::::::
+    - tensor (Tensor) - The tensor to receive the data into. Its data type should be float16, float32, float64, int32 or int64.
+    - src (int) - The global rank of the source process that sends the tensor.
+    - group (Group, optional) - A Group instance returned by new_group, or None for the default global group. Default: None.
+
+
+Returns
+:::::::::
+Returns a Task.
+
+Notes
+:::::::::
+Currently only the dynamic graph mode is supported.
+
+Code Examples
+:::::::::
+COPY-FROM: paddle.distributed.irecv
\ No newline at end of file
diff --git a/docs/api/paddle/distributed/isend_cn.rst b/docs/api/paddle/distributed/isend_cn.rst
new file mode 100644
index 00000000000..3d63fbbd3b7
--- /dev/null
+++ b/docs/api/paddle/distributed/isend_cn.rst
@@ -0,0 +1,29 @@
+.. _cn_api_paddle_distributed_isend:
+
+isend
+-------------------------------
+
+
+.. py:function:: paddle.distributed.isend(tensor, dst, group=None)
+
+Asynchronously send ``tensor`` to the process with the specified rank.
+
+Parameters
+:::::::::
+    - tensor (Tensor) - The tensor to send. Its data type should be float16, float32, float64, int32 or int64.
+    - dst (int) - The global rank of the destination process.
+    - group (Group, optional) - A Group instance returned by new_group, or None for the default global group. Default: None.
+
+
+Returns
+:::::::::
+Returns a Task.
+
+
+Notes
+:::::::::
+Currently only the dynamic graph mode is supported.
+
+Code Examples
+:::::::::
+COPY-FROM: paddle.distributed.isend
\ No newline at end of file
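The two point-to-point APIs above are meant to be used as a matched pair. A minimal usage sketch, assuming two processes launched with ``paddle.distributed.launch`` (the tensor values and the ``p2p_demo.py`` file name are illustrative only, not taken from the docs above):

.. code-block:: python

    # Hypothetical two-rank sketch (p2p_demo.py); launch with, e.g.:
    #   export CUDA_VISIBLE_DEVICES=0,1
    #   python -m paddle.distributed.launch p2p_demo.py
    import paddle
    import paddle.distributed as dist

    dist.init_parallel_env()

    if dist.get_rank() == 0:
        data = paddle.to_tensor([7, 8, 9])
        task = dist.isend(data, dst=1)    # returns a Task immediately
    else:
        data = paddle.to_tensor([0, 0, 0])
        task = dist.irecv(data, src=0)    # data is filled in place on completion

    task.wait()                           # block until the transfer finishes
    print(data)                           # both ranks now hold [7, 8, 9]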
diff --git a/docs/api/paddle/reduce_scatter_cn.rst b/docs/api/paddle/reduce_scatter_cn.rst
new file mode 100644
index 00000000000..09fa00c339d
--- /dev/null
+++ b/docs/api/paddle/reduce_scatter_cn.rst
@@ -0,0 +1,30 @@
+.. _cn_api_paddle_distributed_reduce_scatter:
+
+reduce_scatter
+-------------------------------
+
+
+.. py:function:: paddle.distributed.reduce_scatter(tensor, tensor_list, op=ReduceOp.SUM, group=None, use_calc_stream=True)
+
+Reduce a list of tensors across all processes in the group, then scatter the results so that each process keeps one reduced tensor.
+
+Parameters
+:::::::::
+    - tensor (Tensor) - The output tensor.
+    - tensor_list (list(Tensor)) - The list of tensors to reduce and scatter.
+    - op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD) - The reduction operation. Default: ReduceOp.SUM.
+    - group (Group, optional) - The communication group; if None, the default global group is used.
+    - use_calc_stream (bool, optional) - Whether to perform the communication on the calculation stream or on the communication stream. Default: True, i.e. on the calculation stream.
+
+
+Returns
+:::::::::
+Returns a Task.
+
+Notes
+:::::::::
+Currently only the dynamic graph mode is supported.
+
+Code Examples
+:::::::::
+COPY-FROM: paddle.distributed.reduce_scatter
\ No newline at end of file
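Similarly, a minimal ``reduce_scatter`` sketch under the same two-process assumption (the values are illustrative only): each rank contributes a list of two tensors, the lists are reduced element-wise across ranks, and rank ``i`` keeps the ``i``-th reduced tensor.

.. code-block:: python

    # Hypothetical two-rank sketch; values below are illustrative only.
    import paddle
    import paddle.distributed as dist

    dist.init_parallel_env()

    if dist.get_rank() == 0:
        t0 = paddle.to_tensor([0, 1])
        t1 = paddle.to_tensor([2, 3])
    else:
        t0 = paddle.to_tensor([4, 5])
        t1 = paddle.to_tensor([6, 7])

    # Output buffer that receives this rank's chunk of the reduced result.
    output = paddle.empty(shape=[2], dtype=t0.dtype)
    dist.reduce_scatter(output, [t0, t1])

    print(output)  # expected: [4, 6] on rank 0, [8, 10] on rank 1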
diff --git a/docs/guides/06_distributed_training/pipeline_parallel_cn.rst b/docs/guides/06_distributed_training/pipeline_parallel_cn.rst
index 8a2dd31cd87..c757c7f1260 100644
--- a/docs/guides/06_distributed_training/pipeline_parallel_cn.rst
+++ b/docs/guides/06_distributed_training/pipeline_parallel_cn.rst
@@ -64,6 +64,39 @@
     import paddle.nn.functional as F
     import paddle.distributed as dist
     import random
+    from paddle.io import Dataset, BatchSampler, DataLoader
+
+
+Create the dataset:
+
+.. code-block:: python
+
+    BATCH_NUM = 20
+    BATCH_SIZE = 16
+    EPOCH_NUM = 4
+
+    IMAGE_SIZE = 784
+    CLASS_NUM = 10
+    MICRO_BATCH_SIZE = 2
+
+    class RandomDataset(Dataset):
+        def __init__(self, num_samples):
+            self.num_samples = num_samples
+
+        def __getitem__(self, idx):
+            image = np.random.random([1, 28, 28]).astype('float32')
+            label = np.random.randint(0, CLASS_NUM - 1, (1, )).astype('int64')
+            return image, label
+
+        def __len__(self):
+            return self.num_samples
+
+    dataset = RandomDataset(BATCH_NUM * BATCH_SIZE)
+    train_reader = DataLoader(dataset,
+                              batch_size=BATCH_SIZE,
+                              shuffle=True,
+                              drop_last=True,
+                              num_workers=2)
 
 Build a model that can run as a pipeline. Every layer of the model must be wrapped by LayerDesc, or by SharedLayerDesc which inherits from LayerDesc; since no parameters are shared here, LayerDesc is sufficient.
 
@@ -77,8 +110,9 @@
         def forward(self, x):
             return x.reshape(shape=self.shape)
 
+
     class AlexNetPipeDesc(PipelineLayer):
-        def __init__(self, num_classes=10, **kwargs):
+        def __init__(self, num_classes=CLASS_NUM, **kwargs):
             self.num_classes = num_classes
             decs = [
                 LayerDesc(
@@ -108,14 +142,11 @@
             ]
             super(AlexNetPipeDesc, self).__init__(
                 layers=decs, loss_fn=nn.CrossEntropyLoss(), **kwargs)
-    
+
 Then initialize the distributed environment; this step mainly builds the topology of the pipeline communication groups.
 
 .. code-block:: python
 
-    batch_size = 4
-    micro_batch_size = 2
-
     strategy = fleet.DistributedStrategy()
     model_parallel_size = 1
     data_parallel_size = 1
@@ -126,12 +157,11 @@
         "pp_degree": pipeline_parallel_size
     }
     strategy.pipeline_configs = {
-        "accumulate_steps": batch_size // micro_batch_size,
-        "micro_batch_size": micro_batch_size
+        "accumulate_steps": BATCH_SIZE // MICRO_BATCH_SIZE,
+        "micro_batch_size": MICRO_BATCH_SIZE
     }
-
-
-    fleet.init(is_collective=True, strategy=strategy)
+
+    fleet.init(is_collective=True, strategy=strategy)
 
 To keep parameter initialization under pipeline parallelism consistent with ordinary model initialization, a different seed needs to be set on each card.
 
@@ -162,7 +192,6 @@ fleet.distributed_optimizer(...): this step adds distributed attributes to the optimizer
 
 .. code-block:: python
 
-
     class ReshapeHelp(Layer):
         def __init__(self, shape):
             super(ReshapeHelp, self).__init__()
@@ -214,35 +243,16 @@ fleet.distributed_optimizer(...): this step adds distributed attributes to the optimizer
 
     optimizer = fleet.distributed_optimizer(optimizer)
 
-Create the MNIST dataset:
-
-.. code-block:: python
-
-    train_reader = paddle.batch(
-        paddle.dataset.mnist.train(), batch_size=batch_size, drop_last=True
-    )
-
 Start training.
 
 model.train_batch(...): this step executes the pipeline schedule in 1F1B (one-forward-one-backward) fashion.
 
 .. code-block:: python
 
-    for step_id, data in enumerate(train_reader()):
-        x_data = np.array([x[0] for x in data]).astype("float32").reshape(
-            batch_size, 1, 28, 28
-        )
-        y_data = np.array([x[1] for x in data]).astype("int64").reshape(
-            batch_size, 1
-        )
-        img = paddle.to_tensor(x_data)
-        label = paddle.to_tensor(y_data)
-        img.stop_gradient = True
-        label.stop_gradient = True
-        if step_id >= 5:
-            break
-
-        loss = model.train_batch([img, label], optimizer, scheduler)
+    for i, (image, label) in enumerate(train_reader()):
+        if i >= 5:
+            break
+        loss = model.train_batch([image, label], optimizer, scheduler)
         print("pp_loss: ", loss.numpy())
 
 How to run (the current machine must have two GPUs):
 
@@ -252,7 +262,7 @@ model.train_batch(...): this step executes the pipeline schedule in 1F1B fashion
 
     export CUDA_VISIBLE_DEVICES=0,1
     python -m paddle.distributed.launch alexnet_dygraph_pipeline.py # alexnet_dygraph_pipeline.py is the user's Python script that runs the dynamic-graph pipeline
 
-Pipeline-parallel dynamic-graph code based on AlexNet: `alex `_.
+Complete pipeline-parallel dynamic-graph code based on AlexNet: `alex `_.
 
 The console output is as follows: