inie2003's picture
Update helper.py
7de098c verified
raw
history blame
9.07 kB
import numpy as np
from sentence_transformers import SentenceTransformer, util
from open_clip import create_model_from_pretrained, get_tokenizer
import torch
from datasets import load_dataset
from sklearn.metrics.pairwise import cosine_similarity
import torch.nn as nn
import boto3
import streamlit as st
from PIL import Image
from PIL import ImageDraw
from io import BytesIO
import pandas as pd
from typing import List, Union
import concurrent.futures
# Initialize the model globally to avoid reloading each time
model, preprocess = create_model_from_pretrained('hf-hub:timm/ViT-SO400M-14-SigLIP-384')
tokenizer = get_tokenizer('hf-hub:timm/ViT-SO400M-14-SigLIP-384')
#what model do we use?
def encode_query(query: Union[str, Image.Image]) -> torch.Tensor:
"""
Encode the query using the OpenCLIP model.
Parameters
----------
query : Union[str, Image.Image]
The query, which can be a text string or an Image object.
Returns
-------
torch.Tensor
The encoded query vector.
"""
if isinstance(query, Image.Image):
query = preprocess(query).unsqueeze(0) # Preprocess the image and add batch dimension
with torch.no_grad():
query_embedding = model.encode_image(query) # Get image embedding
elif isinstance(query, str):
text = tokenizer(query, context_length=model.context_length)
with torch.no_grad():
query_embedding = model.encode_text(text) # Get text embedding
else:
raise ValueError("Query must be either a string or an Image.")
return query_embedding
def load_dataset_with_limit(dataset_name, dataset_subset, search_in_small_objects,limit=1000):
"""
Load a dataset from Hugging Face and limit the number of rows.
"""
if search_in_small_objects:
split = f'Splits_{dataset_subset}'
else:
split = f'Main_{dataset_subset}'
dataset_name = f"quasara-io/{dataset_name}"
dataset = load_dataset(dataset_name, split=split)
total_rows = dataset.num_rows
# Convert to DataFrame and sample if limit is provided
if limit is not None:
df = dataset.to_pandas().sample(n=limit, random_state=42)
else:
df = dataset.to_pandas()
return df,total_rows
def get_image_vectors(df):
# Get the image vectors from the dataframe
image_vectors = np.vstack(df['Vector'].to_numpy())
return torch.tensor(image_vectors, dtype=torch.float32)
def search(query, df, limit, search_in_images = True):
if search_in_images:
# Encode the image query
query_vector = encode_query(query)
# Get the image vectors from the dataframe
image_vectors = get_image_vectors(df)
# Calculate the cosine similarity between the query vector and each image vector
query_vector = query_vector[0, :].detach().numpy() # Detach and convert to a NumPy array
image_vectors = image_vectors.detach().numpy() # Convert the image vectors to a NumPy array
cosine_similarities = cosine_similarity([query_vector], image_vectors)
# Get the top K indices of the most similar image vectors
top_k_indices = np.argsort(-cosine_similarities[0])[:limit]
# Return the top K indices
return top_k_indices
#Try Batch Search
def batch_search(query, df, batch_size=100000, limit=10):
top_k_indices = []
# Get the image vectors from the dataframe and ensure they are NumPy arrays
vectors = get_image_vectors(df).numpy() # Convert to NumPy array if it's a tensor
# Encode the query and ensure it's a NumPy array
query_vector = encode_query(query)[0].detach().numpy() # Assuming the first element is the query embedding
# Iterate over the batches and compute cosine similarities
for i in range(0, len(vectors), batch_size):
batch_vectors = vectors[i:i + batch_size] # Extract a batch of vectors
# Compute cosine similarity between the query vector and the batch
batch_similarities = cosine_similarity([query_vector], batch_vectors)
# Get the top-k similar vectors within this batch
top_k_indices.extend(np.argsort(-batch_similarities[0])[:limit])
return top_k_indices
def get_file_paths(df, top_k_indices, column_name = 'File_Path'):
"""
Retrieve the file paths (or any specific column) from the DataFrame using the top K indices.
Parameters:
- df: pandas DataFrame containing the data
- top_k_indices: numpy array of the top K indices
- column_name: str, the name of the column to fetch (e.g., 'ImagePath')
Returns:
- top_k_paths: list of file paths or values from the specified column
"""
# Fetch the specific column corresponding to the top K indices
top_k_paths = df.iloc[top_k_indices][column_name].tolist()
return top_k_paths
def get_cordinates(df, top_k_indices, column_name = 'Coordinate'):
"""
Retrieve the file paths (or any specific column) from the DataFrame using the top K indices.
Parameters:
- df: pandas DataFrame containing the data
- top_k_indices: numpy array of the top K indices
- column_name: str, the name of the column to fetch (e.g., 'ImagePath')
Returns:
- top_k_paths: list of file paths or values from the specified column
"""
# Fetch the specific column corresponding to the top K indices
top_k_paths = df.iloc[top_k_indices][column_name].tolist()
return top_k_paths
def get_images_from_s3_to_display(bucket_name, file_paths, AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY, folder_name):
"""
Retrieve and display images from AWS S3 in a Streamlit app.
Parameters:
- bucket_name: str, the name of the S3 bucket
- file_paths: list, a list of file paths to retrieve from S3
Returns:
- None (directly displays images in the Streamlit app)
"""
# Initialize S3 client
s3 = boto3.client(
's3',
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)
# Iterate over file paths and display each image
for file_path in file_paths:
# Retrieve the image from S3
s3_object = s3.get_object(Bucket=bucket_name, Key=f"{folder_name}{file_path}")
img_data = s3_object['Body'].read()
# Open the image using PIL and display it using Streamlit
img = Image.open(BytesIO(img_data))
st.image(img, caption=file_path, use_column_width=True)
def get_images_with_bounding_boxes_from_s3(bucket_name, file_paths, bounding_boxes, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, folder_name):
"""
Retrieve and display images from AWS S3 with corresponding bounding boxes in a Streamlit app.
Parameters:
- bucket_name: str, the name of the S3 bucket
- file_paths: list, a list of file paths to retrieve from S3
- bounding_boxes: list of numpy arrays or lists, each containing coordinates of bounding boxes (in the form [x_min, y_min, x_max, y_max])
- AWS_ACCESS_KEY_ID: str, AWS access key ID for authentication
- AWS_SECRET_ACCESS_KEY: str, AWS secret access key for authentication
- folder_name: str, the folder prefix in S3 bucket where the images are stored
Returns:
- None (directly displays images in the Streamlit app with bounding boxes)
"""
# Initialize S3 client
s3 = boto3.client(
's3',
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)
# Iterate over file paths and corresponding bounding boxes
for file_path, box_coords in zip(file_paths, bounding_boxes):
# Retrieve the image from S3
s3_object = s3.get_object(Bucket=bucket_name, Key=f"{folder_name}{file_path}")
img_data = s3_object['Body'].read()
# Open the image using PIL
img = Image.open(BytesIO(img_data))
# Draw bounding boxes on the image
draw = ImageDraw.Draw(img)
# Ensure box_coords is iterable, in case it's a single numpy array or float value
if isinstance(box_coords, (np.ndarray, list)):
# Check if we have multiple bounding boxes or a single one
if len(box_coords) > 0 and isinstance(box_coords[0], (np.ndarray, list)):
# Multiple bounding boxes
for box in box_coords:
x_min, y_min, x_max, y_max = map(int, box) # Convert to integers
draw.rectangle([x_min, y_min, x_max, y_max], outline="red", width=3)
else:
# Single bounding box
x_min, y_min, x_max, y_max = map(int, box_coords)
draw.rectangle([x_min, y_min, x_max, y_max], outline="red", width=3)
else:
raise ValueError(f"Bounding box data for {file_path} is not in an iterable format.")
# Display the image with bounding boxes using Streamlit
st.image(img, caption=file_path, use_column_width=True)