File size: 4,296 Bytes
7340102
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import logging
import re
import requests
from typing import Dict, Any

class AuthManager:
    """Handle authentication for chat.notdiamond.ai.

    On construction the manager logs in with the given e-mail/password,
    scrapes the public API key from the site's JavaScript bundle, and logs
    the resulting refresh token.  Afterwards ``refresh_user_token()`` can be
    used to obtain a new access token and ``get_jwt_value()`` to read the
    current one.

    The underlying HTTP session stays open for the lifetime of the instance
    so that later refresh calls can reuse it; call ``close()`` (or use the
    instance as a context manager) when it is no longer needed.
    """

    SITE_URL = "https://chat.notdiamond.ai"
    AUTH_URL = "https://spuckhogycrxcbomznwo.supabase.co"
    USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36'
    # Seconds to wait for any single HTTP request before giving up; without
    # a timeout a stalled server would block the caller indefinitely.
    REQUEST_TIMEOUT: float = 30.0

    # The layout chunk referenced by the login page embeds the API key.
    _LAYOUT_SCRIPT_RE = re.compile(r'<script src="(/_next/static/chunks/app/layout-[^"]+\.js)"')
    _API_KEY_RE = re.compile(r'\("https://spuckhogycrxcbomznwo\.supabase\.co","([^"]+)"\)')

    def __init__(self, email: str, password: str):
        """Log in immediately and cache the API key and tokens.

        Args:
            email: Account e-mail used for the password grant.
            password: Account password used for the password grant.
        """
        self.email = email
        self.password = password
        self.api_key: str = ""
        self.user_info: Dict[str, Any] = {}
        self.refresh_token: str = ""

        self.logger = logging.getLogger(__name__)
        # NOTE(review): configures the root logger as a side effect of
        # constructing this object; kept for backward compatibility.
        logging.basicConfig(level=logging.INFO)

        # Keep the session open: refresh_user_token() needs it after
        # construction, so it must not be closed when __init__ returns.
        self.session = requests.Session()
        try:
            self.login()
            self.fetch_apikey()
            self.log_values()
        except Exception:
            # Do not leak the connection pool if set-up fails unexpectedly.
            self.session.close()
            raise

    def __enter__(self) -> "AuthManager":
        """Return the manager itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the HTTP session when leaving the ``with`` block."""
        self.close()

    def close(self) -> None:
        """Release the HTTP session; safe to call more than once."""
        session = getattr(self, "session", None)
        if session is not None:
            session.close()

    def log_values(self) -> None:
        """Write the current refresh token to the log (green ANSI colour)."""
        # NOTE(review): this emits a credential in plain text at INFO level;
        # confirm that log sinks are private before using in production.
        self.logger.info(f"\033[92mRefresh Token: {self.refresh_token}\033[0m")

    def fetch_apikey(self) -> str:
        """Return the API key, scraping it from the site on first use.

        The login page is fetched to locate the layout script, and the key is
        then extracted from that script.  A successfully found key is cached
        on the instance.

        Returns:
            The API key, or an empty string when it could not be obtained
            (the reason is logged).
        """
        if self.api_key:
            return self.api_key

        headers = {'User-Agent': self.USER_AGENT}
        try:
            page = self.session.get(
                f"{self.SITE_URL}/login", headers=headers, timeout=self.REQUEST_TIMEOUT
            )
            page.raise_for_status()

            script_match = self._LAYOUT_SCRIPT_RE.search(page.text)
            if not script_match:
                self.logger.warning("未找到匹配的脚本标签")
                return ""

            script = self.session.get(
                f"{self.SITE_URL}{script_match.group(1)}",
                headers=headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            script.raise_for_status()
        except requests.RequestException as e:
            self.logger.error(f"请求JS文件时发生错误: {e}")
            return ""

        key_match = self._API_KEY_RE.search(script.text)
        if not key_match:
            self.logger.error("未能匹配API key")
            return ""

        self.api_key = key_match.group(1)
        return self.api_key

    def _post_for_tokens(self, url: str, headers: Dict[str, str], data: Dict[str, Any]) -> Dict[str, Any]:
        """POST *data* as JSON to a token endpoint and return the decoded object.

        Raises:
            requests.RequestException: On transport errors or HTTP error status.
            ValueError: If the body is not JSON or is not a JSON object.
        """
        response = self.session.post(url, headers=headers, json=data, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        payload = response.json()
        if not isinstance(payload, dict):
            raise ValueError(f"unexpected token response type: {type(payload).__name__}")
        return payload

    def login(self) -> None:
        """Sign in with e-mail and password and store the returned user info.

        On success ``user_info`` holds the decoded response and
        ``refresh_token`` its ``refresh_token`` field (empty if absent).
        Failures are logged and leave the stored state untouched.
        """
        url = f"{self.AUTH_URL}/auth/v1/token?grant_type=password"
        headers = {
            'apikey': self.fetch_apikey(),
            'user-agent': self.USER_AGENT,
            'Content-Type': 'application/json'
        }
        data = {
            "email": self.email,
            "password": self.password,
            "gotrue_meta_security": {}
        }

        try:
            payload = self._post_for_tokens(url, headers, data)
        except (requests.RequestException, ValueError) as e:
            self.logger.error(f"\033[91m登录请求错误: {e}\033[0m")
            return

        self.user_info = payload
        self.refresh_token = payload.get('refresh_token', '')

    def refresh_user_token(self) -> None:
        """Exchange the refresh token for a new access token.

        Does nothing (apart from logging a warning) when no refresh token is
        available.  On success ``user_info`` is replaced with the decoded
        response; ``refresh_token`` is updated only if the response carries a
        new one, so a usable token is never overwritten with an empty value.
        Failures are logged and leave the stored state untouched.
        """
        if not self.refresh_token:
            # Nothing to exchange; a request would be rejected anyway.
            self.logger.warning("刷新令牌为空,跳过刷新请求")
            return

        url = f"{self.AUTH_URL}/auth/v1/token?grant_type=refresh_token"
        headers = {
            'apikey': self.fetch_apikey(),
            'content-type': 'application/json;charset=UTF-8',
            'user-agent': self.USER_AGENT
        }
        data = {"refresh_token": self.refresh_token}

        try:
            payload = self._post_for_tokens(url, headers, data)
        except (requests.RequestException, ValueError) as e:
            self.logger.error(f"刷新令牌请求错误: {e}")
            return

        self.user_info = payload
        self.refresh_token = payload.get('refresh_token') or self.refresh_token

    def get_jwt_value(self) -> str:
        """Return the current access token, or an empty string if none is stored."""
        return self.user_info.get('access_token', '')