# Zeta / process_documents.py
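"""Document ingestion for Zeta.

Fetches each input URL (web pages through the r.jina.ai reader, PDFs through
PDFMiner), splits PDFs into header/content snippets via font-size heuristics,
and returns LangChain Documents indexed by chunk id.
"""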
import math
import re
from statistics import median

import tiktoken
from bs4 import BeautifulSoup
from langchain.docstore.document import Document
from langchain.document_loaders import PDFMinerPDFasHTMLLoader, WebBaseLoader


def deep_strip(text):
    """Collapse runs of whitespace to single spaces and trim the ends."""
    return re.sub(r"\s+", " ", text or "").strip()


def process_documents(urls):
    """Load every URL and return (all snippets, documents keyed by chunk id)."""
    snippets = []
    documents = {}
    for source_id, url in enumerate(urls):
        source_snippets = (
            process_pdf(url, source_id)
            if url.endswith(".pdf")
            else process_web(url, source_id)
        )
        snippets.extend(source_snippets)
        # One entry for the full concatenated source, keyed by its source id.
        documents[str(source_id)] = Document(
            page_content="\n".join(snip.page_content for snip in source_snippets),
            metadata={
                "source_url": url,
                "source_type": "pdf" if url.endswith(".pdf") else "web",
                "source_id": source_id,
                "chunk_id": str(source_id),
            },
        )
        # One entry per snippet, keyed by its chunk id.
        for snip in source_snippets:
            documents[snip.metadata["chunk_id"]] = snip
    return snippets, documents


def process_web(url, source_id):
    """Fetch a web page as markdown through the r.jina.ai reader."""
    data = WebBaseLoader(f"https://r.jina.ai/{url}").load()[0]
    try:
        # The reader prefixes its output with "Title: ..." and a
        # "Markdown Content:" marker; extract the title and drop the preamble.
        header = re.search(r"Title: (.*)", data.page_content).group(1)
        marker = "Markdown Content:"
        page_content = data.page_content[
            data.page_content.index(marker) + len(marker) :
        ].strip()
    except (AttributeError, ValueError):
        # No title or no marker found: fall back to the raw page text.
        header = ""
        page_content = data.page_content.strip()
    document_snippets = [
        Document(
            page_content=page_content,
            metadata={
                "header": header,
                "source_url": url,
                "source_type": "web",
                "chunk_id": f"{source_id}_00",  # same format as PDF chunk ids
                "source_id": source_id,
            },
        )
    ]
    return document_snippets


def process_pdf(url, source_id):
    """Split a PDF into header/content snippets using font-size heuristics."""
    data = PDFMinerPDFasHTMLLoader(url).load()[0]
    content = BeautifulSoup(data.page_content, "html.parser").find_all("div")
    snippets = get_pdf_snippets(content)
    filtered_snippets = filter_pdf_snippets(snippets, new_line_threshold_ratio=0.4)
    # Text set larger than the median font size is treated as a header.
    median_font_size = math.ceil(
        median([font_size for _, font_size in filtered_snippets])
    )
    semantic_snippets = get_pdf_semantic_snippets(filtered_snippets, median_font_size)
    document_snippets = [
        Document(
            page_content=deep_strip(meta["header_text"]) + " " + deep_strip(text),
            metadata={
                "header": " ".join(meta["header_text"].split()[:10]),
                "source_url": url,
                "source_type": "pdf",
                "chunk_id": f"{source_id}_{i:02d}",
                "source_id": source_id,
            },
        )
        for i, (text, meta) in enumerate(semantic_snippets)
    ]
    return document_snippets


def get_pdf_snippets(content):
    """Merge consecutive divs sharing a font size into (text, font_size) runs."""
    current_font_size = None
    current_text = ""
    snippets = []
    for div in content:
        span = div.find("span")
        if not span:
            continue
        style = span.get("style")
        if not style:
            continue
        font_size = re.findall(r"font-size:(\d+)px", style)
        if not font_size:
            continue
        font_size = int(font_size[0])
        if current_font_size is None:
            current_font_size = font_size
        if font_size == current_font_size:
            current_text += div.text
        else:
            # Font size changed: close the current run and start a new one.
            snippets.append((current_text, current_font_size))
            current_font_size = font_size
            current_text = div.text
    if current_text:
        snippets.append((current_text, current_font_size))
    return snippets


def filter_pdf_snippets(content_list, new_line_threshold_ratio):
    """Drop empty snippets and snippets whose text is mostly newlines."""
    filtered_list = []
    for content, font_size in content_list:
        total_chars = len(content)
        if total_chars == 0:
            continue  # guard against division by zero
        newline_ratio = content.count("\n") / total_chars
        if newline_ratio <= new_line_threshold_ratio:
            filtered_list.append((content, font_size))
    return filtered_list


def get_pdf_semantic_snippets(filtered_snippets, median_font_size):
    """Pair each header (font size above the median) with the content below it.

    Content appearing before the first header is discarded.
    """
    semantic_snippets = []
    current_header = None
    current_content = ""
    header_font_size = None
    content_font_sizes = []
    for content, font_size in filtered_snippets:
        if font_size > median_font_size:
            # New header: flush the previous header/content pair, if any.
            if current_header is not None:
                metadata = {
                    "header_font_size": header_font_size,
                    "content_font_size": (
                        median(content_font_sizes) if content_font_sizes else None
                    ),
                    "header_text": current_header,
                }
                semantic_snippets.append((current_content, metadata))
            current_content = ""
            content_font_sizes = []
            current_header = content
            header_font_size = font_size
        else:
            content_font_sizes.append(font_size)
            current_content = (
                current_content + " " + content if current_content else content
            )
    # Flush the final pair.
    if current_header is not None:
        metadata = {
            "header_font_size": header_font_size,
            "content_font_size": (
                median(content_font_sizes) if content_font_sizes else None
            ),
            "header_text": current_header,
        }
        semantic_snippets.append((current_content, metadata))
    return semantic_snippets


def num_tokens(string):
    """Count tokens in `string` under the cl100k_base encoding."""
    encoding = tiktoken.get_encoding("cl100k_base")
    return len(encoding.encode(string, disallowed_special=()))
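

# Minimal usage sketch. The URLs below are hypothetical placeholders; running
# this requires network access plus the langchain, bs4, pdfminer.six, and
# tiktoken dependencies imported above.
if __name__ == "__main__":
    example_urls = [
        "https://example.com/post",       # routed to process_web
        "https://example.com/paper.pdf",  # routed to process_pdf
    ]
    snippets, documents = process_documents(example_urls)
    for snip in snippets:
        print(snip.metadata["chunk_id"], num_tokens(snip.page_content))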