import os
import pandas as pd
from pydub import AudioSegment
import numpy as np
from moviepy.editor import *
import time
import pickle
import audioread
import librosa
# install numba==0.49.1
# setup A: numba 0.51.2, librosa 0.6.3, llvmlite: 0.34.0
# setupB: numba==0.49.1, llvmlite-0.32.1
from src.music.config import RATE_AUDIO_SAVE
import hashlib
import unicodedata
import re

# from src.music.piano_detection_model.piano_detection_model import SR


def clean_removed_mp3_from_csv(path):
    """Drop the rows of ``meta_data.csv`` whose file is no longer in the folder.

    The CSV is rewritten in place, keeping only the rows whose ``filename``
    value is the name of an entry currently present in ``path``.

    Args:
        path: Folder containing ``meta_data.csv``. It is concatenated directly
            with file names, so it must end with a path separator.
    """
    print(f"Cleaning meta_data.csv using files from the folder, in {path}")
    files = set(os.listdir(path))
    meta_data = pd.read_csv(path + 'meta_data.csv')
    meta_data = meta_data[meta_data['filename'].isin(files)]
    meta_data.to_csv(path + 'meta_data.csv', index=False)
    print('\tDone.')


def clean_removed_csv_from_folder(path):
    """Count the files of the folder that are not listed in ``meta_data.csv``.

    Each file name, minus its last four characters, is looked up in the
    ``hash`` column of the CSV; ``meta_data.csv`` and ``url.txt`` are ignored.
    The running count of unlisted files is printed. The deletion itself is
    commented out, so nothing is removed from disk.

    Args:
        path: Folder containing ``meta_data.csv``. It is concatenated directly
            with file names, so it must end with a path separator.
    """
    print(f"Cleaning files from folder using meta_data.csv listed file, in {path}")
    files = os.listdir(path)
    meta_data = pd.read_csv(path + 'meta_data.csv')
    hashes = set(meta_data['hash'])
    count = 0
    for f in files:
        if f in ('meta_data.csv', 'url.txt'):
            continue
        if f[:-4] not in hashes:
            count += 1
            print(count)
            # os.remove(path + f)
    print('\tDone.')


# def convert_mp3_to_mono_16k(path):
#     print(f"\n\n\t\tConverting mp3 to mono and 16k sample rate, in {path}\n")
#     if '.mp3' == path[-4:]:
#         audio = AudioFileClip(path)
#         audio.write_audiofile(path[:-4] + '.mp3',
#                               verbose=False,
#                               logger=None,
#                               fps=FPS,
#                               ffmpeg_params=["-ac", "1"])
#     else:
#         list_files = os.listdir(path)
#         for i, f in enumerate(list_files):
#             print(compute_progress(i, len(list_files)))
#             if ".mp3" in f:
#                 audio = AudioFileClip(path + f)
#                 audio.write_audiofile(path + f[:-4] + '.mp3',
#                                       verbose=False,
#                                       logger=None,
#                                       fps=FPS,  # 16000 sr
#                                       ffmpeg_params=["-ac", "1"]  # make it mono
#                                       )
#     print('\tDone.')


def load_audio(path, sr=22050, mono=True, offset=0.0, duration=None, dtype=np.float32, res_type='kaiser_best',
               backends=[audioread.ffdec.FFmpegAudioFile]):
    """Load audio.

    Copied from librosa.core.load() except that ffmpeg backend is always used
    in this function. Code from piano_transcription_inference.

    Args:
        path: Path of the audio file to decode.
        sr: Target sampling rate; ``None`` keeps the native rate of the file.
        mono: If True, multi-channel audio is mixed down to a single channel.
        offset: Start reading after this many seconds.
        duration: Only read this many seconds; ``None`` reads to the end.
        dtype: Data type of the returned samples.
        res_type: Resampling method forwarded to librosa.
        backends: audioread backend classes handed to ``audioread.audio_open``.
            The default list is never mutated by this function.

    Returns:
        Tuple ``(y, sr)``: the samples as a contiguous array and the sampling
        rate they are expressed in.
    """
    y = []
    with audioread.audio_open(os.path.realpath(path), backends=backends) as input_file:
        sr_native = input_file.samplerate
        n_channels = input_file.channels

        # Positions are counted in interleaved samples, hence the channel factor.
        s_start = int(np.round(sr_native * offset)) * n_channels

        if duration is None:
            s_end = np.inf
        else:
            s_end = s_start + (int(np.round(sr_native * duration)) * n_channels)

        n = 0

        for frame in input_file:
            frame = librosa.core.audio.util.buf_to_float(frame, dtype=dtype)
            n_prev = n
            n = n + len(frame)

            if n < s_start:
                # offset is after the current frame
                # keep reading
                continue

            if s_end < n_prev:
                # we're off the end. stop reading
                break

            if s_end < n:
                # the end is in this frame. crop.
                frame = frame[:s_end - n_prev]

            if n_prev <= s_start <= n:
                # beginning is in this frame
                frame = frame[(s_start - n_prev):]

            # tack on the current frame
            y.append(frame)

    if y:
        y = np.concatenate(y)

        if n_channels > 1:
            # De-interleave into (n_channels, n_samples).
            y = y.reshape((-1, n_channels)).T
            if mono:
                y = librosa.core.audio.to_mono(y)

        if sr is not None:
            # Sample rates are passed by keyword: the parameter names are the
            # same in old librosa releases, and newer ones require keywords.
            y = librosa.core.audio.resample(y, orig_sr=sr_native, target_sr=sr, res_type=res_type)
        else:
            sr = sr_native

    # Final cleanup for dtype and contiguity
    y = np.ascontiguousarray(y, dtype=dtype)

    return (y, sr)


def compute_progress(iter, total):
    """Return the progress after 0-based step ``iter`` of ``total`` as a string such as ``'42%'``."""
    return f"{int((iter + 1) / total * 100)}%"


def compute_progress_and_eta(times, iter, total, n_av=3000):
    """Format the progress percentage and the estimated remaining time.

    Args:
        times: Durations (in seconds) of the steps done so far.
        iter: 0-based index of the current step.
        total: Total number of steps.
        n_av: Only the last ``n_av`` durations are averaged.

    Returns:
        A string of the form ``"Progress: P%, ETA: HhMmSs."``.
    """
    av_time = np.mean(times[-n_av:])
    progress = int(((iter + 1) / total) * 100)
    remaining = av_time * (total - iter)
    eta_h = int(remaining // 3600)
    eta_m = int((remaining - (eta_h * 3600)) // 60)
    eta_s = int((remaining - (eta_h * 3600) - eta_m * 60))
    eta = f"Progress: {progress}%, ETA: {eta_h}H{eta_m}M{eta_s}S."
    return eta


def crop_mp3_from_meta_data_constraints(path, clean_constraints=True):
    """Crop the mp3 files of a folder to the constraints stored in ``meta_data.csv``.

    For each row whose ``constraint_start`` is not 0 or whose ``constraint_end``
    differs from ``length``, the file is cut to
    ``[constraint_start, constraint_end]`` and overwritten. Both bounds are
    multiplied by 1000 before slicing the ``AudioSegment``, so they are
    presumably expressed in seconds.

    Args:
        path: Folder containing ``meta_data.csv`` and the mp3 files. It is
            concatenated directly with file names, so it must end with a path
            separator.
        clean_constraints: If True, the constraints of each cropped file are
            reset to ``0`` and ``length`` and the CSV is rewritten right away.
    """
    print(f"Cropping mp3 using constraints from meta_data.csv, in {path}")
    meta_data = pd.read_csv(path + 'meta_data.csv')
    constraint_start = meta_data['constraint_start'].copy()
    length = meta_data['length'].copy()
    constraint_end = meta_data['constraint_end'].copy()
    filenames = meta_data['filename'].copy()
    n_files = len(constraint_start)
    times = [5]  # initial per-file duration used by the ETA average
    rows = zip(constraint_start, constraint_end, filenames, length)
    for i, (c_start, c_end, fn, full_length) in enumerate(rows):
        if c_start != 0 or c_end != full_length:
            i_time = time.time()
            print(compute_progress_and_eta(times, i, n_files, n_av=100))
            song = AudioSegment.from_mp3(path + fn)
            extract = song[c_start*1000:c_end*1000]
            extract.export(path + fn, format="mp3")
            if clean_constraints:
                constraint_start[i] = 0
                constraint_end[i] = length[i]
                meta_data['constraint_start'] = constraint_start
                meta_data['constraint_end'] = constraint_end
                meta_data.to_csv(path + 'meta_data.csv', index=False)
            times.append(time.time() - i_time)
    print('\tDone.')


def get_all_subfiles_with_extension(path, max_depth=3, extension='.*', current_depth=0):
    """Recursively list the files under ``path`` that have a given extension.

    Args:
        path: Folder to scan. It is concatenated directly with entry names, so
            it must end with a path separator.
        max_depth: Sub-folders are explored while ``current_depth < max_depth``.
        extension: Either a str starting with ``'.'`` (``'.*'`` matches every
            file) or a list of str suffixes.
        current_depth: Depth of ``path`` in the recursion; callers normally
            leave it at 0.

    Returns:
        List of file paths: the files of ``path`` first, then those of each
        sub-folder.

    Raises:
        ValueError: If ``extension`` is neither a str nor a list.
    """
    entries = os.listdir(path)
    folders = [f for f in entries if os.path.isdir(path + f)]
    # get all files in current folder with a given extension
    if isinstance(extension, list):
        assert all(isinstance(e, str) for e in extension), 'extension can be a str or a list'
        suffixes = tuple(extension)
        files = [path + f for f in entries if os.path.isfile(path + f) and f.endswith(suffixes)]
    elif isinstance(extension, str):
        assert extension[0] == '.', 'extension should be an extension or a list of extensions'
        if extension == '.*':
            files = [path + f for f in entries if os.path.isfile(path + f)]
        else:
            files = [path + f for f in entries if os.path.isfile(path + f) and f.endswith(extension)]
    else:
        raise ValueError('extension should be either a str or a list')
    if current_depth < max_depth:
        for fold in folders:
            files += get_all_subfiles_with_extension(path + fold + '/',
                                                     max_depth=max_depth,
                                                     extension=extension,
                                                     current_depth=current_depth+1)
    return files


def get_out_path(in_path, in_word, out_word, out_extension, exclude_paths=()):
    """Derive the output path of a file by swapping ``in_word`` for ``out_word``.

    Every ``'/'``-separated component of ``in_path`` equal to ``in_word`` is
    replaced by ``out_word``; the component following the last match is treated
    as the playlist folder. The playlist folder and the file name get their
    ``in_word`` suffix replaced by ``out_word`` (or simply get ``_midi``
    appended when ``out_word == 'midi'``), and folders between the playlist and
    the file are dropped. For instance, with ``in_word='processed'``,
    ``out_word='encoded'`` and ``out_extension='.pickle'``,
    ``'data/processed/pl_processed/a_processed.mid'`` becomes
    ``'data/encoded/pl_encoded/a_encoded.pickle'``.

    The output playlist folder is created on disk unless one of the path
    components is ``'fake'``.

    Args:
        in_path: Path of the input file, with ``'/'`` separators.
        in_word: Name of the folder component to replace.
        out_word: Replacement folder name, also used as the new suffix.
        out_extension: Extension (with leading dot) of the output file.
        exclude_paths: Playlist folder names to skip.

    Returns:
        ``(out_path, to_exclude, playlist)``. When the playlist is excluded the
        tuple is ``(None, True, None)``.

    Raises:
        ValueError: If no component of ``in_path`` equals ``in_word``.
    """
    splitted_in_path = in_path.split('/')
    playlist_index = None
    for i, component in enumerate(splitted_in_path):
        if component == in_word:
            splitted_in_path[i] = out_word
            playlist_index = i + 1
    if playlist_index is None:
        raise ValueError(f"'{in_word}' is not a folder of the path '{in_path}'")
    file_index = len(splitted_in_path) - 1

    if splitted_in_path[playlist_index] in exclude_paths:
        to_exclude = True
        return None, to_exclude, None

    to_exclude = False
    # Number of trailing pieces to strip from the playlist and file names.
    n_in_word_pieces = len(in_word.split('_'))
    if out_word != 'midi':
        playlist_stem = '_'.join(splitted_in_path[playlist_index].split('_')[:-n_in_word_pieces])
        splitted_in_path[playlist_index] = playlist_stem + '_' + out_word
    else:
        splitted_in_path[playlist_index] += '_' + out_word
    if 'fake' not in splitted_in_path:
        os.makedirs('/'.join(splitted_in_path[:playlist_index + 1]), exist_ok=True)
    if out_word != 'midi':
        file_stem = '_'.join(splitted_in_path[file_index].split('_')[:-n_in_word_pieces])
    else:
        # For midi outputs the trailing dot-separated pieces are stripped instead.
        file_stem = '.'.join(splitted_in_path[file_index].split('.')[:-n_in_word_pieces])
    new_filename = file_stem + '_' + out_word + out_extension
    splitted_in_path[file_index] = new_filename
    splitted_in_path = splitted_in_path[:playlist_index + 1] + [splitted_in_path[file_index]]
    out_path = '/'.join(splitted_in_path)
    return out_path, to_exclude, splitted_in_path[playlist_index]


def set_all_seeds(seed):
    """Seed the ``torch``, ``random`` and ``numpy`` random number generators with ``seed``."""
    import random
    import numpy as np
    import torch
    torch.manual_seed(seed)
    random.seed(seed)
    np.random.seed(seed)


def _collect_in_and_out_paths(in_path, in_word, in_extension, out_word, out_extension, max_depth,
                              exclude_paths, skip_existing):
    """Pair each input file with its output path, optionally skipping outputs that already exist."""
    candidates = get_all_subfiles_with_extension(in_path, max_depth=max_depth, extension=in_extension)
    kept_in_paths = []
    all_out_paths = []
    all_playlists = []
    for candidate in candidates:
        out_path, to_exclude, playlist = get_out_path(in_path=candidate,
                                                      in_word=in_word,
                                                      out_word=out_word,
                                                      out_extension=out_extension,
                                                      exclude_paths=exclude_paths)
        if to_exclude:
            continue
        if skip_existing and os.path.exists(out_path):
            continue
        kept_in_paths.append(candidate)
        all_out_paths.append(out_path)
        all_playlists.append(playlist)
    assert len(all_out_paths) == len(kept_in_paths)
    return kept_in_paths, all_out_paths, all_playlists


def get_paths_in_and_out(in_path, in_word, in_extension, out_word, out_extension, max_depth, exclude_paths=()):
    """List the input files under ``in_path`` together with their output paths.

    Finds all files with the in_extension in subfolders of in_path up to
    max_depth. For each, replaces the in_word keyword in folders with the
    out_word, and appends out_word to filenames. Files whose playlist folder is
    in ``exclude_paths`` are left out.

    Returns:
        ``(all_in_paths, all_out_paths, all_playlists)``, three lists of equal
        length.
    """
    return _collect_in_and_out_paths(in_path, in_word, in_extension, out_word, out_extension, max_depth,
                                     exclude_paths, skip_existing=False)


def get_path_and_filter_existing(in_path, in_word, in_extension, out_word, out_extension, max_depth, exclude_paths=()):
    """List the input files under ``in_path`` whose output file does not exist yet.

    Finds all files with the in_extension in subfolders of in_path up to
    max_depth. For each, replaces the in_word keyword in folders with the
    out_word, and appends out_word to filenames. Files whose playlist folder is
    in ``exclude_paths`` or whose output path already exists are left out.

    Returns:
        ``(all_in_paths, all_out_paths, all_playlists)``, three lists of equal
        length.
    """
    return _collect_in_and_out_paths(in_path, in_word, in_extension, out_word, out_extension, max_depth,
                                     exclude_paths, skip_existing=True)


def md5sum(filename, blocksize=65536):
    """Return the hexadecimal MD5 digest of a file, read in chunks of ``blocksize`` bytes."""
    digest = hashlib.md5()
    with open(filename, "rb") as f:
        for block in iter(lambda: f.read(blocksize), b""):
            digest.update(block)
    return digest.hexdigest()


emoji_pattern = re.compile("["
                           u"\U0001F600-\U0001F64F"  # emoticons
                           u"\U0001F300-\U0001F5FF"  # symbols & pictographs
                           u"\U0001F680-\U0001F6FF"  # transport & map symbols
                           u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
                           "]+", flags=re.UNICODE)


def slugify(value, allow_unicode=False):
    """
    Taken from https://github.com/django/django/blob/master/django/utils/text.py
    Convert to ASCII if 'allow_unicode' is False. Convert to lowercase. Remove
    characters that aren't alphanumerics, underscores, hyphens or whitespace,
    as well as the emojis matched by ``emoji_pattern``. Convert runs of
    whitespace or dashes to single underscores. Also strip leading and trailing
    dashes and underscores.
    """
    value = str(value).lower()
    if allow_unicode:
        value = unicodedata.normalize('NFKC', value)
    else:
        value = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore').decode('ascii')
    value = re.sub(r'[^\w\s-]', '', value.lower())
    value = emoji_pattern.sub(r'', value)
    value = re.sub(r'[-\s]+', '_', value).strip('-_')
    # if value == '':
    #     for i in range(10):
    #         value += str(np.random.choice(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))
    return value


if __name__ == '__main__':
    path = "/home/cedric/Documents/pianocktail/data/midi/street_piano/"
    # for folder in ['my_sheet_music_transcriptions']:#os.listdir(path):
    #     print('\n\n\t\t', folder)
    #     convert_mp4_to_mp3(path + folder + '/')

    clean_removed_csv_from_folder(path)
    # folder = 'street_piano/'
    # for folder in ['street_piano/']:
    #     clean_removed_mp3_from_csv(path + folder)