import os import pandas as pd import json import pickle import pprint import textwrap import time from tqdm.autonotebook import tqdm from pinecone import Pinecone, ServerlessSpec pc = Pinecone(api_key=os.getenv('PINECONE_API_KEY')) index_name = "prorata-postman-ds-128-v2" index = pc.Index(index_name) from openai import OpenAI openai_client = OpenAI(api_key=os.getenv('OPENAI_API_KEY')) def get_embedding(text, model="text-embedding-3-small"): text = text.replace("\n", " ") return openai_client.embeddings.create(input = [text], model=model).data[0].embedding from transformers import AutoTokenizer, AutoModel # Load the tokenizer and the model tokenizer = AutoTokenizer.from_pretrained("colbert-ir/colbertv2.0") model = AutoModel.from_pretrained("colbert-ir/colbertv2.0") with open('colbertv2_pc_128_d.pkl', 'rb') as f: colbertv2_pc_128_d = pickle.load(f) version_notes = colbertv2_pc_128_d['version_notes'] chunkid_to_colbertv2 = colbertv2_pc_128_d['chunkid_to_colbertv2'] import torch # Function to compute MaxSim def maxsim(query_embedding, document_embedding): # Expand dimensions for broadcasting # Query: [batch_size, query_length, embedding_size] -> [batch_size, query_length, 1, embedding_size] # Document: [batch_size, doc_length, embedding_size] -> [batch_size, 1, doc_length, embedding_size] expanded_query = query_embedding.unsqueeze(2) expanded_doc = document_embedding.unsqueeze(1) # Compute cosine similarity across the embedding dimension sim_matrix = torch.nn.functional.cosine_similarity(expanded_query, expanded_doc, dim=-1) # Take the maximum similarity for each query token (across all document tokens) # sim_matrix shape: [batch_size, query_length, doc_length] max_sim_scores, _ = torch.max(sim_matrix, dim=2) # Average these maximum scores across all query tokens avg_max_sim = torch.mean(max_sim_scores, dim=1) return avg_max_sim def get_matches_reranked(q, k=20): matches = index.query(vector=get_embedding(q), top_k=k, include_metadata=True)['matches'] q_encoding = tokenizer(q, return_tensors='pt') q_embedding = model(**q_encoding).last_hidden_state.mean(dim=1) # Calculate MaxSim scores for match in matches: score = maxsim(q_embedding.unsqueeze(0), chunkid_to_colbertv2[match['id']]) match['colbertv2_score'] = score.item() matches_colbertv2 = sorted(matches, key=lambda x: x['colbertv2_score'], reverse=True) return matches_colbertv2 def filter_matches(matches_colbertv2, score_thr=0.0): matches_colbertv2_f = [] url_to_chunk_l = {} for idx, match in enumerate(matches_colbertv2): if match['colbertv2_score'] > score_thr: _url = match['metadata']['url'] if not _url in url_to_chunk_l: url_to_chunk_l[_url] = [] url_to_chunk_l[_url].append(match) matches_colbertv2_f.append(match) return matches_colbertv2_f style_str = """ """ import gradio as gr from io import StringIO from urllib.parse import urlparse def output_chunks_reranked(msg): matches_colbertv2 = get_matches_reranked(msg, k=20) matches_colbertv2 = filter_matches(matches_colbertv2, score_thr=0.55) _out = StringIO() if not matches_colbertv2: print(style_str, file=_out) print(f"