diff --git a/gemato/recursiveloader.py b/gemato/recursiveloader.py index befda32..f0ef1a9 100644 --- a/gemato/recursiveloader.py +++ b/gemato/recursiveloader.py @@ -341,7 +341,7 @@ def _iter_unordered_manifests_for_path(self, path, recursive=False): elif recursive and gemato.util.path_starts_with(d, path): yield (k, d, v) - def _iter_manifests_for_path(self, path, recursive=False): + def _iter_manifests_for_path(self, path, recursive=False, sort_key=lambda kdv: len(kdv[1])): """ Iterate over loaded Manifests that can apply to path. If @recursive is True, returns also Manifests for subdirectories @@ -354,7 +354,7 @@ def _iter_manifests_for_path(self, path, recursive=False): return sorted( self._iter_unordered_manifests_for_path( path, recursive=recursive), - key=lambda kdv: len(kdv[1]), + key=sort_key, reverse=True) def load_manifests_for_path(self, path, recursive=False, verify=True): @@ -368,20 +368,38 @@ def load_manifests_for_path(self, path, recursive=False, verify=True): on mismatch. Otherwise, sub-Manifests will be loaded unconditionally of whether they match parent checksums. """ + for curmpath, relpath, m in self._iter_load_manifests_for_path( + path, recursive=recursive, verify=verify): + self.loaded_manifests[curmpath] = m + def _iter_load_manifests_for_path(self, path, recursive=False, verify=True, + sort_key=lambda kdv: kdv[1].split(os.sep)): + """ + Traverse manifests in depth-first order with directories sorted by + name. Only hold references to a minimum number of ManifestFile + instances, in order to conserve memory. + + The caller can traverse manifest and directory iterators in unison, + minimizing the amount of data in memory. + """ pool = multiprocessing.Pool(processes=self.max_jobs) + # Manifests pop from the stack in depth-first order + manifest_stack = list(self._iter_manifests_for_path(path, + recursive=recursive, sort_key=sort_key)) + traversed = set(curmpath for curmpath, relpath, m in manifest_stack) try: # TODO: figure out how to avoid confusing uses of 'recursive' - while True: + while manifest_stack: to_load = [] - for curmpath, relpath, m in self._iter_manifests_for_path( - path, recursive): - for e in m.entries: + curmpath, relpath, m = manifest_stack.pop() + yield (curmpath, relpath, m) + + for e in m.entries: if e.tag != 'MANIFEST': continue mpath = os.path.join(relpath, e.path) - if curmpath == mpath or mpath in self.loaded_manifests: + if curmpath == mpath or mpath in traversed: continue mdir = os.path.dirname(mpath) if not verify: @@ -390,12 +408,19 @@ def load_manifests_for_path(self, path, recursive=False, verify=True): to_load.append((mpath, e)) elif recursive and gemato.util.path_starts_with(mdir, path): to_load.append((mpath, e)) - if not to_load: - break manifests = pool.imap_unordered(self.manifest_loader, to_load, chunksize=16) - self.loaded_manifests.update(manifests) + + manifests = [(mpath, os.path.dirname(mpath), e) + for mpath, e in manifests] + + # Manifests pop from the stack in depth-first order + manifests.sort(key=sort_key, reverse=True) + for mpath, mdir, m in manifests: + traversed.add(mpath) + manifest_stack.append( + (mpath, os.path.dirname(mpath), m)) pool.close() pool.join() @@ -511,13 +536,54 @@ def get_file_entry_dict(self, path='', only_types=None, be verified against MANIFEST entries. Pass False only when doing updates. """ + out = {} + for dirpath, dirout in self._iter_file_entry_dict( + path=path, only_types=only_types, + verify_manifests=verify_manifests): + other = out.get(dirpath) + if other is None: + out[dirpath] = dirout + else: + # This happens due to the relpath = '' setting + # for all DIST entries. + for filename, e in dirout.items(): + if filename in other: + e = self._merge_entries(other[filename], e) + other[filename] = e + return out - self.load_manifests_for_path(path, recursive=True, - verify=verify_manifests) + @staticmethod + def _merge_entries(e1, e2): + # compare the two entries + ret, diff = gemato.verify.verify_entry_compatibility( + e1, e2) + if not ret: + raise gemato.exceptions.ManifestIncompatibleEntry( + e1, e2, diff) + # we need to construct a single entry with both checksums + if diff: + new_checksums = dict(e2.checksums) + for k, d1, d2 in diff: + if d2 is None: + new_checksums[k] = d1 + e1 = type(e1)(e1.path, e1.size, new_checksums) + return e1 + + def _iter_file_entry_dict(self, path='', only_types=None, + verify_manifests=True, + sort_key=lambda p: p.split(os.sep)): out = {} - for mpath, relpath, m in self._iter_manifests_for_path(path, - recursive=True): - for e in m.entries: + dir_stack = [path] + iter_load = self._iter_load_manifests_for_path(path, + recursive=True, verify=verify_manifests) + mpath, mdir, m = next(iter_load, (None, None, None)) + + while dir_stack or mdir is not None: + if not dir_stack or (mdir is not None and + sort_key(mdir) <= sort_key(dir_stack[-1])): + subdirs = [] + relpath = mdir + for e in m.entries: if only_types is not None: if e.tag not in only_types: continue @@ -533,23 +599,20 @@ def get_file_entry_dict(self, path='', only_types=None, if gemato.util.path_starts_with(fullpath, path): dirpath = os.path.dirname(fullpath) filename = os.path.basename(e.path) + subdirs.append(dirpath) dirout = out.setdefault(dirpath, {}) if filename in dirout: - # compare the two entries - ret, diff = gemato.verify.verify_entry_compatibility( - dirout[filename], e) - if not ret: - raise gemato.exceptions.ManifestIncompatibleEntry( - dirout[filename], e, diff) - # we need to construct a single entry with both checksums - if diff: - new_checksums = dict(e.checksums) - for k, d1, d2 in diff: - if d2 is None: - new_checksums[k] = d1 - e = type(e)(e.path, e.size, new_checksums) + e = self._merge_entries(dirout[filename], e) dirout[filename] = e - return out + subdirs.sort(key=sort_key, reverse=True) + dir_stack.extend(subdirs) + mpath, mdir, m = next(iter_load, (None, None, None)) + else: + dirpath = dir_stack.pop() + try: + yield dirpath, out.pop(dirpath) + except KeyError: + pass def assert_directory_verifies(self, path='', fail_handler=gemato.util.throw_exception, @@ -580,22 +643,83 @@ def assert_directory_verifies(self, path='', to None (the default), the number of system CPUs will be used. """ - entry_dict = self.get_file_entry_dict(path) + remaining_entries = {} + entry_iter = self._iter_file_entry_dict(path) it = os.walk(os.path.join(self.root_directory, path), onerror=gemato.util.throw_exception, followlinks=True) + sort_key = lambda p: p.split(os.sep) + dir_stack = [] def _walk_directory(it): """ Pre-process os.walk() result for verification. Yield objects suitable to passing to subprocesses. """ - for dirpath, dirnames, filenames in it: - relpath = os.path.relpath(dirpath, self.root_directory) - # strip dot to avoid matching problems - if relpath == '.': - relpath = '' - dirdict = entry_dict.pop(relpath, {}) + pop_until = None + entry_dir, entry_dict = next(entry_iter, (None, None)) + while True: + if pop_until is not None: + dirpath, dirnames, filenames, relpath = dir_stack.pop() + if pop_until is relpath: + pop_until = None + elif (dir_stack and entry_dir is not None and + gemato.util.path_starts_with(dir_stack[-1][-1], entry_dir)): + dirpath, dirnames, filenames, relpath = dir_stack.pop() + else: + try: + dirpath, dirnames, filenames = next(it) + except StopIteration: + while entry_dir is not None: + remaining_entries[entry_dir] = entry_dict + entry_dir, entry_dict = next(entry_iter, (None, None)) + break + + relpath = os.path.relpath(dirpath, self.root_directory) + + # strip dot to avoid matching problems + if relpath == '.': + relpath = '' + + dirnames.sort() + + if relpath == entry_dir: + dirdict = entry_dict + entry_dir, entry_dict = next(entry_iter, (None, None)) + elif entry_dir is not None and gemato.util.path_starts_with(relpath, entry_dir): + dirdict = {} + else: + relpath_key = sort_key(relpath) + if dir_stack and entry_dir is not None: + entry_dir_key = sort_key(entry_dir) + if relpath_key > entry_dir_key and entry_dir_key <= sort_key(dir_stack[-1][-1]): + # Try to insert it into the stack for later processing. + for i, item in enumerate(dir_stack): + if item[-1] and relpath_key > sort_key(item[-1]): + dir_stack.insert(i, (dirpath, dirnames, filenames, relpath)) + dirpath = None + break + if dirpath is None: + if pop_until is None: + pop_until = relpath + continue + while entry_dir is not None and relpath_key > sort_key(entry_dir): + remaining_entries[entry_dir] = entry_dict + entry_dir, entry_dict = next(entry_iter, (None, None)) + + if relpath == entry_dir: + dirdict = entry_dict + entry_dir, entry_dict = next(entry_iter, (None, None)) + elif entry_dir is not None: + relpath_key = sort_key(relpath) + entry_dir_key = sort_key(entry_dir) + if relpath_key < entry_dir_key and len(relpath_key) <= len(entry_dir_key): + dir_stack.append((dirpath, dirnames, filenames, relpath)) + continue + else: + dirdict = {} + else: + dirdict = {} skip_dirs = [] for d in dirnames: @@ -643,7 +767,7 @@ def _walk_directory(it): pool.close() # check for missing directories - for relpath, dirdict in entry_dict.items(): + for relpath, dirdict in remaining_entries.items(): for f, e in dirdict.items(): fpath = os.path.join(relpath, f) syspath = os.path.join(self.root_directory, fpath)