Skip to content

Commit 70b9d49

Browse files
committed
chore: split checks into rules
Signed-off-by: Felipe Zipitria <felipe.zipitria@owasp.org>
1 parent b629009 commit 70b9d49

16 files changed

+866
-85
lines changed

src/crs_linter/cli.py

Lines changed: 7 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,9 @@
1111
from dulwich.contrib.release_robot import get_current_version
1212
from semver import Version
1313

14-
try:
15-
from linter import Check
16-
except ImportError:
17-
from crs_linter.linter import Check
18-
try:
19-
from logger import Logger, Output
20-
except ImportError:
21-
from crs_linter.logger import Logger, Output
14+
from crs_linter.linter import Linter
15+
from crs_linter.logger import Logger, Output
16+
from crs_linter.rules import indentation
2217

2318

2419
def remove_comments(data):
@@ -187,54 +182,6 @@ def get_crs_version(directory, version=None, head_ref=None, commit_message=None)
187182
return crs_version
188183

189184

190-
def check_indentation(filename, content):
191-
error = False
192-
193-
### make a diff to check the indentations
194-
try:
195-
with open(filename, "r") as fp:
196-
from_lines = fp.readlines()
197-
if os.path.basename(filename) == "crs-setup.conf.example":
198-
from_lines = remove_comments("".join(from_lines)).split("\n")
199-
from_lines = [l + "\n" for l in from_lines]
200-
except FileNotFoundError:
201-
logger.error(f"Can't open file for indentation check: {filename}")
202-
error = True
203-
204-
# virtual output
205-
writer = msc_pyparser.MSCWriter(content)
206-
writer.generate()
207-
output = []
208-
for l in writer.output:
209-
output += [l + "\n" for l in l.split("\n") if l != "\n"]
210-
211-
if len(from_lines) < len(output):
212-
from_lines.append("\n")
213-
elif len(from_lines) > len(output):
214-
output.append("\n")
215-
216-
diff = difflib.unified_diff(from_lines, output)
217-
if from_lines == output:
218-
logger.debug("Indentation check ok.")
219-
else:
220-
logger.debug("Indentation check found error(s)")
221-
error = True
222-
for d in diff:
223-
d = d.strip("\n")
224-
r = re.match(r"^@@ -(\d+),(\d+) \+\d+,\d+ @@$", d)
225-
if r:
226-
line1, line2 = [int(i) for i in r.groups()]
227-
logger.error(
228-
"an indentation error was found",
229-
file=filename,
230-
title="Indentation error",
231-
line=line1,
232-
end_line=line1 + line2,
233-
)
234-
235-
return error
236-
237-
238185
def read_files(filenames):
239186
global logger
240187

@@ -288,7 +235,7 @@ def _arg_in_argv(argv, args):
288235

289236
def parse_args(argv):
290237
parser = argparse.ArgumentParser(
291-
prog="crs-linter", description="CRS Rules Check tool"
238+
prog="crs-linter", description="CRS Rules Linter tool"
292239
)
293240
parser.add_argument(
294241
"-o",
@@ -421,7 +368,8 @@ def main():
421368
for f in parsed.keys():
422369
logger.start_group(f)
423370
logger.debug(f)
424-
c = Check(parsed[f], f, txvars)
371+
c = Linter(parsed[f], f, txvars)
372+
425373

426374
### check case usings
427375
c.check_ignore_case()
@@ -450,7 +398,7 @@ def main():
450398
title="Action order check",
451399
)
452400

453-
error = check_indentation(f, parsed[f])
401+
error = indentation.check(f, parsed[f])
454402
if error:
455403
retval = 1
456404

src/crs_linter/linter.py

100755100644
Lines changed: 66 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,48 @@
1-
import sys
21
import msc_pyparser
3-
import difflib
4-
import argparse
52
import re
6-
import subprocess
7-
import logging
8-
import os.path
93

10-
def parse_config(text):
11-
try:
12-
mparser = msc_pyparser.MSCParser()
13-
mparser.parser.parse(text)
14-
return mparser.configlines
4+
class LintProblem:
5+
"""Represents a linting problem found by crs-linter."""
6+
def __init__(self, line, end_line, column=None, desc='<no description>', rule=None):
7+
#: Line on which the problem was found (starting at 1)
8+
self.line = line
9+
#: Line on which the problem ends
10+
self.end_line = end_line
11+
#: Column on which the problem was found (starting at 1)
12+
self.column = column
13+
#: Human-readable description of the problem
14+
self.desc = desc
15+
#: Identifier of the rule that detected the problem
16+
self.rule = rule
17+
self.level = None
1518

16-
except Exception as e:
17-
print(e)
19+
@property
20+
def message(self):
21+
if self.rule is not None:
22+
return f'{self.desc} ({self.rule})'
23+
return self.desc
1824

25+
def __eq__(self, other):
26+
return (self.line == other.line and
27+
self.column == other.column and
28+
self.rule == other.rule)
1929

20-
def parse_file(filename):
21-
try:
22-
mparser = msc_pyparser.MSCParser()
23-
with open(filename, "r") as f:
24-
mparser.parser.parse(f.read())
25-
return mparser.configlines
30+
def __lt__(self, other):
31+
return (self.line < other.line or
32+
(self.line == other.line and self.column < other.column))
2633

27-
except Exception as e:
28-
print(e)
34+
def __repr__(self):
35+
return f'{self.line}:{self.column}: {self.message}'
2936

3037

31-
class Check():
32-
def __init__(self, data, filename=None, txvars={}):
38+
class Linter:
39+
ids = {} # list of rule id's and their location in files
40+
vars = {} # list of TX variables and their location in files
3341

42+
def __init__(self, data, filename=None, txvars=None):
3443
# txvars is a global used hash table, but processing of rules is a sequential flow
3544
# all rules need this global table
36-
self.globtxvars = txvars
45+
self.globtxvars = txvars or {}
3746
# list available operators, actions, transformations and ctl args
3847
self.operators = "beginsWith|containsWord|contains|detectSQLi|detectXSS|endsWith|eq|fuzzyHash|geoLookup|ge|gsbLookup|gt|inspectFile|ipMatch|ipMatchF|ipMatchFromFile|le|lt|noMatch|pmFromFile|pmf|pm|rbl|rsub|rx|streq|strmatch|unconditionalMatch|validateByteRange|validateDTD|validateHash|validateSchema|validateUrlEncoding|validateUtf8Encoding|verifyCC|verifyCPF|verifySSN|within".split(
3948
"|"
@@ -101,7 +110,6 @@ def __init__(self, data, filename=None, txvars={}):
101110
self.ids = {} # list of rule id's and their location in files
102111

103112
# Any of these variables below are used to store the errors
104-
105113
self.error_case_mistmatch = [] # list of case mismatch errors
106114
self.error_action_order = [] # list of ordered action errors
107115
self.error_wrong_ctl_auditlogparts = [] # list of wrong ctl:auditLogParts
@@ -120,10 +128,11 @@ def __init__(self, data, filename=None, txvars={}):
120128
self.error_tx_N_without_capture_action = (
121129
[]
122130
) # list of rules which uses TX.N without previous 'capture'
123-
self.error_rule_hasnotest = (
131+
self.error_rule_hasnotest = (
124132
[]
125133
) # list of rules which don't have any tests
126134
# regex to produce tag from filename:
135+
import os.path
127136
self.re_fname = re.compile(r"(REQUEST|RESPONSE)\-\d{3}\-")
128137
self.filename_tag_exclusions = []
129138

@@ -231,6 +240,7 @@ def check_ignore_case(self):
231240
e["message"] += f" (rule: {self.current_ruleid})"
232241

233242
def check_action_order(self):
243+
import sys
234244
for d in self.data:
235245
if "actions" in d:
236246
max_order = 0 # maximum position of read actions
@@ -771,6 +781,7 @@ def gen_crs_file_tag(self, fname=None):
771781
"""
772782
generate tag from filename
773783
"""
784+
import os.path
774785
filename_for_tag = fname if fname is not None else self.filename
775786
filename = self.re_fname.sub("", os.path.basename(os.path.splitext(filename_for_tag)[0]))
776787
filename = filename.replace("APPLICATION-", "")
@@ -993,3 +1004,32 @@ def find_ids_without_tests(self, test_cases, exclusion_list):
9931004
'message': f"rule does not have any tests; rule id: {rid}'"
9941005
})
9951006
return True
1007+
1008+
1009+
def parse_config(text):
1010+
try:
1011+
mparser = msc_pyparser.MSCParser()
1012+
mparser.parser.parse(text)
1013+
return mparser.configlines
1014+
1015+
except Exception as e:
1016+
print(e)
1017+
1018+
1019+
def parse_file(filename):
1020+
try:
1021+
mparser = msc_pyparser.MSCParser()
1022+
with open(filename, "r") as f:
1023+
mparser.parser.parse(f.read())
1024+
return mparser.configlines
1025+
1026+
except Exception as e:
1027+
print(e)
1028+
1029+
1030+
def get_id(actions):
1031+
""" Return the ID from actions """
1032+
for a in actions:
1033+
if a["act_name"] == "id":
1034+
return int(a["act_arg"])
1035+
return 0

src/crs_linter/rules/__init__.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# import all checks
2+
3+
from . import (
4+
approved_tags,
5+
capture,
6+
crs_tag,
7+
deprecated,
8+
duplicated,
9+
ignore_case,
10+
indentation,
11+
lowercase_ignorecase,
12+
ordered_actions,
13+
pl_consistency,
14+
pl_tags,
15+
variables_usage,
16+
version
17+
)
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from src.crs_linter.linter import LintProblem
2+
3+
4+
def check_tags(self, tags):
5+
"""
6+
check that only tags from the util/APPROVED_TAGS file are used
7+
"""
8+
chained = False
9+
ruleid = 0
10+
for d in self.data:
11+
if "actions" in d:
12+
for a in d["actions"]:
13+
if a["act_name"] == "tag":
14+
tag = a["act_arg"]
15+
# check wheter tag is in tagslist
16+
if tags.count(tag) == 0:
17+
yield LintProblem(
18+
rule_id=ruleid,
19+
line=a["lineno"],
20+
end_line=a["lineno"],
21+
desc=f'rule uses unknown tag: "{tag}"; only tags registered in the util/APPROVED_TAGS file may be used; rule id: {ruleid}'
22+
)

src/crs_linter/rules/capture.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
def check_capture_action(self):
2+
"""
3+
check that every chained rule has a `capture` action if it uses TX.N variable
4+
"""
5+
chained = False
6+
ruleid = 0
7+
chainlevel = 0
8+
capture_level = None
9+
re_number = re.compile(r"^\d$")
10+
has_capture = False
11+
use_captured_var = False
12+
captured_var_chain_level = 0
13+
for d in self.data:
14+
# only the SecRule object is relevant
15+
if d["type"].lower() == "secrule":
16+
for v in d["variables"]:
17+
if v["variable"].lower() == "tx" and re_number.match(
18+
v["variable_part"]
19+
):
20+
# only the first occurrence required
21+
if not use_captured_var:
22+
use_captured_var = True
23+
captured_var_chain_level = chainlevel
24+
if "actions" in d:
25+
if not chained:
26+
ruleid = 0
27+
chainlevel = 0
28+
else:
29+
chained = False
30+
for a in d["actions"]:
31+
if a["act_name"] == "id":
32+
ruleid = int(a["act_arg"])
33+
if a["act_name"] == "chain":
34+
chained = True
35+
chainlevel += 1
36+
if a["act_name"] == "capture":
37+
capture_level = chainlevel
38+
has_capture = True
39+
if ruleid > 0 and not chained: # end of chained rule
40+
if use_captured_var:
41+
# we allow if target with TX:N is in the first rule
42+
# of a chained rule without 'capture'
43+
if captured_var_chain_level > 0:
44+
if (
45+
not has_capture
46+
or captured_var_chain_level < capture_level
47+
):
48+
self.error_tx_N_without_capture_action.append(
49+
{
50+
"ruleid": ruleid,
51+
"line": a["lineno"],
52+
"endLine": a["lineno"],
53+
"message": f"rule uses TX.N without capture; rule id: {ruleid}'",
54+
}
55+
)
56+
# clear variables
57+
chained = False
58+
chainlevel = 0
59+
has_capture = False
60+
capture_level = 0
61+
captured_var_chain_level = 0
62+
use_captured_var = False
63+
ruleid = 0

src/crs_linter/rules/crs_tag.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
2+
def check_crs_tag(self):
3+
"""
4+
check that every rule has a `tag:'OWASP_CRS'` action
5+
"""
6+
chained = False
7+
ruleid = 0
8+
has_crs = False
9+
for d in self.data:
10+
if "actions" in d:
11+
chainlevel = 0
12+
13+
if not chained:
14+
ruleid = 0
15+
has_crs = False
16+
chainlevel = 0
17+
else:
18+
chained = False
19+
for a in d["actions"]:
20+
if a["act_name"] == "id":
21+
ruleid = int(a["act_arg"])
22+
if a["act_name"] == "chain":
23+
chained = True
24+
chainlevel += 1
25+
if a["act_name"] == "tag":
26+
if chainlevel == 0:
27+
if a["act_arg"] == "OWASP_CRS":
28+
has_crs = True
29+
if ruleid > 0 and not has_crs:
30+
self.error_no_crstag.append(
31+
{
32+
"ruleid": ruleid,
33+
"line": a["lineno"],
34+
"endLine": a["lineno"],
35+
"message": f"rule does not have tag with value 'OWASP_CRS'; rule id: {ruleid}",
36+
}
37+
)
38+

0 commit comments

Comments
 (0)