python service http refactor start

This commit is contained in:
michalcourson
2026-02-14 11:24:09 -05:00
parent 5516ce9212
commit f3b883602e
27 changed files with 415 additions and 205 deletions

1
audio-service/build.bat Normal file
View File

@ -0,0 +1 @@
python -m venv venv

View File

@ -0,0 +1,10 @@
{
"Test": [
{
"name": "test 2",
"filePath": "C:\\Users\\mickl\\Music\\clips\\original\\audio_capture_20250105_131700.wav",
"volume": 1,
"playbackType": "playStop"
}
]
}

View File

@ -3,4 +3,5 @@ numpy==1.22.3
python-osc==1.9.3
scipy==1.10.1
comtypes==1.4.8
pycaw==20240210
pycaw==20240210
Flask==3.1.2

View File

@ -0,0 +1,10 @@
{
"input_device": {
"default_samplerate": 44100.0,
"index": 1,
"max_input_channels": 8,
"name": "VM Mic mix (VB-Audio Voicemeete"
},
"save_path": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings",
"recording_length": 30
}

Binary file not shown.

View File

@ -5,6 +5,13 @@ from datetime import datetime
import scipy.io.wavfile as wavfile
class AudioRecorder:
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
print("Creating new AudioRecorder instance")
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self, duration=30, sample_rate=44100, channels=2, recordings_dir='recordings'):
"""
Initialize audio recorder with configurable parameters.
@ -19,6 +26,33 @@ class AudioRecorder:
self.channels = channels
self.buffer = np.zeros((int(duration * sample_rate), channels), dtype=np.float32)
self.recordings_dir = recordings_dir
self.stream = sd.InputStream(
samplerate=self.sample_rate,
channels=self.channels,
callback=self.record_callback
)
def refresh_stream(self):
"""
Refresh the audio stream with updated parameters.
"""
was_active = self.stream.active
if was_active:
self.stream.stop()
self.buffer = np.zeros((int(self.duration * self.sample_rate), self.channels), dtype=np.float32)
self.stream = sd.InputStream(
samplerate=self.sample_rate,
channels=self.channels,
callback=self.record_callback
)
if was_active:
self.stream.start()
def record_callback(self, indata, frames, time, status):
"""
@ -61,15 +95,48 @@ class AudioRecorder:
return filename
def set_buffer_duration(self, duration):
"""
Set the duration of the audio buffer.
:param duration: New buffer duration in seconds
"""
self.duration = duration
self.buffer = np.zeros((int(duration * self.sample_rate), self.channels), dtype=np.float32)
def set_recording_directory(self, directory):
"""
Set the directory where recordings will be saved.
:param directory: Path to the recordings directory
"""
self.recordings_dir = directory
def start_recording(self):
"""
Start continuous audio recording with circular buffer.
"""
if(self.stream.active):
print("Already recording")
return
print('number of channels', self.channels)
stream = sd.InputStream(
samplerate=self.sample_rate,
channels=self.channels,
callback=self.record_callback
)
stream.start()
return stream
self.stream.start()
def stop_recording(self):
"""
Stop continuous audio recording with circular buffer.
"""
if(not self.stream.active):
print("Already stopped")
return
self.stream.stop()
def is_recording(self):
"""
Check if the audio stream is currently active.
:return: True if recording, False otherwise
"""
return self.stream.active

View File

@ -1,12 +0,0 @@
from pythonosc.udp_client import SimpleUDPClient
import sys
ip = "127.0.0.1"
port = 5005
client = SimpleUDPClient(ip, port) # Create client
# client.send_message("/record/start", 0)
#sleep(5)
client.send_message(sys.argv[1], sys.argv[2] if len(sys.argv) > 2 else 0)
#sleep(5)
# client.send_message("/record/stop", 0)

View File

@ -1,92 +1,75 @@
import argparse
import os
import sys
from osc_server import OSCRecordingServer
from audio_recorder import AudioRecorder
from windows_audio import WindowsAudioManager
import sounddevice as sd
from metadata_manager import MetaDataManager
from settings import SettingsManager
from flask import Flask
from routes.recording import recording_bp
from routes.device import device_bp
from routes.metadata import metadata_bp
from routes.settings import settings_bp
import threading
app = Flask(__name__)
def main():
global recorder, audio_manager
# Create argument parser
parser = argparse.ArgumentParser(description='Audio Recording Service')
# Input device argument
parser.add_argument('--input-device',
type=str,
help='Name or index of the input audio device',
default=None)
# Recording length argument
parser.add_argument('--recording-length',
type=float,
help='Maximum recording length in seconds',
default=30.0)
# Recording save path argument
parser.add_argument('--save-path',
type=str,
help='Directory path to save recordings',
default=os.path.join(os.path.dirname(__file__), 'recordings'))
# OSC port argument
parser.add_argument('--osc-port',
type=int,
help='OSC server port number',
default=5005)
default=5010)
# Parse arguments
args = parser.parse_args()
audio_manager = WindowsAudioManager()
settings = SettingsManager()
# Ensure save path exists
os.makedirs(args.save_path, exist_ok=True)
audio_manager=WindowsAudioManager()
# Handle input device selection
input_device = None
devices = audio_manager.list_audio_devices('input')
if args.input_device:
try:
# Try to convert to integer first (for device index)
input_device = int(args.input_device)
except ValueError:
# If not an integer, treat as device name
print(devices)
for i, device in enumerate(devices):
if args.input_device.lower() in device['name'].lower():
input_device = device['index']
print(f"Using input device: {device['name']}")
break
os.makedirs(settings.get_settings('save_path'), exist_ok=True)
# Create AudioRecorder with specified parameters
# Handle input device selection
# Create Singletons with correct parameters
recorder = AudioRecorder(
duration=args.recording_length,
recordings_dir=args.save_path,
duration=settings.get_settings('recording_length'),
recordings_dir=settings.get_settings('save_path'),
# channels=min(2, devices[input_device]['max_input_channels']),
)
# Create OSC server with specified port
osc_server = OSCRecordingServer(
recorder=recorder,
port=args.osc_port,
audio_manager=audio_manager
)
meta = MetaDataManager()
audio_manager = WindowsAudioManager()
# Register blueprints
app.register_blueprint(recording_bp)
app.register_blueprint(device_bp)
app.register_blueprint(metadata_bp)
app.register_blueprint(settings_bp)
app.run(host='127.0.0.1', port=args.osc_port, debug=False, use_reloader=True)
osc_server.set_audio_device(None, str(input_device))
osc_server.start_recording(None)
# Run the OSC server
try:
print(f"Starting OSC Recording Server on port {args.osc_port}")
print(f"Recording save path: {args.save_path}")
print(f"Max recording length: {args.recording_length} seconds")
osc_server.run_server()
# osc_server.run_server()
except KeyboardInterrupt:
print("\nServer stopped by user.")
except Exception as e:
print(f"Error starting server: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,48 @@
import os
import json
class MetaDataManager:
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
# read metadata file from executing directory
self.metadata_file = os.path.join(os.getcwd(), "metadata.json")
if os.path.exists(self.metadata_file):
with open(self.metadata_file, "r") as f:
self.collections = json.load(f)
else:
self.collections = {}
def create_collection(self, name):
if name in self.collections:
raise ValueError(f"Collection '{name}' already exists.")
self.collections[name] = []
self.save_metadata()
def delete_collection(self, name):
if name not in self.collections:
raise ValueError(f"Collection '{name}' does not exist.")
del self.collections[name]
self.save_metadata()
def add_clip_to_collection(self, collection_name, clip_metadata):
if collection_name not in self.collections:
raise ValueError(f"Collection '{collection_name}' does not exist.")
self.collections[collection_name].append(clip_metadata)
self.save_metadata()
def get_collections(self):
return list(self.collections.keys())
def get_clips_in_collection(self, collection_name):
if collection_name not in self.collections:
raise ValueError(f"Collection '{collection_name}' does not exist.")
return self.collections[collection_name]
def save_metadata(self):
with open(self.metadata_file, "w") as f:
json.dump(self.collections, f, indent=4)

View File

@ -1,130 +0,0 @@
from pythonosc import dispatcher, osc_server
import threading
import sys
from audio_recorder import AudioRecorder
from windows_audio import WindowsAudioManager
class OSCRecordingServer:
def __init__(self, recorder, audio_manager, ip="127.0.0.1", port=5005):
"""
Initialize OSC server for audio recording triggers.
:param recorder: AudioRecorder instance
:param audio_manager: WindowsAudioManager instance
:param ip: IP address to bind OSC server
:param port: Port number for OSC server
"""
self.recorder = recorder
self.audio_manager = audio_manager
self.ip = ip
self.port = port
self._setup_dispatcher()
self.server_thread = None
def _setup_dispatcher(self):
"""
Set up OSC message dispatchers for different recording commands.
"""
self.osc_dispatcher = dispatcher.Dispatcher()
self.osc_dispatcher.map("/record/start", self.start_recording)
self.osc_dispatcher.map("/record/stop", self.stop_recording)
self.osc_dispatcher.map("/record/save", self.save_recording)
self.osc_dispatcher.map("/exit", self.exit_program)
self.osc_dispatcher.map("/device/set", self.set_audio_device) # New device set handler
self.osc_dispatcher.map("/device/list", self.list_audio_devices) # New device list handler
def start_recording(self, unused_addr, args=None):
"""
Start audio recording via OSC message.
"""
print("OSC: Starting audio recording")
self.recording_stream = self.recorder.start_recording()
def stop_recording(self, unused_addr, args=None):
"""
Stop active audio recording via OSC message.
"""
print("OSC: Stopping audio recording")
if hasattr(self, 'recording_stream'):
self.recording_stream.stop()
def save_recording(self, unused_addr, args=None):
"""
Save the current audio buffer via OSC message.
"""
print("OSC: Saving audio recording")
saved_file = self.recorder.save_last_n_seconds()
print(f"Saved recording to: {saved_file}")
def set_audio_device(self, unused_addr, device_index):
"""
Set the default input audio device via OSC message.
:param device_index: Index of the audio device to set
"""
try:
device_index = int(device_index)
print(f"OSC: Setting audio device to index {device_index}")
# Get the sample rate of the new device
sample_rate = self.audio_manager.set_default_input_device(device_index)
# Reinitialize recorder with new device's sample rate
self.recorder = AudioRecorder(
duration=self.recorder.duration,
sample_rate=sample_rate,
channels=self.recorder.channels,
recordings_dir=self.recorder.recordings_dir
)
print(f"Successfully set audio device to index {device_index} with sample rate {sample_rate}")
except Exception as e:
print(f"OSC: Error setting audio device - {e}")
def list_audio_devices(self, unused_addr, device_type='input'):
"""
List available audio devices via OSC message.
:param device_type: 'input' or 'output'
"""
try:
devices = self.audio_manager.list_audio_devices(device_type)
print(f"Available {device_type} devices:")
for idx, device in enumerate(devices):
print(f"Index {device['index']}: {device['name']}")
except Exception as e:
print(f"OSC: Error listing audio devices - {e}")
def exit_program(self, unused_addr, args=None):
"""
Gracefully exit the program via OSC message.
"""
print("OSC: Received exit command. Shutting down...")
if hasattr(self, 'recording_stream'):
self.recording_stream.stop()
if self.server_thread:
self.server.shutdown()
self.server_thread.join()
sys.exit(0)
def run_server(self):
"""
Start the OSC server in a separate thread.
"""
self.server = osc_server.ThreadingOSCUDPServer(
(self.ip, self.port),
self.osc_dispatcher
)
self.server_thread = threading.Thread(target=self.server.serve_forever)
self.server_thread.start()
return self.server_thread
def stop_server(self):
"""
Stop the OSC server.
"""
if hasattr(self, 'server'):
self.server.shutdown()

View File

@ -0,0 +1,37 @@
from flask import Blueprint, request, jsonify
from windows_audio import WindowsAudioManager
from audio_recorder import AudioRecorder
device_bp = Blueprint('device', __name__)
audio_manager = WindowsAudioManager()
recorder = AudioRecorder()
# @device_bp.route('/device/set', methods=['POST'])
# def set_audio_device():
# device_index = request.json.get('device_index')
# try:
# device_index = int(device_index)
# print(f'HTTP: Setting audio device to index {device_index}')
# sample_rate = audio_manager.set_default_input_device(device_index)
# recorder.sample_rate = sample_rate
# return jsonify({'status': 'device set', 'device_index': device_index, 'sample_rate': sample_rate})
# except Exception as e:
# return jsonify({'status': 'error', 'message': str(e)}), 400
@device_bp.route('/device/get', methods=['GET'])
def get_audio_device():
try:
device_info = audio_manager.get_default_device('input')
return jsonify({'status': 'success', 'device_info': device_info})
except Exception as e:
return jsonify({'status': 'error', 'message': str(e)}), 400
@device_bp.route('/device/list', methods=['GET'])
def list_audio_devices():
device_type = request.args.get('device_type', 'input')
try:
devices = audio_manager.list_audio_devices(device_type)
return jsonify({'status': 'success', 'devices': devices})
except Exception as e:
return jsonify({'status': 'error', 'message': str(e)}), 400

View File

@ -0,0 +1,43 @@
from flask import Blueprint, request, jsonify
from metadata_manager import MetaDataManager
metadata_bp = Blueprint('metadata', __name__)
@metadata_bp.route('/meta/collections', methods=['GET'])
def get_collections():
meta_manager = MetaDataManager()
collections = meta_manager.get_collections()
return jsonify({'status': 'success', 'collections': collections})
@metadata_bp.route('/meta/collections/add', methods=['POST'])
def add_collection():
meta_manager = MetaDataManager()
collection_name = request.json.get('name')
try:
meta_manager.create_collection(collection_name)
return jsonify({'status': 'success', 'collection': collection_name})
except ValueError as e:
return jsonify({'status': 'error', 'message': str(e)}), 400
@metadata_bp.route('/meta/collection/clips', methods=['GET'])
def get_clips_in_collection():
meta_manager = MetaDataManager()
collection_name = request.args.get('name')
try:
clips = meta_manager.get_clips_in_collection(collection_name)
return jsonify({'status': 'success', 'clips': clips})
except ValueError as e:
return jsonify({'status': 'error', 'message': str(e)}), 400
@metadata_bp.route('/meta/collection/clips/add', methods=['POST'])
def add_clip_to_collection():
meta_manager = MetaDataManager()
collection_name = request.json.get('name')
clip_metadata = request.json.get('clip')
try:
meta_manager.add_clip_to_collection(collection_name, clip_metadata)
clips = meta_manager.get_clips_in_collection(collection_name)
return jsonify({'status': 'success', 'clips': clips})
except ValueError as e:
return jsonify({'status': 'error', 'message': str(e)}), 400

View File

@ -0,0 +1,34 @@
from flask import Blueprint, request, jsonify
from audio_recorder import AudioRecorder
recording_bp = Blueprint('recording', __name__)
@recording_bp.route('/record/start', methods=['POST'])
def start_recording():
recorder = AudioRecorder()
print('HTTP: Starting audio recording')
recorder.start_recording()
return jsonify({'status': 'recording started'})
@recording_bp.route('/record/stop', methods=['POST'])
def stop_recording():
recorder = AudioRecorder()
print('HTTP: Stopping audio recording')
recorder.stop_recording()
return jsonify({'status': 'recording stopped'})
@recording_bp.route('/record/save', methods=['POST'])
def save_recording():
recorder = AudioRecorder()
print('HTTP: Saving audio recording')
saved_file = recorder.save_last_n_seconds()
return jsonify({'status': 'recording saved', 'file': saved_file})
@recording_bp.route('/record/status', methods=['GET'])
def recording_status():
recorder = AudioRecorder()
print('HTTP: Checking recording status')
status = 'recording' if recorder.is_recording() else 'stopped'
return jsonify({'status': status})

View File

@ -0,0 +1,25 @@
from flask import Blueprint, request, jsonify
from settings import SettingsManager
settings_bp = Blueprint('settings', __name__)
@settings_bp.route('/settings', methods=['GET'])
def get_all_settings():
return jsonify({'status': 'success', 'settings': SettingsManager().get_all_settings()})
@settings_bp.route('/settings/<name>', methods=['GET'])
def get_setting(name):
value = SettingsManager().get_settings(name)
if value is not None:
return jsonify({'status': 'success', 'name': name, 'value': value})
else:
return jsonify({'status': 'error', 'message': f'Setting "{name}" not found'}), 404
@settings_bp.route('/settings/<name>', methods=['POST'])
def set_setting(name):
value = request.json.get('value')
if value is None:
return jsonify({'status': 'error', 'message': 'Value is required'}), 400
SettingsManager().set_settings(name, value)
return jsonify({'status': 'success', 'name': name, 'value': value})

View File

@ -0,0 +1,68 @@
import os
import json
from audio_recorder import AudioRecorder
from windows_audio import WindowsAudioManager
class SettingsManager:
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
# read settings file from executing directory
self.settings_file = os.path.join(os.getcwd(), "settings.json")
if os.path.exists(self.settings_file):
with open(self.settings_file, "r") as f:
self.settings = json.load(f)
else:
self.settings = {
"input_device": None,
"save_path": os.path.join(os.getcwd(), "recordings"),
"recording_length": 15
}
audio_manager = WindowsAudioManager()
devices = audio_manager.list_audio_devices('input')
print(f"Available input devices: {self.settings}")
input = self.settings["input_device"]
#see if input device is in "devices", if not set to the first index
if input is not None and any(d['name'] == input["name"] for d in devices):
print(f"Using saved input device index: {input}")
else:
input = devices[0] if devices else None
self.settings["input_device"] = input
self.save_settings()
def get_settings(self, name):
# print(f"Getting setting '{name}': {self.settings}")
return self.settings.get(name, None)
def get_all_settings(self):
return self.settings
def set_settings(self, name, value):
self.settings[name] = value
self.save_settings()
def save_settings(self):
self.refresh_settings()
with open(self.settings_file, "w") as f:
json.dump(self.settings, f, indent=4)
def refresh_settings(self):
recorder = AudioRecorder()
# Update recorder parameters based on new setting
recorder.set_buffer_duration(self.get_settings('recording_length'))
recorder.recordings_dir = self.get_settings('save_path')
audio_manager = WindowsAudioManager()
audio_manager.set_default_input_device(self.get_settings('input_device')['index'])
recorder.refresh_stream()

View File

@ -7,6 +7,12 @@ from pycaw.pycaw import AudioUtilities, IAudioEndpointVolume
import json
class WindowsAudioManager:
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
"""
Initialize Windows audio device and volume management.
@ -42,8 +48,27 @@ class WindowsAudioManager:
}
for dev in self.devices if dev['max_output_channels'] > 0
]
def get_default_device(self, kind='input'):
"""
Get the default audio device.
:param kind: 'input' or 'output'
:return: Default audio device information
"""
if kind == 'input':
dev = self.devices[self.default_input]
return [
{
'index': dev['index'],
'name': dev['name'],
'max_input_channels': dev['max_input_channels'],
'default_samplerate': dev['default_samplerate']
}
]
def set_default_input_device(self, device_index):
if(device_index is None):
return self.get_current_input_device_sample_rate()
"""
Set the default input audio device.