83 lines
2.9 KiB
Python
83 lines
2.9 KiB
Python
import json
|
|
import logging
|
|
|
|
import spotipy
|
|
from pydantic import BaseModel, Field
|
|
from spotipy.oauth2 import SpotifyOAuth
|
|
|
|
from ..base_plugin import BasePlugin
|
|
|
|
|
|
# Input schema for the play-music tool: a single free-text search string.
# NOTE: documented with comments rather than a class docstring on purpose --
# pydantic copies a model's docstring into its JSON schema description, which
# would alter the schema this model produces.
class PlayMusicInput(BaseModel):
    # Required (no default); the description text is part of the field's schema.
    query: str = Field(
        ...,
        description="Can be a song, artist, album, or playlist",
    )
|
|
|
|
|
|
class Plugin(BasePlugin):
    """Music plugin: find a track on Spotify and play it via Home Assistant.

    Tracks are looked up with the Spotify search API and handed to the
    ``media_player`` services through ``self.homeassistant.call_api``.
    ``self.config``, ``self.homeassistant`` and ``self.device_id`` are not set
    in this class; presumably they are provided by ``BasePlugin`` -- TODO confirm.
    """

    def __init__(self, config: dict) -> None:
        """Initialise the base plugin and build the Spotify client.

        Args:
            config: Configuration passed through to ``BasePlugin``. The
                ``["plugins"]["music"]`` section must contain
                ``spotify_client_id`` and ``spotify_client_secret``.
        """
        super().__init__(config=config)

        music_config = self.config["plugins"]["music"]
        self.spotify = spotipy.Spotify(
            auth_manager=SpotifyOAuth(
                scope="user-library-read",
                redirect_uri="http://localhost:8080",
                client_id=music_config["spotify_client_id"],
                client_secret=music_config["spotify_client_secret"],
            )
        )

    def _get_speaker_for_device(self, device_id: str) -> str:
        """
        Get the appropriate speaker entity_id based on device_id.
        Falls back to default_speaker if device_id is not found or empty.
        """
        music_config = self.config["plugins"]["music"]
        # `or {}` also covers a key that is present but null (for example an
        # empty entry in the config file), which would otherwise make the
        # membership test below raise TypeError.
        device_speakers = music_config.get("device_speakers") or {}

        if device_id and device_id in device_speakers:
            speaker = device_speakers[device_id]
            logging.info("Using device-specific speaker for %s: %s", device_id, speaker)
            return speaker

        # Looked up only on the fallback path, so a missing default_speaker
        # does not break devices that have an explicit mapping.
        default_speaker = music_config["default_speaker"]
        logging.info("Using default speaker: %s", default_speaker)
        return default_speaker

    def _search(self, query: str, limit: int = 10):
        """Search Spotify and return a simplified list of matching tracks.

        Args:
            query: Search string passed to ``self.spotify.search``.
            limit: Maximum number of results requested.

        Returns:
            A list of dicts with keys ``name``, ``artists`` (list of artist
            names) and ``uri``; empty when the search yields no track items.
        """
        response = self.spotify.search(query, limit=limit)
        return [
            {
                "name": track["name"],
                "artists": [artist["name"] for artist in track["artists"]],
                "uri": track["uri"],
            }
            for track in response["tracks"]["items"]
        ]

    def tool_play_music(self, input: PlayMusicInput):
        """
        Play music using a search query.
        """
        # NOTE(review): the docstring above is kept verbatim in case the plugin
        # framework surfaces it as the tool description -- TODO confirm.
        # Returns a JSON string: status "success" with the chosen track, or
        # status "error" when the search finds nothing.
        matches = self._search(input.query, limit=1)
        if not matches:
            # Previously this path raised IndexError on `[0]`.
            logging.info("No Spotify results for query: %s", input.query)
            return json.dumps(
                {
                    "status": "error",
                    "message": f"No results found for '{input.query}'.",
                }
            )

        track = matches[0]
        speaker = self._get_speaker_for_device(self.device_id)
        logging.info(
            "Playing %s by %s on %s",
            track["name"],
            ", ".join(track["artists"]),
            speaker,
        )
        payload = {
            "entity_id": speaker,
            "media_content_id": track["uri"],
            "media_content_type": "music",
            "enqueue": "play",
        }
        self.homeassistant.call_api("services/media_player/play_media", payload=payload)
        return json.dumps({"status": "success", "message": "Playing music.", "track": track})

    def tool_stop_music(self):
        """
        Stop playback of music.
        """
        # Implemented as a pause (media_pause) on the speaker resolved for the
        # current device; returns a JSON status string.
        speaker = self._get_speaker_for_device(self.device_id)
        self.homeassistant.call_api(
            "services/media_player/media_pause", payload={"entity_id": speaker}
        )
        return json.dumps({"status": "success", "message": "Music paused."})
|