Spaces:
Running
Running
Create auth_utils.py
Browse files- auth_utils.py +106 -0
auth_utils.py
ADDED
@@ -0,0 +1,106 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
import logging
|
2 |
+
import re
|
3 |
+
import requests
|
4 |
+
from typing import Dict, Any
|
5 |
+
|
6 |
+
class AuthManager:
|
7 |
+
"""
|
8 |
+
AuthManager类用于管理身份验证过程,包括获取API密钥、用户信息和处理刷新令牌等操作。
|
9 |
+
"""
|
10 |
+
|
11 |
+
def __init__(self, email: str, password: str):
|
12 |
+
self.email = email
|
13 |
+
self.password = password
|
14 |
+
self.api_key: str = ""
|
15 |
+
self.user_info: Dict[str, Any] = {}
|
16 |
+
self.refresh_token: str = ""
|
17 |
+
|
18 |
+
self.logger = logging.getLogger(__name__)
|
19 |
+
logging.basicConfig(level=logging.INFO)
|
20 |
+
|
21 |
+
with requests.Session() as self.session:
|
22 |
+
self.login()
|
23 |
+
self.fetch_apikey()
|
24 |
+
self.log_values()
|
25 |
+
|
26 |
+
def log_values(self) -> None:
|
27 |
+
"""记录刷新令牌到日志中。"""
|
28 |
+
self.logger.info(f"\033[92mRefresh Token: {self.refresh_token}\033[0m")
|
29 |
+
|
30 |
+
def fetch_apikey(self) -> str:
|
31 |
+
"""获取API密钥。"""
|
32 |
+
if self.api_key:
|
33 |
+
return self.api_key
|
34 |
+
|
35 |
+
try:
|
36 |
+
url = "https://chat.notdiamond.ai/login"
|
37 |
+
headers = {
|
38 |
+
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36'
|
39 |
+
}
|
40 |
+
response = self.session.get(url, headers=headers)
|
41 |
+
response.raise_for_status()
|
42 |
+
|
43 |
+
match = re.search(r'<script src="(/_next/static/chunks/app/layout-[^"]+\.js)"', response.text)
|
44 |
+
if not match:
|
45 |
+
self.logger.warning("未找到匹配的脚本标签")
|
46 |
+
return ""
|
47 |
+
|
48 |
+
js_url = f"https://chat.notdiamond.ai{match.group(1)}"
|
49 |
+
js_response = self.session.get(js_url, headers=headers)
|
50 |
+
js_response.raise_for_status()
|
51 |
+
|
52 |
+
api_key_match = re.search(r'\("https://spuckhogycrxcbomznwo\.supabase\.co","([^"]+)"\)', js_response.text)
|
53 |
+
if api_key_match:
|
54 |
+
self.api_key = api_key_match.group(1)
|
55 |
+
return self.api_key
|
56 |
+
else:
|
57 |
+
self.logger.error("未能匹配API key")
|
58 |
+
return ""
|
59 |
+
|
60 |
+
except requests.RequestException as e:
|
61 |
+
self.logger.error(f"请求JS文件时发生错误: {e}")
|
62 |
+
return ""
|
63 |
+
|
64 |
+
def login(self) -> None:
|
65 |
+
"""使用电子邮件和密码进行用户登录,并获取用户信息。"""
|
66 |
+
url = "https://spuckhogycrxcbomznwo.supabase.co/auth/v1/token?grant_type=password"
|
67 |
+
headers = {
|
68 |
+
'apikey': self.fetch_apikey(),
|
69 |
+
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36',
|
70 |
+
'Content-Type': 'application/json'
|
71 |
+
}
|
72 |
+
data = {
|
73 |
+
"email": self.email,
|
74 |
+
"password": self.password,
|
75 |
+
"gotrue_meta_security": {}
|
76 |
+
}
|
77 |
+
|
78 |
+
try:
|
79 |
+
response = self.session.post(url, headers=headers, json=data)
|
80 |
+
response.raise_for_status()
|
81 |
+
self.user_info = response.json()
|
82 |
+
self.refresh_token = self.user_info.get('refresh_token', '')
|
83 |
+
except requests.RequestException as e:
|
84 |
+
self.logger.error(f"\033[91m登录请求错误: {e}\033[0m")
|
85 |
+
|
86 |
+
def refresh_user_token(self) -> None:
|
87 |
+
"""使用刷新令牌来请求一个新的访问令牌并更新实例变量。"""
|
88 |
+
url = "https://spuckhogycrxcbomznwo.supabase.co/auth/v1/token?grant_type=refresh_token"
|
89 |
+
headers = {
|
90 |
+
'apikey': self.fetch_apikey(),
|
91 |
+
'content-type': 'application/json;charset=UTF-8',
|
92 |
+
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36'
|
93 |
+
}
|
94 |
+
data = {"refresh_token": self.refresh_token}
|
95 |
+
|
96 |
+
try:
|
97 |
+
response = self.session.post(url, headers=headers, json=data)
|
98 |
+
response.raise_for_status()
|
99 |
+
self.user_info = response.json()
|
100 |
+
self.refresh_token = self.user_info.get('refresh_token', '')
|
101 |
+
except requests.RequestException as e:
|
102 |
+
self.logger.error(f"刷新令牌请求错误: {e}")
|
103 |
+
|
104 |
+
def get_jwt_value(self) -> str:
|
105 |
+
"""返回访问令牌。"""
|
106 |
+
return self.user_info.get('access_token', '')
|