#!/usr/bin/env python from __future__ import annotations import os import string import gradio as gr import PIL.Image import spaces import torch from transformers import AutoProcessor, BitsAndBytesConfig, Blip2ForConditionalGeneration from transformers import BitsAndBytesConfig bnb_config = BitsAndBytesConfig( load_in_8bit=True, ) DESCRIPTION = "# [BLIP-2](https://github.com/salesforce/LAVIS/tree/main/projects/blip2)" if not torch.cuda.is_available(): DESCRIPTION += "\n
Running on CPU 🥶 This demo does not work on CPU.
" device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") MODEL_ID_OPT_2_7B = "Salesforce/blip2-opt-2.7b" MODEL_ID_OPT_6_7B = "Salesforce/blip2-opt-6.7b" MODEL_ID_FLAN_T5_XL = "Salesforce/blip2-flan-t5-xl" MODEL_ID_FLAN_T5_XXL = "Salesforce/blip2-flan-t5-xxl" MODEL_ID_FLAN_T5_XL_COCO = "Salesforce/blip2-flan-t5-xl-coco" MODEL_ID = MODEL_ID_FLAN_T5_XL_COCO assert MODEL_ID in [MODEL_ID_OPT_2_7B, MODEL_ID_OPT_6_7B, MODEL_ID_FLAN_T5_XL, MODEL_ID_FLAN_T5_XXL, MODEL_ID_FLAN_T5_XL_COCO] if torch.cuda.is_available(): processor = AutoProcessor.from_pretrained(MODEL_ID) model = Blip2ForConditionalGeneration.from_pretrained(MODEL_ID, device_map="auto", quantization_config=bnb_config) @spaces.GPU def generate_caption( image: PIL.Image.Image, decoding_method: str = "Nucleus sampling", temperature: float = 1.0, length_penalty: float = 1.0, repetition_penalty: float = 1.5, max_length: int = 50, min_length: int = 1, num_beams: int = 5, top_p: float = 0.9, ) -> str: inputs = processor(images=[image], return_tensors="pt").to(device, dtype=torch.float16) generated_ids = model.generate( pixel_values=inputs.pixel_values, do_sample=decoding_method == "Nucleus sampling", temperature=temperature, length_penalty=length_penalty, repetition_penalty=repetition_penalty, max_length=max_length, min_length=min_length, num_beams=num_beams, top_p=top_p, ) result = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip() return result @spaces.GPU def generate_captions( images: list[PIL.Image.Image], decoding_method: str = "Nucleus sampling", temperature: float = 1.0, length_penalty: float = 1.0, repetition_penalty: float = 1.5, max_length: int = 50, min_length: int = 1, num_beams: int = 5, top_p: float = 0.9, ) -> list[str]: inputs = processor(images=images, return_tensors="pt").to(device, dtype=torch.float16) generated_ids = model.generate( pixel_values=inputs.pixel_values, do_sample=decoding_method == "Nucleus sampling", temperature=temperature, length_penalty=length_penalty, repetition_penalty=repetition_penalty, max_length=max_length, min_length=min_length, num_beams=num_beams, top_p=top_p, ) results = processor.batch_decode(generated_ids, skip_special_tokens=True) return [result.strip() for result in results] @spaces.GPU def answer_question( image: PIL.Image.Image, prompt: str, decoding_method: str = "Nucleus sampling", temperature: float = 1.0, length_penalty: float = 1.0, repetition_penalty: float = 1.5, max_length: int = 50, min_length: int = 1, num_beams: int = 5, top_p: float = 0.9, ) -> str: inputs = processor(images=[image], text=prompt, return_tensors="pt").to(device, dtype=torch.float16) generated_ids = model.generate( **inputs, do_sample=decoding_method == "Nucleus sampling", temperature=temperature, length_penalty=length_penalty, repetition_penalty=repetition_penalty, max_length=max_length, min_length=min_length, num_beams=num_beams, top_p=top_p, ) result = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip() return result def postprocess_output(output: str) -> str: if output and output[-1] not in string.punctuation: output += "." return output def chat( image: PIL.Image.Image, text: str, decoding_method: str = "Nucleus sampling", temperature: float = 1.0, length_penalty: float = 1.0, repetition_penalty: float = 1.5, max_length: int = 50, min_length: int = 1, num_beams: int = 5, top_p: float = 0.9, history_orig: list[str] = [], history_qa: list[str] = [], ) -> tuple[list[tuple[str, str]], list[str], list[str]]: history_orig.append(text) text_qa = f"Question: {text} Answer:" history_qa.append(text_qa) prompt = " ".join(history_qa) output = answer_question( image=image, prompt=prompt, decoding_method=decoding_method, temperature=temperature, length_penalty=length_penalty, repetition_penalty=repetition_penalty, max_length=max_length, min_length=min_length, num_beams=num_beams, top_p=top_p, ) output = postprocess_output(output) history_orig.append(output) history_qa.append(output) chat_val = list(zip(history_orig[0::2], history_orig[1::2])) return chat_val, history_orig, history_qa examples = [ [ "images/house.png", "How could someone get out of the house?", ], [ "images/flower.jpg", "What is this flower and where is it's origin?", ], [ "images/pizza.jpg", "What are steps to cook it?", ], [ "images/sunset.jpg", "Here is a romantic message going along the photo:", ], [ "images/forbidden_city.webp", "In what dynasties was this place built?", ], ] with gr.Blocks(css="style.css") as demo: gr.Markdown(DESCRIPTION) gr.DuplicateButton( value="Duplicate Space for private use", elem_id="duplicate-button", visible=os.getenv("SHOW_DUPLICATE_BUTTON") == "1", ) with gr.Tabs(): with gr.Tab(label="Single Image"): with gr.Group(): image = gr.Image(type="pil") with gr.Tabs(): with gr.Tab(label="Image Captioning"): caption_button = gr.Button("Caption it!") caption_output = gr.Textbox(label="Caption Output", show_label=False, container=False) with gr.Tab(label="Visual Question Answering"): chatbot = gr.Chatbot(label="VQA Chat", show_label=False) history_orig = gr.State(value=[]) history_qa = gr.State(value=[]) vqa_input = gr.Text(label="Chat Input", show_label=False, max_lines=1, container=False) with gr.Row(): clear_chat_button = gr.Button("Clear") chat_button = gr.Button("Submit", variant="primary") with gr.Accordion(label="Advanced settings", open=False): text_decoding_method = gr.Radio( label="Text Decoding Method", choices=["Beam search", "Nucleus sampling"], value="Nucleus sampling", ) temperature = gr.Slider( label="Temperature", info="Used with nucleus sampling.", minimum=0.5, maximum=1.0, step=0.1, value=1.0, ) length_penalty = gr.Slider( label="Length Penalty", info="Set to larger for longer sequence, used with beam search.", minimum=-1.0, maximum=2.0, step=0.2, value=1.0, ) repetition_penalty = gr.Slider( label="Repetition Penalty", info="Larger value prevents repetition.", minimum=1.0, maximum=5.0, step=0.5, value=1.5, ) max_length = gr.Slider( label="Max Length", minimum=20, maximum=512, step=1, value=50, ) min_length = gr.Slider( label="Minimum Length", minimum=1, maximum=100, step=1, value=1, ) num_beams = gr.Slider( label="Number of Beams", minimum=1, maximum=10, step=1, value=5, ) top_p = gr.Slider( label="Top P", info="Used with nucleus sampling.", minimum=0.5, maximum=1.0, step=0.1, value=0.9, ) with gr.Tab(label="Batch Image"): with gr.Group(): batch_images = gr.Files(label="Batch Process", interactive=True, elem_id="extras_image_batch") with gr.Tabs(): with gr.Tab(label="Image Captioning"): batch_caption_button = gr.Button("Caption it!") batch_caption_output = gr.JSON(label="Caption Output") with gr.Accordion(label="Advanced settings", open=False): text_decoding_method = gr.Radio( label="Text Decoding Method", choices=["Beam search", "Nucleus sampling"], value="Nucleus sampling", ) temperature = gr.Slider( label="Temperature", info="Used with nucleus sampling.", minimum=0.5, maximum=1.0, step=0.1, value=1.0, ) length_penalty = gr.Slider( label="Length Penalty", info="Set to larger for longer sequence, used with beam search.", minimum=-1.0, maximum=2.0, step=0.2, value=1.0, ) repetition_penalty = gr.Slider( label="Repetition Penalty", info="Larger value prevents repetition.", minimum=1.0, maximum=5.0, step=0.5, value=1.5, ) max_length = gr.Slider( label="Max Length", minimum=20, maximum=512, step=1, value=50, ) min_length = gr.Slider( label="Minimum Length", minimum=1, maximum=100, step=1, value=1, ) num_beams = gr.Slider( label="Number of Beams", minimum=1, maximum=10, step=1, value=5, ) top_p = gr.Slider( label="Top P", info="Used with nucleus sampling.", minimum=0.5, maximum=1.0, step=0.1, value=0.9, ) gr.Examples( examples=examples, inputs=[image, vqa_input], outputs=caption_output, fn=generate_caption, ) caption_button.click( fn=generate_caption, inputs=[ image, text_decoding_method, temperature, length_penalty, repetition_penalty, max_length, min_length, num_beams, top_p, ], outputs=caption_output, api_name="caption", ) batch_caption_button.click( fn=generate_captions, inputs=[ batch_images, text_decoding_method, temperature, length_penalty, repetition_penalty, max_length, min_length, num_beams, top_p, ], outputs=batch_caption_output, api_name="caption", ) chat_inputs = [ image, vqa_input, text_decoding_method, temperature, length_penalty, repetition_penalty, max_length, min_length, num_beams, top_p, history_orig, history_qa, ] chat_outputs = [ chatbot, history_orig, history_qa, ] vqa_input.submit( fn=chat, inputs=chat_inputs, outputs=chat_outputs, ).success( fn=lambda: "", outputs=vqa_input, queue=False, api_name=False, ) chat_button.click( fn=chat, inputs=chat_inputs, outputs=chat_outputs, api_name="chat", ).success( fn=lambda: "", outputs=vqa_input, queue=False, api_name=False, ) clear_chat_button.click( fn=lambda: ("", [], [], []), inputs=None, outputs=[ vqa_input, chatbot, history_orig, history_qa, ], queue=False, api_name="clear", ) image.change( fn=lambda: ("", [], [], []), inputs=None, outputs=[ caption_output, chatbot, history_orig, history_qa, ], queue=False, ) if __name__ == "__main__": demo.queue(max_size=10).launch()