From a0ce724dca8c05dcb4da17c8760952a15bbc44bd Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Mon, 4 Dec 2017 02:34:56 -0800 Subject: [PATCH] recursiveloader: use less memory in assert_directory_verifies In order to avoid loading all manifest entries into memory at once, make assert_directory_verifies use a shared sort_key to iterate over directories and manifest entries in the same order. For verification of the entire gentoo tree, my tests have shown that this change reduces the memory footprint by about 63%, while consuming about 20% more time. --- gemato/recursiveloader.py | 198 +++++++++++++++++++++++++++++++------- 1 file changed, 161 insertions(+), 37 deletions(-) diff --git a/gemato/recursiveloader.py b/gemato/recursiveloader.py index befda32..f0ef1a9 100644 --- a/gemato/recursiveloader.py +++ b/gemato/recursiveloader.py @@ -341,7 +341,7 @@ def _iter_unordered_manifests_for_path(self, path, recursive=False): elif recursive and gemato.util.path_starts_with(d, path): yield (k, d, v) - def _iter_manifests_for_path(self, path, recursive=False): + def _iter_manifests_for_path(self, path, recursive=False, sort_key=lambda kdv: len(kdv[1])): """ Iterate over loaded Manifests that can apply to path. If @recursive is True, returns also Manifests for subdirectories @@ -354,7 +354,7 @@ def _iter_manifests_for_path(self, path, recursive=False): return sorted( self._iter_unordered_manifests_for_path( path, recursive=recursive), - key=lambda kdv: len(kdv[1]), + key=sort_key, reverse=True) def load_manifests_for_path(self, path, recursive=False, verify=True): @@ -368,20 +368,38 @@ def load_manifests_for_path(self, path, recursive=False, verify=True): on mismatch. Otherwise, sub-Manifests will be loaded unconditionally of whether they match parent checksums. """ + for curmpath, relpath, m in self._iter_load_manifests_for_path( + path, recursive=recursive, verify=verify): + self.loaded_manifests[curmpath] = m + def _iter_load_manifests_for_path(self, path, recursive=False, verify=True, + sort_key=lambda kdv: kdv[1].split(os.sep)): + """ + Traverse manifests in depth-first order with directories sorted by + name. Only hold references to a minimum number of ManifestFile + instances, in order to conserve memory. + + The caller can traverse manifest and directory iterators in unison, + minimizing the amount of data in memory. + """ pool = multiprocessing.Pool(processes=self.max_jobs) + # Manifests pop from the stack in depth-first order + manifest_stack = list(self._iter_manifests_for_path(path, + recursive=recursive, sort_key=sort_key)) + traversed = set(curmpath for curmpath, relpath, m in manifest_stack) try: # TODO: figure out how to avoid confusing uses of 'recursive' - while True: + while manifest_stack: to_load = [] - for curmpath, relpath, m in self._iter_manifests_for_path( - path, recursive): - for e in m.entries: + curmpath, relpath, m = manifest_stack.pop() + yield (curmpath, relpath, m) + + for e in m.entries: if e.tag != 'MANIFEST': continue mpath = os.path.join(relpath, e.path) - if curmpath == mpath or mpath in self.loaded_manifests: + if curmpath == mpath or mpath in traversed: continue mdir = os.path.dirname(mpath) if not verify: @@ -390,12 +408,19 @@ def load_manifests_for_path(self, path, recursive=False, verify=True): to_load.append((mpath, e)) elif recursive and gemato.util.path_starts_with(mdir, path): to_load.append((mpath, e)) - if not to_load: - break manifests = pool.imap_unordered(self.manifest_loader, to_load, chunksize=16) - self.loaded_manifests.update(manifests) + + manifests = [(mpath, os.path.dirname(mpath), e) + for mpath, e in manifests] + + # Manifests pop from the stack in depth-first order + manifests.sort(key=sort_key, reverse=True) + for mpath, mdir, m in manifests: + traversed.add(mpath) + manifest_stack.append( + (mpath, os.path.dirname(mpath), m)) pool.close() pool.join() @@ -511,13 +536,54 @@ def get_file_entry_dict(self, path='', only_types=None, be verified against MANIFEST entries. Pass False only when doing updates. """ + out = {} + for dirpath, dirout in self._iter_file_entry_dict( + path=path, only_types=only_types, + verify_manifests=verify_manifests): + other = out.get(dirpath) + if other is None: + out[dirpath] = dirout + else: + # This happens due to the relpath = '' setting + # for all DIST entries. + for filename, e in dirout.items(): + if filename in other: + e = self._merge_entries(other[filename], e) + other[filename] = e + return out - self.load_manifests_for_path(path, recursive=True, - verify=verify_manifests) + @staticmethod + def _merge_entries(e1, e2): + # compare the two entries + ret, diff = gemato.verify.verify_entry_compatibility( + e1, e2) + if not ret: + raise gemato.exceptions.ManifestIncompatibleEntry( + e1, e2, diff) + # we need to construct a single entry with both checksums + if diff: + new_checksums = dict(e2.checksums) + for k, d1, d2 in diff: + if d2 is None: + new_checksums[k] = d1 + e1 = type(e1)(e1.path, e1.size, new_checksums) + return e1 + + def _iter_file_entry_dict(self, path='', only_types=None, + verify_manifests=True, + sort_key=lambda p: p.split(os.sep)): out = {} - for mpath, relpath, m in self._iter_manifests_for_path(path, - recursive=True): - for e in m.entries: + dir_stack = [path] + iter_load = self._iter_load_manifests_for_path(path, + recursive=True, verify=verify_manifests) + mpath, mdir, m = next(iter_load, (None, None, None)) + + while dir_stack or mdir is not None: + if not dir_stack or (mdir is not None and + sort_key(mdir) <= sort_key(dir_stack[-1])): + subdirs = [] + relpath = mdir + for e in m.entries: if only_types is not None: if e.tag not in only_types: continue @@ -533,23 +599,20 @@ def get_file_entry_dict(self, path='', only_types=None, if gemato.util.path_starts_with(fullpath, path): dirpath = os.path.dirname(fullpath) filename = os.path.basename(e.path) + subdirs.append(dirpath) dirout = out.setdefault(dirpath, {}) if filename in dirout: - # compare the two entries - ret, diff = gemato.verify.verify_entry_compatibility( - dirout[filename], e) - if not ret: - raise gemato.exceptions.ManifestIncompatibleEntry( - dirout[filename], e, diff) - # we need to construct a single entry with both checksums - if diff: - new_checksums = dict(e.checksums) - for k, d1, d2 in diff: - if d2 is None: - new_checksums[k] = d1 - e = type(e)(e.path, e.size, new_checksums) + e = self._merge_entries(dirout[filename], e) dirout[filename] = e - return out + subdirs.sort(key=sort_key, reverse=True) + dir_stack.extend(subdirs) + mpath, mdir, m = next(iter_load, (None, None, None)) + else: + dirpath = dir_stack.pop() + try: + yield dirpath, out.pop(dirpath) + except KeyError: + pass def assert_directory_verifies(self, path='', fail_handler=gemato.util.throw_exception, @@ -580,22 +643,83 @@ def assert_directory_verifies(self, path='', to None (the default), the number of system CPUs will be used. """ - entry_dict = self.get_file_entry_dict(path) + remaining_entries = {} + entry_iter = self._iter_file_entry_dict(path) it = os.walk(os.path.join(self.root_directory, path), onerror=gemato.util.throw_exception, followlinks=True) + sort_key = lambda p: p.split(os.sep) + dir_stack = [] def _walk_directory(it): """ Pre-process os.walk() result for verification. Yield objects suitable to passing to subprocesses. """ - for dirpath, dirnames, filenames in it: - relpath = os.path.relpath(dirpath, self.root_directory) - # strip dot to avoid matching problems - if relpath == '.': - relpath = '' - dirdict = entry_dict.pop(relpath, {}) + pop_until = None + entry_dir, entry_dict = next(entry_iter, (None, None)) + while True: + if pop_until is not None: + dirpath, dirnames, filenames, relpath = dir_stack.pop() + if pop_until is relpath: + pop_until = None + elif (dir_stack and entry_dir is not None and + gemato.util.path_starts_with(dir_stack[-1][-1], entry_dir)): + dirpath, dirnames, filenames, relpath = dir_stack.pop() + else: + try: + dirpath, dirnames, filenames = next(it) + except StopIteration: + while entry_dir is not None: + remaining_entries[entry_dir] = entry_dict + entry_dir, entry_dict = next(entry_iter, (None, None)) + break + + relpath = os.path.relpath(dirpath, self.root_directory) + + # strip dot to avoid matching problems + if relpath == '.': + relpath = '' + + dirnames.sort() + + if relpath == entry_dir: + dirdict = entry_dict + entry_dir, entry_dict = next(entry_iter, (None, None)) + elif entry_dir is not None and gemato.util.path_starts_with(relpath, entry_dir): + dirdict = {} + else: + relpath_key = sort_key(relpath) + if dir_stack and entry_dir is not None: + entry_dir_key = sort_key(entry_dir) + if relpath_key > entry_dir_key and entry_dir_key <= sort_key(dir_stack[-1][-1]): + # Try to insert it into the stack for later processing. + for i, item in enumerate(dir_stack): + if item[-1] and relpath_key > sort_key(item[-1]): + dir_stack.insert(i, (dirpath, dirnames, filenames, relpath)) + dirpath = None + break + if dirpath is None: + if pop_until is None: + pop_until = relpath + continue + while entry_dir is not None and relpath_key > sort_key(entry_dir): + remaining_entries[entry_dir] = entry_dict + entry_dir, entry_dict = next(entry_iter, (None, None)) + + if relpath == entry_dir: + dirdict = entry_dict + entry_dir, entry_dict = next(entry_iter, (None, None)) + elif entry_dir is not None: + relpath_key = sort_key(relpath) + entry_dir_key = sort_key(entry_dir) + if relpath_key < entry_dir_key and len(relpath_key) <= len(entry_dir_key): + dir_stack.append((dirpath, dirnames, filenames, relpath)) + continue + else: + dirdict = {} + else: + dirdict = {} skip_dirs = [] for d in dirnames: @@ -643,7 +767,7 @@ def _walk_directory(it): pool.close() # check for missing directories - for relpath, dirdict in entry_dict.items(): + for relpath, dirdict in remaining_entries.items(): for f, e in dirdict.items(): fpath = os.path.join(relpath, f) syspath = os.path.join(self.root_directory, fpath)