import base64
import os
import ssl
import threading
import time
from dataclasses import dataclass, field
from queue import Empty, Queue

import librosa
import numpy as np
import requests
import websocket


class AudioStreamingClient:
    """Exchange int16 PCM audio with a remote endpoint over a WebSocket.

    Outgoing audio is queued with :meth:`put_audio` and forwarded by a
    background thread (:meth:`send_audio`). Incoming binary frames are
    decoded in :meth:`on_message`, queued, and handed out by
    :meth:`get_audio`. Audio on the wire is resampled to/from
    ``SERVER_SAMPLE_RATE``.

    The endpoint URL and bearer token are read from the ``HF_API_URL`` and
    ``HF_AUTH_TOKEN`` environment variables.
    """

    # Sample rate (Hz) used for audio sent to and received from the server.
    SERVER_SAMPLE_RATE = 16000

    def __init__(self):
        """Read configuration from the environment and set up queues/events."""
        self.auth_token = os.environ.get("HF_AUTH_TOKEN", None)
        self.api_url = os.environ.get("HF_API_URL", None)
        self.stop_event = threading.Event()
        self.send_queue = Queue()
        self.recv_queue = Queue()
        self.session_id = None
        self.headers = {
            "Accept": "application/json",
            "Authorization": f"Bearer {self.auth_token}",
            "Content-Type": "application/json",
        }
        # States actually assigned by this class: "idle" (initial),
        # "listen" (after a b'DONE' frame), "processing" (after an audio frame).
        self.session_state = "idle"
        self.ws_ready = threading.Event()
        # Populated by start(); kept as None so stop() is safe before start().
        self.ws = None
        self.ws_thread = None
        self.send_thread = None

    def start(self) -> None:
        """Open the WebSocket and launch the receive and send threads.

        Blocks until ``on_open`` signals that the connection is ready.

        Raises:
            ValueError: if ``HF_API_URL`` is not set.
        """
        if not self.api_url:
            raise ValueError(
                "HF_API_URL is not set; cannot open the WebSocket connection."
            )
        print("Starting audio streaming...")

        # Rewrite only the scheme (http -> ws, https -> wss); a blanket
        # str.replace would also mangle any "http" later in the URL.
        base_url = self.api_url
        if base_url.startswith("http"):
            base_url = "ws" + base_url[len("http"):]
        ws_url = base_url + "/ws"

        # Reset lifecycle flags so the client can be started again after stop().
        self.stop_event.clear()
        self.ws_ready.clear()

        self.ws = websocket.WebSocketApp(
            ws_url,
            header=[f"{key}: {value}" for key, value in self.headers.items()],
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
        )

        # NOTE(review): CERT_NONE disables TLS certificate verification while a
        # bearer token is being sent - confirm this is intentional.
        self.ws_thread = threading.Thread(
            target=self.ws.run_forever,
            kwargs={"sslopt": {"cert_reqs": ssl.CERT_NONE}},
        )
        self.ws_thread.start()

        # Wait for the WebSocket to be ready.
        # NOTE(review): ws_ready is only set in on_open, so this waits forever
        # if the connection never opens.
        self.ws_ready.wait()

        self.send_thread = threading.Thread(target=self.send_audio)
        self.send_thread.start()

    def stop(self) -> None:
        """Stop the send loop, close the WebSocket and join both threads.

        This is the shutdown logic that used to live in a zero-argument
        ``on_close`` method, which was shadowed by the WebSocket callback of
        the same name and therefore could never be called.
        """
        self.stop_event.set()
        if self.send_thread is not None:
            self.send_thread.join()
        if self.ws is not None:
            self.ws.close()
        if self.ws_thread is not None:
            self.ws_thread.join()
        print("Audio streaming stopped.")

    def on_open(self, ws) -> None:
        """WebSocket callback: mark the connection as ready."""
        print("WebSocket connection opened.")
        self.ws_ready.set()  # Signal that the WebSocket is ready

    def on_message(self, ws, message) -> None:
        """WebSocket callback: update session state and queue received audio.

        A frame equal to ``b'DONE'`` switches the state to ``"listen"``; any
        other frame is interpreted as int16 samples, queued on ``recv_queue``,
        and switches the state to ``"processing"``.
        """
        # message is bytes
        if message == b'DONE':
            print("listen")
            self.session_state = "listen"
            return

        print("processing")
        self.session_state = "processing"
        self.recv_queue.put(np.frombuffer(message, dtype=np.int16))

    def on_error(self, ws, error) -> None:
        """WebSocket callback: report an error."""
        print(f"WebSocket error: {error}")

    def on_close(self, ws, close_status_code, close_msg) -> None:
        """WebSocket callback: report that the connection was closed."""
        print("WebSocket connection closed.")

    def send_audio(self) -> None:
        """Forward queued chunks to the server until ``stop_event`` is set.

        While the session state is ``"processing"`` a dequeued chunk is not
        sent; an empty binary frame is sent in its place.
        """
        while not self.stop_event.is_set():
            try:
                chunk = self.send_queue.get_nowait()
            except Empty:
                pass
            else:
                if self.session_state != "processing":
                    self.ws.send(
                        chunk.tobytes(), opcode=websocket.ABNF.OPCODE_BINARY
                    )
                else:
                    # handshake
                    self.ws.send([], opcode=websocket.ABNF.OPCODE_BINARY)
            time.sleep(0.01)

    @staticmethod
    def _float_to_int16(samples):
        """Scale float samples by 32768 and convert to int16 with saturation."""
        # Clip before casting: resampling can overshoot full scale, and an
        # out-of-range float -> int16 cast would wrap instead of saturating.
        return np.clip(samples * 32768.0, -32768, 32767).astype(np.int16)

    def put_audio(self, chunk, sample_rate) -> None:
        """Resample a chunk to the server rate and queue it for sending.

        Args:
            chunk: array of samples in int16 range; values outside the range
                are clipped.
            sample_rate: sample rate of ``chunk`` in Hz.
        """
        pcm = np.clip(chunk, -32768, 32767).astype(np.int16)
        audio = pcm.astype(np.float32) / 32768.0
        # Use the caller-supplied rate (previously hard-coded to 48000).
        audio = librosa.resample(
            audio, orig_sr=sample_rate, target_sr=self.SERVER_SAMPLE_RATE
        )
        self.send_queue.put(self._float_to_int16(audio))

    def get_audio(self, sample_rate, output_size):
        """Return exactly ``output_size`` int16 samples at ``sample_rate``.

        Received chunks are collected until enough server-rate samples are
        available for ``output_size`` output samples. If no chunk arrives
        within 0.1 s, the remainder is filled with zeros.

        Args:
            sample_rate: desired output sample rate in Hz.
            output_size: number of samples to return.

        Returns:
            int16 array of length ``output_size``.
        """
        needed = int(output_size * self.SERVER_SAMPLE_RATE / sample_rate)
        parts = []
        collected = 0

        while collected < needed:
            try:
                # handshake
                self.ws.send([], opcode=websocket.ABNF.OPCODE_BINARY)
                chunk = self.recv_queue.get(timeout=0.1)
            except Empty:
                print("padding chunk of size ", collected)
                parts.append(np.zeros(needed - collected, dtype=np.int16))
                break
            # Ensure chunk is int16 and clip to valid range
            piece = np.clip(chunk, -32768, 32767).astype(np.int16)
            parts.append(piece)
            collected += piece.size

        pcm = np.concatenate(parts) if parts else np.array([], dtype=np.int16)
        audio = pcm.astype(np.float32) / 32768.0
        audio = librosa.resample(
            audio, orig_sr=self.SERVER_SAMPLE_RATE, target_sr=sample_rate
        )
        output_chunk = self._float_to_int16(audio)
        print("output_chunk size: ", len(output_chunk))

        # NOTE(review): samples beyond output_size are discarded here rather
        # than carried over to the next call - confirm this is acceptable.
        output_chunk = output_chunk[:output_size]
        return np.pad(output_chunk, (0, output_size - len(output_chunk)))


if __name__ == "__main__":
    client = AudioStreamingClient()
    client.start()