
RL Training Module

AITraining includes a comprehensive reinforcement learning module for advanced LLM training scenarios.
The CLI commands (--trainer ppo, --trainer dpo, --trainer reward) are implemented with the TRL library for stability. The autotrain.trainers.rl module documented here provides lower-level building blocks for custom RL training pipelines.

Overview

The RL module provides:
  • PPO Trainer - Proximal Policy Optimization with KL penalty and GAE
  • DPO Trainer - Direct Preference Optimization from preference data
  • Reward Models - standard, pairwise, and multi-objective reward models
  • RL Environments - text generation, math, code, and preference-comparison environments
  • Async Pipeline - forward-backward training with gradient accumulation

PPO Training

Configuration

from autotrain.trainers.rl import PPOConfig, PPOTrainer

config = PPOConfig(
    model_name="google/gemma-2-2b",
    learning_rate=1e-5,
    batch_size=16,
    mini_batch_size=4,
    gradient_accumulation_steps=1,

    # PPO hyperparameters
    ppo_epochs=4,
    gamma=0.99,           # Discount factor
    lam=0.95,             # GAE lambda
    clip_ratio=0.2,       # PPO clip ratio
    value_clip=0.2,       # Value function clip
    max_grad_norm=1.0,    # Gradient clipping

    # KL penalty
    kl_penalty_coef=0.01,
    kl_target=0.01,
    kl_horizon=10000,     # Horizon for adaptive KL

    # Coefficients
    entropy_coef=0.01,    # Entropy regularization
    value_coef=0.5,       # Value function coefficient

    # Generation
    max_new_tokens=128,
    temperature=0.7,
    top_p=0.9,

    # Training loop
    num_iterations=100,
    save_every=10,
    eval_every=5,
    device=None,          # Auto-detected
)

PPO Architecture

The PPO implementation uses a PPOModel wrapper that adds a value head to any causal LM:
import torch.nn as nn

# Structural sketch: PPOModel wraps the base model and adds a ValueHead
class PPOModel(nn.Module):
    def __init__(self, base_model):
        super().__init__()
        self.base_model = base_model
        self.value_head = ValueHead(base_model.config.hidden_size)

# ValueHead: hidden states -> ReLU -> scalar value per token
class ValueHead(nn.Module):
    def __init__(self, hidden_size):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 1),
        )

Adaptive KL Controller

The AdaptiveKLController automatically adjusts the KL penalty coefficient so that the KL divergence stays near the target:
# Automatically managed by PPOTrainer
# Adjusts kl_penalty_coef based on current KL vs target
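
The exact update rule is internal to the trainer, but the behavior can be illustrated with a minimal proportional controller in the style of adaptive-KL PPO. The class below is a standalone sketch, not the module's AdaptiveKLController:

# Illustrative sketch of proportional adaptive KL control (not the library class)
class ProportionalKLController:
    def __init__(self, init_coef=0.01, target=0.01, horizon=10000):
        self.coef = init_coef    # corresponds to kl_penalty_coef
        self.target = target     # corresponds to kl_target
        self.horizon = horizon   # corresponds to kl_horizon

    def update(self, current_kl, n_steps):
        # Proportional error, clipped so the coefficient changes gradually
        error = max(min(current_kl / self.target - 1.0, 0.2), -0.2)
        self.coef *= 1.0 + error * n_steps / self.horizon
        return self.coef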

Training Loop

# Initialize trainer with custom reward function
def my_reward_fn(prompts, responses, metadata=None):
    rewards = []
    for response in responses:
        score = evaluate_response(response)
        rewards.append(score)
    return rewards

trainer = PPOTrainer(
    config=config,
    tokenizer=tokenizer,  # Optional, loaded from model if not provided
    reward_fn=my_reward_fn,
)

# Train on prompts
prompts = ["Write a poem about...", "Explain quantum..."]
metrics = trainer.train(prompts, num_iterations=100)

Key Features

  • Adaptive KL Controller - automatically adjusts the KL penalty coefficient based on the current KL vs. the target
  • GAE Advantage Estimation - Generalized Advantage Estimation for stable training (see the sketch after this list)
  • Value Head - separate value function for the critic (PPOModel wraps the base model)
  • Reference Model - frozen copy of the policy to prevent drift
  • Async Training - efficient forward-backward passes via AsyncTrainingClient
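
For reference, the GAE computation combines the gamma and lam values from PPOConfig. The function below is a standalone sketch of the standard recursion, not the trainer's internal code; it assumes rewards and values are 1-D tensors of per-step rewards and value estimates for a finished episode:

import torch

def compute_gae(rewards, values, gamma=0.99, lam=0.95):
    # delta_t = r_t + gamma * V(s_{t+1}) - V(s_t)
    # A_t     = delta_t + gamma * lam * A_{t+1}
    advantages = torch.zeros_like(rewards)
    last_adv = 0.0
    for t in reversed(range(len(rewards))):
        next_value = values[t + 1] if t + 1 < len(rewards) else 0.0
        delta = rewards[t] + gamma * next_value - values[t]
        last_adv = delta + gamma * lam * last_adv
        advantages[t] = last_adv
    returns = advantages + values  # targets for the value head
    return advantages, returns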

DPO Training

Train directly from preference data, without a separate reward model.

Configuration

from autotrain.trainers.rl import DPOConfig, DPOTrainer

config = DPOConfig(
    model_name="google/gemma-2-2b",
    learning_rate=1e-6,
    batch_size=8,
    gradient_accumulation_steps=2,

    # DPO hyperparameters
    beta=0.1,              # Temperature parameter
    label_smoothing=0.0,   # For robustness
    reference_free=False,  # Use reference model

    # Training
    num_epochs=1,
    max_grad_norm=1.0,
    warmup_ratio=0.1,

    # Sequence lengths
    max_length=512,
    max_prompt_length=256,

    # Checkpointing
    eval_every=100,
    save_every=500,
    device=None,           # Auto-detected
)

Preference Dataset

from autotrain.trainers.rl.dpo import PreferenceDataset

# Create dataset from preference pairs
dataset = PreferenceDataset(
    prompts=["What is AI?", "Explain gravity"],
    chosen=["AI is artificial intelligence...", "Gravity is a force..."],
    rejected=["idk lol", "its like magnets"],
    tokenizer=tokenizer,
    max_length=512,
    max_prompt_length=256,
)

# Train
trainer = DPOTrainer(config=config, tokenizer=tokenizer)
metrics = trainer.train(dataset, eval_dataset=eval_dataset)
PreferenceDataset must be imported directly from autotrain.trainers.rl.dpo; it is not exported from the main __init__.py.

Reference-Free DPO

For training without a reference model:
config = DPOConfig(
    model_name="google/gemma-2-2b",
    reference_free=True,  # No reference model needed
    beta=0.1,
)
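
For intuition, the loss that DPO optimizes can be written in a few lines once per-sequence log-probabilities are available. This is a generic sketch of the DPO objective showing how beta, label_smoothing, and reference_free interact, not the trainer's internal implementation:

import torch.nn.functional as F

def dpo_loss(policy_chosen_logps, policy_rejected_logps,
             ref_chosen_logps, ref_rejected_logps,
             beta=0.1, label_smoothing=0.0, reference_free=False):
    if reference_free:
        # Reference-free DPO drops the reference log-probabilities
        ref_chosen_logps = ref_rejected_logps = 0.0
    chosen_rewards = beta * (policy_chosen_logps - ref_chosen_logps)
    rejected_rewards = beta * (policy_rejected_logps - ref_rejected_logps)
    logits = chosen_rewards - rejected_rewards
    # Label smoothing mixes in the flipped preference for robustness
    loss = (-F.logsigmoid(logits) * (1 - label_smoothing)
            - F.logsigmoid(-logits) * label_smoothing)
    return loss.mean()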

Reward Models

Standard Reward Model

from autotrain.trainers.rl import RewardModel, RewardModelConfig, RewardModelTrainer

config = RewardModelConfig(
    model_name="bert-base-uncased",
    num_labels=1,
    pooling_strategy="last",  # "mean", "last", or "cls"
    dropout_prob=0.1,
    temperature=1.0,          # Temperature scaling for rewards

    # LoRA settings
    use_lora=True,
    lora_rank=8,
    lora_alpha=16,
    lora_dropout=0.1,

    # Training
    learning_rate=1e-4,
    warmup_steps=100,
    gradient_accumulation_steps=1,
)

model = RewardModel(config)

Training on Preferences

trainer = RewardModelTrainer(
    model=model,
    tokenizer=tokenizer,
    config=config,
    device=None,  # Auto-detected
)

trainer.train_on_preferences(
    chosen_texts=["Good response 1", "Good response 2"],
    rejected_texts=["Bad response 1", "Bad response 2"],
    num_epochs=3,
    batch_size=8,
)

# Save/load
trainer.save_model("reward_model.pt")
trainer.load_model("reward_model.pt")

Pairwise Reward Model

Uses the Bradley-Terry model for direct preference comparison:
from autotrain.trainers.rl import PairwiseRewardModel

model = PairwiseRewardModel(config)

# Forward pass compares two inputs
preference_score = model.forward_pair(
    input_ids_a, attention_mask_a,
    input_ids_b, attention_mask_b,
)

# Bradley-Terry loss for training
loss = model.compute_bradley_terry_loss(
    input_ids_a, attention_mask_a,
    input_ids_b, attention_mask_b,
    labels,  # 1 if A preferred, 0 if B preferred
)
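
Under the Bradley-Terry model, the probability that response A is preferred over response B is sigmoid(r_A - r_B), so training reduces to binary cross-entropy on the reward difference. A standalone sketch of that loss (not the model's internal method):

import torch.nn.functional as F

def bradley_terry_loss(rewards_a, rewards_b, labels):
    # labels: 1.0 if A is preferred, 0.0 if B is preferred
    logits = rewards_a - rewards_b
    return F.binary_cross_entropy_with_logits(logits, labels.float())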

Multi-Objective Reward Model

Combines multiple reward signals:
from autotrain.trainers.rl import MultiObjectiveRewardModel

model = MultiObjectiveRewardModel(
    config=config,
    num_objectives=3,
    objective_weights=[0.5, 0.3, 0.2],  # Helpfulness, safety, honesty
)

# Get all objectives
outputs = model(input_ids, attention_mask, return_all_objectives=True)
# outputs["rewards"] shape: (batch_size, 3)
# outputs["combined_reward"] shape: (batch_size, 1)

# Multi-objective loss
loss, per_objective_losses = model.compute_multi_objective_loss(
    input_ids, attention_mask,
    target_rewards,      # Shape: (batch_size, num_objectives)
    objective_mask=None, # Optional: which objectives to train
)

RL Environments

Environment Dataclasses

from autotrain.trainers.rl.environments import Observation, StepResult, Trajectory

# Observation from environment
@dataclass
class Observation:
    input_ids: torch.Tensor
    attention_mask: torch.Tensor
    prompt: str
    metadata: Dict[str, Any]

# Result from env.step()
@dataclass
class StepResult:
    reward: float
    done: bool
    next_observation: Optional[Observation]
    info: Dict[str, Any]
    metrics: Dict[str, float]

# Full episode trajectory
@dataclass
class Trajectory:
    observations: List[Observation]
    actions: List[torch.Tensor]
    rewards: List[float]
    logprobs: List[torch.Tensor]
    done: bool
    total_reward: float
    metrics: Dict[str, Any]

Text Generation Environment

from autotrain.trainers.rl import TextGenerationEnv

env = TextGenerationEnv(
    tokenizer=tokenizer,
    prompts=["Write a story about...", "Explain how..."],
    max_length=512,
    reward_fn=my_reward_function,  # Optional, default is length-based
    stop_sequences=["</s>", "\n\n"],
    temperature=1.0,
)

# Reset and step
observation = env.reset()
result = env.step(action_token)
# result.reward, result.done, result.next_observation

# Render current state
print(env.render())
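
The environment API and the Trajectory dataclass compose naturally into a rollout loop. The sketch below is illustrative only; sample_action is a hypothetical policy helper that returns an action tensor and its log-probability:

from autotrain.trainers.rl.environments import Trajectory

def collect_trajectory(env, sample_action):
    obs = env.reset()
    observations, actions, rewards, logprobs = [], [], [], []
    done = False
    while not done:
        action, logprob = sample_action(obs)   # sample from the policy
        result = env.step(action)
        observations.append(obs)
        actions.append(action)
        rewards.append(result.reward)
        logprobs.append(logprob)
        done = result.done
        obs = result.next_observation
    return Trajectory(
        observations=observations, actions=actions, rewards=rewards,
        logprobs=logprobs, done=done, total_reward=sum(rewards), metrics={},
    )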

Multi-Objective Environment

from autotrain.trainers.rl import MultiObjectiveRewardEnv

def correctness_reward(prompt, generated, full_text):
    return 1.0 if is_correct(generated) else 0.0

def formatting_reward(prompt, generated, full_text):
    return 0.5 if properly_formatted(generated) else 0.0

env = MultiObjectiveRewardEnv(
    tokenizer=tokenizer,
    prompts=prompts,
    reward_components={
        "correctness": correctness_reward,
        "formatting": formatting_reward,
    },
    reward_weights={
        "correctness": 1.0,
        "formatting": 0.1,
    },
)

# Step returns component rewards in metrics
result = env.step(action)
# result.metrics["reward_correctness"], result.metrics["reward_formatting"]

Preference Comparison Environment

For RLHF and DPO data collection:
from autotrain.trainers.rl import PreferenceComparisonEnv

env = PreferenceComparisonEnv(
    tokenizer=tokenizer,
    prompts=prompts,
    preference_model=preference_model,  # Optional
    human_feedback_fn=feedback_fn,      # Optional callback
    max_length=512,
)

# Generates pairs of responses and computes preference
observation = env.reset()
result1 = env.step(response1_tokens)  # First response
result2 = env.step(response2_tokens)  # Second response, computes preference

Built-in Environments

from autotrain.trainers.rl import create_math_problem_env, create_code_generation_env

# Math problem solving (correctness + formatting rewards)
math_env = create_math_problem_env(tokenizer)

# Code generation (syntax + style rewards)
code_env = create_code_generation_env(tokenizer)

Forward-Backward Pipeline

Asynchronous training with gradient accumulation:
from autotrain.trainers.rl import ForwardBackwardPipeline

# Low-level pipeline
pipeline = ForwardBackwardPipeline(
    model=model,
    device="cuda",
    max_workers=2,                    # Thread pool size
    gradient_accumulation_steps=4,
)

# Queue forward-backward pass
future = pipeline.forward_backward(
    input_ids=input_ids,
    attention_mask=attention_mask,
    labels=labels,
    loss_fn="cross_entropy",
)

# Get result (blocks until complete)
result = future.result()
print(f"Loss: {result.loss}")

# Queue optimizer step
optim_future = pipeline.optim_step(
    optimizer=optimizer,
    scheduler=scheduler,  # Optional
    max_grad_norm=1.0,
)
optim_result = optim_future.result()
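
With gradient_accumulation_steps=4, a natural pattern is to queue several forward-backward passes before taking a single optimizer step. A usage sketch, assuming micro_batches is an iterable of four pre-tokenized batches:

futures = [
    pipeline.forward_backward(
        input_ids=batch["input_ids"],
        attention_mask=batch["attention_mask"],
        labels=batch["labels"],
        loss_fn="cross_entropy",
    )
    for batch in micro_batches
]
losses = [f.result().loss for f in futures]   # wait for all backward passes
pipeline.optim_step(optimizer=optimizer, max_grad_norm=1.0).result()
print(f"Mean micro-batch loss: {sum(losses) / len(losses):.4f}")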

Built-in Loss Functions

The pipeline supports several built-in loss functions:
  • "cross_entropy" - standard language-modeling loss; no required kwargs
  • "importance_sampling" - RL with importance sampling; requires old_logprobs and advantages
  • "ppo" - full PPO loss (sketched below); requires old_log_probs and advantages, with optional values and returns

Custom Loss Functions

def custom_loss_fn(model, inputs, outputs, **kwargs):
    # Your custom loss computation
    logits = outputs.logits
    # ... compute loss ...
    return loss_tensor  # Must be scalar

future = pipeline.forward_backward_custom(
    input_ids=input_ids,
    custom_loss_fn=custom_loss_fn,
    attention_mask=attention_mask,  # Optional
    my_param=42,  # Passed to loss function via kwargs
)

High-Level Client

from autotrain.trainers.rl.forward_backward import AsyncTrainingClient

client = AsyncTrainingClient(
    model=model,
    reference_model=reference_model,  # For PPO/DPO
    device="cuda",
    gradient_accumulation_steps=4,
)

# Training step
fwd_future = client.forward_backward(batch, loss_fn="cross_entropy")
optim_future = client.optim_step(optimizer, max_grad_norm=1.0)

# Forward only (for reference model)
ref_future = client.forward(batch, use_reference=True)

# Clean up
client.shutdown()
AsyncTrainingClient must be imported directly from autotrain.trainers.rl.forward_backward; it is not exported from the main __init__.py.

Checkpointing

# Save checkpoint
checkpoint_info = pipeline.save_state("checkpoint_1000")
# Returns: {"path": ..., "model_path": ..., "optimizer_path": ..., "state_path": ...}

# Load checkpoint
pipeline.load_state("checkpoints/checkpoint_1000")

Sampling

Generate samples during training:
samples = pipeline.sample(
    prompt=prompt_tokens,  # List[int] or Tensor
    max_tokens=100,
    temperature=0.7,
    top_k=50,
    top_p=0.9,
    stop=[tokenizer.eos_token_id],
)

print(f"Generated: {samples['tokens']}")
print(f"Logprobs: {samples['logprobs']}")
print(f"Prompt: {samples['prompt']}")

Best Practices

PPO Training

  1. Start with a small KL coefficient - let the adaptive controller adjust it
  2. Use gradient accumulation - a larger effective batch size is more stable
  3. Monitor KL divergence - it should stay near the target
  4. Warm up the value function - train the critic before running full PPO

DPO Training

  1. High-quality preference data - quality matters more than quantity
  2. Low learning rate - 1e-6 to 1e-5 is recommended
  3. Label smoothing - 0.1 can improve robustness (see the config sketch after this list)
  4. Evaluate frequently - track accuracy and reward margins
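
Applied to the configuration shown earlier, those recommendations translate to something like the following; the values are suggested starting points, not required defaults:

config = DPOConfig(
    model_name="google/gemma-2-2b",
    learning_rate=1e-6,     # low learning rate
    beta=0.1,
    label_smoothing=0.1,    # mild smoothing for robustness
    eval_every=100,         # evaluate frequently
)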

Reward Modeling

  1. Balanced data - equal numbers of chosen and rejected examples
  2. Diverse prompts - cover the expected use cases
  3. LoRA for efficiency - fine-tune large models efficiently
  4. Multi-objective - separate safety and helpfulness signals

CLI Integration

For production use, the CLI provides a simpler interface backed by the TRL implementations:
# PPO training (uses TRL PPOTrainer)
aitraining llm --train \
  --model google/gemma-2-2b \
  --trainer ppo \
  --reward-model ./my-reward-model

# DPO training (uses TRL DPOTrainer)
aitraining llm --train \
  --model google/gemma-2-2b \
  --trainer dpo \
  --dpo-beta 0.1

# Reward model training
aitraining llm --train \
  --model google/gemma-2-2b \
  --trainer reward \
  --data-path ./preference_data.jsonl

Next Steps