5
5
"""
6
6
# pylint: disable=invalid-name
7
7
8
+ import inspect
8
9
import logging
9
10
import json
10
- from typing import Any , Union , List , Dict
11
+ from typing import Any , Union , List , Dict , Optional
11
12
import time
13
+ from dataclasses import dataclass
12
14
from datetime import datetime
13
15
import re
14
16
import base64
38
40
C8Y_TOKEN_TOPIC = "c8y/s/dat"
39
41
40
42
43
+ @dataclass
44
+ class Certificate :
45
+ issuer : str = ""
46
+ subject : str = ""
47
+ thumbprint : str = ""
48
+
49
+ @classmethod
50
+ def from_dict (cls , env ):
51
+ return cls (
52
+ ** {k : v for k , v in env .items () if k in inspect .signature (cls ).parameters }
53
+ )
54
+
55
+ @property
56
+ def is_self_signed (self ):
57
+ if self .issuer is None or self .subject is None :
58
+ return False
59
+ return self .issuer and self .issuer == self .subject
60
+
61
+
41
62
class MQTTMessage :
42
63
timestamp : float
43
64
topic : str
@@ -58,50 +79,22 @@ class ThinEdgeIO(DeviceLibrary):
58
79
"""ThinEdgeIO Library"""
59
80
60
81
def __init__ (
61
- self ,
62
- image : str = DeviceLibrary .DEFAULT_IMAGE ,
63
- adapter : str = None ,
64
- bootstrap_script : str = DeviceLibrary .DEFAULT_BOOTSTRAP_SCRIPT ,
65
- ** kwargs ,
82
+ self ,
83
+ image : str = DeviceLibrary .DEFAULT_IMAGE ,
84
+ adapter : str = None ,
85
+ bootstrap_script : str = DeviceLibrary .DEFAULT_BOOTSTRAP_SCRIPT ,
86
+ ** kwargs ,
66
87
):
67
88
super ().__init__ (
68
89
image = image , adapter = adapter , bootstrap_script = bootstrap_script , ** kwargs
69
90
)
70
91
92
+ # track self-signed devices certificates for cleanup after the suite has finished
93
+ self ._certificates : Dict [str , str ] = {}
94
+
71
95
# Configure retries
72
96
retry .configure_retry_on_members (self , "^_assert_" )
73
97
74
- def should_delete_device_certificate (self ) -> bool :
75
- """Check if the certificate should be deleted or not
76
- """
77
- # Only delete the certificate if it is a self signed certificate
78
-
79
- # Parse the certificate details via tedge cert show
80
- lines = []
81
- try :
82
- lines = self .execute_command ("tedge cert show" , ignore_exit_code = True ).splitlines ()
83
- except Exception as ex :
84
- # Ignore any errors
85
- log .info ("Could not read certificate information. %s" , ex )
86
-
87
- # Prase output and decode the certificate information
88
- # Use simple parser to avoid having to decode the certificate
89
- certificate = {}
90
- for line in lines :
91
- key , _ , value = line .partition (":" )
92
- if key and value :
93
- certificate [key .lower ().strip ()] = value .strip ()
94
-
95
- issuer = certificate .get ("issuer" , None )
96
- subject = certificate .get ("subject" , None )
97
-
98
- if issuer is None or subject is None :
99
- return False
100
-
101
- # Self signed certificates generally have the same issue information as the subject
102
- is_self_signed = subject == issuer
103
- return is_self_signed
104
-
105
98
def end_suite (self , _data : Any , result : Any ):
106
99
"""End suite hook which is called by Robot Framework
107
100
when the test suite has finished
@@ -112,16 +105,34 @@ def end_suite(self, _data: Any, result: Any):
112
105
"""
113
106
log .info ("Suite %s (%s) ending" , result .name , result .message )
114
107
115
- for device in self .devices .values ():
108
+ log .info (
109
+ "Removing the following self-signed certificates (thumbprints): %s" ,
110
+ self ._certificates ,
111
+ )
112
+ for thumbprint , device_sn in self ._certificates .items ():
116
113
try :
117
- if isinstance (device , DeviceAdapter ):
118
- if device .should_cleanup :
119
- if self .should_delete_device_certificate ():
120
- self .remove_certificate (device )
121
- self .remove_device (device )
114
+ self .remove_certificate (thumbprint )
115
+ c8y_lib .device_mgmt .inventory .delete_device_and_user (
116
+ device_sn , "c8y_Serial"
117
+ )
122
118
except Exception as ex :
123
119
log .warning ("Could not cleanup certificate/device. %s" , ex )
124
120
121
+ # remove device management objects and related users
122
+ # Note: this needs to run in addition to the certificate cleanup
123
+ # for device that don't use self-signed certificate, and delete_device_and_user
124
+ # does a no-op if the managed object and/or user does not exist, so it is safe
125
+ # to run multiple times
126
+ for device in self .devices .values ():
127
+ try :
128
+ device_sn = device .get_id ()
129
+ # Note: this is a no-op if the device or user does not exist
130
+ c8y_lib .device_mgmt .inventory .delete_device_and_user (
131
+ device_sn , "c8y_Serial"
132
+ )
133
+ except Exception as ex :
134
+ log .warning ("Could not cleanup device. %s" , ex )
135
+
125
136
super ().end_suite (_data , result )
126
137
127
138
def end_test (self , _data : Any , result : Any ):
@@ -136,21 +147,51 @@ def end_test(self, _data: Any, result: Any):
136
147
if not result .passed :
137
148
log .info ("Test '%s' failed: %s" , result .name , result .message )
138
149
139
- # TODO: Only cleanup on the suite?
140
- # self.remove_certificate_and_device(self.current)
150
+ # store self-signed certificates before anything else is done with the devices
151
+ # record each self-signed certificate within the current set of devices
152
+ # as the certificates can change within tests which would result in the suite
153
+ # teardown not knowing about any intermediate artifacts
154
+ for device in self .devices .values ():
155
+ try :
156
+ if isinstance (device , DeviceAdapter ) and device .should_cleanup :
157
+ certificate = self .get_certificate_details (device )
158
+ if certificate .is_self_signed and certificate .thumbprint :
159
+ self ._certificates [certificate .thumbprint ] = device .get_id ()
160
+ except Exception as ex :
161
+ log .warning ("Could not cleanup certificate/device. %s" , ex )
162
+
141
163
super ().end_test (_data , result )
142
164
143
- @keyword ("Delete Managed Object" )
144
- def delete_managed_object (self , internal_id : str , ** kwargs ) -> None :
145
- """Delete managed object and related device user
165
+ @keyword ("Register Certificate For Cleanup" )
166
+ def register_certificate (
167
+ self ,
168
+ cloud_profile : Optional [str ] = None ,
169
+ common_name : Optional [str ] = None ,
170
+ device_name : Optional [str ] = None ,
171
+ ):
172
+ """Register a self-signed certificate for deletion after the test suite
173
+ has finished.
146
174
147
175
Args:
148
- internal_id (str): Internal id of the managed object
176
+ device_name (Optional[str], optional): device name. Defaults to current device.
177
+ cloud_profile (Optional[str], optional): Cloud profile name. Defaults to None.
178
+
179
+ Raises:
180
+ ValueError: No device context given
149
181
"""
150
- url = f"{ c8y_lib .c8y .base_url } /inventory/managedObjects/{ internal_id } "
182
+ device = self .current
183
+ if device_name :
184
+ if device_name in self .devices :
185
+ device = self .devices .get (device_name )
186
+
187
+ if not device :
188
+ raise ValueError (
189
+ f"Unable to execute the command as the device: '{ device_name } ' has not been setup"
190
+ )
151
191
152
- response = c8y_lib .c8y .session .delete (url )
153
- response .raise_for_status ()
192
+ certificate = self .get_certificate_details (device , cloud_profile = cloud_profile )
193
+ if certificate .is_self_signed and certificate .thumbprint :
194
+ self ._certificates [certificate .thumbprint ] = common_name or device .get_id ()
154
195
155
196
@keyword ("Get Debian Architecture" )
156
197
def get_debian_architecture (self ):
@@ -299,30 +340,60 @@ def log_operations(self, mo_id: str, status: str = None):
299
340
else :
300
341
log .info ("No operations found" )
301
342
302
- def remove_certificate (self , device : DeviceAdapter = None ):
303
- """Remove trusted certificate"""
343
+ def get_certificate_details (
344
+ self , device : DeviceAdapter = None , cloud_profile : Optional [str ] = None
345
+ ) -> Certificate :
346
+ """Get the details about the device's certificate
347
+
348
+ Args:
349
+ device (DeviceAdapter, optional): Device. Defaults to the current device.
350
+ cloud_profile (Optional[str], optional): Optional cloud profile name. Defaults to None.
351
+
352
+ Returns:
353
+ Certificate: Information about the current certificate
354
+ """
355
+ certificate = Certificate ()
304
356
if device is None :
305
357
device = self .current
306
358
307
359
if not device :
308
360
log .info (f"No certificate to remove as the device as not been set" )
309
- return
361
+ return certificate
310
362
311
- result = device .execute_command (
312
- "command -v tedge >/dev/null && (tedge cert show | grep '^Thumbprint:' | cut -d' ' -f2 | tr A-Z a-z) || true" ,
313
- )
314
- if result .return_code != 0 :
315
- log .info ("Failed to get device certificate fingerprint. %s" , result .stdout )
316
- return
363
+ # Parse the certificate details via tedge cert show
364
+ lines = []
365
+ try :
366
+ command = "tedge cert show c8y"
367
+ if cloud_profile :
368
+ command += f" --profile { cloud_profile } "
369
+ lines = self .execute_command (command , ignore_exit_code = True ).splitlines ()
317
370
318
- fingerprint = result .stdout .strip ()
319
- if fingerprint :
320
- try :
321
- c8y_lib .trusted_certificate_delete (fingerprint )
322
- except Exception as ex :
323
- log .warning (
324
- "Could not remove device certificate. error=%s" , ex
325
- )
371
+ except Exception as ex :
372
+ # Ignore any errors
373
+ log .info ("Could not read certificate information. %s" , ex )
374
+ return certificate
375
+
376
+ # Prase output and decode the certificate information
377
+ # Use simple parser to avoid having to decode the certificate
378
+ fields = {}
379
+ for line in lines :
380
+ key , _ , value = line .partition (":" )
381
+ if key and value :
382
+ fields [key .lower ().strip ()] = value .strip ()
383
+
384
+ certificate = Certificate .from_dict (fields )
385
+ return certificate
386
+
387
+ def remove_certificate (self , thumbprint : str ):
388
+ """Remove trusted certificate
389
+
390
+ Args:
391
+ thumbprint (str): Certificate thumbprint/fingerprint
392
+ """
393
+ try :
394
+ c8y_lib .trusted_certificate_delete (thumbprint .lower ())
395
+ except Exception as ex :
396
+ log .warning ("Could not remove device certificate. error=%s" , ex )
326
397
327
398
def remove_device (self , device : DeviceAdapter = None ):
328
399
"""Remove device from the cloud"""
@@ -340,14 +411,13 @@ def remove_device(self, device: DeviceAdapter = None):
340
411
"Device serial number is empty, so nothing to delete from Cumulocity"
341
412
)
342
413
return
343
- device_mo = c8y_lib .c8y .identity .get_object (device_sn , "c8y_Serial" )
344
- c8y_lib .device_mgmt .inventory .delete_device_and_user (device_mo )
414
+ c8y_lib .device_mgmt .inventory .delete_device_and_user (
415
+ device_sn , "c8y_Serial"
416
+ )
345
417
except KeyError :
346
418
log .info ("Device does not exist in cloud, nothing to delete" )
347
419
except Exception as ex :
348
- log .warning (
349
- "Could not remove device. error=%s" , ex
350
- )
420
+ log .warning ("Could not remove device. error=%s" , ex )
351
421
352
422
@keyword ("Download From GitHub" )
353
423
def download_from_github (self , * run_id : str , arch : str = "aarch64" ):
0 commit comments