import asyncio import os import threading import time from enum import Enum from uuid import uuid4 import pygame from pydantic import BaseModel class Track(BaseModel): id: str = str(uuid4()) artist: str | None title: str | None duration: int | None filepath: str class WSConnectionType(Enum): state = "state" queue = "queue" class PlaybackState(str, Enum): Playing = "Playing" Paused = "Paused" Stopped = "Stopped" class PlayerState(BaseModel): playback_state: PlaybackState = PlaybackState.Stopped track: Track | None = None position: int | None = None volume: float class Queue(BaseModel): items: list[Track] = [] def next(self) -> Track | None: if len(self.items) > 0: return self.items.pop(0) else: return None def add(self, track: Track) -> None: self.items.append(track) def remove_by_id(self, track_id: str) -> None: track = self.get_by_id(track_id) if track: self.items.remove(track) def get_by_id(self, track_id: str) -> Track | None: return next((x for x in self.items if x.id == track_id), None) def len(self) -> int: return len(self.items) class MusicPlayer: def __init__(self): # Player State self._state = PlayerState(volume=1) self._state_lock: asyncio.Lock = asyncio.Lock() self._state_event: asyncio.Event = asyncio.Event() # Sound initialization pygame.init() pygame.mixer.init() pygame.mixer.music.set_endevent(pygame.USEREVENT) pygame.mixer.music.set_volume(self._state.volume) # Threading self.lock = threading.Lock() self._running = True self._thread = threading.Thread(target=self._run_async_loop, daemon=True) self._thread.start() # Queue self._queue: Queue = Queue() self._queue_lock: asyncio.Lock = asyncio.Lock() self._queue_event: asyncio.Event = asyncio.Event() def _run_async_loop(self): asyncio.run(self._loop()) async def _loop(self): while self._running: for event in pygame.event.get(): if event.type == pygame.USEREVENT: # a song just ended await self._handle_track_finished() await self._play_next_track() await self._update_position() time.sleep(0.1) async def _update_position(self): new_pos = pygame.mixer.music.get_pos() // 1000 # Changes only on fulls seconds if new_pos != self._state.position: if new_pos == -1: self._state.position = None else: self._state.position = new_pos self._state_event.set() async def _set_playback_state(self, value: PlaybackState): async with self._state_lock: self._state.playback_state = value self._state_event.set() async def _set_track(self, track: Track | None): async with self._state_lock: self._state.track = track self._state.position = 0 self._state_event.set() async def _handle_track_finished(self) -> None: print(f"Finished playing {self._state.track}") await self._unload_track() await self._set_playback_state(PlaybackState.Stopped) async def _unload_track(self) -> None: pygame.mixer.music.unload() # Delete file from disc if self._state.track: os.remove(self._state.track.filepath) # Update state await self._set_track(None) async def _load_track(self, track: Track): await self._set_track(track) pygame.mixer.music.unload() pygame.mixer.music.load(track.filepath) async def _queue_add(self, track): async with self._queue_lock: self._queue.add(track) self._queue_event.set() async def _queue_next(self) -> Track | None: async with self._queue_lock: next_track = self._queue.next() self._queue_event.set() return next_track async def _start_playback(self): pygame.mixer.music.play() await self._set_playback_state(PlaybackState.Playing) async def _pause_playback(self): pygame.mixer.music.pause() await self._set_playback_state(PlaybackState.Paused) async def _resume_playback(self): pygame.mixer.music.unpause() await self._set_playback_state(PlaybackState.Playing) async def _stop_playback(self): pygame.mixer.music.stop() await self._set_playback_state(PlaybackState.Stopped) await self._set_track(None) async def _play_next_track(self): next_track = await self._queue_next() if next_track: await self._load_track(next_track) await self._start_playback() async def add_to_queue(self, track: Track): with self.lock: que_len = self._queue.len() await self._queue_add(track) # If queue is empty and no corrent track, start playing if que_len == 0 and not self._state.track: await self._play_next_track() def remove_from_queue_by_id(self, track_id: str): with self.lock: self._queue.remove_by_id(track_id) self._queue_event.set() async def play(self): with self.lock: match self._state.playback_state: case PlaybackState.Playing: await self._pause_playback() case PlaybackState.Paused: await self._resume_playback() case PlaybackState.Stopped: await self._play_next_track() async def stop(self): with self.lock: await self._stop_playback() async def next(self): with self.lock: await self._play_next_track() def shutdown(self): with self.lock: self._running = False self._thread.join() pygame.mixer.quit() def get_queue(self) -> Queue: return self._queue async def _set_volume(self, volume: float): async with self._state_lock: self._state.volume = volume pygame.mixer.music.set_volume(volume) self._state_event.set() async def set_volume(self, volume: float) -> None: with self.lock: await self._set_volume(volume) def get_volume(self) -> float: return self._state.volume def get_state(self) -> PlayerState: return self._state def get_current_track(self) -> Track | None: return self._state.track