This commit is contained in:
michalcourson
2026-02-04 18:13:56 -05:00
commit 51ad065047
26 changed files with 8383 additions and 0 deletions

View File

@ -0,0 +1,75 @@
import sounddevice as sd
import numpy as np
import os
from datetime import datetime
import scipy.io.wavfile as wavfile
class AudioRecorder:
def __init__(self, duration=30, sample_rate=44100, channels=2, recordings_dir='recordings'):
"""
Initialize audio recorder with configurable parameters.
:param duration: Length of audio buffer in seconds
:param sample_rate: Audio sample rate (if None, use default device sample rate)
:param channels: Number of audio channels
"""
self.duration = duration
self.sample_rate = sample_rate
self.channels = channels
self.buffer = np.zeros((int(duration * sample_rate), channels), dtype=np.float32)
self.recordings_dir = recordings_dir
def record_callback(self, indata, frames, time, status):
"""
Circular buffer callback for continuous recording.
:param indata: Input audio data
:param frames: Number of frames
:param time: Timestamp
:param status: Recording status
"""
if status:
print(f"Recording status: {status}")
# Circular buffer implementation
self.buffer = np.roll(self.buffer, -frames, axis=0)
self.buffer[-frames:] = indata
def save_last_n_seconds(self):
"""
Save the last n seconds of audio to a file.
:param output_dir: Directory to save recordings
:return: Path to saved audio file
"""
# Create output directory if it doesn't exist
os.makedirs(self.recordings_dir, exist_ok=True)
# Generate filename with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = os.path.join(self.recordings_dir, f"audio_capture_{timestamp}.wav")
# Normalize audio to prevent clipping
audio_data = self.buffer / np.max(np.abs(self.buffer)) * .5
# Convert float32 to int16 for WAV file
audio_data_int16 = (audio_data * 32767).astype(np.int16)
# Write buffer to file
wavfile.write(filename, int(self.sample_rate), audio_data_int16)
return filename
def start_recording(self):
"""
Start continuous audio recording with circular buffer.
"""
print('number of channels', self.channels)
stream = sd.InputStream(
samplerate=self.sample_rate,
channels=self.channels,
callback=self.record_callback
)
stream.start()
return stream

View File

@ -0,0 +1,12 @@
from pythonosc.udp_client import SimpleUDPClient
import sys
ip = "127.0.0.1"
port = 5005
client = SimpleUDPClient(ip, port) # Create client
# client.send_message("/record/start", 0)
#sleep(5)
client.send_message(sys.argv[1], sys.argv[2] if len(sys.argv) > 2 else 0)
#sleep(5)
# client.send_message("/record/stop", 0)

92
audio-service/src/main.py Normal file
View File

@ -0,0 +1,92 @@
import argparse
import os
import sys
from osc_server import OSCRecordingServer
from audio_recorder import AudioRecorder
from windows_audio import WindowsAudioManager
import sounddevice as sd
def main():
# Create argument parser
parser = argparse.ArgumentParser(description='Audio Recording Service')
# Input device argument
parser.add_argument('--input-device',
type=str,
help='Name or index of the input audio device',
default=None)
# Recording length argument
parser.add_argument('--recording-length',
type=float,
help='Maximum recording length in seconds',
default=30.0)
# Recording save path argument
parser.add_argument('--save-path',
type=str,
help='Directory path to save recordings',
default=os.path.join(os.path.dirname(__file__), 'recordings'))
# OSC port argument
parser.add_argument('--osc-port',
type=int,
help='OSC server port number',
default=5005)
# Parse arguments
args = parser.parse_args()
# Ensure save path exists
os.makedirs(args.save_path, exist_ok=True)
audio_manager=WindowsAudioManager()
# Handle input device selection
input_device = None
devices = audio_manager.list_audio_devices('input')
if args.input_device:
try:
# Try to convert to integer first (for device index)
input_device = int(args.input_device)
except ValueError:
# If not an integer, treat as device name
print(devices)
for i, device in enumerate(devices):
if args.input_device.lower() in device['name'].lower():
input_device = device['index']
print(f"Using input device: {device['name']}")
break
# Create AudioRecorder with specified parameters
recorder = AudioRecorder(
duration=args.recording_length,
recordings_dir=args.save_path,
# channels=min(2, devices[input_device]['max_input_channels']),
)
# Create OSC server with specified port
osc_server = OSCRecordingServer(
recorder=recorder,
port=args.osc_port,
audio_manager=audio_manager
)
osc_server.set_audio_device(None, str(input_device))
osc_server.start_recording(None)
# Run the OSC server
try:
print(f"Starting OSC Recording Server on port {args.osc_port}")
print(f"Recording save path: {args.save_path}")
print(f"Max recording length: {args.recording_length} seconds")
osc_server.run_server()
except KeyboardInterrupt:
print("\nServer stopped by user.")
except Exception as e:
print(f"Error starting server: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,130 @@
from pythonosc import dispatcher, osc_server
import threading
import sys
from audio_recorder import AudioRecorder
from windows_audio import WindowsAudioManager
class OSCRecordingServer:
def __init__(self, recorder, audio_manager, ip="127.0.0.1", port=5005):
"""
Initialize OSC server for audio recording triggers.
:param recorder: AudioRecorder instance
:param audio_manager: WindowsAudioManager instance
:param ip: IP address to bind OSC server
:param port: Port number for OSC server
"""
self.recorder = recorder
self.audio_manager = audio_manager
self.ip = ip
self.port = port
self._setup_dispatcher()
self.server_thread = None
def _setup_dispatcher(self):
"""
Set up OSC message dispatchers for different recording commands.
"""
self.osc_dispatcher = dispatcher.Dispatcher()
self.osc_dispatcher.map("/record/start", self.start_recording)
self.osc_dispatcher.map("/record/stop", self.stop_recording)
self.osc_dispatcher.map("/record/save", self.save_recording)
self.osc_dispatcher.map("/exit", self.exit_program)
self.osc_dispatcher.map("/device/set", self.set_audio_device) # New device set handler
self.osc_dispatcher.map("/device/list", self.list_audio_devices) # New device list handler
def start_recording(self, unused_addr, args=None):
"""
Start audio recording via OSC message.
"""
print("OSC: Starting audio recording")
self.recording_stream = self.recorder.start_recording()
def stop_recording(self, unused_addr, args=None):
"""
Stop active audio recording via OSC message.
"""
print("OSC: Stopping audio recording")
if hasattr(self, 'recording_stream'):
self.recording_stream.stop()
def save_recording(self, unused_addr, args=None):
"""
Save the current audio buffer via OSC message.
"""
print("OSC: Saving audio recording")
saved_file = self.recorder.save_last_n_seconds()
print(f"Saved recording to: {saved_file}")
def set_audio_device(self, unused_addr, device_index):
"""
Set the default input audio device via OSC message.
:param device_index: Index of the audio device to set
"""
try:
device_index = int(device_index)
print(f"OSC: Setting audio device to index {device_index}")
# Get the sample rate of the new device
sample_rate = self.audio_manager.set_default_input_device(device_index)
# Reinitialize recorder with new device's sample rate
self.recorder = AudioRecorder(
duration=self.recorder.duration,
sample_rate=sample_rate,
channels=self.recorder.channels,
recordings_dir=self.recorder.recordings_dir
)
print(f"Successfully set audio device to index {device_index} with sample rate {sample_rate}")
except Exception as e:
print(f"OSC: Error setting audio device - {e}")
def list_audio_devices(self, unused_addr, device_type='input'):
"""
List available audio devices via OSC message.
:param device_type: 'input' or 'output'
"""
try:
devices = self.audio_manager.list_audio_devices(device_type)
print(f"Available {device_type} devices:")
for idx, device in enumerate(devices):
print(f"Index {device['index']}: {device['name']}")
except Exception as e:
print(f"OSC: Error listing audio devices - {e}")
def exit_program(self, unused_addr, args=None):
"""
Gracefully exit the program via OSC message.
"""
print("OSC: Received exit command. Shutting down...")
if hasattr(self, 'recording_stream'):
self.recording_stream.stop()
if self.server_thread:
self.server.shutdown()
self.server_thread.join()
sys.exit(0)
def run_server(self):
"""
Start the OSC server in a separate thread.
"""
self.server = osc_server.ThreadingOSCUDPServer(
(self.ip, self.port),
self.osc_dispatcher
)
self.server_thread = threading.Thread(target=self.server.serve_forever)
self.server_thread.start()
return self.server_thread
def stop_server(self):
"""
Stop the OSC server.
"""
if hasattr(self, 'server'):
self.server.shutdown()

View File

@ -0,0 +1,4 @@
[ViewState]
Mode=
Vid=
FolderType=Generic

View File

@ -0,0 +1,97 @@
import sounddevice as sd
import numpy as np
import comtypes
import comtypes.client
from comtypes import CLSCTX_ALL
from pycaw.pycaw import AudioUtilities, IAudioEndpointVolume
import json
class WindowsAudioManager:
def __init__(self):
"""
Initialize Windows audio device and volume management.
"""
self.devices = sd.query_devices()
self.default_input = sd.default.device[0]
self.default_output = sd.default.device[1]
def list_audio_devices(self, kind='input'):
"""
List available audio devices.
:param kind: 'input' or 'output'
:return: List of audio devices
"""
if kind == 'input':
return [
{
'index': dev['index'],
'name': dev['name'],
'max_input_channels': dev['max_input_channels'],
'default_samplerate': dev['default_samplerate']
}
for dev in self.devices if dev['max_input_channels'] > 0
]
elif kind == 'output':
return [
{
'index': dev['index'],
'name': dev['name'],
'max_output_channels': dev['max_output_channels'],
'default_samplerate': dev['default_samplerate']
}
for dev in self.devices if dev['max_output_channels'] > 0
]
def set_default_input_device(self, device_index):
"""
Set the default input audio device.
:param device_index: Index of the audio device
:return: Sample rate of the selected device
"""
sd.default.device[0] = device_index
self.default_input = device_index
# Get the sample rate of the selected device
device_info = sd.query_devices(device_index)
return device_info['default_samplerate']
def get_current_input_device_sample_rate(self):
"""
Get the sample rate of the current input device.
:return: Sample rate of the current input device
"""
device_info = sd.query_devices(self.default_input)
return device_info['default_samplerate']
def get_system_volume(self):
"""
Get the system master volume.
:return: Current system volume (0.0 to 1.0)
"""
devices = AudioUtilities.GetSpeakers()
interface = devices.Activate(
IAudioEndpointVolume._iid_,
CLSCTX_ALL,
None
)
volume = interface.QueryInterface(IAudioEndpointVolume)
return volume.GetMasterVolumeLevelScalar()
def set_system_volume(self, volume_level):
"""
Set the system master volume.
:param volume_level: Volume level (0.0 to 1.0)
"""
devices = AudioUtilities.GetSpeakers()
interface = devices.Activate(
IAudioEndpointVolume._iid_,
CLSCTX_ALL,
None
)
volume = interface.QueryInterface(IAudioEndpointVolume)
volume.SetMasterVolumeLevelScalar(volume_level, None)