Skip to content

Commit 4f67f97

Browse files
Create purldb scan worker (#1078)
* Create management command for purldb package scanning Signed-off-by: Jono Yang <jyang@nexb.com> * Fix bugs in package-scan-worker Signed-off-by: Jono Yang <jyang@nexb.com> * Send updates to purldb regarding scan status Signed-off-by: Jono Yang <jyang@nexb.com> * Return scan project url when updating scan status on purldb Signed-off-by: Jono Yang <jyang@nexb.com> * Get scananble_uri_uuid instead of package_uuid * Add sleep to main work loop Signed-off-by: Jono Yang <jyang@nexb.com> * Move project creation logic into its own function * Handle exceptions in package-scan-worker and send to purldb as errors Signed-off-by: Jono Yang <jyang@nexb.com> * Create project name from download url and uuid Signed-off-by: Jono Yang <jyang@nexb.com> * Refactor poll_until_success Signed-off-by: Jono Yang <jyang@nexb.com> * Add tests for new purldb functions Signed-off-by: Jono Yang <jyang@nexb.com> * Collect possible exceptions when getting next job Signed-off-by: Jono Yang <jyang@nexb.com> * Get pipelines to run from purldb Signed-off-by: Jono Yang <jyang@nexb.com> * Do not use sys.exit() in execute.py Signed-off-by: Jono Yang <jyang@nexb.com> * Add async arg to package-scan-worker.py Signed-off-by: Jono Yang <jyang@nexb.com> * Refactor commonly used command functions Signed-off-by: Jono Yang <jyang@nexb.com> * Create tests for Command mixins Signed-off-by: Jono Yang <jyang@nexb.com> * Move get_next_job() to pipes/purldb.py Signed-off-by: Jono Yang <jyang@nexb.com> * Create test for package-scan-worker Signed-off-by: Jono Yang <jyang@nexb.com> * Create failure tests for package-scan-worker * Remove `scanscan_project_url` from scanpipe.pipes.purldb.update_status() Signed-off-by: Jono Yang <jyang@nexb.com> * Save scannable_uri_uuid to project extra data Signed-off-by: Jono Yang <jyang@nexb.com> * Bump matchcode-toolkit version Signed-off-by: Jono Yang <jyang@nexb.com> * Send summary along with scan results to purldb Signed-off-by: Jono Yang <jyang@nexb.com> * Rename arguments for send_results_to_purldb * Bump matchcode-toolkit version to 4.0.0 Signed-off-by: Jono Yang <jyang@nexb.com> * Send project extra data to purldb with scan results Signed-off-by: Jono Yang <jyang@nexb.com> * Print success message when results are sent Signed-off-by: Jono Yang <jyang@nexb.com> * Update package-scan-worker tests * Sen both traceback and exception message to purldb Signed-off-by: Jono Yang <jyang@nexb.com> * Update CHANGELOG.rst * Update management command test names Signed-off-by: Jono Yang <jyang@nexb.com> * Address review comments * Move lists of statuses in poll_until_success to their own variables * Remove unnecessary else statement * Use tuples as default values for `create_project` Signed-off-by: Jono Yang <jyang@nexb.com> * Refactor logic in purldb-scan-queue-worker * Rename get_next_job to get_next_download_url * Update tests Signed-off-by: Jono Yang <jyang@nexb.com> * Fix imports and tests after rebase Signed-off-by: Jono Yang <jyang@nexb.com> * Move Command methods into functions Signed-off-by: Jono Yang <jyang@nexb.com> --------- Signed-off-by: Jono Yang <jyang@nexb.com> Co-authored-by: Jono Yang <jyang@nexb.com>
1 parent 7ee0e5f commit 4f67f97

File tree

11 files changed

+1080
-190
lines changed

11 files changed

+1080
-190
lines changed

CHANGELOG.rst

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,17 @@ v34.1.0 (unreleased)
4444
and display any available data.
4545
https://github.com/nexB/scancode.io/issues/1125
4646

47+
- Create a new management command `purldb-scan-queue-worker`, that runs
48+
scancode.io as a Package scan queue worker for PurlDB.
49+
`purldb-scan-queue-worker` gets the next available Package to be scanned and
50+
the list of pipeline names to be run on the Package from PurlDB, creates a
51+
Project, fetches the Package, runs the specified pipelines, and returns the
52+
results to PurlDB.
53+
https://github.com/nexB/scancode.io/pull/1078
54+
https://github.com/nexB/purldb/issues/236
55+
56+
- Update matchcode-toolkit to v4.0.0
57+
4758
v34.0.0 (2024-03-04)
4859
--------------------
4960

scanpipe/management/commands/__init__.py

Lines changed: 256 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,14 @@
2424
from pathlib import Path
2525

2626
from django.apps import apps
27+
from django.conf import settings
2728
from django.core.exceptions import ObjectDoesNotExist
29+
from django.core.exceptions import ValidationError
2830
from django.core.management.base import BaseCommand
2931
from django.core.management.base import CommandError
3032
from django.template.defaultfilters import pluralize
3133

34+
from scanpipe import tasks
3235
from scanpipe.models import CodebaseResource
3336
from scanpipe.models import DiscoveredPackage
3437
from scanpipe.models import Project
@@ -186,67 +189,29 @@ def extract_tag_from_input_files(input_files):
186189
187190
For example: "/path/to/file.zip:tag"
188191
"""
189-
input_files_data = {}
190-
for file in input_files:
191-
if ":" in file:
192-
key, value = file.split(":", maxsplit=1)
193-
input_files_data.update({key: value})
194-
else:
195-
input_files_data.update({file: ""})
196-
return input_files_data
192+
return extract_tag_from_input_files(input_files=input_files)
197193

198194
def handle_input_files(self, input_files_data):
199195
"""Copy provided `input_files` to the project's `input` directory."""
200-
copied = []
201-
202-
for file_location, tag in input_files_data.items():
203-
self.project.copy_input_from(file_location)
204-
filename = Path(file_location).name
205-
copied.append(filename)
206-
self.project.add_input_source(
207-
filename=filename,
208-
is_uploaded=True,
209-
tag=tag,
210-
)
211-
212-
msg = f"File{pluralize(copied)} copied to the project inputs directory:"
213-
self.stdout.write(msg, self.style.SUCCESS)
214-
msg = "\n".join(["- " + filename for filename in copied])
215-
self.stdout.write(msg)
196+
handle_input_files(
197+
project=self.project, input_files_data=input_files_data, command=self
198+
)
216199

217200
@staticmethod
218201
def validate_input_files(input_files):
219202
"""Raise an error if one of the provided `input_files` entry does not exist."""
220-
for file_location in input_files:
221-
file_path = Path(file_location)
222-
if not file_path.is_file():
223-
raise CommandError(f"{file_location} not found or not a file")
203+
validate_input_files(input_files=input_files)
224204

225205
def handle_input_urls(self, input_urls):
226206
"""
227207
Fetch provided `input_urls` and stores it in the project's `input`
228208
directory.
229209
"""
230-
downloads, errors = fetch_urls(input_urls)
231-
232-
if downloads:
233-
self.project.add_downloads(downloads)
234-
msg = "File(s) downloaded to the project inputs directory:"
235-
self.stdout.write(msg, self.style.SUCCESS)
236-
msg = "\n".join(["- " + downloaded.filename for downloaded in downloads])
237-
self.stdout.write(msg)
238-
239-
if errors:
240-
msg = "Could not fetch URL(s):\n"
241-
msg += "\n".join(["- " + url for url in errors])
242-
self.stderr.write(msg)
210+
handle_input_urls(project=self.project, input_urls=input_urls, command=self)
243211

244212
def handle_copy_codebase(self, copy_from):
245213
"""Copy `codebase_path` tree to the project's `codebase` directory."""
246-
project_codebase = self.project.codebase_path
247-
msg = f"{copy_from} content copied in {project_codebase}"
248-
self.stdout.write(msg, self.style.SUCCESS)
249-
shutil.copytree(src=copy_from, dst=project_codebase, dirs_exist_ok=True)
214+
handle_copy_codebase(project=self.project, copy_from=copy_from, command=self)
250215

251216

252217
def validate_copy_from(copy_from):
@@ -288,3 +253,249 @@ def validate_pipelines(pipelines_data):
288253
)
289254

290255
return pipelines_data
256+
257+
258+
def extract_tag_from_input_files(input_files):
259+
"""
260+
Add support for the ":tag" suffix in file location.
261+
262+
For example: "/path/to/file.zip:tag"
263+
"""
264+
input_files_data = {}
265+
for file in input_files:
266+
if ":" in file:
267+
key, value = file.split(":", maxsplit=1)
268+
input_files_data.update({key: value})
269+
else:
270+
input_files_data.update({file: ""})
271+
return input_files_data
272+
273+
274+
def validate_input_files(input_files):
275+
"""Raise an error if one of the provided `input_files` entry does not exist."""
276+
for file_location in input_files:
277+
file_path = Path(file_location)
278+
if not file_path.is_file():
279+
raise CommandError(f"{file_location} not found or not a file")
280+
281+
282+
def validate_project_inputs(pipelines, input_files, copy_from):
283+
"""
284+
Validate `pipelines`, `input_files`, and `copy_from`, returning a tuple
285+
of dictionaries containing the pipeline data of `pipelines` and the
286+
input files data from `input_files.
287+
"""
288+
pipelines_data = {}
289+
input_files_data = {}
290+
291+
if pipelines:
292+
pipelines_data = extract_group_from_pipelines(pipelines)
293+
pipelines_data = validate_pipelines(pipelines_data)
294+
295+
if input_files:
296+
input_files_data = extract_tag_from_input_files(input_files)
297+
validate_input_files(input_files=input_files_data.keys())
298+
299+
if copy_from:
300+
validate_copy_from(copy_from)
301+
302+
return pipelines_data, input_files_data
303+
304+
305+
def handle_input_files(project, input_files_data, command=None):
306+
"""Copy provided `input_files` to the project's `input` directory."""
307+
copied = []
308+
309+
for file_location, tag in input_files_data.items():
310+
project.copy_input_from(file_location)
311+
filename = Path(file_location).name
312+
copied.append(filename)
313+
project.add_input_source(
314+
filename=filename,
315+
is_uploaded=True,
316+
tag=tag,
317+
)
318+
319+
if command:
320+
msg = f"File{pluralize(copied)} copied to the project inputs directory:"
321+
command.stdout.write(msg, command.style.SUCCESS)
322+
msg = "\n".join(["- " + filename for filename in copied])
323+
command.stdout.write(msg)
324+
325+
326+
def handle_input_urls(project, input_urls, command=None):
327+
"""
328+
Fetch provided `input_urls` and stores it in the project's `input`
329+
directory.
330+
"""
331+
downloads, errors = fetch_urls(input_urls)
332+
333+
if downloads:
334+
project.add_downloads(downloads)
335+
msg = "File(s) downloaded to the project inputs directory:"
336+
if command:
337+
command.stdout.write(msg, command.style.SUCCESS)
338+
msg = "\n".join(["- " + downloaded.filename for downloaded in downloads])
339+
command.stdout.write(msg)
340+
341+
if errors and command:
342+
msg = "Could not fetch URL(s):\n"
343+
msg += "\n".join(["- " + url for url in errors])
344+
command.stderr.write(msg)
345+
346+
347+
def handle_copy_codebase(project, copy_from, command=None):
348+
"""Copy `codebase_path` tree to the project's `codebase` directory."""
349+
project_codebase = project.codebase_path
350+
if command:
351+
msg = f"{copy_from} content copied in {project_codebase}"
352+
command.stdout.write(msg, command.style.SUCCESS)
353+
shutil.copytree(src=copy_from, dst=project_codebase, dirs_exist_ok=True)
354+
355+
356+
def add_project_inputs(
357+
project, pipelines_data, input_files_data, input_urls, copy_from, command=None
358+
):
359+
for pipeline_name, selected_groups in pipelines_data.items():
360+
project.add_pipeline(pipeline_name, selected_groups=selected_groups)
361+
362+
if input_files_data:
363+
handle_input_files(
364+
project=project, input_files_data=input_files_data, command=command
365+
)
366+
367+
if input_urls:
368+
handle_input_urls(project=project, input_urls=input_urls, command=command)
369+
370+
if copy_from:
371+
handle_copy_codebase(project=project, copy_from=copy_from, command=command)
372+
373+
374+
def execute_project(project, run_async=False, command=None):
375+
run = project.get_next_run()
376+
377+
if not run:
378+
raise CommandError(f"No pipelines to run on project {project}")
379+
380+
if run_async:
381+
if not settings.SCANCODEIO_ASYNC:
382+
msg = "SCANCODEIO_ASYNC=False is not compatible with --async option."
383+
raise CommandError(msg)
384+
385+
run.start()
386+
if command:
387+
msg = f"{run.pipeline_name} added to the tasks queue for execution."
388+
command.stdout.write(msg, command.style.SUCCESS)
389+
else:
390+
command.stdout.write(f"Start the {run.pipeline_name} pipeline execution...")
391+
392+
try:
393+
tasks.execute_pipeline_task(run.pk)
394+
except KeyboardInterrupt:
395+
run.set_task_stopped()
396+
raise CommandError("Pipeline execution stopped.")
397+
except Exception as e:
398+
run.set_task_ended(exitcode=1, output=str(e))
399+
raise CommandError(e)
400+
401+
run.refresh_from_db()
402+
403+
if run.task_succeeded and command:
404+
msg = f"{run.pipeline_name} successfully executed on " f"project {project}"
405+
command.stdout.write(msg, command.style.SUCCESS)
406+
else:
407+
msg = f"Error during {run.pipeline_name} execution:\n{run.task_output}"
408+
raise CommandError(msg)
409+
410+
411+
def create_project(
412+
name,
413+
pipelines=None,
414+
input_files=None,
415+
input_urls=None,
416+
copy_from="",
417+
notes="",
418+
execute=False,
419+
run_async=False,
420+
command=None,
421+
):
422+
if execute and not pipelines:
423+
raise CommandError("The execute argument requires one or more pipelines.")
424+
425+
project = Project(name=name)
426+
if notes:
427+
project.notes = notes
428+
429+
try:
430+
project.full_clean(exclude=["slug"])
431+
except ValidationError as e:
432+
raise CommandError("\n".join(e.messages))
433+
434+
# Run validation before creating the project in the database
435+
pipelines_data, input_files_data = validate_project_inputs(
436+
pipelines=pipelines, input_files=input_files, copy_from=copy_from
437+
)
438+
439+
project.save()
440+
if command:
441+
command.project = project
442+
443+
if command:
444+
msg = f"Project {name} created with work directory {project.work_directory}"
445+
command.stdout.write(msg, command.style.SUCCESS)
446+
447+
add_project_inputs(
448+
project=project,
449+
pipelines_data=pipelines_data,
450+
input_files_data=input_files_data,
451+
input_urls=input_urls,
452+
copy_from=copy_from,
453+
command=command,
454+
)
455+
456+
if execute:
457+
execute_project(project=project, run_async=run_async, command=command)
458+
459+
return project
460+
461+
462+
class ExecuteProjectCommandMixin:
463+
def add_arguments(self, parser):
464+
super().add_arguments(parser)
465+
parser.add_argument(
466+
"--async",
467+
action="store_true",
468+
dest="async",
469+
help=(
470+
"Add the pipeline run to the tasks queue for execution by a worker "
471+
"instead of running in the current thread."
472+
),
473+
)
474+
475+
def execute_project(self, run_async=False):
476+
execute_project(project=self.project, run_async=run_async, command=self)
477+
478+
479+
class CreateProjectCommandMixin(ExecuteProjectCommandMixin):
480+
def create_project(
481+
self,
482+
name,
483+
pipelines=None,
484+
input_files=None,
485+
input_urls=None,
486+
copy_from="",
487+
notes="",
488+
execute=False,
489+
run_async=False,
490+
):
491+
return create_project(
492+
name=name,
493+
pipelines=pipelines,
494+
input_files=input_files,
495+
input_urls=input_urls,
496+
copy_from=copy_from,
497+
notes=notes,
498+
execute=execute,
499+
run_async=run_async,
500+
command=self,
501+
)

0 commit comments

Comments
 (0)