Spaces:
Runtime error
Runtime error
import pynput | |
import sys | |
sys.path.append('../../') | |
from src.music.config import SYNTH_RECORDED_AUDIO_PATH, RATE_AUDIO_SAVE | |
from datetime import datetime | |
import numpy as np | |
import os | |
import wave | |
from ctypes import * | |
from contextlib import contextmanager | |
import pyaudio | |
ERROR_HANDLER_FUNC = CFUNCTYPE(None, c_char_p, c_int, c_char_p, c_int, c_char_p) | |
def py_error_handler(filename, line, function, err, fmt): | |
pass | |
c_error_handler = ERROR_HANDLER_FUNC(py_error_handler) | |
def noalsaerr(): | |
asound = cdll.LoadLibrary('libasound.so') | |
asound.snd_lib_error_set_handler(c_error_handler) | |
yield | |
asound.snd_lib_error_set_handler(None) | |
global KEY_PRESSED | |
KEY_PRESSED = None | |
def on_press(key): | |
global KEY_PRESSED | |
try: | |
KEY_PRESSED = key.name | |
except: | |
pass | |
def on_release(key): | |
global KEY_PRESSED | |
KEY_PRESSED = None | |
def is_pressed(key): | |
global KEY_PRESSED | |
return KEY_PRESSED == key | |
# keyboard listener | |
listener = pynput.keyboard.Listener(on_press=on_press, on_release=on_release) | |
listener.start() | |
LEN_RECORDINGS = 40 | |
class AudioRecorder: | |
def __init__(self, chunk=2**10, rate=44100, place='', len_recording=LEN_RECORDINGS, drop_beginning=0.5): | |
self.chunk = chunk | |
self.rate = rate | |
with noalsaerr(): | |
self.audio = pyaudio.PyAudio() | |
self.channels = 1 | |
self.format = pyaudio.paInt16 | |
self.stream = self.audio.open(format=self.format, | |
channels=self.channels, | |
rate=rate, | |
input=True, | |
frames_per_buffer=chunk) | |
self.stream.stop_stream() | |
self.drop_beginning_chunks = int(drop_beginning * self.rate / self.chunk) | |
self.place = place | |
self.len_recordings = len_recording | |
def get_filename(self): | |
now = datetime.now() | |
return self.place + '_' + now.strftime("%b_%d_%Y_%Hh%Mm%Ss") + '.mp3' | |
def read_last_chunk(self): | |
return self.stream.read(self.chunk) | |
def live_read(self): | |
if self.stream.is_stopped(): | |
self.stream.start_stream() | |
i = 0 | |
while not is_pressed('esc'): | |
data = np.frombuffer(self.stream.read(self.chunk), dtype=np.int16) | |
peak = np.average(np.abs(data)) * 2 | |
bars = "#"*int(50 * peak / 2 ** 16) | |
i += 1 | |
print("%04d %05d %s"%(i,peak,bars)) | |
self.stream.stop_stream() | |
def record_next_N_seconds(self, n=None, saving_path=None): | |
if saving_path is None: | |
saving_path = SYNTH_RECORDED_AUDIO_PATH + self.get_filename() | |
if n is None: | |
n = self.len_recordings | |
print(f'Recoding the next {n} secs.' | |
# f'\n\tRecording starts when the first key is pressed;' | |
f'\n\tPress Enter to end the recording;' | |
f'\n\tPress BackSpace (<--) to cancel the recording;' | |
f'\n\tSaving to {saving_path}') | |
try: | |
self.stream.start_stream() | |
backspace_pressed = False | |
self.recording = [] | |
i_chunk = 0 | |
while not is_pressed('enter') and self.chunk / self.rate * i_chunk < n: | |
self.recording.append(self.read_last_chunk()) | |
i_chunk += 1 | |
if is_pressed('backspace'): | |
backspace_pressed = True | |
print('\n \t--> Recording cancelled! (you pressed BackSpace)') | |
break | |
self.stream.stop_stream() | |
# save the file | |
if not backspace_pressed: | |
self.recording = self.recording[self.drop_beginning_chunks:] # drop first chunks to remove keyboard sound | |
with wave.open(saving_path[:-4] + '.wav', 'wb') as waveFile: | |
waveFile.setnchannels(self.channels) | |
waveFile.setsampwidth(self.audio.get_sample_size(self.format)) | |
waveFile.setframerate(self.rate) | |
waveFile.writeframes(b''.join(self.recording)) | |
os.system(f'ffmpeg -i "{saving_path[:-4] + ".wav"}" -vn -loglevel panic -y -ac 1 -ar {int(RATE_AUDIO_SAVE)} -b:a 320k "{saving_path}" ') | |
os.remove(saving_path[:-4] + '.wav') | |
print(f'\n--> Recording saved, duration: {self.chunk / self.rate * i_chunk:.2f} secs.') | |
return saving_path | |
except: | |
print('\n --> The recording failed.') | |
return None | |
def record_one(self): | |
ready_msg = False | |
print('Starting the recording loop!\n\tPress BackSpace to cancel the current recording;\n\tPress Esc to quit the loop (only works while not recording)') | |
while True: | |
if not ready_msg: | |
print('-------\nReady to record!') | |
print('Press space to start a recording\n') | |
ready_msg = True | |
if is_pressed('space'): | |
saving_path = self.record_next_N_seconds() | |
break | |
return saving_path | |
def run(self): | |
# with pynput.Listener( | |
# on_press=self.on_press) as listener: | |
# listener.join() | |
ready_msg = False | |
print('Starting the recording loop!\n\tPress BackSpace to cancel the current recording;\n\tPress Esc to quit the loop (only works while not recording)') | |
while True: | |
if not ready_msg: | |
print('-------\nReady to record!') | |
print('Press space to start a recording\n') | |
ready_msg = True | |
if is_pressed('space'): | |
self.record_next_N_seconds() | |
ready_msg = False | |
if is_pressed('esc'): | |
print('End of the recording session. See you soon!') | |
self.close() | |
break | |
def close(self): | |
self.stream.close() | |
self.audio.terminate() | |
if __name__ == '__main__': | |
audio_recorder = AudioRecorder(place='home') | |
audio_recorder.record_one() | |