diff --git a/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py b/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py index 231aeeb173..010be46215 100644 --- a/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py +++ b/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py @@ -4,7 +4,7 @@ import logging import re import json -from typing import List, Tuple, Dict, Any, Optional, Pattern, TYPE_CHECKING +from typing import List, Tuple, Dict, Any, Optional, Pattern, TYPE_CHECKING, Union from bc_jsonpath_ng.ext import parse from networkx import DiGraph @@ -42,6 +42,9 @@ def __init__( self.value = value self.is_jsonpath_check = is_jsonpath_check + def get_resource_types(self) -> List[str]: + return self.resource_types + def run(self, graph_connector: LibraryGraph) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]: executer = ThreadPoolExecutor() jobs = [] @@ -67,7 +70,7 @@ def run(self, graph_connector: LibraryGraph) -> Tuple[List[Dict[str, Any]], List concurrent.futures.wait(jobs) return passed_vertices, failed_vertices, unknown_vertices - def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]: + def get_operation(self, vertex: Dict[str, Any]) -> Union[Optional[bool], str]: # if this value contains an underendered variable, then we cannot evaluate value checks, # and will return None (for UNKNOWN) # handle edge cases in some policies that explicitly look for blank values @@ -116,21 +119,18 @@ def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]: ) return result - - result = self.resource_type_pred(vertex, self.resource_types) and self._get_operation( - vertex=vertex, attribute=self.attribute - ) - - debug.attribute_block( - resource_types=self.resource_types, - attribute=self.attribute, - operator=self.operator, - value=self.value, - resource=vertex, - status="passed" if result is True else "failed", - ) - - return result + result = self._get_operation(vertex=vertex, attribute=self.attribute) + if result is not None: + debug.attribute_block( + resource_types=self.resource_types, + attribute=self.attribute, + operator=self.operator, + value=self.value, + resource=vertex, + status="passed" if result is True else "failed", + ) + + return result def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bool: raise NotImplementedError diff --git a/checkov/common/checks_infra/solvers/complex_solvers/and_solver.py b/checkov/common/checks_infra/solvers/complex_solvers/and_solver.py index f35f559724..7435b9d9f6 100644 --- a/checkov/common/checks_infra/solvers/complex_solvers/and_solver.py +++ b/checkov/common/checks_infra/solvers/complex_solvers/and_solver.py @@ -1,8 +1,8 @@ -from typing import List, Any, Dict, Optional +from typing import List, Any, Dict, Optional, Union from checkov.common.graph.checks_infra.enums import Operators from checkov.common.graph.checks_infra.solvers.base_solver import BaseSolver -from checkov.common.checks_infra.solvers.complex_solvers.base_complex_solver import BaseComplexSolver +from checkov.common.checks_infra.solvers.complex_solvers.base_complex_solver import BaseComplexSolver, NOT_FOUND from functools import reduce from operator import and_ @@ -16,12 +16,26 @@ def __init__(self, solvers: List[BaseSolver], resource_types: List[str]) -> None def _get_operation(self, *args: Any, **kwargs: Any) -> Any: return reduce(and_, args) - def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]: + def get_operation(self, vertex: Dict[str, Any]) -> Union[Optional[bool], str]: has_unrendered_attribute = False + # found flag indicates if at least one solver tested the vertex + found = False for solver in self.solvers: + resource_types = solver.get_resource_types() + # In case that current solver's resource types aren't compatible with vertex, this operation should be + # skipped + if resource_types and not self.resource_type_pred(vertex, resource_types): + continue + found = True operation = solver.get_operation(vertex) if operation is None: has_unrendered_attribute = True + elif operation == NOT_FOUND: + continue elif not operation: return False - return None if has_unrendered_attribute else True + if not found: + return NOT_FOUND + elif has_unrendered_attribute: + return None + return True diff --git a/checkov/common/checks_infra/solvers/complex_solvers/base_complex_solver.py b/checkov/common/checks_infra/solvers/complex_solvers/base_complex_solver.py index ccd77afde0..df0dd6f0f8 100644 --- a/checkov/common/checks_infra/solvers/complex_solvers/base_complex_solver.py +++ b/checkov/common/checks_infra/solvers/complex_solvers/base_complex_solver.py @@ -1,7 +1,7 @@ from __future__ import annotations from abc import abstractmethod -from typing import List, Any, Tuple, Dict, TYPE_CHECKING, Optional +from typing import List, Any, Tuple, Dict, TYPE_CHECKING, Optional, Union from networkx import DiGraph @@ -11,6 +11,8 @@ if TYPE_CHECKING: from checkov.common.typing import LibraryGraph +NOT_FOUND = "NOT_FOUND" + class BaseComplexSolver(BaseSolver): operator = "" # noqa: CCE003 # a static attribute @@ -29,7 +31,7 @@ def _get_negative_op(self, *args: Any) -> Any: return not self._get_operation(args) @abstractmethod - def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]: + def get_operation(self, vertex: Dict[str, Any]) -> Union[Optional[bool], str]: raise NotImplementedError() def run(self, graph_connector: LibraryGraph) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]: @@ -53,7 +55,7 @@ def run(self, graph_connector: LibraryGraph) -> Tuple[List[Dict[str, Any]], List for _, data in graph_connector.nodes(): if self.resource_type_pred(data, self.resource_types): result = self.get_operation(data) - if result is None: + if result is None or result == "not found": unknown_vertices.append(data) elif result: passed_vertices.append(data) diff --git a/checkov/common/checks_infra/solvers/complex_solvers/not_solver.py b/checkov/common/checks_infra/solvers/complex_solvers/not_solver.py index 1c2467a836..87c0109d03 100644 --- a/checkov/common/checks_infra/solvers/complex_solvers/not_solver.py +++ b/checkov/common/checks_infra/solvers/complex_solvers/not_solver.py @@ -1,4 +1,4 @@ -from typing import List, Any, Dict, Optional +from typing import List, Any, Dict, Optional, Union from checkov.common.graph.checks_infra.enums import Operators from checkov.common.graph.checks_infra.solvers.base_solver import BaseSolver @@ -18,6 +18,6 @@ def _get_operation(self, *args: Any, **kwargs: Any) -> Any: raise Exception('The "not" operator must have exactly one child') return not args[0] - def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]: + def get_operation(self, vertex: Dict[str, Any]) -> Union[Optional[bool], str]: result = self.solvers[0].get_operation(vertex) return None if result is None else not result diff --git a/checkov/common/checks_infra/solvers/complex_solvers/or_solver.py b/checkov/common/checks_infra/solvers/complex_solvers/or_solver.py index 36875f3ceb..6bd13e8335 100644 --- a/checkov/common/checks_infra/solvers/complex_solvers/or_solver.py +++ b/checkov/common/checks_infra/solvers/complex_solvers/or_solver.py @@ -1,8 +1,8 @@ -from typing import List, Any, Dict, Optional +from typing import List, Any, Dict, Optional, Union from checkov.common.graph.checks_infra.enums import Operators from checkov.common.graph.checks_infra.solvers.base_solver import BaseSolver -from checkov.common.checks_infra.solvers.complex_solvers.base_complex_solver import BaseComplexSolver +from checkov.common.checks_infra.solvers.complex_solvers.base_complex_solver import BaseComplexSolver, NOT_FOUND from functools import reduce from operator import or_ @@ -16,12 +16,26 @@ def __init__(self, solvers: List[BaseSolver], resource_types: List[str]) -> None def _get_operation(self, *args: Any, **kwargs: Any) -> Any: return reduce(or_, args) - def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]: + def get_operation(self, vertex: Dict[str, Any]) -> Union[Optional[bool], str]: has_unrendered_attribute = False + # found flag indicates if at least one solver tested the vertex + found = False for solver in self.solvers: + resource_types = solver.get_resource_types() + # In case that current solver's resource types aren't compatible with vertex, this operation should be + # skipped + if resource_types and not self.resource_type_pred(vertex, resource_types): + continue + found = True operation = solver.get_operation(vertex) - if operation: - return True if operation is None: has_unrendered_attribute = True - return None if has_unrendered_attribute else False + elif operation == NOT_FOUND: + continue + if operation: + return True + if not found: + return NOT_FOUND + elif has_unrendered_attribute: + return None + return False diff --git a/checkov/common/graph/checks_infra/solvers/base_solver.py b/checkov/common/graph/checks_infra/solvers/base_solver.py index 2258a7ba1b..c150b5fe04 100644 --- a/checkov/common/graph/checks_infra/solvers/base_solver.py +++ b/checkov/common/graph/checks_infra/solvers/base_solver.py @@ -35,6 +35,14 @@ def _get_operation(self, *args: Any, **kwargs: Any) -> Any: def run(self, graph_connector: DiGraph) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]: raise NotImplementedError() + def get_resource_types(self) -> List[str]: + """ + Returns the list of resource types supported by this solver. + + :return: List of resource types + """ + return [] + def resource_type_pred(self, v: Dict[str, Any], resource_types: List[str]) -> bool: resource_type = CustomAttributes.RESOURCE_TYPE if env_vars_config.CKV_SUPPORT_ALL_RESOURCE_TYPE: diff --git a/tests/terraform/runner/extra_yaml_checks/aws_tagging_different_resource_types.yaml b/tests/terraform/runner/extra_yaml_checks/aws_tagging_different_resource_types.yaml new file mode 100644 index 0000000000..87d0a35084 --- /dev/null +++ b/tests/terraform/runner/extra_yaml_checks/aws_tagging_different_resource_types.yaml @@ -0,0 +1,20 @@ +metadata: + id: "CUSTOM_AWS_TAGGING_POLICY" + name: "HF_all_Policy" + category: "General" +scope: + provider: "aws" +definition: + and: + - cond_type: attribute + resource_types: + - aws_s3_bucket + attribute: "tags.Name" + operator: regex_match + value: "^bucket-[a-zA-Z0-9-]+$" + - cond_type: attribute + resource_types: + - aws_dynamodb_table + attribute: "tags.Billing" + operator: regex_match + value: "APP-\\d{4}$|d{4}$" # Accepts APP-1234 or 4-digit dept codes \ No newline at end of file diff --git a/tests/terraform/runner/resources/and_with_different_resource_types/dynamo_db_table.tf b/tests/terraform/runner/resources/and_with_different_resource_types/dynamo_db_table.tf new file mode 100644 index 0000000000..e809e9a7ec --- /dev/null +++ b/tests/terraform/runner/resources/and_with_different_resource_types/dynamo_db_table.tf @@ -0,0 +1,20 @@ +provider "aws" { + region = "us-east-1" +} +resource "aws_dynamodb_table" "example" { + name = "example-table" + billing_mode = "PAY_PER_REQUEST" # or "PROVISIONED" + hash_key = "id" attribute { + name = "id" + type = "S" # S = String, N = Number, B = Binary + } + tags = { + AppId = "APP-1234" + Billing = "APP-1234" + DynatraceMonitoring = true + EnvironmentType = "PROD" + DataClassification = "HighlyRestricted" + yor_name = "example" + yor_trace = "d9afaa27-0ffc-49c4-90fa-9c3a7649637f" + } +} \ No newline at end of file diff --git a/tests/terraform/runner/test_plan_runner.py b/tests/terraform/runner/test_plan_runner.py index c1fd33b4fe..1a3943c59e 100644 --- a/tests/terraform/runner/test_plan_runner.py +++ b/tests/terraform/runner/test_plan_runner.py @@ -318,7 +318,7 @@ def test_runner_nested_child_modules(self): self.assertEqual(report.get_exit_code({'soft_fail': False, 'soft_fail_checks': [], 'soft_fail_threshold': None, 'hard_fail_checks': [], 'hard_fail_threshold': None}), 1) self.assertEqual(report.get_exit_code({'soft_fail': True, 'soft_fail_checks': [], 'soft_fail_threshold': None, 'hard_fail_checks': [], 'hard_fail_threshold': None}), 0) - self.assertEqual(report.get_summary()["failed"], 15) + self.assertEqual(report.get_summary()["failed"], 18) self.assertEqual(report.get_summary()["passed"], 3) failed_check_ids = set([c.check_id for c in report.failed_checks]) @@ -327,7 +327,8 @@ def test_runner_nested_child_modules(self): "CKV_AWS_38", "CKV_AWS_39", "CKV_AWS_58", - "CUSTOM_GRAPH_AWS_1" + "CUSTOM_GRAPH_AWS_1", + "CUSTOM_AWS_TAGGING_POLICY", } assert failed_check_ids == expected_failed_check_ids diff --git a/tests/terraform/runner/test_runner.py b/tests/terraform/runner/test_runner.py index aab501f28e..aab4c19e8e 100644 --- a/tests/terraform/runner/test_runner.py +++ b/tests/terraform/runner/test_runner.py @@ -172,6 +172,23 @@ def test_for_each_check(self): assert 'module.simple[0].aws_s3_bucket_object.this_file' in failed_resources assert 'module.simple[1].aws_s3_bucket_object.this_file' in failed_resources + + def test_and_with_different_resource_types(self): + runner = Runner(db_connector=self.db_connector()) + current_dir = os.path.dirname(os.path.realpath(__file__)) + resources_path = os.path.join( + os.path.dirname(os.path.realpath(__file__)), "resources", "and_with_different_resource_types") + extra_checks_dir_path = [current_dir + "/extra_yaml_checks"] + + report = runner.run(root_folder=resources_path, external_checks_dir=extra_checks_dir_path, + runner_filter=RunnerFilter(framework=["terraform"], checks=["CUSTOM_AWS_TAGGING_POLICY"])) + self.assertEqual(len(report.passed_checks), 1) + self.assertEqual(len(report.failed_checks), 0) + + # Remove external checks from registry. + runner.graph_registry.checks[:] = [check for check in runner.graph_registry.checks if + "CUSTOM" not in check.id] + def test_runner_passing_valid_tf(self): current_dir = os.path.dirname(os.path.realpath(__file__)) @@ -1209,7 +1226,7 @@ def test_loading_external_checks_yaml(self): current_dir = os.path.dirname(os.path.realpath(__file__)) extra_checks_dir_path = current_dir + "/extra_yaml_checks" runner.load_external_checks([extra_checks_dir_path]) - self.assertEqual(len(runner.graph_registry.checks), base_len + 5) + self.assertEqual(len(runner.graph_registry.checks), base_len + 6) runner.graph_registry.checks = runner.graph_registry.checks[:base_len] def test_loading_external_checks_yaml_multiple_times(self): @@ -1218,14 +1235,15 @@ def test_loading_external_checks_yaml_multiple_times(self): runner.graph_registry.checks = [] extra_checks_dir_path = [current_dir + "/extra_yaml_checks"] runner.load_external_checks(extra_checks_dir_path) - self.assertEqual(len(runner.graph_registry.checks), 5) + self.assertEqual(len(runner.graph_registry.checks), 6) runner.load_external_checks(extra_checks_dir_path) - self.assertEqual(len(runner.graph_registry.checks), 5) + self.assertEqual(len(runner.graph_registry.checks), 6) graph_checks = [x.id for x in runner.graph_registry.checks] self.assertIn('CUSTOM_GRAPH_AWS_1', graph_checks) self.assertIn('CUSTOM_GRAPH_AWS_2', graph_checks) self.assertIn('CKV2_CUSTOM_1', graph_checks) + self.assertIn('CUSTOM_AWS_TAGGING_POLICY', graph_checks) runner.graph_registry.checks = [] def test_loading_external_checks_python(self):