|
| 1 | +# SPDX-License-Identifier: LGPL-2.1-or-later |
| 2 | +# *************************************************************************** |
| 3 | +# * * |
| 4 | +# * Copyright (c) 2025 The FreeCAD project association AISBL * |
| 5 | +# * * |
| 6 | +# * This file is part of FreeCAD. * |
| 7 | +# * * |
| 8 | +# * FreeCAD is free software: you can redistribute it and/or modify it * |
| 9 | +# * under the terms of the GNU Lesser General Public License as * |
| 10 | +# * published by the Free Software Foundation, either version 2.1 of the * |
| 11 | +# * License, or (at your option) any later version. * |
| 12 | +# * * |
| 13 | +# * FreeCAD is distributed in the hope that it will be useful, but * |
| 14 | +# * WITHOUT ANY WARRANTY; without even the implied warranty of * |
| 15 | +# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * |
| 16 | +# * Lesser General Public License for more details. * |
| 17 | +# * * |
| 18 | +# * You should have received a copy of the GNU Lesser General Public * |
| 19 | +# * License along with FreeCAD. If not, see * |
| 20 | +# * <https://www.gnu.org/licenses/>. * |
| 21 | +# * * |
| 22 | +# *************************************************************************** |
| 23 | + |
| 24 | +"""Classes and utility functions to generate a remotely hosted cache of all addon catalog entries. |
| 25 | +Intended to be run by a server-side systemd timer to generate a file that is then loaded by the |
| 26 | +Addon Manager in each FreeCAD installation.""" |
| 27 | +import enum |
| 28 | +import xml.etree.ElementTree |
| 29 | +from dataclasses import dataclass, asdict |
| 30 | +from typing import List, Optional |
| 31 | + |
| 32 | +import base64 |
| 33 | +import io |
| 34 | +import json |
| 35 | +import os |
| 36 | +import requests |
| 37 | +import shutil |
| 38 | +import subprocess |
| 39 | +import zipfile |
| 40 | + |
| 41 | +import AddonCatalog |
| 42 | +import addonmanager_metadata |
| 43 | + |
| 44 | + |
# Canonical addon catalog that this script mirrors into a local cache.
ADDON_CATALOG_URL = (
    "https://raw.githubusercontent.com/FreeCAD/FreeCAD-addons/master/AddonCatalog.json"
)
# Directory where repos are cloned and the cache file is written; relative paths are
# resolved against the process working directory at CacheWriter construction time.
BASE_DIRECTORY = "./CatalogCache"
MAX_COUNT = 10000  # Do at most this many repos (for testing purposes this can be made smaller)

# Repos that are too large, or that should for some reason not be cloned here
EXCLUDED_REPOS = ["parts_library"]
| 53 | + |
| 54 | + |
@dataclass
class CacheEntry:
    """All contents of a CacheEntry are the text contents of the file listed. The icon data is
    base64-encoded (although it was probably an SVG, other formats are supported)."""

    package_xml: str = ""  # raw text of package.xml, "" if the addon has none
    requirements_txt: str = ""  # raw text of requirements.txt, "" if absent
    metadata_txt: str = ""  # raw text of metadata.txt, "" if absent
    icon_data: str = ""  # base64-encoded bytes of the icon file, "" if absent
| 64 | + |
| 65 | + |
class GitRefType(enum.IntEnum):
    """Classification of a git ref: a tag, a branch, or a bare commit hash."""

    # auto() numbers members 1, 2, 3 — identical to the previous explicit values.
    TAG = enum.auto()
    BRANCH = enum.auto()
    HASH = enum.auto()
| 72 | + |
| 73 | + |
class CatalogFetcher:
    """Fetches the addon catalog from the given URL and returns an AddonCatalog object. Separated
    from the main class for easy mocking during tests. Note that every instantiation of this class
    will run a new fetch of the catalog."""

    # Seconds to wait for the catalog server. Without an explicit timeout, requests.get()
    # can block indefinitely, which would hang the server-side systemd timer job.
    REQUEST_TIMEOUT = 30

    def __init__(self, addon_catalog_url: str = ADDON_CATALOG_URL):
        self.addon_catalog_url = addon_catalog_url
        self.catalog = self.fetch_catalog()

    def fetch_catalog(self) -> AddonCatalog.AddonCatalog:
        """Fetch the addon catalog from the given URL and return an AddonCatalog object.

        Raises RuntimeError if the server responds with anything other than HTTP 200
        (requests' own exceptions propagate on network failure or timeout)."""
        response = requests.get(self.addon_catalog_url, timeout=self.REQUEST_TIMEOUT)
        if response.status_code != 200:
            raise RuntimeError(
                f"ERROR: Failed to fetch addon catalog from {self.addon_catalog_url}"
            )
        return AddonCatalog.AddonCatalog(response.json())
| 91 | + |
| 92 | + |
class CacheWriter:
    """Writes a JSON file containing a cache of all addon catalog entries. The cache is a copy of
    the package.xml, requirements.txt, and metadata.txt files from the addon repositories, as well
    as a base64-encoded icon image. The cache is written to the current working directory."""

    # Seconds to wait for any single zip download. Without an explicit timeout,
    # requests.get() can block indefinitely and hang the server-side timer job.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        # Populated by create_local_copy_of_addons() during write(); None until then.
        # (Annotation fixed: the previous hint named the AddonCatalog *module*, not the class.)
        self.catalog: Optional[AddonCatalog.AddonCatalog] = None
        # Resolve BASE_DIRECTORY to an absolute path once, up front, so the os.chdir()
        # calls made later cannot change what it refers to.
        if os.path.isabs(BASE_DIRECTORY):
            self.cwd = BASE_DIRECTORY
        else:
            self.cwd = os.path.normpath(os.path.join(os.getcwd(), BASE_DIRECTORY))
        # addon_id -> list of cache-entry dicts (one element per catalog entry processed).
        self._cache = {}

    def write(self):
        """Clone or refresh every addon, then write addon_catalog_cache.json into
        BASE_DIRECTORY. Restores the original working directory before returning."""
        original_working_directory = os.getcwd()
        os.makedirs(self.cwd, exist_ok=True)
        os.chdir(self.cwd)
        self.create_local_copy_of_addons()
        with open("addon_catalog_cache.json", "w", encoding="utf-8") as f:
            f.write(json.dumps(self._cache, indent=" "))
        os.chdir(original_working_directory)
        print(f"Wrote cache to {os.path.join(self.cwd, 'addon_catalog_cache.json')}")

    def create_local_copy_of_addons(self):
        """Fetch the catalog and make (or update) a local copy of every non-excluded addon,
        stopping after MAX_COUNT addons (a testing convenience)."""
        self.catalog = CatalogFetcher().catalog
        counter = 0
        for addon_id, catalog_entries in self.catalog.get_catalog().items():
            if addon_id in EXCLUDED_REPOS:
                continue
            self.create_local_copy_of_single_addon(addon_id, catalog_entries)
            counter += 1
            if counter >= MAX_COUNT:
                break

    def create_local_copy_of_single_addon(
        self, addon_id: str, catalog_entries: List[AddonCatalog.AddonCatalogEntry]
    ):
        """Download every catalog entry of one addon (via git or zip) and append the
        resulting cache entries (or {} placeholders) to self._cache[addon_id]."""
        for index, catalog_entry in enumerate(catalog_entries):
            if catalog_entry.repository is not None:
                self.create_local_copy_of_single_addon_with_git(addon_id, index, catalog_entry)
            elif catalog_entry.zip_url is not None:
                self.create_local_copy_of_single_addon_with_zip(addon_id, index, catalog_entry)
            else:
                print(
                    f"ERROR: Invalid catalog entry for {addon_id}. "
                    "Neither git info nor zip info was specified."
                )
                # NOTE(review): this `continue` appends nothing for the invalid entry, so
                # self._cache[addon_id] can end up shorter than catalog_entries — confirm
                # that consumers do not rely on index alignment between the two lists.
                continue
            entry = self.generate_cache_entry(addon_id, index, catalog_entry)
            if addon_id not in self._cache:
                self._cache[addon_id] = []
            if entry is not None:
                self._cache[addon_id].append(asdict(entry))
            else:
                self._cache[addon_id].append({})

    def generate_cache_entry(
        self, addon_id: str, index: int, catalog_entry: AddonCatalog.AddonCatalogEntry
    ) -> Optional[CacheEntry]:
        """Create the cache entry for this catalog entry if there is data to cache. If there is
        nothing to cache, returns None."""
        path_to_package_xml = self.find_file("package.xml", addon_id, index, catalog_entry)
        cache_entry = None
        if path_to_package_xml and os.path.exists(path_to_package_xml):
            cache_entry = self.generate_cache_entry_from_package_xml(path_to_package_xml)

        path_to_requirements = self.find_file("requirements.txt", addon_id, index, catalog_entry)
        if path_to_requirements and os.path.exists(path_to_requirements):
            if cache_entry is None:
                cache_entry = CacheEntry()
            with open(path_to_requirements, "r", encoding="utf-8") as f:
                cache_entry.requirements_txt = f.read()

        path_to_metadata = self.find_file("metadata.txt", addon_id, index, catalog_entry)
        if path_to_metadata and os.path.exists(path_to_metadata):
            if cache_entry is None:
                cache_entry = CacheEntry()
            with open(path_to_metadata, "r", encoding="utf-8") as f:
                cache_entry.metadata_txt = f.read()

        return cache_entry

    def generate_cache_entry_from_package_xml(
        self, path_to_package_xml: str
    ) -> Optional[CacheEntry]:
        """Read package.xml into a CacheEntry and, when the metadata names an icon that
        exists on disk, base64-encode it into icon_data. Returns None when the XML cannot
        be parsed or the metadata reader rejects it."""
        cache_entry = CacheEntry()
        with open(path_to_package_xml, "r", encoding="utf-8") as f:
            cache_entry.package_xml = f.read()
        try:
            metadata = addonmanager_metadata.MetadataReader.from_bytes(
                cache_entry.package_xml.encode("utf-8")
            )
        except xml.etree.ElementTree.ParseError:
            print(f"ERROR: Failed to parse XML from {path_to_package_xml}")
            return None
        except RuntimeError:
            print(f"ERROR: Failed to read metadata from {path_to_package_xml}")
            return None

        relative_icon_path = self.get_icon_from_metadata(metadata)
        if relative_icon_path is not None:
            absolute_icon_path = os.path.join(
                os.path.dirname(path_to_package_xml), relative_icon_path
            )
            if os.path.exists(absolute_icon_path):
                with open(absolute_icon_path, "rb") as f:
                    cache_entry.icon_data = base64.b64encode(f.read()).decode("utf-8")
            else:
                print(f"ERROR: Could not find icon file {absolute_icon_path}")
        return cache_entry

    def create_local_copy_of_single_addon_with_git(
        self, addon_id: str, index: int, catalog_entry: AddonCatalog.AddonCatalogEntry
    ):
        """Clone (or update) the addon's git repository into its per-entry directory."""
        expected_name = self.get_directory_name(addon_id, index, catalog_entry)
        self.clone_or_update(expected_name, catalog_entry.repository, catalog_entry.git_ref)

    @staticmethod
    def get_directory_name(addon_id, index, catalog_entry):
        """Return the relative directory name "<addon_id>/<index>-<branch>" for this entry.
        Slashes in the branch/ref name are replaced with dashes so the result is a single
        path component."""
        expected_name = os.path.join(addon_id, str(index) + "-")
        if catalog_entry.branch_display_name:
            expected_name += catalog_entry.branch_display_name.replace("/", "-")
        elif catalog_entry.git_ref:
            expected_name += catalog_entry.git_ref.replace("/", "-")
        else:
            expected_name += "unknown-branch-name"
        return expected_name

    def create_local_copy_of_single_addon_with_zip(
        self, addon_id: str, index: int, catalog_entry: AddonCatalog.AddonCatalogEntry
    ):
        """Download the addon's zip archive and extract it into a freshly-emptied
        per-entry directory. Logs and returns on a non-200 response."""
        response = requests.get(catalog_entry.zip_url, timeout=self.REQUEST_TIMEOUT)
        if response.status_code != 200:
            print(f"ERROR: Failed to fetch zip data for {addon_id} from {catalog_entry.zip_url}.")
            return
        extract_to_dir = self.get_directory_name(addon_id, index, catalog_entry)
        # Start from a clean directory so files removed upstream don't linger in the cache.
        if os.path.exists(extract_to_dir):
            shutil.rmtree(extract_to_dir)
        os.makedirs(extract_to_dir, exist_ok=True)

        # NOTE(review): extractall() trusts archive member names; a malicious zip with
        # "../" entries could write outside extract_to_dir (zip-slip). Acceptable only
        # while catalog sources are curated — confirm.
        with zipfile.ZipFile(io.BytesIO(response.content)) as zip_file:
            zip_file.extractall(path=extract_to_dir)

    @staticmethod
    def clone_or_update(name: str, url: str, branch: str) -> None:
        """If a directory called "name" exists and contains a .git subdirectory, run
        'git fetch' and check out the requested ref (plus 'git merge' when the ref is a
        branch). Otherwise make a shallow (depth 1) clone of the requested ref.

        Raises RuntimeError when any git command fails; the caller's working directory
        is restored before raising."""

        if not os.path.exists(os.path.join(os.getcwd(), name, ".git")):
            print(f"Cloning {url} to {name}", flush=True)
            # Shallow, but do include the last commit on each branch and tag
            command = [
                "git",
                "clone",
                "--depth",
                "1",
                "--branch",
                branch,
                url,
                name,
            ]
            completed_process = subprocess.run(command)
            if completed_process.returncode != 0:
                raise RuntimeError(f"Clone failed for {url}")
        else:
            print(f"Updating {name}", flush=True)
            old_dir = os.getcwd()
            os.chdir(os.path.join(old_dir, name))
            # Determine if we are dealing with a tag, branch, or hash
            git_ref_type = CacheWriter.determine_git_ref_type(name, url, branch)
            command = ["git", "fetch"]
            completed_process = subprocess.run(command)
            if completed_process.returncode != 0:
                os.chdir(old_dir)
                raise RuntimeError(f"git fetch failed for {name}")
            command = ["git", "checkout", branch, "--quiet"]
            completed_process = subprocess.run(command)
            if completed_process.returncode != 0:
                os.chdir(old_dir)
                raise RuntimeError(f"git checkout failed for {name} branch {branch}")
            # Only branches move; tags and hashes are fully pinned by the checkout above.
            if git_ref_type == GitRefType.BRANCH:
                command = ["git", "merge", "--quiet"]
                completed_process = subprocess.run(command)
                if completed_process.returncode != 0:
                    os.chdir(old_dir)
                    raise RuntimeError(f"git merge failed for {name} branch {branch}")
            os.chdir(old_dir)

    def find_file(
        self,
        filename: str,
        addon_id: str,
        index: int,
        catalog_entry: AddonCatalog.AddonCatalogEntry,
    ) -> Optional[str]:
        """Find a given file in the downloaded cache for this addon. Returns None if the file does
        not exist. Searches the entry's directory tree recursively and returns the first
        match encountered by os.walk."""
        start_dir = os.path.join(self.cwd, self.get_directory_name(addon_id, index, catalog_entry))
        for dirpath, _, filenames in os.walk(start_dir):
            if filename in filenames:
                return os.path.join(dirpath, filename)
        return None

    @staticmethod
    def get_icon_from_metadata(metadata: addonmanager_metadata.Metadata) -> Optional[str]:
        """Try to locate the icon file specified for this Addon. Recursively search through the
        levels of the metadata and return the first specified icon file path. Returns None if there
        is no icon specified for this Addon (which is not allowed by the standard, but we don't want
        to crash the cache writer)."""
        if metadata.icon:
            return metadata.icon
        for content_type in metadata.content:
            for content_item in metadata.content[content_type]:
                icon = CacheWriter.get_icon_from_metadata(content_item)
                if icon:
                    return icon
        return None

    @staticmethod
    def determine_git_ref_type(name: str, url: str, branch: str) -> GitRefType:
        """Determine if the given branch, tag, or hash is a tag, branch, or hash. Returns the type
        if determinable, otherwise raises a RuntimeError. Runs git in the current working
        directory, so callers must chdir into the repository first (clone_or_update does)."""
        command = ["git", "show-ref", "--verify", f"refs/remotes/origin/{branch}"]
        completed_process = subprocess.run(command)
        if completed_process.returncode == 0:
            return GitRefType.BRANCH
        command = ["git", "show-ref", "--tags"]
        completed_process = subprocess.run(command, capture_output=True)
        completed_process_output = completed_process.stdout.decode("utf-8")
        if branch in completed_process_output:
            return GitRefType.TAG
        command = ["git", "rev-parse", branch]
        completed_process = subprocess.run(command)
        if completed_process.returncode == 0:
            return GitRefType.HASH
        raise RuntimeError(
            f"Could not determine if {branch} of {name} is a tag, branch, or hash. "
            f"Output was: {completed_process_output}"
        )
| 334 | + |
| 335 | + |
| 336 | +if __name__ == "__main__": |
| 337 | + writer = CacheWriter() |
| 338 | + writer.write() |
0 commit comments