# TastyPiano/src/music/pipeline/synth2audio.py
# Cédric Colas, initial commit (e775f6d)
import os
import subprocess
import sys
import time
import wave
from contextlib import contextmanager
from ctypes import *
from datetime import datetime

import numpy as np
import pyaudio
import pynput

sys.path.append('../../')
from src.music.config import SYNTH_RECORDED_AUDIO_PATH, RATE_AUDIO_SAVE
# ctypes prototype of the error callback handed to libasound:
# (filename, line, function, err, fmt) -> None.
ERROR_HANDLER_FUNC = CFUNCTYPE(None, c_char_p, c_int, c_char_p, c_int, c_char_p)


def py_error_handler(filename, line, function, err, fmt):
    """Discard an error report; every argument is ignored."""
    return None


# Kept at module level so the C-callable wrapper stays referenced while it may be installed.
c_error_handler = ERROR_HANDLER_FUNC(py_error_handler)
@contextmanager
def noalsaerr():
    """Silence libasound error messages for the duration of the ``with`` block.

    Installs the no-op ``c_error_handler`` through ``snd_lib_error_set_handler``
    on entry and passes ``None`` to it again on exit.

    Raises:
        OSError: if ``libasound.so`` cannot be loaded.
    """
    asound = cdll.LoadLibrary('libasound.so')
    asound.snd_lib_error_set_handler(c_error_handler)
    try:
        yield
    finally:
        # Reset the handler even when the body raises; otherwise the silencing
        # handler would stay installed for the rest of the process.
        asound.snd_lib_error_set_handler(None)
# Name of the key currently held down (e.g. 'esc', 'enter', 'space'), or None
# when no named key is pressed. Written by on_press / on_release.
KEY_PRESSED = None
def on_press(key):
    """Keyboard-listener callback: remember the name of the key just pressed.

    Args:
        key: key object supplied by the listener. Only its ``name`` attribute
            is read.

    Keys that have no ``name`` attribute leave ``KEY_PRESSED`` unchanged.
    """
    global KEY_PRESSED
    try:
        KEY_PRESSED = key.name
    except AttributeError:
        # Only a missing ``name`` is expected here; anything else (including
        # SystemExit / KeyboardInterrupt) must not be silently swallowed.
        pass
def on_release(key):
    """Keyboard-listener callback: forget the pressed key.

    The released ``key`` is not inspected; any release resets the state.
    """
    global KEY_PRESSED
    KEY_PRESSED = None
def is_pressed(key):
    """Return True when the key currently recorded as held down equals ``key``."""
    # Read-only access to the module-level state, so no ``global`` declaration is needed.
    return KEY_PRESSED == key
# Keyboard listener: created and started as soon as this module is imported, so
# that on_press / on_release keep KEY_PRESSED up to date for is_pressed().
listener = pynput.keyboard.Listener(on_press=on_press, on_release=on_release)
listener.start()
LEN_RECORDINGS = 40  # default maximum length of one recording, in seconds


class AudioRecorder:
    """Record mono 16-bit audio from a PyAudio input stream and save it as mp3.

    Recording is driven by the keyboard through ``is_pressed``: space starts a
    recording, Enter ends it, BackSpace cancels it and Esc leaves the loops.
    """

    # Pause between two keyboard polls while idle, so waiting loops do not
    # spin a CPU core at 100%.
    _POLL_INTERVAL = 0.01

    def __init__(self, chunk=2**10, rate=44100, place='', len_recording=LEN_RECORDINGS, drop_beginning=0.5):
        """Open (and immediately pause) the input stream.

        Args:
            chunk: number of frames read per call to the stream.
            rate: sampling rate of the input stream, in Hz.
            place: prefix used for generated file names.
            len_recording: default maximum duration of a recording, in seconds.
            drop_beginning: seconds of audio removed from the start of each
                saved recording (to remove the keyboard sound).
        """
        self.chunk = chunk
        self.rate = rate
        with noalsaerr():
            self.audio = pyaudio.PyAudio()
        self.channels = 1
        self.format = pyaudio.paInt16
        self.stream = self.audio.open(format=self.format,
                                      channels=self.channels,
                                      rate=rate,
                                      input=True,
                                      frames_per_buffer=chunk)
        # The stream is only started when something is actually recorded.
        self.stream.stop_stream()
        self.drop_beginning_chunks = int(drop_beginning * self.rate / self.chunk)
        self.place = place
        self.len_recordings = len_recording

    def get_filename(self):
        """Return ``<place>_<timestamp>.mp3`` built from the current local time."""
        stamp = datetime.now().strftime("%b_%d_%Y_%Hh%Mm%Ss")
        return f'{self.place}_{stamp}.mp3'

    def read_last_chunk(self):
        """Read and return the next ``chunk`` frames from the stream as bytes."""
        return self.stream.read(self.chunk)

    def live_read(self):
        """Print a running level meter until Esc is pressed, then pause the stream."""
        if self.stream.is_stopped():
            self.stream.start_stream()
        i = 0
        while not is_pressed('esc'):
            data = np.frombuffer(self.stream.read(self.chunk), dtype=np.int16)
            # Widen before abs(): abs(-32768) does not fit in int16 and would wrap.
            peak = np.average(np.abs(data.astype(np.int32))) * 2
            bars = "#" * int(50 * peak / 2 ** 16)
            i += 1
            print("%04d %05d %s" % (i, peak, bars))
        self.stream.stop_stream()

    def _capture(self, n):
        """Fill ``self.recording`` until Enter, BackSpace or ``n`` seconds; return (cancelled, n_chunks)."""
        self.recording = []
        cancelled = False
        i_chunk = 0
        self.stream.start_stream()
        try:
            while not is_pressed('enter') and self.chunk / self.rate * i_chunk < n:
                self.recording.append(self.read_last_chunk())
                i_chunk += 1
                if is_pressed('backspace'):
                    cancelled = True
                    print('\n \t--> Recording cancelled! (you pressed BackSpace)')
                    break
        finally:
            # Never leave the stream running, even if a read fails.
            self.stream.stop_stream()
        return cancelled, i_chunk

    def _save(self, saving_path):
        """Write ``self.recording`` to a temporary wav and convert it to ``saving_path`` with ffmpeg."""
        wav_path = os.path.splitext(saving_path)[0] + '.wav'
        # drop first chunks to remove keyboard sound
        self.recording = self.recording[self.drop_beginning_chunks:]
        with wave.open(wav_path, 'wb') as wave_file:
            wave_file.setnchannels(self.channels)
            wave_file.setsampwidth(self.audio.get_sample_size(self.format))
            wave_file.setframerate(self.rate)
            wave_file.writeframes(b''.join(self.recording))
        command = ['ffmpeg', '-i', wav_path, '-vn', '-loglevel', 'panic', '-y',
                   '-ac', '1', '-ar', str(int(RATE_AUDIO_SAVE)), '-b:a', '320k', saving_path]
        try:
            # Argument list (no shell) so paths with spaces or quotes are safe;
            # check=True turns a failed conversion into an exception instead of
            # a silent "saved" message.
            subprocess.run(command, check=True)
        finally:
            os.remove(wav_path)

    def record_next_N_seconds(self, n=None, saving_path=None):
        """Record up to ``n`` seconds and save the result as mp3.

        Args:
            n: maximum duration in seconds; defaults to ``self.len_recordings``.
            saving_path: output path; defaults to
                ``SYNTH_RECORDED_AUDIO_PATH + self.get_filename()``.

        Returns:
            ``saving_path`` when the recording ended normally or was cancelled
            with BackSpace (in which case no file is written), ``None`` when
            recording or saving failed.
        """
        if saving_path is None:
            saving_path = SYNTH_RECORDED_AUDIO_PATH + self.get_filename()
        if n is None:
            n = self.len_recordings
        print(f'Recoding the next {n} secs.'
              f'\n\tPress Enter to end the recording;'
              f'\n\tPress BackSpace (<--) to cancel the recording;'
              f'\n\tSaving to {saving_path}')
        try:
            cancelled, i_chunk = self._capture(n)
            if not cancelled:
                self._save(saving_path)
                print(f'\n--> Recording saved, duration: {self.chunk / self.rate * i_chunk:.2f} secs.')
            return saving_path
        except Exception:
            # Best-effort: report the failure, but let KeyboardInterrupt / SystemExit through.
            print('\n --> The recording failed.')
            return None

    @staticmethod
    def _announce_ready():
        """Print the prompt shown whenever the recorder waits for the space key."""
        print('-------\nReady to record!')
        print('Press space to start a recording\n')

    def record_one(self):
        """Wait for the space key, make a single recording and return its path (or None on failure)."""
        print('Starting the recording loop!\n\tPress BackSpace to cancel the current recording;\n\tPress Esc to quit the loop (only works while not recording)')
        self._announce_ready()
        while not is_pressed('space'):
            time.sleep(self._POLL_INTERVAL)
        return self.record_next_N_seconds()

    def run(self):
        """Record every time space is pressed, until Esc is pressed; then close the recorder."""
        print('Starting the recording loop!\n\tPress BackSpace to cancel the current recording;\n\tPress Esc to quit the loop (only works while not recording)')
        ready_msg = False
        while True:
            if not ready_msg:
                self._announce_ready()
                ready_msg = True
            if is_pressed('space'):
                self.record_next_N_seconds()
                # Show the prompt again before the next recording.
                ready_msg = False
            if is_pressed('esc'):
                print('End of the recording session. See you soon!')
                self.close()
                break
            time.sleep(self._POLL_INTERVAL)

    def close(self):
        """Close the input stream and terminate the PyAudio instance."""
        self.stream.close()
        self.audio.terminate()
if __name__ == '__main__':
    # Script entry point: make a single recording with files prefixed by 'home'.
    audio_recorder = AudioRecorder(place='home')
    try:
        audio_recorder.record_one()
    finally:
        # Close the stream and terminate PyAudio even if the recording is interrupted.
        audio_recorder.close()