Merged
Changes from 2 commits
45 changes: 43 additions & 2 deletions README.md
@@ -6,16 +6,17 @@
<img src="./docs/images/logo.png" alt="GuardDog" width="300" />
</p>

GuardDog is a CLI tool that allows to identify malicious PyPI and npm packages or Go modules. It runs a set of heuristics on the package source code (through Semgrep rules) and on the package metadata.
GuardDog is a CLI tool that identifies malicious PyPI and npm packages, Go modules, and VSCode extensions. It runs a set of heuristics on the package source code (through Semgrep rules) and on the package metadata.

GuardDog can be used to scan local or remote PyPI and npm packages or Go modules using any of the available [heuristics](#heuristics).
GuardDog can be used to scan local or remote PyPI and npm packages, Go modules, or VSCode extensions using any of the available [heuristics](#heuristics).

It downloads and scans code from:

* NPM: Packages hosted on [npmjs.org](https://www.npmjs.com/)
* PyPI: Source distributions (tar.gz) hosted on [PyPI.org](https://pypi.org/)
* Go: Go source files of repositories hosted on [GitHub.com](https://github.com)
* GitHub Actions: JavaScript source files of repositories hosted on [GitHub.com](https://github.com)
* VSCode Extensions: Extension (.vsix) packages hosted on [marketplace.visualstudio.com](https://marketplace.visualstudio.com/)

![GuardDog demo usage](docs/images/demo.png)

@@ -78,6 +79,15 @@ guarddog github_action scan DataDog/synthetics-ci-github-action

guarddog github_action verify /tmp/repo/.github/workflows/main.yml

# Scan VSCode extensions from the marketplace
guarddog extension scan ms-python.python

# Scan a specific version of a VSCode extension
guarddog extension scan ms-python.python --version 2023.20.0

# Scan a local VSCode extension directory or VSIX archive
guarddog extension scan /tmp/my-extension/

# Run in debug mode
guarddog --log-level debug npm scan express
```
@@ -192,6 +202,37 @@ Source code heuristics:
| npm-steganography | Identify when a package retrieves hidden data from an image and executes it |
| npm-dll-hijacking | Identifies when a malicious package manipulates a trusted application into loading a malicious DLL |
| npm-exfiltrate-sensitive-data | Identify when a package reads and exfiltrates sensitive data from the local system |


### VSCode Extensions

For VSCode extensions that are implemented in JavaScript/TypeScript,
GuardDog will run the same source code heuristics as for npm packages, plus specialized YARA rules.

Source code heuristics:

| **Heuristic** | **Description** |
|:-------------:|:---------------:|
| npm-serialize-environment | Identify when a package serializes 'process.env' to exfiltrate environment variables |
| npm-obfuscation | Identify when a package uses a common obfuscation method often used by malware |
| npm-silent-process-execution | Identify when a package silently executes an executable |
| shady-links | Identify when a package contains a URL to a domain with a suspicious extension |
| npm-exec-base64 | Identify when a package dynamically executes code through 'eval' |
| npm-install-script | Identify when a package has a pre or post-install script automatically running commands |
| npm-steganography | Identify when a package retrieves hidden data from an image and executes it |
| npm-dll-hijacking | Identifies when a malicious package manipulates a trusted application into loading a malicious DLL |
| npm-exfiltrate-sensitive-data | Identify when a package reads and exfiltrates sensitive data from the local system |
| extension_obfuscator_dot_io | Detect usage of obfuscator.io service patterns |
| extension_powershell_policy_bypass | Identify PowerShell execution policy bypass attempts |
| extension_suspicious_passwd_access_linux | Detect suspicious access to Linux password files |

Metadata heuristics:

| **Heuristic** | **Description** |
|:-------------:|:---------------:|
| empty_information | Identify extensions with an empty description field |
| suspicious-permissions | Identify extensions with potentially dangerous permissions or suspicious characteristics |
| suspicious-publisher | Identify extensions whose publisher is unverified or whose name typosquats a known publisher |
<!-- END_RULE_LIST -->

## Custom Rules
147 changes: 107 additions & 40 deletions guarddog/analyzer/analyzer.py
@@ -67,7 +67,13 @@ def __init__(self, ecosystem=ECOSYSTEM.PYPI) -> None:
".semgrep_logs",
]

def analyze(self, path, info=None, rules=None, name: Optional[str] = None, version: Optional[str] = None) -> dict:
def analyze(
self,
path,
info=None,
rules=None,
name: Optional[str] = None,
version: Optional[str] = None) -> dict:
"""
Analyzes a package in the given path

@@ -87,18 +93,33 @@ def analyze(self, path, info=None, rules=None, name: Optional[str] = None, versi
sourcecode_results = None

# populate results, errors, and number of issues
metadata_results = self.analyze_metadata(path, info, rules, name, version)
metadata_results = self.analyze_metadata(
path, info, rules, name, version)
sourcecode_results = self.analyze_sourcecode(path, rules)

# Concatenate dictionaries together
issues = metadata_results["issues"] + sourcecode_results["issues"]
results = metadata_results["results"] | sourcecode_results["results"]
errors = metadata_results["errors"] | sourcecode_results["errors"]

return {"issues": issues, "errors": errors, "results": results, "path": path}

def analyze_metadata(self, path: str, info, rules=None, name: Optional[str] = None,
version: Optional[str] = None) -> dict:
output = {
"issues": issues,
"errors": errors,
"results": results,
"path": path}
# Including extension info - pending discussion
# if info is not None:
# output["package_info"] = info

return output
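
The merge above uses Python 3.9+ dict union (`|`), in which keys from the right-hand operand win on conflict; a quick sketch of how the two result mappings combine (the rule names and finding shape below are illustrative):

```python
# Python 3.9+ dict union: keys from the right-hand operand win on
# duplicate keys, so sourcecode results would override metadata
# results sharing a rule name (rule names never collide in practice).
metadata_results = {"results": {"empty_information": None}}
sourcecode_results = {"results": {"npm-obfuscation": [{"location": "index.js:12"}]}}

merged = metadata_results["results"] | sourcecode_results["results"]
# merged now holds both rule entries in a single mapping
```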

def analyze_metadata(
self,
path: str,
info,
rules=None,
name: Optional[str] = None,
version: Optional[str] = None) -> dict:
Reviewer comment (Contributor): this entire block changed nothing, let's keep the new formatted `analyze_metadata` one
"""
Analyzes the metadata of a given package

@@ -127,7 +148,8 @@ def analyze_metadata(self, path: str, info, rules=None, name: Optional[str] = No
for rule in all_rules:
try:
log.debug(f"Running rule {rule} against package '{name}'")
rule_matches, message = self.metadata_detectors[rule].detect(info, path, name, version)
rule_matches, message = self.metadata_detectors[rule].detect(
info, path, name, version)
results[rule] = None
if rule_matches:
issues += 1
@@ -157,7 +179,11 @@ def analyze_sourcecode(self, path, rules=None) -> dict:
results = semgrepscan_results["results"] | yarascan_results["results"]
errors = semgrepscan_results["errors"] | yarascan_results["errors"]

return {"issues": issues, "errors": errors, "results": results, "path": path}
return {
"issues": issues,
"errors": errors,
"results": results,
"path": path}

def analyze_yara(self, path: str, rules: Optional[set] = None) -> dict:
"""
@@ -183,6 +209,11 @@ def analyze_yara(self, path: str, rules: Optional[set] = None) -> dict:

rule_results: defaultdict[dict, list[dict]] = defaultdict(list)

# Define verbose rules that should only show "file-level" triggers
verbose_rules = {
    "DETECT_FILE_obfuscator_dot_io",
}

Reviewer comment (Contributor): this should not be hardcoded, let's find another way

rules_path = {
rule_name: os.path.join(SOURCECODE_RULES_PATH, f"{rule_name}.yar")
for rule_name in all_rules
@@ -202,34 +233,61 @@
continue

scan_file_target_abspath = os.path.join(root, f)
scan_file_target_relpath = os.path.relpath(scan_file_target_abspath, path)
scan_file_target_relpath = os.path.relpath(
scan_file_target_abspath, path)

matches = scan_rules.match(scan_file_target_abspath)
for m in matches:
    rule_name = m.rule

    if rule_name in verbose_rules:
        # For verbose rules, only report that the rule was
        # triggered in the matching file: we log the appearance
        # once instead of counting every match as an issue
        file_already_reported = any(
            finding["location"].startswith(scan_file_target_relpath + ":")
            for finding in rule_results[rule_name]
        )

        if not file_already_reported:
            finding = {
                "location": f"{scan_file_target_relpath}:1",
                "code": "Rule triggered in file (matches hidden for brevity)",
                "message": m.meta.get(
                    "description", f"{rule_name} rule matched"),
            }
            issues += 1
            rule_results[rule_name].append(finding)
    else:
        # For non-verbose rules, show detailed matches as before
        for s in m.strings:
            for i in s.instances:
                finding = {
                    "location": f"{scan_file_target_relpath}:{i.offset}",
                    "code": self.trim_code_snippet(str(i.matched_data)),
                    "message": m.meta.get(
                        "description", f"{rule_name} rule matched"),
                }

                # YARA can match the same string several times in
                # one file; skip findings whose snippet was already
                # recorded for this rule
                if any(
                    f["code"] == finding["code"]
                    for f in rule_results[rule_name]
                ):
                    continue

                issues += len(m.strings)
                rule_results[rule_name].append(finding)
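
The snippet-based deduplication in the loop above can be shown in isolation; this is an illustrative helper, not a function from the GuardDog codebase:

```python
# Standalone sketch of the YARA-finding dedup used above: a new
# finding is dropped when a finding with the same "code" snippet
# was already recorded for that rule.
def add_finding(rule_results: dict, rule_name: str, finding: dict) -> bool:
    existing = rule_results.setdefault(rule_name, [])
    if any(f["code"] == finding["code"] for f in existing):
        return False  # duplicate snippet, skip
    existing.append(finding)
    return True
```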
except Exception as e:
errors["rules-all"] = f"failed to run rule: {str(e)}"

return {"results": results | rule_results, "errors": errors, "issues": issues}
return {
"results": results | rule_results,
"errors": errors,
"issues": issues}

def analyze_semgrep(self, path, rules=None) -> dict:
"""
@@ -254,10 +312,8 @@
errors = {}
issues = 0

rules_path = list(map(
lambda rule_name: os.path.join(SOURCECODE_RULES_PATH, f"{rule_name}.yml"),
all_rules
))
rules_path = list(map(lambda rule_name: os.path.join(
SOURCECODE_RULES_PATH, f"{rule_name}.yml"), all_rules))

if len(rules_path) == 0:
log.debug("No semgrep code rules to run")
@@ -266,7 +322,8 @@
try:
log.debug(f"Running semgrep code rules against {path}")
response = self._invoke_semgrep(target=path, rules=rules_path)
rule_results = self._format_semgrep_response(response, targetpath=targetpath)
rule_results = self._format_semgrep_response(
response, targetpath=targetpath)
issues += sum(len(res) for res in rule_results.values())

results = results | rule_results
@@ -278,7 +335,9 @@ def _invoke_semgrep(self, target: str, rules: Iterable[str]):
def _invoke_semgrep(self, target: str, rules: Iterable[str]):
try:
SEMGREP_MAX_TARGET_BYTES = int(
os.getenv("GUARDDOG_SEMGREP_MAX_TARGET_BYTES", MAX_BYTES_DEFAULT))
os.getenv(
"GUARDDOG_SEMGREP_MAX_TARGET_BYTES",
MAX_BYTES_DEFAULT))
SEMGREP_TIMEOUT = int(
os.getenv("GUARDDOG_SEMGREP_TIMEOUT", SEMGREP_TIMEOUT_DEFAULT))
cmd = ["semgrep"]
@@ -295,7 +354,11 @@ def _invoke_semgrep(self, target: str, rules: Iterable[str]):
cmd.append(f"--max-target-bytes={SEMGREP_MAX_TARGET_BYTES}")
cmd.append(target)
log.debug(f"Invoking semgrep with command line: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, check=True, encoding="utf-8")
result = subprocess.run(
cmd,
capture_output=True,
check=True,
encoding="utf-8")
return json.loads(str(result.stdout))
except FileNotFoundError:
raise Exception("unable to find semgrep binary")
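
The pattern in `_invoke_semgrep`, shelling out to a CLI with `subprocess.run` and parsing its stdout as JSON, can be sketched in a self-contained way; here `python -c` stands in for the semgrep binary so the example runs anywhere:

```python
import json
import subprocess
import sys

# Stand-in command that prints JSON, mimicking `semgrep --json`.
cmd = [sys.executable, "-c", "import json; print(json.dumps({'results': []}))"]

# check=True raises CalledProcessError on a non-zero exit status,
# which is how CLI failures surface as Python exceptions.
result = subprocess.run(cmd, capture_output=True, check=True, encoding="utf-8")
response = json.loads(result.stdout)
```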
@@ -349,9 +412,9 @@ def _format_semgrep_response(self, response, rule=None, targetpath=None):
file_path = os.path.abspath(result["path"])
code = self.trim_code_snippet(
self.get_snippet(
file_path=file_path, start_line=start_line, end_line=end_line
)
)
file_path=file_path,
start_line=start_line,
end_line=end_line))
if targetpath:
file_path = os.path.relpath(file_path, targetpath)

@@ -370,7 +433,11 @@ def _format_semgrep_response(self, response, rule=None, targetpath=None):

return results

def get_snippet(self, file_path: str, start_line: int, end_line: int) -> str:
def get_snippet(
self,
file_path: str,
start_line: int,
end_line: int) -> str:
"""
Returns the code snippet between start_line and stop_line in a file

3 changes: 3 additions & 0 deletions guarddog/analyzer/metadata/__init__.py
@@ -3,6 +3,7 @@
from guarddog.analyzer.metadata.pypi import PYPI_METADATA_RULES
from guarddog.analyzer.metadata.go import GO_METADATA_RULES
from guarddog.analyzer.metadata.github_action import GITHUB_ACTION_METADATA_RULES
from guarddog.analyzer.metadata.extension import EXTENSION_METADATA_RULES
from guarddog.ecosystems import ECOSYSTEM


@@ -16,3 +17,5 @@ def get_metadata_detectors(ecosystem: ECOSYSTEM) -> dict[str, Detector]:
return GO_METADATA_RULES
case ECOSYSTEM.GITHUB_ACTION:
return GITHUB_ACTION_METADATA_RULES
case ECOSYSTEM.EXTENSION:
return EXTENSION_METADATA_RULES
18 changes: 18 additions & 0 deletions guarddog/analyzer/metadata/extension/__init__.py
@@ -0,0 +1,18 @@
from typing import Type

from guarddog.analyzer.metadata import Detector
from guarddog.analyzer.metadata.extension.empty_information import ExtensionEmptyInfoDetector
from guarddog.analyzer.metadata.extension.suspicious_publisher import ExtensionSuspiciousPublisherDetector
from guarddog.analyzer.metadata.extension.suspicious_permissions import ExtensionSuspiciousPermissionsDetector

EXTENSION_METADATA_RULES = {}

classes: list[Type[Detector]] = [
ExtensionEmptyInfoDetector,
ExtensionSuspiciousPublisherDetector,
ExtensionSuspiciousPermissionsDetector,
]

for detectorClass in classes:
detectorInstance = detectorClass() # type: ignore
EXTENSION_METADATA_RULES[detectorInstance.get_name()] = detectorInstance
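
The registry pattern used above, instantiating each detector class once and indexing it by its reported name, can be sketched generically (the class names below are illustrative, not GuardDog's):

```python
# Generic sketch of the detector-registry pattern: each detector
# class is instantiated once and stored under the name it reports.
class Detector:
    def get_name(self) -> str:
        return type(self).__name__.lower()

class EmptyInfoDetector(Detector):
    pass

class SuspiciousPublisherDetector(Detector):
    pass

RULES: dict[str, Detector] = {}
for detector_class in (EmptyInfoDetector, SuspiciousPublisherDetector):
    instance = detector_class()
    RULES[instance.get_name()] = instance
```

Callers can then look up a detector by rule name without knowing its class, which is how `analyze_metadata` iterates `self.metadata_detectors[rule]`.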
47 changes: 47 additions & 0 deletions guarddog/analyzer/metadata/extension/empty_information.py
@@ -0,0 +1,47 @@
""" Empty Information Detector for Extensions
Detects if an extension contains an empty description
"""
import logging
from typing import Optional

from guarddog.analyzer.metadata.empty_information import EmptyInfoDetector

log = logging.getLogger("guarddog")


class ExtensionEmptyInfoDetector(EmptyInfoDetector):
"""Detects extensions with empty description information"""

def detect(self,
           package_info,
           path: Optional[str] = None,
           name: Optional[str] = None,
           version: Optional[str] = None) -> tuple[bool, str]:

log.debug(
f"Running extension empty description heuristic on extension {name} version {version}")

if not package_info or not isinstance(package_info, dict):
return True, "Extension has no package information"

manifest = package_info.get("manifest", {})
marketplace = package_info.get("marketplace", {})
source = package_info.get("source", "unknown")

return self._detect_with_metadata(manifest, marketplace, source)

def _detect_with_metadata(self,
                          manifest: dict,
                          marketplace: dict,
                          source: str) -> tuple[bool, str]:
"""Detect empty information with pre-extracted metadata"""

manifest_description = manifest.get("description", "").strip()
manifest_display_name = manifest.get("displayName", "").strip()

if not manifest_description and not manifest_display_name:
return True, self.MESSAGE_TEMPLATE % "Extension Marketplace (manifest)"
return False, ""
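
A self-contained mirror of this detector's logic helps show the `package_info` shape it expects (a dict with a `manifest` key holding the extension's parsed package.json); this is an illustrative sketch, not the module's public API, and it omits the `marketplace` and `source` fields for brevity:

```python
# Illustrative mirror of ExtensionEmptyInfoDetector: flags an
# extension whose manifest carries neither a description nor a
# display name. Message strings here are simplified stand-ins.
def detect_empty_info(package_info: dict) -> tuple[bool, str]:
    if not package_info or not isinstance(package_info, dict):
        return True, "Extension has no package information"
    manifest = package_info.get("manifest", {})
    description = manifest.get("description", "").strip()
    display_name = manifest.get("displayName", "").strip()
    if not description and not display_name:
        return True, "Empty description from Extension Marketplace (manifest)"
    return False, ""
```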