Intel examples #538
Answered
by
jshcodes
tommorgan365
asked this question in
Q&A
Intel examples
#538
-
Hello, Thanks, |
Beta Was this translation helpful? Give feedback.
Answered by
jshcodes
Jan 31, 2022
Replies: 1 comment 1 reply
-
Hi @tommorgan365 - There is! (But it's not posted yet... 😢 ) I'm in the process of finalizing the MISP importer conversion over to using the FalconPy SDK. In the interim, here's a snippet to get you started. (Please note: This code is still in flight, and is meant to be used as an example in this case.)
from falconpy import Intel
class IntelAPIClient:
"""This class provides the interface for the CrowdStrike Intel API."""
def __init__(self, client_id, client_secret, crowdstrike_url, api_request_max, use_ssl: bool = True):
"""Construct an instance of the IntelAPIClient class.
:param client_id: CrowdStrike API Client ID
:param client_secret: CrowdStrike API Client Secret
:param crowdstrike_url: CrowdStrike Base URL / Base URL shortname
:param api_request_max [int]: Maximum number of records to return per API request
:param use_ssl [bool]: Enable SSL validation to the CrowdStrike Cloud (default: True)
"""
self.falcon = Intel(client_id=client_id, client_secret=client_secret, base_url=crowdstrike_url, ssl_verify=use_ssl)
self.valid_report_types = ["csa", "csir", "csit", "csgt", "csia", "csmr", "csta", "cswr"]
self.request_size_limit = api_request_max
self._is_valid_report = lambda report: any(report.get('name') and report.get('name').lower().startswith(valid_type)
for valid_type in self.valid_report_types)
def get_reports(self, start_time):
"""Get all the reports that were updated after a certain moment in time (UNIX).
:param start_time: unix time of the oldest report you want to pull
"""
reports = []
offset = 0
total = 0
first_run = True
while offset < total or first_run:
params = {"sort": "last_modified_date.asc",
"filter": f'last_modified_date:>{start_time}',
'limit': self.request_size_limit,
'offset': offset}
resp_json = self.falcon.query_report_entities(parameters=params)["body"]
self.__check_metadata(resp_json)
total = resp_json.get('meta', {}).get('pagination', {}).get('total')
offset += resp_json.get('meta', {}).get('pagination', {}).get('limit')
first_run = False
reports.extend(resp_json.get('resources', []))
valid_reports = [report for report in reports if self._is_valid_report(report)]
return valid_reports
def get_indicators(self, start_time, include_deleted):
"""Get all the indicators that were updated after a certain moment in time (UNIX).
:param start_time: unix time of the oldest indicator you want to pull
:param include_deleted [bool]: include indicators marked as deleted
"""
indicators = []
indicators_in_request = []
first_run = True
while len(indicators_in_request) == self.request_size_limit or first_run:
params = {"sort": "_marker.asc",
"filter": f"_marker:>='{start_time}'",
'limit': self.request_size_limit,
}
if include_deleted:
params['include_deleted'] = True
resp_json = self.falcon.query_indicator_entities(parameters=params)["body"]
first_run = False
indicators_in_request = resp_json.get('resources', [])
if indicators_in_request:
total_found = reduce(lambda d, key: d.get(key, None) if isinstance(d, dict) else None,
"meta.pagination.total".split("."),
resp_json
)
logging.debug("Retrieved %i of %i remaining indicators.", len(indicators_in_request), total_found)
else:
break
indicators.extend(indicators_in_request)
last_marker = indicators_in_request[-1].get('_marker', '')
if last_marker == '':
break
start_time = last_marker
return indicators
def get_actors(self, start_time):
"""Get all the actors that were updated after a certain moment in time (UNIX).
:param start_time: unix time of the oldest actor you want to pull
"""
actors = []
offset = 0
total = 0
first_run = True
while offset < total or first_run:
params = {"sort": "last_modified_date.asc",
"filter": f'last_modified_date:>{start_time}',
'limit': self.request_size_limit,
'offset': offset}
resp_json = self.falcon.query_actor_entities(parameters=params)["body"]
total = resp_json.get('meta', {}).get('pagination', {}).get('total')
offset += resp_json.get('meta', {}).get('pagination', {}).get('limit')
first_run = False
actors.extend(resp_json.get('resources', []))
return actors
@staticmethod
def __check_metadata(resp_json):
if (resp_json.get('meta', {}).get('pagination', {}).get('total') is None) \
or (resp_json.get('meta', {}).get('pagination', {}).get('limit') is None):
raise Exception(f'Unable to decode pagination metadata from response. Response is {resp_json}.') Let us know if there are any questions. 😄 |
Beta Was this translation helpful? Give feedback.
1 reply
Answer selected by
jshcodes
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi @tommorgan365 -
There is! (But it's not posted yet... 😢 )
I'm in the process of finalizing the MISP importer conversion over to using the FalconPy SDK.
In the interim, here's a snippet to get you started. (Please note: This code is still in flight, and is meant to be used as an example in this case.)