使用 Ray Tune 进行分布式调参

之前学习了怎么使用 PyTorch 进行分布式训练,不过,手动计算参数启动进程还是比较麻烦的。Berkeley 的 RISELab 开发了 Ray 这个框架来简化分布式计算程序的编写。Ray 有以下几个优点:

  • 支持在多种云平台一键部署集群
  • 支持 Autoscaling,可以节省成本
  • 支持多种深度学习框架,比如 PyTorch、TensorFlow

通过 Ray 统一的编程 API,写好的程序既可以在本地集群跑,本地集群资源紧张的时候也可以直接放到云端跑,不需要改代码,集群管理和调度都不需要我们自己操心。同时,Ray 还提供了调参库,等模型迭代成熟之后可以直接使用 Ray Tune 进行超参搜索,不需要自己写调参脚本了。

安装 Ray

安装 Ray 很简单,只需要在每台服务器都执行

pip install -U ray "ray[default]"

启动 Ray

Ray 采用了中心化的管理模型,分为头节点(Head Node)和工作节点(Worker Node)。头节点负责资源调度、数据分发以及用户交互等,工作节点会主动连接到头节点接受任务并执行。所以,在启动不同节点的时候需要的命令不一样。

启动头节点

首先,任选一台服务器作为头节点,然后执行命令:

ray start --head --node-ip-address 10.0.0.1 --port=6379 --redis-password=123 --dashboard-host=0.0.0.0

其中,--head 表示这是一个头节点,--node-ip-address 应该换成机器的真实 IP,不可以使用 127.0.0.1,接下来的两个参数则是指定了 Redis(用于数据交换的 KV 内存数据库)的启动选项,选择一个没有被占用的端口即可,密码可以自选。--dashboard-host 则是指定了 Ray 的监控面板应该监听哪个地址,默认监听的是 127.0.0.1,一般来说我们都不会在服务器上面开桌面,填 0.0.0.0 可以让我们用服务器的任何一个 IP 访问监控界面。

执行命令之后应该可以看到以下输出:

Local node IP: 10.0.0.1
2021-05-26 11:04:21,084 INFO services.py:1267 -- View the Ray dashboard at http://0.0.0.0:8265

--------------------
Ray runtime started.
--------------------

Next steps
  To connect to this Ray runtime from another node, run
    ray start --address='10.0.0.1:6379' --redis-password='123'

  Alternatively, use the following Python code:
    import ray
    ray.init(address='auto', _redis_password='123')

  If connection fails, check your firewall settings and network configuration.

  To terminate the Ray runtime, run
    ray stop

启动工作节点

然后,在除了头节点以外的服务器上,复制粘贴执行上面输出的命令。

ray start --address='10.0.0.1:6379' --redis-password='123'

成功应该可以看到以下输出:

Local node IP: 10.0.0.2

--------------------
Ray runtime started.
--------------------

To terminate the Ray runtime, run
  ray stop

说明工作节点成功连接到头节点了。

查看监控界面

头节点的输出中的

Local node IP: 10.0.0.1
2021-05-26 11:04:21,084 INFO services.py:1267 -- View the Ray dashboard at http://0.0.0.0:8265

显示了监控界面的监听端口,这里是 8265,将 0.0.0.0 换成你的服务器 IP,然后打开网址,就能看到 Ray 的监控界面了。

监控界面主要显示的是机器的资源情况还有任务执行情况。

用 Ray Tune 自动搜索超参训练模型

使用 Trainable 包装训练过程

Ray Tune 提供了几种方法将训练过程包装起来。首先是最基础的 tune.Trainable,这个类要求我们实现四个方法:

from ray import tune
class MNISTTrainable(tune.Trainable):
    def setup(self, config):
        pass
    def step(self):
        pass
    def save_checkpoint(self, checkpoint_dir):
        pass
    def load_checkpoint(self, checkpoint_path):
        pass

首先,在模型训练开始前,Ray 会调用 setup 函数,这个函数里,我们需要做初始化的工作,比如加载模型和数据集以及设置优化器等,另外,训练的超参数也会从 config 中传进来,在这个函数读取并保存到对象属性里即可。需要注意的是,如果涉及到数据集的下载等,需要注意多进程文件读写的同步。

from ray.tune.examples.mnist_pytorch import (train, test, get_data_loaders,
                                             ConvNet)
def setup(self, config):
    use_cuda = config.get("use_gpu") and torch.cuda.is_available()
    self.device = torch.device("cuda" if use_cuda else "cpu")
    self.train_loader, self.test_loader = get_data_loaders()
    self.model = ConvNet().to(self.device)
    self.optimizer = optim.SGD(
        self.model.parameters(),
        lr=config.get("lr", 0.01),
        momentum=config.get("momentum", 0.9))

然后,Ray 会循环调用 step 函数,在 step 的最后,应该返回一个字典,用来告诉 Ray 这次迭代的指标:

def step(self):
    train(self.model, self.optimizer, self.train_loader, device=self.device)
    acc = test(self.model, self.test_loader, self.device)
    return {"mean_accuracy": acc}

后面两个参数则是为了恢复训练而设计的,save_checkpoint 函数由 Ray 定期调用,在这个函数里,应该将训练的中间状态保存到 checkpoint_dir 指定的位置,并且返回保存后的路径。注意这里只能保存一个文件,保存的时候可以用 tuple 的方式将模型参数和优化器状态保存到文件里。

def save_checkpoint(self, checkpoint_dir):
    checkpoint_path = os.path.join(checkpoint_dir, "checkpoint.bin")
    torch.save((self.model.state_dict(), self.optimizer.state_dict()), checkpoint_path)
    return checkpoint_path

load_checkpoint 则顾名思义,只需要加载传进来的 checkpoint_path 文件即可 :

def load_checkpoint(self, checkpoint_path):
    model, optimizer = torch.load(checkpoint_path)
    self.model.load_state_dict(model)
    self.optimizer.load_state_dict(optimizer)

调用 Ray Tune 进行训练

实现了四个函数之后,就可以将我们的类交给 Ray Tune 进行调度运行,运行的方法非常简单。主函数只需要初始化 Ray 环境,然后调用 tune.run 即可。

from ray.tune.schedulers import ASHAScheduler
from ray.tune import CLIReporter
if __name__ == "__main__":
    # 初始化 Ray 集群
    ray.init(address='auto')
    # 开始搜索超参
    sched = ASHAScheduler(metric="mean_accuracy", mode="max")
    reporter = CLIReporter()
    analysis = tune.run(MNISTTrainable,
                      scheduler=sched,
                      progress_reporter=reporter,
                      stop={"mean_accuracy": 0.99,
                            "training_iteration": 100},
                      resources_per_trial={"cpu": 2, "gpu": 1},
                      num_samples=128,
                      config={"lr": tune.uniform(0.001, 1.0),
                              "momentum": tune.uniform(0.1, 0.9),
                              "use_gpu": True})
    # 保存实验结果
    analysis.results_df.to_csv("result.csv")
    # 获得最佳参数
    print("Best config is:", analysis.get_best_config(metric="mean_accuracy", mode="max"))

其中,scheduler 参数表示超参搜索的策略,有一些调度器可以提前终止表现一般的实验,节省资源给其他实验,同时也节省了搜索时间。具体有哪些调度器可以参考官方文档

stop 指定了停止的标准,一般需要设置 training_iteration,表示最多调用几次 step 函数,其他可以按照自己的需要设置,设置的值需要是 step 会返回的。

progress_reporter 指定了实验进度如何反馈给用户,这里用了默认的 CLIReporter ,如果用 Jupyter Notebook,可以用 JupyterNotebookReporter

resources_per_trial 则指定了分配多少资源给每个实验,默认是 1 个 CPU 和 0 个 GPU,如果不指定 GPU 数量的话,将无法使用 GPU 进行训练(CUDA_VISIBLE_DEVICES 会被设置成空值,即使服务器有 GPU 程序也会不认)。因为上面的示例没有写多卡代码,所以每个实验直接使用 1 个 GPU 即可。Ray 会尽可能调度多的实验一起跑,假如你有 8 个 GPU,那么会有 8 个实验同时进行。

num_samples 表示执行多少次实验,每一次实验,会将 config 里指定为随机分布的参数采样一遍。如果 config 里指定了 grid_search,那么同样的参数会被重复 num_samples 次。

# 会运行 9 个实验
tune.run(trainable, num_samples=1, config={
    "x": tune.grid_search([1, 2, 3]),
    "y": tune.grid_search([a, b, c])}
)

# 会运行 18 个实验
tune.run(trainable, num_samples=2, config={
    "x": tune.grid_search([1, 2, 3]),
    "y": tune.grid_search([a, b, c])}
)

编写完主函数之后,保存脚本,在任意一个启动了 Ray 的服务器上运行你的脚本

python main.py

应该可以看到输出:

2021-05-26 11:41:09,280 INFO worker.py:640 -- Connecting to existing Ray cluster at address: 10.0.0.1:6379

说明成功连接到了 Ray 集群,训练过程中,应该可以看到类似的进度报告:

== Status ==
Memory usage on this node: 19.0/250.9 GiB
Using AsyncHyperBand: num_stopped=3
Bracket: Iter 64.000: None | Iter 16.000: None | Iter 4.000: 0.1129 | Iter 1.000: 0.1129
Resources requested: 20.0/112 CPUs, 16.0/16 GPUs, 0.0/689.53 GiB heap, 0.0/299.5 GiB objects (0.0/4.0 accelerator_type:V100)
Current best trial: 358bb_00006 with mean_accuracy=0.1174 and parameters={'lr': 0.00032010745937302484, 'momentum': 0.3077716737703313}
Result logdir: /home/ray/ray_results/TorchTrainable_2021-05-26_11-41-09
Number of trials: 32/32 (25 PENDING, 4 RUNNING, 3 TERMINATED)
+----------------------------+------------+--------------------+-------------+------------+--------+--------+------------------+---------------+---------+---------------+
| Trial name                 | status     | loc                |          lr |   momentum |    acc |   iter |   total time (s) |   num_samples |   epoch |   batch_count |
|----------------------------+------------+--------------------+-------------+------------+--------+--------+------------------+---------------+---------+---------------|
| TorchTrainable_358bb_00000 | RUNNING    | 89.72.32.13:12113  | 0.00874452  |   0.652566 | 0.0829 |      6 |         50.2268  |         10000 |       6 |            40 |
| TorchTrainable_358bb_00003 | RUNNING    | 89.72.32.24:283613 | 0.00380314  |   0.755093 | 0.1128 |      6 |         49.1289  |         10000 |       6 |            40 |
| TorchTrainable_358bb_00004 | RUNNING    | 89.72.32.55:441696 | 0.000573206 |   0.531036 | 0.113  |      4 |         31.9356  |         10000 |       4 |            40 |
| TorchTrainable_358bb_00006 | RUNNING    | 89.72.32.18:30118  | 0.000320107 |   0.307772 | 0.1174 |      3 |         23.9964  |         10000 |       3 |            40 |
| TorchTrainable_358bb_00007 | PENDING    |                    | 0.000578684 |   0.826577 |        |        |                  |               |         |               |
| TorchTrainable_358bb_00008 | PENDING    |                    | 0.000104167 |   0.841208 |        |        |                  |               |         |               |
| TorchTrainable_358bb_00009 | PENDING    |                    | 0.00792254  |   0.419486 |        |        |                  |               |         |               |
| TorchTrainable_358bb_00010 | PENDING    |                    | 0.00238698  |   0.496603 |        |        |                  |               |         |               |
| TorchTrainable_358bb_00011 | PENDING    |                    | 0.00342996  |   0.611557 |        |        |                  |               |         |               |
| TorchTrainable_358bb_00012 | PENDING    |                    | 0.000662736 |   0.379733 |        |        |                  |               |         |               |
| TorchTrainable_358bb_00013 | PENDING    |                    | 0.000326747 |   0.835042 |        |        |                  |               |         |               |
| TorchTrainable_358bb_00014 | PENDING    |                    | 0.00234961  |   0.761008 |        |        |                  |               |         |               |
| TorchTrainable_358bb_00015 | PENDING    |                    | 0.00748315  |   0.234968 |        |        |                  |               |         |               |
| TorchTrainable_358bb_00016 | PENDING    |                    | 0.000121585 |   0.806935 |        |        |                  |               |         |               |
| TorchTrainable_358bb_00017 | PENDING    |                    | 0.00011588  |   0.578034 |        |        |                  |               |         |               |
| TorchTrainable_358bb_00018 | PENDING    |                    | 0.00218129  |   0.663069 |        |        |                  |               |         |               |
| TorchTrainable_358bb_00019 | PENDING    |                    | 0.000373483 |   0.726266 |        |        |                  |               |         |               |
| TorchTrainable_358bb_00001 | TERMINATED |                    | 0.000819226 |   0.293562 | 0.0594 |      1 |          8.50461 |         10000 |       1 |            40 |
| TorchTrainable_358bb_00002 | TERMINATED |                    | 0.00035852  |   0.701368 | 0.0722 |      1 |          8.65457 |         10000 |       1 |            40 |
| TorchTrainable_358bb_00005 | TERMINATED |                    | 0.00253235  |   0.834568 | 0.0767 |      1 |          8.26594 |         10000 |       1 |            40 |
+----------------------------+------------+--------------------+-------------+------------+--------+--------+------------------+---------------+---------+---------------+
... 12 more trials not shown (12 PENDING)

使用 TrainingOperator 包装训练过程

可以看到使用 tune.Trainable 的话,需要自己设置模型训练的初始化过程,需要自己封装多卡训练代码或者混合精度训练,如果没有特殊需求,可以使用 ray.util.sgd.torch 提供的 TrainingOperator。使用 TrainingOperator 可以不用自己操心 checkpoint 以及是否使用多卡等细节。

使用 TrainingOperatorTrainable 类似,需要我们重载几个方法。必须重载的只有 setup 方法。

class MNISTrainingOperator(TrainingOperator):
    def setup(self, config):
        train_loader, test_loader = get_data_loaders()
        model = ConvNet()
        optimizer = optim.SGD(model.parameters(), lr=config.get("lr", 1e-4), momentum=config.get("momentum", 0.9))
        criterion = torch.nn.NLLLoss()
        self.model, self.optimizer, self.criterion = self.register(models=model, optimizers=optimizer, criterion=criterion)
        self.register_data(train_loader=train_loader, validation_loader=test_loader)

setup 方法中,只需要做两件事:注册模型、注册数据加载器。而且,不用自己操心 DataParallel 之类的包装,也不需要操心 checkpoint 加载保存。甚至连训练循环都不需要自己写,当然大部分情况下还是需要自己重载的。

注册模型使用的是 self.register 方法,这个函数负责注册模型和优化器等,以便 Ray 分发模型参数。另外,如果需要自定义学习率的 scheduler,参数名是 schedulers。如果使用分布式训练,还可以使用 ddp_args={"find_unused_parameters": True} 来自定义 ddp 参数。如果使用混合精度训练,可以使用 apex_args={"opt_level": "O2"} 自定义 APEX 参数。

参数都可以传入数组,假如训练 GAN,models 就可以传 [generator, discriminator]。不过,假如 models 是数组,就必须重载 train_epoch 函数。

注册后,Ray 会自动使用 APEX 或者 DistributedDataParallel 包装模型,不需要我们自己包装。

register_data 负责注册数据加载器,同样的,Ray 会自动使用 DistributedRandomSampler 帮我们包装,不需要自己操心细节。

默认的情况下,TrainingOperator 的训练代码大概如下:

def train_epoch(self):
    for idx, batch in enumerate(self.train_loader):
        batch_info = {"batch_idx": idx, "global_step": global_step}
        metrics = self.train_batch(batch, batch_info)
        meter.update(metrics)
    return meter.summary()

def train_batch(self, batch, batch_info):
    *features, target = batch
    features = [feature.to(self.device) for feature in features]
    target = target.to(self.device)
    output = model(*features)
    loss = self.criterion(output, target)
    return {"train_loss": loss, "num_samples": target.size(0)}

当然,实际代码里会有其他的工作,比如更新 scheduler 还有一些统计工作。eval 的代码也大同小异,只是函数名字分别叫 validatevalidate_batch

如果要重写函数,记得 train_epoch 需要返回这个 epoch 的统计量字典。如果只重写 train_batch ,需要注意返回的字典一定要有 num_samples,代表这个 batch 有多少样本。

定义好 TrainingOperator 之后,用 TrainerTrainingOperator 包装起来:

trainer = TorchTrainer(
        training_operator_cls=MNISTrainingOperator,
        num_workers=8,
        num_cpus_per_worker=2,
        use_gpu=True,
        use_fp16=True,
        config={"batch_size": 16}
    )

其中,num_workers 代表使用多少个进程训练,其他参数 num_cpus_per_worker 则顾名思义。如果有多个 worker,模型和加载器就会自动被 DistributedDataParallel 和 DistributedRandomSampler 包装,自动扩展到多卡训练。而 use_fp16 则指定是否使用 APEX 混合精度训练。

之后,只需要调用 trainer.train() 就能训练一个 epoch:

for i in range(num_epochs):
    metrics = trainer.train()
    val_metrics = trainer.validate()

Trainer 本身不是一个可以直接传到 Ray Tune 的 Trainable,要想得到 Trainable,只需要执行

trainable = TorchTrainer.as_trainable(
        training_operator_cls=MNISTrainingOperator,
        num_workers=8,
        num_cpus_per_worker=2,
        use_gpu=True,
        use_fp16=True,
        config={"batch_size": 16}
    )

之后,参照上面的代码用 tune.run 执行这个 Trainable 就可以自动调参。

需要注意的是,使用这种方式调用 tune.run 的话,不需要也不可以指定 resources_per_trialtrainer 会有额外的方法自动计算所需要的资源传给 Ray Tune。