import math
import re
from statistics import median

from bs4 import BeautifulSoup
from langchain.docstore.document import Document
from langchain.document_loaders import PDFMinerPDFasHTMLLoader, WebBaseLoader

# Collapse runs of whitespace into single spaces and trim the ends.
deep_strip = lambda text: re.sub(r"\s+", " ", text or "").strip()


def process_documents(urls):
    """Load each URL (PDF or web page) and return its snippets, renumbering
    chunk_id across the combined list."""
    snippets = []
    for url in urls:
        if url.endswith(".pdf"):
            snippets.extend(process_pdf(url))
        else:
            snippets.extend(process_web(url))
    for e, snippet in enumerate(snippets):
        snippet.metadata["chunk_id"] = e
    return snippets


def process_pdf(url):
    """Convert a PDF into header-grouped Document snippets."""
    data = PDFMinerPDFasHTMLLoader(url).load()[0]
    content = BeautifulSoup(data.page_content, "html.parser").find_all("div")
    snippets = get_pdf_snippets(content)
    filtered_snippets = filter_pdf_snippets(snippets, new_line_threshold_ratio=0.4)
    median_font_size = math.ceil(
        median([font_size for _, font_size in filtered_snippets])
    )
    semantic_snippets = get_pdf_semantic_snippets(filtered_snippets, median_font_size)
    document_snippets = [
        Document(
            page_content=deep_strip(snip[1]["header_text"]) + " " + deep_strip(snip[0]),
            metadata={
                "header": " ".join(snip[1]["header_text"].split()[:10]),
                "source_url": url,
                "source_type": "pdf",
                "chunk_id": i,
            },
        )
        for i, snip in enumerate(semantic_snippets)
    ]
    return document_snippets


def get_pdf_snippets(content):
    """Merge consecutive divs that share a font size into (text, font_size) snippets."""
    current_font_size = None
    current_text = ""
    snippets = []
    for cntnt in content:
        span = cntnt.find("span")
        if not span:
            continue
        style = span.get("style")
        if not style:
            continue
        font_size = re.findall(r"font-size:(\d+)px", style)
        if not font_size:
            continue
        font_size = int(font_size[0])

        if not current_font_size:
            current_font_size = font_size
        if font_size == current_font_size:
            current_text += cntnt.text
        else:
            snippets.append((current_text, current_font_size))
            current_font_size = font_size
            current_text = cntnt.text
    snippets.append((current_text, current_font_size))
    return snippets


def filter_pdf_snippets(content_list, new_line_threshold_ratio):
    """Drop snippets whose newline-to-character ratio exceeds the threshold
    (typically tables, references, or other heavily line-broken noise)."""
    filtered_list = []
    for content, font_size in content_list:
        total_chars = len(content)
        if total_chars == 0:
            # Guard against empty snippets so the ratio below cannot divide by zero.
            continue
        newline_count = content.count("\n")
        ratio = newline_count / total_chars
        if ratio <= new_line_threshold_ratio:
            filtered_list.append((content, font_size))
    return filtered_list


def get_pdf_semantic_snippets(filtered_snippets, median_font_size):
    """Group snippets under the preceding header, treating any snippet whose
    font size exceeds the document median as a header."""
    semantic_snippets = []
    current_header = None
    current_content = ""
    header_font_size = None
    content_font_sizes = []

    for content, font_size in filtered_snippets:
        if font_size > median_font_size:
            # A larger-than-median font marks a new header; flush the section
            # collected so far before starting the next one.
            if current_header is not None:
                metadata = {
                    "header_font_size": header_font_size,
                    "content_font_size": (
                        median(content_font_sizes) if content_font_sizes else None
                    ),
                    "header_text": current_header,
                }
                semantic_snippets.append((current_content, metadata))
                current_content = ""
                content_font_sizes = []

            current_header = content
            header_font_size = font_size
        else:
            content_font_sizes.append(font_size)
            if current_content:
                current_content += " " + content
            else:
                current_content = content

    if current_header is not None:
        metadata = {
            "header_font_size": header_font_size,
            "content_font_size": (
                median(content_font_sizes) if content_font_sizes else None
            ),
            "header_text": current_header,
        }
        semantic_snippets.append((current_content, metadata))
    return semantic_snippets


def process_web(url):
    """Load a web page as a single Document snippet."""
    data = WebBaseLoader(url).load()[0]
    document_snippets = [
        Document(
            page_content=deep_strip(data.page_content),
            metadata={
                "header": data.metadata["title"],
                "source_url": url,
                "source_type": "web",
                "chunk_id": 0,
            },
        )
    ]
    return document_snippets
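

# Minimal usage sketch (not part of the original module): the URLs below are
# placeholders, so swap in real PDF / web addresses before running this.
if __name__ == "__main__":
    example_urls = [
        "https://example.com/sample.pdf",   # hypothetical PDF source
        "https://example.com/sample-page",  # hypothetical web page
    ]
    for doc in process_documents(example_urls):
        print(doc.metadata["chunk_id"], doc.metadata["source_type"], doc.metadata["header"])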
|