|
import re |
|
import yaml |
|
import requests |
|
from http.server import HTTPServer, BaseHTTPRequestHandler |
|
from urllib.parse import urlparse, parse_qs |
|
|
|
def extract_and_transform_proxies(input_text): |
|
print("原始输入数据:") |
|
print(input_text) |
|
print("------------------------") |
|
|
|
proxies_match = re.search(r'proxies:\s*\n((?:[-\s]*{.*\n?)*)', input_text, re.MULTILINE) |
|
if not proxies_match: |
|
return "未找到proxies部分" |
|
|
|
proxies_text = proxies_match.group(1) |
|
|
|
try: |
|
proxies_list = yaml.safe_load(proxies_text) |
|
except yaml.YAMLError: |
|
|
|
proxies_list = [] |
|
for proxy_str in re.findall(r'{(.*?)}', proxies_text, re.DOTALL): |
|
proxy_dict = {} |
|
for item in proxy_str.split(','): |
|
key, value = item.split(':', 1) |
|
proxy_dict[key.strip()] = value.strip() |
|
proxies_list.append(proxy_dict) |
|
|
|
if not proxies_list: |
|
return "未找到有效的代理配置" |
|
|
|
transformed_proxies = [] |
|
|
|
for proxy in proxies_list: |
|
if proxy.get('type') == 'ss': |
|
name = proxy.get('name', '').strip() |
|
server = proxy.get('server', '').strip() |
|
port = str(proxy.get('port', '')).strip() |
|
cipher = proxy.get('cipher', '').strip() |
|
password = proxy.get('password', '').strip() |
|
udp = 'true' if proxy.get('udp') in [True, 'true', 'True'] else 'false' |
|
|
|
transformed = f"{name} = ss, {server}, {port}, encrypt-method={cipher}, password={password}, udp-relay={udp}" |
|
transformed_proxies.append(transformed) |
|
|
|
return "\n".join(transformed_proxies) if transformed_proxies else "未找到有效的SS代理配置" |
|
|
|
class RequestHandler(BaseHTTPRequestHandler): |
|
def do_GET(self): |
|
parsed_path = urlparse(self.path) |
|
query_params = parse_qs(parsed_path.query) |
|
|
|
if 'url' in query_params: |
|
url = query_params['url'][0] |
|
try: |
|
response = requests.get(url) |
|
response.raise_for_status() |
|
input_text = response.text |
|
result = extract_and_transform_proxies(input_text) |
|
self.send_response(200) |
|
self.send_header('Content-type', 'text/plain; charset=utf-8') |
|
self.end_headers() |
|
self.wfile.write(result.encode('utf-8')) |
|
except requests.RequestException as e: |
|
self.send_error(500, f"Error fetching data: {str(e)}") |
|
else: |
|
self.send_error(400, "Missing 'url' parameter") |
|
|
|
def log_message(self, format, *args): |
|
|
|
pass |
|
|
|
def run_server(port=8080): |
|
server_address = ('', port) |
|
httpd = HTTPServer(server_address, RequestHandler) |
|
print(f"Server running on port {port}") |
|
httpd.serve_forever() |
|
|
|
if __name__ == "__main__": |
|
run_server() |
|
|