"""
Complete Agentic RL training pipeline (updated version).

An end-to-end example running from data preparation to model deployment.

Changes in this version:
1. Fixed the JSON parsing problem
2. Added training-monitoring configuration (wandb / tensorboard)
3. Supports detailed log output
"""

import sys
import os

# Add HelloAgents to the import path (must happen before the import below).
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "HelloAgents"))

from hello_agents.tools import RLTrainingTool
import json
from datetime import datetime


class AgenticRLPipeline:
    """Agentic RL training pipeline.

    Drives an ``RLTrainingTool`` through six stages: dataset loading, SFT
    training, SFT evaluation, GRPO training, GRPO evaluation and saving the
    collected results. Every tool call's return value is parsed with
    ``json.loads`` and stored in ``self.results`` under a per-stage key.
    """

    def __init__(self, config_path="config.json"):
        """Initialise the training pipeline.

        Args:
            config_path: Path of the JSON configuration file.
        """
        self.rl_tool = RLTrainingTool()
        self.config = self.load_config(config_path)
        self.results = {}

    def load_config(self, config_path):
        """Load the JSON configuration file and return the parsed object.

        Args:
            config_path: Path of the JSON configuration file.

        Returns:
            The object decoded from the file.
        """
        # Explicit UTF-8: the platform default encoding is locale dependent
        # and may fail on a hand-edited config containing non-ASCII text.
        with open(config_path, 'r', encoding="utf-8") as f:
            return json.load(f)

    def log(self, message):
        """Print ``message`` prefixed with the current local timestamp."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"[{timestamp}] {message}")

    # ------------------------------------------------------------------
    # Private helpers shared by the stage methods
    # ------------------------------------------------------------------

    def _log_stage_header(self, title, leading_blank=True):
        """Log a three-line banner announcing a stage.

        Args:
            title: Text of the middle banner line.
            leading_blank: When true, the first rule is preceded by a
                newline (used by every stage except the first).
        """
        rule = "=" * 50
        self.log(("\n" + rule) if leading_blank else rule)
        self.log(title)
        self.log(rule)

    def _monitoring_options(self):
        """Return the monitoring flags passed to every training request.

        Values come from the optional ``"monitoring"`` config section; the
        fallbacks are wandb off, tensorboard on, and no wandb project.
        """
        monitoring = self.config.get("monitoring", {})
        return {
            "use_wandb": monitoring.get("use_wandb", False),
            "use_tensorboard": monitoring.get("use_tensorboard", True),
            "wandb_project": monitoring.get("wandb_project", None),
        }

    def _train(self, algorithm, model_name, train_config, label):
        """Run one training request and record its result.

        Args:
            algorithm: Value sent as ``"algorithm"`` (``"sft"`` or ``"grpo"``);
                also used to build the ``self.results`` key.
            model_name: Value sent as ``"model_name"``.
            train_config: Config section providing ``output_dir``,
                ``num_epochs`` and ``batch_size``.
            label: Stage label used in the completion log line.

        Returns:
            The ``"output_dir"`` field of the parsed tool result.
        """
        request = {
            "action": "train",
            "algorithm": algorithm,
            "model_name": model_name,
            "output_dir": train_config["output_dir"],
            "max_samples": self.config["data"]["max_samples"],
            "num_epochs": train_config["num_epochs"],
            "batch_size": train_config["batch_size"],
            "use_lora": True,
        }
        # Training-monitoring configuration
        request.update(self._monitoring_options())

        # Parse the JSON result returned by the tool
        result_data = json.loads(self.rl_tool.run(request))

        self.log(f"✓ {label}训练完成")
        self.log(f" - 模型路径: {result_data['output_dir']}")
        self.log(f" - 状态: {result_data['status']}")

        self.results[f"{algorithm}_training"] = result_data
        return result_data["output_dir"]

    def _evaluate(self, model_path, label, results_key):
        """Run one evaluation request and record its result.

        Args:
            model_path: Value sent as ``"model_path"``.
            label: Stage label used in the completion log line.
            results_key: Key under which the result is stored in
                ``self.results``.

        Returns:
            The parsed tool result.
        """
        raw = self.rl_tool.run({
            "action": "evaluate",
            "model_path": model_path,
            "max_samples": self.config["eval"]["max_samples"],
            "use_lora": True,
        })
        eval_data = json.loads(raw)

        self.log(f"✓ {label}评估完成")
        self.log(f" - 准确率: {eval_data['accuracy']}")
        self.log(f" - 平均奖励: {eval_data['average_reward']}")

        self.results[results_key] = eval_data
        return eval_data

    # ------------------------------------------------------------------
    # Pipeline stages
    # ------------------------------------------------------------------

    def stage1_prepare_data(self):
        """Stage 1: data preparation.

        Returns:
            The parsed dataset information, also stored as
            ``self.results["data"]``.
        """
        self._log_stage_header("阶段1: 数据准备", leading_blank=False)

        # Load and inspect the dataset
        raw = self.rl_tool.run({
            "action": "load_dataset",
            "format": "sft",
            "max_samples": self.config["data"]["max_samples"],
        })

        # Parse the JSON result returned by the tool
        dataset_info = json.loads(raw)

        self.log(f"✓ 数据集加载完成")
        self.log(f" - 样本数: {dataset_info['dataset_size']}")
        self.log(f" - 格式: {dataset_info['format']}")
        self.log(f" - 数据列: {', '.join(dataset_info['sample_keys'])}")

        self.results["data"] = dataset_info
        return dataset_info

    def stage2_sft_training(self):
        """Stage 2: SFT training on the configured base model.

        Returns:
            The output directory reported by the tool.
        """
        self._log_stage_header("阶段2: SFT训练")
        sft_config = self.config["sft"]
        return self._train(
            "sft", self.config["model"]["base_model"], sft_config, "SFT"
        )

    def stage3_sft_evaluation(self, model_path):
        """Stage 3: evaluate the SFT model.

        Args:
            model_path: Path of the model to evaluate.

        Returns:
            The parsed evaluation result.
        """
        self._log_stage_header("阶段3: SFT评估")
        return self._evaluate(model_path, "SFT", "sft_evaluation")

    def stage4_grpo_training(self, sft_model_path):
        """Stage 4: GRPO training starting from the SFT model.

        Args:
            sft_model_path: Path of the SFT model, sent as ``"model_name"``.

        Returns:
            The output directory reported by the tool.
        """
        self._log_stage_header("阶段4: GRPO训练")
        grpo_config = self.config["grpo"]
        return self._train("grpo", sft_model_path, grpo_config, "GRPO")

    def stage5_grpo_evaluation(self, model_path):
        """Stage 5: evaluate the GRPO model.

        Args:
            model_path: Path of the model to evaluate.

        Returns:
            The parsed evaluation result.
        """
        self._log_stage_header("阶段5: GRPO评估")
        return self._evaluate(model_path, "GRPO", "grpo_evaluation")

    def stage6_save_results(self):
        """Stage 6: write ``self.results`` to ``training_results.json``."""
        self._log_stage_header("阶段6: 保存结果")

        # Save the training results
        results_path = "training_results.json"
        with open(results_path, 'w', encoding="utf-8") as f:
            json.dump(self.results, f, indent=2)

        self.log(f"✓ 结果已保存到: {results_path}")

    def run(self):
        """Run all six stages in order.

        Raises:
            Exception: Any exception raised by a stage is logged and then
                re-raised unchanged.
        """
        try:
            # Stage 1: data preparation
            self.stage1_prepare_data()

            # Stage 2: SFT training
            sft_model_path = self.stage2_sft_training()

            # Stage 3: SFT evaluation
            self.stage3_sft_evaluation(sft_model_path)

            # Stage 4: GRPO training
            grpo_model_path = self.stage4_grpo_training(sft_model_path)

            # Stage 5: GRPO evaluation
            self.stage5_grpo_evaluation(grpo_model_path)

            # Stage 6: save results
            self.stage6_save_results()

            self.log("\n" + "=" * 50)
            self.log("✓ 训练流程完成!")
            self.log("=" * 50)

        except Exception as e:
            # Top-level boundary: report the failure, then propagate it.
            self.log(f"\n✗ 训练失败: {e}")
            raise


def main():
    """Write an example ``config.json`` and run the full pipeline on it."""
    # Build the configuration
    config = {
        "model": {
            "base_model": "Qwen/Qwen3-0.6B"
        },
        "data": {
            "max_samples": 100  # use 100 samples for a quick test
        },
        "sft": {
            "output_dir": "./models/sft_model",
            "num_epochs": 2,
            "batch_size": 4,
        },
        "grpo": {
            "output_dir": "./models/grpo_model",
            "num_epochs": 2,
            "batch_size": 2,
        },
        "eval": {
            "max_samples": 20,
            # NOTE(review): this threshold is not read by the pipeline code
            # in this file — confirm whether a gate was intended.
            "sft_accuracy_threshold": 0.40
        },
        "monitoring": {
            "use_wandb": False,  # whether to use Wandb
            "use_tensorboard": True,  # whether to use TensorBoard
            "wandb_project": "agentic-rl-pipeline"  # Wandb project name
        }
    }

    # Save the configuration
    with open("config.json", 'w', encoding="utf-8") as f:
        json.dump(config, f, indent=2)

    # Run the training pipeline
    pipeline = AgenticRLPipeline("config.json")
    pipeline.run()


# Usage example
if __name__ == "__main__":
    main()