230 lines
6.6 KiB
Python
230 lines
6.6 KiB
Python
import asyncio
import os
import threading
import time
from enum import Enum
from uuid import uuid4

import pygame
from pydantic import BaseModel, Field
|
|
|
|
|
|
class Track(BaseModel):
    """A single audio track that can be queued and played.

    The ``filepath`` is what the player hands to ``pygame.mixer.music.load``
    and removes from disk once the track has finished playing.
    """

    # Unique identifier used by Queue.get_by_id / Queue.remove_by_id.
    # A default_factory is required here: a plain ``str(uuid4())`` default is
    # evaluated once at class-definition time, so every Track would share the
    # same id.
    id: str = Field(default_factory=lambda: str(uuid4()))
    # Optional metadata; None when unknown.
    artist: str | None
    title: str | None
    # NOTE(review): unit is not established in this file (presumably seconds) — confirm.
    duration: int | None
    # Location of the audio file on disk.
    filepath: str
|
|
|
|
|
|
class WSConnectionType(Enum):
    """Kinds of connection, named after the data they carry: player state or queue."""

    # Member names intentionally mirror their string values.
    state = "state"
    queue = "queue"
|
|
|
|
|
|
class PlaybackState(str, Enum):
    """Playback status of the player.

    Mixing in ``str`` makes each member compare equal to (and behave as) its
    plain string value.
    """

    Playing = "Playing"
    Paused = "Paused"
    Stopped = "Stopped"
|
|
|
|
|
|
class PlayerState(BaseModel):
    """Snapshot of what the player is currently doing.

    Only ``volume`` has no default and must be supplied on construction.
    """

    # Starts out stopped until playback is explicitly started.
    playback_state: PlaybackState = PlaybackState.Stopped
    # Track currently loaded into the mixer, if any.
    track: Track | None = None
    # Playback position in whole seconds; None when nothing is playing.
    position: int | None = None
    # Value passed straight to pygame.mixer.music.set_volume.
    volume: float
|
|
|
|
|
|
class Queue(BaseModel):
    """Ordered list of tracks waiting to be played (first in, first out)."""

    items: list[Track] = []

    def next(self) -> Track | None:
        """Remove and return the track at the front, or None if the queue is empty."""
        if not self.items:
            return None
        return self.items.pop(0)

    def add(self, track: Track) -> None:
        """Append ``track`` to the end of the queue."""
        self.items.append(track)

    def remove_by_id(self, track_id: str) -> None:
        """Remove the first track whose id equals ``track_id``; do nothing if absent."""
        found = self.get_by_id(track_id)
        if found:
            self.items.remove(found)

    def get_by_id(self, track_id: str) -> Track | None:
        """Return the first queued track whose id equals ``track_id``, else None."""
        for candidate in self.items:
            if candidate.id == track_id:
                return candidate
        return None

    def len(self) -> int:
        """Return the number of queued tracks."""
        return len(self.items)
|
|
|
|
|
|
class MusicPlayer:
|
|
def __init__(self):
|
|
# Player State
|
|
self._state = PlayerState(volume=1)
|
|
self._state_lock: asyncio.Lock = asyncio.Lock()
|
|
self._state_event: asyncio.Event = asyncio.Event()
|
|
|
|
# Sound initialization
|
|
pygame.init()
|
|
pygame.mixer.init()
|
|
pygame.mixer.music.set_endevent(pygame.USEREVENT)
|
|
pygame.mixer.music.set_volume(self._state.volume)
|
|
|
|
# Threading
|
|
self.lock = threading.Lock()
|
|
self._running = True
|
|
self._thread = threading.Thread(target=self._run_async_loop, daemon=True)
|
|
self._thread.start()
|
|
|
|
# Queue
|
|
self._queue: Queue = Queue()
|
|
self._queue_lock: asyncio.Lock = asyncio.Lock()
|
|
self._queue_event: asyncio.Event = asyncio.Event()
|
|
|
|
def _run_async_loop(self):
|
|
asyncio.run(self._loop())
|
|
|
|
async def _loop(self):
|
|
while self._running:
|
|
for event in pygame.event.get():
|
|
if event.type == pygame.USEREVENT: # a song just ended
|
|
await self._handle_track_finished()
|
|
await self._play_next_track()
|
|
|
|
await self._update_position()
|
|
time.sleep(0.1)
|
|
|
|
async def _update_position(self):
|
|
new_pos = pygame.mixer.music.get_pos() // 1000
|
|
# Changes only on fulls seconds
|
|
if new_pos != self._state.position:
|
|
if new_pos == -1:
|
|
self._state.position = None
|
|
else:
|
|
self._state.position = new_pos
|
|
|
|
self._state_event.set()
|
|
|
|
async def _set_playback_state(self, value: PlaybackState):
|
|
async with self._state_lock:
|
|
self._state.playback_state = value
|
|
self._state_event.set()
|
|
|
|
async def _set_track(self, track: Track | None):
|
|
async with self._state_lock:
|
|
self._state.track = track
|
|
self._state.position = 0
|
|
self._state_event.set()
|
|
|
|
async def _handle_track_finished(self) -> None:
|
|
print(f"Finished playing {self._state.track}")
|
|
await self._unload_track()
|
|
await self._set_playback_state(PlaybackState.Stopped)
|
|
|
|
async def _unload_track(self) -> None:
|
|
pygame.mixer.music.unload()
|
|
# Delete file from disc
|
|
if self._state.track:
|
|
os.remove(self._state.track.filepath)
|
|
# Update state
|
|
await self._set_track(None)
|
|
|
|
async def _load_track(self, track: Track):
|
|
await self._set_track(track)
|
|
pygame.mixer.music.unload()
|
|
pygame.mixer.music.load(track.filepath)
|
|
|
|
async def _queue_add(self, track):
|
|
async with self._queue_lock:
|
|
self._queue.add(track)
|
|
self._queue_event.set()
|
|
|
|
async def _queue_next(self) -> Track | None:
|
|
async with self._queue_lock:
|
|
next_track = self._queue.next()
|
|
self._queue_event.set()
|
|
return next_track
|
|
|
|
async def _start_playback(self):
|
|
pygame.mixer.music.play()
|
|
await self._set_playback_state(PlaybackState.Playing)
|
|
|
|
async def _pause_playback(self):
|
|
pygame.mixer.music.pause()
|
|
await self._set_playback_state(PlaybackState.Paused)
|
|
|
|
async def _resume_playback(self):
|
|
pygame.mixer.music.unpause()
|
|
await self._set_playback_state(PlaybackState.Playing)
|
|
|
|
async def _stop_playback(self):
|
|
pygame.mixer.music.stop()
|
|
await self._set_playback_state(PlaybackState.Stopped)
|
|
await self._set_track(None)
|
|
|
|
async def _play_next_track(self):
|
|
next_track = await self._queue_next()
|
|
if next_track:
|
|
await self._load_track(next_track)
|
|
await self._start_playback()
|
|
|
|
async def add_to_queue(self, track: Track):
|
|
with self.lock:
|
|
que_len = self._queue.len()
|
|
await self._queue_add(track)
|
|
# If queue is empty and no corrent track, start playing
|
|
if que_len == 0 and not self._state.track:
|
|
await self._play_next_track()
|
|
|
|
def remove_from_queue_by_id(self, track_id: str):
|
|
with self.lock:
|
|
self._queue.remove_by_id(track_id)
|
|
self._queue_event.set()
|
|
|
|
async def play(self):
|
|
with self.lock:
|
|
match self._state.playback_state:
|
|
case PlaybackState.Playing:
|
|
await self._pause_playback()
|
|
case PlaybackState.Paused:
|
|
await self._resume_playback()
|
|
case PlaybackState.Stopped:
|
|
await self._play_next_track()
|
|
|
|
async def stop(self):
|
|
with self.lock:
|
|
await self._stop_playback()
|
|
|
|
async def next(self):
|
|
with self.lock:
|
|
await self._play_next_track()
|
|
|
|
def shutdown(self):
|
|
with self.lock:
|
|
self._running = False
|
|
self._thread.join()
|
|
pygame.mixer.quit()
|
|
|
|
def get_queue(self) -> Queue:
|
|
return self._queue
|
|
|
|
async def _set_volume(self, volume: float):
|
|
async with self._state_lock:
|
|
self._state.volume = volume
|
|
pygame.mixer.music.set_volume(volume)
|
|
self._state_event.set()
|
|
|
|
async def set_volume(self, volume: float) -> None:
|
|
with self.lock:
|
|
await self._set_volume(volume)
|
|
|
|
def get_volume(self) -> float:
|
|
return self._state.volume
|
|
|
|
def get_state(self) -> PlayerState:
|
|
return self._state
|
|
|
|
def get_current_track(self) -> Track | None:
|
|
return self._state.track
|