|
import gradio as gr |
|
from datasets import load_dataset |
|
import numpy as np |
|
from model2vec import StaticModel |
|
from reach import Reach |
|
from difflib import ndiff |
|
|
|
|
|
model = StaticModel.from_pretrained("minishlab/M2V_base_output") |
|
|
|
|
|
default_dataset_name = "sst2" |
|
default_dataset1_split = "train" |
|
default_dataset2_split = "test" |
|
default_text_column = "sentence" |
|
default_threshold = 0.9 |
|
|
|
def deduplicate_embeddings( |
|
embeddings_a: np.ndarray, |
|
embeddings_b: np.ndarray = None, |
|
threshold: float = 0.9, |
|
batch_size: int = 1024, |
|
progress=None |
|
) -> tuple[np.ndarray, dict[int, int]]: |
|
""" |
|
Deduplicate embeddings within one dataset or across two datasets. |
|
|
|
:param embeddings_a: Embeddings of Dataset 1. |
|
:param embeddings_b: Optional, embeddings of Dataset 2. |
|
:param threshold: Similarity threshold for deduplication. |
|
:param batch_size: Batch size for similarity computation. |
|
:param progress: Gradio progress tracker for feedback. |
|
:return: Deduplicated indices and a mapping of removed indices to their original counterparts. |
|
""" |
|
if embeddings_b is None: |
|
reach = Reach(vectors=embeddings_a, items=[str(i) for i in range(len(embeddings_a))]) |
|
duplicate_to_original = {} |
|
results = reach.nearest_neighbor_threshold( |
|
embeddings_a, threshold=threshold, batch_size=batch_size, show_progressbar=False |
|
) |
|
for i, similar_items in enumerate(progress.tqdm(results, desc="Processing duplicates", total=len(embeddings_a))): |
|
for sim_idx, _ in similar_items: |
|
sim_idx = int(sim_idx) |
|
if sim_idx != i and sim_idx not in duplicate_to_original: |
|
duplicate_to_original[sim_idx] = i |
|
deduplicated_indices = set(range(len(embeddings_a))) - set(duplicate_to_original.keys()) |
|
return deduplicated_indices, duplicate_to_original |
|
else: |
|
reach = Reach(vectors=embeddings_a, items=[str(i) for i in range(len(embeddings_a))]) |
|
duplicate_indices_in_b = [] |
|
duplicate_to_original = {} |
|
results = reach.nearest_neighbor_threshold( |
|
embeddings_b, threshold=threshold, batch_size=batch_size, show_progressbar=False |
|
) |
|
for i, similar_items in enumerate(progress.tqdm(results, desc="Processing duplicates", total=len(embeddings_b))): |
|
if similar_items: |
|
duplicate_indices_in_b.append(i) |
|
duplicate_to_original[i] = int(similar_items[0][0]) |
|
return duplicate_indices_in_b, duplicate_to_original |
|
|
|
def display_word_differences(x: str, y: str) -> str: |
|
""" |
|
Display the word-level differences between two texts, formatted to avoid |
|
misinterpretation of Markdown syntax. |
|
|
|
:param x: First text. |
|
:param y: Second text. |
|
:return: A string showing word-level differences, wrapped in a code block. |
|
""" |
|
diff = ndiff(x.split(), y.split()) |
|
formatted_diff = "\n".join(word for word in diff if word.startswith(("+", "-"))) |
|
return f"```\n{formatted_diff}\n```" |
|
|
|
def load_dataset_texts(dataset_name: str, dataset_split: str, text_column: str) -> list[str]: |
|
""" |
|
Load texts from a specified dataset and split. |
|
|
|
:param dataset_name: Name of the dataset. |
|
:param dataset_split: Split of the dataset (e.g., 'train', 'validation', 'test'). |
|
:param text_column: Name of the text column. |
|
:return: A list of texts from the dataset. |
|
""" |
|
ds = load_dataset(dataset_name, split=dataset_split) |
|
return [example[text_column] for example in ds] |
|
|
|
def perform_deduplication( |
|
deduplication_type: str, |
|
dataset1_name: str, |
|
dataset1_split: str, |
|
dataset1_text_column: str, |
|
dataset2_name: str = "", |
|
dataset2_split: str = "", |
|
dataset2_text_column: str = "", |
|
threshold: float = default_threshold, |
|
progress: gr.Progress = gr.Progress(track_tqdm=True) |
|
): |
|
""" |
|
Perform deduplication on one or two datasets based on the deduplication type. |
|
|
|
:param deduplication_type: 'Single dataset' or 'Cross-dataset'. |
|
:param dataset1_name: Name of the first dataset. |
|
:param dataset1_split: Split of the first dataset. |
|
:param dataset1_text_column: Text column of the first dataset. |
|
:param dataset2_name: Optional, name of the second dataset (for cross-dataset deduplication). |
|
:param dataset2_split: Optional, split of the second dataset. |
|
:param dataset2_text_column: Optional, text column of the second dataset. |
|
:param threshold: Similarity threshold for deduplication. |
|
:param progress: Gradio progress tracker. |
|
:return: Status updates and result text for the Gradio interface. |
|
""" |
|
try: |
|
threshold = float(threshold) |
|
|
|
|
|
yield "Loading Dataset 1...", "" |
|
texts1 = load_dataset_texts(dataset1_name, dataset1_split, dataset1_text_column) |
|
yield "Computing embeddings for Dataset 1...", "" |
|
embeddings1 = model.encode(texts1, show_progressbar=True) |
|
|
|
if deduplication_type == "Single dataset": |
|
|
|
yield "Deduplicating within Dataset 1...", "" |
|
deduplicated_indices, duplicate_mapping = deduplicate_embeddings( |
|
embeddings1, threshold=threshold, progress=progress |
|
) |
|
|
|
num_duplicates = len(duplicate_mapping) |
|
result_text = ( |
|
f"**Total documents:** {len(texts1)}\n\n" |
|
f"**Duplicates found:** {num_duplicates}\n\n" |
|
f"**Unique documents after deduplication:** {len(deduplicated_indices)}\n\n" |
|
+ "-" * 50 + "\n\n" |
|
) |
|
|
|
if num_duplicates > 0: |
|
result_text += "**Sample duplicates:**\n\n" |
|
for dup_idx, orig_idx in list(duplicate_mapping.items())[:5]: |
|
orig_text = texts1[orig_idx] |
|
dup_text = texts1[dup_idx] |
|
differences = display_word_differences(orig_text, dup_text) |
|
result_text += ( |
|
f"**Original:**\n{orig_text}\n\n" |
|
f"**Duplicate:**\n{dup_text}\n\n" |
|
f"**Differences:**\n{differences}\n" |
|
+ "-" * 50 + "\n\n" |
|
) |
|
else: |
|
result_text += "No duplicates found." |
|
|
|
yield "Deduplication completed.", result_text |
|
|
|
else: |
|
|
|
yield "Loading Dataset 2...", "" |
|
texts2 = load_dataset_texts(dataset2_name, dataset2_split, dataset2_text_column) |
|
yield "Computing embeddings for Dataset 2...", "" |
|
embeddings2 = model.encode(texts2, show_progressbar=True) |
|
|
|
|
|
yield "Deduplicating Dataset 2 against Dataset 1...", "" |
|
duplicate_indices, duplicate_mapping = deduplicate_embeddings( |
|
embeddings1, embeddings_b=embeddings2, threshold=threshold, progress=progress |
|
) |
|
|
|
num_duplicates = len(duplicate_indices) |
|
result_text = ( |
|
f"**Total documents in {dataset2_name}/{dataset2_split}:** {len(texts2)}\n\n" |
|
f"**Duplicates found in Dataset 2:** {num_duplicates}\n\n" |
|
f"**Unique documents after deduplication:** {len(texts2) - num_duplicates}\n\n" |
|
+ "-" * 50 + "\n\n" |
|
) |
|
|
|
if num_duplicates > 0: |
|
result_text += "**Sample duplicates from Dataset 2:**\n\n" |
|
for idx in duplicate_indices[:5]: |
|
orig_text = texts1[duplicate_mapping[idx]] |
|
dup_text = texts2[idx] |
|
differences = display_word_differences(orig_text, dup_text) |
|
result_text += ( |
|
f"**Original (Dataset 1):**\n{orig_text}\n\n" |
|
f"**Duplicate (Dataset 2):**\n{dup_text}\n\n" |
|
f"**Differences:**\n{differences}\n" |
|
+ "-" * 50 + "\n\n" |
|
) |
|
else: |
|
result_text += "No duplicates found." |
|
|
|
yield "Deduplication completed.", result_text |
|
|
|
except Exception as e: |
|
yield f"An error occurred: {e}", "" |
|
raise e |
|
|
|
|
|
with gr.Blocks(theme=gr.themes.Ocean(), css="#status_output { height: 50px; overflow: auto; }") as demo: |
|
gr.Markdown("# Semantic Deduplication") |
|
gr.Markdown(""" |
|
This demo showcases semantic deduplication using Model2Vec for HuggingFace datasets. |
|
It can be used to identify duplicate texts within a single dataset or across two datasets. |
|
You can adjust the similarity threshold to control the strictness of the deduplication.\n |
|
NOTE: this demo runs on a free CPU backend, so it may be slow for large datasets. For faster results, please run the code locally. |
|
""") |
|
|
|
deduplication_type = gr.Radio( |
|
choices=["Cross-dataset", "Single dataset"], |
|
label="Deduplication Type", |
|
value="Cross-dataset", |
|
) |
|
|
|
with gr.Row(): |
|
dataset1_name = gr.Textbox(value=default_dataset_name, label="Dataset 1 Name") |
|
dataset1_split = gr.Textbox(value=default_dataset1_split, label="Dataset 1 Split") |
|
dataset1_text_column = gr.Textbox(value=default_text_column, label="Text Column Name") |
|
|
|
dataset2_inputs = gr.Column(visible=True) |
|
with dataset2_inputs: |
|
with gr.Row(): |
|
dataset2_name = gr.Textbox(value=default_dataset_name, label="Dataset 2 Name") |
|
dataset2_split = gr.Textbox(value=default_dataset2_split, label="Dataset 2 Split") |
|
dataset2_text_column = gr.Textbox(value=default_text_column, label="Text Column Name") |
|
|
|
threshold = gr.Slider(0.0, 1.0, value=default_threshold, label="Similarity Threshold") |
|
|
|
with gr.Row(): |
|
compute_button = gr.Button("Deduplicate") |
|
|
|
status_output = gr.Markdown(elem_id="status_output") |
|
result_output = gr.Markdown() |
|
|
|
def update_visibility(choice: str): |
|
return gr.update(visible=choice == "Cross-dataset") |
|
|
|
deduplication_type.change(update_visibility, inputs=deduplication_type, outputs=dataset2_inputs) |
|
|
|
compute_button.click( |
|
fn=perform_deduplication, |
|
inputs=[ |
|
deduplication_type, |
|
dataset1_name, |
|
dataset1_split, |
|
dataset1_text_column, |
|
dataset2_name, |
|
dataset2_split, |
|
dataset2_text_column, |
|
threshold, |
|
], |
|
outputs=[status_output, result_output], |
|
) |
|
|
|
|
|
demo.launch() |
|
|