"""HTTP and WebSocket API for the music player.

Exposes REST endpoints to control the player and its queue, plus two
WebSocket endpoints (``/player`` and ``/queue``) that push the player state
and the queue to connected clients whenever the player signals a change.
"""

import asyncio
from enum import Enum

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from pydantic import BaseModel

from download_service import DownloadService
from music_player import MusicPlayer, PlayerState


class ChangePlayerState(Enum):
    """Names of the playback commands."""

    play = "play"
    pause = "pause"
    resume = "resume"
    stop = "stop"


class WSConnectionType(Enum):
    """Channels a WebSocket client can be registered on."""

    state = "state"
    queue = "queue"


class ConnectionManager:
    """Track open WebSocket connections per channel and push messages to them."""

    def __init__(self) -> None:
        # Keyed by ``WSConnectionType.<member>.value``.
        self.active_connections: dict[str, set[WebSocket]] = {
            WSConnectionType.state.value: set(),
            WSConnectionType.queue.value: set(),
        }

    async def connect(self, websocket: WebSocket, type: WSConnectionType):
        """Accept *websocket* and register it on the channel given by *type*.

        NOTE: ``type`` shadows the builtin; the name is kept so existing
        keyword callers continue to work.
        """
        await websocket.accept()
        self.active_connections[type.value].add(websocket)

    async def send(self, ws: WebSocket, message: BaseModel):
        """Send *message* as JSON to a single connection.

        Sending is best-effort: if it fails for any reason the connection is
        dropped from every channel instead of raising to the caller.
        """
        try:
            await ws.send_json(message.model_dump())
        except Exception:
            self.disconnect(ws)

    async def broadcast(self, ws_type: WSConnectionType, message: BaseModel):
        """Send *message* as JSON to every connection on channel *ws_type*.

        Connections whose send fails are deregistered once the loop is done.
        """
        # Serialise once; the payload is identical for every recipient.
        payload = message.model_dump()
        broken = set()
        # Iterate over a snapshot: the live set may change while we await.
        for ws in list(self.active_connections[ws_type.value]):
            try:
                await ws.send_json(payload)
            except Exception:
                # Best-effort delivery: a failing socket must not stop the
                # remaining recipients from getting the message.
                broken.add(ws)
        for ws in broken:
            self.disconnect(ws)

    def disconnect(self, websocket: WebSocket):
        """Deregister *websocket* from every channel; no-op if it is unknown."""
        for connections in self.active_connections.values():
            connections.discard(websocket)


# Setup
tags_metadata = [
    {"name": "player", "description": "Interact with the Music Player"},
    {"name": "experimental"},
    {"name": "queue"},
]
app = FastAPI(openapi_tags=tags_metadata)
player = MusicPlayer()
dl_service = DownloadService()
ws_manager = ConnectionManager()

# Strong references to the background broadcast tasks. The event loop only
# keeps weak references to tasks, so without this they could be
# garbage-collected while still running.
_background_tasks: set[asyncio.Task] = set()


# Interface
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve the contents of ``index.html`` from the working directory."""
    # Explicit encoding so decoding does not depend on the platform locale.
    with open("index.html", encoding="utf-8") as f:
        return f.read()


# NOTE(review): ``on_event`` is deprecated in recent FastAPI releases in
# favour of a ``lifespan`` handler; kept here to leave the app wiring as is.
@app.on_event("startup")
async def start_event_loop():
    """Start the background loops that push updates to WebSocket clients."""
    for broadcast_loop in (state_broadcast_loop, queue_broadcast_loop):
        task = asyncio.create_task(broadcast_loop())
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)


async def state_broadcast_loop():
    """Broadcast the player state every time the player signals a change."""
    while True:
        await player._state_event.wait()
        # Clear BEFORE broadcasting: a change signalled while the broadcast is
        # awaiting re-sets the event and triggers another round. Clearing
        # afterwards would silently drop that update.
        player._state_event.clear()
        await ws_manager.broadcast(WSConnectionType.state, player.get_state())


async def queue_broadcast_loop():
    """Broadcast the queue every time the player signals a queue change."""
    while True:
        await player._queue_event.wait()
        # Clear first so a change arriving mid-broadcast is not lost (same
        # reasoning as in ``state_broadcast_loop``).
        player._queue_event.clear()
        await ws_manager.broadcast(WSConnectionType.queue, player.get_queue())


# Status updates
@app.websocket("/player")
async def websocket_player(websocket: WebSocket):
    """Register the client for state updates; answer any text frame with the state."""
    await ws_manager.connect(websocket, WSConnectionType.state)
    try:
        while True:
            # The content of the frame is ignored; any message acts as a poll.
            await websocket.receive_text()
            await ws_manager.send(websocket, player.get_state())
    except WebSocketDisconnect:
        pass  # client closed the connection; cleanup happens below
    finally:
        # Deregister on every exit path so a socket that failed for another
        # reason does not linger in the broadcast set.
        ws_manager.disconnect(websocket)


# Queue updates
@app.websocket("/queue")
async def websocket_queue(websocket: WebSocket):
    """Register the client for queue updates; answer any text frame with the queue."""
    await ws_manager.connect(websocket, WSConnectionType.queue)
    try:
        while True:
            # The content of the frame is ignored; any message acts as a poll.
            await websocket.receive_text()
            await ws_manager.send(websocket, player.get_queue())
    except WebSocketDisconnect:
        pass  # client closed the connection; cleanup happens below
    finally:
        ws_manager.disconnect(websocket)


@app.get("/queue", tags=["queue"])
def get_queue():
    """Return the queue as reported by the player."""
    return player.get_queue()


@app.post("/queue", tags=["queue"])
async def post_to_queue(url: str):
    """Download the track at *url* and add it to the player queue."""
    # NOTE(review): ``download`` is called without ``await``, so it presumably
    # runs synchronously and blocks the event loop for the duration of the
    # download. Confirm, and consider ``asyncio.to_thread`` if it is slow and
    # thread-safe.
    track = dl_service.download(url)
    await player.add_to_queue(track)


@app.post("/player/play", tags=["player"])
async def player_play():
    """Ask the player to play."""
    await player.play()


@app.post("/player/stop", tags=["player"])
async def player_stop():
    """Ask the player to stop."""
    await player.stop()


@app.post("/player/skip", tags=["player"])
async def player_skip():
    """Ask the player to advance to the next track."""
    await player.next()


# Player
@app.put("/player/volume", tags=["player"])
async def set_volume(volume: float):
    """Set the player volume to *volume*."""
    await player.set_volume(volume)


@app.get("/player/volume", tags=["player"])
def get_volume():
    """Return the player volume as ``{"volume": <value>}``."""
    return {"volume": player.get_volume()}


@app.get("/player", tags=["player"])
def get_player_state() -> PlayerState:
    """Return the player state as reported by the player."""
    return player.get_state()