import multiprocessing
import time

import gradio as gr
import pandas as pd
from distilabel.distiset import Distiset
from huggingface_hub import whoami

from src.distilabel_dataset_generator.pipelines.sft import (
    DEFAULT_DATASET,
    DEFAULT_DATASET_DESCRIPTION,
    DEFAULT_SYSTEM_PROMPT,
    MODEL,
    PROMPT_CREATION_PROMPT,
    get_pipeline,
    get_prompt_generation_step,
)


def _run_pipeline(result_queue, num_turns, num_rows, system_prompt):
    """Run the SFT pipeline in a child process.

    Puts the resulting ``Distiset`` on ``result_queue`` so the parent
    process can collect it (the pipeline itself runs with caching off).
    """
    pipeline = get_pipeline(
        num_turns,
        num_rows,
        system_prompt,
    )
    distiset: Distiset = pipeline.run(use_cache=False)
    result_queue.put(distiset)


def generate_system_prompt(dataset_description, progress=gr.Progress()):
    """Generate a dataset system prompt from a free-text dataset description.

    Feeds ``dataset_description`` through the prompt-generation step and
    returns the generated prompt string, reporting progress to the UI.
    """
    progress(0.1, desc="Initializing text generation")
    generate_description = get_prompt_generation_step()
    progress(0.4, desc="Loading model")
    generate_description.load()
    progress(0.7, desc="Generating system prompt")
    result = next(
        generate_description.process(
            [
                {
                    "system_prompt": PROMPT_CREATION_PROMPT,
                    "instruction": dataset_description,
                }
            ]
        )
    )[0]["generation"]
    progress(1.0, desc="System prompt generated")
    return result


def generate_sample_dataset(system_prompt, progress=gr.Progress()):
    """Generate a tiny preview dataset (2 rows, single turn) for the UI table."""
    progress(0.1, desc="Initializing sample dataset generation")
    result = generate_dataset(system_prompt, num_turns=1, num_rows=2, progress=progress)
    progress(1.0, desc="Sample dataset generated")
    return result


def generate_dataset(
    system_prompt,
    num_turns=1,
    num_rows=5,
    private=True,
    repo_id=None,
    token=None,
    progress=gr.Progress(),
):
    """Generate an SFT dataset in a subprocess and return it as a DataFrame.

    Args:
        system_prompt: System prompt driving the Magpie generation.
        num_turns: Conversation turns per row (clamped to 4).
        num_rows: Number of rows to generate (clamped to 5000).
        private: Whether the pushed Hub dataset should be private.
        repo_id: Optional ``owner/name`` to push to; ``None`` skips the push.
        token: Hugging Face token used for the push (must have write access).
        progress: Gradio progress tracker.

    Returns:
        ``pd.DataFrame`` with ``prompt``/``completion`` columns for single-turn
        data, or a ``messages`` column for multi-turn data.

    Raises:
        gr.Error: On invalid push parameters or a failure while generating.
    """
    if repo_id is not None:
        if not repo_id:
            raise gr.Error("Please provide a dataset name to push the dataset to.")
        try:
            # Validates the token up front so we fail before the long run.
            whoami(token=token)
        except Exception:
            # BUGFIX: original message was missing the word "token".
            raise gr.Error(
                "Provide a Hugging Face token to be able to push the dataset to the Hub."
            )
    if num_turns > 4:
        num_turns = 4
        gr.Info("You can only generate a dataset with 4 or fewer turns. Setting to 4.")
    if num_rows > 5000:
        num_rows = 5000
        gr.Info(
            "You can only generate a dataset with 5000 or fewer rows. Setting to 5000."
        )
    # Scale the toast's lifetime (seconds) with the expected generation time.
    if num_rows < 50:
        duration = 60
    elif num_rows < 250:
        duration = 300
    elif num_rows < 1000:
        duration = 500
    else:
        duration = 1000
    gr.Info(
        "Dataset generation started. This might take a while. Don't close the page.",
        duration=duration,
    )
    # Run the pipeline in a separate process so the UI thread stays responsive.
    result_queue = multiprocessing.Queue()
    p = multiprocessing.Process(
        target=_run_pipeline,
        args=(result_queue, num_turns, num_rows, system_prompt),
    )
    try:
        p.start()
        total_steps = 100
        for step in range(total_steps):
            # BUGFIX: drop the private `p._popen.poll()` check — `is_alive()`
            # is the public API and already reflects child termination.
            if not p.is_alive():
                break
            progress(
                (step + 1) / total_steps,
                desc=f"Generating dataset with {num_rows} rows",
            )
            time.sleep(0.5)  # Adjust this value based on your needs
        # BUGFIX: drain the queue BEFORE joining. Joining a process that still
        # has buffered items on a multiprocessing.Queue can deadlock.
        distiset = result_queue.get()
        p.join()
    except Exception as e:
        raise gr.Error(f"An error occurred during dataset generation: {str(e)}")

    if repo_id is not None:
        progress(0.95, desc="Pushing dataset to Hugging Face Hub.")
        distiset.push_to_hub(
            repo_id=repo_id,
            private=private,
            include_script=False,
            token=token,
        )
        gr.Info(
            f"Dataset pushed to Hugging Face Hub: https://huggingface.co/datasets/{repo_id}"
        )

    # Always materialize the train split so the UI table can display it,
    # whether or not the dataset was pushed to the Hub.
    distiset = distiset["default"]["train"]
    if num_turns == 1:
        outputs = distiset.to_pandas()[["prompt", "completion"]]
    else:
        outputs = distiset.to_pandas()[["messages"]]
    progress(1.0, desc="Dataset generation completed")
    return pd.DataFrame(outputs)


def generate_pipeline_code(system_prompt):
    """Render a standalone distilabel pipeline script embedding *system_prompt*."""
    code = f"""
from distilabel.pipeline import Pipeline
from distilabel.steps import KeepColumns
from distilabel.steps.tasks import MagpieGenerator
from distilabel.llms import InferenceEndpointsLLM

MODEL = "{MODEL}"
# BUGFIX: embed with !r so prompts containing quotes or newlines still
# produce syntactically valid Python in the generated script.
SYSTEM_PROMPT = {system_prompt!r}

# increase this to generate multi-turn conversations
NUM_TURNS = 1
# increase this to generate a larger dataset
NUM_ROWS = 100

with Pipeline(name="sft") as pipeline:
    magpie = MagpieGenerator(
        llm=InferenceEndpointsLLM(
            model_id=MODEL,
            tokenizer_id=MODEL,
            magpie_pre_query_template="llama3",
            generation_kwargs={{
                "temperature": 0.8,
                "do_sample": True,
                "max_new_tokens": 2048,
                "stop_sequences": [
                    "<|eot_id|>",
                    "<|end_of_text|>",
                    "<|start_header_id|>",
                    "<|end_header_id|>",
                    "assistant",
                ],
            }}
        ),
        n_turns=NUM_TURNS,
        num_rows=NUM_ROWS,
        system_prompt=SYSTEM_PROMPT,
    )

if __name__ == "__main__":
    distiset = pipeline.run()
"""
    return code


def update_pipeline_code(system_prompt):
    """Refresh the displayed pipeline code when the system prompt changes."""
    return generate_pipeline_code(system_prompt)


with gr.Blocks(
    title="⚗️ Distilabel Dataset Generator",
    head="⚗️ Distilabel Dataset Generator",
) as app:
    gr.Markdown("## Iterate on a sample dataset")
    dataset_description = gr.TextArea(
        label="Provide a description of the dataset",
        value=DEFAULT_DATASET_DESCRIPTION,
    )
    with gr.Row():
        # Empty side columns center the button.
        gr.Column(scale=1)
        btn_generate_system_prompt = gr.Button(value="Generate sample dataset")
        gr.Column(scale=1)
    system_prompt = gr.TextArea(
        label="If you want to improve the dataset, you can tune the system prompt and regenerate the sample",
        value=DEFAULT_SYSTEM_PROMPT,
    )
    with gr.Row():
        gr.Column(scale=1)
        btn_generate_sample_dataset = gr.Button(
            value="Regenerate sample dataset",
        )
        gr.Column(scale=1)
    with gr.Row():
        table = gr.DataFrame(
            value=DEFAULT_DATASET,
            interactive=False,
            wrap=True,
        )

    # Description -> system prompt -> sample dataset, chained in one click.
    result = btn_generate_system_prompt.click(
        fn=generate_system_prompt,
        inputs=[dataset_description],
        outputs=[system_prompt],
        show_progress=True,
    ).then(
        fn=generate_sample_dataset,
        inputs=[system_prompt],
        outputs=[table],
        show_progress=True,
    )

    btn_generate_sample_dataset.click(
        fn=generate_sample_dataset,
        inputs=[system_prompt],
        outputs=[table],
        show_progress=True,
    )

    # Add a header for the full dataset generation section
    gr.Markdown("## Generate full dataset")
    gr.Markdown(
        "Once you're satisfied with the sample, generate a larger dataset and push it to the hub. Get a Hugging Face token with write access to the organization you want to push the dataset to."
    )

    with gr.Column() as push_to_hub_ui:
        with gr.Row(variant="panel"):
            num_turns = gr.Number(
                value=1,
                label="Number of turns in the conversation",
                minimum=1,
                maximum=4,
                step=1,
                info="Choose between 1 (single turn with 'instruction-response' columns) and 2-4 (multi-turn conversation with a 'conversation' column).",
            )
            num_rows = gr.Number(
                value=100,
                label="Number of rows in the dataset",
                minimum=1,
                maximum=5000,
                info="The number of rows in the dataset. Note that you are able to generate more rows at once but that this will take time.",
            )
        with gr.Row(variant="panel"):
            hf_token = gr.Textbox(label="HF token", type="password")
            repo_id = gr.Textbox(label="HF repo ID", placeholder="owner/dataset_name")
            private = gr.Checkbox(label="Private dataset", value=True, interactive=True)
        btn_generate_full_dataset = gr.Button(
            value="⚗️ Generate Full Dataset", variant="primary"
        )

        # Add this line here, before the button click event
        success_message = gr.Markdown(visible=False)

        def show_success_message(repo_id_value):
            """Reveal the post-push banner with a link to the new dataset."""
            return gr.update(
                value=f"""

Dataset Published Successfully!

Your dataset is now available at: https://huggingface.co/datasets/{repo_id_value}

""",
                visible=True,
            )

        btn_generate_full_dataset.click(
            fn=generate_dataset,
            inputs=[system_prompt, num_turns, num_rows, private, repo_id, hf_token],
            outputs=[table],
            show_progress=True,
        ).then(fn=show_success_message, inputs=[repo_id], outputs=[success_message])

    gr.Markdown("## Or run this pipeline locally with distilabel")
    with gr.Accordion("Run this pipeline on Distilabel", open=False):
        pipeline_code = gr.Code(language="python", label="Distilabel Pipeline Code")

    # Keep the displayed pipeline code in sync with prompt edits.
    system_prompt.change(
        fn=update_pipeline_code,
        inputs=[system_prompt],
        outputs=[pipeline_code],
    )