# Fallback for the GUARDDOG_SEMGREP_MAX_TARGET_BYTES environment variable;
# the resolved value is passed to semgrep as --max-target-bytes.
MAX_BYTES_DEFAULT = 10_000_000
# Fallback for the GUARDDOG_SEMGREP_TIMEOUT environment variable.
# NOTE(review): presumably seconds, matching semgrep's timeout option - confirm.
SEMGREP_TIMEOUT_DEFAULT = 10

# Directory that holds the source code rule files, located next to this module.
SOURCECODE_RULES_PATH = os.path.join(os.path.dirname(__file__), "sourcecode")
log = logging.getLogger("guarddog")
2321
2422
@@ -68,12 +66,13 @@ def __init__(self, ecosystem=ECOSYSTEM.PYPI) -> None:
6866        ]
6967
7068    def  analyze (
71-             self ,
72-             path ,
73-             info = None ,
74-             rules = None ,
75-             name : Optional [str ] =  None ,
76-             version : Optional [str ] =  None ) ->  dict :
69+         self ,
70+         path ,
71+         info = None ,
72+         rules = None ,
73+         name : Optional [str ] =  None ,
74+         version : Optional [str ] =  None ,
75+     ) ->  dict :
7776        """ 
7877        Analyzes a package in the given path 
7978
@@ -101,19 +100,16 @@ def analyze(
101100        results  =  metadata_results ["results" ] |  sourcecode_results ["results" ]
102101        errors  =  metadata_results ["errors" ] |  sourcecode_results ["errors" ]
103102
104-         return  {
105-             "issues" : issues ,
106-             "errors" : errors ,
107-             "results" : results ,
108-             "path" : path }
103+         return  {"issues" : issues , "errors" : errors , "results" : results , "path" : path }
109104
110105    def  analyze_metadata (
111-             self ,
112-             path : str ,
113-             info ,
114-             rules = None ,
115-             name : Optional [str ] =  None ,
116-             version : Optional [str ] =  None ) ->  dict :
106+         self ,
107+         path : str ,
108+         info ,
109+         rules = None ,
110+         name : Optional [str ] =  None ,
111+         version : Optional [str ] =  None ,
112+     ) ->  dict :
117113        """ 
118114        Analyzes the metadata of a given package 
119115
@@ -142,7 +138,9 @@ def analyze_metadata(
142138        for  rule  in  all_rules :
143139            try :
144140                log .debug (f"Running rule { rule } { name }  )
145-                 rule_matches , message  =  self .metadata_detectors [rule ].detect (info , path , name , version )
141+                 rule_matches , message  =  self .metadata_detectors [rule ].detect (
142+                     info , path , name , version 
143+                 )
146144                results [rule ] =  None 
147145                if  rule_matches :
148146                    issues  +=  1 
@@ -172,11 +170,7 @@ def analyze_sourcecode(self, path, rules=None) -> dict:
172170        results  =  semgrepscan_results ["results" ] |  yarascan_results ["results" ]
173171        errors  =  semgrepscan_results ["errors" ] |  yarascan_results ["errors" ]
174172
175-         return  {
176-             "issues" : issues ,
177-             "errors" : errors ,
178-             "results" : results ,
179-             "path" : path }
173+         return  {"issues" : issues , "errors" : errors , "results" : results , "path" : path }
180174
181175    def  analyze_yara (self , path : str , rules : Optional [set ] =  None ) ->  dict :
182176        """ 
@@ -221,7 +215,9 @@ def analyze_yara(self, path: str, rules: Optional[set] = None) -> dict:
221215                        continue 
222216
223217                    scan_file_target_abspath  =  os .path .join (root , f )
224-                     scan_file_target_relpath  =  os .path .relpath (scan_file_target_abspath , path )
218+                     scan_file_target_relpath  =  os .path .relpath (
219+                         scan_file_target_abspath , path 
220+                     )
225221
226222                    matches  =  scan_rules .match (scan_file_target_abspath )
227223                    for  m  in  matches :
@@ -231,7 +227,9 @@ def analyze_yara(self, path: str, rules: Optional[set] = None) -> dict:
231227                                finding  =  {
232228                                    "location" : f"{ scan_file_target_relpath } { i .offset }  ,
233229                                    "code" : self .trim_code_snippet (str (i .matched_data )),
234-                                     'message' : m .meta .get ("description" , f"{ m .rule }  )
230+                                     "message" : m .meta .get (
231+                                         "description" , f"{ m .rule }  
232+                                     ),
235233                                }
236234
237235                                # since yara can match the multiple times in the same file 
@@ -249,10 +247,7 @@ def analyze_yara(self, path: str, rules: Optional[set] = None) -> dict:
249247        except  Exception  as  e :
250248            errors ["rules-all" ] =  f"failed to run rule: { str (e )}  
251249
252-         return  {
253-             "results" : results  |  rule_results ,
254-             "errors" : errors ,
255-             "issues" : issues }
250+         return  {"results" : results  |  rule_results , "errors" : errors , "issues" : issues }
256251
257252    def  analyze_semgrep (self , path , rules = None ) ->  dict :
258253        """ 
@@ -277,8 +272,14 @@ def analyze_semgrep(self, path, rules=None) -> dict:
277272        errors  =  {}
278273        issues  =  0 
279274
280-         rules_path  =  list (map (
281-             lambda  rule_name : os .path .join (SOURCECODE_RULES_PATH , f"{ rule_name }  ), all_rules ))
275+         rules_path  =  list (
276+             map (
277+                 lambda  rule_name : os .path .join (
278+                     SOURCECODE_RULES_PATH , f"{ rule_name }  
279+                 ),
280+                 all_rules ,
281+             )
282+         )
282283
283284        if  len (rules_path ) ==  0 :
284285            log .debug ("No semgrep code rules to run" )
@@ -287,7 +288,9 @@ def analyze_semgrep(self, path, rules=None) -> dict:
287288        try :
288289            log .debug (f"Running semgrep code rules against { path }  )
289290            response  =  self ._invoke_semgrep (target = path , rules = rules_path )
290-             rule_results  =  self ._format_semgrep_response (response , targetpath = targetpath )
291+             rule_results  =  self ._format_semgrep_response (
292+                 response , targetpath = targetpath 
293+             )
291294            issues  +=  sum (len (res ) for  res  in  rule_results .values ())
292295
293296            results  =  results  |  rule_results 
@@ -299,9 +302,11 @@ def analyze_semgrep(self, path, rules=None) -> dict:
299302    def  _invoke_semgrep (self , target : str , rules : Iterable [str ]):
300303        try :
301304            SEMGREP_MAX_TARGET_BYTES  =  int (
302-                 os .getenv ("GUARDDOG_SEMGREP_MAX_TARGET_BYTES" , MAX_BYTES_DEFAULT ))
305+                 os .getenv ("GUARDDOG_SEMGREP_MAX_TARGET_BYTES" , MAX_BYTES_DEFAULT )
306+             )
303307            SEMGREP_TIMEOUT  =  int (
304-                 os .getenv ("GUARDDOG_SEMGREP_TIMEOUT" , SEMGREP_TIMEOUT_DEFAULT ))
308+                 os .getenv ("GUARDDOG_SEMGREP_TIMEOUT" , SEMGREP_TIMEOUT_DEFAULT )
309+             )
305310            cmd  =  ["semgrep" ]
306311            for  rule  in  rules :
307312                cmd .extend (["--config" , rule ])
@@ -316,7 +321,9 @@ def _invoke_semgrep(self, target: str, rules: Iterable[str]):
316321            cmd .append (f"--max-target-bytes={ SEMGREP_MAX_TARGET_BYTES }  )
317322            cmd .append (target )
318323            log .debug (f"Invoking semgrep with command line: { ' ' .join (cmd )}  )
319-             result  =  subprocess .run (cmd , capture_output = True , check = True , encoding = "utf-8" )
324+             result  =  subprocess .run (
325+                 cmd , capture_output = True , check = True , encoding = "utf-8" 
326+             )
320327            return  json .loads (str (result .stdout ))
321328        except  FileNotFoundError :
322329            raise  Exception ("unable to find semgrep binary" )
@@ -370,18 +377,18 @@ def _format_semgrep_response(self, response, rule=None, targetpath=None):
370377            file_path  =  os .path .abspath (result ["path" ])
371378            code  =  self .trim_code_snippet (
372379                self .get_snippet (
373-                     file_path = file_path ,
374-                      start_line = start_line , 
375-                      end_line = end_line ) )
380+                     file_path = file_path ,  start_line = start_line ,  end_line = end_line 
381+                 ) 
382+             )
376383            if  targetpath :
377384                file_path  =  os .path .relpath (file_path , targetpath )
378385
379386            location  =  file_path  +  ":"  +  str (start_line )
380387
381388            finding  =  {
382-                 ' location' location ,
383-                 ' code' code ,
384-                 ' message' result ["extra" ]["message" ]
389+                 " location" location ,
390+                 " code" code ,
391+                 " message" result ["extra" ]["message" ], 
385392            }
386393
387394            rule_results  =  results [rule_name ]
@@ -391,11 +398,7 @@ def _format_semgrep_response(self, response, rule=None, targetpath=None):
391398
392399        return  results 
393400
394-     def  get_snippet (
395-             self ,
396-             file_path : str ,
397-             start_line : int ,
398-             end_line : int ) ->  str :
401+     def  get_snippet (self , file_path : str , start_line : int , end_line : int ) ->  str :
399402        """ 
400403        Returns the code snippet between start_line and stop_line in a file 
401404
@@ -409,7 +412,7 @@ def get_snippet(
409412        """ 
410413        snippet  =  []
411414        try :
412-             with  open (file_path , 'r' ) as  file :
415+             with  open (file_path , "r" ) as  file :
413416                for  current_line_number , line  in  enumerate (file , start = 1 ):
414417                    if  start_line  <=  current_line_number  <=  end_line :
415418                        snippet .append (line )
@@ -420,12 +423,12 @@ def get_snippet(
420423        except  Exception  as  e :
421424            log .error (f"Error reading file { file_path } { str (e )}  )
422425
423-         return  '' .join (snippet )
426+         return  "" .join (snippet )
424427
425428    # Makes sure the matching code to be displayed isn't too long 
426429    def  trim_code_snippet (self , code ):
427430        THRESHOLD  =  250 
428431        if  len (code ) >  THRESHOLD :
429-             return  code [: THRESHOLD  -  10 ] +  ' ...' +  code [len (code ) -  10 :]
432+             return  code [: THRESHOLD  -  10 ] +  " ..." +  code [len (code ) -  10   :]
430433        else :
431434            return  code 
0 commit comments