"""Gradio chat demo for a Qwen2-VL model accepting text, image and video input."""

import copy
import re
from threading import Thread

import gradio as gr
from qwen_vl_utils import process_vision_info
from transformers import TextIteratorStreamer

# File suffixes (lower-case) that make an uploaded file count as a video.
_VIDEO_EXTENSIONS = (".mp4", ".avi", ".mkv", ".mov", ".wmv", ".flv", ".webm", ".mpeg")

# Characters escaped inside a fenced code block so the chatbot's Markdown/HTML
# renderer shows them literally instead of interpreting them.
_CODE_ESCAPES = str.maketrans(
    {
        "`": r"\`",
        "<": "&lt;",
        ">": "&gt;",
        " ": "&nbsp;",
        "*": "&ast;",
        "_": "&lowbar;",
        "-": "&#45;",
        ".": "&#46;",
        "!": "&#33;",
        "(": "&#40;",
        ")": "&#41;",
        "$": "&#36;",
    }
)


def _parse_text(text):
    """Convert plain text with ``` fences into HTML for display in the chatbot.

    Empty lines are dropped. A line containing ``` toggles a code block: odd
    occurrences open ``<pre><code>`` (the text after the last backtick is used
    as the language class), even occurrences close it. Every other line except
    the very first is prefixed with ``<br>``; lines inside a code block are
    additionally escaped.

    Args:
        text: The raw text to format.

    Returns:
        The formatted lines joined into a single string.
    """
    lines = [line for line in text.split("\n") if line != ""]
    fence_count = 0
    for i, line in enumerate(lines):
        if "```" in line:
            fence_count += 1
            if fence_count % 2 == 1:
                language = line.split("`")[-1]
                lines[i] = f'<pre><code class="language-{language}">'
            else:
                lines[i] = "<br></code></pre>"
        elif i > 0:
            # The first line is intentionally left untouched (no leading <br>).
            if fence_count % 2 == 1:
                line = line.translate(_CODE_ESCAPES)
            lines[i] = "<br>" + line
    return "".join(lines)


def _remove_image_special(text):
    """Strip ``<ref>`` tags and ``<box>...</box>`` spans from *text*.

    An unterminated ``<box>`` is removed up to the end of its line.
    """
    text = text.replace("<ref>", "").replace("</ref>", "")
    return re.sub(r"<box>.*?(</box>|$)", "", text)


def is_video_file(filename):
    """Return True when *filename* ends with a known video extension (case-insensitive)."""
    return filename.lower().endswith(_VIDEO_EXTENSIONS)


def transform_messages(original_messages):
    """Add an explicit ``type`` field to every content item of every message.

    Each content item is inspected for the keys ``image``, ``text`` and
    ``video`` (in that order); the first one present determines the item's
    type. Items with none of these keys are dropped.

    Args:
        original_messages: Iterable of ``{"role": ..., "content": [...]}`` dicts.

    Returns:
        A new list of messages whose content items look like
        ``{"type": "image", "image": ...}``.
    """
    transformed_messages = []
    for message in original_messages:
        new_content = []
        for item in message["content"]:
            for key in ("image", "text", "video"):
                if key in item:
                    new_content.append({"type": key, key: item[key]})
                    break
        transformed_messages.append({"role": message["role"], "content": new_content})
    return transformed_messages


def make_demo(model, processor):
    """Build the Gradio Blocks UI for chatting with *model*.

    Args:
        model: Object exposing ``generate(**kwargs)`` and a ``device`` attribute.
        processor: Callable processor exposing ``apply_chat_template`` and
            ``tokenizer``.

    Returns:
        The constructed ``gr.Blocks`` demo (not launched).
    """

    def call_local_model(model, processor, messages):
        """Run generation in a background thread and yield the text accumulated so far."""
        messages = transform_messages(messages)

        text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        image_inputs, video_inputs = process_vision_info(messages)
        inputs = processor(
            text=[text],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt",
        ).to(model.device)

        tokenizer = processor.tokenizer
        streamer = TextIteratorStreamer(tokenizer, timeout=20.0, skip_prompt=True, skip_special_tokens=True)

        gen_kwargs = {"max_new_tokens": 512, "streamer": streamer, **inputs}

        # generate() blocks, so it runs on a worker thread while this
        # generator consumes the streamer.
        thread = Thread(target=model.generate, kwargs=gen_kwargs)
        thread.start()

        generated_text = ""
        for new_text in streamer:
            generated_text += new_text
            yield generated_text

    def create_predict_fn():
        def predict(_chatbot, task_history):
            """Stream the model's answer to the last query into the chatbot.

            Yields the updated chatbot list after every streamed chunk.
            """
            chat_query = _chatbot[-1][0]
            query = task_history[-1][0]
            if len(chat_query) == 0:
                _chatbot.pop()
                task_history.pop()
                # This function is a generator: a bare ``return value`` would
                # emit nothing, so yield the trimmed chatbot explicitly.
                yield _chatbot
                return
            print("User: " + _parse_text(query))
            history_cp = copy.deepcopy(task_history)
            full_response = ""
            messages = []
            content = []
            for q, a in history_cp:
                if isinstance(q, (tuple, list)):
                    # File entries accumulate until the next text entry, which
                    # closes the user turn.
                    if is_video_file(q[0]):
                        content.append({"video": f"file://{q[0]}"})
                    else:
                        content.append({"image": f"file://{q[0]}"})
                else:
                    content.append({"text": q})
                    messages.append({"role": "user", "content": content})
                    messages.append({"role": "assistant", "content": [{"text": a}]})
                    content = []
            # Drop the trailing assistant placeholder for the turn being answered.
            messages.pop()

            for response in call_local_model(model, processor, messages):
                _chatbot[-1] = (_parse_text(chat_query), _remove_image_special(_parse_text(response)))

                yield _chatbot
                full_response = _parse_text(response)

            # NOTE: the history keeps the display-formatted response, which is
            # what later turns send back to the model.
            task_history[-1] = (query, full_response)
            print("Qwen-VL-Chat: " + _parse_text(full_response))
            yield _chatbot

        return predict

    def create_regenerate_fn():
        def regenerate(_chatbot, task_history):
            """Discard the last answer and generate a new one for the same query."""
            if not task_history:
                yield _chatbot
                return
            item = task_history[-1]
            if item[1] is None:
                yield _chatbot
                return
            task_history[-1] = (item[0], None)
            chatbot_item = _chatbot.pop(-1)
            if chatbot_item[0] is None:
                _chatbot[-1] = (_chatbot[-1][0], None)
            else:
                _chatbot.append((chatbot_item[0], None))
            yield from predict(_chatbot, task_history)

        return regenerate

    predict = create_predict_fn()
    regenerate = create_regenerate_fn()

    def add_text(history, task_history, text):
        """Append the user's text as a new unanswered turn to both histories."""
        history = history if history is not None else []
        task_history = task_history if task_history is not None else []
        history = history + [(_parse_text(text), None)]
        task_history = task_history + [(text, None)]
        # Exactly two values: the event is wired to [chatbot, task_history];
        # the input box is cleared separately by reset_user_input.
        return history, task_history

    def add_file(history, task_history, file):
        """Append an uploaded file as a new unanswered entry to both histories."""
        history = history if history is not None else []
        task_history = task_history if task_history is not None else []
        history = history + [((file.name,), None)]
        task_history = task_history + [((file.name,), None)]
        return history, task_history

    def reset_user_input():
        """Clear the query text box."""
        return gr.update(value="")

    def reset_state(task_history):
        """Empty the task history in place and return an empty chatbot list."""
        task_history.clear()
        return []

    with gr.Blocks() as demo:
        gr.Markdown(
            """
Qwen2-VL OpenVINO demo
"""
        )

        chatbot = gr.Chatbot(label="Qwen2-VL", elem_classes="control-height", height=500)
        query = gr.Textbox(lines=2, label="Input")
        task_history = gr.State([])

        with gr.Row():
            addfile_btn = gr.UploadButton("📁 Upload (上传文件)", file_types=["image", "video"])
            submit_btn = gr.Button("🚀 Submit (发送)")
            regen_btn = gr.Button("🤔️ Regenerate (重试)")
            empty_bin = gr.Button("🧹 Clear History (清除历史)")

        submit_btn.click(add_text, [chatbot, task_history, query], [chatbot, task_history]).then(
            predict, [chatbot, task_history], [chatbot], show_progress=True
        )
        submit_btn.click(reset_user_input, [], [query])
        empty_bin.click(reset_state, [task_history], [chatbot], show_progress=True)
        regen_btn.click(regenerate, [chatbot, task_history], [chatbot], show_progress=True)
        addfile_btn.upload(add_file, [chatbot, task_history, addfile_btn], [chatbot, task_history], show_progress=True)

        gr.Markdown(
            """\
Note: This demo is governed by the original license of Qwen2-VL. \
We strongly advise users not to knowingly generate or allow others to knowingly generate harmful content, \
including hate speech, violence, pornography, deception, etc. \
(注:本演示受Qwen2-VL的许可协议限制。我们强烈建议,用户不应传播及不应允许他人传播以下内容,\
包括但不限于仇恨言论、暴力、色情、欺诈相关的有害信息。)"""
        )

    return demo