remove client test app
This commit is contained in:
@ -1,34 +1,34 @@
|
||||
[
|
||||
{
|
||||
"name": "Uncategorized",
|
||||
"id": 0,
|
||||
"clips": []
|
||||
},
|
||||
{
|
||||
"name": "Test",
|
||||
"id": 1,
|
||||
"clips": []
|
||||
},
|
||||
{
|
||||
"name": "New",
|
||||
"id": 2,
|
||||
"clips": [
|
||||
{
|
||||
"endTime": 30,
|
||||
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings\\audio_capture_20260220_193822.wav",
|
||||
"name": "Pee pee\npoo poo",
|
||||
"playbackType": "playOverlap",
|
||||
"startTime": 27.76674010920584,
|
||||
"volume": 0.25
|
||||
},
|
||||
{
|
||||
"endTime": 27.516843118383072,
|
||||
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings\\audio_capture_20260220_200442.wav",
|
||||
"name": "Clip 20260220_200442",
|
||||
"playbackType": "playOverlap",
|
||||
"startTime": 25.120307988450435,
|
||||
"volume": 0.64
|
||||
}
|
||||
]
|
||||
}
|
||||
[
|
||||
{
|
||||
"name": "Uncategorized",
|
||||
"id": 0,
|
||||
"clips": []
|
||||
},
|
||||
{
|
||||
"name": "Test",
|
||||
"id": 1,
|
||||
"clips": []
|
||||
},
|
||||
{
|
||||
"name": "New",
|
||||
"id": 2,
|
||||
"clips": [
|
||||
{
|
||||
"endTime": 30,
|
||||
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings\\audio_capture_20260220_193822.wav",
|
||||
"name": "Pee pee\npoo poo",
|
||||
"playbackType": "playOverlap",
|
||||
"startTime": 27.76674010920584,
|
||||
"volume": 0.25
|
||||
},
|
||||
{
|
||||
"endTime": 27.516843118383072,
|
||||
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings\\audio_capture_20260220_200442.wav",
|
||||
"name": "Clip 20260220_200442",
|
||||
"playbackType": "playOverlap",
|
||||
"startTime": 25.120307988450435,
|
||||
"volume": 0.64
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
@ -1,7 +1,7 @@
|
||||
sounddevice==0.5.1
|
||||
numpy==1.22.3
|
||||
python-osc==1.9.3
|
||||
scipy==1.10.1
|
||||
comtypes==1.4.8
|
||||
pycaw==20240210
|
||||
sounddevice==0.5.1
|
||||
numpy==1.22.3
|
||||
python-osc==1.9.3
|
||||
scipy==1.10.1
|
||||
comtypes==1.4.8
|
||||
pycaw==20240210
|
||||
Flask==3.1.2
|
||||
@ -1,17 +1,17 @@
|
||||
{
|
||||
"input_device": {
|
||||
"channels": 2,
|
||||
"default_samplerate": 48000,
|
||||
"index": 55,
|
||||
"name": "VM Mic mix (VB-Audio Voicemeeter VAIO)"
|
||||
},
|
||||
"save_path": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings",
|
||||
"recording_length": 30,
|
||||
"output_device": {
|
||||
"channels": 2,
|
||||
"default_samplerate": 48000,
|
||||
"index": 44,
|
||||
"name": "VM to Headset (VB-Audio Voicemeeter VAIO)"
|
||||
},
|
||||
"http_port": 5010
|
||||
{
|
||||
"input_device": {
|
||||
"channels": 2,
|
||||
"default_samplerate": 48000,
|
||||
"index": 55,
|
||||
"name": "VM Mic mix (VB-Audio Voicemeeter VAIO)"
|
||||
},
|
||||
"save_path": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\recordings",
|
||||
"recording_length": 30,
|
||||
"output_device": {
|
||||
"channels": 2,
|
||||
"default_samplerate": 48000,
|
||||
"index": 44,
|
||||
"name": "VM to Headset (VB-Audio Voicemeeter VAIO)"
|
||||
},
|
||||
"http_port": 5010
|
||||
}
|
||||
@ -1,168 +1,168 @@
|
||||
import sounddevice as sd
|
||||
import numpy as np
|
||||
import os
|
||||
from datetime import datetime
|
||||
import scipy.io.wavfile as wavfile
|
||||
from metadata_manager import MetaDataManager
|
||||
from audio_clip import AudioClip
|
||||
|
||||
|
||||
# AudioClip class for clip playback
|
||||
|
||||
|
||||
class AudioIO:
|
||||
_instance = None
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
if cls._instance is None:
|
||||
# print("Creating new AudioRecorder instance")
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance.init()
|
||||
return cls._instance
|
||||
def init(self):
|
||||
self.duration = 30
|
||||
self.channels = 2
|
||||
self.input_sample_rate = 44100
|
||||
self.output_sample_rate = 44100
|
||||
self.buffer = np.zeros((int(self.duration * self.input_sample_rate), self.channels), dtype=np.float32)
|
||||
self.recordings_dir = "recordings"
|
||||
|
||||
sd.default.latency = 'low'
|
||||
|
||||
self.socket = None
|
||||
|
||||
self.in_stream = sd.InputStream(
|
||||
callback=self.record_callback
|
||||
)
|
||||
|
||||
self.out_stream = sd.OutputStream(
|
||||
callback=self.playback_callback,
|
||||
latency=3
|
||||
)
|
||||
|
||||
self.clip_map = {}
|
||||
|
||||
|
||||
def refresh_streams(self):
|
||||
was_active = self.in_stream.active
|
||||
if was_active:
|
||||
self.in_stream.stop()
|
||||
self.out_stream.stop()
|
||||
|
||||
self.buffer = np.zeros((int(self.duration * self.input_sample_rate), self.channels), dtype=np.float32)
|
||||
# print(f"AudioRecorder initialized with duration={self.duration}s, sample_rate={self.sample_rate}Hz, channels={self.channels}")
|
||||
self.in_stream = sd.InputStream(
|
||||
callback=self.record_callback
|
||||
)
|
||||
|
||||
self.out_stream = sd.OutputStream(
|
||||
callback=self.playback_callback
|
||||
)
|
||||
|
||||
if was_active:
|
||||
self.in_stream.start()
|
||||
self.out_stream.start()
|
||||
|
||||
|
||||
|
||||
def record_callback(self, indata, frames, time, status):
|
||||
if status:
|
||||
# print(f"Recording status: {status}")
|
||||
pass
|
||||
|
||||
# Circular buffer implementation
|
||||
self.buffer = np.roll(self.buffer, -frames, axis=0)
|
||||
self.buffer[-frames:] = indata
|
||||
|
||||
def playback_callback(self, outdata, frames, time, status):
|
||||
if status:
|
||||
# print(f"Playback status: {status}")
|
||||
pass
|
||||
|
||||
outdata.fill(0)
|
||||
|
||||
# Iterate over a copy of the items to avoid modifying the dictionary during iteration
|
||||
for clip_id, clip_list in list(self.clip_map.items()):
|
||||
for clip in clip_list[:]: # Iterate over a copy of the list
|
||||
if not clip.is_finished():
|
||||
samples = clip.get_samples(frames)
|
||||
outdata[:] += samples.reshape(-1, 1) # Mix into output
|
||||
if clip.is_finished():
|
||||
self.clip_map[clip_id].remove(clip)
|
||||
if len(self.clip_map[clip_id]) == 0:
|
||||
del self.clip_map[clip_id]
|
||||
break # Exit inner loop since the key is deleted
|
||||
|
||||
|
||||
def save_last_n_seconds(self):
|
||||
# Create output directory if it doesn't exist
|
||||
os.makedirs(self.recordings_dir, exist_ok=True)
|
||||
|
||||
# Generate filename with timestamp
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
filename = os.path.join(self.recordings_dir, f"audio_capture_{timestamp}.wav")
|
||||
|
||||
# Normalize audio to prevent clipping
|
||||
audio_data = self.buffer / np.max(np.abs(self.buffer)) * .5
|
||||
|
||||
# Convert float32 to int16 for WAV file
|
||||
audio_data_int16 = (audio_data * 32767).astype(np.int16)
|
||||
|
||||
# Write buffer to file
|
||||
wavfile.write(filename, int(self.input_sample_rate), audio_data_int16)
|
||||
|
||||
meta = MetaDataManager()
|
||||
|
||||
clip_metadata = {
|
||||
"filename": filename,
|
||||
"name": f"Clip {timestamp}",
|
||||
"playbackType":"playStop",
|
||||
"volume": 1.0,
|
||||
}
|
||||
|
||||
meta.add_clip_to_collection("Uncategorized", clip_metadata )
|
||||
|
||||
|
||||
return clip_metadata
|
||||
|
||||
def set_buffer_duration(self, duration):
|
||||
self.duration = duration
|
||||
self.buffer = np.zeros((int(duration * self.input_sample_rate), self.channels), dtype=np.float32)
|
||||
|
||||
def set_recording_directory(self, directory):
|
||||
self.recordings_dir = directory
|
||||
|
||||
def start_recording(self):
|
||||
if(self.in_stream.active):
|
||||
# print("Already recording")
|
||||
return
|
||||
# print('number of channels', self.channels)
|
||||
|
||||
self.in_stream.start()
|
||||
self.out_stream.start()
|
||||
self.output_sample_rate = self.out_stream.samplerate
|
||||
self.input_sample_rate = self.in_stream.samplerate
|
||||
|
||||
def stop_recording(self):
|
||||
if(not self.in_stream.active):
|
||||
# print("Already stopped")
|
||||
return
|
||||
|
||||
self.in_stream.stop()
|
||||
self.out_stream.stop()
|
||||
|
||||
def is_recording(self):
|
||||
return self.in_stream.active
|
||||
|
||||
def play_clip(self, clip_metadata):
|
||||
print(f"Playing clip: {clip_metadata}")
|
||||
clip_id = clip_metadata.get("filename")
|
||||
if clip_metadata.get("playbackType") == "playStop":
|
||||
if clip_id in self.clip_map:
|
||||
del self.clip_map[clip_id]
|
||||
return
|
||||
else:
|
||||
self.clip_map[clip_id] = []
|
||||
if clip_id not in self.clip_map:
|
||||
self.clip_map[clip_id] = []
|
||||
import sounddevice as sd
|
||||
import numpy as np
|
||||
import os
|
||||
from datetime import datetime
|
||||
import scipy.io.wavfile as wavfile
|
||||
from metadata_manager import MetaDataManager
|
||||
from audio_clip import AudioClip
|
||||
|
||||
|
||||
# AudioClip class for clip playback
|
||||
|
||||
|
||||
class AudioIO:
|
||||
_instance = None
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
if cls._instance is None:
|
||||
# print("Creating new AudioRecorder instance")
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance.init()
|
||||
return cls._instance
|
||||
def init(self):
|
||||
self.duration = 30
|
||||
self.channels = 2
|
||||
self.input_sample_rate = 44100
|
||||
self.output_sample_rate = 44100
|
||||
self.buffer = np.zeros((int(self.duration * self.input_sample_rate), self.channels), dtype=np.float32)
|
||||
self.recordings_dir = "recordings"
|
||||
|
||||
sd.default.latency = 'low'
|
||||
|
||||
self.socket = None
|
||||
|
||||
self.in_stream = sd.InputStream(
|
||||
callback=self.record_callback
|
||||
)
|
||||
|
||||
self.out_stream = sd.OutputStream(
|
||||
callback=self.playback_callback,
|
||||
latency=3
|
||||
)
|
||||
|
||||
self.clip_map = {}
|
||||
|
||||
|
||||
def refresh_streams(self):
|
||||
was_active = self.in_stream.active
|
||||
if was_active:
|
||||
self.in_stream.stop()
|
||||
self.out_stream.stop()
|
||||
|
||||
self.buffer = np.zeros((int(self.duration * self.input_sample_rate), self.channels), dtype=np.float32)
|
||||
# print(f"AudioRecorder initialized with duration={self.duration}s, sample_rate={self.sample_rate}Hz, channels={self.channels}")
|
||||
self.in_stream = sd.InputStream(
|
||||
callback=self.record_callback
|
||||
)
|
||||
|
||||
self.out_stream = sd.OutputStream(
|
||||
callback=self.playback_callback
|
||||
)
|
||||
|
||||
if was_active:
|
||||
self.in_stream.start()
|
||||
self.out_stream.start()
|
||||
|
||||
|
||||
|
||||
def record_callback(self, indata, frames, time, status):
|
||||
if status:
|
||||
# print(f"Recording status: {status}")
|
||||
pass
|
||||
|
||||
# Circular buffer implementation
|
||||
self.buffer = np.roll(self.buffer, -frames, axis=0)
|
||||
self.buffer[-frames:] = indata
|
||||
|
||||
def playback_callback(self, outdata, frames, time, status):
|
||||
if status:
|
||||
# print(f"Playback status: {status}")
|
||||
pass
|
||||
|
||||
outdata.fill(0)
|
||||
|
||||
# Iterate over a copy of the items to avoid modifying the dictionary during iteration
|
||||
for clip_id, clip_list in list(self.clip_map.items()):
|
||||
for clip in clip_list[:]: # Iterate over a copy of the list
|
||||
if not clip.is_finished():
|
||||
samples = clip.get_samples(frames)
|
||||
outdata[:] += samples.reshape(-1, 1) # Mix into output
|
||||
if clip.is_finished():
|
||||
self.clip_map[clip_id].remove(clip)
|
||||
if len(self.clip_map[clip_id]) == 0:
|
||||
del self.clip_map[clip_id]
|
||||
break # Exit inner loop since the key is deleted
|
||||
|
||||
|
||||
def save_last_n_seconds(self):
|
||||
# Create output directory if it doesn't exist
|
||||
os.makedirs(self.recordings_dir, exist_ok=True)
|
||||
|
||||
# Generate filename with timestamp
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
filename = os.path.join(self.recordings_dir, f"audio_capture_{timestamp}.wav")
|
||||
|
||||
# Normalize audio to prevent clipping
|
||||
audio_data = self.buffer / np.max(np.abs(self.buffer)) * .5
|
||||
|
||||
# Convert float32 to int16 for WAV file
|
||||
audio_data_int16 = (audio_data * 32767).astype(np.int16)
|
||||
|
||||
# Write buffer to file
|
||||
wavfile.write(filename, int(self.input_sample_rate), audio_data_int16)
|
||||
|
||||
meta = MetaDataManager()
|
||||
|
||||
clip_metadata = {
|
||||
"filename": filename,
|
||||
"name": f"Clip {timestamp}",
|
||||
"playbackType":"playStop",
|
||||
"volume": 1.0,
|
||||
}
|
||||
|
||||
meta.add_clip_to_collection("Uncategorized", clip_metadata )
|
||||
|
||||
|
||||
return clip_metadata
|
||||
|
||||
def set_buffer_duration(self, duration):
|
||||
self.duration = duration
|
||||
self.buffer = np.zeros((int(duration * self.input_sample_rate), self.channels), dtype=np.float32)
|
||||
|
||||
def set_recording_directory(self, directory):
|
||||
self.recordings_dir = directory
|
||||
|
||||
def start_recording(self):
|
||||
if(self.in_stream.active):
|
||||
# print("Already recording")
|
||||
return
|
||||
# print('number of channels', self.channels)
|
||||
|
||||
self.in_stream.start()
|
||||
self.out_stream.start()
|
||||
self.output_sample_rate = self.out_stream.samplerate
|
||||
self.input_sample_rate = self.in_stream.samplerate
|
||||
|
||||
def stop_recording(self):
|
||||
if(not self.in_stream.active):
|
||||
# print("Already stopped")
|
||||
return
|
||||
|
||||
self.in_stream.stop()
|
||||
self.out_stream.stop()
|
||||
|
||||
def is_recording(self):
|
||||
return self.in_stream.active
|
||||
|
||||
def play_clip(self, clip_metadata):
|
||||
print(f"Playing clip: {clip_metadata}")
|
||||
clip_id = clip_metadata.get("filename")
|
||||
if clip_metadata.get("playbackType") == "playStop":
|
||||
if clip_id in self.clip_map:
|
||||
del self.clip_map[clip_id]
|
||||
return
|
||||
else:
|
||||
self.clip_map[clip_id] = []
|
||||
if clip_id not in self.clip_map:
|
||||
self.clip_map[clip_id] = []
|
||||
self.clip_map[clip_id].append(AudioClip(clip_metadata, target_sample_rate=self.output_sample_rate))
|
||||
@ -1,85 +1,85 @@
|
||||
import argparse
|
||||
import os
|
||||
import sys
|
||||
from audio_io import AudioIO
|
||||
from windows_audio import WindowsAudioManager
|
||||
import sounddevice as sd
|
||||
from metadata_manager import MetaDataManager
|
||||
from settings import SettingsManager
|
||||
|
||||
from flask import Flask
|
||||
from flask_cors import CORS
|
||||
|
||||
from routes.recording import recording_bp
|
||||
from routes.device import device_bp
|
||||
from routes.metadata import metadata_bp
|
||||
from routes.settings import settings_bp
|
||||
from flask_socketio import SocketIO, emit
|
||||
import threading
|
||||
|
||||
app = Flask(__name__)
|
||||
CORS(app)
|
||||
socketio = SocketIO(app, cors_allowed_origins="*")
|
||||
|
||||
@socketio.on('connect')
|
||||
def handle_connect():
|
||||
print("Client connected")
|
||||
emit('full_data', MetaDataManager().collections)
|
||||
|
||||
@socketio.on('record_clip')
|
||||
def record_clip(data):
|
||||
io = AudioIO()
|
||||
io.save_last_n_seconds();
|
||||
|
||||
def main():
|
||||
# Create argument parser
|
||||
parser = argparse.ArgumentParser(description='Audio Recording Service')
|
||||
|
||||
# OSC port argument
|
||||
parser.add_argument('--osc-port',
|
||||
type=int,
|
||||
help='OSC server port number',
|
||||
default=5010)
|
||||
|
||||
# Parse arguments
|
||||
args = parser.parse_args()
|
||||
audio_manager = WindowsAudioManager()
|
||||
settings = SettingsManager()
|
||||
meta = MetaDataManager()
|
||||
|
||||
|
||||
|
||||
# Ensure save path exists
|
||||
os.makedirs(settings.get_settings('save_path'), exist_ok=True)
|
||||
|
||||
|
||||
io = AudioIO()
|
||||
io.start_recording()
|
||||
|
||||
# settings.socket = socketio
|
||||
io.socket = socketio
|
||||
meta.socket = socketio
|
||||
|
||||
# Register blueprints
|
||||
app.register_blueprint(recording_bp)
|
||||
app.register_blueprint(device_bp)
|
||||
app.register_blueprint(metadata_bp)
|
||||
app.register_blueprint(settings_bp)
|
||||
# app.run(host='127.0.0.1', port=settings.get_settings('http_port'), debug=False, use_reloader=True)
|
||||
socketio.run(app, host='127.0.0.1', port=settings.get_settings('http_port'), debug=False, use_reloader=True)
|
||||
|
||||
|
||||
|
||||
# Run the OSC server
|
||||
# try:
|
||||
# print(f"Starting OSC Recording Server on port {args.osc_port}")
|
||||
# # osc_server.run_server()
|
||||
# except KeyboardInterrupt:
|
||||
# print("\nServer stopped by user.")
|
||||
# except Exception as e:
|
||||
# print(f"Error starting server: {e}")
|
||||
# sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
import os
|
||||
import sys
|
||||
from audio_io import AudioIO
|
||||
from windows_audio import WindowsAudioManager
|
||||
import sounddevice as sd
|
||||
from metadata_manager import MetaDataManager
|
||||
from settings import SettingsManager
|
||||
|
||||
from flask import Flask
|
||||
from flask_cors import CORS
|
||||
|
||||
from routes.recording import recording_bp
|
||||
from routes.device import device_bp
|
||||
from routes.metadata import metadata_bp
|
||||
from routes.settings import settings_bp
|
||||
from flask_socketio import SocketIO, emit
|
||||
import threading
|
||||
|
||||
app = Flask(__name__)
|
||||
CORS(app)
|
||||
socketio = SocketIO(app, cors_allowed_origins="*")
|
||||
|
||||
@socketio.on('connect')
|
||||
def handle_connect():
|
||||
print("Client connected")
|
||||
emit('full_data', MetaDataManager().collections)
|
||||
|
||||
@socketio.on('record_clip')
|
||||
def record_clip(data):
|
||||
io = AudioIO()
|
||||
io.save_last_n_seconds();
|
||||
|
||||
def main():
|
||||
# Create argument parser
|
||||
parser = argparse.ArgumentParser(description='Audio Recording Service')
|
||||
|
||||
# OSC port argument
|
||||
parser.add_argument('--osc-port',
|
||||
type=int,
|
||||
help='OSC server port number',
|
||||
default=5010)
|
||||
|
||||
# Parse arguments
|
||||
args = parser.parse_args()
|
||||
audio_manager = WindowsAudioManager()
|
||||
settings = SettingsManager()
|
||||
meta = MetaDataManager()
|
||||
|
||||
|
||||
|
||||
# Ensure save path exists
|
||||
os.makedirs(settings.get_settings('save_path'), exist_ok=True)
|
||||
|
||||
|
||||
io = AudioIO()
|
||||
io.start_recording()
|
||||
|
||||
# settings.socket = socketio
|
||||
io.socket = socketio
|
||||
meta.socket = socketio
|
||||
|
||||
# Register blueprints
|
||||
app.register_blueprint(recording_bp)
|
||||
app.register_blueprint(device_bp)
|
||||
app.register_blueprint(metadata_bp)
|
||||
app.register_blueprint(settings_bp)
|
||||
# app.run(host='127.0.0.1', port=settings.get_settings('http_port'), debug=False, use_reloader=True)
|
||||
socketio.run(app, host='127.0.0.1', port=settings.get_settings('http_port'), debug=False, use_reloader=True)
|
||||
|
||||
|
||||
|
||||
# Run the OSC server
|
||||
# try:
|
||||
# print(f"Starting OSC Recording Server on port {args.osc_port}")
|
||||
# # osc_server.run_server()
|
||||
# except KeyboardInterrupt:
|
||||
# print("\nServer stopped by user.")
|
||||
# except Exception as e:
|
||||
# print(f"Error starting server: {e}")
|
||||
# sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@ -1,20 +1,20 @@
|
||||
{
|
||||
"Uncategorized": [
|
||||
{
|
||||
"endTime": 12.489270386266055,
|
||||
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\src\\recordings\\audio_capture_20260214_133540.wav",
|
||||
"name": "Clip 20260214_133540",
|
||||
"playbackType": "playStop",
|
||||
"startTime": 10.622317596566523,
|
||||
"volume": 1
|
||||
},
|
||||
{
|
||||
"endTime": 6.824034334763957,
|
||||
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\src\\recordings\\audio_capture_20260214_133137.wav",
|
||||
"name": "Clip 20260214_133137",
|
||||
"playbackType": "playStop",
|
||||
"startTime": 3.7982832618025753,
|
||||
"volume": 1
|
||||
}
|
||||
]
|
||||
{
|
||||
"Uncategorized": [
|
||||
{
|
||||
"endTime": 12.489270386266055,
|
||||
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\src\\recordings\\audio_capture_20260214_133540.wav",
|
||||
"name": "Clip 20260214_133540",
|
||||
"playbackType": "playStop",
|
||||
"startTime": 10.622317596566523,
|
||||
"volume": 1
|
||||
},
|
||||
{
|
||||
"endTime": 6.824034334763957,
|
||||
"filename": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\src\\recordings\\audio_capture_20260214_133137.wav",
|
||||
"name": "Clip 20260214_133137",
|
||||
"playbackType": "playStop",
|
||||
"startTime": 3.7982832618025753,
|
||||
"volume": 1
|
||||
}
|
||||
]
|
||||
}
|
||||
@ -1,103 +1,103 @@
|
||||
import os
|
||||
import json
|
||||
|
||||
class MetaDataManager:
|
||||
_instance = None
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance.init()
|
||||
return cls._instance
|
||||
def init(self):
|
||||
self.socket = None
|
||||
# read metadata file from executing directory
|
||||
self.metadata_file = os.path.join(os.getcwd(), "metadata.json")
|
||||
if os.path.exists(self.metadata_file):
|
||||
with open(self.metadata_file, "r") as f:
|
||||
self.collections = json.load(f)
|
||||
else:
|
||||
self.collections = {}
|
||||
if(collections := next((c for c in self.collections if c.get("name") == "Uncategorized"), None)) is None:
|
||||
self.collections.append({"name": "Uncategorized", "id": 0, "clips": []})
|
||||
self.save_metadata()
|
||||
|
||||
def create_collection(self, name):
|
||||
if any(c.get("name") == name for c in self.collections):
|
||||
raise ValueError(f"Collection '{name}' already exists.")
|
||||
new_id = max((c.get("id", 0) for c in self.collections), default=0) + 1
|
||||
self.collections.append({"name": name, "id": new_id, "clips": []})
|
||||
self.save_metadata()
|
||||
|
||||
def delete_collection(self, name):
|
||||
collection = next((c for c in self.collections if c.get("name") == name), None)
|
||||
if collection is None:
|
||||
raise ValueError(f"Collection '{name}' does not exist.")
|
||||
self.collections.remove(collection)
|
||||
self.save_metadata()
|
||||
|
||||
def add_clip_to_collection(self, collection_name, clip_metadata):
|
||||
collection = next((c for c in self.collections if c.get("name") == collection_name), None)
|
||||
if collection is None:
|
||||
raise ValueError(f"Collection '{collection_name}' does not exist.")
|
||||
collection["clips"].append(clip_metadata)
|
||||
self.save_metadata()
|
||||
|
||||
def remove_clip_from_collection(self, collection_name, clip_metadata):
|
||||
collection = next((c for c in self.collections if c.get("name") == collection_name), None)
|
||||
if collection is None:
|
||||
raise ValueError(f"Collection '{collection_name}' does not exist.")
|
||||
# Remove all clips with the same file name as clip_metadata["file_name"]
|
||||
in_list = any(clip.get("filename") == clip_metadata.get("filename") for clip in collection["clips"])
|
||||
if not in_list:
|
||||
raise ValueError(f"Clip with filename '{clip_metadata.get('filename')}' not found in collection '{collection_name}'.")
|
||||
|
||||
collection["clips"] = [
|
||||
clip for clip in collection["clips"]
|
||||
if clip.get("filename") != clip_metadata.get("filename")
|
||||
]
|
||||
self.save_metadata()
|
||||
|
||||
def move_clip_to_collection(self, source_collection, target_collection, clip_metadata):
|
||||
self.remove_clip_from_collection(source_collection, clip_metadata)
|
||||
self.add_clip_to_collection(target_collection, clip_metadata)
|
||||
|
||||
def edit_clip_in_collection(self, collection_name, new_clip_metadata):
|
||||
collection = next((c for c in self.collections if c.get("name") == collection_name), None)
|
||||
if collection is None:
|
||||
raise ValueError(f"Collection '{collection_name}' does not exist.")
|
||||
# Find the index of the clip with the same file name as old_clip_metadata["file_name"]
|
||||
index = next((i for i, clip in enumerate(collection["clips"]) if clip.get("filename") == new_clip_metadata.get("filename")), None)
|
||||
if index is None:
|
||||
raise ValueError(f"Clip with filename '{new_clip_metadata.get('filename')}' not found in collection '{collection_name}'.")
|
||||
|
||||
collection["clips"][index] = new_clip_metadata
|
||||
self.save_metadata()
|
||||
|
||||
def get_collections(self):
|
||||
return list(map(lambda c: {"name": c.get("name"), "id": c.get("id")}, self.collections))
|
||||
|
||||
def get_clips_in_collection(self, collection_name):
|
||||
collection = next((c for c in self.collections if c.get("name") == collection_name), None)
|
||||
if collection is None:
|
||||
raise ValueError(f"Collection '{collection_name}' does not exist.")
|
||||
return collection["clips"]
|
||||
|
||||
def reorder_clips_in_collection(self, collection_name, new_order):
|
||||
collection = next((c for c in self.collections if c.get("name") == collection_name), None)
|
||||
if collection is None:
|
||||
raise ValueError(f"Collection '{collection_name}' does not exist.")
|
||||
existing_filenames = {clip.get("filename") for clip in collection["clips"]}
|
||||
new_filenames = {clip.get("filename") for clip in new_order}
|
||||
|
||||
if not new_filenames.issubset(existing_filenames):
|
||||
raise ValueError("New order contains clips that do not exist in the collection.")
|
||||
|
||||
collection["clips"] = new_order
|
||||
if not self.socket is None:
|
||||
self.socket.emit('collection_updated', collection)
|
||||
self.save_metadata()
|
||||
|
||||
def save_metadata(self):
|
||||
with open(self.metadata_file, "w") as f:
|
||||
json.dump(self.collections, f, indent=4)
|
||||
import os
|
||||
import json
|
||||
|
||||
class MetaDataManager:
|
||||
_instance = None
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance.init()
|
||||
return cls._instance
|
||||
def init(self):
|
||||
self.socket = None
|
||||
# read metadata file from executing directory
|
||||
self.metadata_file = os.path.join(os.getcwd(), "metadata.json")
|
||||
if os.path.exists(self.metadata_file):
|
||||
with open(self.metadata_file, "r") as f:
|
||||
self.collections = json.load(f)
|
||||
else:
|
||||
self.collections = {}
|
||||
if(collections := next((c for c in self.collections if c.get("name") == "Uncategorized"), None)) is None:
|
||||
self.collections.append({"name": "Uncategorized", "id": 0, "clips": []})
|
||||
self.save_metadata()
|
||||
|
||||
def create_collection(self, name):
|
||||
if any(c.get("name") == name for c in self.collections):
|
||||
raise ValueError(f"Collection '{name}' already exists.")
|
||||
new_id = max((c.get("id", 0) for c in self.collections), default=0) + 1
|
||||
self.collections.append({"name": name, "id": new_id, "clips": []})
|
||||
self.save_metadata()
|
||||
|
||||
def delete_collection(self, name):
|
||||
collection = next((c for c in self.collections if c.get("name") == name), None)
|
||||
if collection is None:
|
||||
raise ValueError(f"Collection '{name}' does not exist.")
|
||||
self.collections.remove(collection)
|
||||
self.save_metadata()
|
||||
|
||||
def add_clip_to_collection(self, collection_name, clip_metadata):
|
||||
collection = next((c for c in self.collections if c.get("name") == collection_name), None)
|
||||
if collection is None:
|
||||
raise ValueError(f"Collection '{collection_name}' does not exist.")
|
||||
collection["clips"].append(clip_metadata)
|
||||
self.save_metadata()
|
||||
|
||||
def remove_clip_from_collection(self, collection_name, clip_metadata):
|
||||
collection = next((c for c in self.collections if c.get("name") == collection_name), None)
|
||||
if collection is None:
|
||||
raise ValueError(f"Collection '{collection_name}' does not exist.")
|
||||
# Remove all clips with the same file name as clip_metadata["file_name"]
|
||||
in_list = any(clip.get("filename") == clip_metadata.get("filename") for clip in collection["clips"])
|
||||
if not in_list:
|
||||
raise ValueError(f"Clip with filename '{clip_metadata.get('filename')}' not found in collection '{collection_name}'.")
|
||||
|
||||
collection["clips"] = [
|
||||
clip for clip in collection["clips"]
|
||||
if clip.get("filename") != clip_metadata.get("filename")
|
||||
]
|
||||
self.save_metadata()
|
||||
|
||||
def move_clip_to_collection(self, source_collection, target_collection, clip_metadata):
|
||||
self.remove_clip_from_collection(source_collection, clip_metadata)
|
||||
self.add_clip_to_collection(target_collection, clip_metadata)
|
||||
|
||||
def edit_clip_in_collection(self, collection_name, new_clip_metadata):
|
||||
collection = next((c for c in self.collections if c.get("name") == collection_name), None)
|
||||
if collection is None:
|
||||
raise ValueError(f"Collection '{collection_name}' does not exist.")
|
||||
# Find the index of the clip with the same file name as old_clip_metadata["file_name"]
|
||||
index = next((i for i, clip in enumerate(collection["clips"]) if clip.get("filename") == new_clip_metadata.get("filename")), None)
|
||||
if index is None:
|
||||
raise ValueError(f"Clip with filename '{new_clip_metadata.get('filename')}' not found in collection '{collection_name}'.")
|
||||
|
||||
collection["clips"][index] = new_clip_metadata
|
||||
self.save_metadata()
|
||||
|
||||
def get_collections(self):
|
||||
return list(map(lambda c: {"name": c.get("name"), "id": c.get("id")}, self.collections))
|
||||
|
||||
def get_clips_in_collection(self, collection_name):
|
||||
collection = next((c for c in self.collections if c.get("name") == collection_name), None)
|
||||
if collection is None:
|
||||
raise ValueError(f"Collection '{collection_name}' does not exist.")
|
||||
return collection["clips"]
|
||||
|
||||
def reorder_clips_in_collection(self, collection_name, new_order):
|
||||
collection = next((c for c in self.collections if c.get("name") == collection_name), None)
|
||||
if collection is None:
|
||||
raise ValueError(f"Collection '{collection_name}' does not exist.")
|
||||
existing_filenames = {clip.get("filename") for clip in collection["clips"]}
|
||||
new_filenames = {clip.get("filename") for clip in new_order}
|
||||
|
||||
if not new_filenames.issubset(existing_filenames):
|
||||
raise ValueError("New order contains clips that do not exist in the collection.")
|
||||
|
||||
collection["clips"] = new_order
|
||||
if not self.socket is None:
|
||||
self.socket.emit('collection_updated', collection)
|
||||
self.save_metadata()
|
||||
|
||||
def save_metadata(self):
|
||||
with open(self.metadata_file, "w") as f:
|
||||
json.dump(self.collections, f, indent=4)
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
[ViewState]
|
||||
Mode=
|
||||
Vid=
|
||||
FolderType=Generic
|
||||
[ViewState]
|
||||
Mode=
|
||||
Vid=
|
||||
FolderType=Generic
|
||||
|
||||
@ -1,57 +1,57 @@
|
||||
|
||||
from flask import Blueprint, request, jsonify
|
||||
from audio_io import AudioIO
|
||||
import os
|
||||
|
||||
recording_bp = Blueprint('recording', __name__)
|
||||
|
||||
@recording_bp.route('/record/start', methods=['POST'])
|
||||
def start_recording():
|
||||
recorder = AudioIO()
|
||||
print('HTTP: Starting audio recording')
|
||||
recorder.start_recording()
|
||||
return jsonify({'status': 'recording started'})
|
||||
|
||||
@recording_bp.route('/record/stop', methods=['POST'])
|
||||
def stop_recording():
|
||||
recorder = AudioIO()
|
||||
# print('HTTP: Stopping audio recording')
|
||||
recorder.stop_recording()
|
||||
return jsonify({'status': 'recording stopped'})
|
||||
|
||||
@recording_bp.route('/record/save', methods=['POST'])
|
||||
def save_recording():
|
||||
recorder = AudioIO()
|
||||
# print('HTTP: Saving audio recording')
|
||||
saved_file = recorder.save_last_n_seconds()
|
||||
return jsonify({'status': 'recording saved', 'file': saved_file})
|
||||
|
||||
|
||||
@recording_bp.route('/record/status', methods=['GET'])
|
||||
def recording_status():
|
||||
recorder = AudioIO()
|
||||
# print('HTTP: Checking recording status')
|
||||
status = 'recording' if recorder.is_recording() else 'stopped'
|
||||
return jsonify({'status': status})
|
||||
|
||||
@recording_bp.route('/record/delete', methods=['POST'])
|
||||
def recording_delete():
|
||||
filename = request.json.get('filename')
|
||||
try:
|
||||
os.remove(filename)
|
||||
return jsonify({'status': 'success'})
|
||||
except Exception as e:
|
||||
return jsonify({'status': 'error', 'message': str(e)}), 400
|
||||
|
||||
@recording_bp.route('/playback/start', methods=['POST'])
|
||||
def playback_start():
|
||||
print(f"Playing clip")
|
||||
# print('HTTP: Starting audio playback')
|
||||
clip = request.json
|
||||
try:
|
||||
io = AudioIO()
|
||||
io.play_clip(clip)
|
||||
# os.remove(filename)
|
||||
return jsonify({'status': 'success'})
|
||||
except Exception as e:
|
||||
|
||||
from flask import Blueprint, request, jsonify
|
||||
from audio_io import AudioIO
|
||||
import os
|
||||
|
||||
recording_bp = Blueprint('recording', __name__)
|
||||
|
||||
@recording_bp.route('/record/start', methods=['POST'])
|
||||
def start_recording():
|
||||
recorder = AudioIO()
|
||||
print('HTTP: Starting audio recording')
|
||||
recorder.start_recording()
|
||||
return jsonify({'status': 'recording started'})
|
||||
|
||||
@recording_bp.route('/record/stop', methods=['POST'])
|
||||
def stop_recording():
|
||||
recorder = AudioIO()
|
||||
# print('HTTP: Stopping audio recording')
|
||||
recorder.stop_recording()
|
||||
return jsonify({'status': 'recording stopped'})
|
||||
|
||||
@recording_bp.route('/record/save', methods=['POST'])
|
||||
def save_recording():
|
||||
recorder = AudioIO()
|
||||
# print('HTTP: Saving audio recording')
|
||||
saved_file = recorder.save_last_n_seconds()
|
||||
return jsonify({'status': 'recording saved', 'file': saved_file})
|
||||
|
||||
|
||||
@recording_bp.route('/record/status', methods=['GET'])
|
||||
def recording_status():
|
||||
recorder = AudioIO()
|
||||
# print('HTTP: Checking recording status')
|
||||
status = 'recording' if recorder.is_recording() else 'stopped'
|
||||
return jsonify({'status': status})
|
||||
|
||||
@recording_bp.route('/record/delete', methods=['POST'])
|
||||
def recording_delete():
|
||||
filename = request.json.get('filename')
|
||||
try:
|
||||
os.remove(filename)
|
||||
return jsonify({'status': 'success'})
|
||||
except Exception as e:
|
||||
return jsonify({'status': 'error', 'message': str(e)}), 400
|
||||
|
||||
@recording_bp.route('/playback/start', methods=['POST'])
|
||||
def playback_start():
|
||||
print(f"Playing clip")
|
||||
# print('HTTP: Starting audio playback')
|
||||
clip = request.json
|
||||
try:
|
||||
io = AudioIO()
|
||||
io.play_clip(clip)
|
||||
# os.remove(filename)
|
||||
return jsonify({'status': 'success'})
|
||||
except Exception as e:
|
||||
return jsonify({'status': 'error', 'message': str(e)}), 400
|
||||
@ -1,32 +1,32 @@
|
||||
from flask import Blueprint, request, jsonify
|
||||
from settings import SettingsManager
|
||||
|
||||
settings_bp = Blueprint('settings', __name__)
|
||||
|
||||
|
||||
@settings_bp.route('/settings', methods=['GET'])
|
||||
def get_all_settings():
|
||||
return jsonify({'status': 'success', 'settings': SettingsManager().get_all_settings()})
|
||||
|
||||
@settings_bp.route('/settings/<name>', methods=['GET'])
|
||||
def get_setting(name):
|
||||
value = SettingsManager().get_settings(name)
|
||||
if value is not None:
|
||||
return jsonify({'status': 'success', 'name': name, 'value': value})
|
||||
else:
|
||||
return jsonify({'status': 'error', 'message': f'Setting "{name}" not found'}), 404
|
||||
|
||||
@settings_bp.route('/settings/update', methods=['POST'])
|
||||
def set_all_settings():
|
||||
settings = request.json.get('settings')
|
||||
print (f"Received settings update: {settings}")
|
||||
if settings is None:
|
||||
return jsonify({'status': 'error', 'message': 'Settings are required'}), 400
|
||||
try:
|
||||
for name, value in settings.items():
|
||||
print(f"Updating setting '{name}' to '{value}'")
|
||||
SettingsManager().set_settings(name, value)
|
||||
return jsonify({'status': 'success', 'settings': settings})
|
||||
except ValueError as e:
|
||||
return jsonify({'status': 'error', 'message': str(e)}), 400
|
||||
from flask import Blueprint, request, jsonify
|
||||
from settings import SettingsManager
|
||||
|
||||
settings_bp = Blueprint('settings', __name__)
|
||||
|
||||
|
||||
@settings_bp.route('/settings', methods=['GET'])
|
||||
def get_all_settings():
|
||||
return jsonify({'status': 'success', 'settings': SettingsManager().get_all_settings()})
|
||||
|
||||
@settings_bp.route('/settings/<name>', methods=['GET'])
|
||||
def get_setting(name):
|
||||
value = SettingsManager().get_settings(name)
|
||||
if value is not None:
|
||||
return jsonify({'status': 'success', 'name': name, 'value': value})
|
||||
else:
|
||||
return jsonify({'status': 'error', 'message': f'Setting "{name}" not found'}), 404
|
||||
|
||||
@settings_bp.route('/settings/update', methods=['POST'])
|
||||
def set_all_settings():
|
||||
settings = request.json.get('settings')
|
||||
print (f"Received settings update: {settings}")
|
||||
if settings is None:
|
||||
return jsonify({'status': 'error', 'message': 'Settings are required'}), 400
|
||||
try:
|
||||
for name, value in settings.items():
|
||||
print(f"Updating setting '{name}' to '{value}'")
|
||||
SettingsManager().set_settings(name, value)
|
||||
return jsonify({'status': 'success', 'settings': settings})
|
||||
except ValueError as e:
|
||||
return jsonify({'status': 'error', 'message': str(e)}), 400
|
||||
|
||||
@ -1,10 +1,10 @@
|
||||
{
|
||||
"input_device": {
|
||||
"index": 0,
|
||||
"name": "Microsoft Sound Mapper - Input",
|
||||
"max_input_channels": 2,
|
||||
"default_samplerate": 44100.0
|
||||
},
|
||||
"save_path": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\src\\recordings",
|
||||
"recording_length": 15
|
||||
{
|
||||
"input_device": {
|
||||
"index": 0,
|
||||
"name": "Microsoft Sound Mapper - Input",
|
||||
"max_input_channels": 2,
|
||||
"default_samplerate": 44100.0
|
||||
},
|
||||
"save_path": "C:\\Users\\mickl\\Desktop\\cliptrim-ui\\ClipTrimApp\\audio-service\\src\\recordings",
|
||||
"recording_length": 15
|
||||
}
|
||||
@ -1,112 +1,112 @@
|
||||
import sounddevice as sd
|
||||
import numpy as np
|
||||
import comtypes
|
||||
import comtypes.client
|
||||
from comtypes import CLSCTX_ALL
|
||||
from pycaw.pycaw import AudioUtilities, IAudioEndpointVolume
|
||||
import json
|
||||
|
||||
class WindowsAudioManager:
|
||||
_instance = None
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance.init()
|
||||
return cls._instance
|
||||
def init(self):
|
||||
"""
|
||||
Initialize Windows audio device and volume management.
|
||||
"""
|
||||
host_apis = sd.query_hostapis()
|
||||
wasapi_device_indexes = None
|
||||
for api in host_apis:
|
||||
if api['name'].lower() == 'Windows WASAPI'.lower():
|
||||
wasapi_device_indexes = api['devices']
|
||||
break
|
||||
# print(f"Host APIs: {host_apis}")
|
||||
# print(f"WASAPI Device Indexes: {wasapi_device_indexes}")
|
||||
wasapi_device_indexes = set(wasapi_device_indexes) if wasapi_device_indexes is not None else set()
|
||||
self.devices = [dev for dev in sd.query_devices() if dev['index'] in wasapi_device_indexes]
|
||||
# self.devices = sd.query_devices()
|
||||
# print(f"devices: {self.devices}")
|
||||
|
||||
self.default_input = sd.default.device[0]
|
||||
self.default_output = sd.default.device[1]
|
||||
|
||||
def list_audio_devices(self, kind='input'):
|
||||
"""
|
||||
List available audio devices.
|
||||
|
||||
:param kind: 'input' or 'output'
|
||||
:return: List of audio devices
|
||||
"""
|
||||
if kind == 'input':
|
||||
return [
|
||||
{
|
||||
'index': dev['index'],
|
||||
'name': dev['name'],
|
||||
'channels': dev['max_input_channels'],
|
||||
'default_samplerate': dev['default_samplerate']
|
||||
}
|
||||
for dev in self.devices if dev['max_input_channels'] > 0
|
||||
]
|
||||
elif kind == 'output':
|
||||
return [
|
||||
{
|
||||
'index': dev['index'],
|
||||
'name': dev['name'],
|
||||
'channels': dev['max_output_channels'],
|
||||
'default_samplerate': dev['default_samplerate']
|
||||
}
|
||||
for dev in self.devices if dev['max_output_channels'] > 0
|
||||
]
|
||||
def get_default_device(self, kind='input'):
|
||||
"""
|
||||
Get the default audio device.
|
||||
|
||||
:param kind: 'input' or 'output'
|
||||
:return: Default audio device information
|
||||
"""
|
||||
if kind == 'input':
|
||||
dev = self.devices[self.default_input]
|
||||
return [
|
||||
{
|
||||
'index': dev['index'],
|
||||
'name': dev['name'],
|
||||
'max_input_channels': dev['max_input_channels'],
|
||||
'default_samplerate': dev['default_samplerate']
|
||||
}
|
||||
]
|
||||
|
||||
def set_default_input_device(self, device_index):
|
||||
if(device_index is None):
|
||||
return 0
|
||||
"""
|
||||
Set the default input audio device.
|
||||
|
||||
:param device_index: Index of the audio device
|
||||
:return: Sample rate of the selected device
|
||||
"""
|
||||
sd.default.device[0] = device_index
|
||||
self.default_input = device_index
|
||||
|
||||
# Get the sample rate of the selected device
|
||||
device_info = sd.query_devices(device_index)
|
||||
return device_info['default_samplerate']
|
||||
|
||||
def set_default_output_device(self, device_index):
|
||||
if(device_index is None):
|
||||
return self.get_current_output_device_sample_rate()
|
||||
"""
|
||||
Set the default output audio device.
|
||||
|
||||
:param device_index: Index of the audio device
|
||||
:return: Sample rate of the selected device
|
||||
"""
|
||||
sd.default.device[1] = device_index
|
||||
self.default_output = device_index
|
||||
|
||||
# Get the sample rate of the selected device
|
||||
device_info = sd.query_devices(device_index)
|
||||
import sounddevice as sd
|
||||
import numpy as np
|
||||
import comtypes
|
||||
import comtypes.client
|
||||
from comtypes import CLSCTX_ALL
|
||||
from pycaw.pycaw import AudioUtilities, IAudioEndpointVolume
|
||||
import json
|
||||
|
||||
class WindowsAudioManager:
|
||||
_instance = None
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance.init()
|
||||
return cls._instance
|
||||
def init(self):
|
||||
"""
|
||||
Initialize Windows audio device and volume management.
|
||||
"""
|
||||
host_apis = sd.query_hostapis()
|
||||
wasapi_device_indexes = None
|
||||
for api in host_apis:
|
||||
if api['name'].lower() == 'Windows WASAPI'.lower():
|
||||
wasapi_device_indexes = api['devices']
|
||||
break
|
||||
# print(f"Host APIs: {host_apis}")
|
||||
# print(f"WASAPI Device Indexes: {wasapi_device_indexes}")
|
||||
wasapi_device_indexes = set(wasapi_device_indexes) if wasapi_device_indexes is not None else set()
|
||||
self.devices = [dev for dev in sd.query_devices() if dev['index'] in wasapi_device_indexes]
|
||||
# self.devices = sd.query_devices()
|
||||
# print(f"devices: {self.devices}")
|
||||
|
||||
self.default_input = sd.default.device[0]
|
||||
self.default_output = sd.default.device[1]
|
||||
|
||||
def list_audio_devices(self, kind='input'):
|
||||
"""
|
||||
List available audio devices.
|
||||
|
||||
:param kind: 'input' or 'output'
|
||||
:return: List of audio devices
|
||||
"""
|
||||
if kind == 'input':
|
||||
return [
|
||||
{
|
||||
'index': dev['index'],
|
||||
'name': dev['name'],
|
||||
'channels': dev['max_input_channels'],
|
||||
'default_samplerate': dev['default_samplerate']
|
||||
}
|
||||
for dev in self.devices if dev['max_input_channels'] > 0
|
||||
]
|
||||
elif kind == 'output':
|
||||
return [
|
||||
{
|
||||
'index': dev['index'],
|
||||
'name': dev['name'],
|
||||
'channels': dev['max_output_channels'],
|
||||
'default_samplerate': dev['default_samplerate']
|
||||
}
|
||||
for dev in self.devices if dev['max_output_channels'] > 0
|
||||
]
|
||||
def get_default_device(self, kind='input'):
|
||||
"""
|
||||
Get the default audio device.
|
||||
|
||||
:param kind: 'input' or 'output'
|
||||
:return: Default audio device information
|
||||
"""
|
||||
if kind == 'input':
|
||||
dev = self.devices[self.default_input]
|
||||
return [
|
||||
{
|
||||
'index': dev['index'],
|
||||
'name': dev['name'],
|
||||
'max_input_channels': dev['max_input_channels'],
|
||||
'default_samplerate': dev['default_samplerate']
|
||||
}
|
||||
]
|
||||
|
||||
def set_default_input_device(self, device_index):
|
||||
if(device_index is None):
|
||||
return 0
|
||||
"""
|
||||
Set the default input audio device.
|
||||
|
||||
:param device_index: Index of the audio device
|
||||
:return: Sample rate of the selected device
|
||||
"""
|
||||
sd.default.device[0] = device_index
|
||||
self.default_input = device_index
|
||||
|
||||
# Get the sample rate of the selected device
|
||||
device_info = sd.query_devices(device_index)
|
||||
return device_info['default_samplerate']
|
||||
|
||||
def set_default_output_device(self, device_index):
|
||||
if(device_index is None):
|
||||
return self.get_current_output_device_sample_rate()
|
||||
"""
|
||||
Set the default output audio device.
|
||||
|
||||
:param device_index: Index of the audio device
|
||||
:return: Sample rate of the selected device
|
||||
"""
|
||||
sd.default.device[1] = device_index
|
||||
self.default_output = device_index
|
||||
|
||||
# Get the sample rate of the selected device
|
||||
device_info = sd.query_devices(device_index)
|
||||
return device_info['default_samplerate']
|
||||
Reference in New Issue
Block a user