+++ /dev/null
-
-import os
-
-from brisa.core import log
-from brisa.upnp.device import Service, ServiceController
-
-pjoin = os.path.join
-
-class PlayListOutBoundExcept(Exception):
- def __rep__(self):
- return "Play List Out of Bounds!"
-
-
-
-class PlayList(Service):
- """Class PlayList
- Introduction
- ============
- Implements a playlist for ZukeBox server.
- """
-
- service_type = "urn:schemas-upnp-org:service:PlayList:1"
- service_name = "PlayList"
-
- def __init__(self, positions=10, xml_path):
- scpd_path = pjoin(xml_path, "zukebox-playlist-scpd.xml")
- Service.__init__(self, service_name, service_type, '', scpd_path,
- PlayListControl(positions, service_type))
-
- def get_playlist(self):
- return self.control_controller.get_playlist()
-
-class PlayListControl(ServiceController):
-
- def __init__(self, positions, serv_type):
- ServiceController.__init__(self, serv_type)
- self.positions = positions
- self.list = []
- self.current = 0
- self.prev = self.current
- self.next = None
- self.from_name = None
- self.to_name = None
- self.current_uri = None
- self.current_uri_metadata = None
-
- def soap_IsLocked(self, *args, **kwargs):
- locked = True
- if not len(self.list) == self.positions:
- locked = False
- rt = {"Locked": locked}
- return {"IsLockedResponse": rt}
-
- def soap_IsAvailble(self, *args, **kwargs):
- availble = False
- if not len(self.list) == 0:
- availble = True
- rt = {"Availble": availble}
- return {"IsAvailbleResponse": rt}
-
- def soap_IsAvailble(self, *args, **kwargs):
- availble = False
- if not len(self.list) == 0:
- availble = True
- rt = {"Availble": availble}
- return {"IsAvailbleResponse": rt}
-
- def soap_Append(self, *args, **kwargs):
- """Put a object in the playlist
- """
- if not self.is_locked():
- self.current_uri = kwargs["CurrentURI"]
- self.current_uri_metadata = kwargs["CurrentURIMetaData"]
- self.from_name = kwargs["FromName"]
- self.to_name = kwargs["ToName"]
- self.list.append(self.current_uri)
-
- return {"Append": {}}
- else:
- raise PlayListOutBoundExcept()
-
- def soap_Drop(self, *args, **kwargs):
- """Pop the object at position passed by index
- """
- if self.is_availble():
- index = kwargs["Index"]
- self.list.pop(index)
- return {"Drop": {}}
- else:
- raise PlayListOutBoundExcept()
-
- def soap_GetSizeOfPlayList(self, *args, **kwargs):
- """Return the size of playlist"""
- lenght = len(self.list)
- rt = {"PlayListSize": lenght}
- return {"GetSizeOfPlayListResponse": rt}
-
- def soap_GetCurrent(self, *args, **kwargs):
- if self.is_availble():
- rt = {"CurrentPosition": self.list[self.current]}
- return {"GetCurrentResponse": rt}
-
- def clean_playlist(self):
- if self.is_availble():
- self.list = []
-
- def get_playlist(self):
- return self.list