|
import re |
|
import yaml |
|
import requests |
|
from http.server import HTTPServer, BaseHTTPRequestHandler |
|
from urllib.parse import urlparse, parse_qs |
|
|
|
def extract_and_transform_proxies(input_text):
    """Convert the Shadowsocks entries of a proxy list into one text line each.

    The input is parsed as YAML. When the document is a mapping with a
    ``proxies`` key, that value is used as the proxy list; otherwise the
    document itself is used. If the text is not valid YAML, the block that
    follows a ``proxies:`` line is located with a regular expression and
    parsed on its own, first as YAML and finally as plain ``{key: value, ...}``
    fragments.

    Only entries whose ``type`` is ``ss`` are converted. Each one becomes::

        <name> = ss, <server>, <port>, encrypt-method=<cipher>, password=<password>, udp-relay=<true|false>

    Args:
        input_text: Raw text of the proxy configuration.

    Returns:
        The converted lines joined with newlines, or a human-readable message
        (in Chinese) when no ``proxies`` section, no proxy entries, or no
        ``ss`` entries were found.
    """
    # NOTE(review): this echoes the whole configuration, passwords included,
    # to stdout. Kept because it is existing observable behaviour.
    print("原始输入数据:")
    print(input_text)
    print("------------------------")

    try:
        data = yaml.safe_load(input_text)
    except yaml.YAMLError:
        # The document as a whole is not valid YAML: isolate the proxies block.
        proxies_match = re.search(r'proxies:\s*\n((?:[-\s]*{.*\n?)*)', input_text, re.MULTILINE)
        if not proxies_match:
            return "未找到proxies部分"
        proxies_text = proxies_match.group(1)
        try:
            proxies_list = yaml.safe_load(proxies_text)
        except yaml.YAMLError:
            # Last resort: keep the raw "{...}" bodies as strings. They are
            # parsed one by one in the loop below so that a single malformed
            # entry is skipped instead of aborting the whole conversion.
            proxies_list = re.findall(r'{(.*?)}', proxies_text, re.DOTALL)
    else:
        if isinstance(data, dict) and 'proxies' in data:
            proxies_list = data['proxies']
        else:
            proxies_list = data

    if not proxies_list:
        return "未找到有效的代理配置"

    # A YAML document can also be a scalar or a mapping without "proxies";
    # neither holds proxy entries, and a number is not even iterable.
    entries = proxies_list if isinstance(proxies_list, (list, tuple)) else []

    transformed_proxies = []
    for proxy in entries:
        if isinstance(proxy, str):
            proxy = _parse_flow_mapping(proxy)
        # Skip unparsable fragments and anything that is not a mapping.
        if not isinstance(proxy, dict) or proxy.get('type') != 'ss':
            continue
        transformed_proxies.append(_format_ss_proxy(proxy))

    return "\n".join(transformed_proxies) if transformed_proxies else "未找到有效的SS代理配置"


def _parse_flow_mapping(text):
    """Parse ``key: value, key: value`` text into a dict; return None if malformed."""
    fields = {}
    for item in text.split(','):
        if not item.strip():
            # Tolerate a trailing or doubled comma.
            continue
        # Split on the first colon only so values that contain ':' stay whole.
        key, separator, value = item.partition(':')
        if not separator:
            return None
        fields[key.strip()] = value.strip()
    return fields


def _format_ss_proxy(proxy):
    """Render one ``ss`` proxy mapping as a single output line."""

    def field(key):
        # YAML yields ints for ports (and for all-digit passwords or names)
        # and None for empty values, so normalise to text before stripping.
        value = proxy.get(key)
        return '' if value is None else str(value).strip()

    name = field('name')
    server = field('server')
    port = field('port')
    cipher = field('cipher')
    password = field('password')
    udp = 'true' if proxy.get('udp') in (True, 'true', 'True') else 'false'

    return f"{name} = ss, {server}, {port}, encrypt-method={cipher}, password={password}, udp-relay={udp}"
|
|
|
class RequestHandler(BaseHTTPRequestHandler):
    """HTTP handler that downloads a proxy list and returns its converted form.

    A GET request must carry a ``url`` query parameter. The document at that
    URL is fetched, passed to ``extract_and_transform_proxies`` and the result
    is returned as UTF-8 plain text.
    """

    # Seconds to wait for the upstream server. Without a timeout a stalled
    # upstream would block this single-threaded server indefinitely.
    FETCH_TIMEOUT = 15

    def do_GET(self):
        """Handle a GET request.

        Responds with 400 when the ``url`` parameter is missing or blank,
        500 when the download or the conversion fails, and 200 with the
        converted text otherwise.
        """
        query_params = parse_qs(urlparse(self.path).query)
        if 'url' not in query_params:
            self.send_error(400, "Missing 'url' parameter")
            return

        url = query_params['url'][0]

        # NOTE(security): the URL comes straight from the client, so this
        # endpoint fetches arbitrary addresses on the caller's behalf. Restrict
        # who can reach the server, or validate the target host, before
        # exposing it beyond a trusted network.
        try:
            response = requests.get(url, timeout=self.FETCH_TIMEOUT)
            response.raise_for_status()
        except requests.RequestException as e:
            # The detail goes into `explain` (response body, UTF-8) rather than
            # `message` (status line, latin-1 only): exception texts may hold
            # non-latin-1 characters, e.g. localized OS error messages.
            self.send_error(500, "Error fetching data", explain=str(e))
            return

        try:
            result = extract_and_transform_proxies(response.text)
        except Exception as e:
            # Request boundary: report the failure to the client instead of
            # dropping the connection without any response.
            self.send_error(500, "Error converting data", explain=f"{type(e).__name__}: {e}")
            return

        body = result.encode('utf-8')
        self.send_response(200)
        self.send_header('Content-type', 'text/plain; charset=utf-8')
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        """Discard the base class's per-request log output."""
        pass
|
|
|
def run_server(port=8080):
    """Serve ``RequestHandler`` on every interface at *port* until interrupted.

    Args:
        port: TCP port to listen on.
    """
    # An empty host string binds to all available interfaces.
    listener = HTTPServer(('', port), RequestHandler)
    print(f"Server running on port {port}")
    listener.serve_forever()
|
|
|
# Start the server on its default port only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    run_server()
|
|