Skip to content

Commit de2d632

Browse files
committed
fix typing in other areas
1 parent d6846bd commit de2d632

File tree

5 files changed

+24
-17
lines changed

5 files changed

+24
-17
lines changed

policy_sentry/querying/actions.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def get_actions_for_service(service_prefix: str) -> list[str]:
4444
"""
4545
service_prefix_data = get_service_prefix_data(service_prefix)
4646
results = []
47-
if isinstance(service_prefix_data, dict):
47+
if service_prefix_data and isinstance(service_prefix_data, dict):
4848
results = [
4949
f"{service_prefix}:{item}" for item in service_prefix_data["privileges"]
5050
]
@@ -406,6 +406,9 @@ def get_actions_matching_arn_v2(arn: str) -> list[str]:
406406
results = set()
407407
for raw_arn in raw_arns:
408408
resource_type_name = get_resource_type_name_with_raw_arn(raw_arn)
409+
if resource_type_name is None:
410+
continue
411+
409412
service_prefix = get_service_from_arn(raw_arn)
410413
service_prefix_data = get_service_prefix_data(service_prefix)
411414
for action_name, action_data in service_prefix_data["privileges"].items():
@@ -415,7 +418,7 @@ def get_actions_matching_arn_v2(arn: str) -> list[str]:
415418
return list(results)
416419

417420

418-
def get_actions_matching_condition_key(service_prefix: str, condition_key: str):
421+
def get_actions_matching_condition_key(service_prefix: str, condition_key: str) -> list[str]:
419422
"""
420423
Get a list of actions under a service that allow the use of a specified condition key
421424
@@ -610,7 +613,7 @@ def get_privilege_info(service_prefix: str, action: str) -> dict[str, Any]:
610613
List: The info from the docs about that action, along with some of the info from the docs
611614
"""
612615
try:
613-
privilege_info = iam_definition[service_prefix]["privileges"][action]
616+
privilege_info: dict[str, Any] = iam_definition[service_prefix]["privileges"][action]
614617
privilege_info["service_resources"] = iam_definition[service_prefix][
615618
"resources"
616619
]
@@ -637,22 +640,22 @@ def get_api_documentation_link_for_action(
637640
List: Link to the documentation about that API call
638641
"""
639642
rows = get_action_data(service_prefix, action_name)
640-
for row in rows.get(service_prefix):
641-
doc_link = row.get("api_documentation_link")
643+
for row in rows.get(service_prefix, []):
644+
doc_link: str | None = row.get("api_documentation_link")
642645
if doc_link:
643646
return doc_link
644647
return None
645648

646649

647650
@functools.lru_cache(maxsize=1024)
648-
def get_all_action_links() -> dict[str, str]:
651+
def get_all_action_links() -> dict[str, str | None]:
649652
"""
650653
Gets a huge list of the links to all AWS IAM actions. This is meant for use by Cloudsplaining.
651654
652655
:return: A dictionary of all actions present in the database, with the values being the API documentation links.
653656
"""
654657
all_actions = get_all_actions()
655-
results = {}
658+
results: dict[str, str | None] = {}
656659
for action in all_actions:
657660
try:
658661
service_prefix, action_name = action.split(":")

policy_sentry/querying/actions_v1.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,9 @@ def get_actions_matching_arn_v1(arn: str) -> list[str]:
227227
results = []
228228
for raw_arn in raw_arns:
229229
resource_type_name = get_resource_type_name_with_raw_arn(raw_arn)
230+
if resource_type_name is None:
231+
continue
232+
230233
service_prefix = get_service_from_arn(raw_arn)
231234
service_prefix_data = get_service_prefix_data(service_prefix)
232235
for action_name, action_data in service_prefix_data["privileges"].items():

policy_sentry/querying/all.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ def get_all_actions_v2(
6868
:param lowercase: Set to true to have the list of actions be in all lowercase strings.
6969
:return: A list of all actions present in the database.
7070
"""
71-
all_actions = set()
71+
all_actions: set[str] = set()
7272

7373
for service_prefix in all_service_prefixes:
7474
service_prefix_data = get_service_prefix_data(service_prefix)
@@ -88,5 +88,5 @@ def get_service_authorization_url(service_prefix: str) -> str | None:
8888
"""
8989
Gets the URL to the Actions, Resources, and Condition Keys page for a particular service.
9090
"""
91-
result = iam_definition.get(service_prefix, {}).get("service_authorization_url")
91+
result: str = iam_definition.get(service_prefix, {}).get("service_authorization_url")
9292
return result

policy_sentry/querying/arns.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import logging
88
import functools
99
import warnings
10-
from typing import Any
10+
from typing import Any, cast
1111

1212
from policy_sentry.querying.arns_v1 import get_arn_type_details_v1
1313
from policy_sentry.shared.constants import POLICY_SENTRY_SCHEMA_VERSION_V2
@@ -146,7 +146,7 @@ def get_resource_type_name_with_raw_arn(raw_arn: str) -> str | None:
146146

147147
for resource_name, resource_data in service_data["resources"].items():
148148
if resource_data["arn"].lower() == raw_arn.lower():
149-
return resource_name
149+
return cast("str", resource_name)
150150

151151
return None
152152

policy_sentry/shared/iam_data.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import logging
66
import functools
77
from pathlib import Path
8-
from typing import Any
8+
from typing import Any, cast
99

1010
from policy_sentry.shared.constants import (
1111
DATASTORE_FILE_PATH,
@@ -22,8 +22,11 @@
2222

2323
@functools.lru_cache(maxsize=1)
2424
def get_iam_definition_schema_version() -> str:
25-
return iam_definition.get(
26-
POLICY_SENTRY_SCHEMA_VERSION_NAME, POLICY_SENTRY_SCHEMA_VERSION_V1
25+
return cast(
26+
"str",
27+
iam_definition.get(
28+
POLICY_SENTRY_SCHEMA_VERSION_NAME, POLICY_SENTRY_SCHEMA_VERSION_V1
29+
),
2730
)
2831

2932

@@ -37,10 +40,8 @@ def get_service_prefix_data(service_prefix: str) -> dict[str, Any]:
3740
Returns:
3841
List: A list of metadata about that service
3942
"""
40-
# result = list(filter(lambda item: item["prefix"] == service_prefix, iam_definition))
41-
result = iam_definition.get(service_prefix, None)
4243
try:
43-
return result
44+
return cast("dict[str, Any]", iam_definition.get(service_prefix, {}))
4445
# pylint: disable=bare-except, inconsistent-return-statements
4546
except:
4647
logger.info(f"Service prefix not {service_prefix} found.")

0 commit comments

Comments
 (0)