import math
import re
from statistics import median

import tiktoken
from bs4 import BeautifulSoup
from langchain.docstore.document import Document
from langchain.document_loaders import PDFMinerPDFasHTMLLoader, WebBaseLoader


def deep_strip(text):
    """Collapse runs of whitespace into single spaces and trim the ends."""
    return re.sub(r"\s+", " ", text or "").strip()


def process_documents(urls):
    """Load each URL into per-chunk snippets plus one whole-document entry.

    Returns (snippets, documents): a flat list of chunk Documents, and a dict
    mapping string ids (whole documents and individual chunks) to Documents.
    """
    snippets = []
    documents = {}
    for source_id, url in enumerate(urls):
        snippet = (
            process_pdf(url, source_id)
            if url.endswith(".pdf")
            else process_web(url, source_id)
        )
        snippets.extend(snippet)
        # Whole-document entry, keyed by the source id.
        documents[str(source_id)] = Document(
            page_content="\n".join(snip.page_content for snip in snippet),
            metadata={
                "source_url": url,
                "source_type": "pdf" if url.endswith(".pdf") else "web",
                "source_id": source_id,
                "chunk_id": str(source_id),
            },
        )
        # Per-chunk entries, keyed by each chunk id ("<source>_<index>").
        for snip in snippet:
            documents[snip.metadata["chunk_id"]] = snip
    return snippets, documents


def process_web(url, source_id):
    """Fetch a web page through the Jina Reader proxy as a single snippet."""
    # r.jina.ai returns plain text shaped like:
    #   Title: ...\nURL Source: ...\nMarkdown Content: ...
    data = WebBaseLoader(f"https://r.jina.ai/{url}").load()[0]
    try:
        header = re.search(r"Title: (.*)", data.page_content).group(1)
        marker = "Markdown Content:"
        page_content = data.page_content[
            data.page_content.index(marker) + len(marker):
        ].strip()
    except (AttributeError, ValueError):
        # No recognizable reader preamble; fall back to the raw page text.
        header = ""
        page_content = data.page_content.strip()
    return [
        Document(
            page_content=page_content,
            metadata={
                "header": header,
                "source_url": url,
                "source_type": "web",
                "chunk_id": f"{source_id}_00",
                "source_id": source_id,
            },
        )
    ]


def process_pdf(url, source_id):
    """Split a PDF into header/content chunks, using font size as the cue."""
    data = PDFMinerPDFasHTMLLoader(url).load()[0]
    content = BeautifulSoup(data.page_content, "html.parser").find_all("div")
    snippets = get_pdf_snippets(content)
    filtered_snippets = filter_pdf_snippets(snippets, new_line_threshold_ratio=0.4)
    # Anything set larger than the (rounded-up) median font size is a header.
    median_font_size = math.ceil(
        median([font_size for _, font_size in filtered_snippets])
    )
    semantic_snippets = get_pdf_semantic_snippets(filtered_snippets, median_font_size)
    document_snippets = [
        Document(
            page_content=deep_strip(snip[1]["header_text"]) + " " + deep_strip(snip[0]),
            metadata={
                # Keep only the first ten words of the header for readability.
                "header": " ".join(snip[1]["header_text"].split()[:10]),
                "source_url": url,
                "source_type": "pdf",
                "chunk_id": f"{source_id}_{i:02d}",
                "source_id": source_id,
            },
        )
        for i, snip in enumerate(semantic_snippets)
    ]
    return document_snippets


def get_pdf_snippets(content):
    """Merge consecutive divs sharing a font size into (text, font_size) runs."""
    current_font_size = None
    current_text = ""
    snippets = []
    for div in content:
        span = div.find("span")
        if not span:
            continue
        style = span.get("style")
        if not style:
            continue
        font_size = re.findall(r"font-size:(\d+)px", style)
        if not font_size:
            continue
        font_size = int(font_size[0])
        if current_font_size is None:
            current_font_size = font_size
        if font_size == current_font_size:
            current_text += div.text
        else:
            # Font size changed: close the current run and start a new one.
            snippets.append((current_text, current_font_size))
            current_font_size = font_size
            current_text = div.text
    if current_font_size is not None:
        snippets.append((current_text, current_font_size))
    return snippets


def filter_pdf_snippets(content_list, new_line_threshold_ratio):
    """Drop snippets that are mostly newlines (tables, TOCs, layout debris)."""
    filtered_list = []
    for content, font_size in content_list:
        if not content:
            continue
        if content.count("\n") / len(content) <= new_line_threshold_ratio:
            filtered_list.append((content, font_size))
    return filtered_list
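
# A quick sanity-check sketch for the two helpers above. The HTML fragment is
# hypothetical, but it mirrors the div/span/font-size structure that
# PDFMinerPDFasHTMLLoader emits, so the expected output is easy to verify.
def _demo_pdf_snippet_extraction():
    html = (
        '<div><span style="font-size:20px">1 Introduction</span></div>'
        '<div><span style="font-size:12px">Body text of the section. </span></div>'
        '<div><span style="font-size:12px">More body text.</span></div>'
    )
    divs = BeautifulSoup(html, "html.parser").find_all("div")
    snippets = get_pdf_snippets(divs)
    # -> [("1 Introduction", 20), ("Body text of the section. More body text.", 12)]
    return filter_pdf_snippets(snippets, new_line_threshold_ratio=0.4)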
def get_pdf_semantic_snippets(filtered_snippets, median_font_size):
    """Pair each header (font larger than the median) with the body below it."""
    semantic_snippets = []
    current_header = None
    current_content = ""
    header_font_size = None
    content_font_sizes = []
    for content, font_size in filtered_snippets:
        if font_size > median_font_size:
            # A new header starts: flush the section accumulated so far.
            if current_header is not None:
                metadata = {
                    "header_font_size": header_font_size,
                    "content_font_size": (
                        median(content_font_sizes) if content_font_sizes else None
                    ),
                    "header_text": current_header,
                }
                semantic_snippets.append((current_content, metadata))
                current_content = ""
                content_font_sizes = []
            current_header = content
            header_font_size = font_size
        else:
            content_font_sizes.append(font_size)
            current_content = (
                current_content + " " + content if current_content else content
            )
    # Flush the final section.
    if current_header is not None:
        metadata = {
            "header_font_size": header_font_size,
            "content_font_size": (
                median(content_font_sizes) if content_font_sizes else None
            ),
            "header_text": current_header,
        }
        semantic_snippets.append((current_content, metadata))
    return semantic_snippets


def num_tokens(string):
    """Count tokens under the cl100k_base encoding."""
    encoding = tiktoken.get_encoding("cl100k_base")
    return len(encoding.encode(string, disallowed_special=()))
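
# Minimal usage sketch. The URLs below are placeholders, not real sources;
# num_tokens (cl100k_base) can then gate which chunks fit a model's context.
if __name__ == "__main__":
    example_urls = [
        "https://example.com/article",    # goes through process_web
        "https://example.com/paper.pdf",  # goes through process_pdf
    ]
    snippets, documents = process_documents(example_urls)
    for snip in snippets:
        print(snip.metadata["chunk_id"], num_tokens(snip.page_content))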