File size: 6,074 Bytes
e775f6d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import pynput
import sys
sys.path.append('../../')
from src.music.config import SYNTH_RECORDED_AUDIO_PATH, RATE_AUDIO_SAVE
from datetime import datetime
import numpy as np
import os
import wave

from ctypes import *
from contextlib import contextmanager
import pyaudio

ERROR_HANDLER_FUNC = CFUNCTYPE(None, c_char_p, c_int, c_char_p, c_int, c_char_p)

def py_error_handler(filename, line, function, err, fmt):
    pass
c_error_handler = ERROR_HANDLER_FUNC(py_error_handler)

@contextmanager
def noalsaerr():
    asound = cdll.LoadLibrary('libasound.so')
    asound.snd_lib_error_set_handler(c_error_handler)
    yield
    asound.snd_lib_error_set_handler(None)

global KEY_PRESSED
KEY_PRESSED = None

def on_press(key):
    global KEY_PRESSED
    try:
        KEY_PRESSED = key.name
    except:
        pass

def on_release(key):
    global KEY_PRESSED
    KEY_PRESSED = None


def is_pressed(key):
    global KEY_PRESSED
    return KEY_PRESSED == key

# keyboard listener
listener = pynput.keyboard.Listener(on_press=on_press, on_release=on_release)
listener.start()

LEN_RECORDINGS = 40
class AudioRecorder:
    def __init__(self, chunk=2**10, rate=44100, place='', len_recording=LEN_RECORDINGS, drop_beginning=0.5):
        self.chunk = chunk
        self.rate = rate
        with noalsaerr():
            self.audio = pyaudio.PyAudio()
        self.channels = 1
        self.format = pyaudio.paInt16
        self.stream = self.audio.open(format=self.format,
                                      channels=self.channels,
                                      rate=rate,
                                      input=True,
                                      frames_per_buffer=chunk)
        self.stream.stop_stream()
        self.drop_beginning_chunks = int(drop_beginning * self.rate / self.chunk)
        self.place = place
        self.len_recordings = len_recording

    def get_filename(self):
        now = datetime.now()
        return self.place + '_' + now.strftime("%b_%d_%Y_%Hh%Mm%Ss") + '.mp3'

    def read_last_chunk(self):
        return self.stream.read(self.chunk)

    def live_read(self):
        if self.stream.is_stopped():
            self.stream.start_stream()
        i = 0
        while not is_pressed('esc'):
            data = np.frombuffer(self.stream.read(self.chunk), dtype=np.int16)
            peak = np.average(np.abs(data)) * 2
            bars = "#"*int(50 * peak / 2 ** 16)
            i += 1
            print("%04d %05d %s"%(i,peak,bars))
        self.stream.stop_stream()

    def record_next_N_seconds(self, n=None, saving_path=None):
        if saving_path is None:
            saving_path = SYNTH_RECORDED_AUDIO_PATH + self.get_filename()
        if n is None:
            n = self.len_recordings

        print(f'Recoding the next {n} secs.'
              # f'\n\tRecording starts when the first key is pressed;'
              f'\n\tPress Enter to end the recording;'
              f'\n\tPress BackSpace (<--) to cancel the recording;'
              f'\n\tSaving to {saving_path}')
        try:
            self.stream.start_stream()
            backspace_pressed = False
            self.recording = []
            i_chunk = 0
            while not is_pressed('enter') and self.chunk / self.rate * i_chunk < n:
                self.recording.append(self.read_last_chunk())
                i_chunk += 1
                if is_pressed('backspace'):
                    backspace_pressed = True
                    print('\n \t--> Recording cancelled! (you pressed BackSpace)')
                    break
            self.stream.stop_stream()

            # save the file
            if not backspace_pressed:
                self.recording = self.recording[self.drop_beginning_chunks:] # drop first chunks to remove keyboard sound
                with wave.open(saving_path[:-4] + '.wav', 'wb') as waveFile:
                    waveFile.setnchannels(self.channels)
                    waveFile.setsampwidth(self.audio.get_sample_size(self.format))
                    waveFile.setframerate(self.rate)
                    waveFile.writeframes(b''.join(self.recording))
                os.system(f'ffmpeg -i "{saving_path[:-4] + ".wav"}" -vn -loglevel panic -y -ac 1 -ar {int(RATE_AUDIO_SAVE)} -b:a 320k "{saving_path}" ')
                os.remove(saving_path[:-4] + '.wav')
                print(f'\n--> Recording saved, duration: {self.chunk / self.rate * i_chunk:.2f} secs.')
            return saving_path
        except:
            print('\n --> The recording failed.')
            return None

    def record_one(self):
        ready_msg = False
        print('Starting the recording loop!\n\tPress BackSpace to cancel the current recording;\n\tPress Esc to quit the loop (only works while not recording)')
        while True:
            if not ready_msg:
                print('-------\nReady to record!')
                print('Press space to start a recording\n')
                ready_msg = True

            if is_pressed('space'):
                saving_path = self.record_next_N_seconds()
                break
        return saving_path

    def run(self):
        # with pynput.Listener(
        #         on_press=self.on_press) as listener:
        #     listener.join()
        ready_msg = False
        print('Starting the recording loop!\n\tPress BackSpace to cancel the current recording;\n\tPress Esc to quit the loop (only works while not recording)')
        while True:
            if not ready_msg:
                print('-------\nReady to record!')
                print('Press space to start a recording\n')
                ready_msg = True

            if is_pressed('space'):
                self.record_next_N_seconds()
                ready_msg = False
            if is_pressed('esc'):
                print('End of the recording session. See you soon!')
                self.close()
                break

    def close(self):
        self.stream.close()
        self.audio.terminate()

if __name__ == '__main__':
    audio_recorder = AudioRecorder(place='home')
    audio_recorder.record_one()