Skip to content
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
import re
import json
from typing import List, Tuple, Dict, Any, Optional, Pattern, TYPE_CHECKING
from typing import List, Tuple, Dict, Any, Optional, Pattern, TYPE_CHECKING, Union

from bc_jsonpath_ng.ext import parse
from networkx import DiGraph
Expand Down Expand Up @@ -42,6 +42,9 @@ def __init__(
self.value = value
self.is_jsonpath_check = is_jsonpath_check

def get_resource_types(self) -> List[str]:
return self.resource_types

def run(self, graph_connector: LibraryGraph) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]:
executer = ThreadPoolExecutor()
jobs = []
Expand All @@ -67,7 +70,7 @@ def run(self, graph_connector: LibraryGraph) -> Tuple[List[Dict[str, Any]], List
concurrent.futures.wait(jobs)
return passed_vertices, failed_vertices, unknown_vertices

def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]:
def get_operation(self, vertex: Dict[str, Any]) -> Union[Optional[bool], str]:
# if this value contains an underendered variable, then we cannot evaluate value checks,
# and will return None (for UNKNOWN)
# handle edge cases in some policies that explicitly look for blank values
Expand Down Expand Up @@ -116,21 +119,18 @@ def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]:
)

return result

result = self.resource_type_pred(vertex, self.resource_types) and self._get_operation(
vertex=vertex, attribute=self.attribute
)

debug.attribute_block(
resource_types=self.resource_types,
attribute=self.attribute,
operator=self.operator,
value=self.value,
resource=vertex,
status="passed" if result is True else "failed",
)

return result
result = self._get_operation(vertex=vertex, attribute=self.attribute)
if result is not None:
debug.attribute_block(
resource_types=self.resource_types,
attribute=self.attribute,
operator=self.operator,
value=self.value,
resource=vertex,
status="passed" if result is True else "failed",
)

return result

def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bool:
raise NotImplementedError
Expand Down
20 changes: 17 additions & 3 deletions checkov/common/checks_infra/solvers/complex_solvers/and_solver.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Any, Dict, Optional
from typing import List, Any, Dict, Optional, Union

from checkov.common.graph.checks_infra.enums import Operators
from checkov.common.graph.checks_infra.solvers.base_solver import BaseSolver
Expand All @@ -16,12 +16,26 @@ def __init__(self, solvers: List[BaseSolver], resource_types: List[str]) -> None
def _get_operation(self, *args: Any, **kwargs: Any) -> Any:
return reduce(and_, args)

def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]:
def get_operation(self, vertex: Dict[str, Any]) -> Union[Optional[bool], str]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like that the return type changed like that, specifically if we want to return not found we should use Literal["not found"] to be more specific.
In addition - is it really needed? Can't we just return the value which will not change the check?
For example in and-solver we can return True and in or-solver return False, to make sure this is simpler

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was my last resort, it's essential because otherwise it affects checks results, which is not what we want.

has_unrendered_attribute = False
# found flag indicates if at least one solver tested the vertex
found = False
for solver in self.solvers:
resource_types = solver.get_resource_types()
# In case that current solver's resource types aren't compatible with vertex, this operation should be
# skipped
if resource_types and not self.resource_type_pred(vertex, resource_types):
continue
found = True
operation = solver.get_operation(vertex)
if operation is None:
has_unrendered_attribute = True
elif operation == "not found":
continue
elif not operation:
return False
return None if has_unrendered_attribute else True
if not found:
return "not found"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be extracted to a const

elif has_unrendered_attribute:
return None
return True
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from abc import abstractmethod
from typing import List, Any, Tuple, Dict, TYPE_CHECKING, Optional
from typing import List, Any, Tuple, Dict, TYPE_CHECKING, Optional, Union

from networkx import DiGraph

Expand Down Expand Up @@ -29,7 +29,7 @@ def _get_negative_op(self, *args: Any) -> Any:
return not self._get_operation(args)

@abstractmethod
def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]:
def get_operation(self, vertex: Dict[str, Any]) -> Union[Optional[bool], str]:
raise NotImplementedError()

def run(self, graph_connector: LibraryGraph) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]:
Expand All @@ -53,7 +53,7 @@ def run(self, graph_connector: LibraryGraph) -> Tuple[List[Dict[str, Any]], List
for _, data in graph_connector.nodes():
if self.resource_type_pred(data, self.resource_types):
result = self.get_operation(data)
if result is None:
if result is None or result == "not found":
unknown_vertices.append(data)
elif result:
passed_vertices.append(data)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Any, Dict, Optional
from typing import List, Any, Dict, Optional, Union

from checkov.common.graph.checks_infra.enums import Operators
from checkov.common.graph.checks_infra.solvers.base_solver import BaseSolver
Expand All @@ -18,6 +18,6 @@ def _get_operation(self, *args: Any, **kwargs: Any) -> Any:
raise Exception('The "not" operator must have exactly one child')
return not args[0]

def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]:
def get_operation(self, vertex: Dict[str, Any]) -> Union[Optional[bool], str]:
result = self.solvers[0].get_operation(vertex)
return None if result is None else not result
24 changes: 19 additions & 5 deletions checkov/common/checks_infra/solvers/complex_solvers/or_solver.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Any, Dict, Optional
from typing import List, Any, Dict, Optional, Union

from checkov.common.graph.checks_infra.enums import Operators
from checkov.common.graph.checks_infra.solvers.base_solver import BaseSolver
Expand All @@ -16,12 +16,26 @@ def __init__(self, solvers: List[BaseSolver], resource_types: List[str]) -> None
def _get_operation(self, *args: Any, **kwargs: Any) -> Any:
return reduce(or_, args)

def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]:
def get_operation(self, vertex: Dict[str, Any]) -> Union[Optional[bool], str]:
has_unrendered_attribute = False
# found flag indicates if at least one solver tested the vertex
found = False
for solver in self.solvers:
resource_types = solver.get_resource_types()
# In case that current solver's resource types aren't compatible with vertex, this operation should be
# skipped
if resource_types and not self.resource_type_pred(vertex, resource_types):
continue
found = True
operation = solver.get_operation(vertex)
if operation:
return True
if operation is None:
has_unrendered_attribute = True
return None if has_unrendered_attribute else False
elif operation == "not found":
continue
if operation:
return True
if not found:
return "not found"
elif has_unrendered_attribute:
return None
return False
8 changes: 8 additions & 0 deletions checkov/common/graph/checks_infra/solvers/base_solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ def _get_operation(self, *args: Any, **kwargs: Any) -> Any:
def run(self, graph_connector: DiGraph) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]:
raise NotImplementedError()

def get_resource_types(self) -> List[str]:
"""
Returns the list of resource types supported by this solver.

:return: List of resource types
"""
return []

def resource_type_pred(self, v: Dict[str, Any], resource_types: List[str]) -> bool:
resource_type = CustomAttributes.RESOURCE_TYPE
if env_vars_config.CKV_SUPPORT_ALL_RESOURCE_TYPE:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
metadata:
id: "CUSTOM_AWS_TAGGING_POLICY"
name: "HF_all_Policy"
guidelines: "Tagging needs to be done correctly."
category: "General"
severity: "critical"
scope:
provider: "aws"
definition:
and:
- cond_type: attribute
resource_types:
- aws_organizations_account
- aws_elasticache_cluster
- aws_db_instance
attribute: "tags.AccountType"
operator: regex_match
value: "^Lab$|^Sandbox$|^NonProd$|^Prod$|^PublicProd$|^PublicNonProd$"
- cond_type: attribute
resource_types:
- aws_db_instance #
- aws_organizations_account #
- aws_vpc
- aws_route53_zone
- aws_elasticache_cluster
- aws_lambda_function
- aws_ecs_cluster
- aws_instance
- aws_dynamodb_table
- aws_ebs_volume
- aws_efs_file_system
- aws_eks_cluster
- aws_fsx_windows_file_system
- aws_docdb_cluster
- aws_s3_bucket
attribute: "tags.Billing"
operator: regex_match
value: "APP-\\d{4}$|d{4}$" # Accepts APP-1234 or 4-digit dept codes
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
provider "aws" {
region = "us-east-1"
}
resource "aws_dynamodb_table" "example" {
name = "example-table"
billing_mode = "PAY_PER_REQUEST" # or "PROVISIONED"
hash_key = "id" attribute {
name = "id"
type = "S" # S = String, N = Number, B = Binary
}
tags = {
AppId = "APP-1234"
Billing = "APP-1234"
DynatraceMonitoring = true
EnvironmentType = "PROD"
DataClassification = "HighlyRestricted"
yor_name = "example"
yor_trace = "d9afaa27-0ffc-49c4-90fa-9c3a7649637f"
}
}
5 changes: 3 additions & 2 deletions tests/terraform/runner/test_plan_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ def test_runner_nested_child_modules(self):
self.assertEqual(report.get_exit_code({'soft_fail': False, 'soft_fail_checks': [], 'soft_fail_threshold': None, 'hard_fail_checks': [], 'hard_fail_threshold': None}), 1)
self.assertEqual(report.get_exit_code({'soft_fail': True, 'soft_fail_checks': [], 'soft_fail_threshold': None, 'hard_fail_checks': [], 'hard_fail_threshold': None}), 0)

self.assertEqual(report.get_summary()["failed"], 15)
self.assertEqual(report.get_summary()["failed"], 18)
self.assertEqual(report.get_summary()["passed"], 3)

failed_check_ids = set([c.check_id for c in report.failed_checks])
Expand All @@ -327,7 +327,8 @@ def test_runner_nested_child_modules(self):
"CKV_AWS_38",
"CKV_AWS_39",
"CKV_AWS_58",
"CUSTOM_GRAPH_AWS_1"
"CUSTOM_GRAPH_AWS_1",
"CUSTOM_AWS_TAGGING_POLICY",
}

assert failed_check_ids == expected_failed_check_ids
Expand Down
24 changes: 21 additions & 3 deletions tests/terraform/runner/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,23 @@ def test_for_each_check(self):
assert 'module.simple[0].aws_s3_bucket_object.this_file' in failed_resources
assert 'module.simple[1].aws_s3_bucket_object.this_file' in failed_resources


def test_and_with_different_resource_types(self):
runner = Runner(db_connector=self.db_connector())
current_dir = os.path.dirname(os.path.realpath(__file__))
resources_path = os.path.join(
os.path.dirname(os.path.realpath(__file__)), "resources", "and_with_different_resource_types")
extra_checks_dir_path = [current_dir + "/extra_yaml_checks"]

report = runner.run(root_folder=resources_path, external_checks_dir=extra_checks_dir_path,
runner_filter=RunnerFilter(framework=["terraform"], checks=["CUSTOM_AWS_TAGGING_POLICY"]))
self.assertEqual(len(report.passed_checks), 1)
self.assertEqual(len(report.failed_checks), 0)

# Remove external checks from registry.
runner.graph_registry.checks[:] = [check for check in runner.graph_registry.checks if
"CUSTOM" not in check.id]

def test_runner_passing_valid_tf(self):
current_dir = os.path.dirname(os.path.realpath(__file__))

Expand Down Expand Up @@ -1209,7 +1226,7 @@ def test_loading_external_checks_yaml(self):
current_dir = os.path.dirname(os.path.realpath(__file__))
extra_checks_dir_path = current_dir + "/extra_yaml_checks"
runner.load_external_checks([extra_checks_dir_path])
self.assertEqual(len(runner.graph_registry.checks), base_len + 5)
self.assertEqual(len(runner.graph_registry.checks), base_len + 6)
runner.graph_registry.checks = runner.graph_registry.checks[:base_len]

def test_loading_external_checks_yaml_multiple_times(self):
Expand All @@ -1218,14 +1235,15 @@ def test_loading_external_checks_yaml_multiple_times(self):
runner.graph_registry.checks = []
extra_checks_dir_path = [current_dir + "/extra_yaml_checks"]
runner.load_external_checks(extra_checks_dir_path)
self.assertEqual(len(runner.graph_registry.checks), 5)
self.assertEqual(len(runner.graph_registry.checks), 6)
runner.load_external_checks(extra_checks_dir_path)
self.assertEqual(len(runner.graph_registry.checks), 5)
self.assertEqual(len(runner.graph_registry.checks), 6)

graph_checks = [x.id for x in runner.graph_registry.checks]
self.assertIn('CUSTOM_GRAPH_AWS_1', graph_checks)
self.assertIn('CUSTOM_GRAPH_AWS_2', graph_checks)
self.assertIn('CKV2_CUSTOM_1', graph_checks)
self.assertIn('CUSTOM_AWS_TAGGING_POLICY', graph_checks)
runner.graph_registry.checks = []

def test_loading_external_checks_python(self):
Expand Down
Loading