Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 82 additions & 112 deletions src/IoTUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,43 +11,33 @@


class IoTUtils():
_region: str
_thing_name: str = None
_thing_group_name: str = None

def __init__(self, region: str):
self._region = region
self._iotClient = client("iot", region_name=self._region)
self._iamClient = client("iam", region_name=self._region)
self._ggClient = client("greengrassv2", region_name=self._region)
self._iot_client = client("iot", region_name=self._region)
self._iam_client = client("iam", region_name=self._region)
self._gg_client = client("greengrassv2", region_name=self._region)
self._thing_groups = []

@property
def thing_name(self):
return self._thing_name

@property
def thing_group_name(self):
return self._thing_group_name

def set_random_id(self):
def generate_random_id(self):
return str(uuid.uuid4().hex)

def set_thing_name(self, id):
def generate_thing_name(self, id):
return "ggl-uat-thing-" + id

def set_thing_group_name(self, id):
def generate_thing_group_name(self, id):
return "ggl-uat-thing-group-" + id

def set_up_core_device(self):
id = self.set_random_id()
self._thing_name = self.set_thing_name(id)
self._thing_group_name = self.set_thing_group_name(id)

id = self.generate_random_id()
self._thing_name = self.generate_thing_name(id)
device_cert, private_key = self.create_new_thing(self._thing_name)

#TODO: consider the single thing deployment
self.add_thing_to_thing_group(self._thing_name, self._thing_group_name)

data = {
"DEVICE_CERT": device_cert,
"PRIVATE_KEY": private_key,
Expand All @@ -59,15 +49,14 @@ def set_up_core_device(self):

def create_new_thing(self, thing_name: str) -> list | None:
try:
thing_response = self._iotClient.create_thing(thingName=thing_name)

cert_response = self._iotClient.create_keys_and_certificate(
thing_response = self._iot_client.create_thing(thingName=thing_name)
cert_response = self._iot_client.create_keys_and_certificate(
setAsActive=True)

cert_arn = cert_response['certificateArn']

self._iotClient.attach_thing_principal(thingName=thing_name,
principal=cert_arn)
self._iot_client.attach_thing_principal(thingName=thing_name,
principal=cert_arn)

except Exception as error:
print(f"Error when creating thing: {str(error)}")
Expand All @@ -85,9 +74,10 @@ def create_new_thing(self, thing_name: str) -> list | None:
]

def create_new_thing_group(self, thing_group_name: str) -> bool:
response = self._iotClient.create_thing_group(
response = self._iot_client.create_thing_group(
thingGroupName=thing_group_name)
if response is not None:
self._thing_groups.append(response['thingGroupName'])
print(f"Successfully created a thing group: {thing_group_name}")
return True

Expand All @@ -97,13 +87,13 @@ def add_thing_to_thing_group(self, thing_name: str,
thing_group_name: str) -> bool:
# Check if the thing group exists
try:
self._iotClient.describe_thing_group(
self._iot_client.describe_thing_group(
thingGroupName=thing_group_name)
except self._iotClient.exceptions.ResourceNotFoundException:
except self._iot_client.exceptions.ResourceNotFoundException:
# Thing group doesn't exist, create it
self.create_new_thing_group(thing_group_name)

response = self._iotClient.add_thing_to_thing_group(
response = self._iot_client.add_thing_to_thing_group(
thingName=thing_name, thingGroupName=thing_group_name)

if response['ResponseMetadata']['HTTPStatusCode'] == 200:
Expand All @@ -113,63 +103,45 @@ def add_thing_to_thing_group(self, thing_name: str,
return True
return False

def remove_all_thing_groups_from_thing(self,
thing_name: str) -> bool | None:
def remove_thing_from_thing_group(self, thing_name: str,
thing_group_name: str) -> bool | None:
try:
response = self._iotClient.list_thing_groups_for_thing(
thingName=thing_name)
except self._iotClient.exceptions.ResourceNotFoundException:
print(f"Thing {thing_name} does not exist")
return None
# Check if the thing group exists
self._iot_client.describe_thing_group(
thingGroupName=thing_group_name)

for thing_group in response['thingGroups']:
self._iotClient.remove_thing_from_thing_group(
thingGroupName=thing_group['groupName'], thingName=thing_name)
# Check if the thing is in the group before attempting removal
response = self._iot_client.list_things_in_thing_group(
thingGroupName=thing_group_name, recursive=True)

# Delete the corresponding thing group
thing_group_result = self.delete_thing_group(
thing_group['groupName'])
if not thing_group_result:
# If the thing is not in the group, return False
if thing_name not in response.get('things', []):
return False

print(f"Removed all thing groups from thing {thing_name}")
return True

def remove_all_things_from_thing_group(
self, thing_group_name: str) -> bool | None:
try:
self._iotClient.describe_thing_group(
thingGroupName=thing_group_name)
except self._iotClient.exceptions.ResourceNotFoundException:
# Proceed with removal
self._iot_client.remove_thing_from_thing_group(
thingName=thing_name, thingGroupName=thing_group_name)
return True
except self._iot_client.exceptions.ResourceNotFoundException:
print(f"Thing group {thing_group_name} does not exist")
return None

response = self._iotClient.list_things_in_thing_group(
thingGroupName=thing_group_name, recursive=True)

for thing in response['things']:
self._iotClient.remove_thing_from_thing_group(
thingGroupName=thing_group_name, thingName=thing)

# Delete the corresponding thing
thing_result = self.delete_thing(thing)
if not thing_result:
return False

print(f"Removed all things from group {thing_group_name}")
return True
except self._iot_client.exceptions.ClientError as e:
print(
f"Error removing thing '{thing_name}' from thing group '{thing_group_name}': {e}"
)
return False

def delete_core_device(self):
# Cancel all deployments for the core device
try:
deployments = self._ggClient.list_effective_deployments(
deployments = self._gg_client.list_effective_deployments(
coreDeviceThingName=self._thing_name)

if not deployments.get('effectiveDeployments'):
print("No deployments found to cancel")
else:
for deployment in deployments['effectiveDeployments']:
self._ggClient.cancel_deployment(
self._gg_client.cancel_deployment(
deploymentId=deployment['deploymentId'])
print(f"Cancelled all deployments")

Expand All @@ -179,7 +151,7 @@ def delete_core_device(self):

# Delete the core device
try:
self._ggClient.delete_core_device(
self._gg_client.delete_core_device(
coreDeviceThingName=self._thing_name)
print(f"Deleted Greengrass core device '{self._thing_name}'")
except Exception as e:
Expand All @@ -191,80 +163,78 @@ def delete_core_device(self):
def delete_thing(self, thing_name: str):
try:
# Get certificates that attached to the thing
principals = self._iotClient.list_thing_principals(
principals = self._iot_client.list_thing_principals(
thingName=thing_name)['principals']

# For each certificate attached to the thing
for principal in principals:
cert_id = principal.split('/')[-1]

# Detach all policies from the certificate
policies = self._iotClient.list_attached_policies(
policies = self._iot_client.list_attached_policies(
target=principal)['policies']

for policy in policies:
self._iotClient.detach_policy(
self._iot_client.detach_policy(
policyName=policy['policyName'], target=principal)

# Detach certificate from thing
self._iotClient.detach_thing_principal(thingName=thing_name,
principal=principal)
self._iot_client.detach_thing_principal(thingName=thing_name,
principal=principal)

# Update certificate to INACTIVE
self._iotClient.update_certificate(certificateId=cert_id,
newStatus='INACTIVE')
self._iot_client.update_certificate(certificateId=cert_id,
newStatus='INACTIVE')

# Delete the certificate
self._iotClient.delete_certificate(certificateId=cert_id,
forceDelete=True)
self._iot_client.delete_certificate(certificateId=cert_id,
forceDelete=True)

# Finally, delete the thing
self._iotClient.delete_thing(thingName=thing_name)
self._iot_client.delete_thing(thingName=thing_name)

print(
f"Successfully deleted thing '{thing_name}' and its associated certificates"
)
return True

except self._iot_client.exceptions.ResourceNotFoundException:
print(f"Thing '{thing_name}' does not exist, nothing to delete")
return True

except Exception as e:
print(f"Unexpected error deleting thing '{thing_name}': {str(e)}")
return False

def delete_thing_group(self, thing_group_name: str):
try:
self._iotClient.delete_thing_group(thingGroupName=thing_group_name)
self._iot_client.delete_thing_group(thingGroupName=thing_group_name)
print(f"Successfully deleted thing group '{thing_group_name}'")
return True

except self._iot_client.exceptions.ResourceNotFoundException:
print(
f"Thing group '{thing_group_name}' does not exist, nothing to delete"
)
return True

except Exception as e:
print(
f"Unexpected error deleting thing group '{thing_group_name}': {str(e)}"
)
return False

def clean_up(self, thing_name=None, thing_group_name=None):
"""
Comprehensive cleanup method that handles both scenarios:
1. When a thing is added to multiple thing groups
2. When multiple things are added to a single thing group
"""
def clean_up(self):
print("\nRunning IoT clean up...")
try:
if thing_name:
# Scenario 1: Clean up a specific thing and all its associated thing groups
print(
f"Cleaning up thing '{thing_name}' and all its associated thing groups"
)
self.remove_all_thing_groups_from_thing(thing_name)

if thing_group_name:
# Scenario 2: Clean up a specific thing group and all its associated things
print(
f"Cleaning up thing group '{thing_group_name}' and all its associated things"
)
self.remove_all_things_from_thing_group(thing_group_name)

for thing_group in self._thing_groups:
self.remove_thing_from_thing_group(self._thing_name,
thing_group)
self.delete_thing_group(thing_group)

# Delete the core device
self.delete_core_device()
self.delete_thing(self._thing_name)
print("IoT clean-up completed.\n")

Expand Down Expand Up @@ -312,20 +282,20 @@ def _create_iot_role(self) -> str | None:
policy_name = "ggl-uat-role-token-exchange-policy"

try:
role_response = self._iamClient.get_role(RoleName=role_name)
role_response = self._iam_client.get_role(RoleName=role_name)
print(f"Role '{role_name}' already exists.")
return role_response['Role']['Arn']

except self._iamClient.exceptions.NoSuchEntityException:
role_response = self._iamClient.create_role(
except self._iam_client.exceptions.NoSuchEntityException:
role_response = self._iam_client.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(trust_policy))

policy_response = self._iamClient.create_policy(
policy_response = self._iam_client.create_policy(
PolicyName=policy_name,
PolicyDocument=json.dumps(token_exchange_policy))

self._iamClient.attach_role_policy(
self._iam_client.attach_role_policy(
RoleName=role_name, PolicyArn=policy_response['Policy']['Arn'])

return role_response['Role']['Arn']
Expand All @@ -338,13 +308,13 @@ def _create_role_alias(self, role_arn: str) -> str | None:
role_alias_name = "ggl-uat-role-alias"

try:
response = self._iotClient.describe_role_alias(
response = self._iot_client.describe_role_alias(
roleAlias=role_alias_name)
print(f"Role alias '{role_alias_name}' already exists.")
return response['roleAliasDescription']['roleAliasArn']

except self._iotClient.exceptions.ResourceNotFoundException:
response = self._iotClient.create_role_alias(
except self._iot_client.exceptions.ResourceNotFoundException:
response = self._iot_client.create_role_alias(
roleAlias=role_alias_name,
roleArn=role_arn,
credentialDurationSeconds=3600)
Expand Down Expand Up @@ -378,13 +348,13 @@ def _attach_thing_policy(self, role_alias_arn: str, cert_arn: str):
}

try:
self._iotClient.get_policy(policyName=iot_policy_name)
self._iot_client.get_policy(policyName=iot_policy_name)
print(f"Policy '{iot_policy_name}' already exists.")

except self._iotClient.exceptions.ResourceNotFoundException:
self._iotClient.create_policy(
except self._iot_client.exceptions.ResourceNotFoundException:
self._iot_client.create_policy(
policyName=iot_policy_name,
policyDocument=json.dumps(policy_document))

self._iotClient.attach_policy(policyName=iot_policy_name,
target=cert_arn)
self._iot_client.attach_policy(policyName=iot_policy_name,
target=cert_arn)
Loading