#!/usr/bin/env python
"""Search gotlurk.net for XDCC packs and filter the results.

This script targets Python 2: it relies on ``urllib2`` and on the
``BeautifulSoup`` (version 3) package.
"""

import re
import sys
# Imported unconditionally: the test classes further down the file subclass
# unittest.TestCase at module level regardless of DEBUG.
import unittest
import urllib
import urllib2

PROG = "lurker.py"
VERSION = "0.1a"
DEBUG = True

# we require BeautifulSoup
try:
    import BeautifulSoup
except ImportError:
    print("Error: BeautifulSoup is required for %s" % (PROG))
    print(" You can install it via easyinstall: easy_install beautifulsoup")
    sys.exit(-1)


def dbg(s):
    """Print *s* prefixed with 'dbg> ' when DEBUG is enabled."""
    if DEBUG:
        print('dbg> %s' % s)


enc = urllib.urlencode
home = 'http://gotlurk.net'
search = 'http://gotlurk.net/?action=dosearch'

# regex patterns
# matches v#### after a '_' or ' '
vol = re.compile(r'[ _](?P<vol>v[0-9]{1,4})')
# matches c#### or -####
chap = re.compile(r'(?P<chap>[c\-]{1}[0-9]{1,4})')


### Generic Utilities ###

def numToList(string):
    """Convert a string like "3,5,7-9,14" into a list of ints.

    Comma-separated items are either a single integer or an inclusive
    ``low-high`` range.  Negative numbers are not supported because '-'
    is the range separator.

    Raises ValueError when an item is not an integer, when a range is
    reversed (low > high), or when an item contains more than one '-'.
    """
    ret = []
    for n in string.split(","):
        nr = n.split('-')
        if len(nr) == 1:
            # a single number
            try:
                ret.append(int(n))
            except ValueError:
                raise ValueError("invalid number (%s)" % n)
        elif len(nr) == 2:
            # a range; both ends are inclusive
            try:
                low = int(nr[0])
                high = int(nr[1])
            except ValueError:
                raise ValueError("invalid number (%s)" % n)
            # Compare before widening `high`, otherwise "5-4" slips through
            # as an empty range.
            if low > high:
                raise ValueError("invalid range (%s)" % n)
            ret += range(low, high + 1)
        else:
            raise ValueError("invalid range (%s)" % n)
    return ret


def sizeToBytes(string):
    """Converts 'human' sizes like 11K/23.1M/1.11G to bytes.

    NOTE: not implemented yet; the function currently returns None.
    """
    return None


### Juicy Bits ###

def get_cookie():
    """Fetch the home page and return its session cookie.

    A valid session is needed to search at lurk.  Returns the first
    ';'-separated field of the Set-Cookie header, or None when the
    response carries no such header.
    """
    res = urllib2.urlopen(home)
    try:
        raw = res.headers.get('set-cookie')
    finally:
        res.close()
    if raw is None:
        return None
    return raw.split(';')[0]


cookie = False


def do_query(querystr):
    """Run a search on lurk and return the list built by soup_results.

    The module-level ``cookie`` is sent as the Cookie header when set.
    """
    query = enc({'sstr': querystr})
    req = urllib2.Request(search)
    # Do not send a literal "False"/"None" cookie when no session exists.
    if cookie:
        req.add_header('Cookie', cookie)
    req.add_header('Referer', home)
    req.add_data(query)
    res = urllib2.urlopen(req)
    try:
        restr = res.read()
    finally:
        res.close()
    return soup_results(restr)


def soup_results(restr):
    """Parse the result page HTML into a list of pack dicts.

    Each dict has the keys 'bot', 'pack' (int), 'gets' (int), 'size' and
    'name'.  The first row of the table body is skipped (presumably a
    header row) and rows without exactly six cells are ignored.  Returns
    an empty list when the page has no <tbody>.
    """
    soup = BeautifulSoup.BeautifulSoup(restr)
    if soup.tbody is None:
        return []
    results = []
    for row in soup.tbody.findChildren('tr')[1:]:
        tds = row.findChildren('td')
        if len(tds) != 6:
            continue
        results.append({
            'bot': tds[0].a.string,
            'pack': int(tds[2].string),
            'gets': int(tds[3].string.strip('x')),
            'size': tds[4].string,
            'name': tds[5].string,
        })
    return results


def parse_name(name):
    """Extract volume and chapter numbers from a file name.

    Returns a dict {'vol': int or None, 'chap': int or None}; a value is
    None when the corresponding pattern does not match.
    """
    coax = lambda x: int(x.strip('-_cv'))
    v = vol.search(name)
    c = chap.search(name)
    ret = {'vol': None, 'chap': None}
    if v:
        ret['vol'] = coax(v.group('vol'))
    if c:
        ret['chap'] = coax(c.group('chap'))
    return ret


class FilterFormatExc(Exception):
    """Raised when a numeric filter uses an operator that is not allowed."""
    pass


def num_filter(pstr):
    """Build a predicate over numbers from a filter expression.

    *pstr* is a number list accepted by numToList, optionally decorated
    with '!', '<' or '>':

    - several numbers: membership test, or non-membership with a leading '!'
    - one number: equality, or '!' (not equal), '>' and '<' comparisons

    The returned callable converts its argument with int() before testing.

    Raises FilterFormatExc for an operator that is not legal in context,
    and ValueError (from numToList) for a malformed number list.
    """
    stripped = pstr.strip('!<>')
    pl = numToList(stripped)
    if len(pl) > 1:
        # we are dealing with a range here; only '!' or none are legal
        allowed = set(pl)
        if pstr.startswith('!'):
            return lambda x: int(x) not in allowed
        if pstr != stripped:
            raise FilterFormatExc('Only "!" is allowed with multiple numbers')
        return lambda x: int(x) in allowed
    # okay, there is only 1 number, all '!<>' are legal.  Use the parsed
    # value: re-parsing `stripped` would fail for a one-element range such
    # as "5-5".
    target = pl[0]
    if stripped == pstr:
        return lambda x: int(x) == target
    if pstr.startswith('!'):
        return lambda x: int(x) != target
    if pstr.startswith('>'):
        return lambda x: int(x) > target
    if pstr.startswith('<'):
        return lambda x: int(x) < target
    raise FilterFormatExc('Only "!><" are allowed with a single number')


def build_filter(pstr, num=True):
    """Return a predicate for *pstr*.

    With num=True the result comes from num_filter.  Otherwise the
    predicate is true for strings that do NOT contain *pstr*
    (case-insensitive).
    """
    if num:
        return num_filter(pstr)
    needle = pstr.lower()
    return lambda x: needle not in x.lower()


def pack_desc(pack):
    """Turn a pack into its desc string (name padded to 40 columns)."""
    desc = ' pack #%s on %s (%s gets @ %s)' % (pack['pack'], pack['bot'],
                                                pack['gets'], pack['size'])
    name = pack['name'].ljust(40)
    return name + desc


def apply_filter(filt, x):
    """Apply *filt* to *x*; an item whose filter raises is kept (True).

    For example a numeric filter raises on None, which is what parse_name
    reports for a name without a volume or chapter.
    """
    try:
        return filt(x)
    except Exception:
        return True


def _numeric_filter_or_exit(pstr):
    """Build a numeric filter, printing an error and exiting on bad input."""
    try:
        return build_filter(pstr)
    except (FilterFormatExc, ValueError) as ex:
        print("Error: %s" % ex)
        sys.exit(-1)


def main():
    """Parse options, run the query, filter the results and print them."""
    global cookie
    sf = bf = vf = cf = None
    opts, querystr = handleOptions()
    if opts.string:
        sf = build_filter(opts.string, False)
    if opts.bot:
        # build_filter yields "does not contain"; bots are kept on a match
        _bf = build_filter(opts.bot, False)
        bf = lambda x: not _bf(x)
    if opts.vol:
        dbg('Filter: %s' % opts.vol)
        vf = _numeric_filter_or_exit(opts.vol)
    if opts.chap:
        cf = _numeric_filter_or_exit(opts.chap)

    dbg('query: <%s>' % querystr)
    cookie = get_cookie()  # yum
    dbg('cookie: <%r>' % cookie)
    results = do_query(querystr)

    if sf:
        results = [r for r in results if apply_filter(sf, pack_desc(r))]
    if bf:
        results = [r for r in results if apply_filter(bf, r['bot'])]
    if vf:
        results = [r for r in results
                   if apply_filter(vf, parse_name(r['name'])['vol'])]
    if cf:
        results = [r for r in results
                   if apply_filter(cf, parse_name(r['name'])['chap'])]
    if opts.onlyvols:
        results = [r for r in results
                   if parse_name(r['name'])['vol'] is not None]
    if opts.onlychaps:
        results = [r for r in results
                   if parse_name(r['name'])['chap'] is not None]

    for item in results:
        print(pack_desc(item))

    if opts.xdccq:
        # Group pack numbers per bot, keeping first-seen order so the
        # output is deterministic.
        bots = []
        packs_by_bot = {}
        for item in results:
            bot = item['bot']
            if bot not in packs_by_bot:
                packs_by_bot[bot] = []
                bots.append(bot)
            packs_by_bot[bot].append(str(item['pack']))
        for bot in bots:
            print('xdccq get %s %s' % (bot, ','.join(packs_by_bot[bot])))


def handleOptions():
    """Parse the command line.

    Returns (options, querystr) where querystr is the positional
    arguments joined by spaces.  With --test the unit tests are run via
    dotests().  Without a query string the usage is printed and the
    process exits with status -1.
    """
    from optparse import OptionParser, OptionGroup
    parser = OptionParser(usage='%prog [options] Query', version=VERSION)
    parser.add_option('-x', '--xdccq', action='store_true', dest='xdccq',
                      help='output for use in xdccq')

    group_filter = OptionGroup(parser, "Filtering options")
    group_filter.add_option('-s', '--string', action='store', dest='string',
                            help='filter out by string match')
    group_filter.add_option('-b', '--bot', action='store', dest='bot',
                            help='filter bots')
    group_filter.add_option('-v', '--vol', action='store', dest='vol',
                            help='filter volumes')
    group_filter.add_option('-c', '--chap', action='store', dest='chap',
                            help='filter chapters')
    group_filter.add_option('-V', '--vols-only', action='store_true',
                            dest='onlyvols', help='only show volumes')
    group_filter.add_option('-C', '--chaps-only', action='store_true',
                            dest='onlychaps', help='only show chapters')

    group_debug = OptionGroup(parser, "Debugging options")
    group_debug.add_option('--test', action='store_true', dest='test',
                           help='run unit tests')

    parser.add_option_group(group_filter)
    parser.add_option_group(group_debug)

    options, args = parser.parse_args()
    querystr = ' '.join(args).strip()

    # if we have requested the unit test, run it and exit
    if options.test:
        # Drop our own arguments so unittest does not try to interpret
        # them.  Removing items while iterating over sys.argv skipped
        # every other entry.
        del sys.argv[1:]
        dotests()

    # if we lack a query string, print the usage and exit
    if not querystr:
        parser.print_usage()
        sys.exit(-1)

    return options, querystr


### Testing ###

# (file name, expected volume, expected chapter)
testStrings = [
    ("Hajime_no_Ippo_v70_c02[SC].zip", 70, 2),
    ("Migiko_Nippon_Ichi[solaris-svu].zip", None, None),
    ("Tsuki_no_Shippo_v04_c23[ShoujoCrusade][e-scans].zip", 4, 23),
    ("One_Piece_c477[FH].zip", None, 477),
    ("One_Piece_ c479[OPHQ].zip", None, 479),
    ("One_Piece-442[Null][KEFI].zip", None, 442)
]
# (input string, expected list from numToList)
testNumtolistStrings = [
    ("1", [1]),
    ("1,2", [1, 2]),
    ("1,3-9,11", [1, 3, 4, 5, 6, 7, 8, 9, 11])
]

# (numeric filter expression, input values, values expected to pass)
testFilters = [
    ("!234", [1, 2, 234, 5], [1, 2, 5]),
    ("<40", [38, 39, 40, 41, 42], [38, 39]),
    (">10", [7, 8, 9, 10, 11, 12], [11, 12]),
    ("!5-8", [4, 5, 6, 7, 8, 9], [4, 9])
]

# (string filter, input strings, strings expected to pass)
testStrFilter = [
    ("fo", ['foo', 'faz', 'feo', 'fo'], ['faz', 'feo'])
]

# yes, negative numbers are not allowed
testNumToListException = ["10,5-2", "11,G,3", "11,-3,-1"]


def listcmp(l, r):
    """Return True when *l* and *r* have equal length and equal items."""
    if len(l) != len(r):
        return False
    return all(a == b for a, b in zip(l, r))


class TestRegexMatching(unittest.TestCase):
    """Checks parse_name against the file names in testStrings."""

    def testMatching(self):
        for name, want_vol, want_chap in testStrings:
            m = parse_name(name)
            self.assertEqual(m['vol'], want_vol)
            self.assertEqual(m['chap'], want_chap)

    #TODO: test that bad patterns fail properly


class TestFilterApplication(unittest.TestCase):
    """Checks the predicates produced by build_filter."""

    def testNumberFiltering(self):
        for expr, values, expected in testFilters:
            filt = build_filter(expr, True)
            # materialise the result so len() in listcmp works on it
            kept = list(filter(filt, values))
            self.assertEqual(True, listcmp(kept, expected))

    def testStrFiltering(self):
        for expr, values, expected in testStrFilter:
            filt = build_filter(expr, False)
            kept = list(filter(filt, values))
            self.assertEqual(True, listcmp(kept, expected))


class TestNumtolist(unittest.TestCase):
    """Checks numToList on valid and invalid inputs."""

    def testNumtolist(self):
        for text, expected in testNumtolistStrings:
            self.assertEqual(True, listcmp(numToList(text), expected))

    def testNumtolistExc(self):
        # every entry must be rejected, not just the first three
        for bad in testNumToListException:
            self.assertRaises(ValueError, numToList, bad)


def dotests():
    """Run the unit tests in this module and exit the process."""
    # Pass an explicit argv so unittest never tries to interpret the
    # script's own command-line arguments as test names.
    unittest.main(argv=sys.argv[:1])
    # unittest.main() normally exits by itself; keep the explicit exit so
    # this function never returns to its caller.
    sys.exit(0)


if __name__ == '__main__':
    main()