引言:写不出的第一行

韵染流光完成后,我能清楚地说出训练循环的每个环节。

dataset从文件中提取第idx个样本,dataloader通过sampler控制采样顺序,collate把样本组装成batch,模型接收batch开始前向传播。为什么要padding?因为GPU需要形状一致的数据。为什么要mask?因为padding的0会影响学习。为什么sampler要按长度分组?因为可以减少padding浪费的空间。

我以为我理解了训练循环。

直到新项目开始,我想从零搭建训练框架——打开IDE,创建train.py,然后我愣住了。

脑子里的链路很清晰,但手不知道该敲什么。不是不知道collate该做什么,而是面对空白的 collate_fn函数,不知道第一行该写什么。不是不知道checkpoint该保存什么,而是看着 torch.save()的文档,不知道该传哪些参数。

我知道一个训练中需要哪些组件,每个组件要怎么实现,但我写不出一个完整的训练循环。缺的不是概念,是手感——那种"下一行代码自然就敲出来"的感觉。

这篇文章记录这个认知补全的过程——从理解链路到写出代码,中间隔着三百行必须逐行实现的练习。

这是对《韵染流光·其一:"亮一些的蓝"》的补遗。
那时我理解了训练循环的"为什么",现在我要知道“怎么做”。

从概念到代码,缺失的那一环

我理解了训练循环,在整个开发过程中,我能够发现代码里的逻辑问题,能够理解每个组件在做什么。我整理出了这些组件的职责和它们之间的关系,以为这样就足够了。

Dataset:样本的提取与返回

Dataset做的事情很简单:从语料文件中提取第idx个样本,然后返回它。

这里有个关键认知:dataset返回什么格式其实都没关系。字典、元组、列表,甚至是原始字符串——都可以。因为dataset的返回值不会直接进入模型,它会被传给collate函数。真正决定"进入模型的数据长什么样"的,是collate。

idx默认是从0到数据集长度-1的顺序,但这个顺序可以被sampler改变。如果不指定sampler,dataloader就按顺序采样;如果自定义了sampler,就按sampler给出的顺序。

Dataset只负责“给我idx,给你第idx个样本”这一件事,其余一切都和它无关。

Collate:batch的组装与转换

Collate接收一批样本——也就是一个batch的dataset返回值——把它们组装成模型需要的格式。

这是整个链路中最关键的环节。Dataset返回的可能是字符串、可能是列表,但模型需要的是tensor。所有的格式转换、padding、mask生成,都在collate里完成。

为什么要在这里做?因为只有拿到一批样本,才知道这个batch内部的最大长度,才能决定padding到多长。如果在dataset里做padding,要么浪费空间(所有样本都padding到全局最大长度),要么没法做(不知道其他样本的长度)。

Collate接收的是一个列表,列表里的每个元素是dataset的一次返回。然后,它把这个列表组装成模型需要的格式——输入和标签,都是batch维度的tensor。输入通常是 [batch, len] 形状(也可以是 [len, batch],取决于项目的batch_first设置),标签的形状则取决于任务——如果是从N个类别中选一个,标签通常是 [batch];如果允许同时选多个类别,标签就是 [batch, n_classes];如果是序列标注,标签可能是 [batch, len]。

Sampler:采样顺序的控制

Sampler决定dataloader按什么顺序提取样本。

有两种sampler:

普通Sampler返回一个索引序列,比如 [0, 1, 2, 3, ...]或者打乱后的 [3, 1, 5, 2, ...]。Dataloader拿到这个序列后,按照指定的batch_size自动分批。

BatchSampler返回的是已经分好组的batch索引列表,比如 [[0, 1, 2], [3, 4, 5], ...]。每个元素是一个batch包含的样本索引。这种sampler既控制顺序,也控制分组。

按长度分组就是用BatchSampler实现的。它会把长度相近的样本索引放在一起,让它们进入同一个batch。这样padding时,整个batch只需要填充到batch内的最大长度,而不是全局最大长度,减少了空间浪费。

那怎么决定以什么为基准分组?看输入和输出哪个更不平衡。如果输入长度差异大,就按输入长度分组;如果输出长度差异大,就按输出长度分组。

Padding与Mask:形状统一与信息保护

GPU的运算特性决定了它需要形状一致的数据。但样本天然是不等长的——在词嵌入之后,一句话可能5个 token,另一句可能50个token。

所以需要padding:把短样本填充到batch内的最大长度。填充值通常是0,但其实填什么都可以,反正mask都能标记出来。

Mask的作用是告诉模型:哪些位置是真实数据,哪些是填充的。模型在计算attention、计算损失时,会根据mask跳过填充位置。

这里有个容易混淆的地方:PyTorch的 src_key_padding_mask约定是,True表示"这个位置需要被排除",False表示"这个位置是真实数据"。但在构建mask时,我习惯用1表示真实数据、0表示padding(符合直觉),所以在传给模型之前,需要依据实际情况转换一下mask中True/False的含义。

Model:接收与前向传播

Model接收的是collate处理后的batch。

具体接收什么参数,取决于任务和模型设计。可能是 model(input_ids, attention_mask),也可能是 model(src, tgt, src_mask),随便你。PyTorch的抽象层级有三四层楼那么高,设计空间相当之大,只要你能收得回来,怎么设计都好。

这个地方唯一的约束是:collate的输出就是model的输入,两者需要对接好——也就是说,collate在逻辑上完全可以返回原始batch,在model中再token化、padding,只是这样会让model的职责变得复杂且低效,所以通常不会这么干。

前向传播就是模型根据输入计算输出的过程。输出是什么?本质上都是logits——未归一化的分数。

分类任务的输出是对每个类别的logits,生成任务的输出是对词表中每个token的logits。不管叫它"分类结果"还是"embedding",从模型的视角看,它只是一组数字。

模型其实不关心我们设计的任务结构是什么。你给它什么输入、定义什么损失函数,它就优化什么。它不知道"分类"是什么意思,不知道"生成下一个词"是什么概念,它只是一味地调整参数好让loss变小,只不过这个调整过程恰好产生了符合我们需求的模式,仅此而已。

写不出的第一行

我花了一些时间回顾韵染流光的训练过程,把那些零零散散的理解整理了一下,然后...我觉得可以了,是时候展开下一个项目,把理解深化一下了。

但当我打开IDE,创建train.py,面对空白的文件——我不知道第一行该写什么。

很奇怪的感觉,我知道每个组件都做了什么,我也知道单个组件内部的逻辑怎么写——sampler里怎么排序分组、dataset里怎么缓存偏移,这些基础逻辑我都会。但面对空白文件,要从零搭建整个训练系统时,我不知道该从哪里开始。

我知道每个环节是什么、为什么要这样,但这些知识没有转化成"第一行该写什么、第二行该写什么"的实现能力。

缺的不是概念,是手感。

我需要补上这一环。

代码三百行,散落的拼图

我打开新项目的目录,创建了基础的文件结构:dataset.py、collate.py、sampler.py、model.py、train.py。

然后开始写。不是完全从零开始,我写不出来——我参考韵染流光的实现,看一段、理解一段、写一段。我知道每个组件该做什么,但我需要把这些"该做什么"变成实际能跑的代码,从第一行开始,把它们用我的语言解释一遍。

这个过程很机械。我不需要思考"dataset应该返回什么",我已经知道答案了。我需要做的是:打开dataset.py,写 class,写 __init__,写 __getitem__...一行一行敲出来。

Dataset:提取样本

我打开新项目的dataset.py,准备写第一个类。

第一行:class ColumnDataset(Dataset):

然后是 __init__。需要什么?样本文件路径,肯定要。tokenizer也需要,我的规划中,这里会返回tokenize结果,collate只做batch的padding和构建。还有 max——这个项目的输入会很长,但基本模式比较固定,适合设置截断长度——我不会只传一个固定参数,我有一个 Config集中管理项目配置。

def __init__(self, sample_file: Path, tokenizer: Tokenizer, config: Config):
    self._prepare_offset()

然后,这个项目的样本文件会很大,如果直接放进内存,每次训练都要忍受漫长的IO处理。解决方案是:内存中保存文件的行位置偏移(file.readline -> file.tell),一行一个偏移,整个文件就是一个列表,然后存进一个pkl。

_prepare_offset里做什么?判断pkl是否存在,存在则直接使用,返回,不存在则继续后续流程——读文件,解析成样本列表,具体格式取决于语料文件的结构和设计——可能是JSON,可能是CSV,反正读进来,解析成一个列表,保存pkl。当然,我这里是一个int行偏移列表。

然后是 __len__,这个简单:

def __len__(self) -> int:
    return len(self.offsets)

关键是 __getitem__。它接收idx,返回第idx个样本。因为用了offset而不是samples列表,所以获取第idx样本时步骤多一些,首先获取第idx所在的偏移(offsets),文件跳转偏移(file.seek),然后 readline,再进行tokenizer之类的处理。需要注意,max_len的截断应该在tokenizer之后进行——模型处理的是token序列,不是原始文本。

def __getitem__(self, idx: int) -> TokenizedColumnSample:
    offset = file.seek(self.offsets[idx])
    sample = file.readline()
    input_, target = parse(sample)
    return TokenizedColumnSample(tensor(tokenizer.encode(input_)[:max_len]), ...)

这里我定义了一个 Sample类,而不是返回字典或元组。这样在后续的collate和训练循环中,IDE能准确提示Sample有哪些字段,拼写错误能在写代码时就被发现。

Sample类长什么样?取决于任务。对于我的项目,它是这样的:

@dataclass
class TokenizedColumnSample:
    """
    单个列训练样本,已tokenized
  
    Attributes:
        input: 编码后的输入tokens
        target: 编码后的输出tokens
    """
    input: Tensor
    target: Tensor

对于其他任务,可能是 input_textlabel,或者 srctgt

Dataset就写完了。很简单,它只做一件事:给我idx,给你第idx个样本的tokenize结果,不做padding,不做任何额外转换。

Collate:组装Batch

Collate接收的是一个列表,列表里的每个元素是 Sample。它要把这个列表组装成batch。

我新建collate.py,定义一个函数——理论上来说定义一个类会更好,可以封装配置参数(比如padding方向),但这个collate逻辑很简单,用函数就够了。

def collate_fn(batch: list[TokenizedColumnSample]) -> BatchedColumnSample:

那么这个方法内部做什么?

第一步:找到这个batch内的最大长度,一个max就好了。

max(len(input_id) for input_id in input_ids)

第二步:创建padding后的tensor,所有样本都padding到batch内最大长度。填充值通常用0,但也可以用其他值——就像之前说的,反正有mask标记,有个填充让形状规整即可。

padding_length = max_input_len - len(input_id)
# 简单的右填充
padded = pad(input=input_id, pad=(0, padding_length), value=PAD_TOKEN_ID)

第三步:生成mask。记录哪些位置是真实数据,哪些是padding。我这里选择符合直觉的方案——1是真实数据,0是padding。

mask = torch.cat([torch.ones(len(input_id)), torch.zeros(padding_length)])

第四步:组装成Batch对象返回。

BatchedColumnSample(
    # [batch, max_input_len]
    input_ids=torch.stack(padded_inputs),
    # [batch, max_input_len]
    attention_masks=torch.stack(attention_masks),
    # [batch, n_classes]
    # loss计算需要标准类型为float
    labels=torch.stack(labels).float(),
)

同样,我定义了一个 Batch类:

@dataclass
class BatchedColumnSample:
    """
    分组后,批次内数据padding之后的结构
  
    Attributes:
        input_ids: [batch, max_input_len]
        attention_masks: [batch, max_input_len]
        labels: [batch, n_classes]
    """

    input_ids: Tensor
    attention_masks: Tensor
    labels: Tensor

Batch的具体字段取决于任务,但关键是用类而不是字典。这样在训练循环里写 batch.input_ids时,IDE知道这是个Tensor,拼写错误会立刻报红。

Collate的核心逻辑就是:遍历、padding、组装。没有什么复杂的,但需要仔细处理tensor的形状,确保padding对齐。

Sampler:批次编排

Sampler控制采样顺序。如果想控制批次策略,需要实现BatchSampler。

新建sampler.py:

class LengthGroupSampler:
    def __init__(self, dataset_type: str, dataset: Subset[ColumnDataset], batch_size: int, drop_last: bool = False):
        self.dataset = dataset
        self.batch_size = batch_size
        self.drop_last = drop_last
        self.length_groups = self._group_by_length()

_group_by_length里做什么?

第一步:遍历dataset获取所有样本的长度 (length, idx)

第二步:按长度排序——这里需要提前统计输入和输出的长度分布,以更不均匀的那个为基准。

第三步:分组,每batch_size个样本划分成一个batch。因为已经排序了,所以同一batch内的样本长度接近。

最后返回一个列表,每个元素是一个batch包含的样本索引。整个遍历、排序、分组过程很慢,但完成后结果基本不会再变,可以用pkl之类的方式缓存——但缓存时需要区分不同数据集,所以我有 dataset_type,分别标识训练、验证、测试数据集的缓存结果。

def _group_by_length(self) -> list[list[int]]:

Sampler需要支持遍历:

def __iter__(self) -> Iterator[list[int]]:
    """生成批次序列"""

    batches: list[list[int]] = []

    # 为每个长度组创建批次
    for indices in self.length_groups:
        # 处理 drop_last 逻辑
        if len(indices) < self.batch_size and self.drop_last:
            continue

        random.shuffle(indices)  # 组内随机
        batches.append(indices)

    random.shuffle(batches)  # 批次间随机
    return iter(batches)

我这里没什么特别的逻辑,所以直接使用 iter,就不用循环搭配yield了。

Sampler就完成了。

Model的架构与前向传播

Model接收Batch对象,进行前向传播。

class Model(nn.Module):
    def __init__(self, vocab_size, embed_dim, ...):
        super().__init__()
        # 定义层
  
    def forward(self, batch: Batch) -> torch.Tensor:
        input_ids = batch.input_ids
        attention_mask = batch.attention_mask
  
        # 前向传播逻辑
        # ...
  
        return logits  # [batch, num_classes] 或其他形状

前向传播就是根据输入计算输出。可能是embedding → transformer → 分类头,也可能是其他结构。具体设计取决于任务。

关键是确保:

  1. 接收的参数类型和collate的输出对应
  2. 返回的logits形状符合损失函数的要求

模型本身的实现没什么特别的,就是正常的PyTorch模型定义。

有一个需要注意的点,这里传递src_key_padding_mask时,需要和之前collate中的实现呼应——如果是按照通常直觉设计的1为真实数据0为padding,则需要将mask反转。

attention_masks = attention_masks == 0

拼图,各就各位

写完这些代码后,每个组件的理念、设计、实现就都没问题了。

Dataset怎么读数据、返回什么格式——清楚了。
Collate怎么组装batch、生成mask——清楚了。
Sampler怎么按长度分组——清楚了。
Model怎么接收输入、进行前向传播——清楚了。

每个部分单独拿出来,我都知道它在做什么,代码该怎么写。

但这还不够。我还不知道怎么把它们串联起来,写出一个完整的训练循环。

下一步,是把这些组件组装成系统。

手感的形成,齿轮的啮合

写完所有组件后,我需要把它们串起来。在动手之前,我先梳理了训练的本质。

3Blue1Brown的神经网络系列中有个经典场景:

   输入层           隐藏层1   隐藏层2     输出层

                      ●        ○
                      ●        ●
                      ○        ●
        a8            ●        ○         ○  0
      ,d88            ●        ○         ○  1
     a8P88            ●        ○         ○  2
   ,d8" 88            ●        ●         ○  3
  a8P'  88            ○        ○        [●] 4
,d8"    88            ●        ○         ○  5
888888888888          ●        ○         ○  6
        88            ○        ○         ○  7
        88            ●        ○         ○  8
        88            ○        ○         ○  9
                      ○        ●
                      ○        ○
                      ●        ○

输入图像"4",经过多层神经网络,最后输出层有10个节点,分别对应0-9。模型认为输入是"4",本质上就是输出层的第4个节点被激活了。

从输入,经过N层×M个节点,到达输出。这个过程中,哪些节点被激活、激活程度多少,决定了最终的输出结果。

那么训练是在做什么?

模型的结构是固定的,本质上是一个函数 f,权重就是决定 f行为的参数。训练就是:

  • 前向传播:用当前权重计算 f(input)
  • 计算损失:看 f(input)和期望输出差多少
  • 反向传播:计算"改变哪些权重、改变多少,能让结果更接近期望"
  • 更新权重:按照这个计算结果调整参数

循环这个过程,权重逐渐被调整到让 f的行为符合预期。

理清楚了这个,接下来的组装就水到渠成了。

训练循环的完整结构

我打开train.py,准备写训练循环。

首先是结构。训练循环有三层嵌套:

外层循环:epoch
遍历整个数据集多次。一个epoch就是把所有训练样本过一遍。

中层循环:batch

每个epoch内,按batch遍历DataLoader。之前写的sampler、dataset、collate在这里终于串联起来——DataLoader自动调用它们,给我一个个组装好的batch。

内层逻辑:step
每个batch的处理流程:前向传播 → 计算损失 → 反向传播 → 更新参数。

代码框架是这样的:

for epoch in range(num_epochs):
    for batch in dataloader:
        # 前向传播
        outputs = model(batch)
        loss = criterion(outputs, batch.labels)

        # 反向传播
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

这个结构很清晰,但细节还需要补充。比如:什么时候保存checkpoint?什么时候打印日志?打印什么信息?验证集怎么处理?

把组件真正组装起来时,这些具体问题就需要逐个解决了。

把拼图组装起来

第一步是创建DataLoader。

之前写的dataset、sampler、collate,在这里串联起来:

dataset = ColumnDataset(data_dir / "samples/samples.txt", tokenizer, config)
train_dataset, val_dataset, _ = dataset.split()

train_loader = DataLoader(
    dataset=train_dataset,
    collate_fn=collate_fn,
    batch_sampler=LengthGroupSampler("train", train_dataset, config.batch_size, True),
    num_workers=4,
    pin_memory=True,
)

这几行代码把之前写的所有组件连起来了。DataLoader内部会:

  1. 用sampler生成batch的索引列表
  2. 用这些索引从dataset获取样本
  3. 把样本列表传给collate_fn组装成batch

我不需要手动调用这些方法,只需要遍历DataLoader:

for batch in train_loader:
    # batch就是collate_fn返回的BatchedColumnSample对象
    ...

然后是训练循环的完整实现:

model = Model(config).to(device)
optimizer = AdamW(model.parameters(), lr=config.lr)
criterion = BCEWithLogitsLoss()

for epoch in range(config.num_epochs):
    model.train()  # 设置训练模式
  
    for step, batch in enumerate(train_loader):
        # 数据迁移到GPU
        input_ids = batch.input_ids.to(self.config.device)
        attention_masks = batch.attention_masks.to(self.config.device)
        labels = batch.labels.to(self.config.device)
  
        # 前向传播
        logits = model(batch)  # [batch, num_classes]
        loss = criterion(logits, batch.labels)
  
        # 反向传播
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
  
        # 打印日志
        if step % config.log_interval == 0:
            print(f"Epoch {epoch}, Step {step}, Loss: {loss.item():.4f}")

写到这里,我理解了几个之前模糊的点:

model.train()是什么?
设置模型为训练模式。某些层(比如Dropout、BatchNorm)在训练和推理时行为不同,这个方法告诉它们"现在是训练阶段"。

为什么要 optimizer.zero_grad()
PyTorch的梯度是累加的。如果不清零,这次反向传播计算的梯度会加到上次的梯度上,导致错误的参数更新。

@dataclass
class BatchedColumnSample:
    input_ids: Tensor
    attention_masks: Tensor
    labels: Tensor
  
    def to(self, device):
        return BatchedColumnSample(
            input_ids=self.input_ids.to(device),
            attention_masks=self.attention_masks.to(device),
            labels=self.labels.to(device)
        )

这些细节在概念层面我都知道,但写出完整代码时,才发现它们是怎么配合的——train()在循环外调用一次,zero_grad()在每个step都要调用,to(device)要在前向传播之前。

组件串起来了,训练循环能跑了。

Checkpoint:保存什么、何时保存

训练循环跑起来了,但还缺一个关键功能:checkpoint。

训练可能需要几个小时,甚至几天。如果中途断电、程序崩溃、或者想换个超参数重新试,不能从头开始。需要一个机制:定期保存训练状态,能随时恢复。

最开始,我以为checkpoint就是保存模型权重:

torch.save(model.state_dict(), "checkpoint.pt")

但写完训练循环后,我发现这不够。

问题1:optimizer的状态去哪了?

optimizer内部维护了每个参数的梯度动量、自适应学习率等状态。如果只保存模型权重,恢复时optimizer从初始状态开始,训练曲线会出现明显的波动——就像车开到一半突然换了个新司机,需要重新适应。

所以checkpoint需要同时保存optimizer:

torch.save({
    'model_state': model.state_dict(),
    'optimizer_state': optimizer.state_dict(),
}, "checkpoint.pt")

问题2:训练到第几轮了?

如果我在第50个epoch保存checkpoint,恢复时应该从第51个epoch开始。但如果只保存模型和优化器,我不知道应该从哪里继续。

所以需要保存epoch:

torch.save({
    'model_state': model.state_dict(),
    'optimizer_state': optimizer.state_dict(),
    'start_epoch': epoch + 1,  # 下次从这里开始
}, "checkpoint.pt")

问题3:最好的模型在哪个epoch?

训练过程中,验证集的表现可能先上升后下降。我需要记录哪个epoch的模型最好、当时的分数是多少。如果后面几轮表现变差,可以直接用最好的那个checkpoint。

torch.save({
    'model_state': model.state_dict(),
    'optimizer_state': optimizer.state_dict(),
    'start_epoch': epoch + 1,
    'best_score': best_score,
    'best_epoch': best_epoch,
}, "checkpoint.pt")

问题4:早停机制的计数器?

如果验证集连续N轮没提升,就停止训练。这个"连续N轮"的计数器也需要保存,否则恢复后会重新计数,导致早停逻辑失效。

实际项目中,checkpoint可能还需要保存其他状态。比如学习率调度器的状态、自定义的dropout调度器、甚至训练过程中的loss历史。我的项目里就保存了lr_scheduler和自定义的dp_scheduler:

torch.save({
    'model_state': model.state_dict(),
    'optimizer_state': optimizer.state_dict(),
    'lr_scheduler_state': lr_scheduler.state_dict(),
    'dp_scheduler_state': dp_scheduler.state_dict(),
    'best_score': best_score,
    'best_epoch': best_epoch,
    'early_stop_count': early_stop_count,
    'start_epoch': epoch + 1,
}, f"checkpoint_best.pt")

何时保存?

通常是每个epoch结束后,验证集评估完成时。如果这个epoch的表现超过历史最好,就保存checkpoint。

for epoch in range(start_epoch, num_epochs):
    # 训练
    train_one_epoch(...)
  
    # 验证
    val_score = validate(...)
  
    # 保存最佳checkpoint
    if val_score > best_score:
        best_score = val_score
        best_epoch = epoch
        save_checkpoint(...)

写到这里,我理解了checkpoint的本质:它不只是保存模型权重,而是保存"训练的完整状态"——所有影响训练继续进行的信息,都需要被记录下来。

恢复训练时,只需要加载checkpoint,所有状态就回到了保存时的样子。

日志与监控

训练循环跑起来后,屏幕上只有不断滚动的进度条。我知道训练在进行,但不知道它在做什么、效果如何、有没有问题。

我需要日志。

最开始,我只打印loss:

print(f"Epoch {epoch}, Loss: {loss.item():.4f}")

这能看到loss在下降,但信息太少了。训练速度怎么样?验证集表现如何?学习率有没有调整?我需要更完整的监控。

训练开始前,记录配置

logger.info("🚀 开始训练")
logger.info(f"    Epochs: {start_epoch} -> {num_epochs}")
logger.info(f"    模型参数: {model.param_num}")
logger.info(f"    当前学习率: {optimizer.param_groups[0]['lr']:.2e}")
logger.info(f"    最佳分数: {best_score}")

这些信息在调试时很重要。如果训练效果不理想,回头看日志能知道当时用的什么配置。

每个epoch,记录关键指标

logger.info(f"⏭️ Epoch {epoch}/{num_epochs}")
logger.info(f"    Train loss: {train_loss:.8f}")
logger.info(f"    Val loss: {val_loss:.8f}")
logger.info(f"    Score: {score:.8f}")
logger.info(f"    F1: {f1:.8f}")
logger.info(f"    Time: {minutes}m {seconds}s")

这里不只有loss,还有验证集的评估指标。我的项目用的是多标签分类,所以定义了一个综合指标:

score = 0.5 * f1 + 0.2 * precision + 0.2 * recall + 0.1 * em

这个score反映了模型的整体表现,用来判断是否保存checkpoint、是否触发早停。

超参数变化,单独记录

如果学习率调度器降低了学习率,或者自定义的dropout调度器调整了dropout概率,这些变化需要明确标记:

logger.info(f"🔄 学习率调整 ({epoch}): {old_lr:.2e} -> {new_lr:.2e}")
logger.info(f"🔄 Dropout调整 ({epoch}): {old_dp:.3f} -> {new_dp:.3f}")

这能帮我理解"为什么这个epoch的表现突然变化"——可能是学习率降低了,模型进入了更细致的优化阶段。

TensorBoard:可视化训练过程

纯文本日志适合快速查看,但看趋势不方便。我用TensorBoard记录关键指标,生成曲线图:

summary_writer.add_scalars("Training/Loss", {
    "train": train_loss,
    "val": val_loss,
}, epoch)
summary_writer.add_scalar("Training/Score", score, epoch)
summary_writer.add_scalars("Metrics", {
    "Precision": precision,
    "Recall": recall,
    "F1": f1,
}, epoch)

训练过程中或结束后,打开TensorBoard能看到loss的下降曲线、验证集指标的变化、超参数调整的时间点。这些图表比log文件中的数字直观得多。

早停和最佳模型的标记

如果验证集连续几轮没提升,早停计数器会递增。达到阈值时触发早停:

logger.info(f"⚠️ 接近早停阈值 ({early_stop_count}/{patience})")
logger.info(f"🚨 早停触发: 连续 {patience} 个epoch无提升")
logger.info(f"    最佳模型: Epoch {best_epoch}, Score={best_score:.8f}")

这些标记让我知道训练为什么停止、最好的模型在哪里。

训练结束,总结信息

logger.info(f"✅ 训练完成,总时长: {hours}h {minutes}m {seconds}s")
logger.info(f"    最佳分数: {best_score:.8f} (Epoch {best_epoch})")

日志不只是打印数字,它是理解训练过程的工具。loss在下降吗?验证集有过拟合吗?超参数调整有效吗?这些问题都能从日志中找到答案。

写完完整的训练循环后,我发现监控和调试的时间,可能比写代码的时间还长。

从分散到整体

项目阶段性完结后,我再打开train.py,不再像之前那样盯着空白文件发呆。

第一行该写什么?创建dataset。 然后呢?创建sampler,把dataset传进去。 collate怎么连接?作为DataLoader的参数。 训练循环的结构?epoch嵌套batch,每个batch做前向、反向、更新。 什么时候保存checkpoint?每个epoch验证后,如果超过历史最好就保存。 打印什么日志?train loss、val loss、指标、学习率、时间。

这些问题的答案,我现在能自然地回答出来。不是因为我背下来了代码,而是因为我理解了这些组件如何协作。

在韵染流光时期,我理解了每个组件是什么、为什么要这样设计。但这些理解是分散的——dataset做采样、collate做组装、sampler控制顺序、model做前向传播,它们各自独立,像散落的拼图。

代码三百行完成后,拼图组装起来了。我看到了它们如何咬合:

  • dataset的返回值,通过sampler的索引列表,进入collate的组装逻辑
  • collate生成的batch,被送入model的前向传播
  • model的输出,和batch里的labels一起计算loss
  • loss反向传播,更新model的权重,同时optimizer的状态也在变化
  • 每个epoch结束后,checkpoint把整个系统的状态保存下来
  • 日志记录着每个环节的运行情况,让我能看到训练的进展

这不是单个齿轮,而是一套啮合的传动系统。每个组件都有自己的职责,但它们共同构成了"训练"这个完整的行为。

之前我面对空白文件时的困惑——"第一行该写什么"——现在有了答案。不是因为我记住了模板,而是因为我理解了整个系统的运作方式。知道下一行该写什么,是因为我知道这一行在整个系统中的位置。

这种"下一行代码自然敲出来"的感觉,就是手感形成的过程。

从第一章的概念理解,到第二章的组件实现,再到第三章的系统组装,我补上了那条从"理解"到"实现"的路径。

韵染流光教会我"为什么",这三百行代码教会我"怎么做"。现在,我能从零搭建一个完整的训练体系了。

尾声:补全认知

韵染流光完成时,我以为自己理解了训练循环。

我能说出每个环节是什么、为什么要这样设计、它们之间的关系。dataset提取样本、collate组装batch、sampler控制顺序、model进行前向传播、loss反向更新权重。这些概念和链路,我都清楚。

但当我打开新项目的train.py,面对空白文件,我发现自己写不出第一行代码。

于是我开始了三百行代码的练习。从dataset到collate,从sampler到checkpoint,从训练循环到日志监控,我把每个组件逐行写了一遍。不是背模板,而是理解它们如何协作、如何组装成系统。

写完之后,"写不出的第一行"的困惑消失了。现在打开train.py,我知道第一行该写什么、接下来该连接哪些组件、每个环节该如何配合。不是因为我记住了代码,而是因为我有了手感——那种"下一行代码自然敲出来"的熟练度。

这篇文章是《韵染流光·其一:"亮一些的蓝"》的补遗。韵染流光记录了我对颜色语义建模的探索,那个过程让我理解了训练循环的概念和链路——"为什么要这样设计",此刻我补上"怎么做"的过程——从概念理解到代码实现,中间隔着的认知断层。

现在,我能从零搭建完整的训练体系了。认知补全了。

tabular-sense项目是这次练习的载体,但它不只是训练循环的练习场——它是一次完整的多标签分类探索。那会是另一个故事,我会在《数语觅类:"nl是什么?27是年龄吗?"》中记录。


💾 项目资源

完整源码:GitHub - tabular-sense