Skip to content

Commit 64807af

Browse files
authored
Replace batcher with S3 inventory (#131)
1 parent ca049c5 commit 64807af

37 files changed

+1588
-1794
lines changed

.coveragerc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[run]
22
source=.
3-
omit=venv/*
3+
omit=venv/*,tests/*
44

55
[report]
66
fail_under=85

.travis.yml

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,6 @@ notifications:
66
install:
77
- pip3 install -r requirements.txt
88
script:
9-
- coverage run manage.py unit_test
10-
- coverage report # Required coverage threshold specified in .coveragerc
11-
- pylint lambda_functions rules tests *.py -j 1 # Config in .pylintrc
12-
- mypy lambda_functions rules *.py --disallow-untyped-defs --ignore-missing-imports --warn-unused-ignores
13-
- bandit -r . # Configuration in .bandit
14-
- sphinx-build -W docs/source docs/build
9+
- tests/ci_tests.sh
1510
after_success:
1611
- coveralls

cli/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
"""BinaryAlert release version"""
2+
__version__ = '1.1.0'

cli/config.py

Lines changed: 306 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
"""BinaryAlert configuration management."""
2+
import base64
3+
import getpass
4+
import os
5+
import re
6+
import subprocess
7+
from typing import Any
8+
9+
import boto3
10+
import hcl
11+
12+
from cli.exceptions import InvalidConfigError
13+
14+
# File locations
15+
PARENT_DIR = os.path.dirname(os.path.realpath(__file__)) # Directory containing this file.
16+
TERRAFORM_DIR = os.path.realpath(os.path.join(PARENT_DIR, '..', 'terraform'))
17+
CONFIG_FILE = os.path.join(TERRAFORM_DIR, 'terraform.tfvars')
18+
VARIABLES_FILE = os.path.join(TERRAFORM_DIR, 'variables.tf')
19+
20+
21+
def get_input(prompt: str, default_value: str,
22+
config: Any = None, property_name: str = None) -> str:
23+
"""Request user input, updating the underlying config if applicable.
24+
25+
Args:
26+
prompt: On-screen prompt before user input
27+
default_value: The default (existing) value
28+
config: BinaryAlertConfig instance, if updating the underlying configuration
29+
If None, the valid values are assumed to be 'yes' and 'no'
30+
property_name: Name of the config property to update (applicable only if config != None)
31+
32+
Returns:
33+
Lowercase user input, stripped of extra spaces, or the default value if no input was given
34+
"""
35+
if default_value:
36+
prompt = '{} ({}): '.format(prompt, default_value)
37+
else:
38+
prompt = '{}: '.format(prompt)
39+
40+
# Keep requesting user input until it is valid
41+
while True:
42+
user_input = input(prompt).strip().lower() or default_value
43+
if config and property_name:
44+
try:
45+
setattr(config, property_name, user_input)
46+
break
47+
except InvalidConfigError as error:
48+
print('ERROR: {}'.format(error))
49+
elif user_input in {'yes', 'no'}:
50+
break
51+
else:
52+
print('ERROR: Please enter exactly "yes" or "no"')
53+
54+
return user_input
55+
56+
57+
class BinaryAlertConfig:
58+
"""Wrapper around reading, validating, and updating the terraform.tfvars config file."""
59+
# Expected configuration value formats.
60+
VALID_AWS_ACCOUNT_ID_FORMAT = r'\d{12}'
61+
VALID_AWS_REGION_FORMAT = r'[a-z]{2}-[a-z]{2,15}-\d'
62+
VALID_NAME_PREFIX_FORMAT = r'[a-z][a-z0-9_]{3,50}'
63+
VALID_CB_API_TOKEN_FORMAT = r'[a-f0-9]{40}' # CarbonBlack API token.
64+
VALID_CB_ENCRYPTED_TOKEN_FORMAT = r'\S{50,500}'
65+
VALID_CB_URL_FORMAT = r'https?://\S+'
66+
67+
def __init__(self) -> None:
68+
"""Parse the terraform.tfvars config file and make sure it contains every variable.
69+
70+
Raises:
71+
InvalidConfigError: If any variable is defined in variables.tf but not terraform.tfvars.
72+
"""
73+
with open(CONFIG_FILE) as f:
74+
self._config = hcl.load(f) # Dict[str, Union[int, str]]
75+
76+
with open(VARIABLES_FILE) as f:
77+
variable_names = hcl.load(f)['variable'].keys()
78+
79+
for variable in variable_names:
80+
# Verify that the variable is defined.
81+
if variable not in self._config:
82+
raise InvalidConfigError(
83+
'variable "{}" is not defined in {}'.format(variable, CONFIG_FILE)
84+
)
85+
86+
@property
87+
def aws_account_id(self) -> str:
88+
return self._config['aws_account_id']
89+
90+
@aws_account_id.setter
91+
def aws_account_id(self, value: str) -> None:
92+
if not re.fullmatch(self.VALID_AWS_ACCOUNT_ID_FORMAT, value, re.ASCII):
93+
raise InvalidConfigError(
94+
'aws_account_id "{}" does not match format {}'.format(
95+
value, self.VALID_AWS_ACCOUNT_ID_FORMAT)
96+
)
97+
self._config['aws_account_id'] = value
98+
99+
@property
100+
def aws_region(self) -> str:
101+
return self._config['aws_region']
102+
103+
@aws_region.setter
104+
def aws_region(self, value: str) -> None:
105+
if not re.fullmatch(self.VALID_AWS_REGION_FORMAT, value, re.ASCII):
106+
raise InvalidConfigError(
107+
'aws_region "{}" does not match format {}'.format(
108+
value, self.VALID_AWS_REGION_FORMAT)
109+
)
110+
self._config['aws_region'] = value
111+
112+
@property
113+
def name_prefix(self) -> str:
114+
return self._config['name_prefix']
115+
116+
@name_prefix.setter
117+
def name_prefix(self, value: str) -> None:
118+
if not re.fullmatch(self.VALID_NAME_PREFIX_FORMAT, value, re.ASCII):
119+
raise InvalidConfigError(
120+
'name_prefix "{}" does not match format {}'.format(
121+
value, self.VALID_NAME_PREFIX_FORMAT)
122+
)
123+
self._config['name_prefix'] = value
124+
125+
@property
126+
def enable_carbon_black_downloader(self) -> int:
127+
return self._config['enable_carbon_black_downloader']
128+
129+
@enable_carbon_black_downloader.setter
130+
def enable_carbon_black_downloader(self, value: int) -> None:
131+
if value not in {0, 1}:
132+
raise InvalidConfigError(
133+
'enable_carbon_black_downloader "{}" must be either 0 or 1.'.format(value)
134+
)
135+
self._config['enable_carbon_black_downloader'] = value
136+
137+
@property
138+
def carbon_black_url(self) -> str:
139+
return self._config['carbon_black_url']
140+
141+
@carbon_black_url.setter
142+
def carbon_black_url(self, value: str) -> None:
143+
if not re.fullmatch(self.VALID_CB_URL_FORMAT, value, re.ASCII):
144+
raise InvalidConfigError(
145+
'carbon_black_url "{}" does not match format {}'.format(
146+
value, self.VALID_CB_URL_FORMAT)
147+
)
148+
self._config['carbon_black_url'] = value
149+
150+
@property
151+
def encrypted_carbon_black_api_token(self) -> str:
152+
return self._config['encrypted_carbon_black_api_token']
153+
154+
@encrypted_carbon_black_api_token.setter
155+
def encrypted_carbon_black_api_token(self, value: str) -> None:
156+
if not re.fullmatch(self.VALID_CB_ENCRYPTED_TOKEN_FORMAT, value, re.ASCII):
157+
raise InvalidConfigError(
158+
'encrypted_carbon_black_api_token "{}" does not match format {}'.format(
159+
value, self.VALID_CB_ENCRYPTED_TOKEN_FORMAT
160+
)
161+
)
162+
self._config['encrypted_carbon_black_api_token'] = value
163+
164+
@property
165+
def plaintext_carbon_black_api_token(self) -> str:
166+
return boto3.client('kms').decrypt(
167+
CiphertextBlob=base64.b64decode(self.encrypted_carbon_black_api_token))['Plaintext']
168+
169+
@property
170+
def force_destroy(self) -> str:
171+
return self._config['force_destroy']
172+
173+
@property
174+
def binaryalert_analyzer_name(self) -> str:
175+
return '{}_binaryalert_analyzer'.format(self.name_prefix)
176+
177+
@property
178+
def binaryalert_analyzer_queue_name(self) -> str:
179+
return '{}_binaryalert_analyzer_queue'.format(self.name_prefix)
180+
181+
@property
182+
def binaryalert_downloader_queue_name(self) -> str:
183+
return '{}_binaryalert_downloader_queue'.format(self.name_prefix)
184+
185+
@property
186+
def binaryalert_dynamo_table_name(self) -> str:
187+
return '{}_binaryalert_matches'.format(self.name_prefix)
188+
189+
@property
190+
def binaryalert_s3_bucket_name(self) -> str:
191+
return '{}.binaryalert-binaries.{}'.format(
192+
self.name_prefix.replace('_', '.'), self.aws_region
193+
)
194+
195+
@property
196+
def retro_batch_size(self) -> int:
197+
return self._config['objects_per_retro_message']
198+
199+
def _encrypt_cb_api_token(self) -> None:
200+
"""Save an encrypted CarbonBlack API token.
201+
202+
This Terraforms the KMS keys required to encrypt the token.
203+
"""
204+
# Request API token using password-style input (will not be displayed on screen).
205+
while True:
206+
api_token = getpass.getpass(
207+
'CarbonBlack API token (only needs binary read access): ').strip().lower()
208+
if re.fullmatch(self.VALID_CB_API_TOKEN_FORMAT, api_token, re.ASCII):
209+
break
210+
else:
211+
print('ERROR: {}-character input does not match expected token format {}'.format(
212+
len(api_token), self.VALID_CB_API_TOKEN_FORMAT
213+
))
214+
215+
# We need the KMS key to encrypt the API token.
216+
# The same key will be used by the downloader to decrypt the API token at runtime.
217+
print('Terraforming KMS key...')
218+
os.chdir(TERRAFORM_DIR)
219+
subprocess.check_call(['terraform', 'init'])
220+
subprocess.check_call(
221+
['terraform', 'apply', '-target=aws_kms_alias.encrypt_credentials_alias']
222+
)
223+
224+
print('Encrypting API token...')
225+
response = boto3.client('kms').encrypt(
226+
KeyId='alias/{}_binaryalert_carbonblack_credentials'.format(self.name_prefix),
227+
Plaintext=api_token
228+
)
229+
self.encrypted_carbon_black_api_token = base64.b64encode(
230+
response['CiphertextBlob']).decode('utf-8')
231+
232+
def _configure_carbon_black(self) -> None:
233+
"""If CarbonBlack downloader is enabled, request URL and credentials"""
234+
get_input('CarbonBlack URL', self.carbon_black_url, self, 'carbon_black_url')
235+
236+
update_api_token = 'yes'
237+
if self.encrypted_carbon_black_api_token:
238+
# API token already exists - ask if they want to update it.
239+
update_api_token = get_input('Change the CarbonBlack API token?', 'no')
240+
241+
if update_api_token == 'yes':
242+
self.save() # Save updates so far to enable the downloader for terraform.
243+
self._encrypt_cb_api_token()
244+
245+
def configure(self) -> None:
246+
"""Request basic configuration settings from the user.
247+
248+
Each request will be retried until the answer is in the correct format.
249+
"""
250+
get_input('AWS Account ID', self.aws_account_id, self, 'aws_account_id')
251+
get_input('AWS Region', self.aws_region, self, 'aws_region')
252+
get_input('Unique name prefix, e.g. "company_team"', self.name_prefix, self, 'name_prefix')
253+
enable_downloader = get_input('Enable the CarbonBlack downloader?',
254+
'yes' if self.enable_carbon_black_downloader else 'no')
255+
self.enable_carbon_black_downloader = 1 if enable_downloader == 'yes' else 0
256+
257+
if self.enable_carbon_black_downloader:
258+
self._configure_carbon_black()
259+
260+
# Save the updated configuration.
261+
self.save()
262+
263+
def validate(self) -> None:
264+
"""Validate config values against their expected formats.
265+
266+
Terraform and AWS have their own validation, but this simple up-front check
267+
saves the user some headache compared to waiting for a deploy to fail.
268+
We only explicitly validate variables which the user can change through the CLI:
269+
aws_region, name_prefix, *carbon_black*
270+
271+
Raises:
272+
InvalidConfigError: If any config variable has an invalid value.
273+
"""
274+
# Go through the internal setters which have the validation logic.
275+
self.aws_account_id = self.aws_account_id
276+
self.aws_region = self.aws_region
277+
self.name_prefix = self.name_prefix
278+
self.enable_carbon_black_downloader = self.enable_carbon_black_downloader
279+
if self.enable_carbon_black_downloader:
280+
# Validate CarbonBlack variables if applicable.
281+
self.carbon_black_url = self.carbon_black_url
282+
self.encrypted_carbon_black_api_token = self.encrypted_carbon_black_api_token
283+
284+
def save(self) -> None:
285+
"""Save the current configuration to the terraform.tfvars config file."""
286+
# In order to preserve comments, we overwrite each individual variable instead of re-writing
287+
# the entire configuration file.
288+
with open(CONFIG_FILE) as config_file:
289+
raw_config = config_file.read()
290+
291+
for variable_name, value in self._config.items():
292+
if isinstance(value, str):
293+
formatted_value = '"{}"'.format(value)
294+
elif isinstance(value, bool):
295+
formatted_value = str(value).lower()
296+
else:
297+
formatted_value = value
298+
299+
raw_config = re.sub(
300+
r'{}\s*=\s*\S+'.format(variable_name),
301+
'{} = {}'.format(variable_name, formatted_value),
302+
raw_config
303+
)
304+
305+
with open(CONFIG_FILE, 'w') as config_file:
306+
config_file.write(raw_config)

cli/enqueue_task.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
"""Worker task for adding things to a queue."""
2+
from multiprocessing import JoinableQueue, Process
3+
import time
4+
from typing import List
5+
6+
import boto3
7+
8+
9+
class EnqueueTask:
10+
"""A Task to send a batch of records to SQS."""
11+
12+
def __init__(self, messages: List[str]) -> None:
13+
"""Initialize a Task with up to 10 SQS message entries."""
14+
self.messages = messages
15+
16+
def run(self, sqs_queue: boto3.resource) -> None:
17+
"""Send messages to SQS."""
18+
while self.messages:
19+
response = sqs_queue.send_messages(Entries=[
20+
{'Id': str(i), 'MessageBody': message}
21+
for i, message in enumerate(self.messages)
22+
])
23+
24+
if not response.get('Failed'):
25+
return
26+
27+
# There were some failed messages, put them back and retry in a few seconds
28+
self.messages = [
29+
self.messages[int(failure['Id'])]
30+
for failure in response['Failed']
31+
]
32+
time.sleep(2)
33+
34+
35+
class Worker(Process):
36+
"""Worker processes consumes S3 versions from the task queue and processes them."""
37+
38+
def __init__(self, sqs_queue_name: str, task_queue: JoinableQueue) -> None:
39+
"""Create a new worker process.
40+
41+
Args:
42+
sqs_queue_name: Name of the target SQS queue
43+
task_queue: Thread-safe queue of EnqueueTasks to complete
44+
"""
45+
super().__init__()
46+
self._task_queue = task_queue
47+
self._queue = boto3.resource('sqs').get_queue_by_name(QueueName=sqs_queue_name)
48+
49+
def run(self) -> None:
50+
"""Consume tasks from the task queue until an empty task is found."""
51+
while True:
52+
task = self._task_queue.get()
53+
54+
if task is None:
55+
self._task_queue.task_done()
56+
return
57+
58+
task.run(self._queue)
59+
self._task_queue.task_done()

0 commit comments

Comments
 (0)