# Copyright (C) 2011 Canonical # # Authors: # Michael Vogt # # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; version 3. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA from __future__ import print_function import apt from apt.cache import FetchFailedException import apt_pkg import difflib import glob import hashlib import logging import os import re import shutil import stat import subprocess import tarfile import tempfile import time from io import BytesIO, open if "APT_CLONE_DEBUG_RESOLVER" in os.environ: apt_pkg.config.set("Debug::pkgProblemResolver", "1") apt_pkg.config.set("Debug::pkgDepCache::AutoInstall", "1") class LowLevelCommands(object): """ calls to the lowlevel operations to install debs or repack a deb """ dpkg_repack = "/usr/bin/dpkg-repack" def install_debs(self, debfiles, targetdir): if not debfiles: return True install_cmd = ["dpkg", "-i"] if targetdir != "/": install_cmd.insert(0, "chroot") install_cmd.insert(1, targetdir) ret = subprocess.call(install_cmd + debfiles) return (ret == 0) def repack_deb(self, pkgname, targetdir): """ dpkg-repack pkgname into targetdir """ if not os.path.exists(self.dpkg_repack): raise IOError("no '%s' found" % self.dpkg_repack) repack_cmd = [self.dpkg_repack] if not os.getuid() == 0: if not os.path.exists("/usr/bin/fakeroot"): return repack_cmd = ["fakeroot", "-u"] + repack_cmd ret = subprocess.call(repack_cmd + [pkgname], cwd=targetdir) return (ret == 0) def debootstrap(self, targetdir, distro=None): if distro is None: import lsb_release distro = lsb_release.get_distro_information()['CODENAME'] ret = subprocess.call(["debootstrap", distro, targetdir]) return (ret == 0) def merge_keys(self, fromkeyfile, intokeyfile): ret = subprocess.call(['apt-key', '--keyring', intokeyfile, 'add', fromkeyfile]) return (ret == 0) def bind_mount(self, olddir, newdir): ret = subprocess.call(["mount", "--bind", olddir, newdir]) return (ret == 0) def bind_umount(self, binddir): ret = subprocess.call(["umount", binddir]) return (ret == 0) class AptClone(object): """ clone the package selection/installation of a existing system using the information that apt provides If dpkg-repack is installed, it will be used to generate debs for the obsolete ones. """ CLONE_FILENAME = "apt-clone-state-%s.tar.gz" % os.uname()[1] TARPREFIX = "./" def __init__(self, fetch_progress=None, install_progress=None, cache_cls=None): self.not_downloadable = set() self.version_mismatch = set() self.commands = LowLevelCommands() # fetch if fetch_progress: self.fetch_progress = fetch_progress else: self.fetch_progress = apt.progress.text.AcquireProgress() # install if install_progress: self.install_progress = install_progress else: self.install_progress = apt.progress.base.InstallProgress() # FIXME: SIIIIILLLLLLLYYYYYYYYY use mock.patch instead to patch # the apt.Cache() to a mock # cache class (e.g. apt.Cache) if cache_cls: self._cache_cls = cache_cls else: self._cache_cls = apt.Cache # save def save_state(self, sourcedir, target, with_dpkg_repack=False, with_dpkg_status=False, scrub_sources=False): """ save the current system state (installed pacakges, enabled repositories ...) into the apt-state.tar.gz file in targetdir """ if os.path.isdir(target): target = os.path.join(target, self.CLONE_FILENAME) else: if not target.endswith(".tar.gz"): target += ".apt-clone.tar.gz" if sourcedir != '/': apt_pkg.init_config() apt_pkg.config.set("Dir", sourcedir) apt_pkg.config.set("Dir::State::status", os.path.join(sourcedir, 'var/lib/dpkg/status')) apt_pkg.init_system() with tarfile.open(name=target, mode="w:gz") as tar: self._write_uname(tar) self._write_state_installed_pkgs(sourcedir, tar) self._write_state_auto_installed(tar) self._write_state_sources_list(tar, scrub_sources) self._write_state_apt_preferences(tar) self._write_state_apt_keyring(tar) if with_dpkg_status: self._write_state_dpkg_status(tar) if with_dpkg_repack: self._dpkg_repack(tar) def _get_host_info_dict(self): # not really uname host_info = { 'hostname' : os.uname()[1], 'kernel' : os.uname()[2], 'uname_arch' : os.uname()[4], 'arch' : apt_pkg.config.find("APT::Architecture") } return host_info def _write_uname(self, tar): host_info = self._get_host_info_dict() # save it f = tempfile.NamedTemporaryFile(mode='w') info = "\n".join(["%s: %s" % (key, value) for (key, value) in host_info.items()]) f.write(info+"\n") f.flush() tar.add(f.name, arcname="./var/lib/apt-clone/uname") def _write_state_installed_pkgs(self, sourcedir, tar): cache = self._cache_cls(rootdir=sourcedir) s = "" for pkg in cache: if pkg.is_installed: # a version identifies the pacakge s += "%s %s %s\n" % ( pkg.name, pkg.installed.version, int(pkg.is_auto_installed)) if not pkg.candidate or not pkg.candidate.downloadable: self.not_downloadable.add(pkg.name) elif not (pkg.installed.downloadable and pkg.candidate.downloadable): self.version_mismatch.add(pkg.name) # store the installed.pkgs tarinfo = tarfile.TarInfo("./var/lib/apt-clone/installed.pkgs") s = s.encode('utf-8') tarinfo.size = len(s) tarinfo.mtime = time.time() tar.addfile(tarinfo, BytesIO(s)) def _write_state_dpkg_status(self, tar): # store dpkg-status, this is not strictly needed as installed.pkgs # should contain all we need, but we still keep it for debugging # reasons dpkg_status = apt_pkg.config.find_file("dir::state::status") tar.add(dpkg_status, arcname="./var/lib/apt-clone/dpkg-status") def _write_state_auto_installed(self, tar): extended_states = apt_pkg.config.find_file( "Dir::State::extended_states") if os.path.exists(extended_states): tar.add(extended_states, "./var/lib/apt-clone/extended_states") def _write_state_apt_preferences(self, tar): f = apt_pkg.config.find_file("Dir::Etc::preferences") if os.path.exists(f): tar.add(f, arcname="./etc/apt/preferences") p = apt_pkg.config.find_dir("Dir::Etc::preferencesparts", "/etc/apt/preferences.d") if os.path.exists(p): tar.add(p, arcname="./etc/apt/preferences.d") def _write_state_apt_keyring(self, tar): f = apt_pkg.config.find_file("Dir::Etc::trusted") if os.path.exists(f): tar.add(f, arcname="./etc/apt/trusted.gpg") p = apt_pkg.config.find_dir("Dir::Etc::trustedparts", "/etc/apt/trusted.gpg.d") if os.path.exists(p): tar.add(p, arcname="./etc/apt/trusted.gpg.d") def _write_state_sources_list(self, tar, scrub=False): sources_list = apt_pkg.config.find_file("Dir::Etc::sourcelist") self._add_file_to_tar_with_password_check(tar, sources_list, scrub, "./etc/apt/sources.list") source_parts = apt_pkg.config.find_dir("Dir::Etc::sourceparts") if os.path.exists(source_parts): tar.add(source_parts, arcname="./etc/apt/sources.list.d", recursive=False) for source in os.listdir(source_parts): if source.startswith('.'): continue if not source.endswith('.list'): continue sources_file_name = '%s/%s' % (source_parts, source) if os.path.isdir(sources_file_name): continue self._add_file_to_tar_with_password_check(tar, sources_file_name, scrub, "./etc/apt/sources.list.d/"+source) def _add_file_to_tar_with_password_check(self, tar, sources, scrub, arcname): if scrub: with tempfile.NamedTemporaryFile(mode='wb') as source_copy, open(sources, 'rb') as f: for line in f.readlines(): # compat with both py2/py3 if type(line) is bytes: line = line.decode("UTF-8") if re.search('/[^/@:]*:[^/@:]*@', line): line = re.sub('/[^/@:]*:[^/@:]*@', '/USERNAME:PASSWORD@', line) # tempfile.NamedTemporaryFile cannot easily be made to # open in Unicode mode in Python 2. We can remove this # once ubuntu-release-upgrader is run under Python 3 # (i.e. after Ubuntu 14.04). #if line is not bytes: # line = line.encode("UTF-8") source_copy.write(line.encode("utf-8")) source_copy.flush() tar.add(source_copy.name, arcname=arcname) else: tar.add(sources, arcname=arcname) def _write_modified_files_from_etc(self, tar): #etcdir = os.path.join(apt_pkg.config.get("Dir"), "etc") pass def _dpkg_repack(self, tar): tdir = tempfile.mkdtemp() for pkgname in self.not_downloadable: self.commands.repack_deb(pkgname, tdir) tar.add(tdir, arcname="./var/lib/apt-clone/debs") shutil.rmtree(tdir) #print(tdir) # detect prefix def _detect_tarprefix(self, tar): #print(tar.getnames()) if tar.getnames()[-1].startswith("./"): self.TARPREFIX = "./" else: self.TARPREFIX = "" # info def _get_info_distro(self, statefile): with tarfile.open(statefile) as tar: self._detect_tarprefix(tar) # guess distro infos f = tar.extractfile(self.TARPREFIX+"etc/apt/sources.list") for line in f.readlines(): line = line.decode("utf-8") if line.startswith("#") or line.strip() == "": continue l = line.split() if len(l) > 2 and not l[2].endswith("/"): return l[2] return None def _get_clone_info_dict(self, statefile): distro = self._get_info_distro(statefile) or "unknown" # nr installed with tarfile.open(statefile) as tar: f = tar.extractfile( self.TARPREFIX + "var/lib/apt-clone/installed.pkgs") installed = autoinstalled = 0 meta = [] for line in f.readlines(): line = line.decode("utf-8") (name, version, auto) = line.strip().split() installed += 1 if int(auto): autoinstalled += 1 # FIXME: this is a bad way to figure out about the # meta-packages if name.endswith("-desktop"): meta.append(name) # date m = tar.getmember( self.TARPREFIX + "var/lib/apt-clone/installed.pkgs") date = m.mtime # check hostname (if found) hostname = "unknown" arch = "unknown" if self.TARPREFIX+"var/lib/apt-clone/uname" in tar.getnames(): info = tar.extractfile( self.TARPREFIX + "var/lib/apt-clone/uname").read() section = apt_pkg.TagSection(info) hostname = section.get("hostname", "unknown") arch = section.get("arch", "unknown") return { 'hostname' : hostname, 'distro' : distro, 'meta' : ", ".join(meta), 'installed' : installed, 'autoinstalled' : autoinstalled, 'date' : time.ctime(date), 'arch' : arch, } def info(self, statefile): return "Hostname: %(hostname)s\n"\ "Arch: %(arch)s\n"\ "Distro: %(distro)s\n"\ "Meta: %(meta)s\n"\ "Installed: %(installed)s pkgs (%(autoinstalled)s automatic)\n"\ "Date: %(date)s\n" % self._get_clone_info_dict(statefile) # show-diff def _get_file_diff_against_clone(self, statefile, system_file, targetdir): with tarfile.open(statefile) as tar: self._detect_tarprefix(tar) clone_file = tar.extractfile(self.TARPREFIX+system_file[1:]) clone_file_lines = [] # FIXME: is there a better way for this? something to tell # tarfile that really its all utf8? for line in clone_file.readlines(): clone_file_lines.append(line.decode("utf-8")) system_file = targetdir+system_file if os.path.exists(system_file): with open(system_file) as fp: system_file_lines = fp.readlines() else: system_file_lines = [] gen = difflib.unified_diff( system_file_lines, clone_file_lines, fromfile="current-system%s" % system_file, tofile=system_file) diff = [] for line in gen: diff.append(line) return diff def show_diff(self, statefile, targetdir="/"): if targetdir != "/": apt_pkg.config.set("DPkg::Chroot-Directory", targetdir) # show info/uname diff print("Clone info differences: ") host_info = self._get_host_info_dict() clone_info = self._get_clone_info_dict(statefile) for key in host_info: if host_info.get(key, None) != clone_info.get(key, None): print(" '%s': clone='%s' system='%s'" % ( key, clone_info.get(key, None), host_info.get(key, None))) print("") # show sources.list{,.d} diff sources_list_system = "/etc/apt/sources.list" diff = self._get_file_diff_against_clone( statefile, sources_list_system, targetdir) if diff: print("".join(diff)) # FIXME: do sources.list.d diff too # FIXME: do apt-keyring diff #self._restore_package_selection(statefile, targetdir, protect_installed) # create new cache in the rootdir cache = self._cache_cls(rootdir=targetdir) with tarfile.open(statefile) as tar: f = tar.extractfile( self.TARPREFIX + "var/lib/apt-clone/installed.pkgs") # get the data installed_in_clone = {} for line in f.readlines(): line = line.strip().decode('utf-8') if line.startswith("#") or line == "": continue (name, version, auto) = line.split() installed_in_clone[name] = (version, auto) installed_on_system = {} for pkg in cache: if not pkg.installed: continue installed_on_system[pkg.name] = ( pkg.installed.version, str(pkg.is_auto_installed)) only_on_system = set(installed_on_system.keys()) - set(installed_in_clone.keys()) if only_on_system: print("Installed on the system but not in the clone-file:") print(" ".join(sorted(only_on_system))) print("\n") only_in_clone = set(installed_in_clone.keys()) - set(installed_on_system.keys()) if only_in_clone: print("Installed in the clone-file but not in the system:") print(" ".join(sorted(only_in_clone))) print("\n") # show version differences pkgversion_differences = set() for pkgname in sorted(installed_in_clone): if not pkgname in installed_on_system: continue clone_file_pkgversion, clone_is_auto = installed_in_clone[pkgname] system_pkgversion, sys_is_auto = installed_on_system[pkgname] if clone_file_pkgversion != system_pkgversion: pkgversion_differences.add( (pkgname, clone_file_pkgversion, system_pkgversion)) if pkgversion_differences: print("Version differences: ") print("Pkgname ") for pkgname, clone_ver, system_ver in pkgversion_differences: print(" %s <%s> <%s>" % (pkgname, clone_ver, system_ver)) # restore def restore_state(self, statefile, targetdir="/", new_distro=None, protect_installed=False): """ take a statefile produced via (like apt-state.tar.gz) save_state() and restore the packages/repositories into targetdir (that is usually "/") """ if targetdir != "/": apt_pkg.config.set("DPkg::Chroot-Directory", targetdir) self.commands.bind_mount("/proc", os.path.join(targetdir, "proc")) self.commands.bind_mount("/sys", os.path.join(targetdir, "sys")) # detect prefix with tarfile.open(statefile) as tar: self._detect_tarprefix(tar) if not os.path.exists(targetdir): print("Dir '%s' does not exist, need to bootstrap first" % targetdir) distro = self._get_info_distro(statefile) self.commands.debootstrap(targetdir, distro) self._restore_sources_list(statefile, targetdir) self._restore_apt_keyring(statefile, targetdir) if new_distro: self._rewrite_sources_list(targetdir, new_distro) self._restore_package_selection(statefile, targetdir, protect_installed) # FIXME: this needs to check if there are conflicts, e.g. via # gdebi self._restore_not_downloadable_debs(statefile, targetdir) # and umount again if targetdir != "/": self.commands.bind_umount(os.path.join(targetdir, "proc")) self.commands.bind_umount(os.path.join(targetdir, "sys")) # simulate restore and return list of missing pkgs def simulate_restore_state(self, statefile, new_distro=None): # create tmp target (with host system dpkg-status) to simulate in target = tempfile.mkdtemp() dpkg_status = apt_pkg.config.find_file("dir::state::status") if not os.path.exists(target+os.path.dirname(dpkg_status)): os.makedirs(target+os.path.dirname(dpkg_status)) shutil.copy(dpkg_status, target+dpkg_status) # restore sources.list and update cache in tmp target self._restore_sources_list(statefile, target) # optionally rewrite on new distro if new_distro: self._rewrite_sources_list(target, new_distro) cache = self._cache_cls(rootdir=target) try: cache.update(apt.progress.base.AcquireProgress()) except FetchFailedException: # This cannot be resolved here, but it should not be interpreted as # a fatal error. pass cache.open() # try to replay cache and see thats missing missing = self._restore_package_selection_in_cache(statefile, cache) shutil.rmtree(target) return missing def _restore_sources_list(self, statefile, targetdir): with tarfile.open(statefile) as tar: existing = os.path.join(targetdir, "etc", "apt", "sources.list") if os.path.exists(existing): shutil.copy(existing, '%s.apt-clone' % existing) tar.extract(self.TARPREFIX+"etc/apt/sources.list", targetdir) td_sources = os.path.join(targetdir, "etc", "apt", "sources.list") os.chmod(td_sources, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH) try: tar.extract(self.TARPREFIX+"etc/apt/sources.list.d", targetdir) except KeyError: pass def _restore_apt_keyring(self, statefile, targetdir): existing = os.path.join(targetdir, "etc", "apt", "trusted.gpg") backup = '%s.apt-clone' % existing if os.path.exists(existing): shutil.copy(existing, backup) with tarfile.open(statefile) as tar: try: tar.extract(self.TARPREFIX+"etc/apt/trusted.gpg", targetdir) except KeyError: pass try: tar.extract(self.TARPREFIX+"etc/apt/trusted.gpg.d", targetdir) except KeyError: pass if os.path.exists(backup): self.commands.merge_keys(backup, existing) os.remove(backup) def _restore_package_selection_in_cache(self, statefile, cache, protect_installed=False): # reinstall packages missing = set() pkgs = set() # procted installed pkgs resolver = apt_pkg.ProblemResolver(cache._depcache) if protect_installed: for pkg in cache: if pkg.is_installed: resolver.protect(pkg._pkg) # get the installed.pkgs data with tarfile.open(statefile) as tar: f = tar.extractfile( self.TARPREFIX + "var/lib/apt-clone/installed.pkgs") # the actiongroup will help libapt to speed up the following loop with cache.actiongroup(): for line in f.readlines(): line = line.strip().decode('utf-8') if line.startswith("#") or line == "": continue (name, version, auto) = line.split() pkgs.add(name) auto_installed = int(auto) from_user = not auto_installed if name in cache: try: # special mode, most useful for release-upgrades if protect_installed: cache[name].mark_install(from_user=from_user, auto_fix=False) if cache.broken_count > 0: resolver.resolve() if not cache[name].marked_install: raise SystemError("pkg %s not marked upgrade" % name) else: # normal mode, this assume the system is consistent cache[name].mark_install(from_user=from_user) except SystemError as e: logging.warning("can't add %s (%s)" % (name, e)) missing.add(name) # ensure the auto install info is cache[name].mark_auto(auto_installed) # check what is broken and try to fix if cache.broken_count > 0: resolver.resolve() # now go over and see what is missing for pkg in pkgs: if not pkg in cache: missing.add(pkg) continue if not (cache[pkg].is_installed or cache[pkg].marked_install): missing.add(pkg) return missing def _restore_package_selection(self, statefile, targetdir, protect_installed): # create new cache cache = self._cache_cls(rootdir=targetdir) # python-apt Cache(rootdir=) will mangle dir::bin, fix that apt.apt_pkg.config.set("Dir::Bin", "/") apt.apt_pkg.config.set("Dir::Bin::dpkg", "/usr/bin/dpkg") try: cache.update(self.fetch_progress) except FetchFailedException: # This cannot be resolved here, but it should not be interpreted as # a fatal error. pass cache.open() self._restore_package_selection_in_cache(statefile, cache, protect_installed) # do it cache.commit(self.fetch_progress, self.install_progress) def _restore_not_downloadable_debs(self, statefile, targetdir): with tarfile.open(statefile) as tar: try: debsdir = [ tarinfo for tarinfo in tar.getmembers() if tarinfo.name.startswith(self.TARPREFIX+"var/lib/apt-clone/debs/")] tar.extractall(targetdir,debsdir) except KeyError: return debs = [] path = os.path.join(targetdir, "./var/lib/apt-clone/debs") for deb in glob.glob(os.path.join(path, "*.deb")): debpath = os.path.join(path, deb) debs.append(debpath) self.commands.install_debs(debs, targetdir) def _rewrite_sources_list(self, targetdir, new_distro): from aptsources.sourceslist import SourcesList, SourceEntry apt_pkg.config.set( "Dir::Etc::sourcelist", os.path.abspath(os.path.join(targetdir, "etc", "apt", "sources.list"))) apt_pkg.config.set( "Dir::Etc::sourceparts", os.path.abspath(os.path.join(targetdir, "etc", "apt", "sources.list.d"))) sources = SourcesList() for entry in sources.list[:]: if entry.invalid or entry.disabled: continue replacement = '' for pocket in ('updates', 'security', 'backports'): if entry.dist.endswith('-%s' % pocket): replacement = '%s-%s' % (new_distro, pocket) break if replacement: entry.dist = replacement else: entry.dist = new_distro existing = os.path.join(targetdir, "etc", "apt", "sources.list.apt-clone") sourcelist = apt_pkg.config.find_file("Dir::Etc::sourcelist") if os.path.exists(existing): with open(existing, 'r') as fp: for line in fp: src = SourceEntry(line, sourcelist) if (src.invalid or src.disabled) or src not in sources: sources.list.append(src) os.remove(existing) for entry in sources.list: if entry.uri.startswith('cdrom:'): # Make sure CD entries come first. sources.list.remove(entry) sources.list.insert(0, entry) entry.disabled = True sources.save() def _find_unowned_in_etc(self, sourcedir=""): if sourcedir: etcdir = os.path.join(sourcedir, "etc") else: etcdir = "/etc" # get all the files that dpkg "owns" owned = set() dpkg_basedir = os.path.dirname(apt_pkg.config.get("Dir::State::status")) for f in glob.glob(os.path.join(dpkg_basedir, "info", "*.list")): with open(f, encoding="utf-8") as fp: for line in fp: if line.startswith("/etc/"): owned.add(line.strip()) # now go over etc unowned = set() for dirpath, dirnames, filenames in os.walk(etcdir): for name in filenames: fullname = os.path.join(dirpath[len(sourcedir):], name) if not fullname in owned: unowned.add(fullname) return unowned def _find_modified_conffiles(self, sourcedir="/"): dpkg_status = sourcedir+apt_pkg.config.find("Dir::State::status") modified = set() # iterate dpkg-status file with open(dpkg_status) as fp: tag = apt_pkg.TagFile(fp) for entry in tag: if "conffiles" in entry: for line in entry["conffiles"].split("\n"): obsolete = None if len(line.split()) == 3: name, md5sum, obsolete = line.split() else: name, md5sum = line.split() # update path = sourcedir+name md5sum = md5sum.strip() # ignore oboslete conffiles if obsolete == "obsolete": continue # user removed conffile if not os.path.exists(path): logging.debug("conffile %s removed" % path) modified.add(path) continue # check content md5 = hashlib.md5() with open(path, 'rb') as fp: md5.update(fp.read()) if md5.hexdigest() != md5sum: logging.debug("conffile %s (%s != %s)" % ( path, md5.hexdigest(), md5sum)) modified.add(path) return modified def _dump_debconf_database(self, sourcedir): print("not implemented yet") # debconf-copydb configdb newdb --config=Name:newdb --config=Driver:File --config=Filename:/tmp/lala.db # # debconf-copydb newdb configdb --config=Name:newdb --config=Driver:File --config=Filename:/tmp/lala.db # # dump to text with: # debconf-copydb configdb pipe --config=Name:pipe # --config=Driver:Pipe --config=InFd:none # # restore from text with: # ssh remotehost debconf-copydb pipe configdb --config=Name:pipe --config=Driver:Pipe