This section uses the PyTorch framework in a containerized environment as an example to describe in detail how to submit a single-node model training job on the AI computing platform via the CLI.

Prerequisites

  • You have obtained the account and password for the management console.

  • You have completed personal real-name verification and your account balance is greater than 0.

  • You have prepared the sample code or data; see the appendix for details.

  • Submitting training jobs via the CLI requires logging in to the cluster backend. Logging in through JupyterLab is recommended.

Procedure

  1. Create an AI computing cluster.

  2. Upload the sample training code or data to the cluster's parallel file storage and note the corresponding storage path.

  3. Log in to the cluster backend through JupyterLab.

  4. Go to the directory that contains the sample code python.py. In the backend terminal, confirm that you are currently in that directory, for example with the commands below.
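
    A quick way to do this, assuming the example paths used later in this guide (substitute your own storage path and user directory):

    cd /shared_J_1/home/user_1
    pwd
    ls python.py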

  5. In the directory containing the sample code, create an execution script in .slm format, for example a script named python.slm.

    vim python.slm

    An example of the script content is shown below:

    #!/bin/bash
    #SBATCH --job-name=Job-rFJJr
    #SBATCH --nodes=1
    #SBATCH --gpus-per-node=1
    #SBATCH --ntasks-per-node=1
    
    # Pull the PyTorch image from the internal registry (plain HTTP) and convert it to a SIF file
    SINGULARITY_NOHTTPS=true singularity pull pytorch_2.0.1_cuda11.7_cudnn8_runtime.sif \
      docker://10.42.12.8:8080/pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime
    
    # Use the first node of the allocation as the rendezvous endpoint for torchrun
    export MASTER_ADDR=$(scontrol show hostname ${SLURM_NODELIST} | head -n 1)
    export MASTER_PORT=40001
    
    # Launch the training script inside the container on the allocated GPU
    srun singularity exec --nv --bind <your_mount_path>:<your_mount_path> \
      pytorch_2.0.1_cuda11.7_cudnn8_runtime.sif \
      torchrun --nnodes=1 --nproc_per_node=1 \
      --rdzv_id=$SLURM_JOBID --rdzv_backend=c10d \
      --rdzv_endpoint=$MASTER_ADDR:$MASTER_PORT \
      <your_python_training_script_path>

    Script parameters:

    • <your_mount_path>: the shared directory of the parallel file storage mounted on the cluster, i.e., the root directory shown in the file list. In this example it is /shared_J_1.

    • <your_python_training_script_path>: the path of the sample code. In this example it is /shared_J_1/home/user_1/python.py. A fully substituted srun line is shown after this list.
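
    With the two example values above filled in (and the SIF file name from the pull step earlier in the script), the srun line would read:

    srun singularity exec --nv --bind /shared_J_1:/shared_J_1 \
      pytorch_2.0.1_cuda11.7_cudnn8_runtime.sif \
      torchrun --nnodes=1 --nproc_per_node=1 \
      --rdzv_id=$SLURM_JOBID --rdzv_backend=c10d \
      --rdzv_endpoint=$MASTER_ADDR:$MASTER_PORT \
      /shared_J_1/home/user_1/python.py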

  6. After the .slm execution script has been created and saved, run the following command to submit the training job to the cluster.

    sbatch python.slm

    The command output returns the ID of the current job, for example:

    Submitted batch job 21
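
    If the job needs to be cancelled before it finishes, it can be removed from the queue with scancel and the job ID returned above (21 in this example):

    scancel 21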
  7. You can check the status of the training job in either of the following two ways.

    • Method 1

      Run the following command in the cluster backend and check the State value for the corresponding job ID.

      sacct
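
      To show only the job submitted above (job ID 21 in this example) with a specific set of columns, the -j and --format options of sacct can be used:

      sacct -j 21 --format=JobID,JobName,State,Elapsed,ExitCode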
    • Method 2

      On the training service page, the submitted job appears in the training service list.

  8. After the job has finished, click Details in the Operation column of the job to open its details page, where you can view or download the standard output log and the error output log.
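
    Because the job was submitted with sbatch, Slurm also writes the batch script's standard output and error to a file named slurm-<job_id>.out in the submission directory by default (unless overridden with #SBATCH --output), so the log for the job above can also be inspected directly from the cluster backend:

    cat slurm-21.out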

Appendix

The content of the sample training script python.py is as follows.

import os
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor

# [*] Packages required to import distributed data parallelism
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
import subprocess


"""Start DDP code with "srun --partition=openai -n8 --gres=gpu:8 --ntasks-per-node=8 --job-name=slrum_test"
"""


# Define model
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28*28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits


def train(dataloader, model, loss_fn, optimizer, device):
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)  # copy data from cpu to gpu

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # [*] only print log on rank 0
        if dist.get_rank() == 0 and batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")


def test(dataloader, model, loss_fn, device):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)  # copy data from cpu to gpu
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    # [*] only print log on rank 0
    print_only_rank0(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")


def print_only_rank0(log):
    if dist.get_rank() == 0:
        print(log)


if __name__ == '__main__':
    # [*] initialize the distributed process group and device
    # rank, local_rank, world_size, device = setup_DDP(verbose=True)
    dist.init_process_group("nccl")
    rank = dist.get_rank()
    print(f"Start running basic DDP example on rank {rank}.node id :  {os.getenv('SLURM_NODEID')}")
    print("torch.cuda.device_count() : " + str(torch.cuda.device_count()))

    # create model and move it to GPU with id rank
    device_id = rank % torch.cuda.device_count()

    # initialize dataset
    training_data = datasets.FashionMNIST(root="data", train=True, download=True, transform=ToTensor())
    test_data = datasets.FashionMNIST(root="data", train=False, download=True, transform=ToTensor())

    # initialize data loader
    # [*] using DistributedSampler
    batch_size = 64 // dist.get_world_size()   # [*] // world_size
    train_sampler = DistributedSampler(training_data, shuffle=True)  # [*]
    test_sampler = DistributedSampler(test_data, shuffle=False)  # [*]
    train_dataloader = DataLoader(training_data, batch_size=batch_size, sampler=train_sampler)  # [*] sampler=...
    test_dataloader = DataLoader(test_data, batch_size=batch_size, sampler=test_sampler)  # [*] sampler=...

    # initialize model
    model = NeuralNetwork().to(device_id)  # copy model from cpu to gpu
    # [*] using DistributedDataParallel
    model = DDP(model, device_ids=[device_id], output_device=device_id)  # [*] DDP(...)
    print_only_rank0(model)  # [*]

    # initialize optimizer
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    # train on multiple-GPU
    epochs = 5
    for t in range(epochs):
        # [*] set sampler
        train_dataloader.sampler.set_epoch(t)
        test_dataloader.sampler.set_epoch(t)

        print_only_rank0(f"Epoch {t + 1}\n-------------------------------")  # [*]
        train(train_dataloader, model, loss_fn, optimizer, device_id)
        test(test_dataloader, model, loss_fn, device_id)

    print_only_rank0("Done!")  # [*]

    # [*] save model on rank 0
    if dist.get_rank() == 0:
        # unwrap the DDP container so the checkpoint keys are not prefixed with "module."
        model_state_dict = model.module.state_dict()
        torch.save(model_state_dict, "model.pth")
        print("Saved PyTorch Model State to model.pth")

    # [*] clean up the distributed process group
    dist.destroy_process_group()