-
Notifications
You must be signed in to change notification settings - Fork 71
Add extensions ecosystem support for Guarddog #589
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 2 commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
64d915d
Add extensions ecosystem support for Guarddog
tesnim5hamdouni 4bce5bd
fix type-check and formatting-induced errors
tesnim5hamdouni 8e2cd4a
Remove metadata rules for now, default yara rules to all ecosystems, …
tesnim5hamdouni 2564a61
nits ...
tesnim5hamdouni 1fdda88
nits ...
tesnim5hamdouni 54b72e3
implement requested changes
tesnim5hamdouni e39c0ec
remove credits in yara rules| remove DTO structures in extension_scanner
tesnim5hamdouni File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -67,7 +67,13 @@ def __init__(self, ecosystem=ECOSYSTEM.PYPI) -> None: | |
| ".semgrep_logs", | ||
| ] | ||
|
|
||
| def analyze(self, path, info=None, rules=None, name: Optional[str] = None, version: Optional[str] = None) -> dict: | ||
| def analyze( | ||
| self, | ||
| path, | ||
| info=None, | ||
| rules=None, | ||
| name: Optional[str] = None, | ||
| version: Optional[str] = None) -> dict: | ||
| """ | ||
| Analyzes a package in the given path | ||
|
|
||
|
|
@@ -87,18 +93,33 @@ def analyze(self, path, info=None, rules=None, name: Optional[str] = None, versi | |
| sourcecode_results = None | ||
|
|
||
| # populate results, errors, and number of issues | ||
| metadata_results = self.analyze_metadata(path, info, rules, name, version) | ||
| metadata_results = self.analyze_metadata( | ||
| path, info, rules, name, version) | ||
tesnim5hamdouni marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| sourcecode_results = self.analyze_sourcecode(path, rules) | ||
|
|
||
| # Concatenate dictionaries together | ||
| issues = metadata_results["issues"] + sourcecode_results["issues"] | ||
| results = metadata_results["results"] | sourcecode_results["results"] | ||
| errors = metadata_results["errors"] | sourcecode_results["errors"] | ||
|
|
||
| return {"issues": issues, "errors": errors, "results": results, "path": path} | ||
|
|
||
| def analyze_metadata(self, path: str, info, rules=None, name: Optional[str] = None, | ||
| version: Optional[str] = None) -> dict: | ||
| output = { | ||
| "issues": issues, | ||
| "errors": errors, | ||
| "results": results, | ||
| "path": path} | ||
| # Including extension info - pending discussion | ||
| # if info is not None: | ||
| # output["package_info"] = info | ||
|
|
||
| return output | ||
tesnim5hamdouni marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| def analyze_metadata( | ||
| self, | ||
| path: str, | ||
| info, | ||
| rules=None, | ||
| name: Optional[str] = None, | ||
| version: Optional[str] = None) -> dict: | ||
|
||
| """ | ||
| Analyzes the metadata of a given package | ||
|
|
||
|
|
@@ -127,7 +148,8 @@ def analyze_metadata(self, path: str, info, rules=None, name: Optional[str] = No | |
| for rule in all_rules: | ||
| try: | ||
| log.debug(f"Running rule {rule} against package '{name}'") | ||
| rule_matches, message = self.metadata_detectors[rule].detect(info, path, name, version) | ||
| rule_matches, message = self.metadata_detectors[rule].detect( | ||
| info, path, name, version) | ||
tesnim5hamdouni marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| results[rule] = None | ||
| if rule_matches: | ||
| issues += 1 | ||
|
|
@@ -157,7 +179,11 @@ def analyze_sourcecode(self, path, rules=None) -> dict: | |
| results = semgrepscan_results["results"] | yarascan_results["results"] | ||
| errors = semgrepscan_results["errors"] | yarascan_results["errors"] | ||
|
|
||
| return {"issues": issues, "errors": errors, "results": results, "path": path} | ||
| return { | ||
| "issues": issues, | ||
| "errors": errors, | ||
| "results": results, | ||
| "path": path} | ||
|
|
||
| def analyze_yara(self, path: str, rules: Optional[set] = None) -> dict: | ||
| """ | ||
|
|
@@ -183,6 +209,11 @@ def analyze_yara(self, path: str, rules: Optional[set] = None) -> dict: | |
|
|
||
| rule_results: defaultdict[dict, list[dict]] = defaultdict(list) | ||
|
|
||
| # Define verbose rules that should only show "file-level" triggers | ||
| verbose_rules = { | ||
| "DETECT_FILE_obfuscator_dot_io", | ||
|
||
| } | ||
|
|
||
| rules_path = { | ||
| rule_name: os.path.join(SOURCECODE_RULES_PATH, f"{rule_name}.yar") | ||
| for rule_name in all_rules | ||
|
|
@@ -202,34 +233,61 @@ def analyze_yara(self, path: str, rules: Optional[set] = None) -> dict: | |
| continue | ||
|
|
||
| scan_file_target_abspath = os.path.join(root, f) | ||
| scan_file_target_relpath = os.path.relpath(scan_file_target_abspath, path) | ||
| scan_file_target_relpath = os.path.relpath( | ||
| scan_file_target_abspath, path) | ||
tesnim5hamdouni marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| matches = scan_rules.match(scan_file_target_abspath) | ||
| for m in matches: | ||
| for s in m.strings: | ||
| for i in s.instances: | ||
| rule_name = m.rule | ||
tesnim5hamdouni marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| if rule_name in verbose_rules: | ||
| # For verbose rules, we only show that the rule was triggered in the matching file | ||
| # We're logging appearances once instead of | ||
| # issue-counting | ||
| file_already_reported = any( | ||
| finding["location"].startswith(scan_file_target_relpath + ":") | ||
| for finding in rule_results[rule_name] | ||
| ) | ||
|
|
||
| if not file_already_reported: | ||
| finding = { | ||
| "location": f"{scan_file_target_relpath}:{i.offset}", | ||
| "code": self.trim_code_snippet(str(i.matched_data)), | ||
| 'message': m.meta.get("description", f"{m.rule} rule matched") | ||
| } | ||
|
|
||
| # since yara can match the multiple times in the same file | ||
| # leading to finding several times the same word or pattern | ||
| # this dedup the matches | ||
| if [ | ||
| f | ||
| for f in rule_results[m.rule] | ||
| if finding["code"] == f["code"] | ||
| ]: | ||
| continue | ||
|
|
||
| issues += len(m.strings) | ||
| rule_results[m.rule].append(finding) | ||
| "location": f"{scan_file_target_relpath}:1", | ||
| "code": f'{"Rule triggered in file (matches hidden for brevity)"}', | ||
| 'message': m.meta.get( | ||
| "description", | ||
| f"{rule_name} rule matched")} | ||
| issues += 1 | ||
| rule_results[rule_name].append(finding) | ||
| else: | ||
| # For non-verbose rules, show detailed matches as | ||
| # before | ||
| for s in m.strings: | ||
| for i in s.instances: | ||
| finding = { | ||
| "location": f"{scan_file_target_relpath}:{i.offset}", | ||
| "code": self.trim_code_snippet(str(i.matched_data)), | ||
| 'message': m.meta.get("description", f"{rule_name} rule matched") | ||
| } | ||
|
|
||
| # since yara can match the multiple times in the same file | ||
| # leading to finding several times the same word or pattern | ||
| # this dedup the matches | ||
| if [ | ||
| f | ||
| for f in rule_results[rule_name] | ||
| if finding["code"] == f["code"] | ||
| ]: | ||
| continue | ||
|
|
||
| issues += len(m.strings) | ||
| rule_results[rule_name].append(finding) | ||
| except Exception as e: | ||
| errors["rules-all"] = f"failed to run rule: {str(e)}" | ||
|
|
||
| return {"results": results | rule_results, "errors": errors, "issues": issues} | ||
| return { | ||
| "results": results | rule_results, | ||
| "errors": errors, | ||
| "issues": issues} | ||
|
|
||
| def analyze_semgrep(self, path, rules=None) -> dict: | ||
| """ | ||
|
|
@@ -254,10 +312,8 @@ def analyze_semgrep(self, path, rules=None) -> dict: | |
| errors = {} | ||
| issues = 0 | ||
|
|
||
| rules_path = list(map( | ||
| lambda rule_name: os.path.join(SOURCECODE_RULES_PATH, f"{rule_name}.yml"), | ||
| all_rules | ||
| )) | ||
| rules_path = list(map(lambda rule_name: os.path.join( | ||
| SOURCECODE_RULES_PATH, f"{rule_name}.yml"), all_rules)) | ||
tesnim5hamdouni marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| if len(rules_path) == 0: | ||
| log.debug("No semgrep code rules to run") | ||
|
|
@@ -266,7 +322,8 @@ def analyze_semgrep(self, path, rules=None) -> dict: | |
| try: | ||
| log.debug(f"Running semgrep code rules against {path}") | ||
| response = self._invoke_semgrep(target=path, rules=rules_path) | ||
| rule_results = self._format_semgrep_response(response, targetpath=targetpath) | ||
| rule_results = self._format_semgrep_response( | ||
| response, targetpath=targetpath) | ||
tesnim5hamdouni marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| issues += sum(len(res) for res in rule_results.values()) | ||
|
|
||
| results = results | rule_results | ||
|
|
@@ -278,7 +335,9 @@ def analyze_semgrep(self, path, rules=None) -> dict: | |
| def _invoke_semgrep(self, target: str, rules: Iterable[str]): | ||
| try: | ||
| SEMGREP_MAX_TARGET_BYTES = int( | ||
| os.getenv("GUARDDOG_SEMGREP_MAX_TARGET_BYTES", MAX_BYTES_DEFAULT)) | ||
| os.getenv( | ||
| "GUARDDOG_SEMGREP_MAX_TARGET_BYTES", | ||
| MAX_BYTES_DEFAULT)) | ||
tesnim5hamdouni marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| SEMGREP_TIMEOUT = int( | ||
| os.getenv("GUARDDOG_SEMGREP_TIMEOUT", SEMGREP_TIMEOUT_DEFAULT)) | ||
| cmd = ["semgrep"] | ||
|
|
@@ -295,7 +354,11 @@ def _invoke_semgrep(self, target: str, rules: Iterable[str]): | |
| cmd.append(f"--max-target-bytes={SEMGREP_MAX_TARGET_BYTES}") | ||
| cmd.append(target) | ||
| log.debug(f"Invoking semgrep with command line: {' '.join(cmd)}") | ||
| result = subprocess.run(cmd, capture_output=True, check=True, encoding="utf-8") | ||
| result = subprocess.run( | ||
| cmd, | ||
| capture_output=True, | ||
| check=True, | ||
| encoding="utf-8") | ||
tesnim5hamdouni marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return json.loads(str(result.stdout)) | ||
| except FileNotFoundError: | ||
| raise Exception("unable to find semgrep binary") | ||
|
|
@@ -349,9 +412,9 @@ def _format_semgrep_response(self, response, rule=None, targetpath=None): | |
| file_path = os.path.abspath(result["path"]) | ||
| code = self.trim_code_snippet( | ||
| self.get_snippet( | ||
| file_path=file_path, start_line=start_line, end_line=end_line | ||
| ) | ||
| ) | ||
| file_path=file_path, | ||
| start_line=start_line, | ||
| end_line=end_line)) | ||
| if targetpath: | ||
| file_path = os.path.relpath(file_path, targetpath) | ||
|
|
||
|
|
@@ -370,7 +433,11 @@ def _format_semgrep_response(self, response, rule=None, targetpath=None): | |
|
|
||
| return results | ||
|
|
||
| def get_snippet(self, file_path: str, start_line: int, end_line: int) -> str: | ||
| def get_snippet( | ||
| self, | ||
| file_path: str, | ||
| start_line: int, | ||
| end_line: int) -> str: | ||
| """ | ||
| Returns the code snippet between start_line and stop_line in a file | ||
|
|
||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| from typing import Type | ||
|
|
||
| from guarddog.analyzer.metadata import Detector | ||
| from guarddog.analyzer.metadata.extension.empty_information import ExtensionEmptyInfoDetector | ||
| from guarddog.analyzer.metadata.extension.suspicious_publisher import ExtensionSuspiciousPublisherDetector | ||
| from guarddog.analyzer.metadata.extension.suspicious_permissions import ExtensionSuspiciousPermissionsDetector | ||
|
|
||
| EXTENSION_METADATA_RULES = {} | ||
|
|
||
| classes: list[Type[Detector]] = [ | ||
| ExtensionEmptyInfoDetector, | ||
| ExtensionSuspiciousPublisherDetector, | ||
| ExtensionSuspiciousPermissionsDetector, | ||
| ] | ||
|
|
||
| for detectorClass in classes: | ||
| detectorInstance = detectorClass() # type: ignore | ||
| EXTENSION_METADATA_RULES[detectorInstance.get_name()] = detectorInstance |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| """ Empty Information Detector for Extensions | ||
| Detects if an extension contains an empty description | ||
| """ | ||
| import logging | ||
| from typing import Optional | ||
|
|
||
| from guarddog.analyzer.metadata.empty_information import EmptyInfoDetector | ||
|
|
||
| log = logging.getLogger("guarddog") | ||
|
|
||
|
|
||
| class ExtensionEmptyInfoDetector(EmptyInfoDetector): | ||
| """Detects extensions with empty description information""" | ||
|
|
||
| def detect(self, | ||
| package_info, | ||
| path: Optional[str] = None, | ||
| name: Optional[str] = None, | ||
| version: Optional[str] = None) -> tuple[bool, | ||
| str]: | ||
|
|
||
| log.debug( | ||
| f"Running extension empty description heuristic on extension {name} version {version}") | ||
|
|
||
| if not package_info or not isinstance(package_info, dict): | ||
| return True, "Extension has no package information" | ||
|
|
||
| manifest = package_info.get("manifest", {}) | ||
| marketplace = package_info.get("marketplace", {}) | ||
| source = package_info.get("source", "unknown") | ||
|
|
||
| return self._detect_with_metadata(manifest, marketplace, source) | ||
|
|
||
| def _detect_with_metadata(self, | ||
tesnim5hamdouni marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| manifest: dict, | ||
| marketplace: dict, | ||
| source: str) -> tuple[bool, | ||
| str]: | ||
| """Detect empty information with pre-extracted metadata""" | ||
|
|
||
| manifest_description = manifest.get("description", "").strip() | ||
| manifest_display_name = manifest.get("displayName", "").strip() | ||
|
|
||
| if not manifest_description and not manifest_display_name: | ||
| return True, self.MESSAGE_TEMPLATE % "Extension Marketplace (manifest)" | ||
| return False, "" | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.