From 6822f21a404d4673a344d719e014396ecea9b0ab Mon Sep 17 00:00:00 2001
From: Troy Dawson
Date: Tue, 12 Aug 2025 14:27:46 -0700
Subject: [PATCH 1/6] Move root-log parser outside of the Analyzer class

---
 content_resolver/analyzer.py | 314 ++++++++++++++++++-----------------
 1 file changed, 160 insertions(+), 154 deletions(-)

diff --git a/content_resolver/analyzer.py b/content_resolver/analyzer.py
index 177272d..16a26b0 100644
--- a/content_resolver/analyzer.py
+++ b/content_resolver/analyzer.py
@@ -15,6 +15,164 @@ def pkg_placeholder_name_to_nevr(placeholder_name):
     return placeholder_id
 
 
+####################################################
+### Find build dependencies from a Koji root log ###
+####################################################
+
+def _get_build_deps_from_a_root_log(root_log):
+    """
+    Given a package's Koji root.log, find its build dependencies.
+    """
+    required_pkgs = []
+
+    # The individual states are nicely described inside the for loop.
+    # They're processed in order
+    state = 0
+
+    for file_line in root_log.splitlines():
+
+        # 0/
+        # parts of the log I don't really care about
+        if state == 0:
+
+            # The next installation is the build deps!
+            # So I start caring. Next state!
+            if "'builddep', '--installroot'" in file_line:
+                state += 1
+
+
+        # 1/
+        # getting the "already installed" packages to the list
+        elif state == 1:
+
+            # "Package already installed" indicates it's directly required,
+            # so save it.
+            # DNF5 does this after "Repositories loaded" and quotes the NVR;
+            # DNF4 does this before "Dependencies resolved" without the quotes.
+            if "is already installed." in file_line:
+                pkg_name = file_line.split()[3].strip('"').rsplit("-",2)[0]
+                required_pkgs.append(pkg_name)
+
+            # That's all! Next state! (DNF4)
+            elif "Dependencies resolved." in file_line:
+                state += 1
+
+            # That's all! Next state! (DNF5)
+            elif "Repositories loaded." in file_line:
+                state += 1
+
+
+        # 2/
+        # going through the log right before the first package name
+        elif state == 2:
+
+            # "Package already installed" indicates it's directly required,
+            # so save it.
+            # DNF4 does this before "Dependencies resolved" without the quotes;
+            # DNF5 does this after "Repositories loaded" and quotes the NVR, but
+            # sometimes prints this in the middle of a dependency line.
+            if "is already installed." in file_line:
+                pkg_index = file_line.split().index("already") - 2
+                pkg_name = file_line.split()[pkg_index].strip('"').rsplit("-",2)[0]
+                required_pkgs.append(pkg_name)
+
+            # The next line will be the first package. Next state!
+            # DNF5 reports "Installing: ## packages" in the Transaction Summary,
+            # which we need to ignore
+            if "Installing:" in file_line and len(file_line.split()) == 3:
+                state += 1
+
+
+        # 3/
+        # And now just saving the packages until the "installing dependencies" part
+        # or the "transaction summary" part if there are no dependencies
+        elif state == 3:
+
+            if "Installing dependencies:" in file_line:
+                state = 2
+
+            elif "Transaction Summary" in file_line:
+                state = 2
+
+            # Sometimes DNF5 prints "Package ... is already installed" in the middle of the output.
+            elif file_line.split()[2] == "Package" and file_line.split()[-1] == "installed.":
+                pkg_name = file_line.split()[3].strip('"').rsplit("-",2)[0]
+                required_pkgs.append(pkg_name)
+
+            else:
+                # I need to deal with the following thing...
+                #
+                # DEBUG util.py:446:  gobject-introspection-devel        aarch64  1.70.0-1.fc36    build  1.1 M
+                # DEBUG util.py:446:  graphene-devel                     aarch64  1.10.6-3.fc35    build  159 k
+                # DEBUG util.py:446:  gstreamer1-plugins-bad-free-devel
+                # DEBUG util.py:446:                                     aarch64  1.19.2-1.fc36    build  244 k
+                # DEBUG util.py:446:  json-glib-devel                    aarch64  1.6.6-1.fc36     build  173 k
+                # DEBUG util.py:446:  libXcomposite-devel                aarch64  0.4.5-6.fc35     build  16 k
+                #
+                # The "gstreamer1-plugins-bad-free-devel" package name is too long to fit in the column,
+                # so it gets split on two lines.
+                #
+                # Which if I take the usual file_line.split()[2] I get the correct name,
+                # but the next line gives me "aarch64" as a package name which is wrong.
+                #
+                # So the usual line has file_line.split() == 8
+                # The one with the long package name has file_line.split() == 3
+                # and the one following it has file_line.split() == 7
+                #
+                # One more thing... long release!
+                #
+                # DEBUG util.py:446:  qrencode-devel       aarch64  4.0.2-8.fc35                     build  13 k
+                # DEBUG util.py:446:  systemtap-sdt-devel  aarch64  4.6~pre16291338gf2c14776-1.fc36
+                # DEBUG util.py:446:                                                                 build  71 k
+                # DEBUG util.py:446:  tpm2-tss-devel       aarch64  3.1.0-4.fc36                     build  315 k
+                #
+                # So the good one here is file_line.split() == 5.
+                # And the following is also file_line.split() == 5. Fun!
+                #
+                # So if it ends with B, k, M, G it's the wrong line, so skip, otherwise take the package name.
+                #
+                # I can also anticipate both get long... that would mean I need to skip file_line.split() == 4.
+
+                if len(file_line.split()) == 10 or len(file_line.split()) == 11:
+                    # Sometimes DNF5 prints "Package ... is already installed" in the middle of a line
+                    pkg_index = file_line.split().index("already") - 2
+                    pkg_name = file_line.split()[pkg_index].strip('"').rsplit("-",2)[0]
+                    required_pkgs.append(pkg_name)
+                    if pkg_index == 3:
+                        pkg_name = file_line.split()[7]
+                    else:
+                        pkg_name = file_line.split()[2]
+                    required_pkgs.append(pkg_name)
+
+                    # TODO: len(file_line.split()) == 9 ??
+
+                elif len(file_line.split()) == 8 or len(file_line.split()) == 3:
+                    pkg_name = file_line.split()[2]
+                    required_pkgs.append(pkg_name)
+
+                elif len(file_line.split()) == 7 or len(file_line.split()) == 4:
+                    continue
+
+                elif len(file_line.split()) == 6 or len(file_line.split()) == 5:
+                    # DNF5 uses B/KiB/MiB/GiB, DNF4 uses B/k/M/G
+                    if file_line.split()[4] in ["B", "KiB", "k", "MiB", "M", "GiB", "G"]:
+                        continue
+                    else:
+                        pkg_name = file_line.split()[2]
+                        required_pkgs.append(pkg_name)
+
+                else:
+                    raise KojiRootLogError
+
+
+        # 4/
+        # I'm done. So I can break out of the loop.
+        elif state == 4:
+            break
+
+
+    return required_pkgs
+
 
 class Analyzer():
 
     ###############################################################################
@@ -1602,158 +1760,6 @@ def _populate_buildroot_with_view_srpms(self, view_conf, arch):
         log("")
 
 
-    def _get_build_deps_from_a_root_log(self, root_log):
-        required_pkgs = []
-
-        # The individual states are nicely described inside the for loop.
-        # They're processed in order
-        state = 0
-
-        for file_line in root_log.splitlines():
-
-            # 0/
-            # parts of the log I don't really care about
-            if state == 0:
-
-                # The next installation is the build deps!
-                # So I start caring. Next state!
-                if "'builddep', '--installroot'" in file_line:
-                    state += 1
-
-
-            # 1/
-            # getting the "already installed" packages to the list
-            elif state == 1:
-
-                # "Package already installed" indicates it's directly required,
-                # so save it.
-                # DNF5 does this after "Repositories loaded" and quotes the NVR;
-                # DNF4 does this before "Dependencies resolved" without the quotes.
-                if "is already installed." in file_line:
-                    pkg_name = file_line.split()[3].strip('"').rsplit("-",2)[0]
-                    required_pkgs.append(pkg_name)
-
-                # That's all! Next state! (DNF4)
-                elif "Dependencies resolved." in file_line:
-                    state += 1
-
-                # That's all! Next state! (DNF5)
-                elif "Repositories loaded." in file_line:
-                    state += 1
-
-
-            # 2/
-            # going through the log right before the first package name
-            elif state == 2:
-
-                # "Package already installed" indicates it's directly required,
-                # so save it.
-                # DNF4 does this before "Dependencies resolved" without the quotes;
-                # DNF5 does this after "Repositories loaded" and quotes the NVR, but
-                # sometimes prints this in the middle of a dependency line.
-                if "is already installed." in file_line:
-                    pkg_index = file_line.split().index("already") - 2
-                    pkg_name = file_line.split()[pkg_index].strip('"').rsplit("-",2)[0]
-                    required_pkgs.append(pkg_name)
-
-                # The next line will be the first package. Next state!
-                # DNF5 reports "Installing: ## packages" in the Transaction Summary,
-                # which we need to ignore
-                if "Installing:" in file_line and len(file_line.split()) == 3:
-                    state += 1
-
-
-            # 3/
-            # And now just saving the packages until the "installing dependencies" part
-            # or the "transaction summary" part if there's no dependencies
-            elif state == 3:
-
-                if "Installing dependencies:" in file_line:
-                    state = 2
-
-                elif "Transaction Summary" in file_line:
-                    state = 2
-
-                # Sometimes DNF5 prints "Package ... is already installed" in middle of the output.
-                elif file_line.split()[2] == "Package" and file_line.split()[-1] == "installed.":
-                    pkg_name = file_line.split()[3].strip('"').rsplit("-",2)[0]
-                    required_pkgs.append(pkg_name)
-
-                else:
-                    # I need to deal with the following thing...
-                    #
-                    # DEBUG util.py:446:  gobject-introspection-devel        aarch64  1.70.0-1.fc36    build  1.1 M
-                    # DEBUG util.py:446:  graphene-devel                     aarch64  1.10.6-3.fc35    build  159 k
-                    # DEBUG util.py:446:  gstreamer1-plugins-bad-free-devel
-                    # DEBUG util.py:446:                                     aarch64  1.19.2-1.fc36    build  244 k
-                    # DEBUG util.py:446:  json-glib-devel                    aarch64  1.6.6-1.fc36     build  173 k
-                    # DEBUG util.py:446:  libXcomposite-devel                aarch64  0.4.5-6.fc35     build  16 k
-                    #
-                    # The "gstreamer1-plugins-bad-free-devel" package name is too long to fit in the column,
-                    # so it gets split on two lines.
-                    #
-                    # Which if I take the usual file_line.split()[2] I get the correct name,
-                    # but the next line gives me "aarch64" as a package name which is wrong.
-                    #
-                    # So the usual line has file_line.split() == 8
-                    # The one with the long package name has file_line.split() == 3
-                    # and the one following it has file_line.split() == 7
-                    #
-                    # One more thing... long release!
-                    #
-                    # DEBUG util.py:446:  qrencode-devel       aarch64  4.0.2-8.fc35                     build  13 k
-                    # DEBUG util.py:446:  systemtap-sdt-devel  aarch64  4.6~pre16291338gf2c14776-1.fc36
-                    # DEBUG util.py:446:                                                                 build  71 k
-                    # DEBUG util.py:446:  tpm2-tss-devel       aarch64  3.1.0-4.fc36                     build  315 k
-                    #
-                    # So the good one here is file_line.split() == 5.
-                    # And the following is also file_line.split() == 5. Fun!
-                    #
-                    # So if it ends with B, k, M, G it's the wrong line, so skip, otherwise take the package name.
-                    #
-                    # I can also anticipate both get long... that would mean I need to skip file_line.split() == 4.
-
-                    if len(file_line.split()) == 10 or len(file_line.split()) == 11:
-                        # Sometimes DNF5 prints "Package ... is already installed" in the middle of a line
-                        pkg_index = file_line.split().index("already") - 2
-                        pkg_name = file_line.split()[pkg_index].strip('"').rsplit("-",2)[0]
-                        required_pkgs.append(pkg_name)
-                        if pkg_index == 3:
-                            pkg_name = file_line.split()[7]
-                        else:
-                            pkg_name = file_line.split()[2]
-                        required_pkgs.append(pkg_name)
-
-                        # TODO: len(file_line.split()) == 9 ??
-
-                    elif len(file_line.split()) == 8 or len(file_line.split()) == 3:
-                        pkg_name = file_line.split()[2]
-                        required_pkgs.append(pkg_name)
-
-                    elif len(file_line.split()) == 7 or len(file_line.split()) == 4:
-                        continue
-
-                    elif len(file_line.split()) == 6 or len(file_line.split()) == 5:
-                        # DNF5 uses B/KiB/MiB/GiB, DNF4 uses B/k/M/G
-                        if file_line.split()[4] in ["B", "KiB", "k", "MiB", "M", "GiB", "G"]:
-                            continue
-                        else:
-                            pkg_name = file_line.split()[2]
-                            required_pkgs.append(pkg_name)
-
-                    else:
-                        raise KojiRootLogError
-
-
-            # 4/
-            # I'm done. So I can break out of the loop.
-            elif state == 4:
-                break
-
-
-        return required_pkgs
-
-
     def _resolve_srpm_using_root_log(self, srpm_id, arch, koji_session, koji_files_url):
 
         # Buildroot grows pretty quickly. Use a fake one for development.
@@ -1826,7 +1832,7 @@ def _resolve_srpm_using_root_log(self, srpm_id, arch, koji_session, koji_files_u
             raise KojiRootLogError("Could not get a root.log file")
 
         log("    Parsing the root.log file...")
-        directly_required_pkg_names = self._get_build_deps_from_a_root_log(root_log_contents)
+        directly_required_pkg_names = _get_build_deps_from_a_root_log(root_log_contents)
         log("    Done!")
 
         return directly_required_pkg_names
@@ -3421,4 +3427,4 @@ def analyze_things(self):
 
         self._record_metric("finished analyze_things()")
 
-        return self.data
\ No newline at end of file
+        return self.data

From 796d273e21dbdf321dfbb0797b8741b0b3f139d5 Mon Sep 17 00:00:00 2001
From: Troy Dawson
Date: Tue, 12 Aug 2025 14:48:25 -0700
Subject: [PATCH 2/6] Move getting the log path outside of the Analyzer class

---
 content_resolver/analyzer.py | 53 ++++++++++++++++++++----------------
 1 file changed, 30 insertions(+), 23 deletions(-)

diff --git a/content_resolver/analyzer.py b/content_resolver/analyzer.py
index 16a26b0..d010ccf 100644
--- a/content_resolver/analyzer.py
+++ b/content_resolver/analyzer.py
@@ -173,6 +173,35 @@ def _get_build_deps_from_a_root_log(root_log):
 
     return required_pkgs
 
 
+def _get_koji_log_path(srpm_id, arch, koji_session):
+    """
+    Get the Koji root.log path for a given SRPM.
+    """
+    MAX_TRIES = 10
+    attempts = 0
+
+    while attempts < MAX_TRIES:
+        try:
+            koji_pkg_data = koji_session.getRPM(f"{srpm_id}.src")
+            koji_logs = koji_session.getBuildLogs(koji_pkg_data["build_id"])
+            break
+        except Exception:
+            attempts += 1
+            if attempts == MAX_TRIES:
+                raise KojiRootLogError("Could not talk to Koji API")
+            time.sleep(1)
+
+    koji_log_path = None
+    for koji_log in koji_logs:
+        if koji_log["name"] == "root.log":
+            if koji_log["dir"] == arch or koji_log["dir"] == "noarch":
+                koji_log_path = koji_log["path"]
+                break
+
+    return koji_log_path
+
+
 class Analyzer():
 
@@ -1780,30 +1809,8 @@ def _resolve_srpm_using_root_log(self, srpm_id, arch, koji_session, koji_files_u
 
         # Starting for real!
         log("    Talking to Koji API...")
 
-        # This sometimes hangs, so I'm giving it a timeout and
-        # a few extra tries before totally giving up!
-        MAX_TRIES = 10
-        attempts = 0
-        success = False
-        while attempts < MAX_TRIES:
-            try:
-                koji_pkg_data = koji_session.getRPM("{}.src".format(srpm_id))
-                koji_logs = koji_session.getBuildLogs(koji_pkg_data["build_id"])
-                success = True
-                break
-            except:
-                attempts +=1
-                log("    Error talking to Koji API... retrying...")
-        if not success:
-            raise KojiRootLogError("Could not talk to Koji API")
-
-        koji_log_path = None
+        koji_log_path = _get_koji_log_path(srpm_id, arch, koji_session)
 
-        for koji_log in koji_logs:
-            if koji_log["name"] == "root.log":
-                if koji_log["dir"] == arch or koji_log["dir"] == "noarch":
-                    koji_log_path = koji_log["path"]
-
         if not koji_log_path:
             return []

From 53b25074d18e8474542200b3816db7959d10a962 Mon Sep 17 00:00:00 2001
From: Troy Dawson
Date: Tue, 12 Aug 2025 14:56:07 -0700
Subject: [PATCH 3/6] Move downloading the logs outside of the Analyzer class

---
 content_resolver/analyzer.py | 37 +++++++++++++++++++-----------------
 1 file changed, 20 insertions(+), 17 deletions(-)

diff --git a/content_resolver/analyzer.py b/content_resolver/analyzer.py
index d010ccf..cc2fda7 100644
--- a/content_resolver/analyzer.py
+++ b/content_resolver/analyzer.py
@@ -202,6 +202,25 @@ def _get_koji_log_path(srpm_id, arch, koji_session):
 
     return koji_log_path
 
 
+def _download_root_log_with_retry(root_log_url):
+    """
+    Download a root.log file with retry logic.
+    """
+    MAX_TRIES = 10
+    attempts = 0
+
+    while attempts < MAX_TRIES:
+        try:
+            with urllib.request.urlopen(root_log_url, timeout=20) as response:
+                root_log_data = response.read()
+                return root_log_data.decode('utf-8')
+        except Exception:
+            attempts += 1
+            if attempts == MAX_TRIES:
+                raise KojiRootLogError(f"Could not download root.log from {root_log_url}")
+            time.sleep(1)
+
+
 class Analyzer():
 
@@ -1820,23 +1839,7 @@ def _resolve_srpm_using_root_log(self, srpm_id, arch, koji_session, koji_files_u
         )
 
         log("    Downloading the root.log file...")
-        # This sometimes hangs, so I'm giving it a timeout and
-        # a few extra tries before totally giving up!
-        MAX_TRIES = 10
-        attempts = 0
-        success = False
-        while attempts < MAX_TRIES:
-            try:
-                with urllib.request.urlopen(root_log_url, timeout=20) as response:
-                    root_log_data = response.read()
-                    root_log_contents = root_log_data.decode('utf-8')
-                success = True
-                break
-            except:
-                attempts +=1
-                log("    Error getting the root log... retrying...")
-        if not success:
-            raise KojiRootLogError("Could not get a root.log file")
+        root_log_contents = _download_root_log_with_retry(root_log_url)
 
         log("    Parsing the root.log file...")
         directly_required_pkg_names = _get_build_deps_from_a_root_log(root_log_contents)
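With patches 1 through 3 applied, the whole root.log pipeline consists of plain
module-level functions, so it can be exercised without an Analyzer instance. A
minimal sketch of how the three helpers compose (the Koji URLs and the SRPM NEVR
below are illustrative assumptions, not values taken from the patches):

    import koji

    from content_resolver.analyzer import (
        _get_koji_log_path,
        _download_root_log_with_retry,
        _get_build_deps_from_a_root_log,
    )

    # Assumed endpoints and build; substitute whatever Koji instance you use.
    koji_api_url = "https://koji.fedoraproject.org/kojihub"
    koji_files_url = "https://kojipkgs.fedoraproject.org"
    srpm_id = "bash-5.2.26-1.fc40"  # hypothetical SRPM NEVR

    koji_session = koji.ClientSession(koji_api_url, opts={"timeout": 20})

    # Find the root.log for the wanted arch (or noarch), download it with the
    # same retry logic the analyzer uses, and parse the direct build deps.
    koji_log_path = _get_koji_log_path(srpm_id, "x86_64", koji_session)
    if koji_log_path:
        root_log_contents = _download_root_log_with_retry(f"{koji_files_url}/{koji_log_path}")
        print(_get_build_deps_from_a_root_log(root_log_contents))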
From 4ede2a2220b614254c1a69bf757e947d74203144 Mon Sep 17 00:00:00 2001
From: Troy Dawson
Date: Wed, 13 Aug 2025 09:33:05 -0700
Subject: [PATCH 4/6] Update tests to work with changes

---
 test_root_log_function.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/test_root_log_function.py b/test_root_log_function.py
index e413531..e630863 100755
--- a/test_root_log_function.py
+++ b/test_root_log_function.py
@@ -1,6 +1,6 @@
 #! /usr/bin/python3
 
-from content_resolver.analyzer import Analyzer
+import content_resolver.analyzer
 import sys
 import urllib
 
@@ -16,6 +16,6 @@
 root_log_data = response.read()
 root_log_contents = root_log_data.decode('utf-8')
 
-required_pkg_names = Analyzer._get_build_deps_from_a_root_log(None, root_log_contents)
+required_pkg_names = content_resolver.analyzer._get_build_deps_from_a_root_log(root_log_contents)
 
 print(required_pkg_names)
From 9bd8a68fddbdc01e3d6ebda43e4758022a090262 Mon Sep 17 00:00:00 2001
From: Troy Dawson
Date: Wed, 13 Aug 2025 09:45:43 -0700
Subject: [PATCH 5/6] Process root logs in parallel

---
 content_resolver/analyzer.py | 276 ++++++++++++++++++++++++-----------
 1 file changed, 188 insertions(+), 88 deletions(-)

diff --git a/content_resolver/analyzer.py b/content_resolver/analyzer.py
index cc2fda7..7bf784b 100644
--- a/content_resolver/analyzer.py
+++ b/content_resolver/analyzer.py
@@ -1,4 +1,6 @@
 import tempfile, os, json, datetime, dnf, urllib.request, sys, koji
+import re, time
+from concurrent.futures import ProcessPoolExecutor, as_completed
 import multiprocessing, asyncio
 
 from content_resolver.utils import dump_data, load_data, log, err_log, pkg_id_to_name, size, workload_id_to_conf_id, url_to_id
@@ -220,6 +222,92 @@ def _download_root_log_with_retry(root_log_url):
                 raise KojiRootLogError(f"Could not download root.log from {root_log_url}")
             time.sleep(1)
 
 
+def process_single_srpm_root_log(work_item):
+    """
+    Process a single SRPM's root.log file.
+
+    Args:
+        work_item (dict): Contains koji_api_url, koji_files_url, srpm_id, arch, dev_buildroot
+
+    Returns:
+        dict: Contains srpm_id, arch, deps (list), error (str or None)
+    """
+    try:
+        koji_api_url = work_item['koji_api_url']
+        koji_files_url = work_item['koji_files_url']
+        srpm_id = work_item['srpm_id']
+        arch = work_item['arch']
+        dev_buildroot = work_item.get('dev_buildroot', False)
+
+        # Handle development buildroot mode
+        if dev_buildroot:
+            # Making sure there are 3 passes at least, but that it won't get overwhelmed
+            srpm_name = srpm_id.rsplit("-", 2)[0]
+            if srpm_name in ["bash", "make", "unzip"]:
+                return {
+                    'srpm_id': srpm_id,
+                    'arch': arch,
+                    'deps': ["gawk", "xz", "findutils"],
+                    'error': None
+                }
+            elif srpm_name in ["gawk", "xz", "findutils"]:
+                return {
+                    'srpm_id': srpm_id,
+                    'arch': arch,
+                    'deps': ['cpio', 'diffutils'],
+                    'error': None
+                }
+            return {
+                'srpm_id': srpm_id,
+                'arch': arch,
+                'deps': ["bash", "make", "unzip"],
+                'error': None
+            }
+
+        # Handle special cases
+        if srpm_id.rsplit("-", 2)[0] in ["shim"]:
+            return {
+                'srpm_id': srpm_id,
+                'arch': arch,
+                'deps': [],
+                'error': None
+            }
+
+        # Create koji session
+        koji_session = koji.ClientSession(koji_api_url, opts={"timeout": 20})
+
+        # Get koji log path
+        koji_log_path = _get_koji_log_path(srpm_id, arch, koji_session)
+
+        if not koji_log_path:
+            return {
+                'srpm_id': srpm_id,
+                'arch': arch,
+                'deps': [],
+                'error': None
+            }
+
+        # Download root.log
+        root_log_url = f"{koji_files_url}/{koji_log_path}"
+        root_log_contents = _download_root_log_with_retry(root_log_url)
+
+        # Parse dependencies
+        deps = _get_build_deps_from_a_root_log(root_log_contents)
+
+        return {
+            'srpm_id': srpm_id,
+            'arch': arch,
+            'deps': deps,
+            'error': None
+        }
+
+    except Exception as e:
+        return {
+            'srpm_id': work_item.get('srpm_id', 'unknown'),
+            'arch': work_item.get('arch', 'unknown'),
+            'deps': [],
+            'error': str(e)
+        }
+
 
 class Analyzer():
 
     ###############################################################################
@@ -1896,65 +1984,15 @@ def _populate_buildroot_with_view_srpms(self, view_conf, arch):
         log("")
 
 
-    def _resolve_srpm_using_root_log(self, srpm_id, arch, koji_session, koji_files_url):
-
-        # Buildroot grows pretty quickly. Use a fake one for development.
-        if self.settings["dev_buildroot"]:
-            # Making sure there are 3 passes at least, but that it won't get overwhelmed
-            if srpm_id.rsplit("-",2)[0] in ["bash", "make", "unzip"]:
-                return ["gawk", "xz", "findutils"]
-
-            elif srpm_id.rsplit("-",2)[0] in ["gawk", "xz", "findutils"]:
-                return ['cpio', 'diffutils']
-
-            return ["bash", "make", "unzip"]
-
-        # Shim is special.
-        if srpm_id.rsplit("-",2)[0] in ["shim"]:
-            log( "It's shim! It gets sometiems tagged from wherever... Let's not even bother!")
-            return []
-
-        # Starting for real!
-        log("    Talking to Koji API...")
-        koji_log_path = _get_koji_log_path(srpm_id, arch, koji_session)
-
-        if not koji_log_path:
-            return []
-
-        root_log_url = "{koji_files_url}/{koji_log_path}".format(
-            koji_files_url=koji_files_url,
-            koji_log_path=koji_log_path
-        )
-
-        log("    Downloading the root.log file...")
-        root_log_contents = _download_root_log_with_retry(root_log_url)
-
-        log("    Parsing the root.log file...")
-        directly_required_pkg_names = _get_build_deps_from_a_root_log(root_log_contents)
-
-        log("    Done!")
-        return directly_required_pkg_names
-
-
-    def _resolve_srpms_using_root_logs(self, pass_counter):
-        # This function is idempotent!
-        #
-        # That means it can be run many times without affecting the old results.
-
-        log("== Resolving SRPMs using root logs - pass {} ========".format(pass_counter))
+    def _resolve_srpms_using_root_logs_parallel(self, pass_counter):
+        """
+        This function is idempotent! It can be run many times
+        without affecting the old results.
+        """
+        log("== Resolving SRPMs using root logs - pass {} (PARALLEL) ========".format(pass_counter))
 
-        # Prepare a counter for the log
+        # Collect work items (skip cached entries)
+        work_items = []
         total_srpms_to_resolve = 0
-        for koji_id in self.data["buildroot"]["koji_srpms"]:
-            for arch in self.data["buildroot"]["koji_srpms"][koji_id]:
-                total_srpms_to_resolve += len(self.data["buildroot"]["koji_srpms"][koji_id][arch])
-        srpms_to_resolve_counter = 0
-
-        # I need to keep sessions open to Koji
-        # And because in some cases (in mixed repos) packages
-        # could have been in different koji instances, I need
-        # multiple Koji sesions!
-        koji_sessions = {}
 
         for koji_id in self.data["buildroot"]["koji_srpms"]:
             koji_urls = self.data["buildroot"]["koji_urls"][koji_id]
 
             if koji_id not in self.cache["root_log_deps"]["next"]:
                 self.cache["root_log_deps"]["next"][koji_id] = {}
 
-            # Initiate Koji sessions
-            if koji_id not in koji_sessions:
-                koji_sessions[koji_id] = koji.ClientSession(koji_urls["api"], opts = {"timeout": 20})
-
             for arch in self.data["buildroot"]["koji_srpms"][koji_id]:
 
                 # If the cache is empty, initialise it
                 if arch not in self.cache["root_log_deps"]["current"][koji_id]:
                     self.cache["root_log_deps"]["current"][koji_id][arch] = {}
                 if arch not in self.cache["root_log_deps"]["next"][koji_id]:
                     self.cache["root_log_deps"]["next"][koji_id][arch] = {}
 
                 for srpm_id, srpm in self.data["buildroot"]["koji_srpms"][koji_id][arch].items():
-                    srpms_to_resolve_counter += 1
-
-                    log("")
-                    log("[ Buildroot - pass {} - {} of {} ]".format(pass_counter, srpms_to_resolve_counter, total_srpms_to_resolve))
-                    log("Koji root_log {srpm_id} {arch}".format(
-                        srpm_id=srpm_id,
-                        arch=arch
-                    ))
-                    if not srpm["directly_required_pkg_names"]:
-                        if srpm_id in self.cache["root_log_deps"]["current"][koji_id][arch]:
-                            log("  Using Cache!")
-                            directly_required_pkg_names = self.cache["root_log_deps"]["current"][koji_id][arch][srpm_id]
-
-                        elif srpm_id in self.cache["root_log_deps"]["next"][koji_id][arch]:
-                            log("  Using Cache!")
-                            directly_required_pkg_names = self.cache["root_log_deps"]["next"][koji_id][arch][srpm_id]
-
-                        else:
-                            log("  Resolving...")
-                            directly_required_pkg_names = self._resolve_srpm_using_root_log(srpm_id, arch, koji_sessions[koji_id], koji_urls["files"])
-
+                    total_srpms_to_resolve += 1
+
+                    # Skip if already processed or cached
+                    if srpm["directly_required_pkg_names"]:
+                        log(f"  Skipping {srpm_id} {arch} (already done before)")
+                        continue
+
+                    if srpm_id in self.cache["root_log_deps"]["current"][koji_id][arch]:
+                        log(f"  Using Cache for {srpm_id} {arch}!")
+                        directly_required_pkg_names = self.cache["root_log_deps"]["current"][koji_id][arch][srpm_id]
                         self.cache["root_log_deps"]["next"][koji_id][arch][srpm_id] = directly_required_pkg_names
+                        srpm["directly_required_pkg_names"].update(directly_required_pkg_names)
+                        continue
+
+                    if srpm_id in self.cache["root_log_deps"]["next"][koji_id][arch]:
+                        log(f"  Using Cache for {srpm_id} {arch}!")
+                        directly_required_pkg_names = self.cache["root_log_deps"]["next"][koji_id][arch][srpm_id]
+                        srpm["directly_required_pkg_names"].update(directly_required_pkg_names)
+                        continue
+
+                    # Add to work queue
+                    work_items.append({
+                        'koji_id': koji_id,
+                        'koji_api_url': koji_urls["api"],
+                        'koji_files_url': koji_urls["files"],
+                        'srpm_id': srpm_id,
+                        'arch': arch,
+                        'dev_buildroot': self.settings.get("dev_buildroot", False)
+                    })
+
+        if not work_items:
+            log("All SRPMs already cached or processed!")
+            return
+
+        log(f"Processing {len(work_items)} SRPMs in parallel (out of {total_srpms_to_resolve} total)")
+
+        # Process in parallel using ProcessPoolExecutor
+        max_workers = min(self.settings["max_subprocesses"], len(work_items))
+
+        with ProcessPoolExecutor(max_workers=max_workers) as executor:
+            # Submit all jobs
+            future_to_item = {
+                executor.submit(process_single_srpm_root_log, item): item
+                for item in work_items
+            }
 
-                        # Here it's important to add the packages to the already initiated
-                        # set, because its reference is shared between the koji_srpms and the srpm sections
-                        self.data["buildroot"]["koji_srpms"][koji_id][arch][srpm_id]["directly_required_pkg_names"].update(directly_required_pkg_names)
+            # Collect results as they complete
+            completed_count = 0
+            total_count = len(work_items)
+
+            for future in as_completed(future_to_item):
+                completed_count += 1
+                work_item = future_to_item[future]
+
+                try:
+                    result = future.result()
+                    self._apply_srpm_result(work_item, result)
+
+                    if result['error']:
+                        log(f"[ Buildroot - pass {pass_counter} - {completed_count} of {total_count} ] "
+                            f"Failed {result['srpm_id']} {result['arch']}: {result['error']}")
                     else:
-                        log("  Skipping! (already done before)")
+                        log(f"[ Buildroot - pass {pass_counter} - {completed_count} of {total_count} ] "
+                            f"Completed {result['srpm_id']} {result['arch']} - found {len(result['deps'])} deps")
+
+                except Exception as e:
+                    log(f"Failed to process {work_item['srpm_id']}: {e}")
+                    # Apply empty result for failed processing
+                    error_result = {
+                        'srpm_id': work_item['srpm_id'],
+                        'arch': work_item['arch'],
+                        'deps': [],
+                        'error': str(e)
+                    }
+                    self._apply_srpm_result(work_item, error_result)
+
+        # Save updated cache
+        dump_data(self.settings["root_log_deps_cache_path"], self.cache["root_log_deps"]["next"])
+
         log("")
         log("  DONE!")
         log("")
 
 
+    def _apply_srpm_result(self, work_item, result):
+        """Apply worker result back to main data structures"""
+        koji_id = work_item['koji_id']
+        arch = work_item['arch']
+        srpm_id = work_item['srpm_id']
+        deps = result['deps']
+
+        # Update cache
+        self.cache["root_log_deps"]["next"][koji_id][arch][srpm_id] = deps
+
+        # Update main data
+        # Here it's important to add the packages to the already initiated
+        # set, because its reference is shared between the koji_srpms and the srpm sections
+        self.data["buildroot"]["koji_srpms"][koji_id][arch][srpm_id]["directly_required_pkg_names"].update(deps)
+
+
     def _analyze_build_groups(self):
 
         log("")
@@ -2189,7 +2289,7 @@ def _analyze_buildroot(self):
             # ... which also updates:
             #   data["buildroot"]["srpms"]...
             # ... because it's interlinked.
-            self._resolve_srpms_using_root_logs(pass_counter)
+            self._resolve_srpms_using_root_logs_parallel(pass_counter)
 
             self._record_metric("  finished _resolve_srpms_using_root_logs")
 
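The new worker entry point can be smoke-tested on its own, without any Koji
access, by setting dev_buildroot in the work item; that takes the same
fake-buildroot shortcut the old method had. A quick sketch (the URLs and the
SRPM NEVR are hypothetical examples):

    from content_resolver.analyzer import process_single_srpm_root_log

    work_item = {
        'koji_id': "fedora",                                  # only used by the caller
        'koji_api_url': "https://koji.example.org/kojihub",   # not contacted in dev mode
        'koji_files_url': "https://koji.example.org/files",   # not contacted in dev mode
        'srpm_id': "bash-5.2.26-1.fc40",                      # hypothetical NEVR
        'arch': "x86_64",
        'dev_buildroot': True,
    }

    # "bash" is in the first fake pass, so this prints:
    # {'srpm_id': 'bash-5.2.26-1.fc40', 'arch': 'x86_64',
    #  'deps': ['gawk', 'xz', 'findutils'], 'error': None}
    print(process_single_srpm_root_log(work_item))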
From 4a978a82f78afb0a4fa526965ec123af1e223b6a Mon Sep 17 00:00:00 2001
From: Troy Dawson
Date: Wed, 13 Aug 2025 10:19:27 -0700
Subject: [PATCH 6/6] Add a command line option to change the number of
 parallel processes

---
 content_resolver/analyzer.py       | 2 +-
 content_resolver/config_manager.py | 2 ++
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/content_resolver/analyzer.py b/content_resolver/analyzer.py
index 7bf784b..eaaffd3 100644
--- a/content_resolver/analyzer.py
+++ b/content_resolver/analyzer.py
@@ -1961,7 +1961,7 @@ def _resolve_srpms_using_root_logs_parallel(self, pass_counter):
         log(f"Processing {len(work_items)} SRPMs in parallel (out of {total_srpms_to_resolve} total)")
 
         # Process in parallel using ProcessPoolExecutor
-        max_workers = min(self.settings["max_subprocesses"], len(work_items))
+        max_workers = min(self.settings["parallel_max"], len(work_items))
 
         with ProcessPoolExecutor(max_workers=max_workers) as executor:
             # Submit all jobs
diff --git a/content_resolver/config_manager.py b/content_resolver/config_manager.py
index a06a133..e4aaa8f 100644
--- a/content_resolver/config_manager.py
+++ b/content_resolver/config_manager.py
@@ -23,6 +23,7 @@ def load_settings(self):
         parser.add_argument("--use-cache", dest="use_cache", action='store_true', help="Use local data instead of pulling Content Resolver. Saves a lot of time! Needs a 'cache_data.json' file at the same location as the script is at.")
         parser.add_argument("--dev-buildroot", dest="dev_buildroot", action='store_true', help="Buildroot grows pretty quickly. Use a fake one for development.")
         parser.add_argument("--dnf-cache-dir", dest="dnf_cache_dir_override", help="Override the dnf cache_dir.")
+        parser.add_argument("--parallel-max", dest="parallel_max", default=os.cpu_count(), type=int, help="Max parallel processes to run")
 
         args = parser.parse_args()
 
         settings["configs"] = args.configs
@@ -30,6 +31,7 @@
         settings["use_cache"] = args.use_cache
         settings["dev_buildroot"] = args.dev_buildroot
         settings["dnf_cache_dir_override"] = args.dnf_cache_dir_override
+        settings["parallel_max"] = args.parallel_max
 
         settings["root_log_deps_cache_path"] = "cache_root_log_deps.json"
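With the full series applied, the number of worker processes is tunable from the
command line and defaults to os.cpu_count(). A hedged usage sketch: the
entry-point script name and the positional arguments are assumptions, since they
are not shown in these patches; only the --parallel-max flag itself is new:

    # Assumed invocation; substitute the real script name and arguments.
    python3 content_resolver.py --parallel-max 8 <configs_dir> <output_dir>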