# Hugging Face Spaces - status: Sleeping
from pdfminer.high_level import extract_pages | |
from pdfminer.layout import LTTextContainer | |
from tqdm import tqdm | |
import re | |
import gradio as gr | |
import os | |
import accelerate | |
import spaces | |
import subprocess | |
from huggingface_hub import hf_hub_download | |
from llama_cpp import Llama | |
from llama_cpp_agent import LlamaCppAgent, MessagesFormatterType | |
from llama_cpp_agent.providers import LlamaCppPythonProvider | |
from llama_cpp_agent.chat_history import BasicChatHistory | |
from llama_cpp_agent.chat_history.messages import Roles | |
# from vllm import LLM, SamplingParams | |
# subprocess.run('pip install llama-cpp-python==0.2.75 --extra-index-url https://abetlen.github.io/llama-cpp-python/whl/cu124', shell=True) | |
# subprocess.run('pip install llama-cpp-agent==0.2.10', shell=True) | |
# Download the quantized Llama 3 8B Instruct GGUF file into ./models when the
# module is loaded; deidentify_doc() later opens this same file by path.
hf_hub_download(
    repo_id="QuantFactory/Meta-Llama-3-8B-Instruct-GGUF",
    filename="Meta-Llama-3-8B-Instruct.Q8_0.gguf",
    local_dir="./models",
)
# hf_hub_download( | |
# repo_id="bartowski/Meta-Llama-3-70B-Instruct-GGUF", | |
# filename="Meta-Llama-3-70B-Instruct-Q3_K_M.gguf", | |
# local_dir = "./models" | |
# ) | |
def process_document(pdf_path, page_ids=None):
    """Extract the normalized text of each requested page of a PDF.

    Args:
        pdf_path: PDF location, passed unchanged to ``extract_pages``.
        page_ids: Optional page numbers forwarded as ``page_numbers``;
            ``None`` forwards no restriction.

    Returns:
        A dict mapping each extracted page's ``pageid`` to the string that
        ``process_page`` produces for that page.
    """
    page_layouts = extract_pages(pdf_path, page_numbers=page_ids)
    # tqdm only wraps the iterable to display progress while pages are read.
    return {
        layout.pageid: process_page(layout)
        for layout in tqdm(page_layouts)
    }
def process_page(extracted_page):
    """Flatten the text containers of one layout page into a single string.

    Args:
        extracted_page: A page layout object whose ``_objs`` attribute holds
            its layout elements, each exposing a ``y1`` coordinate.

    Returns:
        The normalized text of every ``LTTextContainer`` on the page,
        concatenated in descending ``y1`` order, with each run of newlines
        replaced by one space.
    """
    # Largest y1 first - presumably top-to-bottom reading order, since PDF
    # coordinates normally grow upwards (confirm against pdfminer's layout).
    ordered = sorted(extracted_page._objs, key=lambda obj: obj.y1, reverse=True)
    fragments = [
        extract_text_and_normalize(obj)
        for obj in ordered
        if isinstance(obj, LTTextContainer)  # skip figures, lines, rects, ...
    ]
    return re.sub('\n+', ' ', ''.join(fragments))
def extract_text_and_normalize(element):
    """Return the element's text with whitespace normalized line by line.

    The string from ``element.get_text()`` is split on newlines and each line
    is stripped. A blank line contributes a single newline. A non-blank line
    has every whitespace run collapsed to one space and is then followed by:

    * a space, if its last character is a word character, a comma or a
      hyphen (the line is treated as continuing on the next one);
    * a newline otherwise (the line is treated as finished).

    Args:
        element: Any object with a ``get_text()`` method returning ``str``.

    Returns:
        The concatenation of all normalized lines.
    """
    pieces = []
    for raw_line in element.get_text().split('\n'):
        line = raw_line.strip()
        if not line:
            # Blank source line: keep it as a line break in the output.
            pieces.append('\n')
            continue
        # Raw strings: '\s', '\w', '\d' are invalid escapes in plain literals.
        line = re.sub(r'\s+', ' ', line)
        terminator = ' ' if re.search(r'[\w\d\,\-]', line[-1]) else '\n'
        pieces.append(line + terminator)
    # Join once instead of growing a string with += on every line.
    return ''.join(pieces)
def txt_to_html(text):
    """Wrap each line of plain text in a ``<p>`` element inside an HTML page.

    Every line (split on newlines) is stripped of surrounding whitespace and
    HTML-escaped, so that ``&``, ``<`` and ``>`` in the text are rendered
    literally instead of being interpreted as markup.

    Args:
        text: Plain text, possibly spanning several lines.

    Returns:
        A string of the form ``<html><body><p>...</p>...</body></html>`` with
        one paragraph per input line (empty lines give empty paragraphs).
    """
    import html  # function-scope import keeps this block self-contained

    paragraphs = (
        # quote=False: quotes are harmless in element content, leave them.
        "<p>{}</p>".format(html.escape(line.strip(), quote=False))
        for line in text.split('\n')
    )
    return "<html><body>" + "".join(paragraphs) + "</body></html>"
def _run_redaction_pass(llm, instruction, text, maxtokens, temperature, top_probability):
    """Send one instruction/text pair to the model and return the reply text."""
    response = llm.create_chat_completion(
        messages=[
            # NOTE(review): the instruction is sent with the "assistant" role,
            # as in the original code; "system" is the conventional role for
            # instructions. Left unchanged because switching alters the model
            # output - confirm before changing.
            {"role": "assistant", "content": instruction},
            {"role": "user", "content": text},
        ],
        max_tokens=maxtokens,
        temperature=temperature,
        top_p=top_probability,
    )
    return response['choices'][0]['message']['content']


def deidentify_doc(pdftext, maxtokens, temperature, top_probability):
    """Redact personal information from text using two model passes.

    The model file ``models/Meta-Llama-3-8B-Instruct.Q8_0.gguf`` is loaded on
    every call. The first pass asks the model to replace person names,
    addresses, dates of birth and NHS numbers with ``[redacted]``; the second
    pass takes the first pass's output and asks it to replace ages and
    day/month/year calendar dates while keeping medical measurements.

    Args:
        pdftext: The text to de-identify.
        maxtokens: Passed as ``max_tokens`` to each completion call.
        temperature: Sampling temperature for each completion call.
        top_probability: Nucleus-sampling threshold, passed as ``top_p``.
            Previously this argument was accepted but never used.

    Returns:
        The message content returned by the second pass.
    """
    identity_prompt = "In the following text replace any person name and any address with term [redacted], replace any Date of Birth and NHS number with term [redacted]. Output the modified text."
    age_date_prompt = "Perform the following actions on given text: 1. Replace any person age with term [redacted] 2. DO NOT REPLACE ANY MEDICAL MEASUREMENTS 3. Replace only the CALENDAR DATES of format 'day/month/year' with term [redacted]. Output the modified text."

    llm = Llama(
        model_path="models/Meta-Llama-3-8B-Instruct.Q8_0.gguf",
        flash_attn=True,
        n_gpu_layers=81,
        n_batch=1024,
        n_ctx=8192,
    )

    # Pass 1 works on the raw document text; pass 2 refines pass 1's output.
    first_pass = _run_redaction_pass(
        llm, identity_prompt, pdftext, maxtokens, temperature, top_probability
    )
    return _run_redaction_pass(
        llm, age_date_prompt, first_pass, maxtokens, temperature, top_probability
    )
def pdf_to_text(files, maxtokens=2048, temperature=0, top_probability=0.95):
    """De-identify the first page of an uploaded PDF.

    Args:
        files: Path of the uploaded file (a single path, despite the plural
            name); a falsy value means nothing was uploaded.
        maxtokens: Forwarded to ``deidentify_doc``.
        temperature: Forwarded to ``deidentify_doc``.
        top_probability: Forwarded to ``deidentify_doc``.

    Returns:
        The de-identified text, or a user-facing message when no file was
        given, the file is not a ``.pdf``, or no text could be extracted.
    """
    invalid_message = 'Please provide a valid PDF'
    if not files:
        return invalid_message

    # Check the real extension, case-insensitively. The previous
    # split('.')[1] test rejected names such as "a.b.pdf" or "X.PDF" and then
    # crashed on an unassigned variable.
    extension = os.path.splitext(os.path.basename(files))[1]
    if extension.lower() != '.pdf':
        return invalid_message

    page2content = process_document(files, page_ids=[0])
    # Only one page is requested, so take whatever single entry came back
    # instead of assuming its key; an empty result yields ''.
    pdftext = next(iter(page2content.values()), '')
    if not pdftext:
        return 'PDF file appears to be corrupted.'
    return deidentify_doc(pdftext, maxtokens, temperature, top_probability)
# ---------------------------------------------------------------------------
# Gradio user interface
# ---------------------------------------------------------------------------

# Stylesheet string; not applied, because the `css` argument of the Interface
# below is commented out.
css = ".gradio-container {background: 'logo.png'}"

# Components created here but not passed to the Interface below (only
# `output_text` is used).
temp_slider = gr.Slider(minimum=0, maximum=2, value=0.9, label="Temperature Value")
prob_slider = gr.Slider(minimum=0, maximum=1, value=0.95, label="Max Probability Value")
max_tokens = gr.Number(value=600, label="Max Tokens")
input_folder = gr.File(file_count='multiple')
input_folder_text = gr.Textbox(label='Enter output folder path')
output_text = gr.Textbox()
output_path_component = gr.File(label="Select Output Path")

# A single file input feeds pdf_to_text; its remaining parameters therefore
# take their default values.
iface = gr.Interface(
    fn=pdf_to_text,
    inputs=['file'],
    # css = css,
    outputs=output_text,
    title='DeID: Endoscopy Report De-Identification',
    description="This application assists to remove personal information from the uploaded clinical report",
    theme=gr.themes.Soft(),
)

iface.launch()