import json import requests import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from duckduckgo_search import DDGS import os emailkey = os.getenv("EMAIL_KEY") FUNCTIONS = [ { "name": "search_duckduckgo", "description": "使用DuckDuckGo搜索引擎查询信息。可以搜索最新新闻、文章、博客等内容。", "parameters": { "type": "object", "properties": { "keywords": { "type": "array", "items": {"type": "string"}, "description": "搜索的关键词列表。例如:['Python', '机器学习', '最新进展']。" } }, "required": ["keywords"] } }, { "name": "search_papers", "description": "使用Crossref API搜索学术论文。", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "搜索查询字符串。例如:'climate change'。" } }, "required": ["query"] } }, { "name": "send_email", "description": "发送电子邮件。", "parameters": { "type": "object", "properties": { "to": { "type": "string", "description": "收件人邮箱地址" }, "subject": { "type": "string", "description": "邮件主题" }, "content": { "type": "string", "description": "邮件内容" } }, "required": ["to", "subject", "content"] } } ] FUNCTIONS_GROUP_1 = [FUNCTIONS[0], FUNCTIONS[1]] # search_duckduckgo, search_papers FUNCTIONS_GROUP_2 = [FUNCTIONS[2]] # send_email def search_duckduckgo(keywords): search_term = " ".join(keywords) with DDGS() as ddgs: results = list(ddgs.text(keywords=search_term, region="cn-zh", safesearch="on", max_results=5)) return [{"title": result['title'], "body": result['body'].replace('\n', ' ')} for result in results] def search_papers(query): url = f"https://api.crossref.org/works?query={query}" response = requests.get(url) if response.status_code == 200: data = response.json() papers = data['message']['items'] processed_papers = [] for paper in papers: processed_paper = { "标题": paper.get('title', [''])[0], "作者": ", ".join([f"{author.get('given', '')} {author.get('family', '')}" for author in paper.get('author', [])]), "DOI": paper.get('DOI', ''), "摘要": paper.get('abstract', '').replace('

', '').replace('

', '').replace('', '*').replace('', '*') } processed_papers.append(processed_paper) return processed_papers else: return [] def send_email(to, subject, content): try: with smtplib.SMTP('106.15.184.28', 8025) as smtp: smtp.login("jwt", emailkey) message = MIMEMultipart() message['From'] = "Me " message['To'] = to message['Subject'] = subject message.attach(MIMEText(content, 'html')) smtp.sendmail("aixiao@aixiao.xyz", to, message.as_string()) return True except Exception as e: print(f"发送邮件时出错: {str(e)}") return False def process_function_call(function_name, function_args): if function_name == "search_duckduckgo": keywords = function_args.get('keywords', []) if not keywords: return "搜索关键词为空,无法执行搜索。" return search_duckduckgo(keywords) elif function_name == "search_papers": query = function_args.get('query', '') if not query: return "搜索查询为空,无法执行论文搜索。" return search_papers(query) elif function_name == "send_email": to = function_args.get('to', '') subject = function_args.get('subject', '') content = function_args.get('content', '') if not to or not subject or not content: return "邮件信息不完整,无法发送邮件。" success = send_email(to, subject, content) return { "success": success, "message": "邮件发送成功" if success else "邮件发送失败", "to": to, "subject": subject, "content": content, "is_email": True } else: return "未知的函数调用。"