# py-dj/models.py
#
# 230 lines
# 6.6 KiB
# Python

import asyncio
import os
import threading
import time
from enum import Enum
from uuid import uuid4

import pygame
from pydantic import BaseModel, Field
class Track(BaseModel):
    """A single audio track that can be queued and played.

    ``filepath`` is the location of the audio file; the player loads it into
    the pygame mixer and deletes it from disk once the track has finished.
    """

    # Unique identifier, generated per instance. A plain ``str(uuid4())``
    # default would be evaluated once, when the class body runs, and every
    # Track would then share the same id -- breaking lookups by id.
    id: str = Field(default_factory=lambda: str(uuid4()))
    artist: str | None
    title: str | None
    # NOTE(review): unit is not visible here -- presumably seconds; confirm.
    duration: int | None
    filepath: str
class WSConnectionType(Enum):
    """Kinds of connection, one per stream of updates the player signals.

    NOTE(review): the name suggests websocket subscriptions; the consumers
    are not visible in this file -- confirm.
    """

    # Updates to the player state.
    state = "state"
    # Updates to the track queue.
    queue = "queue"
class PlaybackState(str, Enum):
    """Playback status of the player.

    Members are also ``str`` instances, so they compare equal to their plain
    string values.
    """

    Playing = "Playing"
    Paused = "Paused"
    Stopped = "Stopped"
class PlayerState(BaseModel):
    """Snapshot of what the player is currently doing."""

    # Whether music is playing, paused or stopped; a new state starts stopped.
    playback_state: PlaybackState = PlaybackState.Stopped
    # The loaded track, or None when nothing is loaded.
    track: Track | None = None
    # Whole seconds of playback reported by the mixer, or None when the mixer
    # reports no position.
    position: int | None = None
    # Mixer volume; has no default and must be supplied by the caller.
    volume: float
class Queue(BaseModel):
    """First-in, first-out list of tracks waiting to be played."""

    # Pending tracks, head of the queue first.
    items: list[Track] = []

    def next(self) -> Track | None:
        """Remove and return the track at the head of the queue.

        Returns None when the queue is empty.
        """
        if not self.items:
            return None
        return self.items.pop(0)

    def add(self, track: Track) -> None:
        """Append ``track`` to the end of the queue."""
        self.items.append(track)

    def remove_by_id(self, track_id: str) -> None:
        """Remove the queued track whose id is ``track_id``, if there is one."""
        found = self.get_by_id(track_id)
        if not found:
            return
        self.items.remove(found)

    def get_by_id(self, track_id: str) -> Track | None:
        """Return the first queued track whose id is ``track_id``, else None."""
        for candidate in self.items:
            if candidate.id == track_id:
                return candidate
        return None

    def len(self) -> int:
        """Return the number of queued tracks."""
        pending = self.items
        return len(pending)
class MusicPlayer:
def __init__(self):
# Player State
self._state = PlayerState(volume=1)
self._state_lock: asyncio.Lock = asyncio.Lock()
self._state_event: asyncio.Event = asyncio.Event()
# Sound initialization
pygame.init()
pygame.mixer.init()
pygame.mixer.music.set_endevent(pygame.USEREVENT)
pygame.mixer.music.set_volume(self._state.volume)
# Threading
self.lock = threading.Lock()
self._running = True
self._thread = threading.Thread(target=self._run_async_loop, daemon=True)
self._thread.start()
# Queue
self._queue: Queue = Queue()
self._queue_lock: asyncio.Lock = asyncio.Lock()
self._queue_event: asyncio.Event = asyncio.Event()
def _run_async_loop(self):
asyncio.run(self._loop())
async def _loop(self):
while self._running:
for event in pygame.event.get():
if event.type == pygame.USEREVENT: # a song just ended
await self._handle_track_finished()
await self._play_next_track()
await self._update_position()
time.sleep(0.1)
async def _update_position(self):
new_pos = pygame.mixer.music.get_pos() // 1000
# Changes only on fulls seconds
if new_pos != self._state.position:
if new_pos == -1:
self._state.position = None
else:
self._state.position = new_pos
self._state_event.set()
async def _set_playback_state(self, value: PlaybackState):
async with self._state_lock:
self._state.playback_state = value
self._state_event.set()
async def _set_track(self, track: Track | None):
async with self._state_lock:
self._state.track = track
self._state.position = 0
self._state_event.set()
async def _handle_track_finished(self) -> None:
print(f"Finished playing {self._state.track}")
await self._unload_track()
await self._set_playback_state(PlaybackState.Stopped)
async def _unload_track(self) -> None:
pygame.mixer.music.unload()
# Delete file from disc
if self._state.track:
os.remove(self._state.track.filepath)
# Update state
await self._set_track(None)
async def _load_track(self, track: Track):
await self._set_track(track)
pygame.mixer.music.unload()
pygame.mixer.music.load(track.filepath)
async def _queue_add(self, track):
async with self._queue_lock:
self._queue.add(track)
self._queue_event.set()
async def _queue_next(self) -> Track | None:
async with self._queue_lock:
next_track = self._queue.next()
self._queue_event.set()
return next_track
async def _start_playback(self):
pygame.mixer.music.play()
await self._set_playback_state(PlaybackState.Playing)
async def _pause_playback(self):
pygame.mixer.music.pause()
await self._set_playback_state(PlaybackState.Paused)
async def _resume_playback(self):
pygame.mixer.music.unpause()
await self._set_playback_state(PlaybackState.Playing)
async def _stop_playback(self):
pygame.mixer.music.stop()
await self._set_playback_state(PlaybackState.Stopped)
await self._set_track(None)
async def _play_next_track(self):
next_track = await self._queue_next()
if next_track:
await self._load_track(next_track)
await self._start_playback()
async def add_to_queue(self, track: Track):
with self.lock:
que_len = self._queue.len()
await self._queue_add(track)
# If queue is empty and no corrent track, start playing
if que_len == 0 and not self._state.track:
await self._play_next_track()
def remove_from_queue_by_id(self, track_id: str):
with self.lock:
self._queue.remove_by_id(track_id)
self._queue_event.set()
async def play(self):
with self.lock:
match self._state.playback_state:
case PlaybackState.Playing:
await self._pause_playback()
case PlaybackState.Paused:
await self._resume_playback()
case PlaybackState.Stopped:
await self._play_next_track()
async def stop(self):
with self.lock:
await self._stop_playback()
async def next(self):
with self.lock:
await self._play_next_track()
def shutdown(self):
with self.lock:
self._running = False
self._thread.join()
pygame.mixer.quit()
def get_queue(self) -> Queue:
return self._queue
async def _set_volume(self, volume: float):
async with self._state_lock:
self._state.volume = volume
pygame.mixer.music.set_volume(volume)
self._state_event.set()
async def set_volume(self, volume: float) -> None:
with self.lock:
await self._set_volume(volume)
def get_volume(self) -> float:
return self._state.volume
def get_state(self) -> PlayerState:
return self._state
def get_current_track(self) -> Track | None:
return self._state.track