| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262 |
- """
- 完整的Agentic RL训练流程(更新版)
- 从数据准备到模型部署的端到端示例
- 更新内容:
- 1. 修复了JSON解析问题
- 2. 添加了训练监控配置(wandb/tensorboard)
- 3. 支持详细日志输出
- """
- import sys
- import os
- # 添加HelloAgents到路径
- sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "HelloAgents"))
- from hello_agents.tools import RLTrainingTool
- import json
- from datetime import datetime
class AgenticRLPipeline:
    """End-to-end Agentic RL training pipeline.

    Drives an ``RLTrainingTool`` through six stages: dataset loading, SFT
    training, SFT evaluation, GRPO training, GRPO evaluation, and saving the
    collected results as JSON. Every stage calls ``self.rl_tool.run`` with a
    dict of parameters and parses the returned value as JSON.

    The configuration dict must provide ``model.base_model``,
    ``data.max_samples``, ``eval.max_samples`` and, for both ``sft`` and
    ``grpo``, ``output_dir`` / ``num_epochs`` / ``batch_size``. The
    ``monitoring`` section is optional.
    """

    # Width of the "=" rule printed above and below each stage title.
    _BANNER_WIDTH = 50

    # Longest slice of raw tool output quoted in a parse-error message.
    _RAW_PREVIEW_CHARS = 200

    def __init__(self, config_path="config.json"):
        """Create the training tool and load the pipeline configuration.

        Args:
            config_path: Path of the JSON configuration file.
        """
        self.rl_tool = RLTrainingTool()
        self.config = self.load_config(config_path)
        # Parsed tool output of each completed stage, keyed by stage name.
        self.results = {}

    def load_config(self, config_path):
        """Read the JSON configuration file and return it as a dict.

        The file is read as UTF-8 so non-ASCII values do not depend on the
        platform's default encoding.
        """
        with open(config_path, "r", encoding="utf-8") as f:
            return json.load(f)

    def log(self, message):
        """Print ``message`` prefixed with the current local timestamp."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"[{timestamp}] {message}")

    def _banner(self, title, leading_newline=True):
        """Log ``title`` framed by two rules of ``=`` characters."""
        rule = "=" * self._BANNER_WIDTH
        self.log("\n" + rule if leading_newline else rule)
        self.log(title)
        self.log(rule)

    def _call_tool(self, params):
        """Run the RL tool with ``params`` and return its parsed JSON result.

        Raises:
            json.JSONDecodeError: If the tool output is not valid JSON. The
                message names the requested action and quotes the start of
                the raw output, so the real failure is visible.
        """
        raw = self.rl_tool.run(params)
        if isinstance(raw, dict):
            # Tolerate a tool that hands back an already-parsed result.
            return raw
        try:
            return json.loads(raw)
        except json.JSONDecodeError as exc:
            preview = raw[: self._RAW_PREVIEW_CHARS]
            raise json.JSONDecodeError(
                f"RLTrainingTool action {params.get('action')!r} returned "
                f"non-JSON output {preview!r} ({exc.msg})",
                exc.doc,
                exc.pos,
            ) from exc

    def _train(self, label, algorithm, train_config, model_name, results_key):
        """Run one training stage and return the trained model's output dir.

        Args:
            label: Algorithm label used in the log lines ("SFT" / "GRPO").
            algorithm: Value sent to the tool as ``algorithm``.
            train_config: Config section holding ``output_dir``,
                ``num_epochs`` and ``batch_size`` for this stage.
            model_name: Model name or path the training starts from.
            results_key: Key under which the result is stored in
                ``self.results``.
        """
        # Look the optional monitoring section up once instead of per option.
        monitoring = self.config.get("monitoring", {})

        result_data = self._call_tool({
            "action": "train",
            "algorithm": algorithm,
            "model_name": model_name,
            "output_dir": train_config["output_dir"],
            "max_samples": self.config["data"]["max_samples"],
            "num_epochs": train_config["num_epochs"],
            "batch_size": train_config["batch_size"],
            "use_lora": True,
            # Training-monitoring options.
            "use_wandb": monitoring.get("use_wandb", False),
            "use_tensorboard": monitoring.get("use_tensorboard", True),
            "wandb_project": monitoring.get("wandb_project", None),
        })

        self.log(f"✓ {label}训练完成")
        self.log(f" - 模型路径: {result_data['output_dir']}")
        self.log(f" - 状态: {result_data['status']}")

        self.results[results_key] = result_data
        return result_data["output_dir"]

    def _evaluate(self, label, model_path, results_key):
        """Run one evaluation stage and return the parsed evaluation result.

        Args:
            label: Algorithm label used in the log lines ("SFT" / "GRPO").
            model_path: Path of the model to evaluate.
            results_key: Key under which the result is stored in
                ``self.results``.
        """
        eval_data = self._call_tool({
            "action": "evaluate",
            "model_path": model_path,
            "max_samples": self.config["eval"]["max_samples"],
            "use_lora": True,
        })

        self.log(f"✓ {label}评估完成")
        self.log(f" - 准确率: {eval_data['accuracy']}")
        self.log(f" - 平均奖励: {eval_data['average_reward']}")

        self.results[results_key] = eval_data
        return eval_data

    def stage1_prepare_data(self):
        """Stage 1: load the dataset and log its size, format and columns.

        Returns:
            The parsed dataset description, also stored in
            ``self.results["data"]``.
        """
        self._banner("阶段1: 数据准备", leading_newline=False)

        dataset_info = self._call_tool({
            "action": "load_dataset",
            "format": "sft",
            "max_samples": self.config["data"]["max_samples"],
        })

        self.log("✓ 数据集加载完成")
        self.log(f" - 样本数: {dataset_info['dataset_size']}")
        self.log(f" - 格式: {dataset_info['format']}")
        self.log(f" - 数据列: {', '.join(dataset_info['sample_keys'])}")

        self.results["data"] = dataset_info
        return dataset_info

    def stage2_sft_training(self):
        """Stage 2: SFT-train the base model; return the output directory."""
        self._banner("阶段2: SFT训练")
        return self._train(
            label="SFT",
            algorithm="sft",
            train_config=self.config["sft"],
            model_name=self.config["model"]["base_model"],
            results_key="sft_training",
        )

    def stage3_sft_evaluation(self, model_path):
        """Stage 3: evaluate the SFT model at ``model_path``."""
        self._banner("阶段3: SFT评估")
        return self._evaluate("SFT", model_path, "sft_evaluation")

    def stage4_grpo_training(self, sft_model_path):
        """Stage 4: GRPO-train from the SFT model; return the output dir."""
        self._banner("阶段4: GRPO训练")
        return self._train(
            label="GRPO",
            algorithm="grpo",
            train_config=self.config["grpo"],
            model_name=sft_model_path,
            results_key="grpo_training",
        )

    def stage5_grpo_evaluation(self, model_path):
        """Stage 5: evaluate the GRPO model at ``model_path``."""
        self._banner("阶段5: GRPO评估")
        return self._evaluate("GRPO", model_path, "grpo_evaluation")

    def stage6_save_results(self, results_path="training_results.json"):
        """Stage 6: write the collected stage results to a JSON file.

        Args:
            results_path: Destination file; defaults to the original
                hard-coded ``training_results.json``.
        """
        self._banner("阶段6: 保存结果")

        # UTF-8 with ensure_ascii=False keeps non-ASCII text readable in the
        # file regardless of the platform's default encoding.
        with open(results_path, "w", encoding="utf-8") as f:
            json.dump(self.results, f, indent=2, ensure_ascii=False)

        self.log(f"✓ 结果已保存到: {results_path}")

    def run(self):
        """Run all six stages in order.

        Any exception is logged and then re-raised unchanged.
        """
        try:
            self.stage1_prepare_data()
            sft_model_path = self.stage2_sft_training()
            self.stage3_sft_evaluation(sft_model_path)
            # GRPO starts from the SFT checkpoint produced in stage 2.
            grpo_model_path = self.stage4_grpo_training(sft_model_path)
            self.stage5_grpo_evaluation(grpo_model_path)
            self.stage6_save_results()

            self._banner("✓ 训练流程完成!")

        except Exception as e:
            self.log(f"\n✗ 训练失败: {e}")
            raise
# Usage example: write a small demo configuration to disk, then run the
# whole pipeline against it.
if __name__ == "__main__":
    config_file = "config.json"

    # Monitoring switches forwarded to both training stages.
    monitoring_settings = {
        "use_wandb": False,  # whether to report to Weights & Biases
        "use_tensorboard": True,  # whether to write TensorBoard logs
        "wandb_project": "agentic-rl-pipeline",  # W&B project name
    }

    demo_config = {
        "model": {"base_model": "Qwen/Qwen3-0.6B"},
        # 100 samples keeps this a quick smoke test.
        "data": {"max_samples": 100},
        "sft": {
            "output_dir": "./models/sft_model",
            "num_epochs": 2,
            "batch_size": 4,
        },
        "grpo": {
            "output_dir": "./models/grpo_model",
            "num_epochs": 2,
            "batch_size": 2,
        },
        "eval": {
            "max_samples": 20,
            # NOTE(review): no code visible in this file reads this
            # threshold - confirm whether it is meant to gate stage 4.
            "sft_accuracy_threshold": 0.40,
        },
        "monitoring": monitoring_settings,
    }

    # Persist the configuration so the pipeline can load it by path.
    with open(config_file, "w") as handle:
        handle.write(json.dumps(demo_config, indent=2))

    # Run every stage end to end.
    AgenticRLPipeline(config_file).run()
|