from fastapi import FastAPI, File, UploadFile, HTTPException, Depends, Header
from pydantic import BaseModel
import os
from pymongo import MongoClient
from langchain_community.embeddings import SentenceTransformerEmbeddings
from langchain_community.vectorstores import MongoDBAtlasVectorSearch
import uvicorn
from dotenv import load_dotenv
from fastapi.middleware.cors import CORSMiddleware
from uuid import uuid4
# import httpx
from tensorflow import keras
from tensorflow.keras.models import load_model
import joblib
import librosa
import librosa.display
import numpy as np
import pandas as pd
import soundfile as sf
import opensmile
import ffmpeg
import noisereduce as nr
import json
# Path to the JSON file
json_filepath = 'app/reciters.json'

def load_json_data(filepath):
    """Load JSON data from a file."""
    with open(filepath, 'r', encoding='utf-8') as file:
        return json.load(file)

# Load the JSON data from file
json_reciters = load_json_data(json_filepath)
def find_reciter_by_name(name):
    """Search for a reciter by name in the loaded JSON data."""
    for reciter in json_reciters['reciters']:
        if reciter['name'] == name:
            return reciter
    return None  # Return None if no match is found
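# Assumed shape of app/reciters.json, inferred from the lookup above:
# {"reciters": [{"name": "...", ...}, ...]}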
default_sample_rate = 22050

def load(file_name, skip_seconds=0):
    # Load at the file's native sample rate; skip_seconds is honoured via
    # librosa's offset parameter (it was previously accepted but ignored).
    return librosa.load(file_name, sr=None, offset=skip_seconds, res_type='kaiser_fast')
def extract_features(X, sample_rate):
    # Generate Mel-frequency cepstral coefficients (MFCCs) from a time series
    mfccs = np.mean(librosa.feature.mfcc(y=X, sr=sample_rate, n_mfcc=40).T, axis=0)
    # Generate a short-time Fourier transform (STFT) to use in chroma_stft
    stft = np.abs(librosa.stft(X))
    # Compute a chromagram from a waveform or power spectrogram
    chroma = np.mean(librosa.feature.chroma_stft(S=stft, sr=sample_rate).T, axis=0)
    # Compute a mel-scaled spectrogram
    mel = np.mean(librosa.feature.melspectrogram(y=X, sr=sample_rate).T, axis=0)
    # Compute spectral contrast
    contrast = np.mean(librosa.feature.spectral_contrast(S=stft, sr=sample_rate).T, axis=0)
    # Compute the tonal centroid features (tonnetz)
    tonnetz = np.mean(librosa.feature.tonnetz(y=librosa.effects.harmonic(X), sr=sample_rate).T, axis=0)
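    # With librosa defaults, the pieces above contribute 40 (MFCC) + 12 (chroma)
    # + 128 (mel) + 7 (spectral contrast) + 6 (tonnetz) = 193 values in total.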
    # Concatenate all feature arrays into a single 1D array
    combined_features = np.hstack([mfccs, chroma, mel, contrast, tonnetz])
    return combined_features
load_dotenv()

# MongoDB connection
MONGODB_ATLAS_CLUSTER_URI = os.getenv("MONGODB_ATLAS_CLUSTER_URI", None)
client = MongoClient(MONGODB_ATLAS_CLUSTER_URI)
DB_NAME = "quran_db"
COLLECTION_NAME = "tafsir"
ATLAS_VECTOR_SEARCH_INDEX_NAME = "langchain_index"
MONGODB_COLLECTION = client[DB_NAME][COLLECTION_NAME]

embeddings = SentenceTransformerEmbeddings(model_name="BAAI/bge-m3")

vector_search = MongoDBAtlasVectorSearch.from_connection_string(
    MONGODB_ATLAS_CLUSTER_URI,
    DB_NAME + "." + COLLECTION_NAME,
    embeddings,
    index_name=ATLAS_VECTOR_SEARCH_INDEX_NAME,
)
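# Similarity search embeds each query with BGE-M3 and runs an Atlas vector
# search against the "langchain_index" search index configured above.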
df = pd.read_csv('app/quran.csv')
# FastAPI application setup
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
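# NOTE: wildcard origins together with allow_credentials=True is very
# permissive, and browsers reject credentialed responses that literally use
# "*"; consider listing explicit origins in production.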
def index_file(filepath):
    """Index each block in a file separated by double newlines for quick search.
    Returns a dictionary mapping block content to block number."""
    index = {}
    with open(filepath, 'r', encoding='utf-8') as file:
        content = file.read()  # Read the whole file at once
        blocks = content.split("\n\n")  # Split the content by double newlines
        for block_number, block in enumerate(blocks, 1):  # Start block numbers at 1 for human readability
            # Replace single newlines within blocks with spaces and strip leading/trailing whitespace
            formatted_block = ' '.join(block.split('\n')).strip()
            index[formatted_block] = block_number
    return index
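# Assumed layout of app/quran_tafseer_formatted.txt: text blocks separated by
# one blank line, so each block becomes a single dictionary key above.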
def get_text_by_block_number(filepath, block_numbers):
    """Retrieve specific blocks from a file based on block numbers,
    where each block is separated by a triple newline ('\\n\\n\\n')."""
    blocks_text = []
    with open(filepath, 'r', encoding='utf-8') as file:
        content = file.read()  # Read the whole file at once
        blocks = content.split("\n\n\n")  # Split the content by triple newlines
        for block_number, block in enumerate(blocks, 1):  # Start block numbers at 1 for human readability
            if block_number in block_numbers:
                splitted = block.split('\n')
                ayah = splitted[0]
                tafsir = splitted[1]
                print(block_number - 1)
                print(df.iloc[block_number - 1])
                # ayah_info = await get_ayah_info(ayah)  # This makes the API call
                row_data = df.iloc[block_number - 1].to_dict()
                blocks_text.append({
                    "tafsir": tafsir,
                    "surah_no": row_data['surah_no'],
                    "surah_name_en": row_data['surah_name_en'],
                    "surah_name_ar": row_data['surah_name_ar'],
                    "surah_name_roman": row_data['surah_name_roman'],
                    "ayah_no_surah": row_data['ayah_no_surah'],
                    "ayah_no_quran": row_data['ayah_no_quran'],
                    "ayah_ar": row_data['ayah_ar'],
                    "ayah_en": row_data['ayah_en']
                })
                if len(blocks_text) == len(block_numbers):  # Stop once all required blocks are retrieved
                    break
    return blocks_text
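# NOTE: this assumes block i of quran_tafseer.txt lines up with row i-1 of
# quran.csv, i.e. one ayah per block and per CSV row.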
# Existing API endpoints
@app.get("/")  # NOTE: decorator/path assumed; the comment above marks this as an endpoint
async def read_root():
    return {"message": "Welcome to our app"}

# New Query model for the POST request body
class Item(BaseModel):
    question: str
EXPECTED_TOKEN = os.getenv("API_TOKEN")

def verify_token(authorization: str = Header(None)):
    """
    Dependency to verify the Authorization header contains the correct Bearer token.
    """
    # Prefix for bearer token in the Authorization header
    prefix = "Bearer "
    # Check if the Authorization header is present and correctly formatted
    if not authorization or not authorization.startswith(prefix):
        raise HTTPException(status_code=401, detail="Unauthorized: Missing or invalid token")
    # Extract the token from the Authorization header
    token = authorization[len(prefix):]
    # Compare the extracted token to the expected token value
    if token != EXPECTED_TOKEN:
        raise HTTPException(status_code=401, detail="Unauthorized: Incorrect token")
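# Example client call, assuming this dependency is wired into a route with
# Depends(verify_token) and the /answer path used below:
#   curl -X POST http://localhost:8080/answer \
#        -H "Authorization: Bearer $API_TOKEN" \
#        -H "Content-Type: application/json" \
#        -d '{"question": "..."}'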
# New API endpoint to get an answer using the chain
@app.post("/answer")  # NOTE: decorator/path assumed; the comment above marks this as an endpoint
async def get_answer(item: Item):
    try:
        # Perform the similarity search with the provided question
        matching_docs = vector_search.similarity_search(item.question, k=3)
        clean_answers = [doc.page_content.replace("\n", " ").strip() for doc in matching_docs]
        # Index the formatted tafsir file so matched text can be mapped to block numbers
        answers_index = index_file('app/quran_tafseer_formatted.txt')
        # Collect block numbers for the answers that were found
        block_numbers = [answers_index[answer] for answer in clean_answers if answer in answers_index]
        # Retrieve the corresponding blocks from the unformatted tafsir file
        result_text = get_text_by_block_number('app/quran_tafseer.txt', block_numbers)
        print(result_text)
        return {"result_text": result_text}
    except Exception as e:
        # If there's an error, return a 500 error with the error's details
        raise HTTPException(status_code=500, detail=str(e))
# ------- CNN

# Constants
TARGET_DURATION = 3   # seconds for each audio clip
SAMPLE_RATE = 44100   # sample rate to use
N_MELS = 128          # number of Mel bands to generate
HOP_LENGTH = 512      # number of samples between successive frames
def preprocess_audio_cnn(file_path):
    try:
        # Load the audio file
        audio, sr = librosa.load(file_path, sr=SAMPLE_RATE)
    except FileNotFoundError:
        print(f"Error: File '{file_path}' not found.")
        return None
    except Exception as e:
        print(f"Error loading audio file: {e}")
        return None
    # Check if audio signal is None
    if audio is None:
        print(f"Error: Audio signal is None for file '{file_path}'.")
        return None
    audio, _ = librosa.effects.trim(audio, top_db=25)
    audio = nr.reduce_noise(y=audio, sr=SAMPLE_RATE, thresh_n_mult_nonstationary=1, stationary=False)
    # Measure the duration after trimming so padding is based on the actual length
    audio_length = len(audio) / SAMPLE_RATE
    # Determine how many TARGET_DURATION-second clips can be made from the audio
    if audio_length < TARGET_DURATION:
        # If the audio is shorter than TARGET_DURATION seconds, pad it
        pad_length = TARGET_DURATION * sr - len(audio)
        padded_audio = np.pad(audio, (0, pad_length), mode='constant')
        return [padded_audio]  # Return as a list for a consistent output format
    else:
        # Otherwise split the audio into TARGET_DURATION-second clips
        clip_length = TARGET_DURATION * sr
        clips = []
        for start in range(0, len(audio), clip_length):
            end = start + clip_length
            # Ensure the last clip has enough samples
            if end > len(audio):
                # Pad the last clip rather than dropping it
                last_clip = np.pad(audio[start:], (0, end - len(audio)), mode='constant')
                clips.append(last_clip)
            else:
                clips.append(audio[start:end])
        return clips
def generate_spectrogram(audio):
    # Generate a Mel-scaled spectrogram
    S = librosa.feature.melspectrogram(y=audio, sr=SAMPLE_RATE, n_mels=N_MELS, hop_length=HOP_LENGTH)
    S_dB = librosa.power_to_db(S, ref=np.max)
    # Scale the spectrogram by its maximum absolute value
    # (librosa.util.normalize peak-normalizes; it does not map to [0, 1])
    S_dB_norm = librosa.util.normalize(S_dB)
    return S_dB_norm
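# With the constants above, one 3-second clip at 44100 Hz yields roughly a
# 128 x 259 array (n_mels x frames), which is the input shape the CNN expects.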
cnn_model = load_model('app/apr23.h5')
cnn_label_encoder = joblib.load('app/apr23_label.pkl')
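# Assumed artifacts: apr23.h5 is a Keras CNN trained on the spectrograms
# above, and apr23_label.pkl is a fitted scikit-learn LabelEncoder whose
# classes are reciter names matching app/reciters.json.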
@app.post("/predict-reciter")  # NOTE: decorator/path assumed; the handler clearly serves an upload endpoint
async def handle_cnn(file: UploadFile = File(...)):
    try:
        print("got into request")
        print(file.content_type)
        # Ensure that we are handling an MP3 or WAV file
        if file.content_type in ["audio/mpeg", "audio/mp3", "application/octet-stream"]:
            file_extension = ".mp3"
        elif file.content_type == "audio/wav":
            file_extension = ".wav"
        else:
            raise HTTPException(status_code=400, detail="Invalid file type. Supported types: MP3, WAV.")
        # Read the file's content
        contents = await file.read()
        temp_filename = f"app/{uuid4().hex}{file_extension}"
        # Save the upload to a temporary file
        with open(temp_filename, "wb") as f:
            f.write(contents)
        print(f"File saved as {temp_filename}")
        spectrograms = []
        clips = preprocess_audio_cnn(temp_filename)
        if clips is None:
            raise HTTPException(status_code=400, detail="Audio file could not be processed.")
        for clip in clips:
            spectrogram = generate_spectrogram(clip)
            if np.isnan(spectrogram).any() or np.isinf(spectrogram).any():
                print("Invalid spectrogram detected")
                continue
            spectrograms.append(spectrogram)
        if not spectrograms:
            raise HTTPException(status_code=400, detail="No valid spectrograms could be extracted from the audio.")
        X = np.array(spectrograms)
        X = X[..., np.newaxis]  # Add a channel dimension for the CNN
        # Make predictions, one per clip
        predictions = cnn_model.predict(X)
        print('predictions', predictions)
        # Convert per-clip predictions to label indexes
        predicted_label_indexes = np.argmax(predictions, axis=1)
        print(predicted_label_indexes)
        # Majority vote: pick the label predicted for the most clips
        unique_labels, counts = np.unique(predicted_label_indexes, return_counts=True)
        most_frequent_label_index = unique_labels[np.argmax(counts)]
        # Convert the label index to an actual label name
        predicted_labels = cnn_label_encoder.inverse_transform([most_frequent_label_index])
        print('decoded', predicted_labels)
        reciter_name = predicted_labels[0]
        # Find the reciter by name
        reciter_object = find_reciter_by_name(reciter_name)
        # Clean up the temporary file
        os.remove(temp_filename)
        # Return a successful response with the matched reciter
        return reciter_object
    except HTTPException:
        raise  # Preserve intended status codes (e.g. the 400s above)
    except Exception as e:
        print(e)
        # Handle possible exceptions
        raise HTTPException(status_code=500, detail=str(e))
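# Example upload (route path assumed above):
#   curl -X POST http://localhost:8080/predict-reciter \
#        -F "file=@recitation.mp3;type=audio/mpeg"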
# random forest
model = joblib.load('app/1713661391.0946255_trained_model.joblib')
pca = joblib.load('app/pca.pkl')
scaler = joblib.load('app/1713661464.8205004_scaler.joblib')
label_encoder = joblib.load('app/1713661470.6730225_label_encoder.joblib')
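# Assumed artifacts from training: a scikit-learn classifier, a fitted scaler,
# and a fitted LabelEncoder. Note that the PCA object is loaded but never
# applied in handle_audio below.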
def preprocess_audio(audio_data, rate):
    audio_data = nr.reduce_noise(y=audio_data, sr=rate)
    # remove silence
    # intervals = librosa.effects.split(audio_data, top_db=20)
    # # Concatenate non-silent intervals
    # audio_data = np.concatenate([audio_data[start:end] for start, end in intervals])
    audio_data = librosa.util.normalize(audio_data)
    audio_data, _ = librosa.effects.trim(audio_data)
    audio_data = librosa.resample(audio_data, orig_sr=rate, target_sr=default_sample_rate)
    rate = default_sample_rate
    return audio_data, rate
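# Local sanity check for the random-forest pipeline (filename assumed):
#   audio, sr = load("sample.mp3", skip_seconds=5)
#   audio, sr = preprocess_audio(audio, sr)
#   features = extract_features(audio, sr).reshape(1, -1)
#   print(label_encoder.inverse_transform(model.predict(scaler.transform(features))))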
def repair_mp3_with_ffmpeg_python(input_path, output_path):
    """Attempt to repair an MP3 file using FFmpeg. (Currently unused helper.)"""
    try:
        # Define the audio stream with the necessary conversion parameters:
        # audio-only, mono, 44.1 kHz, 192 kbps MP3
        stream = (
            ffmpeg
            .input(input_path)
            .output(output_path, vn=None, acodec='libmp3lame', ar='44100', ac='1',
                    audio_bitrate='192k', af='aresample=44100')
            .global_args('-nostdin')  # Don't wait for interactive input
            .overwrite_output()       # Equivalent to -y
        )
        # Execute the FFmpeg command, capturing stderr for error reporting
        ffmpeg.run(stream, capture_stderr=True)
        print(f"File repaired and saved as {output_path}")
    except ffmpeg.Error as e:
        print(f"Failed to repair file {input_path}: {e.stderr.decode(errors='ignore') if e.stderr else e}")
@app.post("/predict-audio")  # NOTE: decorator/path assumed; the handler clearly serves an upload endpoint
async def handle_audio(file: UploadFile = File(...)):
    try:
        # Ensure that we are handling an MP3 or WAV file
        if file.content_type in ("audio/mpeg", "audio/mp3"):
            file_extension = ".mp3"
        elif file.content_type == "audio/wav":
            file_extension = ".wav"
        else:
            raise HTTPException(status_code=400, detail="Invalid file type. Supported types: MP3, WAV.")
        # Read the file's content
        contents = await file.read()
        temp_filename = f"app/{uuid4().hex}{file_extension}"
        # Save the upload to a temporary file
        with open(temp_filename, "wb") as f:
            f.write(contents)
        audio_data, sr = load(temp_filename, skip_seconds=5)
        print("finished loading ", temp_filename)
        # Preprocess data
        audio_data, sr = preprocess_audio(audio_data, sr)
        print("finished processing ", temp_filename)
        # Extract features and scale them as during training
        features = extract_features(audio_data, sr)
        features = features.reshape(1, -1)
        features = scaler.transform(features)
        # Proceed with an inference
        results = model.predict(features)
        # Decode the predictions using the label encoder
        decoded_predictions = label_encoder.inverse_transform(results)
        print('decoded', decoded_predictions[0])
        # Clean up the temporary file
        os.remove(temp_filename)
        print({"message": "File processed successfully", "sheikh": decoded_predictions[0]})
        # Return a successful response with decoded predictions
        return {"message": "File processed successfully", "sheikh": decoded_predictions[0]}
    except HTTPException:
        raise  # Preserve intended status codes (e.g. the 400 above)
    except Exception as e:
        print(e)
        # Handle possible exceptions
        raise HTTPException(status_code=500, detail=str(e))
# if __name__ == "__main__":
#     uvicorn.run("main:app", host="0.0.0.0", port=8080, reload=False)