import pynput import sys sys.path.append('../../') from src.music.config import SYNTH_RECORDED_AUDIO_PATH, RATE_AUDIO_SAVE from datetime import datetime import numpy as np import os import wave from ctypes import * from contextlib import contextmanager import pyaudio ERROR_HANDLER_FUNC = CFUNCTYPE(None, c_char_p, c_int, c_char_p, c_int, c_char_p) def py_error_handler(filename, line, function, err, fmt): pass c_error_handler = ERROR_HANDLER_FUNC(py_error_handler) @contextmanager def noalsaerr(): asound = cdll.LoadLibrary('libasound.so') asound.snd_lib_error_set_handler(c_error_handler) yield asound.snd_lib_error_set_handler(None) global KEY_PRESSED KEY_PRESSED = None def on_press(key): global KEY_PRESSED try: KEY_PRESSED = key.name except: pass def on_release(key): global KEY_PRESSED KEY_PRESSED = None def is_pressed(key): global KEY_PRESSED return KEY_PRESSED == key # keyboard listener listener = pynput.keyboard.Listener(on_press=on_press, on_release=on_release) listener.start() LEN_RECORDINGS = 40 class AudioRecorder: def __init__(self, chunk=2**10, rate=44100, place='', len_recording=LEN_RECORDINGS, drop_beginning=0.5): self.chunk = chunk self.rate = rate with noalsaerr(): self.audio = pyaudio.PyAudio() self.channels = 1 self.format = pyaudio.paInt16 self.stream = self.audio.open(format=self.format, channels=self.channels, rate=rate, input=True, frames_per_buffer=chunk) self.stream.stop_stream() self.drop_beginning_chunks = int(drop_beginning * self.rate / self.chunk) self.place = place self.len_recordings = len_recording def get_filename(self): now = datetime.now() return self.place + '_' + now.strftime("%b_%d_%Y_%Hh%Mm%Ss") + '.mp3' def read_last_chunk(self): return self.stream.read(self.chunk) def live_read(self): if self.stream.is_stopped(): self.stream.start_stream() i = 0 while not is_pressed('esc'): data = np.frombuffer(self.stream.read(self.chunk), dtype=np.int16) peak = np.average(np.abs(data)) * 2 bars = "#"*int(50 * peak / 2 ** 16) i += 1 print("%04d %05d %s"%(i,peak,bars)) self.stream.stop_stream() def record_next_N_seconds(self, n=None, saving_path=None): if saving_path is None: saving_path = SYNTH_RECORDED_AUDIO_PATH + self.get_filename() if n is None: n = self.len_recordings print(f'Recoding the next {n} secs.' # f'\n\tRecording starts when the first key is pressed;' f'\n\tPress Enter to end the recording;' f'\n\tPress BackSpace (<--) to cancel the recording;' f'\n\tSaving to {saving_path}') try: self.stream.start_stream() backspace_pressed = False self.recording = [] i_chunk = 0 while not is_pressed('enter') and self.chunk / self.rate * i_chunk < n: self.recording.append(self.read_last_chunk()) i_chunk += 1 if is_pressed('backspace'): backspace_pressed = True print('\n \t--> Recording cancelled! (you pressed BackSpace)') break self.stream.stop_stream() # save the file if not backspace_pressed: self.recording = self.recording[self.drop_beginning_chunks:] # drop first chunks to remove keyboard sound with wave.open(saving_path[:-4] + '.wav', 'wb') as waveFile: waveFile.setnchannels(self.channels) waveFile.setsampwidth(self.audio.get_sample_size(self.format)) waveFile.setframerate(self.rate) waveFile.writeframes(b''.join(self.recording)) os.system(f'ffmpeg -i "{saving_path[:-4] + ".wav"}" -vn -loglevel panic -y -ac 1 -ar {int(RATE_AUDIO_SAVE)} -b:a 320k "{saving_path}" ') os.remove(saving_path[:-4] + '.wav') print(f'\n--> Recording saved, duration: {self.chunk / self.rate * i_chunk:.2f} secs.') return saving_path except: print('\n --> The recording failed.') return None def record_one(self): ready_msg = False print('Starting the recording loop!\n\tPress BackSpace to cancel the current recording;\n\tPress Esc to quit the loop (only works while not recording)') while True: if not ready_msg: print('-------\nReady to record!') print('Press space to start a recording\n') ready_msg = True if is_pressed('space'): saving_path = self.record_next_N_seconds() break return saving_path def run(self): # with pynput.Listener( # on_press=self.on_press) as listener: # listener.join() ready_msg = False print('Starting the recording loop!\n\tPress BackSpace to cancel the current recording;\n\tPress Esc to quit the loop (only works while not recording)') while True: if not ready_msg: print('-------\nReady to record!') print('Press space to start a recording\n') ready_msg = True if is_pressed('space'): self.record_next_N_seconds() ready_msg = False if is_pressed('esc'): print('End of the recording session. See you soon!') self.close() break def close(self): self.stream.close() self.audio.terminate() if __name__ == '__main__': audio_recorder = AudioRecorder(place='home') audio_recorder.record_one()