Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ env:

jobs:


type-check:
runs-on: ubuntu-latest
steps:
Expand All @@ -35,6 +36,22 @@ jobs:
- name: Type check with mypy
run: make type-check

format:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@08eba0b27e820071cde6df949e0beb9ba4906955 # v4.3.0
- name: Set up Python 3.10
uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
python-version: "3.10"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: Format test with black
run: make format

lint:
runs-on: ubuntu-latest
steps:
Expand Down
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ lint:
flake8 guarddog --count --select=E9,F63,F7,F82 --show-source --statistics --exclude tests/analyzer/sourcecode,tests/analyzer/metadata/resources,evaluator/data
flake8 guarddog --count --max-line-length=120 --statistics --exclude tests/analyzer/sourcecode,tests/analyzer/metadata/resources,evaluator/data --ignore=E203,W503

format:
black --check guarddog && black guarddog || (black guarddog && exit 1)
black --check scripts && black scripts || (black scripts && exit 1)

test-semgrep-rules:
semgrep --metrics off --quiet --test --config guarddog/analyzer/sourcecode tests/analyzer/sourcecode

Expand Down
107 changes: 55 additions & 52 deletions guarddog/analyzer/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
MAX_BYTES_DEFAULT = 10_000_000
SEMGREP_TIMEOUT_DEFAULT = 10

SOURCECODE_RULES_PATH = os.path.join(
os.path.dirname(__file__), "sourcecode"
)
SOURCECODE_RULES_PATH = os.path.join(os.path.dirname(__file__), "sourcecode")
log = logging.getLogger("guarddog")


Expand Down Expand Up @@ -68,12 +66,13 @@ def __init__(self, ecosystem=ECOSYSTEM.PYPI) -> None:
]

def analyze(
self,
path,
info=None,
rules=None,
name: Optional[str] = None,
version: Optional[str] = None) -> dict:
self,
path,
info=None,
rules=None,
name: Optional[str] = None,
version: Optional[str] = None,
) -> dict:
"""
Analyzes a package in the given path

Expand Down Expand Up @@ -101,19 +100,16 @@ def analyze(
results = metadata_results["results"] | sourcecode_results["results"]
errors = metadata_results["errors"] | sourcecode_results["errors"]

return {
"issues": issues,
"errors": errors,
"results": results,
"path": path}
return {"issues": issues, "errors": errors, "results": results, "path": path}

def analyze_metadata(
self,
path: str,
info,
rules=None,
name: Optional[str] = None,
version: Optional[str] = None) -> dict:
self,
path: str,
info,
rules=None,
name: Optional[str] = None,
version: Optional[str] = None,
) -> dict:
"""
Analyzes the metadata of a given package

Expand Down Expand Up @@ -142,7 +138,9 @@ def analyze_metadata(
for rule in all_rules:
try:
log.debug(f"Running rule {rule} against package '{name}'")
rule_matches, message = self.metadata_detectors[rule].detect(info, path, name, version)
rule_matches, message = self.metadata_detectors[rule].detect(
info, path, name, version
)
results[rule] = None
if rule_matches:
issues += 1
Expand Down Expand Up @@ -172,11 +170,7 @@ def analyze_sourcecode(self, path, rules=None) -> dict:
results = semgrepscan_results["results"] | yarascan_results["results"]
errors = semgrepscan_results["errors"] | yarascan_results["errors"]

return {
"issues": issues,
"errors": errors,
"results": results,
"path": path}
return {"issues": issues, "errors": errors, "results": results, "path": path}

def analyze_yara(self, path: str, rules: Optional[set] = None) -> dict:
"""
Expand Down Expand Up @@ -221,7 +215,9 @@ def analyze_yara(self, path: str, rules: Optional[set] = None) -> dict:
continue

scan_file_target_abspath = os.path.join(root, f)
scan_file_target_relpath = os.path.relpath(scan_file_target_abspath, path)
scan_file_target_relpath = os.path.relpath(
scan_file_target_abspath, path
)

matches = scan_rules.match(scan_file_target_abspath)
for m in matches:
Expand All @@ -231,7 +227,9 @@ def analyze_yara(self, path: str, rules: Optional[set] = None) -> dict:
finding = {
"location": f"{scan_file_target_relpath}:{i.offset}",
"code": self.trim_code_snippet(str(i.matched_data)),
'message': m.meta.get("description", f"{m.rule} rule matched")
"message": m.meta.get(
"description", f"{m.rule} rule matched"
),
}

# since yara can match multiple times in the same file
Expand All @@ -249,10 +247,7 @@ def analyze_yara(self, path: str, rules: Optional[set] = None) -> dict:
except Exception as e:
errors["rules-all"] = f"failed to run rule: {str(e)}"

return {
"results": results | rule_results,
"errors": errors,
"issues": issues}
return {"results": results | rule_results, "errors": errors, "issues": issues}

def analyze_semgrep(self, path, rules=None) -> dict:
"""
Expand All @@ -277,8 +272,14 @@ def analyze_semgrep(self, path, rules=None) -> dict:
errors = {}
issues = 0

rules_path = list(map(
lambda rule_name: os.path.join(SOURCECODE_RULES_PATH, f"{rule_name}.yml"), all_rules))
rules_path = list(
map(
lambda rule_name: os.path.join(
SOURCECODE_RULES_PATH, f"{rule_name}.yml"
),
all_rules,
)
)

if len(rules_path) == 0:
log.debug("No semgrep code rules to run")
Expand All @@ -287,7 +288,9 @@ def analyze_semgrep(self, path, rules=None) -> dict:
try:
log.debug(f"Running semgrep code rules against {path}")
response = self._invoke_semgrep(target=path, rules=rules_path)
rule_results = self._format_semgrep_response(response, targetpath=targetpath)
rule_results = self._format_semgrep_response(
response, targetpath=targetpath
)
issues += sum(len(res) for res in rule_results.values())

results = results | rule_results
Expand All @@ -299,9 +302,11 @@ def analyze_semgrep(self, path, rules=None) -> dict:
def _invoke_semgrep(self, target: str, rules: Iterable[str]):
try:
SEMGREP_MAX_TARGET_BYTES = int(
os.getenv("GUARDDOG_SEMGREP_MAX_TARGET_BYTES", MAX_BYTES_DEFAULT))
os.getenv("GUARDDOG_SEMGREP_MAX_TARGET_BYTES", MAX_BYTES_DEFAULT)
)
SEMGREP_TIMEOUT = int(
os.getenv("GUARDDOG_SEMGREP_TIMEOUT", SEMGREP_TIMEOUT_DEFAULT))
os.getenv("GUARDDOG_SEMGREP_TIMEOUT", SEMGREP_TIMEOUT_DEFAULT)
)
cmd = ["semgrep"]
for rule in rules:
cmd.extend(["--config", rule])
Expand All @@ -316,7 +321,9 @@ def _invoke_semgrep(self, target: str, rules: Iterable[str]):
cmd.append(f"--max-target-bytes={SEMGREP_MAX_TARGET_BYTES}")
cmd.append(target)
log.debug(f"Invoking semgrep with command line: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, check=True, encoding="utf-8")
result = subprocess.run(
cmd, capture_output=True, check=True, encoding="utf-8"
)
return json.loads(str(result.stdout))
except FileNotFoundError:
raise Exception("unable to find semgrep binary")
Expand Down Expand Up @@ -370,18 +377,18 @@ def _format_semgrep_response(self, response, rule=None, targetpath=None):
file_path = os.path.abspath(result["path"])
code = self.trim_code_snippet(
self.get_snippet(
file_path=file_path,
start_line=start_line,
end_line=end_line))
file_path=file_path, start_line=start_line, end_line=end_line
)
)
if targetpath:
file_path = os.path.relpath(file_path, targetpath)

location = file_path + ":" + str(start_line)

finding = {
'location': location,
'code': code,
'message': result["extra"]["message"]
"location": location,
"code": code,
"message": result["extra"]["message"],
}

rule_results = results[rule_name]
Expand All @@ -391,11 +398,7 @@ def _format_semgrep_response(self, response, rule=None, targetpath=None):

return results

def get_snippet(
self,
file_path: str,
start_line: int,
end_line: int) -> str:
def get_snippet(self, file_path: str, start_line: int, end_line: int) -> str:
"""
Returns the code snippet between start_line and end_line in a file

Expand All @@ -409,7 +412,7 @@ def get_snippet(
"""
snippet = []
try:
with open(file_path, 'r') as file:
with open(file_path, "r") as file:
for current_line_number, line in enumerate(file, start=1):
if start_line <= current_line_number <= end_line:
snippet.append(line)
Expand All @@ -420,12 +423,12 @@ def get_snippet(
except Exception as e:
log.error(f"Error reading file {file_path}: {str(e)}")

return ''.join(snippet)
return "".join(snippet)

# Makes sure the matching code to be displayed isn't too long
def trim_code_snippet(self, code):
THRESHOLD = 250
if len(code) > THRESHOLD:
return code[: THRESHOLD - 10] + '...' + code[len(code) - 10:]
return code[: THRESHOLD - 10] + "..." + code[len(code) - 10 :]
else:
return code
12 changes: 6 additions & 6 deletions guarddog/analyzer/metadata/bundled_binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ class BundledBinary(Detector):
# magic bytes are the first few bytes of a file that can be used to identify the file type
# regardless of its extension
magic_bytes = {
"exe": b"\x4D\x5A",
"elf": b"\x7F\x45\x4C\x46",
"macho32": b"\xFE\xED\xFA\xCE",
"macho64": b"\xFE\xED\xFA\xCF",
"exe": b"\x4d\x5a",
"elf": b"\x7f\x45\x4c\x46",
"macho32": b"\xfe\xed\xfa\xce",
"macho64": b"\xfe\xed\xfa\xcf",
}

def __init__(self):
Expand All @@ -40,7 +40,7 @@ def format_file(file: str, kind: str) -> str:
def sha256(file: str) -> str:
with open(file, "rb") as f:
hasher = hashlib.sha256()
while (chunk := f.read(4096)):
while chunk := f.read(4096):
hasher.update(chunk)
return hasher.hexdigest()

Expand All @@ -65,7 +65,7 @@ def sha256(file: str) -> str:
if not bin_files:
return False, ""

output_lines = '\n'.join(
output_lines = "\n".join(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is irritating tbh. Single quotes for single chars is totally acceptable.

f"{digest}: {', '.join(files)}" for digest, files in bin_files.items()
)
return True, f"Binary file/s detected in package:\n{output_lines}"
Expand Down
4 changes: 3 additions & 1 deletion guarddog/analyzer/metadata/deceptive_author.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ def get_suspicious_email_domains() -> set:
# read internally maintained list of placeholder email domains
# these domains are usually used by authors who don't want to reveal their real email
placeholder_email_domains_data = None
with open(placeholder_email_domains_path, "r") as placeholder_email_domains_file:
with open(
placeholder_email_domains_path, "r"
) as placeholder_email_domains_file:
placeholder_email_domains_data = set(
placeholder_email_domains_file.read().split("\n")
)
Expand Down
9 changes: 7 additions & 2 deletions guarddog/analyzer/metadata/detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,13 @@ def __init__(self, name: str, description: str) -> None:

# returns (ruleMatches, message)
@abstractmethod
def detect(self, package_info, path: Optional[str] = None, name: Optional[str] = None,
version: Optional[str] = None) -> tuple[bool, Optional[str]]:
def detect(
self,
package_info,
path: Optional[str] = None,
name: Optional[str] = None,
version: Optional[str] = None,
) -> tuple[bool, Optional[str]]:
pass # pragma: no cover

def get_name(self) -> str:
Expand Down
11 changes: 8 additions & 3 deletions guarddog/analyzer/metadata/empty_information.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@ class EmptyInfoDetector(Detector):
def __init__(self):
super().__init__(
name="empty_information",
description="Identify packages with an empty description field"
description="Identify packages with an empty description field",
)

@abstractmethod
def detect(self, package_info, path: Optional[str] = None, name: Optional[str] = None,
version: Optional[str] = None) -> tuple[bool, str]:
def detect(
self,
package_info,
path: Optional[str] = None,
name: Optional[str] = None,
version: Optional[str] = None,
) -> tuple[bool, str]:
"""
Uses a package's information from PyPI's JSON API to determine
if the package has an empty description
Expand Down
7 changes: 4 additions & 3 deletions guarddog/analyzer/metadata/go/typosquatting.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ def _get_top_packages(self) -> set:

if top_packages_information is None:
raise Exception(
f"Could not retrieve top Go packages from {top_packages_path}")
f"Could not retrieve top Go packages from {top_packages_path}"
)

return set(top_packages_information)

Expand Down Expand Up @@ -104,8 +105,8 @@ def _get_confused_forms(self, package_name) -> list:
continue

# Get form when replacing or removing go/golang term
replaced_form = terms[:i] + [confused_term] + terms[i + 1:]
removed_form = terms[:i] + terms[i + 1:]
replaced_form = terms[:i] + [confused_term] + terms[i + 1 :]
removed_form = terms[:i] + terms[i + 1 :]

for form in (replaced_form, removed_form):
confused_forms.append("-".join(form))
Expand Down
9 changes: 7 additions & 2 deletions guarddog/analyzer/metadata/npm/bundled_binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@


class NPMBundledBinary(BundledBinary):
def detect(self, package_info, path: Optional[str] = None, name: Optional[str] = None,
version: Optional[str] = None) -> tuple[bool, str]:
def detect(
self,
package_info,
path: Optional[str] = None,
name: Optional[str] = None,
version: Optional[str] = None,
) -> tuple[bool, str]:
return super().detect(package_info, path, name, version)
2 changes: 1 addition & 1 deletion guarddog/analyzer/metadata/npm/deceptive_author.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
""" Deceptive Author Detector
"""Deceptive Author Detector

Detects when an author is using a disposable email
"""
Expand Down
Loading