Skip to content

Commit 81e568e

Browse files
kaiseroKyle ParrishOliver Kaiser
authored
v0.1.5 (#31)
* Add function to query hitcounts https://developer.cisco.com/site/ftd-api-reference/#!introduction-to-firepower-threat-defense-rest-api/about-the-firepower-threat-defense-rest-api * Fixed #28 #30 and merged #29 Co-authored-by: Kyle Parrish <arnydo@gmail.com> Co-authored-by: Oliver Kaiser <ok@ong.at>
1 parent b09388b commit 81e568e

File tree

6 files changed

+289
-79
lines changed

6 files changed

+289
-79
lines changed

CHANGELOG.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,15 @@
1+
# 0.1.5
2+
3+
## Bugfixes
4+
5+
Fixed issue with id_by_name helper functions caused by incorrect cache impl (#28)
6+
Fixed missing interface_id param for interface PUT operations (#30)
7+
8+
## Enhancements
9+
10+
Added additional unit tests for id_by_name operations
11+
Merged and enhanced hitcount implementation by @arnydo (#29)
12+
113
# 0.1.4
214

315
## Bugfixes

fireREST/__init__.py

Lines changed: 111 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# -*- coding: utf-8 -*-
1+
# -*- coding: utf-8 -*-
22

33
import json
44
import logging
@@ -84,6 +84,20 @@ def _url(self, namespace='base', path=''):
8484
raise exc.InvalidNamespaceError(f'Invalid namespace "{namespace}" provided. Options: {options.keys()}')
8585
return options[namespace]
8686

87+
def _filter(self, items=None):
88+
'''
89+
Get filter string from list of key, value pairs
90+
: param items: list of key value pairs used to build filter string
91+
: return: valid filter string
92+
'''
93+
if items:
94+
filter_str = ''
95+
for k, v in items.items():
96+
if v:
97+
filter_str += f'{k}:{v};'
98+
return filter_str.rstrip(';')
99+
return ''
100+
87101
@utils.handle_errors
88102
def _request(self, method: str, url: str, params=None, auth=None, data=None):
89103
if params:
@@ -210,7 +224,6 @@ def _sanitize(self, method: str, payload: Dict):
210224
sanitized_payload.pop('id', None)
211225
return sanitized_payload
212226

213-
# @utils.cache_result
214227
@utils.validate_object_type
215228
@utils.minimum_version_required('6.1.0')
216229
def get_object_id_by_name(self, object_type: str, object_name: str):
@@ -228,7 +241,6 @@ def get_object_id_by_name(self, object_type: str, object_name: str):
228241
return obj['id']
229242
return None
230243

231-
@utils.cache_result
232244
@utils.minimum_version_required('6.1.0')
233245
def get_device_id_by_name(self, device_name: str):
234246
'''
@@ -244,52 +256,48 @@ def get_device_id_by_name(self, device_name: str):
244256
return device['id']
245257
return None
246258

247-
@utils.cache_result
248259
@utils.minimum_version_required('6.2.3')
249-
def get_device_hapair_id_by_name(self, device_hapair_name: str):
260+
def get_devicehapair_id_by_name(self, hapair_name: str):
250261
'''
251262
helper function to retrieve device ha - pair id by name
252-
: param device_hapair_name: name of the ha - pair
263+
: param hapair_name: name of the ha - pair
253264
: return: id if ha - pair is found, None otherwise
254265
'''
255266
request = '/devicehapairs/ftddevicehapairs'
256267
url = self._url('config', request)
257268
ha_pairs = self._get(url)
258269
for ha_pair in ha_pairs:
259-
if ha_pair['name'] == device_hapair_name:
270+
if ha_pair['name'] == hapair_name:
260271
return ha_pair['id']
261272
return None
262273

263-
@utils.cache_result
264274
@utils.minimum_version_required('6.2.3')
265-
def get_device_id_from_hapair(self, device_hapair_id: str):
275+
def get_device_id_from_hapair(self, hapair_id: str):
266276
'''
267277
helper function to retrieve device id from ha - pair
268-
: param device_hapar_id: id of ha - pair
278+
: param hapair_id: id of ha - pair
269279
: return: id if device is found, None otherwise
270280
'''
271-
request = f'/devicehapairs/ftddevicehapairs/{device_hapair_id}'
281+
request = f'/devicehapairs/ftddevicehapairs/{hapair_id}'
272282
url = self._url('config', request)
273283
ha_pair = self._get(url)
274284
return ha_pair['primary']['id']
275285

276-
@utils.cache_result
277286
@utils.minimum_version_required('6.2.3')
278-
def get_nat_policy_id_by_name(self, nat_policy_name: str):
287+
def get_natpolicy_id_by_name(self, policy_name: str):
279288
'''
280289
helper function to retrieve nat policy id by name
281-
: param nat_policy_name: name of nat policy
290+
: param policy_name: name of nat policy
282291
: return: policy id if nat policy is found, None otherwise
283292
'''
284293
request = '/policy/ftdnatpolicies'
285294
url = self._url('config', request)
286-
nat_policies = self._get(url)
287-
for nat_policy in nat_policies:
288-
if nat_policy['name'] == nat_policy_name:
289-
return nat_policy['id']
295+
natpolicies = self._get(url)
296+
for natpolicy in natpolicies:
297+
if natpolicy['name'] == policy_name:
298+
return natpolicy['id']
290299
return None
291300

292-
@utils.cache_result
293301
@utils.minimum_version_required('6.1.0')
294302
def get_accesspolicy_id_by_name(self, policy_name: str):
295303
'''
@@ -299,22 +307,34 @@ def get_accesspolicy_id_by_name(self, policy_name: str):
299307
'''
300308
request = '/policy/accesspolicies'
301309
url = self._url('config', request)
302-
accesspolicies = self._get(url)
303-
for accesspolicy in accesspolicies:
304-
if accesspolicy['name'] == policy_name:
305-
return accesspolicy['id']
310+
policies = self._get(url)
311+
for policy in policies:
312+
if policy['name'] == policy_name:
313+
return policy['id']
314+
return None
315+
316+
def get_prefilterpolicy_id_by_name(self, policy_name: str):
317+
'''
318+
helper function to retrieve prefilter policy id by name
319+
: param policy_name: name of the prefilter policy
320+
: return: prefilter policy id if policy name is found, None otherwise
321+
'''
322+
request = '/policy/prefilterpolicies'
323+
url = self._url('config', request)
324+
prefilterpolicies = self._get(url)
325+
for policy in prefilterpolicies:
326+
if policy['name'] == policy_name:
327+
return policy['id']
306328
return None
307329

308-
@utils.cache_result
309330
@utils.minimum_version_required('6.1.0')
310-
def get_accessrule_id_by_name(self, policy_name: str, rule_name: str):
331+
def get_accessrule_id_by_name(self, policy_id: str, rule_name: str):
311332
'''
312333
helper function to retrieve access control policy rule id by name
313-
: param policy_name: name of the access control policy that will be queried
314-
: param rule_name: name of the access control policy rule
315-
: return: accesspolicy rule id if access control policy rule is found, None otherwise
334+
: param policy_id: id of the accesspolicy that will be queried
335+
: param rule_name: name of the accessrule
336+
: return: accesspolicy rule id if accessrule is found, None otherwise
316337
'''
317-
policy_id = self.get_accesspolicy_id_by_name(policy_name)
318338
request = f'/policy/accesspolicies/{policy_id}/accessrules'
319339
url = self._url('config', request)
320340
accessrules = self._get(url)
@@ -323,32 +343,30 @@ def get_accessrule_id_by_name(self, policy_name: str, rule_name: str):
323343
return accessrule['id']
324344
return None
325345

326-
@utils.cache_result
327346
@utils.minimum_version_required('6.1.0')
328-
def get_syslog_alert_id_by_name(self, syslog_alert_name: str):
347+
def get_syslogalert_id_by_name(self, alert_name: str):
329348
'''
330349
helper function to retrieve syslog alert object id by name
331-
: param syslog_alert_name: name of syslog alert object
350+
: param alert_name: name of syslog alert object
332351
: return: syslogalert id if syslog alert is found, None otherwise
333352
'''
334353
syslogalerts = self.get_syslogalerts()
335-
for syslog_alert in syslogalerts:
336-
if syslog_alert['name'] == syslog_alert_name:
337-
return syslog_alert['id']
354+
for syslogalert in syslogalerts:
355+
if syslogalert['name'] == alert_name:
356+
return syslogalert['id']
338357
return None
339358

340-
@utils.cache_result
341359
@utils.minimum_version_required('6.1.0')
342-
def get_snmp_alert_id_by_name(self, snmp_alert_name: str):
360+
def get_snmpalert_id_by_name(self, alert_name: str):
343361
'''
344362
helper function to retrieve snmp alert object id by name
345-
: param snmp_alert_name: name of snmp alert object
363+
: param alert_name: name of snmp alert object
346364
: return: snmpalert id if snmp alert is found, None otherwise
347365
'''
348-
snmp_alerts = self.get_snmpalerts()
349-
for snmp_alert in snmp_alerts:
350-
if snmp_alert['name'] == snmp_alert_name:
351-
return snmp_alert['id']
366+
snmpalerts = self.get_snmpalerts()
367+
for snmpalert in snmpalerts:
368+
if snmpalert['name'] == alert_name:
369+
return snmpalert['id']
352370
return None
353371

354372
def get_domain_id_by_name(self, domain_name: str):
@@ -475,46 +493,46 @@ def delete_device(self, device_id: str):
475493
return self._delete(url)
476494

477495
@utils.minimum_version_required('6.2.3')
478-
def get_device_hapairs(self):
496+
def get_devicehapairs(self):
479497
url = self._url('config', '/devicehapairs/ftddevicehapairs')
480498
return self._get(url)
481499

482500
@utils.minimum_version_required('6.2.3')
483-
def create_device_hapair(self, data: Dict):
501+
def create_devicehapair(self, data: Dict):
484502
url = self._url('config', '/devicehapairs/ftddevicehapairs')
485503
return self._get(url, data)
486504

487505
@utils.minimum_version_required('6.2.3')
488-
def get_device_hapair(self, device_hapair_id: str):
489-
url = self._url('config', f'/devicehapairs/ftddevicehapairs/{device_hapair_id}')
506+
def get_devicehapair(self, devicehapair_id: str):
507+
url = self._url('config', f'/devicehapairs/ftddevicehapairs/{devicehapair_id}')
490508
return self._get(url)
491509

492510
@utils.minimum_version_required('6.2.3')
493-
def update_device_hapair(self, data: Dict, device_hapair_id: str):
494-
url = self._url('config', f'/devicehapairs/ftddevicehapairs/{device_hapair_id}')
511+
def update_devicehapair(self, data: Dict, devicehapair_id: str):
512+
url = self._url('config', f'/devicehapairs/ftddevicehapairs/{devicehapair_id}')
495513
return self._update(url, data)
496514

497515
@utils.minimum_version_required('6.2.3')
498-
def delete_device_hapair(self, device_hapair_id: str):
499-
url = self._url('config', f'/devicehapairs/ftddevicehapairs/{device_hapair_id}')
516+
def delete_devicehapair(self, devicehapair_id: str):
517+
url = self._url('config', f'/devicehapairs/ftddevicehapairs/{devicehapair_id}')
500518
return self._delete(url)
501519

502520
@utils.minimum_version_required('6.3.0')
503-
def get_device_hapair_monitoredinterfaces(self, device_hapair_id: str):
504-
url = self._url('config', f'/devicehapairs/ftddevicehapairs/{device_hapair_id}/monitoredinterfaces',)
521+
def get_devicehapair_monitoredinterfaces(self, devicehapair_id: str):
522+
url = self._url('config', f'/devicehapairs/ftddevicehapairs/{devicehapair_id}/monitoredinterfaces')
505523
return self._get(url)
506524

507525
@utils.minimum_version_required('6.3.0')
508-
def get_device_hapair_monitoredinterface(self, device_hapair_id: str, monitoredinterface_id: str):
526+
def get_devicehapair_monitoredinterface(self, devicehapair_id: str, monitoredinterface_id: str):
509527
url = self._url(
510-
'config', f'/devicehapairs/ftddevicehapairs/{device_hapair_id}/monitoredinterfaces/{monitoredinterface_id}',
528+
'config', f'/devicehapairs/ftddevicehapairs/{devicehapair_id}/monitoredinterfaces/{monitoredinterface_id}',
511529
)
512530
return self._get(url)
513531

514532
@utils.minimum_version_required('6.3.0')
515-
def update_device_hapair_monitoredinterface(self, device_hapair_id: str, monitoredinterface_id: str, data: Dict):
533+
def update_devicehapair_monitoredinterface(self, devicehapair_id: str, monitoredinterface_id: str, data: Dict):
516534
url = self._url(
517-
'config', f'/devicehapairs/ftddevicehapairs/{device_hapair_id}/monitoredinterfaces/{monitoredinterface_id}',
535+
'config', f'/devicehapairs/ftddevicehapairs/{devicehapair_id}/monitoredinterfaces/{monitoredinterface_id}',
518536
)
519537
return self._update(url, data)
520538

@@ -525,12 +543,12 @@ def get_ftd_physical_interfaces(self, device_id: str):
525543

526544
@utils.minimum_version_required('6.1.0')
527545
def get_ftd_physical_interface(self, device_id: str, interface_id: str):
528-
url = self._url('config', f'/devices/devicerecords/{device_id}/physicalinterfaces/{interface_id}',)
546+
url = self._url('config', f'/devices/devicerecords/{device_id}/physicalinterfaces/{interface_id}')
529547
return self._get(url)
530548

531549
@utils.minimum_version_required('6.1.0')
532-
def update_ftd_physical_interface(self, device_id: str, data: Dict):
533-
url = self._url('config', f'/devices/devicerecords/{device_id}/physicalinterfaces')
550+
def update_ftd_physical_interface(self, device_id: str, interface_id: str, data: Dict):
551+
url = self._url('config', f'/devices/devicerecords/{device_id}/physicalinterfaces/{interface_id}')
534552
return self._update(url, data)
535553

536554
@utils.minimum_version_required('6.1.0')
@@ -545,17 +563,17 @@ def get_ftd_redundant_interfaces(self, device_id: str):
545563

546564
@utils.minimum_version_required('6.1.0')
547565
def get_ftd_redundant_interface(self, device_id: str, interface_id: str):
548-
url = self._url('config', f'/devices/devicerecords/{device_id}/redundantinterfaces/{interface_id}',)
566+
url = self._url('config', f'/devices/devicerecords/{device_id}/redundantinterfaces/{interface_id}')
549567
return self._get(url)
550568

551569
@utils.minimum_version_required('6.1.0')
552-
def update_ftd_redundant_interface(self, device_id: str, data: Dict):
553-
url = self._url('config', f'/devices/devicerecords/{device_id}/redundantinterfaces')
570+
def update_ftd_redundant_interface(self, device_id: str, interface_id: str, data: Dict):
571+
url = self._url('config', f'/devices/devicerecords/{device_id}/redundantinterfaces/{interface_id}')
554572
return self._update(url, data)
555573

556574
@utils.minimum_version_required('6.1.0')
557575
def delete_ftd_redundant_interface(self, device_id: str, interface_id: str):
558-
url = self._url('config', f'/devices/devicerecords/{device_id}/redundantinterfaces/{interface_id}',)
576+
url = self._url('config', f'/devices/devicerecords/{device_id}/redundantinterfaces/{interface_id}')
559577
return self._delete(url)
560578

561579
@utils.minimum_version_required('6.1.0')
@@ -570,17 +588,17 @@ def get_ftd_portchannel_interfaces(self, device_id: str):
570588

571589
@utils.minimum_version_required('6.1.0')
572590
def get_ftd_portchannel_interface(self, device_id: str, interface_id: str):
573-
url = self._url('config', f'/devices/devicerecords/{device_id}/etherchannelinterfaces/{interface_id}',)
591+
url = self._url('config', f'/devices/devicerecords/{device_id}/etherchannelinterfaces/{interface_id}')
574592
return self._get(url)
575593

576594
@utils.minimum_version_required('6.1.0')
577-
def update_ftd_portchannel_interface(self, device_id: str, data: Dict):
578-
url = self._url('config', f'/devices/devicerecords/{device_id}/etherchannelinterfaces')
595+
def update_ftd_portchannel_interface(self, device_id: str, interface_id: str, data: Dict):
596+
url = self._url('config', f'/devices/devicerecords/{device_id}/etherchannelinterfaces/{interface_id}')
579597
return self._update(url, data)
580598

581599
@utils.minimum_version_required('6.1.0')
582600
def delete_ftd_portchannel_interface(self, device_id: str, interface_id: str):
583-
url = self._url('config', f'/devices/devicerecords/{device_id}/etherchannelinterfaces/{interface_id}',)
601+
url = self._url('config', f'/devices/devicerecords/{device_id}/etherchannelinterfaces/{interface_id}')
584602
return self._delete(url)
585603

586604
@utils.minimum_version_required('6.1.0')
@@ -620,17 +638,17 @@ def get_ftd_ipv4staticroutes(self, device_id: str):
620638

621639
@utils.minimum_version_required('6.3.0')
622640
def get_ftd_ipv4staticroute(self, device_id: str, route_id: str):
623-
url = self._url('config', f'/devices/devicerecords/{device_id}/routing/ipv4staticroutes/{route_id}',)
641+
url = self._url('config', f'/devices/devicerecords/{device_id}/routing/ipv4staticroutes/{route_id}')
624642
return self._get(url)
625643

626644
@utils.minimum_version_required('6.3.0')
627645
def update_ftd_ipv4staticroute(self, device_id: str, route_id: str, data: Dict):
628-
url = self._url('config', f'/devices/devicerecords/{device_id}/routing/ipv4staticroutes/{route_id}',)
646+
url = self._url('config', f'/devices/devicerecords/{device_id}/routing/ipv4staticroutes/{route_id}')
629647
return self._update(url, data)
630648

631649
@utils.minimum_version_required('6.3.0')
632650
def delete_ftd_ipv4staticroute(self, device_id: str, route_id: str):
633-
url = self._url('config', f'/devices/devicerecords/{device_id}/routing/ipv4staticroutes/{route_id}',)
651+
url = self._url('config', f'/devices/devicerecords/{device_id}/routing/ipv4staticroutes/{route_id}')
634652
return self._delete(url)
635653

636654
@utils.minimum_version_required('6.3.0')
@@ -645,17 +663,17 @@ def get_ftd_ipv6staticroutes(self, device_id: str):
645663

646664
@utils.minimum_version_required('6.3.0')
647665
def get_ftd_ipv6staticroute(self, device_id: str, route_id: str):
648-
url = self._url('config', f'/devices/devicerecords/{device_id}/routing/ipv6staticroutes/{route_id}',)
666+
url = self._url('config', f'/devices/devicerecords/{device_id}/routing/ipv6staticroutes/{route_id}')
649667
return self._get(url)
650668

651669
@utils.minimum_version_required('6.3.0')
652670
def update_ftd_ipv6staticroute(self, device_id: str, route_id: str, data: Dict):
653-
url = self._url('config', f'/devices/devicerecords/{device_id}/routing/ipv6staticroutes/{route_id}',)
671+
url = self._url('config', f'/devices/devicerecords/{device_id}/routing/ipv6staticroutes/{route_id}')
654672
return self._update(url, data)
655673

656674
@utils.minimum_version_required('6.3.0')
657675
def delete_ftd_ipv6staticroute(self, device_id: str, route_id: str):
658-
url = self._url('config', f'/devices/devicerecords/{device_id}/routing/ipv6staticroutes/{route_id}',)
676+
url = self._url('config', f'/devices/devicerecords/{device_id}/routing/ipv6staticroutes/{route_id}')
659677
return self._delete(url)
660678

661679
@utils.minimum_version_required('6.1.0')
@@ -886,3 +904,23 @@ def get_policy_assignment(self, policy_id: str):
886904
def update_policy_assignment(self, policy_id: str, data: Dict):
887905
url = self._url('config', f'/assignment/policyassignments/{policy_id}')
888906
return self._update(url, data)
907+
908+
@utils.minimum_version_required('6.4.0')
909+
def get_hitcounts(self, policy_id: str, device_id: str, rule_ids=None, fetch_zero_hitcount=True):
910+
params = {
911+
'filter': self._filter({'deviceId': device_id, 'ids': rule_ids, 'fetchZeroHitCount': fetch_zero_hitcount})
912+
}
913+
url = self._url('config', f'/policy/accesspolicies/{policy_id}/operational/hitcounts')
914+
return self._get(url, params)
915+
916+
@utils.minimum_version_required('6.4.0')
917+
def update_hitcounts(self, policy_id: str, device_id: str, rule_ids=None):
918+
params = {'filter': self._filter({'deviceId': device_id, 'ids': rule_ids})}
919+
url = self._url('config', f'/policy/accesspolicies/{policy_id}/operational/hitcounts')
920+
return self._update(url, params)
921+
922+
@utils.minimum_version_required('6.4.0')
923+
def delete_hitcounts(self, policy_id: str, device_id: str, rule_ids=None):
924+
params = {'filter': self._filter({'deviceId': device_id, 'ids': rule_ids})}
925+
url = self._url('config', f'/policy/accesspolicies/{policy_id}/operational/hitcounts')
926+
return self._update(url, params)

0 commit comments

Comments
 (0)