Spaces:
Running
on
Zero
Running
on
Zero
# -*- coding:UTF-8 -*- | |
# !/usr/bin/env python | |
import spaces | |
import numpy as np | |
import gradio as gr | |
import gradio.exceptions | |
import roop.globals | |
from roop.core import ( | |
start, | |
decode_execution_providers, | |
) | |
from roop.processors.frame.core import get_frame_processors_modules | |
from roop.utilities import normalize_output_path | |
import os | |
from PIL import Image | |
import uuid | |
import onnxruntime as ort | |
import cv2 | |
from roop.face_analyser import get_one_face | |
from cryptography.hazmat.primitives.asymmetric import rsa, padding | |
from cryptography.hazmat.primitives import serialization, hashes | |
from cryptography.hazmat.backends import default_backend | |
from cryptography.hazmat.primitives.asymmetric import utils | |
import base64 | |
import json | |
import datetime | |
def load_public_key_from_file(file_path): | |
with open(file_path, "rb") as key_file: | |
public_key = serialization.load_pem_public_key( | |
key_file.read(), | |
backend=default_backend() | |
) | |
return public_key | |
def verify_signature(public_key, data, signature): | |
""" | |
Verify a signature with a public key. | |
Converts the data to bytes if it's not already in byte format. | |
""" | |
# Ensure the data is in bytes. If it's a string, encode it to UTF-8. | |
if isinstance(data, str): | |
data = data.encode('utf-8') | |
try: | |
# Verify the signature | |
public_key.verify( | |
signature, | |
data, | |
padding.PSS( | |
mgf=padding.MGF1(hashes.SHA256()), | |
salt_length=padding.PSS.MAX_LENGTH | |
), | |
hashes.SHA256() | |
) | |
return True | |
except Exception as e: | |
print("Verification failed:", e) | |
return False | |
public_key = load_public_key_from_file("./nsfwais.pubkey.pem") | |
def swap_face(source_file, target_file, doFaceEnhancer, skey): | |
skey = json.loads(skey) | |
#first validate skey | |
signature = base64.b64decode(skey["s"]) | |
if not verify_signature(public_key, skey["t"], signature): | |
raise Exception("verify authkey failed.") | |
timestamp_requested = int(skey["t"]) | |
timestamp_now = int(datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).timestamp()) | |
if timestamp_now - timestamp_requested > 600: | |
raise Exception(f"authkey timeout, {timestamp_now - timestamp_requested}") | |
print(f"authkey pass, {timestamp_now - timestamp_requested}") | |
session_id = str(uuid.uuid4()) # Tạo một UUID duy nhất cho mỗi phiên làm việc | |
session_dir = f"temp/{session_id}" | |
os.makedirs(session_dir, exist_ok=True) | |
source_path = os.path.join(session_dir, "input.jpg") | |
target_path = os.path.join(session_dir, "target.jpg") | |
source_image = Image.fromarray(source_file) | |
source_image.save(source_path) | |
target_image = Image.fromarray(target_file) | |
target_image.save(target_path) | |
print("source_path: ", source_path) | |
print("target_path: ", target_path) | |
# Check if a face is detected in the source image | |
source_face = get_one_face(cv2.imread(source_path)) | |
if source_face is None: | |
raise gradio.exceptions.Error("No face in source path detected.") | |
# Check if a face is detected in the target image | |
target_face = get_one_face(cv2.imread(target_path)) | |
if target_face is None: | |
raise gradio.exceptions.Error("No face in target path detected.") | |
output_path = os.path.join(session_dir, "output.jpg") | |
normalized_output_path = normalize_output_path(source_path, target_path, output_path) | |
frame_processors = ["face_swapper", "face_enhancer"] if doFaceEnhancer else ["face_swapper"] | |
for frame_processor in get_frame_processors_modules(frame_processors): | |
if not frame_processor.pre_check(): | |
print(f"Pre-check failed for {frame_processor}") | |
raise gradio.exceptions.Error(f"Pre-check failed for {frame_processor}") | |
roop.globals.source_path = source_path | |
roop.globals.target_path = target_path | |
roop.globals.output_path = normalized_output_path | |
roop.globals.frame_processors = frame_processors | |
roop.globals.headless = True | |
roop.globals.keep_fps = True | |
roop.globals.keep_audio = True | |
roop.globals.keep_frames = False | |
roop.globals.many_faces = False | |
roop.globals.video_encoder = "libx264" | |
roop.globals.video_quality = 18 | |
roop.globals.execution_providers = ["CUDAExecutionProvider"] | |
roop.globals.reference_face_position = 0 | |
roop.globals.similar_face_distance = 0.6 | |
roop.globals.max_memory = 60 | |
roop.globals.execution_threads = 50 | |
start() | |
return normalized_output_path | |
app = gr.Interface( | |
fn=swap_face, | |
inputs=[ | |
gr.Image(), | |
gr.Image(), | |
gr.Checkbox(label="Face Enhancer?", info="Do face enhancement?"), | |
gr.Textbox(visible=False) | |
], | |
outputs="image" | |
) | |
app.launch() |