"""Small HTTP service that rewrites ``ss`` entries of a ``proxies:`` section.

``GET /?url=<address>`` downloads the text at ``<address>``, extracts the
flow-style mappings (``{key: value, ...}``) listed under ``proxies:`` and
answers with one ``name = ss, server, port, ...`` line per entry whose
``type`` is ``ss``.
"""

import re
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlparse

import requests
import yaml

# Seconds to wait for the upstream server; without a timeout a stalled
# upstream would block this single-threaded server indefinitely.
FETCH_TIMEOUT = 10

# Lines following "proxies:" that hold one flow mapping each.  Both the bare
# form ("{...}") and the indented list form ("  - {...}") are accepted, and
# the last line does not need a trailing newline.
_PROXIES_SECTION_RE = re.compile(
    r'proxies:\s*\n((?:[ \t]*(?:-[ \t]*)?\{.*(?:\n|$))*)'
)

# The text between one pair of braces.
_FLOW_MAPPING_RE = re.compile(r'\{(.*?)\}', re.DOTALL)


def _parse_flow_mapping(body):
    """Split the inside of a ``{k: v, ...}`` mapping into a dict of strings.

    Items are separated by ``', '`` and each item is split at its first
    ``': '``.  Items without a ``': '`` are skipped instead of raising, and
    surrounding whitespace is removed from keys and values.
    """
    fields = {}
    for item in body.split(', '):
        key, separator, value = item.partition(': ')
        if separator:
            fields[key.strip()] = value.strip()
    return fields


def _format_ss_proxy(proxy):
    """Render one ``ss`` proxy mapping as a ``name = ss, ...`` line."""
    name = proxy.get('name', '')
    server = proxy.get('server', '')
    port = proxy.get('port', '')
    cipher = proxy.get('cipher', '')
    password = proxy.get('password', '')
    # YAML yields a real boolean for "udp: true"; the text fallback yields
    # the string 'true'.  Both mean enabled.
    udp_value = proxy.get('udp')
    udp = 'true' if (udp_value is True or udp_value == 'true') else 'false'
    return (
        f"{name} = ss, {server}, {port}, encrypt-method={cipher}, "
        f"password={password}, udp-relay={udp}"
    )


def extract_and_transform_proxies(input_text):
    """Convert the ``ss`` entries under ``proxies:`` into output lines.

    Args:
        input_text: Text containing a ``proxies:`` line followed by one
            flow-style mapping per line.

    Returns:
        The converted entries joined with newlines, or one of the
        human-readable messages below when nothing could be converted:
        no ``proxies:`` section, YAML error with nothing recoverable,
        no entries at all, or no entries of type ``ss``.
    """
    section = _PROXIES_SECTION_RE.search(input_text)
    if not section:
        return "未找到proxies部分"

    proxies_text = section.group(1)

    yaml_error = None
    try:
        proxies_list = yaml.safe_load(proxies_text)
    except yaml.YAMLError as e:
        # Several bare "{...}" lines are not one YAML document; remember the
        # error and give the text fallback below a chance first.
        proxies_list = None
        yaml_error = e

    if not isinstance(proxies_list, list):
        # Not a YAML list (empty, a single mapping, or unparsable): pull the
        # mappings out textually.
        proxies_list = [
            _parse_flow_mapping(body)
            for body in _FLOW_MAPPING_RE.findall(proxies_text)
        ]
        if yaml_error is not None and not proxies_list:
            return f"YAML解析错误: {str(yaml_error)}"

    if not proxies_list:
        return "未找到有效的代理配置"

    transformed_proxies = [
        _format_ss_proxy(proxy)
        for proxy in proxies_list
        # Skip list items that are not mappings rather than crash on .get().
        if isinstance(proxy, dict) and proxy.get('type') == 'ss'
    ]

    if not transformed_proxies:
        return "未找到有效的SS代理配置"
    return "\n".join(transformed_proxies)


class RequestHandler(BaseHTTPRequestHandler):
    """Serve converted proxy lines for the document named by ``?url=``."""

    def do_GET(self):
        """Fetch ``url``, convert its proxies section and reply as plain text.

        Replies 400 when the ``url`` query parameter is missing and 500 when
        the download fails or the upstream answers with an error status.
        """
        query_params = parse_qs(urlparse(self.path).query)
        if 'url' not in query_params:
            self.send_error(400, "Missing 'url' parameter")
            return

        # NOTE(review): the URL comes straight from the client and is fetched
        # server-side; restrict the allowed hosts if this service is exposed
        # to untrusted callers.
        url = query_params['url'][0]
        try:
            response = requests.get(url, timeout=FETCH_TIMEOUT)
            # Raises HTTPError for 4xx/5xx responses.
            response.raise_for_status()
        except requests.RequestException as e:
            # The detail goes into the response body (the "explain" argument),
            # not the status line: the status line is latin-1 encoded and
            # would fail on non-latin-1 exception text.
            self.send_error(500, "Error fetching data", str(e))
            return

        result = extract_and_transform_proxies(response.text)
        self.send_response(200)
        self.send_header('Content-type', 'text/plain; charset=utf-8')
        self.end_headers()
        self.wfile.write(result.encode('utf-8'))

    def log_message(self, format, *args):
        """Override the base implementation to silence per-request logging."""
        pass


def run_server(port=8080):
    """Start the HTTP server on all interfaces at ``port`` and serve forever."""
    server_address = ('', port)
    httpd = HTTPServer(server_address, RequestHandler)
    print(f"Server running on port {port}")
    httpd.serve_forever()


if __name__ == "__main__":
    run_server()