pytorch 代码snippet
全局种子
设置全局通用的随机种子
def all_seed(seed = 6666):
"""
设置随机种子
"""
np.random.seed(seed)
random.seed(seed)
# CPU
torch.manual_seed(seed)
# GPU
if torch.cuda.is_available():
torch.cuda.manual_seed_all(seed)
torch.cuda.manual_seed(seed)
# python 全局
os.environ['PYTHONHASHSEED'] = str(seed)
# cudnn
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.enabled = False
print(f'Set env random_seed = {seed}')
数据集
dataset处理
class FoodDataset(Dataset):
def __init__(self, path, tfm=test_tfm, files=None):
super(FoodDataset, self).__init__()
def __len__(self):
##相应内容
def __getitem__(self, idx):
##相应内容
模型定义
首先是block的模式
class BasicBlock(nn.Module):# 继承 torch 的 Module
def __init__(self, input_dim, output_dim):
super(BasicBlock, self).__init__()
self.block = nn.Sequential(
nn.Linear(input_dim, output_dim),
nn.ReLU(),
)
def forward(self, x):
x = self.block(x)
return x
class Classifier(nn.Module):
def __init__(self, input_dim, output_dim=41, hidden_layers=1, hidden_dim=256):
super(Classifier, self).__init__()
self.fc = nn.Sequential(
BasicBlock(input_dim, hidden_dim),
*[BasicBlock(hidden_dim, hidden_dim) for _ in range(hidden_layers)], # *[]将循环得到的解压
nn.Linear(hidden_dim, output_dim)
)
def forward(self, x):
x = self.fc(x)
return x
以及像这样直接罗列
class Classifier(nn.Module):
def __init__(self):
super(Classifier, self).__init__()
# input 維度 [3, 128, 128]
self.cnn = nn.Sequential(
nn.Conv2d(3, 64, 3, 1, 1), # [64, 128, 128]
nn.BatchNorm2d(64),
nn.ReLU(),
nn.MaxPool2d(2, 2, 0), # [64, 64, 64]
nn.Conv2d(64, 128, 3, 1, 1), # [128, 64, 64]
nn.BatchNorm2d(128),
nn.ReLU(),
nn.MaxPool2d(2, 2, 0), # [128, 32, 32]
nn.Conv2d(128, 256, 3, 1, 1), # [256, 32, 32]
nn.BatchNorm2d(256),
nn.ReLU(),
nn.MaxPool2d(2, 2, 0), # [256, 16, 16]
nn.Conv2d(256, 512, 3, 1, 1), # [512, 16, 16]
nn.BatchNorm2d(512),
nn.ReLU(),
nn.MaxPool2d(2, 2, 0), # [512, 8, 8]
nn.Conv2d(512, 512, 3, 1, 1), # [512, 8, 8]
nn.BatchNorm2d(512),
nn.ReLU(),
nn.MaxPool2d(2, 2, 0), # [512, 4, 4]
)
self.fc = nn.Sequential(
nn.Linear(512*7*7, 1024),
nn.ReLU(),
nn.Linear(1024, 512),
nn.ReLU(),
nn.Linear(512, 11)
)
def forward(self, x):
out = self.cnn(x)
out = out.view(out.size()[0], -1)
return self.fc(out)
训练函数
def trainer(train_loader, valid_loader, model, config, device, rest_net_flag=False):
# 对于分类任务, 我们常用cross-entropy评估模型表现.
criterion = nn.CrossEntropyLoss(ignore_index=-1, label_smoothing=0.1) # 交叉熵计算时,label范围为[0, n_classes-1]
# 初始化优化器
optimizer = torch.optim.AdamW(model.parameters(), lr=config['learning_rate'], weight_decay=config['weight_decay'])
#学习率调度器,cosine退火
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=config['n_epochs'])
# 模型存储位置
save_path = config['save_path'] if rest_net_flag else config['resnet_save_path']
writer = SummaryWriter()#tensorboard记录工具
if not os.path.isdir('./models'):
os.mkdir('./models')
n_epochs, best_loss, step, early_stop_count = config['n_epochs'], math.inf, 0, 0
for epoch in range(n_epochs):
model.train()
loss_record = []
train_accs = []
train_pbar = tqdm(train_loader, position=0, leave=True)
for x, y in train_pbar:
optimizer.zero_grad()
x, y = x.to(device), y.to(device)
pred = model(x)
loss = criterion(pred, y)
loss.backward()
# 稳定训练的技巧
if config['clip_flag']:
grad_norm = nn.utils.clip_grad_norm_(model.parameters(), max_norm=10)
optimizer.step()
scheduler.step()
step += 1
acc = (pred.argmax(dim=-1) == y.to(device)).float().mean()
l_ = loss.detach().item()
loss_record.append(l_)
train_accs.append(acc.detach().item())
train_pbar.set_description(f'Epoch [{epoch+1}/{n_epochs}]')
train_pbar.set_postfix({'loss': f'{l_:.5f}', 'acc': f'{acc:.5f}'})
mean_train_acc = sum(train_accs) / len(train_accs)
mean_train_loss = sum(loss_record)/len(loss_record)
writer.add_scalar('Loss/train', mean_train_loss, step)
writer.add_scalar('ACC/train', mean_train_acc, step)
model.eval() # 设置模型为评估模式
loss_record = []
test_accs = []
for x, y in valid_loader:
x, y = x.to(device), y.to(device)
with torch.no_grad():
pred = model(x)
loss = criterion(pred, y)
acc = (pred.argmax(dim=-1) == y.to(device)).float().mean()
loss_record.append(loss.item())
test_accs.append(acc.detach().item())
mean_valid_acc = sum(test_accs) / len(test_accs)
mean_valid_loss = sum(loss_record)/len(loss_record)
print(f'Epoch [{epoch+1}/{n_epochs}]: Train loss: {mean_train_loss:.4f},acc: {mean_train_acc:.4f} Valid loss: {mean_valid_loss:.4f},acc: {mean_valid_acc:.4f} ')
writer.add_scalar('Loss/valid', mean_valid_loss, step)
writer.add_scalar('ACC/valid', mean_valid_acc, step)
if mean_valid_loss < best_loss:
best_loss = mean_valid_loss
torch.save(model.state_dict(), save_path) # 保存最优模型
print('Saving model with loss {:.3f}...'.format(best_loss))
early_stop_count = 0
else:
early_stop_count += 1
if early_stop_count >= config['early_stop']:
print('\nModel is not improving, so we halt the training session.')
return
数据加载
_dataset_dir = config['dataset_dir']
train_set = FoodDataset(os.path.join(_dataset_dir,"training"), tfm=train_tfm)
train_loader = DataLoader(train_set, batch_size=config['batch_size'], shuffle=True, num_workers=0, pin_memory=True)
valid_set = FoodDataset(os.path.join(_dataset_dir,"validation"), tfm=test_tfm)
valid_loader = DataLoader(valid_set, batch_size=config['batch_size'], shuffle=True, num_workers=0, pin_memory=True)
# 测试级保证输出顺序一致
test_set = FoodDataset(os.path.join(_dataset_dir,"test"), tfm=test_tfm)
test_loader = DataLoader(test_set, batch_size=config['batch_size'], shuffle=False, num_workers=0, pin_memory=True)

说些什么吧!