From 5474558d2c4f29fcbb01d4fbf213c4512d0dbd2f Mon Sep 17 00:00:00 2001 From: iifrach Date: Mon, 28 Jul 2025 17:37:57 +0300 Subject: [PATCH 01/10] Fix logic --- .../base_attribute_solver.py | 53 ++++++++++--------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py b/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py index 231aeeb173..b3d44802c1 100644 --- a/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py +++ b/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py @@ -24,7 +24,8 @@ from bc_jsonpath_ng import JSONPath from checkov.common.typing import LibraryGraph -SUPPORTED_BLOCK_TYPES = {BlockType.RESOURCE, TerraformBlockType.DATA, TerraformBlockType.MODULE, TerraformBlockType.PROVIDER} +SUPPORTED_BLOCK_TYPES = {BlockType.RESOURCE, TerraformBlockType.DATA, TerraformBlockType.MODULE, + TerraformBlockType.PROVIDER} WILDCARD_PATTERN = re.compile(r"(\S+[.][*][.]*)+") @@ -34,15 +35,17 @@ class BaseAttributeSolver(BaseSolver): jsonpath_parsed_statement_cache: "dict[str, JSONPath]" = {} # noqa: CCE003 # global cache def __init__( - self, resource_types: List[str], attribute: Optional[str], value: Any, is_jsonpath_check: bool = False + self, resource_types: List[str], attribute: Optional[str], value: Any, is_jsonpath_check: bool = False ) -> None: super().__init__(SolverType.ATTRIBUTE) self.resource_types = resource_types - self.attribute = attribute if attribute not in reserved_attributes_to_scan else wrap_reserved_attributes(attribute) + self.attribute = attribute if attribute not in reserved_attributes_to_scan else wrap_reserved_attributes( + attribute) self.value = value self.is_jsonpath_check = is_jsonpath_check - def run(self, graph_connector: LibraryGraph) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]: + def run(self, graph_connector: LibraryGraph) -> Tuple[ + List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]: executer = ThreadPoolExecutor() jobs = [] passed_vertices: List[Dict[str, Any]] = [] @@ -60,7 +63,8 @@ def run(self, graph_connector: LibraryGraph) -> Tuple[List[Dict[str, Any]], List return passed_vertices, failed_vertices, unknown_vertices for _, data in graph_connector.nodes(): - if self.resource_type_pred(data, self.resource_types) and data.get(CustomAttributes.BLOCK_TYPE) in SUPPORTED_BLOCK_TYPES: + if self.resource_type_pred(data, self.resource_types) and data.get( + CustomAttributes.BLOCK_TYPE) in SUPPORTED_BLOCK_TYPES: jobs.append(executer.submit( self._process_node, data, passed_vertices, failed_vertices, unknown_vertices)) @@ -73,7 +77,8 @@ def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]: # handle edge cases in some policies that explicitly look for blank values # we also need to check the attribute stack - e.g., if they are looking for tags.component, but tags = local.tags, # then we actually need to see if tags is variable dependent as well - attr_parts = self.attribute.split('.') # type:ignore[union-attr] # due to attribute can be None (but not really) + attr_parts = self.attribute.split( + '.') # type:ignore[union-attr] # due to attribute can be None (but not really) attr_to_check = None for attr in attr_parts: attr_to_check = f'{attr_to_check}.{attr}' if attr_to_check else attr @@ -116,27 +121,27 @@ def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]: ) return result + if self.resource_type_pred(vertex, self.resource_types): + result = self._get_operation( + vertex=vertex, attribute=self.attribute + ) + if result is not None: + debug.attribute_block( + resource_types=self.resource_types, + attribute=self.attribute, + operator=self.operator, + value=self.value, + resource=vertex, + status="passed" if result is True else "failed", + ) - result = self.resource_type_pred(vertex, self.resource_types) and self._get_operation( - vertex=vertex, attribute=self.attribute - ) - - debug.attribute_block( - resource_types=self.resource_types, - attribute=self.attribute, - operator=self.operator, - value=self.value, - resource=vertex, - status="passed" if result is True else "failed", - ) - - return result + return result def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bool: raise NotImplementedError def _process_node( - self, data: Dict[str, Any], passed_vartices: List[Dict[str, Any]], failed_vertices: List[Dict[str, Any]], + self, data: Dict[str, Any], passed_vartices: List[Dict[str, Any]], failed_vertices: List[Dict[str, Any]], unknown_vertices: List[Dict[str, Any]] ) -> None: if not self.resource_type_pred(data, self.resource_types): @@ -155,17 +160,17 @@ def should_check_all_condition(self) -> bool: return self.is_jsonpath_check def _evaluate_attribute_matches( - self, vertex: dict[str, Any], attribute_matches: list[str], filtered_attribute_matches: list[str] + self, vertex: dict[str, Any], attribute_matches: list[str], filtered_attribute_matches: list[str] ) -> bool | None: if self.should_check_all_condition(): if self.resource_type_pred(vertex, self.resource_types) and all( - self._get_operation(vertex=vertex, attribute=attr) for attr in filtered_attribute_matches + self._get_operation(vertex=vertex, attribute=attr) for attr in filtered_attribute_matches ): return True if len(attribute_matches) == len(filtered_attribute_matches) else None return False if self.resource_type_pred(vertex, self.resource_types) and any( - self._get_operation(vertex=vertex, attribute=attr) for attr in filtered_attribute_matches + self._get_operation(vertex=vertex, attribute=attr) for attr in filtered_attribute_matches ): return True return False if len(attribute_matches) == len(filtered_attribute_matches) else None From 85712996aa98810c1ea19a83460de8fbf715d8c1 Mon Sep 17 00:00:00 2001 From: iifrach Date: Wed, 30 Jul 2025 12:50:49 +0300 Subject: [PATCH 02/10] Fix logic and tests --- .../base_attribute_solver.py | 31 +++++++++++-------- .../solvers/complex_solvers/and_solver.py | 7 ++++- .../solvers/complex_solvers/or_solver.py | 7 ++++- .../graph/checks_infra/solvers/base_solver.py | 9 ++++++ tests/terraform/runner/test_runner.py | 2 +- 5 files changed, 40 insertions(+), 16 deletions(-) diff --git a/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py b/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py index b3d44802c1..a9a5b264eb 100644 --- a/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py +++ b/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py @@ -44,6 +44,14 @@ def __init__( self.value = value self.is_jsonpath_check = is_jsonpath_check + def get_resource_types(self) -> List[str]: + """ + Returns the list of resource types supported by this solver. + + :return: List of resource types + """ + return self.resource_types + def run(self, graph_connector: LibraryGraph) -> Tuple[ List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]: executer = ThreadPoolExecutor() @@ -121,21 +129,18 @@ def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]: ) return result - if self.resource_type_pred(vertex, self.resource_types): - result = self._get_operation( - vertex=vertex, attribute=self.attribute + result = self._get_operation(vertex=vertex, attribute=self.attribute) + if result is not None: + debug.attribute_block( + resource_types=self.resource_types, + attribute=self.attribute, + operator=self.operator, + value=self.value, + resource=vertex, + status="passed" if result is True else "failed", ) - if result is not None: - debug.attribute_block( - resource_types=self.resource_types, - attribute=self.attribute, - operator=self.operator, - value=self.value, - resource=vertex, - status="passed" if result is True else "failed", - ) - return result + return result def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bool: raise NotImplementedError diff --git a/checkov/common/checks_infra/solvers/complex_solvers/and_solver.py b/checkov/common/checks_infra/solvers/complex_solvers/and_solver.py index f35f559724..0cdb63daff 100644 --- a/checkov/common/checks_infra/solvers/complex_solvers/and_solver.py +++ b/checkov/common/checks_infra/solvers/complex_solvers/and_solver.py @@ -18,10 +18,15 @@ def _get_operation(self, *args: Any, **kwargs: Any) -> Any: def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]: has_unrendered_attribute = False + found = False for solver in self.solvers: + resource_types = solver.get_resource_types() + if resource_types and not self.resource_type_pred(vertex, resource_types): + continue + found = True operation = solver.get_operation(vertex) if operation is None: has_unrendered_attribute = True elif not operation: return False - return None if has_unrendered_attribute else True + return None if (has_unrendered_attribute or not found) else True diff --git a/checkov/common/checks_infra/solvers/complex_solvers/or_solver.py b/checkov/common/checks_infra/solvers/complex_solvers/or_solver.py index 36875f3ceb..a559875ab2 100644 --- a/checkov/common/checks_infra/solvers/complex_solvers/or_solver.py +++ b/checkov/common/checks_infra/solvers/complex_solvers/or_solver.py @@ -18,10 +18,15 @@ def _get_operation(self, *args: Any, **kwargs: Any) -> Any: def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]: has_unrendered_attribute = False + found = False for solver in self.solvers: + resource_types = solver.get_resource_types() + if resource_types and not self.resource_type_pred(vertex, resource_types): + continue + found = True operation = solver.get_operation(vertex) if operation: return True if operation is None: has_unrendered_attribute = True - return None if has_unrendered_attribute else False + return None if (has_unrendered_attribute or not found) else False diff --git a/checkov/common/graph/checks_infra/solvers/base_solver.py b/checkov/common/graph/checks_infra/solvers/base_solver.py index 2258a7ba1b..d9314212df 100644 --- a/checkov/common/graph/checks_infra/solvers/base_solver.py +++ b/checkov/common/graph/checks_infra/solvers/base_solver.py @@ -35,6 +35,15 @@ def _get_operation(self, *args: Any, **kwargs: Any) -> Any: def run(self, graph_connector: DiGraph) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]: raise NotImplementedError() + def get_resource_types(self) -> List[str]: + """ + Returns the list of resource types supported by this solver. + + :return: List of resource types + """ + return [] + + def resource_type_pred(self, v: Dict[str, Any], resource_types: List[str]) -> bool: resource_type = CustomAttributes.RESOURCE_TYPE if env_vars_config.CKV_SUPPORT_ALL_RESOURCE_TYPE: diff --git a/tests/terraform/runner/test_runner.py b/tests/terraform/runner/test_runner.py index aab501f28e..c475bbdabe 100644 --- a/tests/terraform/runner/test_runner.py +++ b/tests/terraform/runner/test_runner.py @@ -1321,7 +1321,7 @@ def test_list_of_routes(self): runner_filter=RunnerFilter(framework=["terraform"], checks=checks_allow_list)) self.assertEqual(len(report.passed_checks), 0) - self.assertEqual(len(report.failed_checks), 1) + self.assertEqual(len(report.failed_checks), 0) def test_resource_values_dont_exist(self): resources_path = os.path.join( From 7d71b6bcad2b594e85ba18b9c4006d25f0eac6d3 Mon Sep 17 00:00:00 2001 From: iifrach Date: Wed, 30 Jul 2025 13:06:43 +0300 Subject: [PATCH 03/10] Fix lint & mypy --- .../base_attribute_solver.py | 28 ++++++------------- .../graph/checks_infra/solvers/base_solver.py | 1 - 2 files changed, 8 insertions(+), 21 deletions(-) diff --git a/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py b/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py index a9a5b264eb..a5cbe81159 100644 --- a/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py +++ b/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py @@ -39,21 +39,11 @@ def __init__( ) -> None: super().__init__(SolverType.ATTRIBUTE) self.resource_types = resource_types - self.attribute = attribute if attribute not in reserved_attributes_to_scan else wrap_reserved_attributes( - attribute) + self.attribute = attribute if attribute not in reserved_attributes_to_scan else wrap_reserved_attributes(attribute) self.value = value self.is_jsonpath_check = is_jsonpath_check - def get_resource_types(self) -> List[str]: - """ - Returns the list of resource types supported by this solver. - - :return: List of resource types - """ - return self.resource_types - - def run(self, graph_connector: LibraryGraph) -> Tuple[ - List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]: + def run(self, graph_connector: LibraryGraph) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]: executer = ThreadPoolExecutor() jobs = [] passed_vertices: List[Dict[str, Any]] = [] @@ -71,8 +61,7 @@ def run(self, graph_connector: LibraryGraph) -> Tuple[ return passed_vertices, failed_vertices, unknown_vertices for _, data in graph_connector.nodes(): - if self.resource_type_pred(data, self.resource_types) and data.get( - CustomAttributes.BLOCK_TYPE) in SUPPORTED_BLOCK_TYPES: + if self.resource_type_pred(data, self.resource_types) and data.get(CustomAttributes.BLOCK_TYPE) in SUPPORTED_BLOCK_TYPES: jobs.append(executer.submit( self._process_node, data, passed_vertices, failed_vertices, unknown_vertices)) @@ -85,8 +74,7 @@ def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]: # handle edge cases in some policies that explicitly look for blank values # we also need to check the attribute stack - e.g., if they are looking for tags.component, but tags = local.tags, # then we actually need to see if tags is variable dependent as well - attr_parts = self.attribute.split( - '.') # type:ignore[union-attr] # due to attribute can be None (but not really) + attr_parts = self.attribute.split('.') # type:ignore[union-attr] # due to attribute can be None (but not really) attr_to_check = None for attr in attr_parts: attr_to_check = f'{attr_to_check}.{attr}' if attr_to_check else attr @@ -146,7 +134,7 @@ def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bo raise NotImplementedError def _process_node( - self, data: Dict[str, Any], passed_vartices: List[Dict[str, Any]], failed_vertices: List[Dict[str, Any]], + self, data: Dict[str, Any], passed_vartices: List[Dict[str, Any]], failed_vertices: List[Dict[str, Any]], unknown_vertices: List[Dict[str, Any]] ) -> None: if not self.resource_type_pred(data, self.resource_types): @@ -165,17 +153,17 @@ def should_check_all_condition(self) -> bool: return self.is_jsonpath_check def _evaluate_attribute_matches( - self, vertex: dict[str, Any], attribute_matches: list[str], filtered_attribute_matches: list[str] + self, vertex: dict[str, Any], attribute_matches: list[str], filtered_attribute_matches: list[str] ) -> bool | None: if self.should_check_all_condition(): if self.resource_type_pred(vertex, self.resource_types) and all( - self._get_operation(vertex=vertex, attribute=attr) for attr in filtered_attribute_matches + self._get_operation(vertex=vertex, attribute=attr) for attr in filtered_attribute_matches ): return True if len(attribute_matches) == len(filtered_attribute_matches) else None return False if self.resource_type_pred(vertex, self.resource_types) and any( - self._get_operation(vertex=vertex, attribute=attr) for attr in filtered_attribute_matches + self._get_operation(vertex=vertex, attribute=attr) for attr in filtered_attribute_matches ): return True return False if len(attribute_matches) == len(filtered_attribute_matches) else None diff --git a/checkov/common/graph/checks_infra/solvers/base_solver.py b/checkov/common/graph/checks_infra/solvers/base_solver.py index d9314212df..c150b5fe04 100644 --- a/checkov/common/graph/checks_infra/solvers/base_solver.py +++ b/checkov/common/graph/checks_infra/solvers/base_solver.py @@ -43,7 +43,6 @@ def get_resource_types(self) -> List[str]: """ return [] - def resource_type_pred(self, v: Dict[str, Any], resource_types: List[str]) -> bool: resource_type = CustomAttributes.RESOURCE_TYPE if env_vars_config.CKV_SUPPORT_ALL_RESOURCE_TYPE: From 9dd4ecfbc0aee8c852181318b3646f26d79bc50c Mon Sep 17 00:00:00 2001 From: iifrach Date: Wed, 30 Jul 2025 14:24:10 +0300 Subject: [PATCH 04/10] Fix logic --- .../solvers/attribute_solvers/base_attribute_solver.py | 9 ++++++--- .../checks_infra/solvers/complex_solvers/or_solver.py | 8 ++++---- tests/terraform/runner/test_runner.py | 2 +- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py b/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py index a5cbe81159..2001d76dfc 100644 --- a/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py +++ b/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py @@ -24,8 +24,7 @@ from bc_jsonpath_ng import JSONPath from checkov.common.typing import LibraryGraph -SUPPORTED_BLOCK_TYPES = {BlockType.RESOURCE, TerraformBlockType.DATA, TerraformBlockType.MODULE, - TerraformBlockType.PROVIDER} +SUPPORTED_BLOCK_TYPES = {BlockType.RESOURCE, TerraformBlockType.DATA, TerraformBlockType.MODULE, TerraformBlockType.PROVIDER} WILDCARD_PATTERN = re.compile(r"(\S+[.][*][.]*)+") @@ -35,7 +34,7 @@ class BaseAttributeSolver(BaseSolver): jsonpath_parsed_statement_cache: "dict[str, JSONPath]" = {} # noqa: CCE003 # global cache def __init__( - self, resource_types: List[str], attribute: Optional[str], value: Any, is_jsonpath_check: bool = False + self, resource_types: List[str], attribute: Optional[str], value: Any, is_jsonpath_check: bool = False ) -> None: super().__init__(SolverType.ATTRIBUTE) self.resource_types = resource_types @@ -43,6 +42,10 @@ def __init__( self.value = value self.is_jsonpath_check = is_jsonpath_check + + def get_resource_types(self) -> List[str]: + return self.resource_types + def run(self, graph_connector: LibraryGraph) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]: executer = ThreadPoolExecutor() jobs = [] diff --git a/checkov/common/checks_infra/solvers/complex_solvers/or_solver.py b/checkov/common/checks_infra/solvers/complex_solvers/or_solver.py index a559875ab2..33a054c503 100644 --- a/checkov/common/checks_infra/solvers/complex_solvers/or_solver.py +++ b/checkov/common/checks_infra/solvers/complex_solvers/or_solver.py @@ -17,16 +17,16 @@ def _get_operation(self, *args: Any, **kwargs: Any) -> Any: return reduce(or_, args) def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]: - has_unrendered_attribute = False + # found flag indicates if at least one solver tested the vertex found = False for solver in self.solvers: resource_types = solver.get_resource_types() + # In case that current solver's resource types aren't compatible with vertex, this operation should be + # skipped if resource_types and not self.resource_type_pred(vertex, resource_types): continue found = True operation = solver.get_operation(vertex) if operation: return True - if operation is None: - has_unrendered_attribute = True - return None if (has_unrendered_attribute or not found) else False + return None if not found else False diff --git a/tests/terraform/runner/test_runner.py b/tests/terraform/runner/test_runner.py index c475bbdabe..aab501f28e 100644 --- a/tests/terraform/runner/test_runner.py +++ b/tests/terraform/runner/test_runner.py @@ -1321,7 +1321,7 @@ def test_list_of_routes(self): runner_filter=RunnerFilter(framework=["terraform"], checks=checks_allow_list)) self.assertEqual(len(report.passed_checks), 0) - self.assertEqual(len(report.failed_checks), 0) + self.assertEqual(len(report.failed_checks), 1) def test_resource_values_dont_exist(self): resources_path = os.path.join( From a189d3b045313e006bd68032ba6904dfb0e7280b Mon Sep 17 00:00:00 2001 From: iifrach Date: Wed, 30 Jul 2025 14:27:41 +0300 Subject: [PATCH 05/10] Add comments --- .../common/checks_infra/solvers/complex_solvers/and_solver.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/checkov/common/checks_infra/solvers/complex_solvers/and_solver.py b/checkov/common/checks_infra/solvers/complex_solvers/and_solver.py index 0cdb63daff..87e440c5e4 100644 --- a/checkov/common/checks_infra/solvers/complex_solvers/and_solver.py +++ b/checkov/common/checks_infra/solvers/complex_solvers/and_solver.py @@ -18,9 +18,12 @@ def _get_operation(self, *args: Any, **kwargs: Any) -> Any: def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]: has_unrendered_attribute = False + # found flag indicates if at least one solver tested the vertex found = False for solver in self.solvers: resource_types = solver.get_resource_types() + # In case that current solver's resource types aren't compatible with vertex, this operation should be + # skipped if resource_types and not self.resource_type_pred(vertex, resource_types): continue found = True From 3445e6b7fe810db2a46da6c820f41fe9d58247bd Mon Sep 17 00:00:00 2001 From: iifrach Date: Thu, 31 Jul 2025 13:46:59 +0300 Subject: [PATCH 06/10] Add test --- .../aws_tagging_different_resource_types.yaml | 38 +++++++++++++++++++ .../dynamo_db_table.tf | 20 ++++++++++ tests/terraform/runner/test_runner.py | 13 +++++++ 3 files changed, 71 insertions(+) create mode 100644 tests/terraform/runner/extra_yaml_checks/aws_tagging_different_resource_types.yaml create mode 100644 tests/terraform/runner/resources/and_with_different_resource_types/dynamo_db_table.tf diff --git a/tests/terraform/runner/extra_yaml_checks/aws_tagging_different_resource_types.yaml b/tests/terraform/runner/extra_yaml_checks/aws_tagging_different_resource_types.yaml new file mode 100644 index 0000000000..de6595669e --- /dev/null +++ b/tests/terraform/runner/extra_yaml_checks/aws_tagging_different_resource_types.yaml @@ -0,0 +1,38 @@ +metadata: + id: "AWS_TAGGING_TF_000yy" + name: "HF_all_Policy" + guidelines: "Tagging needs to be done correctly." + category: "General" + severity: "critical" +scope: + provider: "aws" +definition: + and: + - cond_type: attribute + resource_types: + - aws_organizations_account + - aws_elasticache_cluster + - aws_db_instance + attribute: "tags.AccountType" + operator: regex_match + value: "^Lab$|^Sandbox$|^NonProd$|^Prod$|^PublicProd$|^PublicNonProd$" + - cond_type: attribute + resource_types: + - aws_db_instance # + - aws_organizations_account # + - aws_vpc + - aws_route53_zone + - aws_elasticache_cluster + - aws_lambda_function + - aws_ecs_cluster + - aws_instance + - aws_dynamodb_table + - aws_ebs_volume + - aws_efs_file_system + - aws_eks_cluster + - aws_fsx_windows_file_system + - aws_docdb_cluster + - aws_s3_bucket + attribute: "tags.Billing" + operator: regex_match + value: "APP-\\d{4}$|d{4}$" # Accepts APP-1234 or 4-digit dept codes \ No newline at end of file diff --git a/tests/terraform/runner/resources/and_with_different_resource_types/dynamo_db_table.tf b/tests/terraform/runner/resources/and_with_different_resource_types/dynamo_db_table.tf new file mode 100644 index 0000000000..e809e9a7ec --- /dev/null +++ b/tests/terraform/runner/resources/and_with_different_resource_types/dynamo_db_table.tf @@ -0,0 +1,20 @@ +provider "aws" { + region = "us-east-1" +} +resource "aws_dynamodb_table" "example" { + name = "example-table" + billing_mode = "PAY_PER_REQUEST" # or "PROVISIONED" + hash_key = "id" attribute { + name = "id" + type = "S" # S = String, N = Number, B = Binary + } + tags = { + AppId = "APP-1234" + Billing = "APP-1234" + DynatraceMonitoring = true + EnvironmentType = "PROD" + DataClassification = "HighlyRestricted" + yor_name = "example" + yor_trace = "d9afaa27-0ffc-49c4-90fa-9c3a7649637f" + } +} \ No newline at end of file diff --git a/tests/terraform/runner/test_runner.py b/tests/terraform/runner/test_runner.py index aab501f28e..c300168310 100644 --- a/tests/terraform/runner/test_runner.py +++ b/tests/terraform/runner/test_runner.py @@ -1387,6 +1387,19 @@ def test_resource_negative_values_do_exist(self): self.assertEqual(len(report.passed_checks), 3) self.assertEqual(len(report.failed_checks), 3) + def test_and_with_different_resource_types(self): + runner = Runner(db_connector=self.db_connector()) + current_dir = os.path.dirname(os.path.realpath(__file__)) + resources_path = os.path.join( + os.path.dirname(os.path.realpath(__file__)), "resources", "and_with_different_resource_types") + extra_checks_dir_path = [current_dir + "/extra_yaml_checks"] + runner.load_external_checks(extra_checks_dir_path) + + report = runner.run(root_folder=resources_path, external_checks_dir=extra_checks_dir_path, + runner_filter=RunnerFilter(framework=["terraform"], checks=["AWS_TAGGING_TF_000yy"])) + self.assertEqual(len(report.passed_checks), 1) + self.assertEqual(len(report.failed_checks), 0) + def test_unrendered_simple_var(self): resources_dir = os.path.join( os.path.dirname(os.path.realpath(__file__)), "resources", "unrendered_vars") From 5cb3af6e5718e9dc4895e4777cfd85b260ce0936 Mon Sep 17 00:00:00 2001 From: iifrach Date: Thu, 31 Jul 2025 13:50:39 +0300 Subject: [PATCH 07/10] Fix lint --- .../solvers/attribute_solvers/base_attribute_solver.py | 1 - 1 file changed, 1 deletion(-) diff --git a/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py b/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py index 2001d76dfc..33696362aa 100644 --- a/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py +++ b/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py @@ -42,7 +42,6 @@ def __init__( self.value = value self.is_jsonpath_check = is_jsonpath_check - def get_resource_types(self) -> List[str]: return self.resource_types From 1b16efbe46de14a5f6846ea5f8a5905ed810e0aa Mon Sep 17 00:00:00 2001 From: iifrach Date: Thu, 31 Jul 2025 17:25:28 +0300 Subject: [PATCH 08/10] Support not found case in and and or solvers --- .../base_attribute_solver.py | 4 +- .../solvers/complex_solvers/and_solver.py | 12 ++++-- .../complex_solvers/base_complex_solver.py | 6 +-- .../solvers/complex_solvers/not_solver.py | 4 +- .../solvers/complex_solvers/or_solver.py | 15 ++++++-- .../aws_tagging_different_resource_types.yaml | 2 +- tests/terraform/runner/test_plan_runner.py | 5 ++- tests/terraform/runner/test_runner.py | 37 +++++++++++-------- 8 files changed, 53 insertions(+), 32 deletions(-) diff --git a/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py b/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py index 33696362aa..010be46215 100644 --- a/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py +++ b/checkov/common/checks_infra/solvers/attribute_solvers/base_attribute_solver.py @@ -4,7 +4,7 @@ import logging import re import json -from typing import List, Tuple, Dict, Any, Optional, Pattern, TYPE_CHECKING +from typing import List, Tuple, Dict, Any, Optional, Pattern, TYPE_CHECKING, Union from bc_jsonpath_ng.ext import parse from networkx import DiGraph @@ -70,7 +70,7 @@ def run(self, graph_connector: LibraryGraph) -> Tuple[List[Dict[str, Any]], List concurrent.futures.wait(jobs) return passed_vertices, failed_vertices, unknown_vertices - def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]: + def get_operation(self, vertex: Dict[str, Any]) -> Union[Optional[bool], str]: # if this value contains an underendered variable, then we cannot evaluate value checks, # and will return None (for UNKNOWN) # handle edge cases in some policies that explicitly look for blank values diff --git a/checkov/common/checks_infra/solvers/complex_solvers/and_solver.py b/checkov/common/checks_infra/solvers/complex_solvers/and_solver.py index 87e440c5e4..05f9c77ded 100644 --- a/checkov/common/checks_infra/solvers/complex_solvers/and_solver.py +++ b/checkov/common/checks_infra/solvers/complex_solvers/and_solver.py @@ -1,4 +1,4 @@ -from typing import List, Any, Dict, Optional +from typing import List, Any, Dict, Optional, Union from checkov.common.graph.checks_infra.enums import Operators from checkov.common.graph.checks_infra.solvers.base_solver import BaseSolver @@ -16,7 +16,7 @@ def __init__(self, solvers: List[BaseSolver], resource_types: List[str]) -> None def _get_operation(self, *args: Any, **kwargs: Any) -> Any: return reduce(and_, args) - def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]: + def get_operation(self, vertex: Dict[str, Any]) -> Union[Optional[bool], str]: has_unrendered_attribute = False # found flag indicates if at least one solver tested the vertex found = False @@ -30,6 +30,12 @@ def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]: operation = solver.get_operation(vertex) if operation is None: has_unrendered_attribute = True + elif operation == "not found": + continue elif not operation: return False - return None if (has_unrendered_attribute or not found) else True + if not found: + return "not found" + elif has_unrendered_attribute: + return None + return True diff --git a/checkov/common/checks_infra/solvers/complex_solvers/base_complex_solver.py b/checkov/common/checks_infra/solvers/complex_solvers/base_complex_solver.py index ccd77afde0..5d02e4bd52 100644 --- a/checkov/common/checks_infra/solvers/complex_solvers/base_complex_solver.py +++ b/checkov/common/checks_infra/solvers/complex_solvers/base_complex_solver.py @@ -1,7 +1,7 @@ from __future__ import annotations from abc import abstractmethod -from typing import List, Any, Tuple, Dict, TYPE_CHECKING, Optional +from typing import List, Any, Tuple, Dict, TYPE_CHECKING, Optional, Union from networkx import DiGraph @@ -29,7 +29,7 @@ def _get_negative_op(self, *args: Any) -> Any: return not self._get_operation(args) @abstractmethod - def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]: + def get_operation(self, vertex: Dict[str, Any]) -> Union[Optional[bool], str]: raise NotImplementedError() def run(self, graph_connector: LibraryGraph) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]: @@ -53,7 +53,7 @@ def run(self, graph_connector: LibraryGraph) -> Tuple[List[Dict[str, Any]], List for _, data in graph_connector.nodes(): if self.resource_type_pred(data, self.resource_types): result = self.get_operation(data) - if result is None: + if result is None or result == "not found": unknown_vertices.append(data) elif result: passed_vertices.append(data) diff --git a/checkov/common/checks_infra/solvers/complex_solvers/not_solver.py b/checkov/common/checks_infra/solvers/complex_solvers/not_solver.py index 1c2467a836..87c0109d03 100644 --- a/checkov/common/checks_infra/solvers/complex_solvers/not_solver.py +++ b/checkov/common/checks_infra/solvers/complex_solvers/not_solver.py @@ -1,4 +1,4 @@ -from typing import List, Any, Dict, Optional +from typing import List, Any, Dict, Optional, Union from checkov.common.graph.checks_infra.enums import Operators from checkov.common.graph.checks_infra.solvers.base_solver import BaseSolver @@ -18,6 +18,6 @@ def _get_operation(self, *args: Any, **kwargs: Any) -> Any: raise Exception('The "not" operator must have exactly one child') return not args[0] - def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]: + def get_operation(self, vertex: Dict[str, Any]) -> Union[Optional[bool], str]: result = self.solvers[0].get_operation(vertex) return None if result is None else not result diff --git a/checkov/common/checks_infra/solvers/complex_solvers/or_solver.py b/checkov/common/checks_infra/solvers/complex_solvers/or_solver.py index 33a054c503..af2665058a 100644 --- a/checkov/common/checks_infra/solvers/complex_solvers/or_solver.py +++ b/checkov/common/checks_infra/solvers/complex_solvers/or_solver.py @@ -1,4 +1,4 @@ -from typing import List, Any, Dict, Optional +from typing import List, Any, Dict, Optional, Union from checkov.common.graph.checks_infra.enums import Operators from checkov.common.graph.checks_infra.solvers.base_solver import BaseSolver @@ -16,7 +16,8 @@ def __init__(self, solvers: List[BaseSolver], resource_types: List[str]) -> None def _get_operation(self, *args: Any, **kwargs: Any) -> Any: return reduce(or_, args) - def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]: + def get_operation(self, vertex: Dict[str, Any]) -> Union[Optional[bool], str]: + has_unrendered_attribute = False # found flag indicates if at least one solver tested the vertex found = False for solver in self.solvers: @@ -27,6 +28,14 @@ def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]: continue found = True operation = solver.get_operation(vertex) + if operation is None: + has_unrendered_attribute = True + elif operation == "not found": + continue if operation: return True - return None if not found else False + if not found: + return "not found" + elif has_unrendered_attribute: + return None + return False diff --git a/tests/terraform/runner/extra_yaml_checks/aws_tagging_different_resource_types.yaml b/tests/terraform/runner/extra_yaml_checks/aws_tagging_different_resource_types.yaml index de6595669e..bc3cff87e2 100644 --- a/tests/terraform/runner/extra_yaml_checks/aws_tagging_different_resource_types.yaml +++ b/tests/terraform/runner/extra_yaml_checks/aws_tagging_different_resource_types.yaml @@ -1,5 +1,5 @@ metadata: - id: "AWS_TAGGING_TF_000yy" + id: "CUSTOM_AWS_TAGGING_POLICY" name: "HF_all_Policy" guidelines: "Tagging needs to be done correctly." category: "General" diff --git a/tests/terraform/runner/test_plan_runner.py b/tests/terraform/runner/test_plan_runner.py index c1fd33b4fe..1a3943c59e 100644 --- a/tests/terraform/runner/test_plan_runner.py +++ b/tests/terraform/runner/test_plan_runner.py @@ -318,7 +318,7 @@ def test_runner_nested_child_modules(self): self.assertEqual(report.get_exit_code({'soft_fail': False, 'soft_fail_checks': [], 'soft_fail_threshold': None, 'hard_fail_checks': [], 'hard_fail_threshold': None}), 1) self.assertEqual(report.get_exit_code({'soft_fail': True, 'soft_fail_checks': [], 'soft_fail_threshold': None, 'hard_fail_checks': [], 'hard_fail_threshold': None}), 0) - self.assertEqual(report.get_summary()["failed"], 15) + self.assertEqual(report.get_summary()["failed"], 18) self.assertEqual(report.get_summary()["passed"], 3) failed_check_ids = set([c.check_id for c in report.failed_checks]) @@ -327,7 +327,8 @@ def test_runner_nested_child_modules(self): "CKV_AWS_38", "CKV_AWS_39", "CKV_AWS_58", - "CUSTOM_GRAPH_AWS_1" + "CUSTOM_GRAPH_AWS_1", + "CUSTOM_AWS_TAGGING_POLICY", } assert failed_check_ids == expected_failed_check_ids diff --git a/tests/terraform/runner/test_runner.py b/tests/terraform/runner/test_runner.py index c300168310..aab4c19e8e 100644 --- a/tests/terraform/runner/test_runner.py +++ b/tests/terraform/runner/test_runner.py @@ -172,6 +172,23 @@ def test_for_each_check(self): assert 'module.simple[0].aws_s3_bucket_object.this_file' in failed_resources assert 'module.simple[1].aws_s3_bucket_object.this_file' in failed_resources + + def test_and_with_different_resource_types(self): + runner = Runner(db_connector=self.db_connector()) + current_dir = os.path.dirname(os.path.realpath(__file__)) + resources_path = os.path.join( + os.path.dirname(os.path.realpath(__file__)), "resources", "and_with_different_resource_types") + extra_checks_dir_path = [current_dir + "/extra_yaml_checks"] + + report = runner.run(root_folder=resources_path, external_checks_dir=extra_checks_dir_path, + runner_filter=RunnerFilter(framework=["terraform"], checks=["CUSTOM_AWS_TAGGING_POLICY"])) + self.assertEqual(len(report.passed_checks), 1) + self.assertEqual(len(report.failed_checks), 0) + + # Remove external checks from registry. + runner.graph_registry.checks[:] = [check for check in runner.graph_registry.checks if + "CUSTOM" not in check.id] + def test_runner_passing_valid_tf(self): current_dir = os.path.dirname(os.path.realpath(__file__)) @@ -1209,7 +1226,7 @@ def test_loading_external_checks_yaml(self): current_dir = os.path.dirname(os.path.realpath(__file__)) extra_checks_dir_path = current_dir + "/extra_yaml_checks" runner.load_external_checks([extra_checks_dir_path]) - self.assertEqual(len(runner.graph_registry.checks), base_len + 5) + self.assertEqual(len(runner.graph_registry.checks), base_len + 6) runner.graph_registry.checks = runner.graph_registry.checks[:base_len] def test_loading_external_checks_yaml_multiple_times(self): @@ -1218,14 +1235,15 @@ def test_loading_external_checks_yaml_multiple_times(self): runner.graph_registry.checks = [] extra_checks_dir_path = [current_dir + "/extra_yaml_checks"] runner.load_external_checks(extra_checks_dir_path) - self.assertEqual(len(runner.graph_registry.checks), 5) + self.assertEqual(len(runner.graph_registry.checks), 6) runner.load_external_checks(extra_checks_dir_path) - self.assertEqual(len(runner.graph_registry.checks), 5) + self.assertEqual(len(runner.graph_registry.checks), 6) graph_checks = [x.id for x in runner.graph_registry.checks] self.assertIn('CUSTOM_GRAPH_AWS_1', graph_checks) self.assertIn('CUSTOM_GRAPH_AWS_2', graph_checks) self.assertIn('CKV2_CUSTOM_1', graph_checks) + self.assertIn('CUSTOM_AWS_TAGGING_POLICY', graph_checks) runner.graph_registry.checks = [] def test_loading_external_checks_python(self): @@ -1387,19 +1405,6 @@ def test_resource_negative_values_do_exist(self): self.assertEqual(len(report.passed_checks), 3) self.assertEqual(len(report.failed_checks), 3) - def test_and_with_different_resource_types(self): - runner = Runner(db_connector=self.db_connector()) - current_dir = os.path.dirname(os.path.realpath(__file__)) - resources_path = os.path.join( - os.path.dirname(os.path.realpath(__file__)), "resources", "and_with_different_resource_types") - extra_checks_dir_path = [current_dir + "/extra_yaml_checks"] - runner.load_external_checks(extra_checks_dir_path) - - report = runner.run(root_folder=resources_path, external_checks_dir=extra_checks_dir_path, - runner_filter=RunnerFilter(framework=["terraform"], checks=["AWS_TAGGING_TF_000yy"])) - self.assertEqual(len(report.passed_checks), 1) - self.assertEqual(len(report.failed_checks), 0) - def test_unrendered_simple_var(self): resources_dir = os.path.join( os.path.dirname(os.path.realpath(__file__)), "resources", "unrendered_vars") From fd618bb415a93c91c172b9d9f63bd718b11fc781 Mon Sep 17 00:00:00 2001 From: iifrach Date: Thu, 31 Jul 2025 17:45:14 +0300 Subject: [PATCH 09/10] Use const --- .../checks_infra/solvers/complex_solvers/and_solver.py | 6 +++--- .../solvers/complex_solvers/base_complex_solver.py | 2 ++ .../checks_infra/solvers/complex_solvers/or_solver.py | 6 +++--- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/checkov/common/checks_infra/solvers/complex_solvers/and_solver.py b/checkov/common/checks_infra/solvers/complex_solvers/and_solver.py index 05f9c77ded..7435b9d9f6 100644 --- a/checkov/common/checks_infra/solvers/complex_solvers/and_solver.py +++ b/checkov/common/checks_infra/solvers/complex_solvers/and_solver.py @@ -2,7 +2,7 @@ from checkov.common.graph.checks_infra.enums import Operators from checkov.common.graph.checks_infra.solvers.base_solver import BaseSolver -from checkov.common.checks_infra.solvers.complex_solvers.base_complex_solver import BaseComplexSolver +from checkov.common.checks_infra.solvers.complex_solvers.base_complex_solver import BaseComplexSolver, NOT_FOUND from functools import reduce from operator import and_ @@ -30,12 +30,12 @@ def get_operation(self, vertex: Dict[str, Any]) -> Union[Optional[bool], str]: operation = solver.get_operation(vertex) if operation is None: has_unrendered_attribute = True - elif operation == "not found": + elif operation == NOT_FOUND: continue elif not operation: return False if not found: - return "not found" + return NOT_FOUND elif has_unrendered_attribute: return None return True diff --git a/checkov/common/checks_infra/solvers/complex_solvers/base_complex_solver.py b/checkov/common/checks_infra/solvers/complex_solvers/base_complex_solver.py index 5d02e4bd52..df0dd6f0f8 100644 --- a/checkov/common/checks_infra/solvers/complex_solvers/base_complex_solver.py +++ b/checkov/common/checks_infra/solvers/complex_solvers/base_complex_solver.py @@ -11,6 +11,8 @@ if TYPE_CHECKING: from checkov.common.typing import LibraryGraph +NOT_FOUND = "NOT_FOUND" + class BaseComplexSolver(BaseSolver): operator = "" # noqa: CCE003 # a static attribute diff --git a/checkov/common/checks_infra/solvers/complex_solvers/or_solver.py b/checkov/common/checks_infra/solvers/complex_solvers/or_solver.py index af2665058a..6bd13e8335 100644 --- a/checkov/common/checks_infra/solvers/complex_solvers/or_solver.py +++ b/checkov/common/checks_infra/solvers/complex_solvers/or_solver.py @@ -2,7 +2,7 @@ from checkov.common.graph.checks_infra.enums import Operators from checkov.common.graph.checks_infra.solvers.base_solver import BaseSolver -from checkov.common.checks_infra.solvers.complex_solvers.base_complex_solver import BaseComplexSolver +from checkov.common.checks_infra.solvers.complex_solvers.base_complex_solver import BaseComplexSolver, NOT_FOUND from functools import reduce from operator import or_ @@ -30,12 +30,12 @@ def get_operation(self, vertex: Dict[str, Any]) -> Union[Optional[bool], str]: operation = solver.get_operation(vertex) if operation is None: has_unrendered_attribute = True - elif operation == "not found": + elif operation == NOT_FOUND: continue if operation: return True if not found: - return "not found" + return NOT_FOUND elif has_unrendered_attribute: return None return False From cd540e618da455d26454d5b7e903971db4695c44 Mon Sep 17 00:00:00 2001 From: iifrach Date: Thu, 31 Jul 2025 17:57:18 +0300 Subject: [PATCH 10/10] Change policy --- .../aws_tagging_different_resource_types.yaml | 24 +++---------------- 1 file changed, 3 insertions(+), 21 deletions(-) diff --git a/tests/terraform/runner/extra_yaml_checks/aws_tagging_different_resource_types.yaml b/tests/terraform/runner/extra_yaml_checks/aws_tagging_different_resource_types.yaml index bc3cff87e2..87d0a35084 100644 --- a/tests/terraform/runner/extra_yaml_checks/aws_tagging_different_resource_types.yaml +++ b/tests/terraform/runner/extra_yaml_checks/aws_tagging_different_resource_types.yaml @@ -1,38 +1,20 @@ metadata: id: "CUSTOM_AWS_TAGGING_POLICY" name: "HF_all_Policy" - guidelines: "Tagging needs to be done correctly." category: "General" - severity: "critical" scope: provider: "aws" definition: and: - cond_type: attribute resource_types: - - aws_organizations_account - - aws_elasticache_cluster - - aws_db_instance - attribute: "tags.AccountType" + - aws_s3_bucket + attribute: "tags.Name" operator: regex_match - value: "^Lab$|^Sandbox$|^NonProd$|^Prod$|^PublicProd$|^PublicNonProd$" + value: "^bucket-[a-zA-Z0-9-]+$" - cond_type: attribute resource_types: - - aws_db_instance # - - aws_organizations_account # - - aws_vpc - - aws_route53_zone - - aws_elasticache_cluster - - aws_lambda_function - - aws_ecs_cluster - - aws_instance - aws_dynamodb_table - - aws_ebs_volume - - aws_efs_file_system - - aws_eks_cluster - - aws_fsx_windows_file_system - - aws_docdb_cluster - - aws_s3_bucket attribute: "tags.Billing" operator: regex_match value: "APP-\\d{4}$|d{4}$" # Accepts APP-1234 or 4-digit dept codes \ No newline at end of file