Skip to content

Commit be780e6

Browse files
committed
refactoring to use Dependency and DependencyVersion types
1 parent 36ced10 commit be780e6

16 files changed

+750
-397
lines changed

guarddog/cli.py

Lines changed: 18 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
"""
66

77
from functools import reduce
8-
import json as js
98
import logging
109
import os
1110
import sys
@@ -14,12 +13,12 @@
1413

1514
import click
1615
from prettytable import PrettyTable
17-
from termcolor import colored
1816

1917
from guarddog.analyzer.metadata import get_metadata_detectors
2018
from guarddog.analyzer.sourcecode import get_sourcecode_rules
2119
from guarddog.ecosystems import ECOSYSTEM
22-
from guarddog.reporters.sarif import report_verify_sarif
20+
from guarddog.reporters.reporter_factory import ReporterFactory, ReporterType
21+
2322
from guarddog.scanners import get_package_scanner, get_project_scanner
2423
from guarddog.utils.archives import safe_extract
2524

@@ -127,7 +126,7 @@ def _get_all_rules(ecosystem: ECOSYSTEM) -> set[str]:
127126

128127
def _get_rule_param(
129128
rules: tuple[str, ...], exclude_rules: tuple[str, ...], ecosystem: ECOSYSTEM
130-
) -> Optional[set]:
129+
) -> Optional[set[str]]:
131130
"""
132131
This function should return None if no rules are provided
133132
Else a set of rules to be used for scanning
@@ -162,28 +161,20 @@ def _verify(
162161
log.error(f"Command verify is not supported for ecosystem {ecosystem}")
163162
exit(1)
164163

165-
def display_result(result: dict) -> None:
166-
identifier = (
167-
result["dependency"]
168-
if result["version"] is None
169-
else f"{result['dependency']} version {result['version']}"
170-
)
171-
if output_format is None:
172-
print_scan_results(result.get("result"), identifier)
173-
174-
if len(result.get("errors", [])) > 0:
175-
print_errors(result.get("error"), identifier)
164+
dependencies, results = scanner.scan_local(path=path,rules=rule_param )
176165

177-
results = scanner.scan_local(path, rule_param, display_result)
178-
if output_format == "json":
179-
return_value = js.dumps(results)
166+
rule_docs = list(rule_param or _get_all_rules(ecosystem=ecosystem))
180167

181-
if output_format == "sarif":
182-
sarif_rules = _get_all_rules(ecosystem)
183-
return_value = report_verify_sarif(path, list(sarif_rules), results, ecosystem)
168+
reporter = ReporterFactory.create_reporter(ReporterType.from_str(output_format))
169+
stdout, stderr = reporter.render_verify(
170+
dependency_files=dependencies,
171+
rule_names=rule_docs,
172+
scan_results=results,
173+
ecosystem=ecosystem,
174+
)
184175

185-
if output_format is not None:
186-
print(return_value)
176+
sys.stdout.write(stdout)
177+
sys.stderr.write(stderr)
187178

188179
if exit_non_zero_on_finding:
189180
exit_with_status_code([result["result"] for result in results])
@@ -231,10 +222,10 @@ def _scan(
231222
log.error(f"Error occurred while scanning target {identifier}: '{e}'\n")
232223
sys.exit(1)
233224

234-
if output_format == "json":
235-
print(js.dumps(result))
236-
else:
237-
print_scan_results(result, result["package"])
225+
reporter = ReporterFactory.create_reporter(ReporterType.from_str(output_format))
226+
stdout, stderr = reporter.render_scan(result)
227+
sys.stdout.write(stdout)
228+
sys.stderr.write(stderr)
238229

239230
if exit_non_zero_on_finding:
240231
exit_with_status_code([result])
@@ -359,83 +350,6 @@ def scan(
359350
exit_non_zero_on_finding,
360351
ECOSYSTEM.PYPI,
361352
)
362-
363-
364-
# Pretty prints scan results for the console
365-
def print_scan_results(results, identifier):
366-
num_issues = results.get("issues")
367-
errors = results.get("errors", [])
368-
369-
if num_issues == 0:
370-
print(
371-
"Found "
372-
+ colored("0 potentially malicious indicators", "green", attrs=["bold"])
373-
+ " scanning "
374-
+ colored(identifier, None, attrs=["bold"])
375-
)
376-
print()
377-
else:
378-
print(
379-
"Found "
380-
+ colored(
381-
str(num_issues) + " potentially malicious indicators",
382-
"red",
383-
attrs=["bold"],
384-
)
385-
+ " in "
386-
+ colored(identifier, None, attrs=["bold"])
387-
)
388-
print()
389-
390-
findings = results.get("results", [])
391-
for finding in findings:
392-
description = findings[finding]
393-
if isinstance(description, str): # package metadata
394-
print(colored(finding, None, attrs=["bold"]) + ": " + description)
395-
print()
396-
elif isinstance(description, list): # semgrep rule result:
397-
source_code_findings = description
398-
print(
399-
colored(finding, None, attrs=["bold"])
400-
+ ": found "
401-
+ str(len(source_code_findings))
402-
+ " source code matches"
403-
)
404-
for finding in source_code_findings:
405-
print(
406-
" * "
407-
+ finding["message"]
408-
+ " at "
409-
+ finding["location"]
410-
+ "\n "
411-
+ format_code_line_for_output(finding["code"])
412-
)
413-
print()
414-
415-
if len(errors) > 0:
416-
print_errors(errors, identifier)
417-
print("\n")
418-
419-
420-
def print_errors(errors, identifier):
421-
print(
422-
colored("Some rules failed to run while scanning " + identifier + ":", "yellow")
423-
)
424-
print()
425-
for rule in errors:
426-
print(f"* {rule}: {errors[rule]}")
427-
print()
428-
429-
430-
def format_code_line_for_output(code):
431-
return " " + colored(
432-
code.strip().replace("\n", "\n ").replace("\t", " "),
433-
None,
434-
"on_red",
435-
attrs=["bold"],
436-
)
437-
438-
439353
# Given the results, exit with the appropriate status code
440354
def exit_with_status_code(results):
441355
for result in results:

guarddog/reporters/__init__.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from guarddog.scanners.scanner import DependencyFile
2+
from typing import List
3+
from guarddog.ecosystems import ECOSYSTEM
4+
5+
6+
class BaseReporter:
7+
"""
8+
Base class for all reporters.
9+
"""
10+
11+
@staticmethod
12+
def render_scan(scan_results: dict) -> tuple[str, str]:
13+
"""
14+
Report the scans results.
15+
"""
16+
raise NotImplementedError("Subclasses must implement this method.")
17+
18+
@staticmethod
19+
def render_verify(
20+
dependency_files: List[DependencyFile],
21+
rule_names: list[str],
22+
scan_results: list[dict],
23+
ecosystem: ECOSYSTEM,
24+
) -> tuple[str, str]:
25+
"""
26+
Report the scans results.
27+
"""
28+
raise NotImplementedError("Subclasses must implement this method.")
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
from termcolor import colored
2+
from typing import Generator
3+
from guarddog.reporters import BaseReporter
4+
from typing import List
5+
from guarddog.scanners.scanner import DependencyFile
6+
from guarddog.ecosystems import ECOSYSTEM
7+
8+
9+
class HumanReadableReporter(BaseReporter):
10+
"""
11+
HumanReadableReporter is a class that formats and prints scan results in a human-readable format.
12+
"""
13+
14+
@staticmethod
15+
def print_errors(identifier: str, results: dict) -> Generator:
16+
errors = results.get("errors", [])
17+
if not errors:
18+
return
19+
yield ("")
20+
yield (
21+
colored(
22+
"Some rules failed to run while scanning " + identifier + ":",
23+
"yellow",
24+
)
25+
)
26+
yield ("")
27+
for rule in errors:
28+
yield (f"* {rule}: {errors[rule]}")
29+
30+
@staticmethod
31+
def print_scan_results(identifier:str, results:dict) -> Generator:
32+
33+
def _format_code_line_for_output(code) -> str:
34+
return " " + colored(
35+
code.strip().replace("\n", "\n ").replace("\t", " "),
36+
None,
37+
"on_red",
38+
attrs=["bold"],
39+
)
40+
num_issues = results.get("issues")
41+
42+
if num_issues == 0:
43+
yield (
44+
"Found "
45+
+ colored(
46+
"0 potentially malicious indicators", "green", attrs=["bold"]
47+
)
48+
+ " scanning "
49+
+ colored(identifier, None, attrs=["bold"])
50+
)
51+
yield ("")
52+
else:
53+
yield (
54+
"Found "
55+
+ colored(
56+
str(num_issues) + " potentially malicious indicators",
57+
"red",
58+
attrs=["bold"],
59+
)
60+
+ " in "
61+
+ colored(identifier, None, attrs=["bold"])
62+
)
63+
yield ("")
64+
65+
findings = results.get("results", [])
66+
for finding in findings:
67+
description = findings[finding]
68+
if isinstance(description, str): # package metadata
69+
yield (
70+
colored(finding, None, attrs=["bold"]) + ": " + description
71+
)
72+
yield ("")
73+
elif isinstance(description, list): # semgrep rule result:
74+
source_code_findings = description
75+
yield (
76+
colored(finding, None, attrs=["bold"])
77+
+ ": found "
78+
+ str(len(source_code_findings))
79+
+ " source code matches"
80+
)
81+
for finding in source_code_findings:
82+
yield (
83+
" * "
84+
+ finding["message"]
85+
+ " at "
86+
+ finding["location"]
87+
+ "\n "
88+
+ _format_code_line_for_output(finding["code"])
89+
)
90+
yield ("")
91+
92+
@staticmethod
93+
def render_scan(scan_results: dict) -> tuple[str, str]:
94+
"""
95+
Report the scans results in a human-readable format.
96+
97+
Args:
98+
scan_results (dict): The scan results to be reported.
99+
"""
100+
return (
101+
"\n".join(
102+
HumanReadableReporter.print_scan_results(
103+
identifier=scan_results["package"],
104+
results=scan_results)),
105+
"\n".join(
106+
HumanReadableReporter.print_errors(
107+
identifier=scan_results["package"],
108+
results=scan_results
109+
)
110+
),
111+
)
112+
113+
@staticmethod
114+
def render_verify(
115+
dependency_files: List[DependencyFile],
116+
rule_names: list[str],
117+
scan_results: list[dict],
118+
ecosystem: ECOSYSTEM,
119+
) -> tuple[str, str]:
120+
return (
121+
"\n".join(
122+
map(lambda s: "\n".join(
123+
HumanReadableReporter.print_scan_results(
124+
identifier=s["dependency"],
125+
results=s["result"]
126+
),
127+
), scan_results)
128+
),
129+
"\n".join(
130+
map(lambda s: "\n".join(
131+
HumanReadableReporter.print_errors(
132+
identifier=s["dependency"],
133+
results=s["result"]
134+
),
135+
), scan_results)
136+
)
137+
)
138+

guarddog/reporters/json.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import json
2+
from typing import List
3+
from guarddog.scanners.scanner import DependencyFile
4+
from guarddog.ecosystems import ECOSYSTEM
5+
6+
from guarddog.reporters import BaseReporter
7+
8+
9+
class JsonReporter(BaseReporter):
10+
@staticmethod
11+
def render_verify(
12+
dependency_files: List[DependencyFile],
13+
rule_names: list[str],
14+
scan_results: list[dict],
15+
ecosystem: ECOSYSTEM,
16+
) -> tuple[str, str]:
17+
return json.dumps(scan_results), ""
18+
19+
@staticmethod
20+
def render_scan(scan_results: dict) -> tuple[str, str]:
21+
"""
22+
Report the scans results in a json format.
23+
24+
Args:
25+
scan_results (dict): The scan results to be reported.
26+
"""
27+
# this reporter will output the errors in stdout
28+
return json.dumps(scan_results), ""

0 commit comments

Comments
 (0)