Skip to content

Commit fdf1519

Browse files
JonoYangtdruez
andauthored
1287 purldb scan worker update (#1320)
* Split the WebhookSubscription model in 2 models #1325 WebhookSubscription defines the Webhook behavior WebhookDelivery stores historical data about deliveries Signed-off-by: tdruez <tdruez@nexb.com> * Add model migrations #1325 Signed-off-by: tdruez <tdruez@nexb.com> * Display the Webhook deliveries in the run details view #1325 Signed-off-by: tdruez <tdruez@nexb.com> * Add support for new Webhook model in API #1325 Signed-off-by: tdruez <tdruez@nexb.com> * Improve the Webhook related tests #1325 Signed-off-by: tdruez <tdruez@nexb.com> * Using proper webhookdeliveries for the related name #1325 Signed-off-by: tdruez <tdruez@nexb.com> * Relate a webhook delivery with a pipeline run #1325 Signed-off-by: tdruez <tdruez@nexb.com> * Refine API support for webhooks #1325 Signed-off-by: tdruez <tdruez@nexb.com> * Send scan results to purldb in pipeline run #1287 * Update purldb-scan-worker.py to only create scan projects from download urls and to not track project status Signed-off-by: Jono Yang <jyang@nexb.com> * Filter using Q objects #1287 Signed-off-by: Jono Yang <jyang@nexb.com> * Move pipeline function logic to scanpipe.pipes.purldb #1287 * Remove poll_run_status Signed-off-by: Jono Yang <jyang@nexb.com> * Create test for check_project_run_statuses #1287 Signed-off-by: Jono Yang <jyang@nexb.com> * Update test for check_project_run_statuses #1287 Signed-off-by: Jono Yang <jyang@nexb.com> * Remove test for get_run_status #1287 Signed-off-by: Jono Yang <jyang@nexb.com> * Update project extra_data before executing it #1287 * Print message detailing new jobs from purldb Signed-off-by: Jono Yang <jyang@nexb.com> * Use existing queryset methods to get runs #1287 Signed-off-by: Jono Yang <jyang@nexb.com> * Update CHANGELOG.rst #1287 Signed-off-by: Jono Yang <jyang@nexb.com> * Display message on continue #1287 Signed-off-by: Jono Yang <jyang@nexb.com> * Add webhook subscription when creating scan project #1287 Signed-off-by: Jono Yang <jyang@nexb.com> * Update URLs #1287 
Signed-off-by: Jono Yang <jyang@nexb.com> * Remove send_project_results pipeline #1287 Signed-off-by: Jono Yang <jyang@nexb.com> * Fix logic display logic #1287 Signed-off-by: Jono Yang <jyang@nexb.com> * Get webhook_url from purldb #1287 Signed-off-by: Jono Yang <jyang@nexb.com> * Update test expectations #1287 * Bump packageurl-python to 0.15.6 Signed-off-by: Jono Yang <jyang@nexb.com> * Update CHANGELOG.rst #1287 Signed-off-by: Jono Yang <jyang@nexb.com> * Fix test #1287 Signed-off-by: Jono Yang <jyang@nexb.com> --------- Signed-off-by: tdruez <tdruez@nexb.com> Signed-off-by: Jono Yang <jyang@nexb.com> Co-authored-by: tdruez <tdruez@nexb.com>
1 parent 71f3d45 commit fdf1519

File tree

6 files changed

+156
-163
lines changed

6 files changed

+156
-163
lines changed

CHANGELOG.rst

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,15 @@ v34.7.2 (unreleased)
2626
- Add a new Dependency view that renders the project dependencies as a tree.
2727
https://github.com/nexB/scancode.io/issues/1145
2828

29+
- The ``purldb-scan-worker`` command has been updated to send project results
30+
back using the Project webhook subscriptions. This allows us to not have the
31+
main task loop monitor a single project run for completion in order to
32+
return data, and allows us to have multiple scan projects active at once while
33+
we use ``purldb-scan-worker``. A new option ``--max-concurrent-projects`` has
34+
been added to limit the number of purldb packages that can be requested and
35+
processed at once.
36+
https://github.com/nexB/scancode.io/issues/1287
37+
2938
v34.7.1 (2024-07-15)
3039
--------------------
3140

scanpipe/management/commands/purldb-scan-worker.py

Lines changed: 54 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@
2727

2828
from scanpipe.management.commands import AddInputCommandMixin
2929
from scanpipe.management.commands import CreateProjectCommandMixin
30-
from scanpipe.pipes import output
30+
from scanpipe.management.commands import execute_project
31+
from scanpipe.models import Run
3132
from scanpipe.pipes import purldb
3233

3334

@@ -47,40 +48,76 @@ def add_arguments(self, parser):
4748
parser.add_argument(
4849
"--max-loops",
4950
dest="max_loops",
51+
type=int,
5052
default=0,
5153
action="store",
5254
help="Limit the number of loops to a maximum number. "
5355
"0 means no limit. Used only for testing.",
5456
)
5557

58+
parser.add_argument(
59+
"--max-concurrent-projects",
60+
dest="max_concurrent_projects",
61+
type=int,
62+
default=1,
63+
action="store",
64+
help="Limit the number of Projects that can be run at once.",
65+
)
66+
5667
def handle(self, *args, **options):
5768
self.verbosity = options["verbosity"]
5869
sleep = options["sleep"]
5970
run_async = options["async"]
6071
max_loops = options["max_loops"]
72+
max_concurrent_projects = options["max_concurrent_projects"]
6173

6274
loop_count = 0
6375
while True:
64-
if max_loops and int(loop_count) >= int(max_loops):
76+
if max_loops and loop_count >= max_loops:
6577
self.stdout.write("loop max reached")
6678
break
6779

6880
time.sleep(sleep)
6981
loop_count += 1
7082

83+
# Usually, a worker can only run one Run at a time
84+
queued_or_running = Run.objects.queued_or_running()
85+
queued_or_running_count = queued_or_running.count()
86+
if queued_or_running_count >= max_concurrent_projects:
87+
self.stdout.write(
88+
"Continuing: number of queued or running Runs"
89+
f"({queued_or_running_count}) is greater "
90+
"than the number of max concurrent projects "
91+
f"({max_concurrent_projects})"
92+
)
93+
continue
94+
7195
# 1. Get download url from purldb
7296
response = purldb.get_next_download_url()
7397
if response:
7498
scannable_uri_uuid = response["scannable_uri_uuid"]
7599
download_url = response["download_url"]
76100
pipelines = response["pipelines"]
101+
webhook_url = response["webhook_url"]
77102
else:
78103
self.stderr.write("Bad response from PurlDB: unable to get next job.")
79104
continue
80105

81106
if not (download_url and scannable_uri_uuid):
82107
self.stdout.write("No new job from PurlDB.")
83108
continue
109+
else:
110+
formatted_pipeline_names = [f"\t\t{pipeline}" for pipeline in pipelines]
111+
formatted_pipeline_names = "\n".join(formatted_pipeline_names)
112+
msg = (
113+
"New job from PurlDB:\n"
114+
"\tscannable_uri_uuid:\n"
115+
f"\t\t{scannable_uri_uuid}\n"
116+
"\tdownload_url:\n"
117+
f"\t\t{download_url}\n"
118+
"\tpipelines:\n"
119+
) + formatted_pipeline_names
120+
self.stdout.write(msg)
84121

85122
try:
86123
# 2. Create and run project
@@ -89,21 +126,12 @@ def handle(self, *args, **options):
89126
scannable_uri_uuid=scannable_uri_uuid,
90127
download_url=download_url,
91128
pipelines=pipelines,
129+
webhook_url=webhook_url,
92130
run_async=run_async,
93131
)
94132

95-
# 3. Poll project results
96-
purldb.poll_run_status(
97-
project=project,
98-
sleep=sleep,
99-
)
100-
101-
# 4. Get project results and send to PurlDB
102-
send_scan_project_results(
103-
project=project, scannable_uri_uuid=scannable_uri_uuid
104-
)
105133
self.stdout.write(
106-
"Scan results and other data have been sent to PurlDB",
134+
f"Project {project.name} has been created",
107135
self.style.SUCCESS,
108136
)
109137

@@ -119,7 +147,7 @@ def handle(self, *args, **options):
119147

120148

121149
def create_scan_project(
122-
command, scannable_uri_uuid, download_url, pipelines, run_async=False
150+
command, scannable_uri_uuid, download_url, pipelines, webhook_url, run_async=False
123151
):
124152
"""
125153
Create and return a Project for the scan project request with ID of
@@ -135,30 +163,17 @@ def create_scan_project(
135163
name=name,
136164
pipelines=pipelines,
137165
input_urls=input_urls,
138-
execute=True,
139-
run_async=run_async,
140166
)
141-
project.update_extra_data({"scannable_uri_uuid": scannable_uri_uuid})
142-
return project
143-
144-
145-
def send_scan_project_results(project, scannable_uri_uuid):
146-
"""
147-
Send the JSON summary and results of `project` to PurlDB for the scan
148-
request `scannable_uri_uuid`.
149-
150-
Raise a PurlDBException if there is an issue sending results to PurlDB.
151-
"""
152-
project.refresh_from_db()
153-
scan_results_location = output.to_json(project)
154-
scan_summary_location = project.get_latest_output(filename="summary")
155-
response = purldb.send_results_to_purldb(
156-
scannable_uri_uuid,
157-
scan_results_location,
158-
scan_summary_location,
159-
project.extra_data,
167+
project.update_extra_data(
168+
{
169+
"scannable_uri_uuid": scannable_uri_uuid,
170+
}
160171
)
161-
if not response:
162-
raise purldb.PurlDBException(
163-
"Bad response returned when sending results to PurlDB"
164-
)
172+
project.add_webhook_subscription(
173+
target_url=webhook_url,
174+
trigger_on_each_run=False,
175+
include_summary=True,
176+
include_results=True,
177+
)
178+
execute_project(project=project, run_async=run_async, command=command)
179+
return project

scanpipe/pipes/purldb.py

Lines changed: 22 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434

3535
from scanpipe.pipes import LoopProgress
3636
from scanpipe.pipes import _clean_package_data
37-
from scanpipe.pipes import poll_until_success
3837

3938

4039
class PurlDBException(Exception):
@@ -426,38 +425,6 @@ def get_next_download_url(timeout=DEFAULT_TIMEOUT, api_url=PURLDB_API_URL):
426425
return response
427426

428427

429-
def send_results_to_purldb(
430-
scannable_uri_uuid,
431-
scan_results_location,
432-
scan_summary_location,
433-
project_extra_data,
434-
timeout=DEFAULT_TIMEOUT,
435-
api_url=PURLDB_API_URL,
436-
):
437-
"""
438-
Send project results to purldb for the package handeled by the ScannableURI
439-
with uuid of `scannable_uri_uuid`
440-
"""
441-
with open(scan_results_location, "rb") as scan_results_file:
442-
with open(scan_summary_location, "rb") as scan_summary_file:
443-
data = {
444-
"scannable_uri_uuid": scannable_uri_uuid,
445-
"scan_status": "scanned",
446-
"project_extra_data": json.dumps(project_extra_data),
447-
}
448-
files = {
449-
"scan_results_file": scan_results_file,
450-
"scan_summary_file": scan_summary_file,
451-
}
452-
response = request_post(
453-
url=f"{api_url}scan_queue/update_status/",
454-
timeout=timeout,
455-
data=data,
456-
files=files,
457-
)
458-
return response
459-
460-
461428
def update_status(
462429
scannable_uri_uuid,
463430
status,
@@ -467,12 +434,11 @@ def update_status(
467434
):
468435
"""Update the status of a ScannableURI on a PurlDB scan queue"""
469436
data = {
470-
"scannable_uri_uuid": scannable_uri_uuid,
471437
"scan_status": status,
472438
"scan_log": scan_log,
473439
}
474440
response = request_post(
475-
url=f"{api_url}scan_queue/update_status/",
441+
url=f"{api_url}scan_queue/{scannable_uri_uuid}/update_status/",
476442
timeout=timeout,
477443
data=data,
478444
)
@@ -486,18 +452,29 @@ def create_project_name(download_url, scannable_uri_uuid):
486452
return f"{slugify(download_url)}-{scannable_uri_uuid[0:8]}"
487453

488454

489-
def poll_run_status(project, sleep=10):
455+
def check_project_run_statuses(project, logger=None):
490456
"""
491-
Poll the status of all runs of `project`. Raise a PurlDBException with a
492-
message containing the log of the run if the run has stopped, failed, or
493-
gone stale, otherwise return None.
457+
If any of the runs of this Project has failed, stopped, or gone stale,
458+
update the status of the Scannable URI associated with this Project to
459+
`failed` and send back a log of the failed runs.
494460
"""
495-
runs = project.runs.all()
496-
for run in runs:
497-
if not poll_until_success(check=get_run_status, sleep=sleep, run=run):
498-
status = get_run_status(run)
499-
msg = f"Run ended with status {status}:\n\n{run.log}"
500-
raise PurlDBException(msg)
461+
failed_runs = project.runs.failed()
462+
if failed_runs.exists():
463+
failure_msgs = []
464+
for failed_run in failed_runs:
465+
msg = f"{failed_run.pipeline_name} failed:\n\n{failed_run.log}\n"
466+
failure_msgs.append(msg)
467+
failure_msg = "\n".join(failure_msgs)
468+
469+
if logger:
470+
logger(failure_msg)
471+
472+
scannable_uri_uuid = project.extra_data.get("scannable_uri_uuid")
473+
update_status(
474+
scannable_uri_uuid=scannable_uri_uuid,
475+
status="failed",
476+
scan_log=failure_msg,
477+
)
501478

502479

503480
def get_run_status(run, **kwargs):

scanpipe/tests/pipes/test_purldb.py

Lines changed: 42 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -150,18 +150,14 @@ def test_scanpipe_pipes_purldb_get_next_download_url(
150150
results = purldb.get_next_download_url()
151151
self.assertFalse(results)
152152

153-
def test_scanpipe_pipes_purldb_get_run_status(self):
154-
now = timezone.now()
155-
run = self.create_run(
156-
pipeline="succeed",
157-
task_start_date=now,
158-
task_end_date=now,
159-
task_exitcode=0,
160-
)
161-
status = purldb.get_run_status(run=run)
162-
self.assertEqual("success", status)
163-
164-
def test_scanpipe_pipes_purldb_poll_run_status(self):
153+
@mock.patch("scanpipe.pipes.purldb.request_post")
154+
@mock.patch("scanpipe.pipes.purldb.is_available")
155+
def test_scanpipe_pipes_purldb_check_project_run_statuses(
156+
self, mock_is_available, mock_request_post
157+
):
158+
mock_is_available.return_value = True
159+
scannable_uri_uuid = "97627c6e-9acb-43e0-b8df-28bd92f2b7e5"
160+
self.project1.extra_data.update({"scannable_uri_uuid": scannable_uri_uuid})
165161
now = timezone.now()
166162

167163
# Test check_project_run_statuses on individual pipelines
@@ -172,7 +168,10 @@ def test_scanpipe_pipes_purldb_poll_run_status(self):
172168
task_end_date=now,
173169
task_exitcode=0,
174170
)
175-
purldb.poll_run_status(project=self.project1)
171+
purldb.check_project_run_statuses(
172+
project=self.project1,
173+
)
174+
mock_request_post.assert_not_called()
176175
self.project1.runs.all().delete()
177176

178177
self.create_run(
@@ -182,9 +181,21 @@ def test_scanpipe_pipes_purldb_poll_run_status(self):
182181
task_exitcode=1,
183182
log="failed",
184183
)
185-
with self.assertRaises(purldb.PurlDBException) as context:
186-
purldb.poll_run_status(project=self.project1)
187-
self.assertIn("failed", context.exception)
184+
purldb.check_project_run_statuses(
185+
project=self.project1,
186+
)
187+
mock_request_post.assert_called_once()
188+
mock_request_post_call = mock_request_post.mock_calls[0]
189+
mock_request_post_call_kwargs = mock_request_post_call.kwargs
190+
purldb_update_status_url = (
191+
f"{purldb.PURLDB_API_URL}scan_queue/{scannable_uri_uuid}/update_status/"
192+
)
193+
self.assertEqual(purldb_update_status_url, mock_request_post_call_kwargs["url"])
194+
expected_data = {
195+
"scan_status": "failed",
196+
"scan_log": "failed failed:\n\nfailed\n",
197+
}
198+
self.assertEqual(expected_data, mock_request_post_call_kwargs["data"])
188199
self.project1.runs.all().delete()
189200

190201
self.create_run(
@@ -194,9 +205,14 @@ def test_scanpipe_pipes_purldb_poll_run_status(self):
194205
task_exitcode=99,
195206
log="stopped",
196207
)
197-
with self.assertRaises(purldb.PurlDBException) as context:
198-
purldb.poll_run_status(project=self.project1)
199-
self.assertIn("stopped", context.exception)
208+
purldb.check_project_run_statuses(
209+
project=self.project1,
210+
)
211+
self.assertEqual(2, mock_request_post.call_count)
212+
mock_request_post_call = mock_request_post.mock_calls[0]
213+
mock_request_post_call_kwargs = mock_request_post_call.kwargs
214+
self.assertEqual(purldb_update_status_url, mock_request_post_call_kwargs["url"])
215+
self.assertEqual(expected_data, mock_request_post_call_kwargs["data"])
200216
self.project1.runs.all().delete()
201217

202218
self.create_run(
@@ -206,29 +222,14 @@ def test_scanpipe_pipes_purldb_poll_run_status(self):
206222
task_exitcode=88,
207223
log="stale",
208224
)
209-
with self.assertRaises(purldb.PurlDBException) as context:
210-
purldb.poll_run_status(project=self.project1)
211-
self.assertIn("stale", context.exception)
212-
self.project1.runs.all().delete()
213-
214-
# Test pipelines success, then failure
215-
self.assertEqual(0, self.project1.runs.count())
216-
self.create_run(
217-
pipeline="succeed",
218-
task_start_date=now,
219-
task_end_date=now,
220-
task_exitcode=0,
221-
)
222-
self.create_run(
223-
pipeline="failed",
224-
task_start_date=now,
225-
task_end_date=now,
226-
task_exitcode=1,
227-
log="failed",
225+
purldb.check_project_run_statuses(
226+
project=self.project1,
228227
)
229-
with self.assertRaises(purldb.PurlDBException) as context:
230-
purldb.poll_run_status(project=self.project1)
231-
self.assertIn("failed", context.exception)
228+
self.assertEqual(3, mock_request_post.call_count)
229+
mock_request_post_call = mock_request_post.mock_calls[0]
230+
mock_request_post_call_kwargs = mock_request_post_call.kwargs
231+
self.assertEqual(purldb_update_status_url, mock_request_post_call_kwargs["url"])
232+
self.assertEqual(expected_data, mock_request_post_call_kwargs["data"])
232233
self.project1.runs.all().delete()
233234

234235
def test_scanpipe_pipes_purldb_create_project_name(self):

0 commit comments

Comments
 (0)