finance_rag / app.py
poemsforaphrodite's picture
Create app.py
68c6779 verified
raw
history blame
9.08 kB
import os
import streamlit as st
from openai import OpenAI
from PyPDF2 import PdfReader
import requests
from youtube_transcript_api import YouTubeTranscriptApi, NoTranscriptFound
from urllib.parse import urlparse, parse_qs
from pinecone import Pinecone
import uuid
from dotenv import load_dotenv
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from itertools import islice
load_dotenv()
# Set up OpenAI client
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Set up Pinecone
pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))
# Get index name and URL from .env
index_name = os.getenv("PINECONE_INDEX_NAME")
index_url = os.getenv("PINECONE_INDEX_URL")
index = pc.Index(index_name, url=index_url)
def get_embedding(text):
response = client.embeddings.create(input=text, model="text-embedding-3-large")
return response.data[0].embedding
def chunk_text(text, content_type):
if content_type == "YouTube":
return [text] # Return the entire text as a single chunk for YouTube
else: # Default for PDF and Web Link
chunk_size = 2000
content_length = len(text)
return [text[i:i+chunk_size] for i in range(0, content_length, chunk_size)]
def process_pdf(file):
reader = PdfReader(file)
text = ""
for page in reader.pages:
text += page.extract_text() + "\n"
return chunk_text(text, "PDF")
def process_web_link(url):
response = requests.get(url)
return chunk_text(response.text, "Web")
def process_youtube_link(url):
try:
video_id = extract_video_id(url)
transcript = YouTubeTranscriptApi.get_transcript(video_id)
full_text = " ".join([entry['text'] for entry in transcript])
print("Transcript obtained from YouTube API")
return [full_text] # Return the full text as a single chunk
except NoTranscriptFound:
print("No transcript found for this YouTube video.")
return []
def extract_video_id(url):
parsed_url = urlparse(url)
if parsed_url.hostname == 'youtu.be':
return parsed_url.path[1:]
if parsed_url.hostname in ('www.youtube.com', 'youtube.com'):
if parsed_url.path == '/watch':
return parse_qs(parsed_url.query)['v'][0]
if parsed_url.path[:7] == '/embed/':
return parsed_url.path.split('/')[2]
if parsed_url.path[:3] == '/v/':
return parsed_url.path.split('/')[2]
return None
def process_upload(upload_type, file_or_link, file_name=None):
print(f"Starting process_upload for {upload_type}")
doc_id = str(uuid.uuid4())
print(f"Generated doc_id: {doc_id}")
if upload_type == "PDF":
chunks = process_pdf(file_or_link)
doc_name = file_name or "Uploaded PDF"
elif upload_type == "Web Link":
chunks = process_web_link(file_or_link)
doc_name = file_or_link
elif upload_type == "YouTube Link":
chunks = process_youtube_link(file_or_link)
doc_name = f"YouTube: {file_or_link}"
else:
print("Invalid upload type")
return "Invalid upload type"
vectors = []
with ThreadPoolExecutor() as executor:
futures = [executor.submit(process_chunk, chunk, doc_id, i, upload_type, doc_name) for i, chunk in enumerate(chunks)]
for future in as_completed(futures):
vectors.append(future.result())
# Update progress
progress = len(vectors) / len(chunks)
st.session_state.upload_progress.progress(progress)
print(f"Generated {len(vectors)} vectors")
# Upsert vectors in batches
batch_size = 100 # Adjust this value as needed
for i in range(0, len(vectors), batch_size):
batch = list(islice(vectors, i, i + batch_size))
index.upsert(vectors=batch)
print(f"Upserted batch {i//batch_size + 1} of {len(vectors)//batch_size + 1}")
print("All vectors upserted to Pinecone")
return f"Processing complete for {upload_type}. Document Name: {doc_name}"
def process_chunk(chunk, doc_id, i, upload_type, doc_name):
embedding = get_embedding(chunk)
return (f"{doc_id}_{i}", embedding, {
"text": chunk,
"type": upload_type,
"doc_id": doc_id,
"doc_name": doc_name,
"chunk_index": i
})
def get_relevant_context(query, top_k=5):
print(f"Getting relevant context for query: {query}")
query_embedding = get_embedding(query)
search_results = index.query(vector=query_embedding, top_k=top_k, include_metadata=True)
print(f"Found {len(search_results['matches'])} relevant results")
# Sort results by similarity score (higher is better)
sorted_results = sorted(search_results['matches'], key=lambda x: x['score'], reverse=True)
context = "\n".join([result['metadata']['text'] for result in sorted_results])
return context, sorted_results
def chat_with_ai(message):
print(f"Chatting with AI, message: {message}")
context, results = get_relevant_context(message)
print(f"Retrieved context, length: {len(context)}")
messages = [
{"role": "system", "content": "You are a helpful assistant. Use the following information to answer the user's question, but don't mention the context directly in your response. If the information isn't in the context, say you don't know."},
{"role": "system", "content": f"Context: {context}"},
{"role": "user", "content": message}
]
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=messages
)
print("Received response from OpenAI")
ai_response = response.choices[0].message.content
# Prepare source information
sources = [
{
"doc_id": result['metadata']['doc_id'],
"doc_name": result['metadata']['doc_name'],
"chunk_index": result['metadata']['chunk_index'],
"text": result['metadata']['text'],
"type": result['metadata']['type'],
"score": result['score']
}
for result in results
]
return ai_response, sources
def clear_database():
print("Clearing database...")
index.delete(delete_all=True)
print("Database cleared")
return "Database cleared successfully."
# Streamlit UI
st.set_page_config(layout="wide")
st.title("Upload and Chat with PDFs, Web Links, and YouTube Videos")
# Create three columns
col1, col2, col3 = st.columns([1, 1, 1])
with col1:
st.header("Upload")
# PDF upload
uploaded_files = st.file_uploader("Choose one or more PDF files", type="pdf", accept_multiple_files=True)
# Web Link input
web_link = st.text_input("Enter a Web Link")
# YouTube Link input
youtube_link = st.text_input("Enter a YouTube Link")
if st.button("Process All"):
st.session_state.upload_progress = st.progress(0)
with st.spinner("Processing uploads..."):
results = []
if uploaded_files:
for file in uploaded_files:
pdf_result = process_upload("PDF", file, file.name)
results.append(pdf_result)
if web_link:
web_result = process_upload("Web Link", web_link)
results.append(web_result)
if youtube_link:
youtube_result = process_upload("YouTube Link", youtube_link)
results.append(youtube_result)
if results:
for result in results:
st.success(result)
else:
st.warning("No content uploaded. Please provide at least one input.")
st.session_state.upload_progress.empty()
if st.button("Clear Database"):
result = clear_database()
st.success(result)
with col2:
st.header("Chat")
user_input = st.text_input("Ask a question about the uploaded content:")
if st.button("Send"):
if user_input:
print(f"Sending user input: {user_input}")
st.session_state.chat_progress = st.progress(0)
response, sources = chat_with_ai(user_input)
st.session_state.chat_progress.progress(1.0)
st.markdown("**You:** " + user_input)
st.markdown("**AI:** " + response)
# Store sources in session state for display in col3
st.session_state.sources = sources
st.session_state.chat_progress.empty()
else:
print("Empty user input")
st.warning("Please enter a question.")
with col3:
st.header("Source Chunks")
if 'sources' in st.session_state and st.session_state.sources:
for i, source in enumerate(st.session_state.sources, 1):
with st.expander(f"Source {i} - {source['type']} ({source['doc_name']}) - Score: {source['score']}"):
st.markdown(f"**Chunk Index:** {source['chunk_index']}")
st.text(source['text'])
else:
st.info("Ask a question to see source chunks here.")