{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# 智能邮件助手(EmailSmartAssistant)\n", "\n", "这个Notebook实现了一个完整的智能邮件处理系统,包括:\n", "- 邮件自动分类\n", "- 智能回复草稿生成\n", "- 重要事项智能提醒\n", "- 邮件关键信息提取\n", "- 邮件归档整理" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1. 导入必要的库" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import imaplib\n", "import smtplib\n", "import email\n", "from email.mime.text import MIMEText\n", "from email.mime.multipart import MIMEMultipart\n", "from email.header import decode_header\n", "import json\n", "import pandas as pd\n", "import numpy as np\n", "from datetime import datetime, timedelta\n", "import re\n", "import jieba\n", "from textblob import TextBlob\n", "from langdetect import detect\n", "from sklearn.feature_extraction.text import TfidfVectorizer\n", "from sklearn.naive_bayes import MultinomialNB\n", "from sklearn.pipeline import Pipeline\n", "import dateparser\n", "import arrow\n", "from jinja2 import Template\n", "import matplotlib.pyplot as plt\n", "import seaborn as sns\n", "from tqdm import tqdm\n", "from rich.console import Console\n", "from rich.table import Table\n", "from rich.panel import Panel\n", "import warnings\n", "warnings.filterwarnings('ignore')\n", "\n", "# 设置中文字体\n", "plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS']\n", "plt.rcParams['axes.unicode_minus'] = False\n", "\n", "console = Console()\n", "print(\"✅ 所有库导入成功!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 2. 
# %% 2. Configuration loading
def _load_json_file(path, success_message, missing_message):
    """Read a UTF-8 JSON file and report the outcome on the rich console.

    Args:
        path: Location of the JSON file to read.
        success_message: Text printed in green after a successful load.
        missing_message: Text printed in red when the file does not exist.

    Returns:
        The parsed JSON value, or None when the file is missing or does not
        contain valid JSON.
    """
    try:
        with open(path, 'r', encoding='utf-8') as handle:
            data = json.load(handle)
    except FileNotFoundError:
        console.print(missing_message, style="red")
        return None
    except json.JSONDecodeError as exc:
        # A malformed file previously escaped as an uncaught exception and
        # aborted the cell; report it the same way as a missing file.
        console.print(f"❌ JSON 解析失败 ({path}): {exc}", style="red")
        return None

    console.print(success_message, style="green")
    return data


def load_config():
    """Load config/email_config.json.

    Returns:
        The parsed configuration, or None if the file is missing or invalid.
    """
    return _load_json_file(
        'config/email_config.json',
        "✅ 配置文件加载成功",
        "❌ 配置文件未找到,请检查 config/email_config.json",
    )


def load_templates():
    """Load templates/reply_templates.json.

    Returns:
        The parsed reply templates, or None if the file is missing or invalid.
    """
    return _load_json_file(
        'templates/reply_templates.json',
        "✅ 回复模板加载成功",
        "❌ 模板文件未找到,请检查 templates/reply_templates.json",
    )


config = load_config()
templates = load_templates()


# %% 3. Mailbox connection and retrieval
class EmailConnector:
    """Fetches and parses messages from one IMAP account.

    Attributes:
        config: Account settings; connect_imap reads the keys 'imap_server',
            'imap_port', 'email' and 'password'.
        imap_conn: Logged-in imaplib.IMAP4_SSL connection, or None.
        smtp_conn: SMTP handle closed by close_connections; this class only
            ever assigns None to it.
    """

    def __init__(self, email_config):
        self.config = email_config
        self.imap_conn = None
        self.smtp_conn = None

    def connect_imap(self):
        """Open an SSL IMAP connection and log in.

        Returns:
            True when the login succeeded, False otherwise. On failure
            self.imap_conn is left as None, so a later get_emails call retries
            the connection instead of reusing a session that never logged in.
        """
        connection = None
        try:
            connection = imaplib.IMAP4_SSL(self.config['imap_server'], self.config['imap_port'])
            connection.login(self.config['email'], self.config['password'])
        except Exception as e:
            console.print(f"❌ IMAP连接失败: {str(e)}", style="red")
            if connection is not None:
                try:
                    connection.logout()  # best effort: release the socket
                except Exception:
                    pass
            self.imap_conn = None
            return False

        self.imap_conn = connection
        console.print(f"✅ IMAP连接成功: {self.config['email']}", style="green")
        return True

    def get_emails(self, folder='INBOX', limit=50):
        """Fetch and parse the most recent messages of a folder.

        Args:
            folder: Mailbox name to open.
            limit: Maximum number of messages, taken from the end of the
                search result. None means no limit; zero or a negative value
                yields an empty list.

        Returns:
            A list of dicts as produced by parse_email. An empty list is
            returned when connecting, selecting or searching fails.
        """
        if not self.imap_conn and not self.connect_imap():
            return []

        try:
            # Read-only: a plain RFC822 fetch on a read-write mailbox would
            # mark every processed message as seen.
            status, _ = self.imap_conn.select(folder, readonly=True)
            if status != 'OK':
                console.print(f"❌ 获取邮件失败: 无法打开文件夹 {folder}", style="red")
                return []

            status, messages = self.imap_conn.search(None, 'ALL')
            if status != 'OK':
                return []

            email_ids = messages[0].split()
            if limit is not None:
                # ids[-0:] would be the whole list, so guard non-positive limits.
                email_ids = email_ids[-limit:] if limit > 0 else []

            emails = []
            for email_id in tqdm(email_ids, desc="获取邮件"):
                status, msg_data = self.imap_conn.fetch(email_id, '(RFC822)')
                if status != 'OK' or not msg_data or not isinstance(msg_data[0], tuple):
                    continue
                try:
                    email_message = email.message_from_bytes(msg_data[0][1])
                    emails.append(self.parse_email(email_message, email_id.decode()))
                except Exception as e:
                    # One unreadable message must not discard the others.
                    console.print(f"❌ 解析邮件失败 (id={email_id.decode()}): {e}", style="red")

            return emails
        except Exception as e:
            console.print(f"❌ 获取邮件失败: {str(e)}", style="red")
            return []

    @staticmethod
    def _decode_header_value(raw_value):
        """Decode an RFC 2047 encoded header into a plain string."""
        pieces = []
        for chunk, charset in decode_header(raw_value):
            if isinstance(chunk, bytes):
                try:
                    pieces.append(chunk.decode(charset or 'utf-8', errors='replace'))
                except LookupError:
                    # Charset label Python does not know (e.g. 'unknown-8bit').
                    pieces.append(chunk.decode('utf-8', errors='replace'))
            else:
                pieces.append(chunk)
        return ''.join(pieces)

    @staticmethod
    def _decode_text_payload(part):
        """Return the decoded text of a message part, or None if it has none."""
        payload = part.get_payload(decode=True)
        if payload is None:
            return None
        charset = part.get_content_charset() or 'utf-8'
        try:
            return payload.decode(charset, errors='replace')
        except LookupError:
            return payload.decode('utf-8', errors='replace')

    def parse_email(self, email_message, email_id):
        """Convert an email.message.Message into the dict used downstream.

        Args:
            email_message: Parsed message object.
            email_id: Identifier stored under the 'id' key.

        Returns:
            Dict with keys 'id', 'subject', 'sender', 'date', 'body' and
            'raw_message'. 'body' is the first text/plain part that is not an
            attachment (multipart) or the whole payload (single part), decoded
            with the charset the part declares, falling back to UTF-8.
        """
        subject = self._decode_header_value(email_message['Subject'] or '')
        sender = self._decode_header_value(email_message['From'] or '')
        date = email_message['Date']

        body = ""
        if email_message.is_multipart():
            for part in email_message.walk():
                if part.get_content_type() != "text/plain":
                    continue
                if part.get_content_disposition() == 'attachment':
                    continue  # a .txt attachment is not the message text
                text = self._decode_text_payload(part)
                if text is not None:
                    body = text
                    break
        else:
            text = self._decode_text_payload(email_message)
            body = text if text is not None else str(email_message.get_payload())

        return {
            'id': email_id,
            'subject': subject,
            'sender': sender,
            'date': date,
            'body': body,
            'raw_message': email_message
        }

    def close_connections(self):
        """Close the IMAP and SMTP connections that are currently open.

        Every step is attempted even if an earlier one fails, and both
        handles are reset to None afterwards.
        """
        if self.imap_conn:
            try:
                # close() is only valid while a mailbox is selected; it must
                # not prevent the logout below.
                self.imap_conn.close()
            except (imaplib.IMAP4.error, OSError):
                pass
            try:
                self.imap_conn.logout()
            except (imaplib.IMAP4.error, OSError):
                pass
            self.imap_conn = None

        if self.smtp_conn:
            try:
                self.smtp_conn.quit()
            finally:
                self.smtp_conn = None


print("✅ 邮件连接器类定义完成")
# %% 4. Email classifier
class EmailClassifier:
    """Keyword-rule classifier for email type, priority and sender type.

    Attributes:
        config: The full configuration dict passed to the constructor.
        classification_rules: config['classification_rules']; the keyword
            lists 'spam_keywords', 'work_keywords', 'customer_keywords' and
            'personal_keywords' are read from it.
        priority_rules: config['priority_rules']; the lists
            'high_priority_senders', 'high_priority_keywords' and
            'low_priority_keywords' are read from it.
    """

    def __init__(self, config):
        self.config = config
        self.classification_rules = config['classification_rules']
        self.priority_rules = config['priority_rules']

    @staticmethod
    def _count_hits(keywords, lowered_text):
        """Count the keywords that occur in an already lower-cased text.

        Keywords are lower-cased before comparison so that configuration
        entries such as "URGENT" match; empty keywords are ignored because
        '' is a substring of every text.
        """
        return sum(1 for keyword in keywords or () if keyword and keyword.lower() in lowered_text)

    @staticmethod
    def _lowered_content(email_data):
        """Return "<subject> <body>" in lower case, treating None as ''."""
        subject = (email_data.get('subject') or '').lower()
        body = (email_data.get('body') or '').lower()
        return f"{subject} {body}"

    def classify_email_type(self, email_data):
        """Classify an email as 'spam', 'work', 'customer', 'personal' or 'other'.

        Args:
            email_data: Dict with 'subject' and 'body' strings.

        Returns:
            'spam' when at least two spam keywords occur; otherwise the
            category with the most keyword hits (ties resolve in the order
            work, customer, personal), or 'other' when nothing matches.
        """
        text_content = self._lowered_content(email_data)
        rules = self.classification_rules

        if self._count_hits(rules.get('spam_keywords', []), text_content) >= 2:
            return 'spam'

        scores = {
            'work': self._count_hits(rules.get('work_keywords', []), text_content),
            'customer': self._count_hits(rules.get('customer_keywords', []), text_content),
            'personal': self._count_hits(rules.get('personal_keywords', []), text_content),
        }

        best = max(scores, key=scores.get)
        return best if scores[best] > 0 else 'other'

    def classify_priority(self, email_data):
        """Classify an email as 'high', 'medium' or 'low' priority.

        Checks run in this order: high-priority sender, high-priority
        keyword, low-priority keyword; 'medium' is the default. Sender and
        keyword comparisons are case-insensitive.
        """
        sender = (email_data.get('sender') or '').lower()
        text_content = self._lowered_content(email_data)
        rules = self.priority_rules

        vip_senders = rules.get('high_priority_senders', [])
        if any(vip and vip.lower() in sender for vip in vip_senders):
            return 'high'

        if self._count_hits(rules.get('high_priority_keywords', []), text_content) > 0:
            return 'high'

        if self._count_hits(rules.get('low_priority_keywords', []), text_content) > 0:
            return 'low'

        return 'medium'

    def classify_sender_type(self, email_data):
        """Classify the sender as 'colleague', 'system', 'customer_service' or 'external'.

        The decision is a substring test on the lower-cased sender string.
        """
        sender = (email_data.get('sender') or '').lower()

        if any(domain in sender for domain in ('@company.com', '@work.com')):
            return 'colleague'
        if 'noreply' in sender or 'no-reply' in sender:
            return 'system'
        if any(marker in sender for marker in ('service', 'support', 'info')):
            return 'customer_service'
        return 'external'

    def classify_email(self, email_data):
        """Run all three classifiers.

        Returns:
            Dict with the keys 'type', 'priority' and 'sender_type'.
        """
        return {
            'type': self.classify_email_type(email_data),
            'priority': self.classify_priority(email_data),
            'sender_type': self.classify_sender_type(email_data)
        }


print("✅ 邮件分类器定义完成")
# %% 5. Key information extractor
class InformationExtractor:
    """Pulls dates, to-do sentences and contact details out of email text.

    Attributes:
        date_patterns: Regular expressions for numeric and "M月D日" dates.
        time_keywords: Words that mark a sentence as time-related; such
            sentences are additionally passed to dateparser.
        todo_keywords: Words that mark a sentence as an action item.
    """

    # Sentence boundaries: CJK/ASCII terminators, line breaks, or an ASCII
    # full stop followed by whitespace (so "2024.01.16" and "a@b.com" are
    # not split).
    _SENTENCE_BOUNDARY = re.compile(r'[。!?!?\r\n]+|(?<=\.)\s+')

    def __init__(self):
        # The look-arounds stop a pattern from matching inside a longer date
        # or digit run (e.g. "24/01" inside "2024/01/16").
        self.date_patterns = [
            r'(?<!\d)\d{4}[-/]\d{1,2}[-/]\d{1,2}(?!\d)',  # 2024-01-01 or 2024/01/01
            r'(?<!\d)\d{1,2}[-/]\d{1,2}[-/]\d{4}(?!\d)',  # 01-01-2024 or 01/01/2024
            r'\d{1,2}月\d{1,2}日',                          # 1月1日
            r'(?<![\d/])\d{1,2}/\d{1,2}(?![\d/])',        # 1/1
        ]

        self.time_keywords = [
            '截止', 'deadline', '到期', '完成时间', '交付时间',
            '会议时间', '约定时间', '预定', '安排在'
        ]

        self.todo_keywords = [
            '需要', '请', '要求', '完成', '处理', '准备',
            'need', 'please', 'require', 'complete', 'prepare'
        ]

    def _split_sentences(self, text):
        """Split text into stripped, non-empty sentence candidates."""
        return [piece.strip() for piece in self._SENTENCE_BOUNDARY.split(text or '') if piece and piece.strip()]

    def extract_dates(self, text):
        """Collect date expressions found in text.

        Regex matches are returned verbatim. Sentences containing a time
        keyword are also given to dateparser.parse, and a successful parse is
        added as 'YYYY-MM-DD'.

        Returns:
            De-duplicated list of date strings in first-seen order.
        """
        text = text or ''
        dates = []

        for pattern in self.date_patterns:
            dates.extend(re.findall(pattern, text))

        for sentence in self._split_sentences(text):
            lowered = sentence.lower()
            if any(keyword in lowered for keyword in self.time_keywords):
                parsed_date = dateparser.parse(sentence)
                if parsed_date:
                    dates.append(parsed_date.strftime('%Y-%m-%d'))

        # dict.fromkeys keeps insertion order, unlike set().
        return list(dict.fromkeys(dates))

    def extract_todos(self, text):
        """Return the sentences that contain a to-do keyword.

        Keyword matching is case-insensitive; sentences of five characters or
        fewer are dropped.
        """
        todos = []
        for sentence in self._split_sentences(text):
            lowered = sentence.lower()
            if any(keyword in lowered for keyword in self.todo_keywords) and len(sentence) > 5:
                todos.append(sentence)
        return todos

    def extract_contacts(self, text):
        """Find email addresses and phone numbers in text.

        Returns:
            Dict with the keys 'emails' and 'phones', each a list of matches.
        """
        text = text or ''

        # "[A-Za-z]" rather than "[A-Z|a-z]": inside a character class the
        # pipe is a literal and let "co|uk" pass as a top-level domain.
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'

        phone_patterns = [
            r'(?<!\d)1[3-9]\d{9}(?!\d)',      # Chinese mobile number, not part of a longer digit run
            r'\d{3}-\d{4}-\d{4}',             # 3-4-4 dashed number
            r'\(\d{3}\)\s*\d{3}-\d{4}'        # (123) 456-7890
        ]

        phones = []
        for pattern in phone_patterns:
            phones.extend(re.findall(pattern, text))

        return {
            'emails': re.findall(email_pattern, text),
            'phones': phones
        }

    def generate_summary(self, email_data):
        """Build the extracted-information record for one email.

        Args:
            email_data: Dict with 'subject', 'body' and 'sender'.

        Returns:
            Dict with 'subject', 'sender', 'key_dates', 'todo_items' (at most
            three), 'contacts' and 'body_preview' (first 200 characters
            followed by '...' when the body is longer).
        """
        body = email_data['body'] or ''

        preview = body[:200] + '...' if len(body) > 200 else body

        return {
            'subject': email_data['subject'],
            'sender': email_data['sender'],
            'key_dates': self.extract_dates(body),
            'todo_items': self.extract_todos(body)[:3],
            'contacts': self.extract_contacts(body),
            'body_preview': preview
        }


print("✅ 信息提取器定义完成")


# %% 6. Reply draft generator
class ReplyGenerator:
    """Renders reply drafts from the loaded template set.

    Attributes:
        templates: Nested mapping indexed as templates[key][tone][language].
        config: The full configuration dict.
        reply_settings: config['reply_settings']; 'formal_tone' and the
            optional 'emergency_contact' are read from it.
    """

    # Subjects that already carry a reply prefix ("Re:", "回复:", "答复:").
    _REPLY_PREFIX = re.compile(r'^\s*(re|回复|答复)\s*[::]', re.IGNORECASE)

    def __init__(self, templates, config):
        self.templates = templates
        self.config = config
        self.reply_settings = config['reply_settings']

    def detect_language(self, text):
        """Return 'zh' for Chinese text and 'en' for everything else.

        Every langdetect code starting with "zh" (zh-cn and zh-tw) counts as
        Chinese. If detection fails, 'zh' is returned as the default.
        """
        try:
            lang = detect(text)
        except Exception:
            return 'zh'  # default to Chinese when detection is impossible
        return 'zh' if str(lang).lower().startswith('zh') else 'en'

    def select_template(self, email_classification, email_data):
        """Pick the template key for an email.

        Returns:
            'work_meeting' for work mail whose subject mentions a meeting,
            'customer_inquiry' for customer mail, and
            'general_acknowledgment' otherwise.
        """
        email_type = email_classification['type']
        subject = email_data['subject']

        if email_type == 'work' and ('会议' in subject or 'meeting' in subject.lower()):
            return 'work_meeting'
        if email_type == 'customer':
            return 'customer_inquiry'
        return 'general_acknowledgment'

    def generate_reply(self, email_data, email_classification):
        """Render a reply draft for one email.

        Args:
            email_data: Dict with 'subject', 'sender' and 'body'.
            email_classification: Dict with at least the key 'type'.

        Returns:
            Dict with 'to', 'subject', 'content', 'template_used', 'tone' and
            'language'. 'template_used' and 'tone' name the template that was
            actually rendered, including when the formal general
            acknowledgment was used as a fallback.

        Raises:
            KeyError: If neither the selected template nor the fallback
                exists for the detected language.
        """
        template_key = self.select_template(email_classification, email_data)
        language = self.detect_language(email_data['body'])
        tone = 'formal' if self.reply_settings.get('formal_tone', True) else 'casual'

        try:
            template_text = self.templates[template_key][tone][language]
        except KeyError:
            # Fall back to the generic acknowledgment and record that fact.
            template_key, tone = 'general_acknowledgment', 'formal'
            template_text = self.templates[template_key][tone][language]

        subject = email_data['subject']
        template_vars = {
            'subject': subject,
            'timeframe': '24小时' if language == 'zh' else '24 hours',
            'return_date': (datetime.now() + timedelta(days=1)).strftime('%Y-%m-%d'),
            'emergency_contact': self.reply_settings.get('emergency_contact', 'assistant@company.com')
        }

        reply_content = Template(template_text).render(**template_vars)

        # Do not stack a second prefix on a subject that is already a reply.
        reply_subject = subject if self._REPLY_PREFIX.match(subject or '') else f"Re: {subject}"

        return {
            'to': email_data['sender'],
            'subject': reply_subject,
            'content': reply_content,
            'template_used': template_key,
            'tone': tone,
            'language': language
        }


print("✅ 回复生成器定义完成")
# %% 7. Reminder manager
class ReminderManager:
    """In-memory list of reminders derived from processed emails.

    Attributes:
        config: The full configuration dict.
        reminder_settings: config['reminder_settings']; 'advance_days' (an
            iterable of day counts) is read from it.
        reminders: Every reminder dict created so far.
    """

    # Layouts accepted for entries of extracted_info['key_dates'].
    _KEY_DATE_FORMATS = ('%Y-%m-%d', '%Y/%m/%d')

    def __init__(self, config):
        self.config = config
        self.reminder_settings = config['reminder_settings']
        self.reminders = []

    @classmethod
    def _parse_key_date(cls, date_str):
        """Return date_str as a datetime, or None if no known layout fits."""
        for layout in cls._KEY_DATE_FORMATS:
            try:
                return datetime.strptime(date_str, layout)
            except ValueError:
                continue
        return None

    def create_reminders(self, email_data, extracted_info):
        """Create reminders for the key dates and to-do items of one email.

        For each parseable key date, one reminder per configured advance-day
        value is created if its reminder time is still in the future. Each
        to-do item gets a reminder two hours from now. A reminder whose id
        already exists is skipped, so processing the same email twice does
        not duplicate entries.

        Args:
            email_data: Dict with 'id' and 'subject'.
            extracted_info: Dict with 'key_dates' and 'todo_items'.

        Returns:
            The list of reminders newly added to self.reminders.
        """
        import hashlib  # function-scope import, as save_results does with os

        now = datetime.now()
        known_ids = {item['id'] for item in self.reminders}
        created = []

        def register(reminder):
            if reminder['id'] not in known_ids:
                known_ids.add(reminder['id'])
                created.append(reminder)

        for date_str in extracted_info['key_dates']:
            target_date = self._parse_key_date(date_str)
            if target_date is None:
                continue  # skip dates that cannot be parsed

            for advance_days in self.reminder_settings['advance_days']:
                reminder_date = target_date - timedelta(days=advance_days)
                if reminder_date <= now:
                    continue  # only future reminders are useful
                register({
                    'id': f"{email_data['id']}_{date_str}_{advance_days}",
                    'email_id': email_data['id'],
                    'email_subject': email_data['subject'],
                    'reminder_date': reminder_date,
                    'target_date': target_date,
                    'advance_days': advance_days,
                    'message': f"提醒:{email_data['subject']} - 还有{advance_days}天到期({date_str})",
                    'status': 'pending'
                })

        for todo in extracted_info['todo_items']:
            # hash() of a str is salted per interpreter run, so it cannot
            # serve as a stable id; a content digest can.
            digest = hashlib.sha1(todo.encode('utf-8')).hexdigest()[:8]
            register({
                'id': f"{email_data['id']}_todo_{digest}",
                'email_id': email_data['id'],
                'email_subject': email_data['subject'],
                'reminder_date': now + timedelta(hours=2),
                'target_date': None,
                'advance_days': 0,
                'message': f"待办事项提醒:{todo}",
                'status': 'pending'
            })

        self.reminders.extend(created)
        return created

    def get_pending_reminders(self):
        """Return the pending reminders whose reminder time has been reached."""
        now = datetime.now()
        return [
            reminder for reminder in self.reminders
            if reminder['status'] == 'pending' and reminder['reminder_date'] <= now
        ]

    def mark_reminder_sent(self, reminder_id):
        """Set the status of the first reminder with this id to 'sent'."""
        for reminder in self.reminders:
            if reminder['id'] == reminder_id:
                reminder['status'] = 'sent'
                break

    def get_reminders_summary(self):
        """Return counts of all, pending and sent reminders."""
        statuses = [reminder['status'] for reminder in self.reminders]
        return {
            'total': len(statuses),
            'pending': statuses.count('pending'),
            'sent': statuses.count('sent')
        }


print("✅ 提醒管理器定义完成")


# %% 8. Main program: smart email assistant
class EmailSmartAssistant:
    """Coordinates fetching, classification, extraction, replies and reminders.

    Attributes:
        config, templates: Values passed to the constructor.
        connector: EmailConnector set by connect_email_account, else None.
        classifier, extractor, reply_generator, reminder_manager: Pipeline
            components created in the constructor.
        processed_emails: Result dicts produced by process_single_email.
        processing_stats: Running counters shown by display_processing_summary.
    """

    def __init__(self, config, templates):
        self.config = config
        self.templates = templates

        self.connector = None
        self.classifier = EmailClassifier(config)
        self.extractor = InformationExtractor()
        self.reply_generator = ReplyGenerator(templates, config)
        self.reminder_manager = ReminderManager(config)

        self.processed_emails = []
        self.processing_stats = {
            'total_emails': 0,
            'classified_emails': 0,
            'replies_generated': 0,
            'reminders_created': 0
        }

    def connect_email_account(self, account_index=0):
        """Connect to config['email_accounts'][account_index] over IMAP.

        Returns:
            True on a successful login, False when the index is outside the
            list (negative indexes included) or the connection fails.
        """
        accounts = self.config['email_accounts']
        if not 0 <= account_index < len(accounts):
            console.print("❌ 邮箱账户索引超出范围", style="red")
            return False

        self.connector = EmailConnector(accounts[account_index])
        return self.connector.connect_imap()

    def process_emails(self, limit=20):
        """Fetch up to `limit` messages and run each through the pipeline.

        A message that raises during processing is reported and skipped. All
        counters, 'total_emails' included, accumulate across calls.
        """
        if not self.connector:
            console.print("❌ 请先连接邮箱账户", style="red")
            return

        console.print("🚀 开始处理邮件...", style="blue")

        emails = self.connector.get_emails(limit=limit)
        # Accumulate like the other counters; plain assignment reset the
        # total on every call while the rest kept growing.
        self.processing_stats['total_emails'] += len(emails)

        if not emails:
            console.print("📭 没有找到邮件", style="yellow")
            return

        console.print(f"📧 找到 {len(emails)} 封邮件,开始处理...", style="green")

        for email_data in tqdm(emails, desc="处理邮件"):
            try:
                self.processed_emails.append(self.process_single_email(email_data))
            except Exception as e:
                console.print(f"❌ 处理邮件失败: {str(e)}", style="red")

        console.print("✅ 邮件处理完成!", style="green")
        self.display_processing_summary()

    def process_single_email(self, email_data):
        """Classify one email, extract information, draft a reply, add reminders.

        No reply is drafted for mail classified as 'spam'; reminders are only
        created for 'high' and 'medium' priority mail.

        Returns:
            Dict with 'original_email', 'classification', 'extracted_info',
            'reply_draft' (or None), 'reminders' and 'processed_at'.
        """
        classification = self.classifier.classify_email(email_data)
        self.processing_stats['classified_emails'] += 1

        extracted_info = self.extractor.generate_summary(email_data)

        reply_draft = None
        if classification['type'] != 'spam':
            reply_draft = self.reply_generator.generate_reply(email_data, classification)
            self.processing_stats['replies_generated'] += 1

        reminders = []
        if classification['priority'] in ('high', 'medium'):
            reminders = self.reminder_manager.create_reminders(email_data, extracted_info)
            self.processing_stats['reminders_created'] += len(reminders)

        return {
            'original_email': email_data,
            'classification': classification,
            'extracted_info': extracted_info,
            'reply_draft': reply_draft,
            'reminders': reminders,
            'processed_at': datetime.now().isoformat()
        }

    def display_processing_summary(self):
        """Print the processing counters as a rich table."""
        table = Table(title="📊 邮件处理摘要")
        table.add_column("项目", style="cyan")
        table.add_column("数量", style="magenta")

        rows = (
            ("总邮件数", 'total_emails'),
            ("已分类邮件", 'classified_emails'),
            ("生成回复草稿", 'replies_generated'),
            ("创建提醒", 'reminders_created'),
        )
        for label, key in rows:
            table.add_row(label, str(self.processing_stats[key]))

        console.print(table)

    def get_classification_stats(self):
        """Count processed emails per type, priority and sender type.

        Returns:
            {} when nothing was processed, otherwise a dict with the keys
            'type', 'priority' and 'sender_type', each mapping label -> count.
        """
        if not self.processed_emails:
            return {}

        stats = {'type': {}, 'priority': {}, 'sender_type': {}}
        for record in self.processed_emails:
            classification = record['classification']
            for field, counts in stats.items():
                label = classification[field]
                counts[label] = counts.get(label, 0) + 1
        return stats

    @staticmethod
    def _json_fallback(value):
        """json.dump `default` hook: ISO text for datetimes, str() otherwise."""
        if hasattr(value, 'isoformat'):
            return value.isoformat()
        return str(value)

    def save_results(self, output_dir='output'):
        """Write the report and the reply drafts as timestamped JSON files.

        Creates <output_dir>/reports/email_report_<timestamp>.json and
        <output_dir>/drafts/reply_drafts_<timestamp>.json. Reminder datetimes
        are written in ISO format and the 'raw_message' object of fetched
        mail is left out of the report, because neither is JSON serializable
        as is.
        """
        import os

        reports_dir = os.path.join(output_dir, 'reports')
        drafts_dir = os.path.join(output_dir, 'drafts')
        os.makedirs(reports_dir, exist_ok=True)
        os.makedirs(drafts_dir, exist_ok=True)

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        exportable = []
        for record in self.processed_emails:
            slim = dict(record)
            slim['original_email'] = {
                key: value for key, value in record['original_email'].items()
                if key != 'raw_message'
            }
            exportable.append(slim)

        report_data = {
            'processing_stats': self.processing_stats,
            'classification_stats': self.get_classification_stats(),
            'reminder_summary': self.reminder_manager.get_reminders_summary(),
            'processed_emails': exportable,
            'generated_at': datetime.now().isoformat()
        }

        report_path = os.path.join(reports_dir, f"email_report_{timestamp}.json")
        with open(report_path, 'w', encoding='utf-8') as f:
            json.dump(report_data, f, ensure_ascii=False, indent=2, default=self._json_fallback)

        drafts = [
            {
                'original_subject': record['original_email']['subject'],
                'original_sender': record['original_email']['sender'],
                'reply': record['reply_draft']
            }
            for record in self.processed_emails
            if record['reply_draft']
        ]

        drafts_path = os.path.join(drafts_dir, f"reply_drafts_{timestamp}.json")
        with open(drafts_path, 'w', encoding='utf-8') as f:
            json.dump(drafts, f, ensure_ascii=False, indent=2, default=self._json_fallback)

        console.print(f"✅ 结果已保存到 {output_dir} 目录", style="green")


print("✅ 智能邮件助手主程序定义完成")


# %% 9. Visualisation and reporting
def create_visualization(assistant):
    """Draw a 2x2 figure summarising the assistant's results.

    Panels: type pie chart, priority bars, sender-type bars and processing
    counters. Prints an error and returns when nothing has been processed.
    """
    if not assistant.processed_emails:
        console.print("❌ 没有处理过的邮件数据", style="red")
        return

    stats = assistant.get_classification_stats()

    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    fig.suptitle('邮件处理分析报告', fontsize=16, fontweight='bold')

    # 1. Distribution of email types
    if stats['type']:
        ax = axes[0, 0]
        ax.pie(list(stats['type'].values()), labels=list(stats['type'].keys()),
               autopct='%1.1f%%', startangle=90)
        ax.set_title('邮件类型分布')

    # 2. Distribution of priorities, colour-coded by urgency
    if stats['priority']:
        ax = axes[0, 1]
        priority_labels = list(stats['priority'].keys())
        colors = {'high': 'red', 'medium': 'orange', 'low': 'green'}
        ax.bar(priority_labels, list(stats['priority'].values()),
               color=[colors.get(label, 'blue') for label in priority_labels])
        ax.set_title('邮件优先级分布')
        ax.set_ylabel('数量')

    # 3. Distribution of sender types
    if stats['sender_type']:
        ax = axes[1, 0]
        ax.bar(list(stats['sender_type'].keys()), list(stats['sender_type'].values()))
        ax.set_title('发件人类型分布')
        ax.set_ylabel('数量')
        ax.tick_params(axis='x', rotation=45)

    # 4. Processing counters
    ax = axes[1, 1]
    process_labels = ['总邮件', '已分类', '生成回复', '创建提醒']
    process_values = [
        assistant.processing_stats['total_emails'],
        assistant.processing_stats['classified_emails'],
        assistant.processing_stats['replies_generated'],
        assistant.processing_stats['reminders_created']
    ]
    ax.bar(process_labels, process_values, color='skyblue')
    ax.set_title('处理统计')
    ax.set_ylabel('数量')
    ax.tick_params(axis='x', rotation=45)

    plt.tight_layout()
    plt.show()


def display_sample_results(assistant, num_samples=3):
    """Print the first `num_samples` processed emails to the rich console."""
    if not assistant.processed_emails:
        console.print("❌ 没有处理过的邮件数据", style="red")
        return

    console.print("\n📋 处理结果样例:", style="bold blue")

    # The loop variable is not called "email" so it cannot be confused with
    # the imported email module.
    for i, record in enumerate(assistant.processed_emails[:num_samples]):
        console.print(f"\n--- 邮件 {i+1} ---", style="yellow")

        original = record['original_email']
        console.print(f"主题: {original['subject']}", style="cyan")
        console.print(f"发件人: {original['sender']}", style="cyan")

        classification = record['classification']
        console.print(f"类型: {classification['type']} | 优先级: {classification['priority']} | 发件人类型: {classification['sender_type']}", style="green")

        extracted = record['extracted_info']
        if extracted['key_dates']:
            console.print(f"关键日期: {', '.join(extracted['key_dates'])}", style="magenta")
        if extracted['todo_items']:
            console.print(f"待办事项: {extracted['todo_items'][0][:50]}...", style="magenta")

        if record['reply_draft']:
            reply = record['reply_draft']
            console.print(f"回复草稿 ({reply['tone']}, {reply['language']}): {reply['content'][:100]}...", style="white")

        if record['reminders']:
            console.print(f"创建了 {len(record['reminders'])} 个提醒", style="yellow")


print("✅ 可视化和报告功能定义完成")


# %% 10. Demo and test
def create_demo_data():
    """Return five hard-coded sample emails for use without a mailbox.

    Each item has the keys 'id', 'subject', 'sender', 'date' and 'body'.
    """
    demo_emails = [
        {
            'id': '1',
            'subject': '紧急:项目进度汇报会议安排',
            'sender': 'manager@company.com',
            'date': '2024-01-15 09:00:00',
            'body': '各位同事,请准备明天下午2点的项目进度汇报会议。需要准备本周工作总结和下周计划。截止时间:2024-01-16 14:00。请确认参会。'
        },
        {
            'id': '2',
            'subject': '客户咨询:产品功能详情',
            'sender': 'customer@client.com',
            'date': '2024-01-15 10:30:00',
            'body': '您好,我对贵公司的产品很感兴趣,希望了解更多功能详情。请问可以安排一次产品演示吗?我的联系方式:13800138000。期待您的回复。'
        },
        {
            'id': '3',
            'subject': '系统维护通知',
            'sender': 'noreply@system.com',
            'date': '2024-01-15 11:00:00',
            'body': '系统将于2024-01-20 02:00-04:00进行维护升级,期间服务可能中断。请提前做好准备工作。如有疑问请联系技术支持。'
        },
        {
            'id': '4',
            'subject': '限时优惠!立即购买享受8折优惠',
            'sender': 'promotion@ads.com',
            'date': '2024-01-15 12:00:00',
            'body': '亲爱的用户,我们的产品正在进行限时促销活动!现在购买可享受8折优惠,机会难得,不要错过!点击链接立即购买。'
        },
        {
            'id': '5',
            'subject': '个人:周末聚餐安排',
            'sender': 'friend@personal.com',
            'date': '2024-01-15 13:00:00',
            'body': '嗨!这个周末我们一起聚餐吧,时间定在周六晚上7点,地点在市中心的那家川菜馆。请确认是否能参加,我好提前订位。'
        }
    ]

    return demo_emails


def run_demo():
    """Run the whole pipeline on the demo emails.

    Reads the module-level `config` and `templates`.

    Returns:
        The EmailSmartAssistant that processed the demo data, or None when
        the configuration or the templates failed to load.
    """
    console.print("🎯 开始演示智能邮件助手", style="bold blue")

    if not config or not templates:
        console.print("❌ 配置或模板加载失败,无法运行演示", style="red")
        return

    assistant = EmailSmartAssistant(config, templates)

    console.print("📧 使用演示数据进行测试...", style="yellow")
    demo_emails = create_demo_data()

    assistant.processing_stats['total_emails'] = len(demo_emails)

    for email_data in tqdm(demo_emails, desc="处理演示邮件"):
        try:
            assistant.processed_emails.append(assistant.process_single_email(email_data))
        except Exception as e:
            console.print(f"❌ 处理邮件失败: {str(e)}", style="red")

    console.print("\n✅ 演示处理完成!", style="green")
    assistant.display_processing_summary()

    display_sample_results(assistant)
    create_visualization(assistant)
    assistant.save_results()

    return assistant


print("✅ 演示程序准备完成")


# %% 11. Run the smart email assistant
assistant = run_demo()


# %% 12. Real mailbox connection (optional)
def run_with_real_email():
    """Process the ten most recent messages of the first configured account.

    Returns:
        The EmailSmartAssistant used, or None when the connection failed.
        The mailbox connection is closed even if processing raises.
    """
    console.print("🔗 连接真实邮箱...", style="blue")

    assistant = EmailSmartAssistant(config, templates)

    if not assistant.connect_email_account(0):  # first configured account
        console.print("❌ 邮箱连接失败", style="red")
        return None

    try:
        assistant.process_emails(limit=10)
        display_sample_results(assistant)
        create_visualization(assistant)
        assistant.save_results()
    finally:
        assistant.connector.close_connections()

    return assistant


# To process a real mailbox, fill in config/email_config.json first and then
# uncomment the call below.
# real_assistant = run_with_real_email()

console.print("\n🎉 智能邮件助手演示完成!", style="bold green")
console.print("\n📝 使用说明:", style="bold yellow")
console.print("1. 修改 config/email_config.json 配置你的邮箱信息")
console.print("2. 取消注释上面的真实邮箱连接代码")
console.print("3. 运行代码开始处理你的邮件")
console.print("4. 查看 output 目录中的处理报告和回复草稿")