Merge branch 'socket' into react_migration

This commit is contained in:
michalcourson
2026-02-28 17:04:44 -05:00
80 changed files with 2471 additions and 2301 deletions

View File

@ -1,34 +1,134 @@
[
{
"name": "Uncategorized",
"id": 0,
"clips": []
},
{
"name": "Test",
"id": 1,
"clips": []
},
{
"name": "New",
"id": 2,
"clips": [
{
"endTime": 30,
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings\\audio_capture_20260220_193822.wav",
"name": "Pee pee\npoo poo",
"playbackType": "playOverlap",
"startTime": 27.76674010920584,
"volume": 0.25
},
{
"endTime": 27.516843118383072,
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings\\audio_capture_20260220_200442.wav",
"name": "Clip 20260220_200442",
"playbackType": "playOverlap",
"startTime": 25.120307988450435,
"volume": 0.64
}
]
}
[
{
"name": "Uncategorized",
"id": 0,
"clips": [
{
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings\\audio_capture_20260226_195932.wav",
"name": "Clip 20260226_195932",
"playbackType": "playOverlap",
"volume": 1
},
{
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings\\audio_capture_20260228_165611.wav",
"name": "Clip 20260228_165611",
"playbackType": "playStop",
"volume": 1.0
},
{
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings\\audio_capture_20260228_165646.wav",
"name": "Clip 20260228_165646",
"playbackType": "playStop",
"volume": 1.0
}
]
},
{
"name": "Test",
"id": 1,
"clips": [
{
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings\\audio_capture_20260226_183812.wav",
"name": "Clip 20260226_183812",
"playbackType": "playStop",
"volume": 1
},
{
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings\\audio_capture_20260226_183607.wav",
"name": "Clip 20260226_183607",
"playbackType": "playStop",
"volume": 1
},
{
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings\\audio_capture_20260226_183822.wav",
"name": "Clip 20260226_183822",
"playbackType": "playStop",
"volume": 1
},
{
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings\\audio_capture_20260226_184028.wav",
"name": "Clip 20260226_184028",
"playbackType": "playStop",
"volume": 1
},
{
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings\\audio_capture_20260226_184030.wav",
"name": "Clip 20260226_184030",
"playbackType": "playStop",
"volume": 1
},
{
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings\\audio_capture_20260226_184032.wav",
"name": "Clip 20260226_184032",
"playbackType": "playStop",
"volume": 1
},
{
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings\\audio_capture_20260226_184037.wav",
"name": "Clip 20260226_184037",
"playbackType": "playStop",
"volume": 1
},
{
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings\\audio_capture_20260226_184040.wav",
"name": "Clip 20260226_184040",
"playbackType": "playStop",
"volume": 1
},
{
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings\\audio_capture_20260226_184041.wav",
"name": "Clip 20260226_184041",
"playbackType": "playStop",
"volume": 1
},
{
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings\\audio_capture_20260226_184042.wav",
"name": "Clip 20260226_184042",
"playbackType": "playStop",
"volume": 1
},
{
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings\\audio_capture_20260228_092721.wav",
"name": "Clip 20260228_092721",
"playbackType": "playStop",
"volume": 1,
"startTime": 6.438382145377559,
"endTime": 14.277258292166426
}
]
},
{
"name": "New",
"id": 2,
"clips": [
{
"endTime": 30,
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings\\audio_capture_20260220_193822.wav",
"name": "Pee pee\npoo poo",
"playbackType": "playOverlap",
"startTime": 27.64044943820222,
"volume": 0.31
},
{
"endTime": 30,
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings\\audio_capture_20260220_200442.wav",
"name": "Test",
"playbackType": "playOverlap",
"startTime": 26.14685314685314,
"volume": 0.64
},
{
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings\\audio_capture_20260228_085116.wav",
"name": "pp",
"playbackType": "playStop",
"volume": 1
},
{
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings\\audio_capture_20260228_120955.wav",
"name": "nose",
"playbackType": "playStop",
"volume": 1
}
]
}
]

View File

@ -1,7 +1,6 @@
sounddevice==0.5.1
numpy==1.22.3
python-osc==1.9.3
scipy==1.10.1
comtypes==1.4.8
pycaw==20240210
Flask==3.1.2
Flask==3.1.3
flask_cors==6.0.2
flask_socketio==5.6.1
numpy==2.4.2
scipy==1.17.1
sounddevice==0.5.5

View File

@ -1,17 +1,17 @@
{
"input_device": {
"channels": 2,
"default_samplerate": 48000,
"index": 55,
"name": "VM Mic mix (VB-Audio Voicemeeter VAIO)"
},
"save_path": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings",
"recording_length": 30,
"output_device": {
"channels": 2,
"default_samplerate": 48000,
"index": 45,
"name": "VM to Discord (VB-Audio Voicemeeter VAIO)"
},
"http_port": 5010
{
"input_device": {
"channels": 2,
"default_samplerate": 48000,
"index": 55,
"name": "VM Mic mix (VB-Audio Voicemeeter VAIO)"
},
"save_path": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings",
"recording_length": 30,
"output_device": {
"channels": 2,
"default_samplerate": 48000,
"index": 45,
"name": "VM to Discord (VB-Audio Voicemeeter VAIO)"
},
"http_port": 5010
}

View File

@ -1,166 +1,168 @@
import sounddevice as sd
import numpy as np
import os
from datetime import datetime
import scipy.io.wavfile as wavfile
from metadata_manager import MetaDataManager
from audio_clip import AudioClip
# AudioClip class for clip playback
class AudioIO:
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
# print("Creating new AudioRecorder instance")
cls._instance = super().__new__(cls)
cls._instance.init()
return cls._instance
def init(self):
self.duration = 30
self.channels = 2
self.input_sample_rate = 44100
self.output_sample_rate = 44100
self.buffer = np.zeros((int(self.duration * self.input_sample_rate), self.channels), dtype=np.float32)
self.recordings_dir = "recordings"
sd.default.latency = 'low'
self.in_stream = sd.InputStream(
callback=self.record_callback
)
self.out_stream = sd.OutputStream(
callback=self.playback_callback,
latency=3
)
self.clip_map = {}
def refresh_streams(self):
was_active = self.in_stream.active
if was_active:
self.in_stream.stop()
self.out_stream.stop()
self.buffer = np.zeros((int(self.duration * self.input_sample_rate), self.channels), dtype=np.float32)
# print(f"AudioRecorder initialized with duration={self.duration}s, sample_rate={self.sample_rate}Hz, channels={self.channels}")
self.in_stream = sd.InputStream(
callback=self.record_callback
)
self.out_stream = sd.OutputStream(
callback=self.playback_callback
)
if was_active:
self.in_stream.start()
self.out_stream.start()
def record_callback(self, indata, frames, time, status):
if status:
# print(f"Recording status: {status}")
pass
# Circular buffer implementation
self.buffer = np.roll(self.buffer, -frames, axis=0)
self.buffer[-frames:] = indata
def playback_callback(self, outdata, frames, time, status):
if status:
# print(f"Playback status: {status}")
pass
outdata.fill(0)
# Iterate over a copy of the items to avoid modifying the dictionary during iteration
for clip_id, clip_list in list(self.clip_map.items()):
for clip in clip_list[:]: # Iterate over a copy of the list
if not clip.is_finished():
samples = clip.get_samples(frames)
outdata[:] += samples.reshape(-1, 1) # Mix into output
if clip.is_finished():
self.clip_map[clip_id].remove(clip)
if len(self.clip_map[clip_id]) == 0:
del self.clip_map[clip_id]
break # Exit inner loop since the key is deleted
def save_last_n_seconds(self):
# Create output directory if it doesn't exist
os.makedirs(self.recordings_dir, exist_ok=True)
# Generate filename with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = os.path.join(self.recordings_dir, f"audio_capture_{timestamp}.wav")
# Normalize audio to prevent clipping
audio_data = self.buffer / np.max(np.abs(self.buffer)) * .5
# Convert float32 to int16 for WAV file
audio_data_int16 = (audio_data * 32767).astype(np.int16)
# Write buffer to file
wavfile.write(filename, int(self.input_sample_rate), audio_data_int16)
meta = MetaDataManager()
clip_metadata = {
"filename": filename,
"name": f"Clip {timestamp}",
"playbackType":"playStop",
"volume": 1.0,
}
meta.add_clip_to_collection("Uncategorized", clip_metadata )
return clip_metadata
def set_buffer_duration(self, duration):
self.duration = duration
self.buffer = np.zeros((int(duration * self.input_sample_rate), self.channels), dtype=np.float32)
def set_recording_directory(self, directory):
self.recordings_dir = directory
def start_recording(self):
if(self.in_stream.active):
# print("Already recording")
return
# print('number of channels', self.channels)
self.in_stream.start()
self.out_stream.start()
self.output_sample_rate = self.out_stream.samplerate
self.input_sample_rate = self.in_stream.samplerate
def stop_recording(self):
if(not self.in_stream.active):
# print("Already stopped")
return
self.in_stream.stop()
self.out_stream.stop()
def is_recording(self):
return self.in_stream.active
def play_clip(self, clip_metadata):
print(f"Playing clip: {clip_metadata}")
clip_id = clip_metadata.get("filename")
if clip_metadata.get("playbackType") == "playStop":
if clip_id in self.clip_map:
del self.clip_map[clip_id]
return
else:
self.clip_map[clip_id] = []
if clip_id not in self.clip_map:
self.clip_map[clip_id] = []
import sounddevice as sd
import numpy as np
import os
from datetime import datetime
import scipy.io.wavfile as wavfile
from metadata_manager import MetaDataManager
from audio_clip import AudioClip
# AudioClip class for clip playback
class AudioIO:
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
# print("Creating new AudioRecorder instance")
cls._instance = super().__new__(cls)
cls._instance.init()
return cls._instance
def init(self):
self.duration = 30
self.channels = 2
self.input_sample_rate = 44100
self.output_sample_rate = 44100
self.buffer = np.zeros((int(self.duration * self.input_sample_rate), self.channels), dtype=np.float32)
self.recordings_dir = "recordings"
sd.default.latency = 'low'
self.socket = None
self.in_stream = sd.InputStream(
callback=self.record_callback
)
self.out_stream = sd.OutputStream(
callback=self.playback_callback,
latency=3
)
self.clip_map = {}
def refresh_streams(self):
was_active = self.in_stream.active
if was_active:
self.in_stream.stop()
self.out_stream.stop()
self.buffer = np.zeros((int(self.duration * self.input_sample_rate), self.channels), dtype=np.float32)
# print(f"AudioRecorder initialized with duration={self.duration}s, sample_rate={self.sample_rate}Hz, channels={self.channels}")
self.in_stream = sd.InputStream(
callback=self.record_callback
)
self.out_stream = sd.OutputStream(
callback=self.playback_callback
)
if was_active:
self.in_stream.start()
self.out_stream.start()
def record_callback(self, indata, frames, time, status):
if status:
# print(f"Recording status: {status}")
pass
# Circular buffer implementation
self.buffer = np.roll(self.buffer, -frames, axis=0)
self.buffer[-frames:] = indata
def playback_callback(self, outdata, frames, time, status):
if status:
# print(f"Playback status: {status}")
pass
outdata.fill(0)
# Iterate over a copy of the items to avoid modifying the dictionary during iteration
for clip_id, clip_list in list(self.clip_map.items()):
for clip in clip_list[:]: # Iterate over a copy of the list
if not clip.is_finished():
samples = clip.get_samples(frames)
outdata[:] += samples.reshape(-1, 1) # Mix into output
if clip.is_finished():
self.clip_map[clip_id].remove(clip)
if len(self.clip_map[clip_id]) == 0:
del self.clip_map[clip_id]
break # Exit inner loop since the key is deleted
def save_last_n_seconds(self):
# Create output directory if it doesn't exist
os.makedirs(self.recordings_dir, exist_ok=True)
# Generate filename with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = os.path.join(self.recordings_dir, f"audio_capture_{timestamp}.wav")
# Normalize audio to prevent clipping
audio_data = self.buffer / np.max(np.abs(self.buffer)) * .5
# Convert float32 to int16 for WAV file
audio_data_int16 = (audio_data * 32767).astype(np.int16)
# Write buffer to file
wavfile.write(filename, int(self.input_sample_rate), audio_data_int16)
meta = MetaDataManager()
clip_metadata = {
"filename": filename,
"name": f"Clip {timestamp}",
"playbackType":"playStop",
"volume": 1.0,
}
meta.add_clip_to_collection("Uncategorized", clip_metadata )
self.socket.emit('new_clip', clip_metadata)
return clip_metadata
def set_buffer_duration(self, duration):
self.duration = duration
self.buffer = np.zeros((int(duration * self.input_sample_rate), self.channels), dtype=np.float32)
def set_recording_directory(self, directory):
self.recordings_dir = directory
def start_recording(self):
if(self.in_stream.active):
# print("Already recording")
return
# print('number of channels', self.channels)
self.in_stream.start()
self.out_stream.start()
self.output_sample_rate = self.out_stream.samplerate
self.input_sample_rate = self.in_stream.samplerate
def stop_recording(self):
if(not self.in_stream.active):
# print("Already stopped")
return
self.in_stream.stop()
self.out_stream.stop()
def is_recording(self):
return self.in_stream.active
def play_clip(self, clip_metadata):
print(f"Playing clip: {clip_metadata}")
clip_id = clip_metadata.get("filename")
if clip_metadata.get("playbackType") == "playStop":
if clip_id in self.clip_map:
del self.clip_map[clip_id]
return
else:
self.clip_map[clip_id] = []
if clip_id not in self.clip_map:
self.clip_map[clip_id] = []
self.clip_map[clip_id].append(AudioClip(clip_metadata, target_sample_rate=self.output_sample_rate))

View File

@ -1,68 +1,94 @@
import argparse
import os
import sys
from audio_io import AudioIO
from windows_audio import WindowsAudioManager
import sounddevice as sd
from metadata_manager import MetaDataManager
from settings import SettingsManager
from flask import Flask
from flask_cors import CORS
from routes.recording import recording_bp
from routes.device import device_bp
from routes.metadata import metadata_bp
from routes.settings import settings_bp
from flask_socketio import SocketIO
import threading
app = Flask(__name__)
CORS(app)
# socketio = SocketIO(app, cors_allowed_origins="*")
# CORS(socketio)
def main():
# Create argument parser
parser = argparse.ArgumentParser(description='Audio Recording Service')
# OSC port argument
parser.add_argument('--osc-port',
type=int,
help='OSC server port number',
default=5010)
# Parse arguments
args = parser.parse_args()
audio_manager = WindowsAudioManager()
settings = SettingsManager()
# Ensure save path exists
os.makedirs(settings.get_settings('save_path'), exist_ok=True)
io = AudioIO()
io.start_recording()
# Register blueprints
app.register_blueprint(recording_bp)
app.register_blueprint(device_bp)
app.register_blueprint(metadata_bp)
app.register_blueprint(settings_bp)
app.run(host='127.0.0.1', port=settings.get_settings('http_port'), debug=False, use_reloader=True)
# socketio.run(app, host='127.0.0.1', port=args.osc_port, debug=False, use_reloader=True)
# Run the OSC server
# try:
# print(f"Starting OSC Recording Server on port {args.osc_port}")
# # osc_server.run_server()
# except KeyboardInterrupt:
# print("\nServer stopped by user.")
# except Exception as e:
# print(f"Error starting server: {e}")
# sys.exit(1)
if __name__ == "__main__":
import argparse
import os
import sys
from audio_io import AudioIO
from windows_audio import WindowsAudioManager
import sounddevice as sd
from metadata_manager import MetaDataManager
from settings import SettingsManager
from flask import Flask
from flask_cors import CORS
from routes.recording import recording_bp
from routes.device import device_bp
from routes.metadata import metadata_bp
from routes.settings import settings_bp
from flask_socketio import SocketIO, emit
import threading
app = Flask(__name__)
CORS(app)
socketio = SocketIO(app, cors_allowed_origins="*", logger=True, engineio_logger=True, async_mode='eventlet')
@socketio.on('connect')
def handle_connect():
print("Client connected")
emit('full_data', MetaDataManager().collections)
@socketio.on('record_clip')
def record_clip():
io = AudioIO()
io.save_last_n_seconds();
@socketio.on('play_clip')
def play_clip(data):
io = AudioIO()
print(f"Received play_clip event with data: {data}")
if data:
io.play_clip(data)
def main():
# Create argument parser
parser = argparse.ArgumentParser(description='Audio Recording Service')
# OSC port argument
parser.add_argument('--osc-port',
type=int,
help='OSC server port number',
default=5010)
# Parse arguments
args = parser.parse_args()
audio_manager = WindowsAudioManager()
settings = SettingsManager()
meta = MetaDataManager()
# Ensure save path exists
os.makedirs(settings.get_settings('save_path'), exist_ok=True)
io = AudioIO()
io.start_recording()
# settings.socket = socketio
io.socket = socketio
meta.socket = socketio
# Register blueprints
app.register_blueprint(recording_bp)
app.register_blueprint(device_bp)
app.register_blueprint(metadata_bp)
app.register_blueprint(settings_bp)
print(f"Starting Flask server on port {settings.get_settings('http_port')}")
# app.run(host='127.0.0.1', port=settings.get_settings('http_port'), debug=False, use_reloader=True)
socketio.run(app, host='127.0.0.1', port=settings.get_settings('http_port'), debug=False, use_reloader=True, allow_unsafe_werkzeug=True)
# Run the OSC server
# try:
# print(f"Starting OSC Recording Server on port {args.osc_port}")
# # osc_server.run_server()
# except KeyboardInterrupt:
# print("\nServer stopped by user.")
# except Exception as e:
# print(f"Error starting server: {e}")
# sys.exit(1)
if __name__ == "__main__":
main()

View File

@ -1,20 +1,20 @@
{
"Uncategorized": [
{
"endTime": 12.489270386266055,
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\src\\recordings\\audio_capture_20260214_133540.wav",
"name": "Clip 20260214_133540",
"playbackType": "playStop",
"startTime": 10.622317596566523,
"volume": 1
},
{
"endTime": 6.824034334763957,
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\src\\recordings\\audio_capture_20260214_133137.wav",
"name": "Clip 20260214_133137",
"playbackType": "playStop",
"startTime": 3.7982832618025753,
"volume": 1
}
]
{
"Uncategorized": [
{
"endTime": 12.489270386266055,
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\src\\recordings\\audio_capture_20260214_133540.wav",
"name": "Clip 20260214_133540",
"playbackType": "playStop",
"startTime": 10.622317596566523,
"volume": 1
},
{
"endTime": 6.824034334763957,
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\src\\recordings\\audio_capture_20260214_133137.wav",
"name": "Clip 20260214_133137",
"playbackType": "playStop",
"startTime": 3.7982832618025753,
"volume": 1
}
]
}

View File

@ -1,100 +1,114 @@
import os
import json
class MetaDataManager:
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.init()
return cls._instance
def init(self):
# read metadata file from executing directory
self.metadata_file = os.path.join(os.getcwd(), "metadata.json")
if os.path.exists(self.metadata_file):
with open(self.metadata_file, "r") as f:
self.collections = json.load(f)
else:
self.collections = {}
if(collections := next((c for c in self.collections if c.get("name") == "Uncategorized"), None)) is None:
self.collections.append({"name": "Uncategorized", "id": 0, "clips": []})
self.save_metadata()
def create_collection(self, name):
if any(c.get("name") == name for c in self.collections):
raise ValueError(f"Collection '{name}' already exists.")
new_id = max((c.get("id", 0) for c in self.collections), default=0) + 1
self.collections.append({"name": name, "id": new_id, "clips": []})
self.save_metadata()
def delete_collection(self, name):
collection = next((c for c in self.collections if c.get("name") == name), None)
if collection is None:
raise ValueError(f"Collection '{name}' does not exist.")
self.collections.remove(collection)
self.save_metadata()
def add_clip_to_collection(self, collection_name, clip_metadata):
collection = next((c for c in self.collections if c.get("name") == collection_name), None)
if collection is None:
raise ValueError(f"Collection '{collection_name}' does not exist.")
collection["clips"].append(clip_metadata)
self.save_metadata()
def remove_clip_from_collection(self, collection_name, clip_metadata):
collection = next((c for c in self.collections if c.get("name") == collection_name), None)
if collection is None:
raise ValueError(f"Collection '{collection_name}' does not exist.")
# Remove all clips with the same file name as clip_metadata["file_name"]
in_list = any(clip.get("filename") == clip_metadata.get("filename") for clip in collection["clips"])
if not in_list:
raise ValueError(f"Clip with filename '{clip_metadata.get('filename')}' not found in collection '{collection_name}'.")
collection["clips"] = [
clip for clip in collection["clips"]
if clip.get("filename") != clip_metadata.get("filename")
]
self.save_metadata()
def move_clip_to_collection(self, source_collection, target_collection, clip_metadata):
self.remove_clip_from_collection(source_collection, clip_metadata)
self.add_clip_to_collection(target_collection, clip_metadata)
def edit_clip_in_collection(self, collection_name, new_clip_metadata):
collection = next((c for c in self.collections if c.get("name") == collection_name), None)
if collection is None:
raise ValueError(f"Collection '{collection_name}' does not exist.")
# Find the index of the clip with the same file name as old_clip_metadata["file_name"]
index = next((i for i, clip in enumerate(collection["clips"]) if clip.get("filename") == new_clip_metadata.get("filename")), None)
if index is None:
raise ValueError(f"Clip with filename '{new_clip_metadata.get('filename')}' not found in collection '{collection_name}'.")
collection["clips"][index] = new_clip_metadata
self.save_metadata()
def get_collections(self):
return list(map(lambda c: {"name": c.get("name"), "id": c.get("id")}, self.collections))
def get_clips_in_collection(self, collection_name):
collection = next((c for c in self.collections if c.get("name") == collection_name), None)
if collection is None:
raise ValueError(f"Collection '{collection_name}' does not exist.")
return collection["clips"]
def reorder_clips_in_collection(self, collection_name, new_order):
collection = next((c for c in self.collections if c.get("name") == collection_name), None)
if collection is None:
raise ValueError(f"Collection '{collection_name}' does not exist.")
existing_filenames = {clip.get("filename") for clip in collection["clips"]}
new_filenames = {clip.get("filename") for clip in new_order}
if not new_filenames.issubset(existing_filenames):
raise ValueError("New order contains clips that do not exist in the collection.")
collection["clips"] = new_order
self.save_metadata()
def save_metadata(self):
with open(self.metadata_file, "w") as f:
json.dump(self.collections, f, indent=4)
import os
import json
class MetaDataManager:
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.init()
return cls._instance
def init(self):
self.socket = None
# read metadata file from executing directory
self.metadata_file = os.path.join(os.getcwd(), "metadata.json")
if os.path.exists(self.metadata_file):
with open(self.metadata_file, "r") as f:
self.collections = json.load(f)
else:
self.collections = {}
if(collections := next((c for c in self.collections if c.get("name") == "Uncategorized"), None)) is None:
self.collections.append({"name": "Uncategorized", "id": 0, "clips": []})
self.save_metadata()
def create_collection(self, name):
if any(c.get("name") == name for c in self.collections):
raise ValueError(f"Collection '{name}' already exists.")
new_id = max((c.get("id", 0) for c in self.collections), default=0) + 1
self.collections.append({"name": name, "id": new_id, "clips": []})
self.save_metadata()
def delete_collection(self, name):
collection = next((c for c in self.collections if c.get("name") == name), None)
if collection is None:
raise ValueError(f"Collection '{name}' does not exist.")
self.collections.remove(collection)
self.save_metadata()
def add_clip_to_collection(self, collection_name, clip_metadata):
collection = next((c for c in self.collections if c.get("name") == collection_name), None)
if collection is None:
raise ValueError(f"Collection '{collection_name}' does not exist.")
collection["clips"].append(clip_metadata)
if not self.socket is None:
self.socket.emit('collection_updated', collection)
self.save_metadata()
def remove_clip_from_collection(self, collection_name, clip_metadata):
collection = next((c for c in self.collections if c.get("name") == collection_name), None)
if collection is None:
raise ValueError(f"Collection '{collection_name}' does not exist.")
# Remove all clips with the same file name as clip_metadata["file_name"]
in_list = any(clip.get("filename") == clip_metadata.get("filename") for clip in collection["clips"])
if not in_list:
raise ValueError(f"Clip with filename '{clip_metadata.get('filename')}' not found in collection '{collection_name}'.")
collection["clips"] = [
clip for clip in collection["clips"]
if clip.get("filename") != clip_metadata.get("filename")
]
if not self.socket is None:
self.socket.emit('collection_updated', collection)
self.save_metadata()
def move_clip_to_collection(self, source_collection, target_collection, clip_metadata):
self.remove_clip_from_collection(source_collection, clip_metadata)
self.add_clip_to_collection(target_collection, clip_metadata)
if not self.socket is None:
self.socket.emit('collection_updated', source_collection)
self.socket.emit('collection_updated', target_collection)
def edit_clip_in_collection(self, collection_name, new_clip_metadata):
collection = next((c for c in self.collections if c.get("name") == collection_name), None)
if collection is None:
raise ValueError(f"Collection '{collection_name}' does not exist.")
# Find the index of the clip with the same file name as old_clip_metadata["file_name"]
index = next((i for i, clip in enumerate(collection["clips"]) if clip.get("filename") == new_clip_metadata.get("filename")), None)
if index is None:
raise ValueError(f"Clip with filename '{new_clip_metadata.get('filename')}' not found in collection '{collection_name}'.")
collection["clips"][index] = new_clip_metadata
if not self.socket is None:
self.socket.emit('collection_updated', collection)
self.save_metadata()
def get_collections(self):
return list(map(lambda c: {"name": c.get("name"), "id": c.get("id")}, self.collections))
def get_clips_in_collection(self, collection_name):
collection = next((c for c in self.collections if c.get("name") == collection_name), None)
if collection is None:
raise ValueError(f"Collection '{collection_name}' does not exist.")
return collection["clips"]
def reorder_clips_in_collection(self, collection_name, new_order):
collection = next((c for c in self.collections if c.get("name") == collection_name), None)
if collection is None:
raise ValueError(f"Collection '{collection_name}' does not exist.")
existing_filenames = {clip.get("filename") for clip in collection["clips"]}
new_filenames = {clip.get("filename") for clip in new_order}
if not new_filenames.issubset(existing_filenames):
raise ValueError("New order contains clips that do not exist in the collection.")
collection["clips"] = new_order
if not self.socket is None:
self.socket.emit('collection_updated', collection)
self.save_metadata()
def save_metadata(self):
with open(self.metadata_file, "w") as f:
json.dump(self.collections, f, indent=4)

View File

@ -1,4 +1,4 @@
[ViewState]
Mode=
Vid=
FolderType=Generic
[ViewState]
Mode=
Vid=
FolderType=Generic

View File

@ -98,3 +98,9 @@ def edit_clip_in_collection():
return jsonify({'status': 'success', 'collections': collections})
except ValueError as e:
return jsonify({'status': 'error', 'message': str(e)}), 400
@metadata_bp.route('/ws/test', methods=['POST'])
def test_websocket():
MetaDataManager().socket.emit('test_event', {'data': 'Test message from metadata route'})
return jsonify({'status': 'success'})

View File

@ -1,57 +1,57 @@
from flask import Blueprint, request, jsonify
from audio_io import AudioIO
import os
recording_bp = Blueprint('recording', __name__)
@recording_bp.route('/record/start', methods=['POST'])
def start_recording():
recorder = AudioIO()
print('HTTP: Starting audio recording')
recorder.start_recording()
return jsonify({'status': 'recording started'})
@recording_bp.route('/record/stop', methods=['POST'])
def stop_recording():
recorder = AudioIO()
# print('HTTP: Stopping audio recording')
recorder.stop_recording()
return jsonify({'status': 'recording stopped'})
@recording_bp.route('/record/save', methods=['POST'])
def save_recording():
recorder = AudioIO()
# print('HTTP: Saving audio recording')
saved_file = recorder.save_last_n_seconds()
return jsonify({'status': 'recording saved', 'file': saved_file})
@recording_bp.route('/record/status', methods=['GET'])
def recording_status():
recorder = AudioIO()
# print('HTTP: Checking recording status')
status = 'recording' if recorder.is_recording() else 'stopped'
return jsonify({'status': status})
@recording_bp.route('/record/delete', methods=['POST'])
def recording_delete():
filename = request.json.get('filename')
try:
os.remove(filename)
return jsonify({'status': 'success'})
except Exception as e:
return jsonify({'status': 'error', 'message': str(e)}), 400
@recording_bp.route('/playback/start', methods=['POST'])
def playback_start():
print(f"Playing clip")
# print('HTTP: Starting audio playback')
clip = request.json
try:
io = AudioIO()
io.play_clip(clip)
# os.remove(filename)
return jsonify({'status': 'success'})
except Exception as e:
from flask import Blueprint, request, jsonify
from audio_io import AudioIO
import os
recording_bp = Blueprint('recording', __name__)
@recording_bp.route('/record/start', methods=['POST'])
def start_recording():
recorder = AudioIO()
print('HTTP: Starting audio recording')
recorder.start_recording()
return jsonify({'status': 'recording started'})
@recording_bp.route('/record/stop', methods=['POST'])
def stop_recording():
recorder = AudioIO()
# print('HTTP: Stopping audio recording')
recorder.stop_recording()
return jsonify({'status': 'recording stopped'})
@recording_bp.route('/record/save', methods=['POST'])
def save_recording():
recorder = AudioIO()
# print('HTTP: Saving audio recording')
saved_file = recorder.save_last_n_seconds()
return jsonify({'status': 'recording saved', 'file': saved_file})
@recording_bp.route('/record/status', methods=['GET'])
def recording_status():
recorder = AudioIO()
# print('HTTP: Checking recording status')
status = 'recording' if recorder.is_recording() else 'stopped'
return jsonify({'status': status})
@recording_bp.route('/record/delete', methods=['POST'])
def recording_delete():
filename = request.json.get('filename')
try:
os.remove(filename)
return jsonify({'status': 'success'})
except Exception as e:
return jsonify({'status': 'error', 'message': str(e)}), 400
@recording_bp.route('/playback/start', methods=['POST'])
def playback_start():
print(f"Playing clip")
# print('HTTP: Starting audio playback')
clip = request.json
try:
io = AudioIO()
io.play_clip(clip)
# os.remove(filename)
return jsonify({'status': 'success'})
except Exception as e:
return jsonify({'status': 'error', 'message': str(e)}), 400

View File

@ -1,31 +1,32 @@
from flask import Blueprint, request, jsonify
from settings import SettingsManager
settings_bp = Blueprint('settings', __name__)
@settings_bp.route('/settings', methods=['GET'])
def get_all_settings():
return jsonify({'status': 'success', 'settings': SettingsManager().get_all_settings()})
@settings_bp.route('/settings/<name>', methods=['GET'])
def get_setting(name):
value = SettingsManager().get_settings(name)
if value is not None:
return jsonify({'status': 'success', 'name': name, 'value': value})
else:
return jsonify({'status': 'error', 'message': f'Setting "{name}" not found'}), 404
@settings_bp.route('/settings/update', methods=['POST'])
def set_all_settings():
settings = request.json.get('settings')
print (f"Received settings update: {settings}")
if settings is None:
return jsonify({'status': 'error', 'message': 'Settings are required'}), 400
try:
for name, value in settings.items():
print(f"Updating setting '{name}' to '{value}'")
SettingsManager().set_settings(name, value)
return jsonify({'status': 'success', 'settings': settings})
except ValueError as e:
return jsonify({'status': 'error', 'message': str(e)}), 400
from flask import Blueprint, request, jsonify
from settings import SettingsManager
settings_bp = Blueprint('settings', __name__)
@settings_bp.route('/settings', methods=['GET'])
def get_all_settings():
return jsonify({'status': 'success', 'settings': SettingsManager().get_all_settings()})
@settings_bp.route('/settings/<name>', methods=['GET'])
def get_setting(name):
value = SettingsManager().get_settings(name)
if value is not None:
return jsonify({'status': 'success', 'name': name, 'value': value})
else:
return jsonify({'status': 'error', 'message': f'Setting "{name}" not found'}), 404
@settings_bp.route('/settings/update', methods=['POST'])
def set_all_settings():
settings = request.json.get('settings')
print (f"Received settings update: {settings}")
if settings is None:
return jsonify({'status': 'error', 'message': 'Settings are required'}), 400
try:
for name, value in settings.items():
print(f"Updating setting '{name}' to '{value}'")
SettingsManager().set_settings(name, value)
return jsonify({'status': 'success', 'settings': settings})
except ValueError as e:
return jsonify({'status': 'error', 'message': str(e)}), 400

View File

@ -1,10 +1,10 @@
{
"input_device": {
"index": 0,
"name": "Microsoft Sound Mapper - Input",
"max_input_channels": 2,
"default_samplerate": 44100.0
},
"save_path": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\src\\recordings",
"recording_length": 15
{
"input_device": {
"index": 0,
"name": "Microsoft Sound Mapper - Input",
"max_input_channels": 2,
"default_samplerate": 44100.0
},
"save_path": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\src\\recordings",
"recording_length": 15
}

View File

@ -1,112 +1,108 @@
import sounddevice as sd
import numpy as np
import comtypes
import comtypes.client
from comtypes import CLSCTX_ALL
from pycaw.pycaw import AudioUtilities, IAudioEndpointVolume
import json
class WindowsAudioManager:
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.init()
return cls._instance
def init(self):
"""
Initialize Windows audio device and volume management.
"""
host_apis = sd.query_hostapis()
wasapi_device_indexes = None
for api in host_apis:
if api['name'].lower() == 'Windows WASAPI'.lower():
wasapi_device_indexes = api['devices']
break
# print(f"Host APIs: {host_apis}")
# print(f"WASAPI Device Indexes: {wasapi_device_indexes}")
wasapi_device_indexes = set(wasapi_device_indexes) if wasapi_device_indexes is not None else set()
self.devices = [dev for dev in sd.query_devices() if dev['index'] in wasapi_device_indexes]
# self.devices = sd.query_devices()
# print(f"devices: {self.devices}")
self.default_input = sd.default.device[0]
self.default_output = sd.default.device[1]
def list_audio_devices(self, kind='input'):
"""
List available audio devices.
:param kind: 'input' or 'output'
:return: List of audio devices
"""
if kind == 'input':
return [
{
'index': dev['index'],
'name': dev['name'],
'channels': dev['max_input_channels'],
'default_samplerate': dev['default_samplerate']
}
for dev in self.devices if dev['max_input_channels'] > 0
]
elif kind == 'output':
return [
{
'index': dev['index'],
'name': dev['name'],
'channels': dev['max_output_channels'],
'default_samplerate': dev['default_samplerate']
}
for dev in self.devices if dev['max_output_channels'] > 0
]
def get_default_device(self, kind='input'):
"""
Get the default audio device.
:param kind: 'input' or 'output'
:return: Default audio device information
"""
if kind == 'input':
dev = self.devices[self.default_input]
return [
{
'index': dev['index'],
'name': dev['name'],
'max_input_channels': dev['max_input_channels'],
'default_samplerate': dev['default_samplerate']
}
]
def set_default_input_device(self, device_index):
if(device_index is None):
return 0
"""
Set the default input audio device.
:param device_index: Index of the audio device
:return: Sample rate of the selected device
"""
sd.default.device[0] = device_index
self.default_input = device_index
# Get the sample rate of the selected device
device_info = sd.query_devices(device_index)
return device_info['default_samplerate']
def set_default_output_device(self, device_index):
if(device_index is None):
return self.get_current_output_device_sample_rate()
"""
Set the default output audio device.
:param device_index: Index of the audio device
:return: Sample rate of the selected device
"""
sd.default.device[1] = device_index
self.default_output = device_index
# Get the sample rate of the selected device
device_info = sd.query_devices(device_index)
import sounddevice as sd
import numpy as np
import json
class WindowsAudioManager:
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.init()
return cls._instance
def init(self):
"""
Initialize Windows audio device and volume management.
"""
host_apis = sd.query_hostapis()
wasapi_device_indexes = None
for api in host_apis:
if api['name'].lower() == 'Windows WASAPI'.lower():
wasapi_device_indexes = api['devices']
break
# print(f"Host APIs: {host_apis}")
# print(f"WASAPI Device Indexes: {wasapi_device_indexes}")
wasapi_device_indexes = set(wasapi_device_indexes) if wasapi_device_indexes is not None else set()
self.devices = [dev for dev in sd.query_devices() if dev['index'] in wasapi_device_indexes]
# self.devices = sd.query_devices()
# print(f"devices: {self.devices}")
self.default_input = sd.default.device[0]
self.default_output = sd.default.device[1]
def list_audio_devices(self, kind='input'):
"""
List available audio devices.
:param kind: 'input' or 'output'
:return: List of audio devices
"""
if kind == 'input':
return [
{
'index': dev['index'],
'name': dev['name'],
'channels': dev['max_input_channels'],
'default_samplerate': dev['default_samplerate']
}
for dev in self.devices if dev['max_input_channels'] > 0
]
elif kind == 'output':
return [
{
'index': dev['index'],
'name': dev['name'],
'channels': dev['max_output_channels'],
'default_samplerate': dev['default_samplerate']
}
for dev in self.devices if dev['max_output_channels'] > 0
]
def get_default_device(self, kind='input'):
"""
Get the default audio device.
:param kind: 'input' or 'output'
:return: Default audio device information
"""
if kind == 'input':
dev = self.devices[self.default_input]
return [
{
'index': dev['index'],
'name': dev['name'],
'max_input_channels': dev['max_input_channels'],
'default_samplerate': dev['default_samplerate']
}
]
def set_default_input_device(self, device_index):
if(device_index is None):
return 0
"""
Set the default input audio device.
:param device_index: Index of the audio device
:return: Sample rate of the selected device
"""
sd.default.device[0] = device_index
self.default_input = device_index
# Get the sample rate of the selected device
device_info = sd.query_devices(device_index)
return device_info['default_samplerate']
def set_default_output_device(self, device_index):
if(device_index is None):
return self.get_current_output_device_sample_rate()
"""
Set the default output audio device.
:param device_index: Index of the audio device
:return: Sample rate of the selected device
"""
sd.default.device[1] = device_index
self.default_output = device_index
# Get the sample rate of the selected device
device_info = sd.query_devices(device_index)
return device_info['default_samplerate']