#! /usr/bin/python # coding: utf-8 # # axi-cache - Query apt-xapian-index database # # Copyright (C) 2010 Enrico Zini # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # TODO: # - save NAME save the query to be recalled later with @NAME (or if notmuch # has a syntax for saved queries, recycle it) from optparse import OptionParser from cStringIO import StringIO import sys import os, os.path import axi VERSION="0.45" # Setup configuration DEBTAGS_VOCABULARY = "/var/lib/debtags/vocabulary" #XDG_CONFIG_HOME = os.environ.get("XDG_CONFIG_HOME", os.expanduser("~/.config")) XDG_CACHE_HOME = os.environ.get("XDG_CACHE_HOME", os.path.expanduser("~/.cache")) CACHEFILE = os.path.join(XDG_CACHE_HOME, "axi-cache.state") # Activate support for the CJK tokenizer os.environ["XAPIAN_CJK_NGRAM"] = "1" try: from ConfigParser import RawConfigParser import re import math import xapian import apt try: from debian import deb822 except ImportError: from debian_bundle import deb822 helponly = False except ImportError, e: print >>sys.stderr, "%s: only help functions are implemented, for the sake of help2man" % str(e) helponly = True if not helponly: class SavedStateError(Exception): pass class AptSilentProgress(apt.progress.text.OpProgress) : "Quiet progress so we don't get cache loading messages from Apt" def __init__(self): pass def done(self): pass def update(self, percent=None): pass def readVocabulary(): try: fin = open(DEBTAGS_VOCABULARY) except Exception, e: # Only show this when being verbose print >>sys.stderr, "Cannot read %s: %s. Please install `debtags' t" % (DEBTAGS_VOCABULARY, str(e)) return None, None facets = dict() tags = dict() for entry in deb822.Deb822.iter_paragraphs(fin): if "Facet" in entry: facets[entry["Facet"]] = entry elif "Tag" in entry: tags[entry["Tag"]] = entry return facets, tags class DB(object): class BasicFilter(xapian.ExpandDecider): def __init__(self, stemmer=None, exclude=None, prefix=None): super(DB.BasicFilter, self).__init__() self.stem = stemmer if stemmer else lambda x:x self.exclude = set([self.stem(x) for x in exclude]) if exclude else set() self.prefix = prefix def __call__(self, term): if len(term) < 4: return False if self.prefix is not None: # Skip leading uppercase chars t = term while t and t[0].isupper(): t = t[1:] if not t.startswith(self.prefix): return False if self.stem(term) in self.exclude: return False if term.startswith("XT") or term.startswith("XS"): return True return term[0].islower() class TermFilter(BasicFilter): def __call__(self, term): if len(term) < 4: return False if self.stem(term) in self.exclude: return False return term[0].islower() class TagFilter(xapian.ExpandDecider): def __call__(self, term): return term.startswith("XT") def __init__(self): # Access the Xapian index self.db = xapian.Database(axi.XAPIANINDEX) self.stem = xapian.Stem("english") # Build query parser self.qp = xapian.QueryParser() self.qp.set_default_op(xapian.Query.OP_AND) self.qp.set_database(self.db) self.qp.set_stemmer(self.stem) self.qp.set_stemming_strategy(xapian.QueryParser.STEM_SOME) self.qp.add_prefix("pkg", "XP") self.qp.add_boolean_prefix("tag", "XT") self.qp.add_boolean_prefix("sec", "XS") #notmuch->value_range_processor = new Xapian::NumberValueRangeProcessor (NOTMUCH_VALUE_TIMESTAMP); #notmuch->query_parser->add_valuerangeprocessor (notmuch->value_range_processor); # Read state from previous runs self.cache = RawConfigParser() if os.path.exists(CACHEFILE): try: self.cache.read(CACHEFILE) except Exception, e: print >>sys.stderr, e print >>sys.stderr, "ignoring %s which seems to be corrupted" % CACHEFILE self.dirty = False self.facets = None self.tags = None def save(self): "Save the state so we find it next time" if self.dirty: if not os.path.exists(XDG_CACHE_HOME): os.makedirs(XDG_CACHE_HOME, mode=0700) self.cache.write(open(CACHEFILE, "w")) self.dirty = False def vocabulary(self): if self.facets is None: self.facets, self.tags = readVocabulary() return self.facets, self.tags def unprefix(self, term): "Convert DB prefixes to user prefixes" if term.startswith("XT"): return "tag:" + term[2:] elif term.startswith("XS"): return "sec:" + term[2:] elif term.startswith("XP"): return "pkg:" + term[2:] return term def is_tag(self, t): return self.db.term_exists("XT" + t) def set_query_args(self, args, secondary=False): def term_or_tag(t): if "::" in t and self.is_tag(t): return "tag:" + t else: return t args = map(term_or_tag, args) self.set_query_string(" ".join(args), secondary=secondary) def set_query_string(self, q, secondary=False): "Set the query in the cache" if not secondary: self.set_cache_last("query", q) self.unset_cache_last("secondary query") else: self.set_cache_last("secondary query", q) self.unset_cache_last("skip") def set_sort(self, key=None, cutoff=60): "Set sorting method (default is by relevance)" if key is None: self.unset_cache_last("sort") self.unset_cache_last("cutoff") else: self.set_cache_last("sort", key) self.set_cache_last("cutoff", str(cutoff)) self.unset_cache_last("skip") def build_query(self): "Build query from cached query info" q = self.get_cache_last("query") if not self.cache.has_option("last", "query"): raise SavedStateError("no saved query") self.query = self.qp.parse_query(q, xapian.QueryParser.FLAG_BOOLEAN | xapian.QueryParser.FLAG_LOVEHATE | xapian.QueryParser.FLAG_BOOLEAN_ANY_CASE | xapian.QueryParser.FLAG_WILDCARD | xapian.QueryParser.FLAG_PURE_NOT | xapian.QueryParser.FLAG_SPELLING_CORRECTION | xapian.QueryParser.FLAG_AUTO_SYNONYMS) secondary = self.get_cache_last("secondary query", None) if secondary: secondary = self.qp.parse_query(secondary, xapian.QueryParser.FLAG_BOOLEAN | xapian.QueryParser.FLAG_LOVEHATE | xapian.QueryParser.FLAG_BOOLEAN_ANY_CASE | xapian.QueryParser.FLAG_WILDCARD | xapian.QueryParser.FLAG_PURE_NOT | xapian.QueryParser.FLAG_SPELLING_CORRECTION | xapian.QueryParser.FLAG_AUTO_SYNONYMS) self.query = xapian.Query(xapian.Query.OP_AND, self.query, secondary) # print "Query:", self.query # Build the enquire with the query self.enquire = xapian.Enquire(self.db) self.enquire.set_query(self.query) sort = self.get_cache_last("sort") if sort is not None: values, descs = axi.readValueDB() # If we don't sort by relevance, we need to specify a cutoff in order to # remove poor results from the output # # Note: ept-cache implements an adaptive cutoff as follows: # 1. Retrieve only one result, with default sorting. Read its relevance as # the maximum relevance. # 2. Set the cutoff as some percentage of the maximum relevance # 3. Set sort by the wanted value # 4. Perform the query # TODO: didn't this use to work? #self.enquire.set_cutoff(int(self.get_cache_last("cutoff", 60))) reverse = sort[0] == '-' or sort[-1] == '-' sort = sort.strip('-') # Sort by the requested value self.enquire.set_sort_by_value(values[sort], reverse) def get_matches(self, first=None, count=20): """ Return a Xapian mset with the next page of results. """ if first is None: first = int(self.get_cache_last("skip", 0)) self.set_cache_last("lastfirst", first) self.set_cache_last("skip", first + count) matches = self.enquire.get_mset(first, count) return matches def get_all_matches(self, first=None): """ Generate Xapian matches for all query matches """ if first is None: first = int(self.get_cache_last("skip", 0)) self.unset_cache_last("lastfirst") self.unset_cache_last("skip") while True: matches = self.enquire.get_mset(first, 100) count = matches.size() if count == 0: break for m in matches: yield m first += 100 def get_spelling_correction(self): return self.qp.get_corrected_query_string() def get_suggestions(self, count=10, filter=None): """ Compute suggestions for more terms Return a Xapian ESet """ # Use the first 30 results as the key ones to use to compute relevant # terms rset = xapian.RSet() for m in self.enquire.get_mset(0, 30): rset.add_document(m.docid) # Get results, optionally filtered if filter is None: filter = self.BasicFilter() return self.enquire.get_eset(count, rset, filter) # ConfigParser access wrappers with lots of extra ifs, needed because the # ConfigParser API has been designed to throw exceptions in the most stupid # places one could possibly conceive def get_cache_last(self, key, default=None): if self.cache.has_option("last", key): return self.cache.get("last", key) return default def set_cache_last(self, key, val): if not self.cache.has_section("last"): self.cache.add_section("last") self.cache.set("last", key, val) self.dirty = True def unset_cache_last(self, key): if not self.cache.has_section("last"): return self.cache.remove_option("last", key) def get_rdeps(self, name, pfx): "Return all the rdeps of type @pfx for package @name" enquire = xapian.Enquire(self.db) enquire.set_query(xapian.Query(pfx+name)) first = 0 while True: found = 0 for m in enquire.get_mset(first, first + 20): found += 1 yield m.document.get_data() if found < 20: break first += 20 class Cmdline(object): BOOLWORDS = set(["and", "or", "not"]) def __init__(self): self.name = "axi-cache" class Parser(OptionParser): def __init__(self, cmdline, *args, **kwargs): OptionParser.__init__(self, *args, **kwargs) self.cmdline = cmdline def error(self, msg): sys.stderr.write("%s: error: %s\n\n" % (self.get_prog_name(), msg)) self.print_help(sys.stderr) sys.exit(2) def print_help(self, fd=sys.stdout): commands = StringIO() self.cmdline.format_help(commands) buf = StringIO() # Still in 2010 Cmdline is not an object. Oh dear. OptionParser.print_help(self, buf) buf = buf.getvalue().replace("ENDDESC", "\n\n" + commands.getvalue()[:-1]) fd.write(buf) self.parser = Parser(self, usage="usage: %prog [options] command [args]", version="%prog "+ VERSION, description="Query the Apt Xapian index.ENDDESC") self.parser.add_option("-s", "--sort", help="sort by the given value, as listed in %s. Add a '-' to reverse sort order" % axi.XAPIANDBVALUES) self.parser.add_option("--tags", action="store_true", help="show matching tags, rather than packages") self.parser.add_option("--tabcomplete", action="store", metavar="TYPE", help="suggest words for tab completion of the current command line (type is 'plain' or 'partial')") self.parser.add_option("--last", action="store_true", help="use 'show --last' to limit tab completion to only the packages from the last search results") self.parser.add_option("--all", action="store_true", help="disable pagination and always show all results. Note that search results are normally sorted by relevance, so you may find meaningless results at the end of the output") (opts, args) = self.parser.parse_args() self.opts = opts self.args = args if opts.tags and opts.all: self.parser.error("--tags conflicts with --all") def tabcomplete_query_args(self): if self.opts.tabcomplete == "partial" and self.args: queryargs = self.args[:-1] partial = self.args[-1] else: queryargs = self.args partial = None # Remove trailing boolean terms while queryargs and queryargs[-1].lower() in self.BOOLWORDS: queryargs = queryargs[:-1] return queryargs, partial def do_info(self, args): "info: print information about the apt-xapian-index environment" import time import datetime import axi.indexer # General info print "Main data directory:", axi.XAPIANDBPATH try: cur_timestamp = os.path.getmtime(axi.XAPIANDBSTAMP) cur_time = time.strftime("%c", time.localtime(cur_timestamp)) cur_time = "last updated: " + cur_time except e: cur_timestamp = 0 cur_time = "not available: " + str(e) print "Update timestamp: %s (%s)" % (axi.XAPIANDBSTAMP, cur_time) try: index_loc = open(axi.XAPIANINDEX).read().split(" ", 1)[1].strip() index_loc = "pointing to " + index_loc except e: index_loc = "not available: " + str(e) print "Index location: %s (%s)" % (axi.XAPIANINDEX, index_loc) def fileinfo(fname): if os.path.exists(fname): return fname else: return fname + " (available from next reindex)" print "Documentation of index contents:", fileinfo(axi.XAPIANDBDOC) print "Documentation of available prefixes:", fileinfo(axi.XAPIANDBPREFIXES) print "Documentation of available values:", fileinfo(axi.XAPIANDBVALUES) print "Plugin directory:", axi.PLUGINDIR # Aggregated plugin information # { name: { path=path, desc=desc, status=status } } plugins = dict() # Aggregated value information # { valuename: { val=num, desc=shortdesc, plugins=[plugin names] } } values, descs = axi.readValueDB() values = dict(((a, dict(val=b, desc=descs[a], plugins=[])) for a, b in values.iteritems())) # Aggregated data source information # { pathname: { desc=shortdesc, plugins=[plugin names] } } sources = dict() # Per-plugin info all_plugins_names = set((x for x in os.listdir(axi.PLUGINDIR) if x.endswith(".py"))) for plugin in axi.indexer.Plugins(): all_plugins_names.remove(plugin.filename) doc = plugin.obj.doc() or dict() # Last update status / whether an update is due # TODO: check if the data from the plugin is present in the index ts = plugin.info["timestamp"] if cur_timestamp == 0: status = "enabled, not indexed" elif ts == 0: status = "enabled, up to date" elif ts <= cur_timestamp: delta = datetime.timedelta(seconds=cur_timestamp-ts) status = "enabled, up to date (%s older than index)" % str(delta) else: delta = datetime.timedelta(seconds=ts-cur_timestamp) status = "enabled, needs indexing (%s newer than index)" % str(delta) desc = doc.get("shortDesc", "(description unavailable)") plugins[plugin.name] = dict( path=os.path.join(axi.PLUGINDIR, plugin.filename), desc=desc, status=status, ) # Aggregate info about values for v in plugin.info.get("values", []): name = v.get("name", None) if name is None: continue if name not in values: values[name] = dict(val=None, desc=v.get("desc", "(unknown"), plugins=[]) values[name].setdefault("plugins", []).append(plugin.name) # Data files used by the plugin for s in plugin.info.get("sources", []): path = s.get("path", "(unknown)") if path not in sources: sources[path] = dict(desc=s.get("desc", "(unknown)"), plugins=[]) sources[path].setdefault("plugins", []).append(plugin.name) # Disabled plugins for plugin in sorted(all_plugins_names): desc = doc.get("shortDesc", "(unavailable)") name = os.path.splitext(plugin)[0] plugins[name] = dict( path=os.path.join(axi.PLUGINDIR, plugin), desc="(plugin disabled, description unavailable)", status="disabled", ) # Plugin information # { name: { path=path, desc=desc, status=status } } #print "Plugins:" #maxname = max((len(x) for x in plugins.iterkeys())) #for name, info in sorted(plugins.iteritems(), key=lambda x:x[0]): # print " %s:" % info["path"] # print " ", info["desc"] print "Plugin status:" maxname = max((len(x) for x in plugins.iterkeys())) for name, info in sorted(plugins.iteritems(), key=lambda x:x[0]): print " ", name.ljust(maxname), info["status"] # Value information print "Values:" maxname = max((len(x) for x in values.iterkeys())) print " ", "Value".ljust(maxname), "Code", "Provided by" for name, val in sorted(values.iteritems(), key=lambda x:x[0]): plugins = val.get("plugins", []) if plugins: provider = ", ".join(plugins) else: provider = "update-apt-xapian-index" print " ", name.ljust(maxname), "%4d" % int(val["val"]), provider # Source files information print "Data sources:" maxpath = 0 maxdesc = 0 for k, v in sources.iteritems(): if len(k) > maxpath: maxpath = len(k) if len(v["desc"]) > maxdesc: maxdesc = len(v["desc"]) print " ", "Source".ljust(maxpath), "Description".ljust(maxdesc), "Used by" for path, info in sources.iteritems(): provider = ", ".join(info.get("plugins", [])) print " ", path.ljust(maxpath), info["desc"].ljust(maxdesc), provider return 0 def do_search(self, args): "search [terms]: start a new search" self.db = DB() self.db.set_query_args(args) if self.opts.sort: self.db.set_sort(self.opts.sort) else: self.db.set_sort(None) self.db.build_query() if self.opts.all: self.print_all_matches(self.db.get_all_matches()) else: self.print_matches(self.db.get_matches()) if not self.opts.tabcomplete: self.db.save() return 0 def complete_search(self): self.db = DB() if self.opts.tabcomplete == "partial" and len(self.args) == 1: # Simple prefix search terms = set() terms.update((str(term) for term in self.db.db.synonym_keys(self.args[0]))) terms.update((term.term for term in self.db.db.allterms(self.args[0]))) for term in sorted(terms): print term for term in self.db.db.allterms("XT" + self.args[0]): print term.term[2:] return 0 elif self.opts.tabcomplete == "plain" and not self.args: # Show a preset list of tags for facet in ["interface", "role", "use", "works-with"]: for term in self.db.db.allterms("XT" + facet + "::"): print term.term[2:] return 0 else: # Context-sensitive hints qargs, partial = self.tabcomplete_query_args() self.db.set_query_args(qargs) self.db.build_query() self.print_completions(self.db.get_matches()) return 0 def do_again(self, args): "again [query]: repeat the last search, possibly adding query terms" self.db = DB() self.db.set_query_args(args, secondary=True) if self.opts.sort: self.db.set_sort(self.opts.sort) else: self.db.set_sort(None) try: self.db.build_query() except SavedStateError, e: print >>sys.stderr, "%s: maybe you need to run '%s search' first?" % (self.name, str(e)) return 1 self.print_matches(self.db.get_matches(first = 0)) if not self.opts.tabcomplete: self.db.save() return 0 def complete_again(self): self.db = DB() qargs, partial = self.tabcomplete_query_args() self.db.set_query_args(qargs, secondary=True) try: self.db.build_query() except SavedStateError, e: return 0 self.print_completions(self.db.get_matches(first = 0)) return 0 def do_last(self, args): "last [count]: show the last results again" self.db = DB() try: self.db.build_query() except SavedStateError, e: print >>sys.stderr, "%s: maybe you need to run '%s search' first?" % (self.name, str(e)) return 1 count = int(args[0]) if args else 20 first = int(self.db.get_cache_last("lastfirst", 0)) self.print_matches(self.db.get_matches(first=first, count=count)) return 0 def do_more(self, args): "more [count]: show more terms from the last search" self.db = DB() try: self.db.build_query() except SavedStateError, e: print >>sys.stderr, "%s: maybe you need to run '%s search' first?" % (self.name, str(e)) return 1 count = int(args[0]) if args else 20 self.print_matches(self.db.get_matches(count=count)) if not self.opts.tabcomplete: self.db.save() return 0 def do_show(self, args): "show pkgname[s]: run apt-cache show pkgname[s]" os.execvp("/usr/bin/apt-cache", ["apt-cache", "show"] + args) def complete_show(self): self.db = DB() if self.opts.tabcomplete == "partial" and self.args: blacklist = set(self.args[:-1]) partial = self.args[-1] else: blacklist = set(self.args) partial = None if self.opts.last: # Expand from output of last query try: self.db.build_query() first = int(self.db.get_cache_last("lastfirst", 0)) for m in self.db.get_matches(first = first): pkg = m.document.get_data() if partial is not None and not pkg.startswith(partial): continue if pkg not in blacklist: print pkg except SavedStateError, e: return 0 else: # Prefix expand for term in self.db.db.allterms("XP" + (partial or "")): pkg = term.term[2:] if pkg not in blacklist: print pkg return 0 def do_showpkg(self, args): "showpkg pkgname[s]: run apt-cache showpkg pkgname[s]" os.execvp("/usr/bin/apt-cache", ["apt-cache", "showpkg"] + args) complete_showpkg = complete_show def do_showsrc(self, args): "showsrc pkgname[s]: run apt-cache showsrc pkgname[s]" os.execvp("/usr/bin/apt-cache", ["apt-cache", "showsrc"] + args) complete_showsrc = complete_show def do_depends(self, args): "depends pkgname[s]: run apt-cache depends pkgname[s]" os.execvp("/usr/bin/apt-cache", ["apt-cache", "depends"] + args) complete_depends = complete_show def do_rdepends(self, args): "rdepends pkgname[s]: run apt-cache rdepends pkgname[s]" db = DB() for name in args: print name print "Reverse Depends:" for pfx in ("XRD", "XRR", "XRS", "XRE", "XRP", "XRB", "XRC"): for pkg in db.get_rdeps(name, pfx): print " ", pkg complete_rdepends = complete_show def do_rdetails(self, args): "rdetails pkgname[s]: show details of reverse relationships for the given packages" db = DB() for name in args: for pfx, tag in ( ("XRD", "dep"), ("XRR", "rec"), ("XRS", "sug"), ("XRE", "enh"), ("XRP", "pre"), ("XRB", "bre"), ("XRC", "con")): deps = list(db.get_rdeps(name, pfx)) if not deps: continue print name, tag, " ".join(deps) complete_rdetails = complete_show def do_policy(self, args): "policy pkgname[s]: run apt-cache policy pkgname[s]" os.execvp("/usr/bin/apt-cache", ["apt-cache", "policy"] + args) complete_policy = complete_show def do_madison(self, args): "madison pkgname[s]: run apt-cache madison pkgname[s]" os.execvp("/usr/bin/apt-cache", ["apt-cache", "madison"] + args) complete_madison = complete_show def format_help(self, out): print >>out, "Commands:" itemlist = [] maxusage = 0 for k, v in sorted(self.__class__.__dict__.iteritems()): if not k.startswith("do_"): continue line = self.name + " " + v.__doc__ usage, desc = line.split(": ", 1) if len(usage) > maxusage: maxusage = len(usage) itemlist.append([usage, desc]) print >>out, " search commands:" for usage, desc in [x for x in itemlist if "run apt-cache" not in x[1]]: print >>out, " %-*.*s %s" % (maxusage, maxusage, usage, desc) print >>out, " apt-cache front-ends:" for usage, desc in [x for x in itemlist if "run apt-cache" in x[1]]: print >>out, " %-*.*s %s" % (maxusage, maxusage, usage, desc) def do_help(self, args): "help: show a summary of commands" self.parser.print_help() return 0 def clean_suggestions(self, eset): res = [] for r in eset: res.append(self.db.unprefix(r.term)) #res.sort() return res def print_completions(self, matches): if self.opts.tabcomplete == "partial": prefix = self.args[-1] if self.args else None exclude = self.args[:-1] else: prefix = None exclude = self.args exclude = [x for x in exclude if x.lower() not in self.BOOLWORDS] for s in self.clean_suggestions(self.db.get_suggestions(count=10, filter=DB.BasicFilter(stemmer=self.db.stem, exclude=exclude, prefix=prefix))): if s.startswith("tag:"): print s[4:] else: print s def print_matches(self, matches): if self.opts.tags: facets, tags = self.db.vocabulary() def describe_tag(tag): f, t = tag.split("::") try: fd = facets[f]["Description"].split("\n", 1)[0].strip() td = tags[tag]["Description"].split("\n", 1)[0].strip() return "%s: %s" % (fd, td) except KeyError: return None maxscore = None for res in self.db.get_suggestions(count=20, filter=DB.TagFilter()): # Normalise the score in the interval [0, 1] weight = math.log(res.weight) if maxscore == None: maxscore = weight score = float(weight) / maxscore tag = self.db.unprefix(res.term)[4:] desc = describe_tag(tag) if desc is None: print "%i%% %s" % (int(score * 100), tag) else: print "%i%% %s -- %s" % (int(score * 100), tag, desc) else: est = matches.get_matches_estimated() first = matches.get_firstitem() count = matches.size() print "%i results found." % est if count != 0: print "Results %i-%i:" % (first + 1, first + count) elif first != 0: print "No more results to show" self.print_all_matches((m for m in matches)) if first == 0: sc = self.db.get_spelling_correction() if sc: print "Did you mean:", sc, "?" sugg = self.clean_suggestions(self.db.get_suggestions(count=7, filter=DB.TermFilter(stemmer=self.db.stem, exclude=self.args))) print "More terms:", " ".join(sugg) stags = self.clean_suggestions(self.db.get_suggestions(count=7, filter=DB.TagFilter())) print "More tags:", " ".join([x[4:] for x in stags]) if first + count < est: print "`%s more' will give more results" % self.name if first > 0: print "`%s again' will restart the search" % self.name def print_all_matches(self, matches_iter): """ Given an iterator of Xapian matches, print them all out nicely formatted """ aptcache = apt.Cache(progress=AptSilentProgress()) for m in matches_iter: name = m.document.get_data() try: pkg = aptcache[name] except KeyError: pkg = None if pkg is not None and pkg.candidate: print "%i%% %s - %s" % (m.percent, name, pkg.candidate.summary) else: print "%i%% %s - (unknown by apt)" % (m.percent, name) def perform(self): self.cmd = "help" if not self.args else self.args.pop(0) if self.opts.tabcomplete is not None: f = getattr(self, "complete_" + self.cmd, None) if f is None: return 0 return f() else: f = getattr(self, "do_" + self.cmd, None) if f is None: print >>sys.stderr, "Invalid command: `%s'.\n" % self.cmd self.do_help(self.args) return 1 return f(self.args) if __name__ == "__main__": ui = Cmdline() if helponly: sys.exit(0) sys.exit(ui.perform())