Pytorch Lightning框架:使用笔记【LightningModule、LightningDataModule、Trainer、ModelCheckpoint】
pytorch是有缺陷的,例如要用半精度训练、BatchNorm参数同步、单机多卡训练,则要安排一下Apex,Apex安装也是很烦啊,我个人经历是各种报错,安装好了程序还是各种报错,而pl则不同,这些全部都安排,而且只要设置一下参数就可以了。另外,根据我训练的模型,4张卡的训练速度大概提升3倍,训练效果(图像生成)好很多,真香。另外,还有一个特色,就是你的超参数全部保存到模型中,如果你要调巨多参数
pytorch是有缺陷的,例如要用半精度训练、BatchNorm参数同步、单机多卡训练,则要安排一下Apex,Apex安装也是很烦啊,我个人经历是各种报错,安装好了程序还是各种报错,而pl则不同,这些全部都安排,而且只要设置一下参数就可以了。另外,根据我训练的模型,4张卡的训练速度大概提升3倍,训练效果(图像生成)好很多,真香。另外,还有一个特色,就是你的超参数全部保存到模型中,如果你要调巨多参数,那就不需要再对每个训练的模型进行参数标记了,而且恢复模型时可以直接恢复超参数,可以大大减小代码量和工作量,这点真是太香了。
Pytorch Lightning官方手册
Pytorch Lightning源码:GitHub地址
Pytorch Lightning使用案例:Pytorch-Lightning-Template项目 GitHub地址
一、Pytorch Lightning 的流程
Pytorch Lightning框架应用的流程很简单,生产流水线,有一个固定的顺序:
- 初始化 def init (self)
- 训练training_step(self, batch, batch_idx)
- 校验validation_step(self, batch, batch_idx)
- 测试 test_step(self, batch, batch_idx)
就完事了,总统是实现这三个函数的重写。
当然,除了这三个主要的,还有一些其他的函数,为了方便我们实现其他的一些功能,因此更为完整的流程是:
- 在training_step 后面都紧跟着其相应的 training_step_end(self,batch_parts)和training_epoch_end(self, training_step_outputs) 函数;
- validation_step 后面都紧跟着其相应的 validation_step_end(self,batch_parts)和validation_epoch_end(self, training_step_outputs) 函数;
- test_step 后面都紧跟着其相应的 test_step_end(self,batch_parts)和 test_epoch_end(self, training_step_outputs) 函数;
这里以训练为例:
def training_step(self, batch, batch_idx): x, y = batch y_hat = self.model(x) loss = F.cross_entropy(y_hat, y) pred = ... return {'loss': loss, 'pred': pred} def training_step_end(self, batch_parts): 当gpus=0 or 1时,这里的batch_parts即为traing_step的返回值(已验证) 当gpus>1时,这里的batch_parts为list,list中每个为training_step返回值,list[i]为i号gpu的返回值(这里未验证) gpu_0_prediction = batch_parts[0]['pred'] gpu_1_prediction = batch_parts[1]['pred'] # do something with both outputs return (batch_parts[0]['loss'] + batch_parts[1]['loss']) / 2 def training_epoch_end(self, training_step_outputs): 当gpu=0 or 1时,training_step_outputs为list,长度为steps的数量(不包括validation的步数,当你训练时,你会发现返回list<训练时的steps数,这是因为训练时显示的steps数据还包括了validation的,若将limit_val_batches=0.,即关闭validation,则显示的steps会与training_step_outputs的长度相同)。list中的每个值为字典类型,字典中会存有`training_step_end()`返回的键值,键名为`training_step()`函数返回的变量名,另外还有该值是在哪台设备上(哪张GPU上),例如{device='cuda:0'} for out in training_step_outputs: # do something with preds
1、Train
训练主要是重写def training_setp(self, batch, batch_idx)函数,并返回要反向传播的loss即可,其中batch 即为从 train_dataloader 采样的一个batch的数据,batch_idx即为目前batch的索引。
def training_setp(self, batch, batch_idx): image, label = batch pred = self.forward(iamge) loss = ... # 一定要返回loss return loss
2、Validation(设置校验的频率)
2.1 每训练n个epochs 校验一次
默认为每1个epoch校验一次,即自动调用validation_step()函数:
check_val_every_n_epoch
trainer = Trainer(check_val_every_n_epoch=1)
2.2 单个epoch内校验频率
当一个epoch 比较大时,就需要在单个epoch 内进行多次校验,这时就需要对校验的调动频率进行修改, 传入
val_check_interval
的参数为float型时表示百分比,为int时表示batch:# 每训练单个epoch的 25% 调用校验函数一次,注意:要传入float型数 trainer = Trainer(val_check_interval=0.25) # 当然也可以是单个epoch训练完多少个batch后调用一次校验函数,但是一定是传入int型 trainer = Trainer(val_check_interval=100) # 每训练100个batch校验一次
校验和训练是一样的,重写
def validation_step(self, batch, batch_idx)
函数,不需要返回值:
def validation_step(self, batch, batch_idx): image, label = batch pred = self.forward(iamge) loss = ... # 标记该loss,用于保存模型时监控该量 self.log('val_loss', loss)
3、test
在
pytoch_lightning
框架中,test 在训练过程中是不调用的,也就是说是不相关,在训练过程中只进行training
和validation
,因此如果需要在训练过中保存validation的一些信息,就要放到validation中。关于测试,测试是在训练完成之后的,因此这里假设已经训练完成:
# 获取恢复了权重和超参数等的模型 model = MODEL.load_from_checkpoint(checkpoint_path='my_model_path/heiheihei.ckpt') # 修改测试时需要的参数,例如预测的步数等 model.pred_step = 1000 # 定义trainer, 其中limit_test_batches表示取测试集中的0.05的数据来做测试 trainer = pl.Trainer(gpus=1, precision=16, limit_test_batches=0.05) # 测试,自动调用test_step(), 其中dm为数据集,放在下面讲 trainer.test(model=dck, datamodule=dm)
二、PL 的数据集
数据集有两种实现方法:
- 直接调用第三方公开数据集(如:MNIST等数据集)
- 自定义数据集(自己去继承torch.utils.data.dataset.Dataset,自定义类)
1、直接在Module中调用DataLoader
直接实现是指在 Model 中重写def train_dataloader(self)等函数来返回dataloader
1.1 使用现有的公平数据集
可以用现有的,例如MNIST等数据集
from torch.utils.data import DataLoader, random_split import pytorch_lightning as pl class MyExampleModel(pl.LightningModule): def __init__(self, args): super().__init__() dataset = MNIST(os.getcwd(), download=True, transform=transforms.ToTensor()) train_dataset, val_dataset, test_dataset = random_split(dataset, [50000, 5000, 5000]) self.train_dataset = train_dataset self.val_dataset = val_dataset self.test_dataset = test_dataset ... def train_dataloader(self): return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=False, num_workers=0) def val_dataloader(self): return DataLoader(self.val_dataset, batch_size=self.batch_size, shuffle=False) def test_dataloader(self): return DataLoader(self.test_dataset, batch_size=1, shuffle=True)
这样就完成了数据集和dataloader的编程了
1.2 自定义dataset
自己完成dataset的编写
# -*- coding: utf-8 -*- @Description: Define the format of data used in the model. import sys import pathlib import torch from torch.utils.data import Dataset from utils import sort_batch_by_len, source2ids abs_path = pathlib.Path(__file__).parent.absolute() sys.path.append(sys.path.append(abs_path)) class SampleDataset(Dataset): The class represents a sample set for training. def __init__(self, data_pairs, vocab): self.src_texts = [data_pair[0] for data_pair in data_pairs] self.tgt_texts = [data_pair[1] for data_pair in data_pairs] self.vocab = vocab self._len = len(data_pairs) # Keep track of how many data points. def __len__(self): return self._len def __getitem__(self, index): # print("\nself.src_texts[{0}] = {1}".format(index, self.src_texts[index])) src_ids, oovs = source2ids(self.src_texts[index], self.vocab) # 将当前文本self.src_texts[index]转为ids,oovs为超出词典范围的词汇文本 item = { 'x': [self.vocab.SOS] + src_ids + [self.vocab.EOS], 'y': [self.vocab.SOS] + [self.vocab[i] for i in self.tgt_texts[index]] + [self.vocab.EOS], 'x_len': len(self.src_texts[index]), 'y_len': len(self.tgt_texts[index]), 'oovs': oovs, 'len_oovs': len(oovs) return item
2、自定义DataModule类(继承LightningDataModule)来调用DataLoader
这种方法是继承
pl.LightningDataModule
来提供训练、校验、测试的数据。from torch.utils.data import DataLoader, random_split import pytorch_lightning as pl class MyDataModule(pl.LightningDataModule): def __init__(self): super().__init__() def prepare_data(self): # 在该函数里一般实现数据集的下载等,只有cuda:0 会执行该函数 # download, split, etc... # only called on 1 GPU/TPU in distributed def setup(self, stage): # make assignments here (val/train/test split) # called on every process in DDP # 实现数据集的定义,每张GPU都会执行该函数, stage 用于标记是用于什么阶段 if stage == 'fit' or stage is None: self.train_dataset = MyDataset(self.train_file_path, self.train_file_num, transform=None) self.val_dataset = MyDataset(self.val_file_path, self.val_file_num, transform=None) if stage == 'test' or stage is None: self.test_dataset = MyDataset(self.test_file_path, self.test_file_num, transform=None) def train_dataloader(self): return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=False, num_workers=0) def val_dataloader(self): return DataLoader(self.val_dataset, batch_size=self.batch_size, shuffle=False) def test_dataloader(self): return DataLoader(self.test_dataset, batch_size=1, shuffle=True)
三、PL的使用
# Training & Test if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("-is_train", default='yes', type=str, choices=['yes', 'no']) args = parser.parse_args() seed_everything(42) # 定义数据集 data_module = GLUEDataModule(model_name_or_path=r"D:\Pretrained_Model\albert-base-v2", task_name="cola") print("args.is_train = ", args.is_train) if args.is_train == 'yes': # 训练 # 定义数据集为训练校验(train+validation)阶段 data_module.setup("fit") # 初始化`ModelCheckpoint`回调,并设置要监控的量 checkpoint_callback = ModelCheckpoint(monitor='val_loss', dirpath="saved_module", filename='sample-cola-{epoch:02d}-{val_loss:.2f}') # 定义模型 model = GLUETransformer(model_name_or_path=r"D:\Pretrained_Model\albert-base-v2", num_labels=data_module.num_labels, eval_splits=data_module.eval_splits, task_name=data_module.task_name) # 定义logger logger = TensorBoardLogger('log_dir', name='test_PL') # 定义trainer trainer = Trainer(max_epochs=20, gpus=AV
AIL_GPUS, check_val_every_n_epoch=1, callbacks=[checkpoint_callback]) # 默认为每1个epoch校验一次,即自动调用validation_step()函数;将 checkpoint_callback 放到Trainer的callback 的list中 # 开始训练 trainer.fit(model=model, datamodule=data_module) # trainer.save_checkpoint(filepath=os.path.join("saved_module")) # trainer.validate(model, data_module.val_dataloader()) else: # 定义数据集为测试(test)阶段 data_module.setup("test") # 恢复模型 model = GLUETransformer.load_from_checkpoint(checkpoint_path='saved_module/sample-cola-epoch=00-val_loss=0.53.ckpt') # 定义trainer并测试 trainer = Trainer(gpus=AVAIL_GPUS, precision=16, limit_test_batches=0.05) trainer.test(model=model, datamodule=data_module)
四、模型、Trainer的保存与恢复
1、利用Trainer保存模型(在Trainer中设置default_root_dir参数)
Lightning 会自动保存最近训练的epoch的模型到当前的工作空间(or.getcwd()),也可以在定义Trainer的时候指定:
trainer = Trainer(default_root_dir='/your/path/to/save/checkpoints')
当然,也可以关闭自动保存模型:
trainer = Trainer(checkpoint_callback=False)
2、利用ModelCheckpoint (callbacks)保存模型
2.1 参数说明(所有参数均为optional)
ModelCheckpoint( dirpath=None, filename=None, monitor=None, verbose=False, save_last=None, save_top_k=1, save_weights_only=False, mode="min", auto_insert_metric_name=True, every_n_train_steps=None, train_time_interval=None, every_n_epochs=None, save_on_train_epoch_end=None, every_n_val_epochs=None
dirpath
: string类型。例如:dirpath=‘my/path_to_save_model/’;filename
: string类型;前面就说过不建议使用filepath变量,推荐使用monitor
:需要监控的量,string类型。例如’val_loss’(在training_step() or validation_step()函数中通过self.log(‘val_loss’, loss)进行标记);默认为None,只保存最后一个epoch的模型参数,(我的理解是只保留最后一个epoch的模型参数,但是还是每训练完一个epoch之后会保存一次,然后覆盖上一次的模型)verbose
:冗余模式,默认为False.save_last
: bool类型; 默认None,当为True时,表示在每个epoch 结果的时候,总是会保存一个模型last.ckpt,也就意味着会覆盖保存,只会有一个文件保留。save_top_k
:int类型;
- 当 save_top_k==k,根据monitor监控的量,保存k个最好的模型,而最好的模型是当monitor监控的量最大时表示最好,还是最小时表示最好,在后面的参数mode中进行设置。
- 当save_top_k==0时,不保存;
- 当save_top_k==-1时,保存所有的模型,即每个次保存模型不进行覆盖保存,全都保存下来;
- 当save_top_k>=2,并且在单个epoch内多次调用保存模型的函数,则模型的名字最后会追加版本号,从v0开始。
save_weights_only
: bool 类型;True只保存模型权重(model.save_weights(flepath)),否则保存整个模型。建议保存权重就可以了,保存整个模型会消耗更多时间和存储空间。mode
:string类型,只能取{‘auto’, ‘min’, ‘max’}中的一个;当save_top_k!=0时,保存模型时就会覆盖保存,如果monitor监控的是val_loss等越小就表示模型越好的,这个参数应该被设置成’min’,当monitor监控的是val_acc(校验准确度)等越大就表示模型训练的越好的量,则应该设置成’max’。auto会自动根据monitor的名字来判断(auto模式是个人理解,可能会出错,例如你编程的时候,你就喜欢用val_loss表示模型准确度这样就会导致保存的模型是最差的模型了)。2.2 ModelCheckpoint的使用
自动保存下,也可以自定义要监控的量来保存模型,步骤如下:
- 计算需要监控的量,例如校验误差:loss
- 使用log()函数标记该要监控的量(直接在training_step、validation_step中添加)
- 初始化ModelCheckpoint回调,并设置要监控的量,
- 将其传回到Trainer中
from pytorch_lightning import Trainer, LightningDataModule, LightningModule, Callback, seed_everything from pytorch_lightning.callbacks import ModelCheckpoint from pytorch_lightning.loggers import TensorBoardLogger # Transformer LightningModule class GLUETransformer(LightningModule): ... def training_step(self, batch, batch_idx): # 1. 计算loss outputs = self(**batch) train_loss = outputs[0] # 2. 使用log()函数标记该要监控的量,名字叫'val_loss' self.log('ltrain_lossoss', train_loss) return train_loss ... def validation_step(self, batch, batch_idx, dataloader_idx=0): outputs = self(**batch) # 1. 计算需要监控的量 val_loss, logits = outputs[:2] # 2. 使用log()函数标记该要监控的量,名字叫'val_loss' self.log('val_loss', val_loss) if self.hparams.num_labels >= 1: preds = torch.argmax(logits, axis=1) elif self.hparams.num_labels == 1: preds = logits.squeeze() labels = batch["labels"] return {"loss": val_loss, "preds": preds, "labels": labels} ... # Training & Test if __name__ == "__main__": seed_everything(42) # 定义数据集 data_module = GLUEDataModule(model_name_or_path=r"D:\Pretrained_Model\albert-base-v2", task_name="cola") # 定义模型 model = GLUETransformer(model_name_or_path=r"D:\Pretrained_Model\albert-base-v2", num_labels=data_module.num_labels, eval_splits=data_module.eval_splits, task_name=data_module.task_name) # 初始化`ModelCheckpoint`回调,并设置要监控的量 checkpoint_callback = ModelCheckpoint( dirpath='saved_module', filename='sample-cola-{epoch:02d}-{val_loss:.2f}', monitor='val_loss' # 定义trainer trainer = Trainer(max_epochs=20, gpus=AVAIL_GPUS, check_val_every_n_epoch=1, callbacks=[checkpoint_callback]) # 默认为每1个epoch校验一次,即自动调用validation_step()函数;将 checkpoint_callback 放到Trainer的callback 的list中 # 开始训练 trainer.fit(model=model, datamodule=data_module)
在上面的
filename
参数中,定义了模型文件的保存格式,然后通过自动
调用format_checkpoint_name
函数给其中的变量赋值的,返回string
类型,文件名。>>> tmpdir = os.path.dirname(__file__) >>> ckpt = ModelCheckpoint(dirpath=tmpdir, filename='{epoch}') >>> os.path.basename(ckpt.format_checkpoint_name(0, 1, metrics={})) 'epoch=0.ckpt' >>> ckpt = ModelCheckpoint(dirpath=tmpdir, filename='{epoch:03d}') >>> os.path.basename(ckpt.format_checkpoint_name(5, 2, metrics={})) 'epoch=005.ckpt' >>> ckpt = ModelCheckpoint(dirpath=tmpdir, filename='{epoch}-{val_loss:.2f}') >>> os.path.basename(ckpt.format_checkpoint_name(2, 3, metrics=dict(val_loss=0.123456))) 'epoch=2-val_loss=0.12.ckpt' >>> ckpt = ModelCheckpoint(dirpath=tmpdir, filename='{missing:d}') >>> os.path.basename(ckpt.format_checkpoint_name(0, 4, metrics={})) 'missing=0.ckpt' >>> ckpt = ModelCheckpoint(filename='{step}') >>> os.path.basename(ckpt.format_checkpoint_name(0, 0, {})) 'step=0.ckpt'
2.2 获取最好的模型
因为根据上面保存的参数,可能保存了多个模型,根据
best_model_path
恢复最好的模型。from pytorch_lightning import Trainer from pytorch_lightning.callbacks import ModelCheckpoint checkpoint_callback = ModelCheckpoint(dirpath='my/path/') trainer = Trainer(callbacks=[checkpoint_callback]) model = ... trainer.fit(model) # 训练完成之后,保存了多个模型,下面是获得最好的模型,也就是将原来保存的模型中最好的模型权重apply到当前的
网络上 checkpoint_callback.best_model_path
3、手动保存模型
除了自动保存,也可以手动保存、加载模型
model = MyLightningModule(hparams) trainer.fit(model) trainer.save_checkpoint("example.ckpt") new_model = MyModel.load_from_checkpoint(checkpoint_path="example.ckpt")
当我们采用该 Pytorch Lightning 框架做强化学习的时候,由于强化学习的训练数据集不是固定的,是与环境实时交互生成的训练数据,因此在整个训练过程中,Epoch恒为0,模型就不会自动保存,这时候需要我们手动保存模型。
另外,保存的模型一般都挺大的,因此保存最好的三个模型就OK了,可以通过一个队列来进行维护,保存新的,删除旧的:
from collections import deque import os # 维护一个队列 self.save_models = deque(maxlen=3) # 这里的self 是指这个函数放到继承了pl.LightningModule的类里,跟training_step()是同级的 def manual_save_model(self): model_path = 'your_model_save_path_%s' % (your_loss) if len(self.save_models) >= 3: # 当队列满了,取出最老的模型的路径,然后删除掉 old_model = self.save_models.popleft() if os.path.exists(old_model): os.remove(old_model) # 手动保存 self.trainer.save_checkpoint(model_path) # 将保存的模型路径加入到队列中 self.save_models.append(model_path)
上面的函数,可以通过简单的判断,如果损失更小的,或者reward更大了,我们再调用,保存模型。
为了保险起见,我们也可以每隔一段时间就保存一个最新的模型。
这个函数是从pl的原码中抠出来的,因此保存的路径是我们前面在设置
checkpoint_callbacks
的时候设置的路径,也就是本文前面ModelCheckpoint (callbacks)
这一节中的dir_path
路径,会在该路径下自动保存latest.ckpt
文件
# 保存最新的路径 def save_latest_model(self): checkpoint_callbacks = [c for c in self.trainer.callbacks if isinstance(c, ModelCheckpoint)] print("Saving latest checkpoint...") model = self.trainer.get_model() [c.on_validation_end(self.trainer, model) for c in checkpoint_callbacks]
4、加载Checkpoint
4.1 load_from_checkpoint 方法
pl.LightningModule.load_from_checkpoint( checkpoint_path=checkpoint_path, map_location=None, hparams_file=None, strict=True, **kwargs
该方法是从checkpoint 加载模型的主要方法。
4.2 加载模型的权重、偏置和超参数
model = MyLightingModule.load_from_checkpoint(PATH) print(model.learning_rate) # prints the learning_rate you used in this checkpoint model.eval() y_hat = model(x)
4.3 如果需要修改超参数,在写Module的时候进行覆盖
class LitModel(LightningModule): def __init__(self, in_dim, out_dim): super().__init__() self.save_hyperparameters() # 在这里使用新的超参数,而不是从模型中加载的超参数 self.l1 = nn.Linear(self.hparams.in_dim, self.hparams.out_dim)
这样的话,可以如下恢复模型:
# 例如训练的时候初始化in_dim=32, out_dim=10 LitModel(in_dim=32, out_dim=10) # 下面的方式恢复模型,使用in_dim=32和out_dim=10为保存的参数 model = LitModel.load_from_checkpoint(PATH) # 当然也可以覆盖这些参数,例如改成in_dim=128, out_dim=10 model = LitModel.load_from_checkpoint(PATH, in_dim=128, out_dim10)
4.4 恢复模型和Trainer
如果不仅仅是想恢复模型,而且还要接着训练,则可以恢复Trainer
model = LitModel() trainer = Trainer(resume_from_checkpoint='some/path/to/my_checkpoint.ckpt') # 自动恢复模型、epoch、step、学习率信息(包括LR schedulers),精度等 # automatically restores model, epoch, step, LR schedulers, apex, etc... trainer.fit(model)
五、训练辅助工具
1、Early Stopping
监控
validation_step()
方法步骤中某一个量,如果其不能再变得更优,则提前停止训练pytorch_lightning.callbacks.early_stopping.EarlyStopping( monitor='early_stop_on', min_delta=0.0, patience=3, verbose=False, mode='auto', strict=True, check_finite=True, stopping_threshold=None, divergence_threshold=None, check_on_train_epoch_end=None
monitor
(str) – 监控的量;默认为:early_stop_on;可以通过self.log(‘var_name’, val_loss)来标记要监控的量min_delta
(float) – 最小的改变量;默认:0.0;即当监控的量的绝对值变量量小于该值,则认为没有新的提升patience
(int) - 默认:3;如果监控的量持续patience 个epoch没有得到更好的提升,则停止训练;verbose
(bool) – 默认:False;mode
(str) – {auto, min, max}中的一个,跟前面的ModelCheckpoint中的mode是一样的含义。如果monitor监控的是val_loss等越小就表示模型越好的,这个参数应该被设置成’min’,当monitor监控的是val_acc(校验准确度)等越大就表示模型训练的越好的量,则应该设置成’max’。auto会自动根据monitor的名字来判断(auto模式是个人理解,可能会出错,例如你编程的时候,你就喜欢用val_loss表示模型准确度这样就会导致保存的模型是最差的模型了)。strict
(bool) – 默认True;如果监控器没有在validation_step()函数中找到你监控的量,则强制报错,中止训练;比如设置如下,要监控的量为
val_loss
:
pytorch_lightning.callbacks.early_stopping.EarlyStopping(monitor='val_loss', min_delta=0.1, patience=3)
2、Logging
这里只涉及Tensorboard, 其它有需要的可参考官方文档Logging,tensorboard 有两种基本的方法:一种是只适用于scaler,可直接使用self.log(),另一种是图像、权重等。
# 在定义Trainer对象的时候,传入tensorboardlogger logger = TensorBoardLogger(args['log_dir'], name='DCK_PL') trainer = pl.Trainer(logger=logger) # 获取tensorboard Logger, 以在validation_step()函数为例 def validation_step(): tensorboard = self.logger.experiment # 例如求得validation loss为: loss = ... # 直接log self.log('val_loss', loss, on_step=True, on_epoch=True, prog_bar=True, logger=True) # 如果是图像等,就需要用到tensorboard的API tensorboard.add_image() # 同时log多个 other_loss = ... loss_dict = {'val_loss': loss, 'loss': other_loss} tensorboard.add_scalars(loss_dict) # log 权重等 tensorboard.add_histogram(...)
注意如果是用anaconda的话,要先激活你的env,另外要注意的是,–logdir=my_log_dir/, 这里的logdir要到version_0/目录,该目录下保存有各种你log的变量的文件夹
# 查看的方法跟tensorboard是一样的,在终端下 (base) C:\whx-study-pytorch-lightning\my_logs\WHX_PL\version_0>tensorboard --logdir ./ 2022-03-19 20:18:17.460974: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library cudart64_110.dll Serving TensorBoard on localhost; to expose to the network, use a proxy or pass --bind_all TensorBoard 2.4.0 at http://localhost:6006/ (Press CTRL+C to quit)
当然也可以继承
LightningLoggerBase
类来自定义Logger,这个自己看官方文档
3、optimizer 和 lr_scheduler
当然,在训练过程中,对学习率的掌控也是非常重要的,合理设置学习率有利于提高效果,学习率衰减可查看 pytorch必须掌握的的4种学习率衰减策略。那在pytorch_lightning 中如何设置呢?其实跟pytorch是一样的,基本上不需要修改。
重写configure_optimizers()函数即可:
# 设置优化器 def configure_optimizers(self): weight_decay = 1e-6 # l2正则化系数 # 假如有两个网络,一个encoder一个decoder optimizer = optim.Adam([{'encoder_params': self.encoder.parameters()}, {'decoder_params': self.decoder.parameters()}], lr=learning_rate, weight_decay=weight_decay) # 同样,如果只有一个网络结构,就可以更直接了 optimizer = optim.Adam(my_model.parameters(), lr=learning_rate, weight_decay=weight_decay) # 我这里设置2000个epoch后学习率变为原来的0.5,之后不再改变 scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[2000], gamma=0.5) optim_dict = {'optimizer': optimizer, 'lr_scheduler': scheduler} return optim_dict
def configure_optimizers(self): """Prepare optimizer and schedule (linear warmup and decay)""" model = self.model no_decay = ["bias", "LayerNorm.weight"] optimizer_grouped_parameters = [ "params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)], "weight_decay": self.hparams.weight_decay, "params": [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)], "weight_decay": 0.0, optimizer = AdamW(optimizer_grouped_parameters, lr=self.hparams.learning_rate, eps=self.hparams.adam_epsilon) scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=self.hparams.warmup_steps, num_training_steps=self.total_steps) scheduler = {"scheduler": scheduler, "interval": "step", "frequency": 1} return [optimizer], [scheduler]
这样就OK了,只要在
training_step()
函数中返回了loss,就会自动反向传播,并自动调用loss.backward()
和optimizer.step()
和scheduler .step()
了4、多优化器用于多模型等网络结构
当我们训练的是复杂的网络结构时,可能有多个模型,需要不同的训练顺序,不同的训练学习率等,这时候就需要设计多个优化器,并手动调用梯度反传函数
# multiple optimizer case (e.g.: GAN) def configure_optimizers(self): opt_d = Adam(self.model_d.parameters(), lr=0.01) opt_g = Adam(self.model_g.parameters(), lr=0.02) return opt_d, opt_g
然后要关掉自动优化,这样就可以跟pytorch一样手动控制优化器的权重更新了,达到了跟pytorch一样可以进行复杂地更新顺序等地控制,同时pytorch lightning的优势还在,例如多GPU下batchnorm的参数同步等。
# 在new Trainer对象的时候,把自动优化关掉 trainer = Trainer(automatic_optimization=False)
这时候
training_step()
函数也就不是直接返回 loss 或者 字典了,而是不需要返回loss了,因为在该函数里就手动完成权重更新函数地调用。另外需要注意的是:
- 不再使用
loss.backward()
函数,改用self.manual_backward(loss, opt)
,就可以实现半精度训练。- 忽略
optimizer_idx
参数def training_step(self, batch, batch_idx, opt_idx): # 获取在configure_optimizers()中返回的优化器 (opt_d, opt_g) = self.optimizers() loss_g = self.acquire_loss_g() # 注意:不再使用loss.backward(). 另外以GAN为例,因为生成器的动态图还要保持给判别器用于更新,因此retain_graph=True. self.manual_backward(loss_g, opt_g, retain_graph=True) # 销毁动态图 self.manual_backward(loss_g, opt_g) opt_g.step() # 在更新判别器的时候,保存生成器是0梯度的 opt_g.zero_grad() # 更新判别器 loss_d = self.acquire_loss_d() self.manual_backward(loss_d, opt_d)
其他比较重要的设置主要有同步BatchNorm的参数、采用半精度训练(原来apex的特色,不过PL比apex更香),多gpu 训练等
5、多GPU训练
如果是CPU训练,在定义Trainer时不管gpus这个参数就可以了,或者设置该参数为0:
trainer = pl.Trainer(gpus=0)
而多GPU训练,也是很方便,只要将该参数设置为你要用的gpu数就可以,例如用4张GPU:
trainer = pl.Trainer(gpus=4)
而如果你有很多张GPU,但是要跟同学分别使用,只要在程序最前面设置哪些GPU可用就可以了,例如服务器有4张卡,但是你只能用0和2号卡:
import os os.environ['CUDA_VISIBLE_DEVICES'] = '0, 2' trainer = pl.Trainer(gpus=2)
6、半精度训练
半精度训练也是Apex的一大特色,可以在几乎不影响效果的情况下降低GPU显存的使用率(大概50%),提高训练速度,现在pytorch_lightning 统统都给你,可以只要设置一下参数就可以:
trainer = pl.Trainer(precision=16)
7、累积梯度
默认情况是每个batch 之后都更新一次梯度,当然也可以N个batch后再更新,这样就有了大batch size 更新的效果了,例如当你内存很小,训练的batch size 设置的很小,这时候就可以采用累积梯度:
# 默认情况下不开启累积梯度 trainer = Trainer(accumulate_grad_batches=1)
8、自动缩放batch_size(不建议用)
这方法还有很多限制,直接
trainer.fit(model)
是无效的,感觉挺麻烦,不建议用大的batch_size 通过可以获得更好的梯度估计。但同时也要更长的时间,另外,如果内存满了,电脑会卡住动不了。
'power'
– 从batch size 为1 开始翻倍地往上找,例如1-->2 --> 4 --> ...
一直到内存溢出(out-of-memory, OOM);binsearch
也是翻倍地找,直OOM,但是之后还要继续进行一个二叉搜索,找到一个更好的 batch size。另外,搜索的 batch size 最大不会超过数据集的尺寸。# 默认不开启 trainer = Trainer(auto_scale_batch_size=None) # 自动找满足内存的 batch size trainer = Trainer(auto_scale_batch_size=None|'power'|'binsearch') # 加载到模型 trainer.tune(model)
9、保存所有超参数到模型中
将所有的模型超参数都保存到模型中,恢复模型时再也不用自己去拖动恢复模型中的超参数了,这点是太有特色了:
# 例如你传入的超参数字典为params_dict self.hparams.update(params_dict) # 直接将你的超参数更新到pl模型的超参数字典中 # 这样,在保存的时候就会保存超参数了 self.save_hyperparameters()
当然,对于我们训练的不同的模型,我们还是需要查看其超参数,可以通过将超参数字典保存到本地txt的方法,来以便后期查看
def save_dict_as_txt(list_dict, save_dir): with open(save_dir, 'w') as fw: if isinstance(list_dict, list): for dict in list_dict: for key in dict.keys(): fw.writelines(key + ': ' + str(dict.get(key)) + '\n') else: for key in list_dict.keys(): fw.writelines(key + ': ' + str(list_dict.get(key)) + '\n') fw.close() # 保存超参数字典到txt save_dict_as_txt(self.hparams, save_dir)
10、梯度剪裁
当需要避免发生梯度爆炸时,可以采用梯度剪裁的方法,这个梯度范数是通过所有的模型权重计算出来的:
# 默认不剪裁 trainer = Trainer(gradient_clip_val=0) # 梯度范数的上限为0.5 trainer = Trainer(gradient_clip_val=0.5)
11、设置训练的最小和最大epochs
默认最小训练1个epoch,最大训练1000个epoch。
trainer = Trainer(min_epochs=1, max_epochs=1000)
12、小数据集
当我们的数据集过大或者当我们进行debug时,不想要加载整个数据集,则可以只加载其中的一小部分。
默认是全部加载,即下面的参数值都为1.0
# 参训练集、校验集和测试集分别只加载 10%, 20%, 30%,或者使用int 型表示batch trainer = Trainer( limit_train_batches=0.1, # 模型情况下是 1.0 limit_val_batches=0.2, # 模型情况下是 1.0 limit_test_batches=0.3 # 模型情况下是 1.0
其中比较需要注意的是训练集和测试集比例的设置,因为pytorch_lightning 每次validation和test时,都是会计算一个epoch,而不是一个step,因此在训练过程中,如果你的validation dataset比较大,那就会消耗大量的时间在validation上,而我们实际上只是想要知道在训练过程中,模型训练的怎么样了,不需要跑完整个epoch,因此就可以将limit_val_batches设置的小一些。对于test,在训练完成后,如果我们不希望对所有的数据都进行test,也可以通过这个参数来设置。
13、提前校验,避免校验时出错导致浪费时间(num_sanity_val_steps)
另外,该框架有个参数
num_sanity_val_steps
,用于设置在开始训练前先进行num_sanity_val_steps
个 batch 的 validation,以免你训练了一段时间,在校验的时候程序报错,导致浪费时间。该参数在获得trainer的时间传入:# 默认为2个batch的validation trainer = Trainer(num_sanity_val_steps=2) # 关闭开始训练前的validaion,直接开始训练 trainer = Trainer(num_sanity_val_steps=0) # 把校验集都运行一遍(可能会浪费很多时间) trainer = Trainer(num_sanity_val_steps=-1)
14、异常处理
14.1 多GPU CUDA设备不同步问题
在进行多GPU训练过程中,当完成一个epoch或者运行到epoch的指定百分比后,会进行validation过程,完成validation后,报了一个错:
RuntimeError: All input tensors must be on the same device. Received cuda:2 and cuda:0
我在github上发起了一个issue,有人已经修复了这个bug,在后面新版的pytorch_lightning中应该会被修改了。
14.2 DataLoader的问题
RuntimeError: DataLoader worker (pid(s) 6700, 10620) exited unexpectedly
这个问题一般是多GPU跑的时候才会出现,主要是加载DataLoader的时候,num_works=0就可以了,另外,我在一个task里,设置的是num_works=8是OK了,但是到了另一个task中,图像更大了,可能是内存不够,加载数据集特别特别特别慢,几乎不动。
如遇到这个报错,减小batch_size,设置num_works=0,在定义trainer的时候,设置
trainer = pl.Trainer(distributed_backend='ddp') trainer = pl.Trainer(distributed_backend='dp')
参考资料:
pytorch_lightning 全程笔记
PyTorch Lightning工具学习
人工智能,从PyTorch到PyTorch Lightning简要介绍
【PyTorch Lightning】1.0 正式发布:从 0 到 1
Pytorch-Lightning基本使用步骤
人工智能,从PyTorch到PyTorch Lightning简要介绍
Pytorch-Lightning基本方法介绍
所有评论(0)