import asyncio

import yt_dlp
from fastapi import WebSocket
from pydantic import BaseModel

from models import Track, WSConnectionType


class ConnectionManager:
    """Track open WebSocket connections, grouped by connection type.

    Connections are stored in one set per ``WSConnectionType`` value
    (``state`` and ``queue``), so a message can be broadcast to every
    client subscribed to a given type.
    """

    def __init__(self) -> None:
        # One set of sockets per connection type, keyed by the enum's value.
        self.active_connections: dict[str, set[WebSocket]] = {
            WSConnectionType.state.value: set(),
            WSConnectionType.queue.value: set(),
        }

    # NOTE: ``type`` shadows the builtin, but it is part of the public
    # signature (callers may pass it by keyword), so the name is kept.
    async def connect(self, websocket: WebSocket, type: WSConnectionType) -> None:
        """Accept *websocket* and register it under the given connection type."""
        await websocket.accept()
        self.active_connections[type.value].add(websocket)

    async def send(self, ws: WebSocket, message: BaseModel) -> None:
        """Send *message* as JSON to a single socket.

        Best effort: if sending fails for any reason, the socket is
        unregistered instead of the error being propagated.
        """
        try:
            await ws.send_json(message.model_dump())
        except Exception:
            self.disconnect(ws)

    async def broadcast(self, ws_type: WSConnectionType, message: BaseModel) -> None:
        """Send *message* as JSON to every socket registered under *ws_type*.

        Sockets whose send fails are collected and unregistered once the
        whole broadcast has finished.
        """
        # Serialize once; the payload is identical for every recipient.
        payload = message.model_dump()

        # Iterate over a snapshot: the registry may be mutated by other
        # tasks while this coroutine is suspended in ``await``.
        recipients = tuple(self.active_connections[ws_type.value])

        broken: set[WebSocket] = set()
        for ws in recipients:
            try:
                await ws.send_json(payload)
            except Exception:
                broken.add(ws)

        for ws in broken:
            self.disconnect(ws)

    def disconnect(self, websocket: WebSocket) -> None:
        """Unregister *websocket* from every connection type it belongs to.

        Unknown sockets are ignored, so calling this more than once is safe.
        """
        for connections in self.active_connections.values():
            connections.discard(websocket)


class DownloadService:
    """Download audio through ``yt_dlp`` and describe the result as a ``Track``."""

    # Options passed to ``yt_dlp.YoutubeDL``: files are written to the
    # "queue" directory and run through the FFmpegExtractAudio
    # post-processor with "mp3" as the preferred codec.
    ydl_opts = {
        "format": "mp3/bestaudio/best",
        # ℹ️ See help(yt_dlp.postprocessor) for a list of available Postprocessors and their arguments
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": "mp3",
            }
        ],
        "paths": {"home": "queue"},
        "outtmpl": {"default": "%(artist)s - %(track)s [%(id)s].%(ext)s"},
    }

    def __init__(self) -> None:
        self.ydl = yt_dlp.YoutubeDL(self.ydl_opts)

    async def download(self, url: str) -> Track:
        """Download *url* and return a ``Track`` describing the resulting file.

        The blocking ``extract_info`` call runs in a worker thread so the
        event loop is not stalled while downloading.

        Args:
            url: Address of the media to download.

        Returns:
            A ``Track`` built from the ``artist``, ``title`` and ``duration``
            metadata fields (``None`` when absent) and the downloaded file's
            path.

        Raises:
            ValueError: If the extraction result does not contain the path
                of a downloaded file.
        """
        info = await asyncio.to_thread(self.ydl.extract_info, url, download=True)

        try:
            filepath = info["requested_downloads"][-1]["filepath"]  # type: ignore
        except (KeyError, IndexError, TypeError) as err:
            # KeyError: field missing; IndexError: empty download list;
            # TypeError: ``info`` itself is None / not subscriptable.
            raise ValueError(
                f"Could not determine the downloaded file path for {url!r}"
            ) from err

        track = Track(
            artist=info.get("artist"),  # type: ignore
            title=info.get("title"),  # type: ignore
            duration=info.get("duration"),  # type: ignore
            filepath=filepath,  # type: ignore
        )
        print(f"Finished processing {track}")
        return track