# notdiamond2api / auth_utils.py
# smgc's picture
# Create auth_utils.py
# 7340102 verified
# raw
# history blame
# 4.3 kB
import logging
import re
import requests
from typing import Dict, Any
class AuthManager:
    """Manage authentication for the chat.notdiamond.ai web client.

    Constructing an instance logs in with the given e-mail and password
    against the Supabase token endpoint, scrapes the public Supabase API key
    from the site's JavaScript bundle, and logs the refresh token that was
    obtained. Afterwards ``get_jwt_value`` returns the current access token
    and ``refresh_user_token`` renews it.

    Network failures never propagate out of the public methods: they are
    logged and leave the previously stored state untouched.
    """

    SITE_URL = "https://chat.notdiamond.ai"
    LOGIN_PAGE_URL = "https://chat.notdiamond.ai/login"
    PASSWORD_GRANT_URL = (
        "https://spuckhogycrxcbomznwo.supabase.co/auth/v1/token?grant_type=password"
    )
    REFRESH_GRANT_URL = (
        "https://spuckhogycrxcbomznwo.supabase.co/auth/v1/token?grant_type=refresh_token"
    )
    USER_AGENT = (
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 '
        '(KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36'
    )
    # Seconds to wait for any single HTTP call; without a timeout a stalled
    # connection would block the caller indefinitely.
    REQUEST_TIMEOUT = 30.0

    # Locates the Next.js layout bundle referenced by the login page.
    _LAYOUT_SCRIPT_RE = re.compile(
        r'<script src="(/_next/static/chunks/app/layout-[^"]+\.js)"'
    )
    # Locates the Supabase client construction call that embeds the API key.
    _API_KEY_RE = re.compile(
        r'\("https://spuckhogycrxcbomznwo\.supabase\.co","([^"]+)"\)'
    )

    def __init__(self, email: str, password: str):
        """Log in, fetch the API key and log the refresh token.

        Args:
            email: Account e-mail used for the password grant.
            password: Account password used for the password grant.
        """
        self.email = email
        self.password = password
        self.api_key: str = ""
        self.user_info: Dict[str, Any] = {}
        self.refresh_token: str = ""
        self.logger = logging.getLogger(__name__)
        logging.basicConfig(level=logging.INFO)
        # The session must outlive the constructor because refresh_user_token()
        # reuses it later, so it is not wrapped in a ``with`` block here.
        # Call close() when the manager is no longer needed.
        self.session = requests.Session()
        self.login()
        self.fetch_apikey()
        self.log_values()

    def close(self) -> None:
        """Close the underlying HTTP session and its pooled connections."""
        self.session.close()

    def log_values(self) -> None:
        """Write the current refresh token to the log (green ANSI colour)."""
        # NOTE(review): this puts a credential into the log output; kept
        # because existing deployments may read the token from there.
        self.logger.info("\033[92mRefresh Token: %s\033[0m", self.refresh_token)

    def fetch_apikey(self) -> str:
        """Return the Supabase API key, scraping it on first use.

        The key is cached on the instance after the first successful lookup.

        Returns:
            The API key, or an empty string when the login page, the layout
            bundle, or the key inside the bundle could not be retrieved.
        """
        if self.api_key:
            return self.api_key

        headers = {'User-Agent': self.USER_AGENT}
        try:
            page = self.session.get(
                self.LOGIN_PAGE_URL, headers=headers, timeout=self.REQUEST_TIMEOUT
            )
            page.raise_for_status()
            script = self._LAYOUT_SCRIPT_RE.search(page.text)
            if not script:
                self.logger.warning("未找到匹配的脚本标签")
                return ""

            bundle = self.session.get(
                f"{self.SITE_URL}{script.group(1)}",
                headers=headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            bundle.raise_for_status()
            key = self._API_KEY_RE.search(bundle.text)
        except requests.RequestException as e:
            self.logger.error("请求JS文件时发生错误: %s", e)
            return ""

        if not key:
            self.logger.error("未能匹配API key")
            return ""
        self.api_key = key.group(1)
        return self.api_key

    def _request_token(
        self, url: str, headers: Dict[str, str], payload: Dict[str, Any]
    ) -> None:
        """POST a token grant and store the response on the instance.

        Raises:
            requests.RequestException: On transport errors or non-2xx status.
            ValueError: When the body is not JSON or not a JSON object.
        """
        response = self.session.post(
            url, headers=headers, json=payload, timeout=self.REQUEST_TIMEOUT
        )
        response.raise_for_status()
        body = response.json()
        if not isinstance(body, dict):
            # Storing a list/str here would break every later ``.get`` call.
            raise ValueError(
                f"unexpected token response type: {type(body).__name__}"
            )
        self.user_info = body
        self.refresh_token = body.get('refresh_token', '')

    def login(self) -> None:
        """Log in with e-mail and password and store the returned user info.

        On failure the error is logged and the stored state is left as is.
        """
        headers = {
            'apikey': self.fetch_apikey(),
            'user-agent': self.USER_AGENT,
            'Content-Type': 'application/json',
        }
        payload = {
            "email": self.email,
            "password": self.password,
            "gotrue_meta_security": {},
        }
        try:
            self._request_token(self.PASSWORD_GRANT_URL, headers, payload)
        except (requests.RequestException, ValueError) as e:
            self.logger.error("\033[91m登录请求错误: %s\033[0m", e)

    def refresh_user_token(self) -> None:
        """Exchange the refresh token for a new access token.

        Updates ``user_info`` and ``refresh_token`` on success. When no
        refresh token is stored the request is skipped, since it could not
        succeed. Failures are logged and leave the stored state untouched.
        """
        if not self.refresh_token:
            self.logger.warning("没有可用的刷新令牌,跳过刷新请求")
            return

        headers = {
            'apikey': self.fetch_apikey(),
            'content-type': 'application/json;charset=UTF-8',
            'user-agent': self.USER_AGENT,
        }
        try:
            self._request_token(
                self.REFRESH_GRANT_URL, headers, {"refresh_token": self.refresh_token}
            )
        except (requests.RequestException, ValueError) as e:
            self.logger.error("刷新令牌请求错误: %s", e)

    def get_jwt_value(self) -> str:
        """Return the current access token, or an empty string if none."""
        return self.user_info.get('access_token', '')