"""Embedding-based image search helpers for a Streamlit app.

The module encodes text or image queries with an OpenCLIP SigLIP model,
ranks pre-computed dataset vectors by cosine similarity, and displays the
matching images (optionally with bounding boxes) fetched from AWS S3.
"""

import concurrent.futures
from io import BytesIO
from typing import List, Union

import boto3
import numpy as np
import pandas as pd
import streamlit as st
import torch
import torch.nn as nn
from datasets import load_dataset
from open_clip import create_model_from_pretrained, get_tokenizer
from PIL import Image
from PIL import ImageDraw
from sentence_transformers import SentenceTransformer, util
from sklearn.metrics.pairwise import cosine_similarity

# Initialize the model globally to avoid reloading it on every query.
# NOTE(review): the original author left an open question about which model
# checkpoint should be used here -- confirm before changing it.
model, preprocess = create_model_from_pretrained('hf-hub:timm/ViT-SO400M-14-SigLIP-384')
tokenizer = get_tokenizer('hf-hub:timm/ViT-SO400M-14-SigLIP-384')


def encode_query(query: Union[str, Image.Image]) -> torch.Tensor:
    """
    Encode the query using the OpenCLIP model.

    Parameters
    ----------
    query : Union[str, Image.Image]
        The query, which can be a text string or an Image object.

    Returns
    -------
    torch.Tensor
        The encoded query vector.

    Raises
    ------
    ValueError
        If ``query`` is neither a string nor a PIL image.
    """
    if isinstance(query, Image.Image):
        # Preprocess the image and add a batch dimension.
        image_batch = preprocess(query).unsqueeze(0)
        with torch.no_grad():
            return model.encode_image(image_batch)

    if isinstance(query, str):
        tokens = tokenizer(query, context_length=model.context_length)
        with torch.no_grad():
            return model.encode_text(tokens)

    raise ValueError("Query must be either a string or an Image.")


def load_dataset_with_limit(dataset_name, dataset_subset, search_in_small_objects, limit=1000):
    """
    Load a dataset from Hugging Face and limit the number of rows.

    Parameters
    ----------
    dataset_name : str
        Dataset name; it is looked up under the ``quasara-io/`` namespace.
    dataset_subset : str
        Subset name, used as the suffix of the split name.
    search_in_small_objects : bool
        When true the ``Splits_<subset>`` split is loaded, otherwise the
        ``Main_<subset>`` split.
    limit : int or None, optional
        Maximum number of rows to sample (with a fixed random seed). If it
        exceeds the number of available rows, all rows are returned.
        ``None`` disables sampling.

    Returns
    -------
    tuple
        ``(df, total_rows)``: the (possibly sampled) pandas DataFrame and
        the number of rows in the full split.
    """
    split_prefix = 'Splits' if search_in_small_objects else 'Main'
    split = f'{split_prefix}_{dataset_subset}'
    dataset = load_dataset(f"quasara-io/{dataset_name}", split=split)
    total_rows = dataset.num_rows

    df = dataset.to_pandas()
    if limit is not None:
        # Clamp the sample size: DataFrame.sample raises when asked for more
        # rows than exist (without replacement).
        df = df.sample(n=min(limit, len(df)), random_state=42)
    return df, total_rows


def get_image_vectors(df):
    """
    Stack the ``'Vector'`` column of ``df`` into a float32 tensor.

    Parameters
    ----------
    df : pandas.DataFrame
        DataFrame with a ``'Vector'`` column holding one vector per row.

    Returns
    -------
    torch.Tensor
        The row vectors stacked vertically, as ``torch.float32``.
    """
    image_vectors = np.vstack(df['Vector'].to_numpy())
    return torch.tensor(image_vectors, dtype=torch.float32)


def search(query, df, limit, search_in_images=True):
    """
    Rank the rows of ``df`` by cosine similarity to ``query``.

    Parameters
    ----------
    query : Union[str, Image.Image]
        Text or image query, encoded with :func:`encode_query`.
    df : pandas.DataFrame
        DataFrame with a ``'Vector'`` column.
    limit : int
        Number of top results to return.
    search_in_images : bool, optional
        When false no search is performed.

    Returns
    -------
    numpy.ndarray or None
        Positional row indices of the ``limit`` most similar vectors, best
        first; ``None`` when ``search_in_images`` is false.
    """
    if not search_in_images:
        return None

    # Only the first row of the encoded batch is the query embedding.
    query_vector = encode_query(query)[0, :].detach().numpy()
    image_vectors = get_image_vectors(df).detach().numpy()

    similarities = cosine_similarity([query_vector], image_vectors)[0]
    # Negate so that argsort yields descending similarity.
    return np.argsort(-similarities)[:limit]


# Try Batch Search
def batch_search(query, df, batch_size=100000, limit=10):
    """
    Rank the rows of ``df`` against ``query``, scoring them in batches.

    Each batch contributes its own top ``limit`` candidates; the candidates
    are then merged so that the result is the global top ``limit``.

    Parameters
    ----------
    query : Union[str, Image.Image]
        Text or image query, encoded with :func:`encode_query`.
    df : pandas.DataFrame
        DataFrame with a ``'Vector'`` column.
    batch_size : int, optional
        Number of vectors scored per batch.
    limit : int, optional
        Number of top results to return.

    Returns
    -------
    list
        Positional row indices (into ``df``) of the ``limit`` most similar
        vectors, best first.
    """
    vectors = get_image_vectors(df).numpy()
    query_vector = encode_query(query)[0].detach().numpy()

    candidate_indices = []
    candidate_scores = []
    for start in range(0, len(vectors), batch_size):
        batch_scores = cosine_similarity(
            [query_vector], vectors[start:start + batch_size]
        )[0]
        local_top = np.argsort(-batch_scores)[:limit]
        # argsort indices are relative to the batch; shift them by the batch
        # offset so they address rows of the full DataFrame.
        candidate_indices.append(local_top + start)
        candidate_scores.append(batch_scores[local_top])

    merged_indices = np.concatenate(candidate_indices)
    merged_scores = np.concatenate(candidate_scores)
    # Stable sort keeps the per-batch ordering for equal scores.
    best = np.argsort(-merged_scores, kind="stable")[:limit]
    return list(merged_indices[best])


def get_file_paths(df, top_k_indices, column_name='File_Path'):
    """
    Retrieve the file paths (or any specific column) for the top K rows.

    Parameters
    ----------
    df : pandas.DataFrame
        DataFrame containing the data.
    top_k_indices : array-like of int
        Positional indices of the top K rows.
    column_name : str, optional
        Name of the column to fetch.

    Returns
    -------
    list
        Values of ``column_name`` for the selected rows, in index order.
    """
    return df.iloc[top_k_indices][column_name].tolist()


def get_cordinates(df, top_k_indices, column_name='Coordinate'):
    """
    Retrieve the coordinates (or any specific column) for the top K rows.

    Parameters
    ----------
    df : pandas.DataFrame
        DataFrame containing the data.
    top_k_indices : array-like of int
        Positional indices of the top K rows.
    column_name : str, optional
        Name of the column to fetch.

    Returns
    -------
    list
        Values of ``column_name`` for the selected rows, in index order.
    """
    return df.iloc[top_k_indices][column_name].tolist()


def _create_s3_client(access_key_id, secret_access_key):
    """Build a boto3 S3 client authenticated with the given credentials."""
    return boto3.client(
        's3',
        aws_access_key_id=access_key_id,
        aws_secret_access_key=secret_access_key,
    )


def _load_image_from_s3(s3, bucket_name, key):
    """Download the object ``key`` from ``bucket_name`` and open it with PIL."""
    s3_object = s3.get_object(Bucket=bucket_name, Key=key)
    return Image.open(BytesIO(s3_object['Body'].read()))


def _draw_bounding_boxes(img, box_coords, file_path):
    """Draw one or several ``[x_min, y_min, x_max, y_max]`` boxes on ``img``."""
    sequence_types = (np.ndarray, list, tuple)
    if not isinstance(box_coords, sequence_types):
        raise ValueError(f"Bounding box data for {file_path} is not in an iterable format.")

    # No boxes for this image: nothing to draw.
    if len(box_coords) == 0:
        return

    # A nested sequence means several boxes; a flat one is a single box.
    if isinstance(box_coords[0], sequence_types):
        boxes = box_coords
    else:
        boxes = [box_coords]

    draw = ImageDraw.Draw(img)
    for box in boxes:
        x_min, y_min, x_max, y_max = map(int, box)
        draw.rectangle([x_min, y_min, x_max, y_max], outline="red", width=3)


def get_images_from_s3_to_display(
    bucket_name, file_paths, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, folder_name
):
    """
    Retrieve and display images from AWS S3 in a Streamlit app.

    Parameters
    ----------
    bucket_name : str
        Name of the S3 bucket.
    file_paths : list
        File paths to retrieve from S3.
    AWS_ACCESS_KEY_ID : str
        AWS access key ID for authentication.
    AWS_SECRET_ACCESS_KEY : str
        AWS secret access key for authentication.
    folder_name : str
        Prefix prepended to every file path to form the S3 key.

    Returns
    -------
    None
        Images are displayed directly in the Streamlit app.
    """
    s3 = _create_s3_client(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)

    for file_path in file_paths:
        img = _load_image_from_s3(s3, bucket_name, f"{folder_name}{file_path}")
        st.image(img, caption=file_path, use_column_width=True)


def get_images_with_bounding_boxes_from_s3(
    bucket_name,
    file_paths,
    bounding_boxes,
    AWS_ACCESS_KEY_ID,
    AWS_SECRET_ACCESS_KEY,
    folder_name,
):
    """
    Retrieve and display images from AWS S3 with bounding boxes drawn on them.

    Parameters
    ----------
    bucket_name : str
        Name of the S3 bucket.
    file_paths : list
        File paths to retrieve from S3.
    bounding_boxes : list
        One entry per file path: either a single box in the form
        ``[x_min, y_min, x_max, y_max]`` or a sequence of such boxes. An
        empty entry displays the image without boxes.
    AWS_ACCESS_KEY_ID : str
        AWS access key ID for authentication.
    AWS_SECRET_ACCESS_KEY : str
        AWS secret access key for authentication.
    folder_name : str
        Prefix prepended to every file path to form the S3 key.

    Returns
    -------
    None
        Images are displayed directly in the Streamlit app.

    Raises
    ------
    ValueError
        If a bounding-box entry is not a list, tuple or NumPy array.
    """
    s3 = _create_s3_client(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)

    # zip pairs each path with its boxes; extra items on either side are ignored.
    for file_path, box_coords in zip(file_paths, bounding_boxes):
        img = _load_image_from_s3(s3, bucket_name, f"{folder_name}{file_path}")
        _draw_bounding_boxes(img, box_coords, file_path)
        st.image(img, caption=file_path, use_column_width=True)