返回功能详情

邮件自动发送脚本 - 核心示例代码

以下是支持批量发送、个性化内容和附件上传的核心代码,适配 QQ/163 等主流邮箱:

复制成功!
import smtplib
 import csv
 import logging
 from email.mime.text import MIMEText
 from email.mime.multipart import MIMEMultipart
 from email.header import Header
 import time
 import os
 # 配置日志
 logging.basicConfig(
     filename='email_send_log.log',
     level=logging.INFO,
     format='%(asctime)s - %(levelname)s - %(message)s',
     encoding='utf-8'
 )
 # 邮件配置类
 class EmailSender:
     def __init__(self, smtp_server, smtp_port, sender_email, sender_password):
         self.smtp_server = smtp_server
         self.smtp_port = smtp_port
         self.sender_email = sender_email
         self.sender_password = sender_password
         self.server = None
     # 连接 SMTP 服务器
     def connect(self):
         try:
             self.server = smtplib.SMTP_SSL(self.smtp_server, self.smtp_port)
             self.server.login(self.sender_email, self.sender_password)
             logging.info(f"成功连接到 {self.smtp_server}")
             return True
         except Exception as e:
             logging.error(f"连接服务器失败:{str(e)}")
             return False
     # 构建邮件内容(支持 HTML 和附件)
     def build_email(self, to_email, subject, content, attachments=None, is_html=False):
         msg = MIMEMultipart()
         msg['From'] = Header(f"通知中心 <{self.sender_email}>", 'utf-8')
         msg['To'] = to_email
         msg['Subject'] = Header(subject, 'utf-8')
         # 添加正文
         content_type = 'html' if is_html else 'plain'
         msg.attach(MIMEText(content, content_type, 'utf-8'))
         # 添加附件
         if attachments:
             for file_path in attachments:
                 if os.path.exists(file_path):
                     with open(file_path, 'rb') as f:
                         part = MIMEText(f.read(), 'base64', 'utf-8')
                         part['Content-Type'] = 'application/octet-stream'
                         filename = os.path.basename(file_path)
                         part['Content-Disposition'] = f'attachment; filename="{filename}"'
                         msg.attach(part)
                 else:
                     logging.warning(f"附件不存在:{file_path}")
         return msg
     # 发送单封邮件
     def send_single_email(self, to_email, subject, content, attachments=None, is_html=False):
         try:
             msg = self.build_email(to_email, subject, content, attachments, is_html)
             self.server.sendmail(self.sender_email, to_email, msg.as_string())
             logging.info(f"邮件发送成功:{to_email}")
             return True
         except Exception as e:
             logging.error(f"邮件发送失败 {to_email}:{str(e)}")
             return False
     # 批量发送邮件(从 CSV 读取收件人)
     def send_batch_emails(self, csv_path, subject_template, content_template, attachments=None, is_html=False):
         if not self.connect():
             return
         with open(csv_path, 'r', encoding='utf-8') as f:
             reader = csv.DictReader(f)
             for row in reader:
                 # 个性化替换(CSV 列名作为变量)
                 subject = subject_template.format(**row)
                 content = content_template.format(**row)
                 to_email = row.get('email')
                 
                 if to_email:
                     self.send_single_email(to_email, subject, content, attachments, is_html)
                     time.sleep(1)  # 延迟避免触发反垃圾机制
                 else:
                     logging.warning("CSV 中缺少 email 列")
         self.server.quit()
         logging.info("批量发送任务完成")
 # 主函数调用
 if __name__ == '__main__':
     # 邮箱配置(替换为自己的信息)
     smtp_config = {
         'smtp_server': 'smtp.qq.com',  # QQ 邮箱 SMTP 服务器
         'smtp_port': 465,              # SSL 端口
         'sender_email': 'your_email@qq.com',
         'sender_password': 'your_auth_code'  # 邮箱授权码
     }
     # 初始化发送器
     sender = EmailSender(
         smtp_config['smtp_server'],
         smtp_config['smtp_port'],
         smtp_config['sender_email'],
         smtp_config['sender_password']
     )
     # 批量发送配置
     csv_path = 'recipients.csv'  # CSV 包含 email, name 等列
     subject_template = '【通知】{name} 您好,您的订单已更新'
     content_template = '''
     尊敬的 {name}:
     您好!您的订单 {order_id} 已完成处理,详情请查看附件。
     如有疑问,请联系客服:service@example.com
     '''
     attachments = ['files/order_detail.pdf']  # 附件路径
     # 执行批量发送
     sender.send_batch_emails(
         csv_path=csv_path,
         subject_template=subject_template,
         content_template=content_template,
         attachments=attachments,
         is_html=False
     )