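"""Utilities for loading web pages and PDFs into LangChain Documents.

Web pages are fetched through the Jina Reader proxy (r.jina.ai); PDFs are
rendered to HTML with PDFMiner and split into header/body sections by font
size. ``num_tokens`` counts tokens with tiktoken's cl100k_base encoding.
"""
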
import math
import re
from statistics import median
from bs4 import BeautifulSoup
from langchain.docstore.document import Document
from langchain.document_loaders import PDFMinerPDFasHTMLLoader, WebBaseLoader
import tiktoken

def deep_strip(text):
    return re.sub(r"\s+", " ", text or "").strip()


def process_documents(urls):
    """Load every URL into snippets and index each piece by its chunk id."""
    snippets = []
    documents = {}
    for source_id, url in enumerate(urls):
        source_snippets = (
            process_pdf(url, source_id)
            if url.endswith(".pdf")
            else process_web(url, source_id)
        )
        snippets.extend(source_snippets)
        # Whole-source entry, keyed by the stringified source id; individual
        # snippets are indexed below under their own chunk ids.
        documents[str(source_id)] = Document(
            page_content="\n".join(snip.page_content for snip in source_snippets),
            metadata={
                "source_url": url,
                "source_type": "pdf" if url.endswith(".pdf") else "web",
                "source_id": source_id,
                "chunk_id": str(source_id),
            },
        )
        for snip in source_snippets:
            documents[snip.metadata["chunk_id"]] = snip
    return snippets, documents


def process_web(url, source_id):
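    """Fetch a web page through the Jina Reader proxy and wrap it as one snippet."""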
    data = WebBaseLoader(f"https://r.jina.ai/{url}").load()[0]
    try:
        # Jina Reader output starts with "Title: ..." and the page body
        # follows a "Markdown Content:" marker.
        header = re.search(r"Title: (.*)", data.page_content).group(1)
        marker = "Markdown Content:"
        page_content = data.page_content[
            data.page_content.index(marker) + len(marker):
        ].strip()
    except (AttributeError, ValueError):
        # Fall back to the raw page when the expected markers are missing.
        header = ""
        page_content = data.page_content.strip()
    document_snippets = [
        Document(
            page_content=page_content,
            metadata={
                "header": header,
                "source_url": url,
                "source_type": "web",
                "chunk_id": source_id,
                "source_id": source_id,
            },
        )
    ]
    return document_snippets


def process_pdf(url, source_id):
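    """Split a PDF into header/body sections, using font sizes as structure cues."""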
    data = PDFMinerPDFasHTMLLoader(url).load()[0]
    content = BeautifulSoup(data.page_content, "html.parser").find_all("div")
    snippets = get_pdf_snippets(content)
    filtered_snippets = filter_pdf_snippets(snippets, new_line_threshold_ratio=0.4)
    if not filtered_snippets:
        return []
    # Anything set larger than the median font size is treated as a header.
    median_font_size = math.ceil(
        median([font_size for _, font_size in filtered_snippets])
    )
    semantic_snippets = get_pdf_semantic_snippets(filtered_snippets, median_font_size)
    document_snippets = [
        Document(
            page_content=deep_strip(snip[1]["header_text"]) + " " + deep_strip(snip[0]),
            metadata={
                "header": " ".join(snip[1]["header_text"].split()[:10]),
                "source_url": url,
                "source_type": "pdf",
                "chunk_id": f"{source_id}_{i:02d}",
                "source_id": source_id,
            },
        )
        for i, snip in enumerate(semantic_snippets)
    ]
    return document_snippets


def get_pdf_snippets(content):
    """Merge consecutive PDF divs that share a font size into (text, size) tuples."""
    current_font_size = None
    current_text = ""
    snippets = []
    for div in content:
        span = div.find("span")
        if not span:
            continue
        style = span.get("style")
        if not style:
            continue
        font_size = re.findall(r"font-size:(\d+)px", style)
        if not font_size:
            continue
        font_size = int(font_size[0])

        if current_font_size is None:
            current_font_size = font_size
        if font_size == current_font_size:
            current_text += div.text
        else:
            # Font size changed: close the current run and start a new one.
            snippets.append((current_text, current_font_size))
            current_font_size = font_size
            current_text = div.text
    if current_font_size is not None:
        snippets.append((current_text, current_font_size))
    return snippets


def filter_pdf_snippets(content_list, new_line_threshold_ratio):
    """Drop snippets that are mostly newlines (page furniture, tables of contents)."""
    filtered_list = []
    for content, font_size in content_list:
        total_chars = len(content)
        if total_chars == 0:
            continue
        if content.count("\n") / total_chars <= new_line_threshold_ratio:
            filtered_list.append((content, font_size))
    return filtered_list


def get_pdf_semantic_snippets(filtered_snippets, median_font_size):
    """Pair each above-median-size header with the body text that follows it."""
    semantic_snippets = []
    current_header = None
    current_content = ""
    header_font_size = None
    content_font_sizes = []

    for content, font_size in filtered_snippets:
        if font_size > median_font_size:
            # A new header: flush the section accumulated under the old one.
            if current_header is not None:
                metadata = {
                    "header_font_size": header_font_size,
                    "content_font_size": (
                        median(content_font_sizes) if content_font_sizes else None
                    ),
                    "header_text": current_header,
                }
                semantic_snippets.append((current_content, metadata))
                current_content = ""
                content_font_sizes = []

            current_header = content
            header_font_size = font_size
        else:
            content_font_sizes.append(font_size)
            if current_content:
                current_content += " " + content
            else:
                current_content = content

    # Flush the final section.
    if current_header is not None:
        metadata = {
            "header_font_size": header_font_size,
            "content_font_size": (
                median(content_font_sizes) if content_font_sizes else None
            ),
            "header_text": current_header,
        }
        semantic_snippets.append((current_content, metadata))
    return semantic_snippets


def num_tokens(string):
    """Count tokens in ``string`` under tiktoken's cl100k_base encoding."""
    encoding = tiktoken.get_encoding("cl100k_base")
    return len(encoding.encode(string, disallowed_special=()))
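

# A minimal usage sketch, not part of the original module: the URLs below are
# placeholders, and network access plus the langchain, bs4, pdfminer.six and
# tiktoken dependencies are assumed.
if __name__ == "__main__":
    example_urls = [
        "https://example.com/article",    # fetched through r.jina.ai
        "https://example.com/paper.pdf",  # parsed from PDFMiner's HTML output
    ]
    snippets, documents = process_documents(example_urls)
    for snip in snippets:
        print(snip.metadata["chunk_id"], num_tokens(snip.page_content))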