py-dj/main.py
2025-04-14 10:10:01 +02:00

161 lines
4.2 KiB
Python

import asyncio
from enum import Enum
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from download_service import DownloadService
from music_player import MusicPlayer, PlayerState
class ChangePlayerState(Enum):
play = "play"
pause = "pause"
resume = "resume"
stop = "stop"
class WSConnectionType(Enum):
state = "state"
queue = "queue"
class ConnectionManager:
def __init__(self) -> None:
self.active_connections: dict[str, set[WebSocket]] = {
WSConnectionType.state.value: set(),
WSConnectionType.queue.value: set(),
}
async def connect(self, websocket: WebSocket, type: WSConnectionType):
await websocket.accept()
self.active_connections[type.value].add(websocket)
async def send(self, ws: WebSocket, message: BaseModel):
try:
await ws.send_json(message.model_dump())
except Exception:
self.disconnect(ws)
async def broadcast(self, ws_type: WSConnectionType, message: BaseModel):
broken = set()
for ws in self.active_connections[ws_type.value]:
try:
await ws.send_json(message.model_dump())
except Exception:
broken.add(ws)
for ws in broken:
self.disconnect(ws)
def disconnect(self, websocket: WebSocket):
if websocket in self.active_connections[WSConnectionType.state.value]:
self.active_connections[WSConnectionType.state.value].remove(websocket)
if websocket in self.active_connections[WSConnectionType.queue.value]:
self.active_connections[WSConnectionType.queue.value].remove(websocket)
# Setup
tags_metadata = [
{"name": "player", "description": "Interact with the Music Player"},
{"name": "experimental"},
{"name": "queue"},
]
app = FastAPI(openapi_tags=tags_metadata)
player = MusicPlayer()
dl_service = DownloadService()
ws_manager = ConnectionManager()
# Interface
@app.get("/", response_class=HTMLResponse)
async def root():
with open("interface.html") as f:
return f.read()
@app.on_event("startup")
async def start_event_loop():
asyncio.create_task(state_broadcast_loop())
asyncio.create_task(queue_broadcast_loop())
async def state_broadcast_loop():
while True:
await player._state_event.wait()
await ws_manager.broadcast(WSConnectionType.state, player.get_state())
player._state_event.clear()
async def queue_broadcast_loop():
while True:
await player._queue_event.wait()
await ws_manager.broadcast(WSConnectionType.queue, player.get_queue())
player._queue_event.clear()
# Status updates
@app.websocket("/player")
async def websocket_player(websocket: WebSocket):
await ws_manager.connect(websocket, WSConnectionType.state)
try:
while True:
await websocket.receive_text()
await ws_manager.send(websocket, player.get_state())
except WebSocketDisconnect:
ws_manager.disconnect(websocket)
# Queue updates
@app.websocket("/queue")
async def websocket_queue(websocket: WebSocket):
await ws_manager.connect(websocket, WSConnectionType.queue)
try:
while True:
await websocket.receive_text()
await ws_manager.send(websocket, player.get_queue())
except WebSocketDisconnect:
ws_manager.disconnect(websocket)
@app.get("/queue", tags=["queue"])
def get_queue():
return player.get_queue()
@app.post("/queue", tags=["queue"])
async def post_to_queue(url: str):
track = dl_service.download(url)
await player.add_to_queue(track)
@app.post("/player/play", tags=["player"])
async def player_play():
await player.play()
@app.post("/player/stop", tags=["player"])
async def player_stop():
await player.stop()
@app.post("/player/skip", tags=["player"])
async def player_skip():
await player.next()
# Player
@app.put("/player/volume", tags=["player"])
async def set_volume(volume: float):
await player.set_volume(volume)
@app.get("/player/volume", tags=["player"])
def get_volume():
return {"volume": player.get_volume()}
@app.get("/player", tags=["player"])
def get_player_state() -> PlayerState:
return player.get_state()