#!/usr/bin/env python
#
# this module uses code from Michael Hudson's xmms-py modules
# this code is available in its original form here:
#   http://www.python.net/crew/mwh/hacks/xmms-py.html
# the original code had this notice on it:
#
# Released by Michael Hudson on 2000-07-01, and again on 2001-04-26
# public domain; no warranty, no restrictions
#
# most of the support for xmms, beep, and audacious comes from
# various pieces of Hudson's modules
#
# licensed under the GNU GPL v2
# a copy of this license can be found:
#   http://www.gnu.org/copyleft/gpl.html
#
# Future Plan:
#   - allow for customizeable announce string (like dcclogger)
#   - more mp3 players (any requests?)
#
# 11/26/07 - Fixed a bug w/ using juk/amarok w/o pydcop
#
# NOTE: this is Python 2 code (it relies on `unicode` and on str-typed
# subprocess output).  print() is always called with a single argument and
# exceptions are caught with `as`, both of which behave the same on
# Python 2.6+.

import xchat

import os
import pwd
import shlex
import socket
import struct
import sys
import traceback
from subprocess import *

# Optional bindings.  Each name stays False when its binding is unavailable,
# which is what the player classes below test for.
pcop, pydcop, bus = False, False, False
try:
    import pcop
    import pydcop
except Exception:
    # best effort: fall back to shelling out to the `dcop` command
    pass
try:
    import dbus
    bus = dbus.SessionBus()
except Exception:
    # best effort: either no python-dbus or no session bus to connect to
    pass

__module_name__ = "pymp3"
__module_version__ = "0.5"
__module_description__ = "mp3 announce/utils"

__debugging__ = False


def print_debug(string):
    """Print `string` wrapped in colour code 02, only when debugging is on."""
    if __debugging__:
        print("\00302" + str(string) + "\003")


def print_info(string):
    """Print `string` wrapped in colour code 03."""
    print("\00303" + str(string) + "\003")


def print_error(string):
    """Print `string` wrapped in colour code 04.

    `_make_num` always called this function, but it was never defined.
    """
    print("\00304" + str(string) + "\003")


# player key -> substring searched for in the output of `ps aux`
players = {
    'audacious': 'audacious',
    'beep': 'beep-media-player',
    'xmms': 'xmms',
    'banshee': 'banshee.exe',
    'juk': 'juk',
    'amarok': 'amarokapp',
    'rhythmbox': 'rhythmbox',
}


def which():
    """Find out which supported player is running.

    Returns the key from `players` whose search string first appears in a
    line of `ps aux` output, or None when nothing matches.
    """
    # communicate() reads all of the output and waits for `ps` to exit
    output = Popen(['ps', 'aux'], stdout=PIPE).communicate()[0]
    for line in output.splitlines():
        for player, findstr in players.items():
            if findstr in line:
                return player
    return None


def command(runstr):
    """Run the command line `runstr` (no shell) and return its stdout.

    The string is tokenised with shlex.split, so quoted arguments stay
    together; the original used str.split, which broke them apart.
    """
    return Popen(shlex.split(runstr), stdout=PIPE).communicate()[0]


# these players use xmms style command socket
SOCKET_PLAYERS = ['audacious', 'beep', 'xmms']

# Layout of a control-socket packet header: two shorts and one int, native
# byte order and alignment.  The original used "hhl" but always read 8 bytes
# for a header; a native "l" is 8 bytes on LP64 platforms, making "hhl" 16
# bytes there, so unpacking could never succeed.  "hhi" is byte-for-byte
# identical to "hhl" where a long is 4 bytes.
_HEADER_FORMAT = "hhi"
_HEADER_SIZE = struct.calcsize(_HEADER_FORMAT)
# Format of one integer argument / reply value.
# NOTE(review): assumes the players exchange 32-bit integers, which is what
# the original 'l' meant on 32-bit platforms -- confirm on 64-bit.
_INT_FORMAT = "i"


class SocketCommand:
    """Command numbers understood by the xmms style control socket."""
    CMD_PLAY = 2
    CMD_PAUSE = 3
    CMD_STOP = 4
    CMD_GET_PLAYLIST_POS = 7
    # TODO: make socket_next and socket_prev use this
    # instead of using next/prev repeatedly
    #CMD_SET_PLAYLIST_POS = 8
    CMD_GET_PLAYLIST_LENGTH = 9
    CMD_GET_OUTPUT_TIME = 11
    CMD_GET_PLAYLIST_FILE = 17
    CMD_GET_PLAYLIST_TITLE = 18
    CMD_GET_PLAYLIST_TIME = 19
    CMD_GET_INFO = 20
    CMD_EJECT = 28
    CMD_PLAYLIST_PREV = 29
    CMD_PLAYLIST_NEXT = 30
    CMD_TOGGLE_REPEAT = 33
    CMD_TOGGLE_SHUFFLE = 34


class ClientPacketHeader:
    """The (version, cmd, length) triple found at the start of a packet."""

    def __init__(self):
        self.version, self.cmd, self.length = 0, 0, 0

    def __repr__(self):
        return "<< %s : version: %s cmd: %s length: %s >>" % (
            self.__class__.__name__, self.version, self.cmd, self.length)

    def encode(self):
        """Return the header packed into its wire representation."""
        return struct.pack(_HEADER_FORMAT, self.version, self.cmd, self.length)


# Author's note on XmmsConnection:
#
# I've tried to make the following class a facsimile of a "persistent
# connection", but my attempts have led to the following error with xmms:
#
#   ** WARNING **: ctrl_write_packet(): Failed to send data: Broken pipe
#
# Even manually closing, deleting, and then re-initializing the socket did
# not avoid this warning.  It seems that only letting the garbage collector
# snag old Connection objects makes xmms happy.
#
# There is one aspect here missing from Hudson's original library: sending a
# custom send format with the 'args' option.  I wasn't using this feature in
# any requests, as all of my provided formats were 'l' anyway.
# (Integer arguments are now packed with _INT_FORMAT; see above.)
class XmmsConnection:
    """One-shot connection to a player's control socket.

    A new connection is made per command and is deliberately never closed
    explicitly (see the author's note above).
    """

    # subclasses override this to point at a different player's socket
    SOCKET_TEMPLATE = "/tmp/xmms_%s.%d"

    def __init__(self, session=0):
        username = pwd.getpwuid(os.geteuid())[0]
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.sock.connect(self.SOCKET_TEMPLATE % (username, session))

    def _recv_exact(self, size):
        """Read `size` bytes, or fewer if the peer closes the socket early."""
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.sock.recv(remaining)
            if not chunk:
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def read_header(self):
        """Read a packet header from the socket.

        Raises struct.error when a complete header could not be read.
        """
        head = ClientPacketHeader()
        head.version, head.cmd, head.length = struct.unpack(
            _HEADER_FORMAT, self._recv_exact(_HEADER_SIZE))
        return head

    def send(self, cmd, args=''):
        """Send command number `cmd`; an int `args` is sent as its payload.

        Any non-int `args` is ignored and the packet is sent with no payload.
        """
        data = b""
        if isinstance(args, int):
            data = struct.pack(_INT_FORMAT, args)
        header = struct.pack(_HEADER_FORMAT, 1, cmd, len(data))
        self.sock.sendall(header + data)

    def get_reply(self, format=''):
        """Read one reply packet.

        With a struct `format` the payload is unpacked and returned as a
        tuple; without one the raw payload is returned.
        """
        header = self.read_header()
        payload = self._recv_exact(header.length)
        if format:
            return struct.unpack(format, payload)
        return payload


class AudaciousConnection(XmmsConnection):
    """Same protocol as XmmsConnection, on the audacious socket path."""
    SOCKET_TEMPLATE = "/tmp/audacious_%s.%d"


class MediaPlayer:
    """A superclass that implements some book-keeping and some convenience
    functions for media player objects.  These objects print out as an
    announce string, and get info from the player through `get_info`, which
    returns a dictionary with a consistent set of keys."""

    def __init__(self, name):
        self.name = name

    def _nyi(self, name):
        """Report (debug only) that action `name` is not implemented."""
        # the tuple matters: without it `%` was applied to `name` alone,
        # which raised TypeError because the format has two placeholders
        print_debug("%s not yet implemented for `%s`." % (name, self.name))

    def _empty_dict(self):
        """Return an empty info dictionary with all of the keys set."""
        keys = ['player', 'playlist_position', 'playlist_length', 'file',
                'display_title', 'elapsed', 'length', 'bitrate', 'frequency',
                'title', 'artist', 'album', 'track']
        d = dict((key, '') for key in keys)
        d['player'] = self.name
        return d

    def human_bitrate(self, bps):
        """Takes bits per second and returns a string with appropriate units.

        The value is always divided as a float (the original truncated when
        `bps` was an int ending in '00'), and the unit never runs past
        'Mbps'.
        """
        units = ['bps', 'kbps', 'Mbps']
        # order of magnitude
        # if we get a weird number, assume kbps = kiloBYTESpersec
        # if we get a number ending in '00', assume it's 1000's of bits (correct)
        if str(bps).endswith("00"):
            reduce_factor = 1000.0
        else:
            reduce_factor = 1024.0
        oom = 0
        while oom < len(units) - 1 and bps / reduce_factor ** (oom + 1) >= 1:
            oom += 1
        return '%0.1f %s' % (bps / reduce_factor ** oom, units[oom])

    def s_to_ms(self, s):
        """Converts seconds to minutes:seconds: mm:ss."""
        minutes, seconds = divmod(int(s), 60)
        return '%2d:%02d' % (minutes, seconds)

    def us_to_ms(self, us):
        """Converts milliseconds to minutes:seconds: mm:ss.

        Despite the name, the argument is divided by 1000 to get seconds.
        """
        return self.s_to_ms(int(us) // 1000)

    # Default actions: subclasses override (or rebind) the ones they support.
    def play(self):
        self._nyi('Play')

    def stop(self):
        self._nyi('Stop')

    def pause(self):
        self._nyi('Pause')

    def next(self):
        self._nyi('Next')

    def prev(self):
        self._nyi('Prev')

    def eject(self):
        self._nyi('eject')

    def open(self):
        self._nyi('open')

    def shuffle(self):
        self._nyi('shuffle')

    def repeat(self):
        self._nyi('repeat')

    def get_info(self):
        """Return the info dictionary; the base class returns an empty one."""
        return self._empty_dict()

    def __str__(self):
        """FIXME: This implements the old announce strings.  It's probably
        easier to move this to the subclasses, but for now this is fine."""
        info = self.get_info()
        if self.name in SOCKET_PLAYERS:
            return '%s ~ [%s] of [%s] ~ %s ~ %sHz' % (
                info['display_title'], info['elapsed'], info['length'],
                info['bitrate'], info['frequency'])
        if self.name in ['juk', 'amarok']:
            return '%s - [%s] - %s ~ [%s] of [%s] ~ %s' % (
                info['artist'], info['album'], info['title'],
                info['elapsed'], info['length'], info['bitrate'])
        # banshee, rhythmbox, and any other name: __str__ must return a
        # string, and the original returned None for unknown names
        return '%s - [%s] - %s ~ [%s] of [%s]' % (
            info['artist'], info['album'], info['title'],
            info['elapsed'], info['length'])

    def __repr__(self):
        # the original format string had no placeholder and raised TypeError
        return '<%s: %s>' % (self.__class__.__name__, self.name)


class Xmms(MediaPlayer):
    """MediaPlayer for players driven over the xmms style control socket."""

    def __init__(self, name='xmms'):
        MediaPlayer.__init__(self, name)
        self._ifcache = {}

    def _makeConnection(self):
        """Return a fresh connection for this player, or False if unknown."""
        if self.name in ['beep', 'xmms']:
            return XmmsConnection()
        elif self.name == 'audacious':
            return AudaciousConnection()
        return False

    def _cmd(self, command, args='', reply_format=''):
        """Send one command on a fresh connection and return its reply."""
        connection = self._makeConnection()
        if not connection:
            raise Exception("No control socket known for `%s`." % self.name)
        connection.send(command, args=args)
        return connection.get_reply(format=reply_format)

    def play(self):
        self._cmd(SocketCommand.CMD_PLAY)

    def stop(self):
        self._cmd(SocketCommand.CMD_STOP)

    def pause(self):
        self._cmd(SocketCommand.CMD_PAUSE)

    def next(self):
        self._cmd(SocketCommand.CMD_PLAYLIST_NEXT)

    def prev(self):
        self._cmd(SocketCommand.CMD_PLAYLIST_PREV)

    def eject(self):
        self._cmd(SocketCommand.CMD_EJECT)

    def open(self):
        # same command as eject
        self._cmd(SocketCommand.CMD_EJECT)

    def shuffle(self):
        self._cmd(SocketCommand.CMD_TOGGLE_SHUFFLE)

    def repeat(self):
        self._cmd(SocketCommand.CMD_TOGGLE_REPEAT)

    def get_info(self):
        """Query the player (one connection per value) and fill the dict."""
        d = self._empty_dict()
        position = self._cmd(SocketCommand.CMD_GET_PLAYLIST_POS,
                             reply_format=_INT_FORMAT)[0]
        d['playlist_position'] = position
        d['playlist_length'] = self._cmd(
            SocketCommand.CMD_GET_PLAYLIST_LENGTH,
            reply_format=_INT_FORMAT)[0]
        # the last byte of these replies is dropped
        # (presumably a trailing NUL terminator -- TODO confirm)
        d['file'] = self._cmd(SocketCommand.CMD_GET_PLAYLIST_FILE,
                              args=position)[:-1]
        d['display_title'] = self._cmd(SocketCommand.CMD_GET_PLAYLIST_TITLE,
                                       args=position)[:-1]
        info = self._cmd(SocketCommand.CMD_GET_INFO,
                         reply_format=_INT_FORMAT * 3)
        utime_elapsed = self._cmd(SocketCommand.CMD_GET_OUTPUT_TIME,
                                  reply_format=_INT_FORMAT)[0]
        utime_length = self._cmd(SocketCommand.CMD_GET_PLAYLIST_TIME,
                                 args=position,
                                 reply_format=_INT_FORMAT)[0]
        d['elapsed'] = self.us_to_ms(utime_elapsed)
        d['length'] = self.us_to_ms(utime_length)
        d['bitrate'] = self.human_bitrate(info[0])
        d['frequency'] = info[1]
        return d


BEEP_FIRST_RUN = True
BEEP_MESSAGE = (
    "beep-media-player has a bug with its control socket and returns bogus "
    "information for bitrate, frequency, and number of channels. Consider "
    "the 'audacious' media player, or BMPx, as beep-media-player is no "
    "longer in development.")


class Beep(Xmms):
    """beep-media-player; prints a one-time warning on first use."""

    def __init__(self):
        global BEEP_FIRST_RUN
        if BEEP_FIRST_RUN:
            print_info(BEEP_MESSAGE)
            BEEP_FIRST_RUN = False
        Xmms.__init__(self, 'beep')


class Audacious(Xmms):
    """audacious, which uses the xmms protocol on its own socket path."""

    def __init__(self):
        Xmms.__init__(self, 'audacious')


BANSHEE_FIRST_RUN = True
BANSHEE_MESSAGE = (
    "Although banshee is supported without them, it is recommended that you "
    "install the python-dbus bindings for increased speed.")


class Banshee(MediaPlayer):
    """MediaPlayer class for banshee.

    Uses D-Bus when a session bus is available, otherwise shells out to the
    `banshee` command.  The right implementation of each public method is
    bound onto the instance in __init__.
    """

    def __init__(self):
        global BANSHEE_FIRST_RUN
        if BANSHEE_FIRST_RUN and not bus:
            print_info(BANSHEE_MESSAGE)
            BANSHEE_FIRST_RUN = False
        MediaPlayer.__init__(self, 'banshee')
        self._ifcache = {}
        interface = ['play', 'stop', 'pause', 'next', 'prev', 'eject', 'open',
                     'get_info']
        if bus:
            self.d_obj = bus.get_object("org.gnome.Banshee",
                                        "/org/gnome/Banshee/Player")
            self.banshee = dbus.Interface(self.d_obj,
                                          "org.gnome.Banshee.Core")
            suffix = 'dbus'
        else:
            suffix = 'nodbus'
        # expose e.g. self.play as self.play_dbus or self.play_nodbus
        for func in interface:
            setattr(self, func, getattr(self, '%s_%s' % (func, suffix)))

    def play_dbus(self):
        self.banshee.Play()

    def stop_dbus(self):
        self.banshee.Pause()

    def pause_dbus(self):
        self.banshee.TogglePlaying()

    def next_dbus(self):
        self.banshee.Next()

    def prev_dbus(self):
        self.banshee.Previous()

    def eject_dbus(self):
        self.banshee.ShowWindow()

    def open_dbus(self):
        self.banshee.ShowWindow()

    def get_info_dbus(self):
        """Fill the info dictionary from the D-Bus interface."""
        d = self._empty_dict()
        d['length'] = self.s_to_ms(self.banshee.GetPlayingDuration())
        d['elapsed'] = self.s_to_ms(self.banshee.GetPlayingPosition())
        d['artist'] = unicode(self.banshee.GetPlayingArtist()).encode('UTF-8')
        d['title'] = unicode(self.banshee.GetPlayingTitle()).encode('UTF-8')
        d['album'] = unicode(self.banshee.GetPlayingAlbum()).encode('UTF-8')
        return d

    def play_nodbus(self):
        command('banshee --play')

    def stop_nodbus(self):
        command('banshee --pause')

    def pause_nodbus(self):
        command('banshee --toggle-playing')

    def next_nodbus(self):
        command('banshee --next')

    def prev_nodbus(self):
        command('banshee --previous')

    def eject_nodbus(self):
        command('banshee --show')

    def open_nodbus(self):
        command('banshee --show')

    # shuffle & repeat not yet implemented

    def get_info_nodbus(self):
        """Fill the info dictionary from the `banshee` command's output."""
        d = self._empty_dict()
        info = command(' '.join(['banshee', '--hide-field', '--query-title',
                                 '--query-artist', '--query-position',
                                 '--query-album',
                                 '--query-duration'])).strip()
        # duration, artist, album, title, position
        # banshee reports things in seconds
        info = info.split('\n')
        d['length'] = self.s_to_ms(info[0])
        d['artist'] = info[1]
        d['album'] = info[2]
        d['title'] = info[3]
        d['elapsed'] = self.s_to_ms(info[4])
        return d


class Rhythmbox(MediaPlayer):
    """MediaPlayer class for Rhythmbox, a Gtk/Gnome media player.  It's
    possible to implement this without using python-dbus by wrapping around
    'dbus-send', but frankly I didn't feel like it."""

    def __init__(self):
        if not bus:
            raise Exception(
                'Rhythmbox is not supported w/o python-dbus bindings.')
        MediaPlayer.__init__(self, 'rhythmbox')
        player_obj = bus.get_object("org.gnome.Rhythmbox",
                                    "/org/gnome/Rhythmbox/Player")
        shell_obj = bus.get_object("org.gnome.Rhythmbox",
                                   "/org/gnome/Rhythmbox/Shell")
        self.player = dbus.Interface(player_obj, "org.gnome.Rhythmbox.Player")
        self.shell = dbus.Interface(shell_obj, "org.gnome.Rhythmbox.Shell")

    def play(self):
        # only a toggle is used here, so flip it only when not playing
        if not bool(self.player.getPlaying()):
            self.player.playPause()

    def stop(self):
        if bool(self.player.getPlaying()):
            self.player.playPause()

    def pause(self):
        self.player.playPause()

    def next(self):
        self.player.next()

    def prev(self):
        self.player.previous()

    def eject(self):
        print_info("There isn't an easy way to do this in rhythmbox right now.")

    def open(self):
        print_info("There isn't an easy way to do this in rhythmbox right now.")

    def get_info(self):
        """Fill the info dictionary from the playing song's properties."""
        d = self._empty_dict()
        uri = unicode(self.player.getPlayingUri())
        raw_properties = dict(self.shell.getSongProperties(uri))
        properties = dict((unicode(key), val)
                          for key, val in raw_properties.items())
        d['length'] = self.s_to_ms(int(properties.get('duration', 0)))
        d['elapsed'] = self.s_to_ms(int(self.player.getElapsed()))
        d['artist'] = unicode(properties.get('artist', '')).encode('UTF-8')
        d['album'] = unicode(properties.get('album', '')).encode('UTF-8')
        d['title'] = unicode(properties.get('title', '')).encode('UTF-8')
        # Banshee reports a 'bitrate', but as far as i can tell it's always 0
        return d


JUK_FIRST_RUN = True
DCOP_MESSAGE = (
    "Although juk is supported without them, it is recommended that you "
    "install the python-dcop bindings for increased speed.")


class Juk(MediaPlayer):
    """MediaPlayer class for Juk, a Qt/KDE media player.  This implementation
    is a bit messy because it resolves whether or not to use DCOP statically;
    after importing, the comparisons are made and the appropriate functions
    are used."""

    def __init__(self):
        global JUK_FIRST_RUN
        if JUK_FIRST_RUN and not pydcop:
            print_info(DCOP_MESSAGE)
            JUK_FIRST_RUN = False
        MediaPlayer.__init__(self, 'juk')
        self._ifcache = {}
        # these functions are to be selected from _%s_dcop and _%s_nodcop
        self._functions = ['eject', 'open']
        # these functions are created below; the keys are function names,
        # the values are juk Player dcop values
        self._func_map = {'play': 'play', 'stop': 'stop',
                          'pause': 'playPause', 'next': 'forward',
                          'prev': 'back'}
        if pydcop:
            # if we have pydcop, create 'juk' and set some functions
            self.juk = pydcop.anyAppCalled("juk")
            self.get_property = (
                lambda x: self.juk.Player.trackProperty(x))
            self.get_juk = (
                lambda func: getattr(self.juk.Player, func)())
            suffix = 'dcop'
        else:
            # with no dcop, set equivalent functions to above using the
            # 'command' interface
            self.get_property = (
                lambda x: command(
                    'dcop juk Player trackProperty %s' % (x)).strip())
            self.get_juk = (
                lambda func: command('dcop juk Player %s' % func))
            suffix = 'nodcop'
        for func in self._functions:
            setattr(self, func, getattr(self, '_%s_%s' % (func, suffix)))
        # this forloop sets all of the keys in 'func_map' to lambdas that
        # call whatever 'get_juk' was created by the conditional above; the
        # default argument binds the current value on each iteration
        for funcname, juk_property in self._func_map.items():
            setattr(self, funcname,
                    (lambda prop=juk_property: self.get_juk(prop)))

    def _eject_dcop(self):
        pcop.dcop_call("juk", "juk-mainwindow#1", "restore", ())
        pcop.dcop_call("juk", "juk-mainwindow#1", "raise", ())

    def _open_dcop(self):
        self._eject_dcop()

    def _eject_nodcop(self):
        command('dcop juk juk-mainwindow#1 restore')
        command('dcop juk juk-mainwindow#1 raise')

    def _open_nodcop(self):
        self._eject_nodcop()

    def get_info(self):
        """Fill the info dictionary from juk's Player interface."""
        d = self._empty_dict()
        elapsed = self.get_juk('currentTime')
        d['elapsed'] = self.s_to_ms(elapsed)
        d['title'] = self.get_property('Title')
        d['artist'] = self.get_property('Artist')
        d['album'] = self.get_property('Album')
        d['length'] = self.s_to_ms(self.get_property('Seconds'))
        d['bitrate'] = '%s Kbps' % self.get_property('Bitrate')
        return d


AMAROK_FIRST_RUN = True
AMAROK_DCOP_MESSAGE = (
    "Although amarok is supported without them, it is recommended that you "
    "install the python-dcop bindings for increased speed.")


class Amarok(MediaPlayer):
    """MediaPlayer class for Amarok, a Qt/KDE media player.  This
    implementation is a bit messy because it resolves whether or not to use
    DCOP statically; after importing, the comparisons are made and the
    appropriate functions are used."""

    def __init__(self):
        global AMAROK_FIRST_RUN
        if AMAROK_FIRST_RUN and not pydcop:
            print_info(AMAROK_DCOP_MESSAGE)
            AMAROK_FIRST_RUN = False
        MediaPlayer.__init__(self, 'amarok')
        self._ifcache = {}
        # If the pydcop is available, then we create a 'self.get_property'
        # function that uses pydcop; if it isn't available, we create a
        # function that works the same but using our 'command' interface.
        # Then, using the 'self.get_property', we bind 'self.play',
        # 'self.stop', etc. to the object's namespace.
        self._functions = ['play', 'stop', 'pause']
        if pydcop:
            self.amarok = pydcop.anyAppCalled("amarok")
            self.get_property = (
                lambda x: getattr(self.amarok.player, x)())
            self.get_playlist = (
                lambda x: getattr(self.amarok.playlist, x)())
            self.set_playlist = (
                lambda x: self.amarok.playlist.playByIndex(x))
        else:
            self.get_property = (
                lambda x: command('dcop amarok player %s' % x).strip())
            self.get_playlist = (
                lambda x: command('dcop amarok playlist %s' % x).strip())
            self.set_playlist = (
                lambda x: command('dcop amarok playlist playByIndex %s' % x))
        for func in self._functions:
            setattr(self, func, (lambda func=func: self.get_property(func)))

    def open(self):
        print_info("There isn't an easy way to do this with amarok right now.")

    def eject(self):
        print_info("There isn't an easy way to do this with amarok right now.")

    def _playlist_int(self, query):
        """Return playlist value `query` as an int.

        Without pydcop the value comes back as command output (a string),
        which the original code then tried to do arithmetic on.
        """
        return int(self.get_playlist(query))

    def prev_n(self, n):
        """Go backwards 'n' times in the playlist (stops at the start)."""
        position = self._playlist_int('getActiveIndex')
        self.set_playlist(max(position - n, 0))

    def next_n(self, n):
        """Go forwards 'n' times in the playlist (stops at the last track)."""
        position = self._playlist_int('getActiveIndex')
        playlist_length = self._playlist_int('getTotalTrackCount')
        self.set_playlist(min(position + n, playlist_length - 1))

    def get_info(self):
        """Fill the info dictionary from amarok's player interface."""
        d = self._empty_dict()
        # this comes back in 'm:ss'
        d['elapsed'] = self.get_property('currentTime')
        d['title'] = self.get_property('title')
        d['artist'] = self.get_property('artist')
        d['album'] = self.get_property('album')
        d['length'] = self.get_property('totalTime')
        d['bitrate'] = '%s Kbps' % self.get_property('bitrate')
        return d


# key from `players` -> class that drives that player
# (replaces the eval() of a capitalised player name)
PLAYER_CLASSES = {
    'audacious': Audacious,
    'beep': Beep,
    'xmms': Xmms,
    'banshee': Banshee,
    'juk': Juk,
    'amarok': Amarok,
    'rhythmbox': Rhythmbox,
}


def current_player():
    """Return a MediaPlayer object for the player that is running.

    Raises Exception when `which` finds no supported player.
    """
    player = which()
    print_debug("detected %s is running" % player)
    if not player:
        raise Exception("Currently not running a supported media player.")
    return PLAYER_CLASSES[player]()


# (sub-command as typed, description) pairs shown by `help`
_HELP_ENTRIES = [
    ('', 'announce the currently playing mp3'),
    ('stop', 'stop playing'),
    ('play', 'start playing'),
    ('pause', 'pause playback'),
    ('next [#]', 'skip to next (# of) track(s)'),
    ('prev [#]', 'skip to prev (# of) track(s)'),
    ('open', 'open files'),
]


def help(args):
    """Print the list of /mp3 sub-commands; `args` is unused."""
    width = max(len(cmd) for cmd, _ in _HELP_ENTRIES)
    print("\nCommands:")
    for cmd, description in _HELP_ENTRIES:
        colored = ''
        if cmd:
            colored = "\00303%s\003" % cmd
        # pad by visible length so the colons line up despite colour codes
        padding = ' ' * (width - len(cmd))
        print("  \002/mp3\002 %s%s : %s" % (colored, padding, description))
    print("")


def usage():
    """Print the short usage line."""
    print("Usage: \002/mp3\002 [cmd]\n\002/mp3\002 \037help\037 for commands.")


def announce():
    """Announce the current track in the channel via a /me action."""
    player = current_player()
    xchat.command('me is listening to: %s' % (player))


def stop(*args):
    current_player().stop()


def play(*args):
    current_player().play()


def pause(*args):
    current_player().pause()


def open(*args):
    current_player().open()


def eject(*args):
    current_player().eject()


def _make_num(numstr):
    """Return `numstr` as an int, or print an error and return False."""
    try:
        return int(numstr)
    except (TypeError, ValueError):
        print_error('"%s" must be a number.' % numstr)
        return False


def _skip(argv, direction):
    """Shared body of `next` and `prev`; `direction` is 'next' or 'prev'.

    An optional count is read from argv[2]; an invalid or zero count does
    nothing.  Players with a `<direction>_n` method (amarok) jump in one
    call, the others are stepped one track at a time.
    """
    num = 1
    if len(argv) == 3:
        num = _make_num(argv[2])
        if not num:
            return
    player = current_player()
    jump = getattr(player, '%s_n' % direction, None)
    if jump is not None:
        jump(num)
        return
    step = getattr(player, direction)
    for _ in range(num):
        step()


def next(argv):
    """Skip forward by one track, or by the count given in argv[2]."""
    _skip(argv, 'next')


def prev(argv):
    """Skip backward by one track, or by the count given in argv[2]."""
    _skip(argv, 'prev')


# /mp3 sub-command -> handler; each handler is called with argv
COMMANDS = {
    "help": help,
    "stop": stop,
    "play": play,
    "pause": pause,
    "next": next,
    "prev": prev,
    "eject": eject,
    "open": open,
}


def _report_error(ex):
    """Show a failure to the user: its first argument if any, else usage."""
    if __debugging__:
        # `file` must be passed by keyword: the first positional parameter
        # of print_exc is `limit`
        traceback.print_exc(file=sys.stdout)
    if len(getattr(ex, 'args', [])):
        print_info(ex.args[0])
    else:
        usage()


def dispatch(argv, arg_to_eol, c):
    """XChat callback for /mp3.

    With no sub-command the current track is announced.  An unknown
    sub-command prints the usage line.  Errors raised by a handler are
    reported, never propagated.  Always returns xchat.EAT_XCHAT.
    """
    if len(argv) == 1:
        handler, handler_args = announce, ()
    else:
        handler = COMMANDS.get(argv[1])
        if handler is None:
            usage()
            return xchat.EAT_XCHAT
        handler_args = (argv,)
    try:
        handler(*handler_args)
    except Exception as ex:
        _report_error(ex)
    return xchat.EAT_XCHAT


__unhook__ = xchat.hook_command("mp3", dispatch,
                                help="/mp3 help for commands.")