File size: 5,850 Bytes
1457e31 f762231 1457e31 f762231 1457e31 f762231 1457e31 f762231 1457e31 f762231 1457e31 f762231 1457e31 f762231 1457e31 f762231 1457e31 f762231 1457e31 f762231 1457e31 933eaf9 1457e31 933eaf9 1457e31 f762231 1457e31 f762231 1457e31 f762231 1457e31 f762231 1457e31 f762231 1457e31 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
from utils import *
from config import *
# Example sets for the configured ``taskType``: template images, user photos
# and finished showcase rows. They feed the ``gr.Examples`` widgets and the
# default template image in the UI definition further down.
temp_examples, user_examples, showcase_examples = (
    get_temps_examples(taskType),
    get_user_examples(taskType),
    get_showcase_examples(taskType),
)

# Shared recorder; ``onClick`` and ``onLoad`` call check_record / save_record /
# get_record on it with the caller's IP and optional token.
user_recorder = UserRecorder()

# Extra CSS handed to ``gr.Blocks``: constrain the app container width.
css = "\n.gradio-container {width: 85% !important}\n"
def onClick(temp_image, user_image, caption_text, token_text, request: gr.Request):
    """Submit one generation task and stream its progress to the UI.

    This is a generator used as a Gradio event handler: every yielded
    ``(result, message)`` pair updates the two outputs wired to the submit
    button (``res_image`` and ``info_text``). Values passed to ``return``
    inside a generator are never shown, so every user-visible message is
    yielded before the generator stops.

    Args:
        temp_image: Selected template image (the UI component uses
            ``type="filepath"``).
        user_image: Uploaded user photo (the UI component uses
            ``type="numpy"``).
        caption_text: Caption text forwarded to ``publicSelfitTask``.
        token_text: Optional API key forwarded to ``user_recorder``.
        request: Gradio request object, used to determine the client IP.

    Yields:
        tuple: ``(result, message)``. ``result`` is ``None`` until the task
        reports ``COMPLETED``, at which point it is the task's ``output1``.

    Raises:
        Exception: Any error raised by the helpers is printed and re-raised
            unchanged.
    """
    print("======> temp_image ", type(temp_image), temp_image)
    print("======> user_image ", type(user_image))
    print("======> caption_text ", type(caption_text))

    if temp_image is None:
        yield None, "please choose a template background!!!"
        return
    if user_image is None:
        yield None, "please upload a product photo!!!"
        return

    try:
        # Prefer the proxy-supplied address when present.
        # NOTE(review): X-Forwarded-For is client-controllable and may hold a
        # comma-separated chain; confirm the deployment's proxy sanitizes it.
        client_ip = request.client.host
        x_forwarded_for = dict(request.headers).get('x-forwarded-for')
        if x_forwarded_for:
            client_ip = x_forwarded_for

        if not check_region_warp(client_ip):
            # Must be yielded: a bare ``return value`` would never reach the UI.
            yield None, "Failed !!! Our server is under maintenance, please try again later"
            return

        # Check whether this caller may still submit a task.
        check_res, info = user_recorder.check_record(ip=client_ip, token=token_text)
        if not check_res:
            yield None, info
            return

        # Upload the user's photo.
        upload_url = upload_user_img(client_ip, user_image)
        if not upload_url:  # covers both an empty result and None
            yield None, "fail to upload"
            return

        # Publish the task.
        taskId = publicSelfitTask(upload_url, temp_image, caption_text)
        if not taskId:
            yield None, "fail to public task..."
            return

        # Poll for the result: initial 5 s delay, then up to 30 queries 3 s apart.
        max_try = 30
        wait_s = 3
        time.sleep(5)
        for i in range(max_try):
            time.sleep(wait_s)
            taskStatus = getTaskRes(taskId)
            if taskStatus is None:
                continue
            user_recorder.save_record(taskStatus, ip=client_ip, token=token_text)
            status = taskStatus['status']
            if status in ('FAILED', 'CANCELLED', 'TIMED_OUT'):
                yield None, f"task failed, query {i}, status {status}"
                return
            if status in ('IN_QUEUE', 'IN_PROGRESS'):
                yield None, f"task is on processing, query {i}, status {status}"
            elif status == 'COMPLETED':
                out = taskStatus['output']['job_results']['output1']
                yield out, "task is COMPLETED"
                return
            # Any other status is ignored and polling continues.

        # Polling budget exhausted without a terminal status.
        yield None, "fail to query task.."
    except Exception as e:
        # Log, then propagate with the original traceback intact.
        print(e)
        raise
def onLoad(token_text, request: gr.Request):
    """Build the values for the history panel and the submit-button label.

    Args:
        token_text: Optional API key forwarded to ``user_recorder.get_record``.
        request: Gradio request object, used to determine the client IP.

    Returns:
        The list returned by ``user_recorder.get_record`` (extended in place)
        followed by the recorder's message and a ``"Submit (N attempts left)"``
        label, in the order expected by ``outputs_onload``.
    """
    direct_ip = request.client.host
    forwarded = dict(request.headers).get('x-forwarded-for')
    # The forwarded header wins whenever it is present and non-empty.
    caller_ip = forwarded if forwarded else direct_ip

    records, used, note = user_recorder.get_record(ip=caller_ip, token=token_text)
    remaining = max(0, LimitTask - used)
    records.extend([note, f"Submit ({remaining} attempts left)"])
    return records
# UI definition: three input/output columns, a showcase gallery, a history tab,
# and the event wiring for submit / refresh / page load.
with gr.Blocks(css=css) as demo:
    gr.Markdown(title)
    gr.Markdown(description)

    with gr.Row():
        # Column 1: template image plus selectable template examples.
        with gr.Column():
            with gr.Column():
                temp_image = gr.Image(sources='clipboard', type="filepath", label=TempLabel,
                                      value=temp_examples[0][0], visible=TempVisible)
                temp_example = gr.Examples(inputs=[temp_image], examples_per_page=9,
                                           examples=temp_examples, visible=TempVisible)
        # Column 2: user photo and optional caption.
        with gr.Column():
            with gr.Column():
                user_image = gr.Image(value=None, type="numpy", label=UserLabel)
                caption_text = gr.Textbox(value="", interactive=True, label=CaptionLabel,
                                          visible=CapVisible)
        # Column 3: generated image, runtime messages, submit button, API key.
        with gr.Column():
            with gr.Column():
                res_image = gr.Image(label="generate image", value=None, type="filepath")
                info_text = gr.Markdown(value="", label='Runtime Info')  # Markdown output component
                run_button = gr.Button(value="Submit")
                token_text = gr.Textbox(value="", interactive=True,
                                        label='Enter Your Api Key (optional)',
                                        visible=is_show_token)

    with gr.Column():
        show_case = gr.Examples(examples=showcase_examples,
                                inputs=[temp_image, user_image, res_image, ], label=None)

    with gr.Tab('history'):
        with gr.Row():  # Row wrapping the refresh button
            # NOTE(review): Gradio documents ``scale`` as an integer and warns on
            # fractional values; 0.5 is kept to avoid changing the layout.
            with gr.Column(scale=0.5):  # button takes half of the Row
                # "sm" is the documented small button size ("small" is not a valid value).
                refresh_button = gr.Button("Refresh History", size="sm")
                MK02 = gr.Markdown(value="")  # message returned by onLoad
        with gr.Row():
            his_input1 = gr.HTML()
            his_output1 = gr.HTML()
        with gr.Row():
            his_input2 = gr.HTML()
            his_output2 = gr.HTML()
        with gr.Row():
            his_input3 = gr.HTML()
            his_output3 = gr.HTML()

    # Order must match the list returned by onLoad: three input/output HTML
    # pairs, then the message, then the submit-button label.
    outputs_onload = [his_input1, his_output1, his_input2, his_output2, his_input3, his_output3,
                      MK02, run_button]

    run_button.click(fn=onClick, inputs=[temp_image, user_image, caption_text, token_text],
                     outputs=[res_image, info_text])
    refresh_button.click(fn=onLoad, inputs=[token_text], outputs=outputs_onload)
    demo.load(onLoad, inputs=[token_text], outputs=outputs_onload)
if __name__ == "__main__":
    # Enable the request queue (at most 50 waiting events), then serve on all
    # interfaces. ``queue()`` returns the Blocks instance, so the calls chain.
    demo.queue(max_size=50).launch(server_name='0.0.0.0')
|