diff --git a/scanpipe/migrations/0074_codebaseresource_parent_path_and_more.py b/scanpipe/migrations/0074_codebaseresource_parent_path_and_more.py
new file mode 100644
index 000000000..efd41fe53
--- /dev/null
+++ b/scanpipe/migrations/0074_codebaseresource_parent_path_and_more.py
@@ -0,0 +1,22 @@
+# Generated by Django 5.1.9 on 2025-06-16 17:42
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('scanpipe', '0073_add_sha1_git_checksum'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='codebaseresource',
+            name='parent_path',
+            field=models.CharField(blank=True, help_text='The path of the resource\'s parent directory. Set to an empty string for top-level (root) resources. Used to efficiently retrieve a directory\'s contents.', max_length=2000),
+        ),
+        migrations.AddIndex(
+            model_name='codebaseresource',
+            index=models.Index(fields=['project', 'parent_path'], name='scanpipe_co_project_008448_idx'),
+        ),
+    ]
diff --git a/scanpipe/models.py b/scanpipe/models.py
index 7bc6d1207..944446d5a 100644
--- a/scanpipe/models.py
+++ b/scanpipe/models.py
@@ -2695,6 +2695,17 @@ class CodebaseResource(
             'Eg.: "/usr/bin/bash" for a path of "tarball-extract/rootfs/usr/bin/bash"'
         ),
     )
+
+    parent_path = models.CharField(
+        max_length=2000,
+        blank=True,
+        help_text=_(
+            "The path of the resource's parent directory. "
+            "Set to an empty string for top-level (root) resources. "
+            "Used to efficiently retrieve a directory's contents."
+        ),
+    )
+
     status = models.CharField(
         blank=True,
         max_length=50,
@@ -2788,6 +2799,7 @@ class Meta:
             models.Index(fields=["compliance_alert"]),
             models.Index(fields=["is_binary"]),
             models.Index(fields=["is_text"]),
+            models.Index(fields=["project", "parent_path"]),
         ]
         constraints = [
             models.UniqueConstraint(
@@ -2800,6 +2812,12 @@ class Meta:
     def __str__(self):
         return self.path
 
+    def save(self, *args, **kwargs):
+        """Populate `parent_path` from `path` when missing before saving."""
+        if self.path and not self.parent_path:
+            self.parent_path = self.parent_directory() or ""
+        super().save(*args, **kwargs)
+
     def get_absolute_url(self):
         return reverse("resource_detail", args=[self.project.slug, self.path])
 
@@ -2870,7 +2888,8 @@ def get_path_segments_with_subpath(self):
 
     def parent_directory(self):
         """Return the parent path for this CodebaseResource or None."""
-        return parent_directory(self.path, with_trail=False)
+        parent_path = parent_directory(str(self.path), with_trail=False)
+        return parent_path or None
 
     def has_parent(self):
         """
diff --git a/scanpipe/pipes/__init__.py b/scanpipe/pipes/__init__.py
index 18a5d72c7..1352c6d59 100644
--- a/scanpipe/pipes/__init__.py
+++ b/scanpipe/pipes/__init__.py
@@ -71,6 +71,11 @@ def make_codebase_resource(project, location, save=True, **extra_fields):
     from scanpipe.pipes import flag
 
     relative_path = Path(location).relative_to(project.codebase_path)
+    parent_path = str(relative_path.parent)
+
+    if parent_path == ".":
+        parent_path = ""
+
     try:
         resource_data = scancode.get_resource_info(location=str(location))
     except OSError as error:
@@ -91,6 +96,7 @@ def make_codebase_resource(project, location, save=True, **extra_fields):
     codebase_resource = CodebaseResource(
         project=project,
         path=relative_path,
+        parent_path=parent_path,
         **resource_data,
     )
 
diff --git a/scanpipe/pipes/rootfs.py b/scanpipe/pipes/rootfs.py
index 9c623491a..95325d38d 100644
--- a/scanpipe/pipes/rootfs.py
+++ b/scanpipe/pipes/rootfs.py
@@ -139,6 +139,14 @@ def get_res(parent, fname):
             rootfs_path=rootfs_path,
         )
 
+    # Explicitly yields the root directory as a resource when `with_dir` is True
+    if with_dir:
+        rootfs_path = "/"
+        yield Resource(
+            location=location,
+            rootfs_path=rootfs_path,
+        )
+
     for top, dirs, files in os.walk(location):
         for f in files:
             yield get_res(parent=top, fname=f)
diff --git a/scanpipe/tests/data/rootfs/basic-rootfs_root_filesystems.json b/scanpipe/tests/data/rootfs/basic-rootfs_root_filesystems.json
index 970d67200..b64c4115f 100644
--- a/scanpipe/tests/data/rootfs/basic-rootfs_root_filesystems.json
+++ b/scanpipe/tests/data/rootfs/basic-rootfs_root_filesystems.json
@@ -340,6 +340,42 @@
   ],
   "dependencies": [],
   "files": [
+    {
+      "path": "basic-rootfs.tar.gz-extract",
+      "type": "directory",
+      "name": "basic-rootfs.tar.gz-extract",
+      "status": "scanned",
+      "for_packages": [],
+      "tag": "",
+      "extension": ".tar.gz-extract",
+      "programming_language": "",
+      "detected_license_expression": "",
+      "detected_license_expression_spdx": "",
+      "license_detections": [],
+      "license_clues": [],
+      "percentage_of_license_text": null,
+      "copyrights": [],
+      "holders": [],
+      "authors": [],
+      "package_data": [],
+      "emails": [],
+      "urls": [],
+      "md5": "",
+      "sha1": "",
+      "sha256": "",
+      "sha512": "",
+      "sha1_git": "",
+      "is_binary": false,
+      "is_text": false,
+      "is_archive": false,
+      "is_media": false,
+      "is_legal": false,
+      "is_manifest": false,
+      "is_readme": false,
+      "is_top_level": true,
+      "is_key_file": false,
+      "extra_data": {}
+    },
     {
       "path": "basic-rootfs.tar.gz-extract/etc",
       "type": "directory",
diff --git a/scanpipe/tests/test_models.py b/scanpipe/tests/test_models.py
index 67601d601..1912f0ba1 100644
--- a/scanpipe/tests/test_models.py
+++ b/scanpipe/tests/test_models.py
@@ -1645,6 +1645,16 @@ def test_scanpipe_can_compute_compliance_alert_for_license_exceptions(self):
         resource.update(detected_license_expression=license_expression)
         self.assertEqual("warning", resource.compute_compliance_alert())
 
+    def test_scanpipe_codebase_root_parent_path(self):
+        resource1 = self.project1.codebaseresources.create(path="file")
+
+        self.assertEqual("", resource1.parent_path)
+
+    def test_scanpipe_codebase_regular_parent_path(self):
+        resource2 = self.project1.codebaseresources.create(path="dir1/dir2/file")
+
+        self.assertEqual("dir1/dir2", resource2.parent_path)
+
     def test_scanpipe_scan_fields_model_mixin_methods(self):
         expected = [
             "detected_license_expression",
diff --git a/scanpipe/tests/test_pipelines.py b/scanpipe/tests/test_pipelines.py
index 0852bc841..40b567ba0 100644
--- a/scanpipe/tests/test_pipelines.py
+++ b/scanpipe/tests/test_pipelines.py
@@ -863,6 +863,60 @@ def test_scanpipe_scan_codebase_pipeline_integration(self):
         expected_file = self.data / "scancode" / "is-npm-1.0.0_scan_codebase.json"
         self.assertPipelineResultEqual(expected_file, result_file)
 
+    def test_scanpipe_scan_codebase_creates_top_level_paths(self):
+        pipeline_name = "scan_codebase"
+        project1 = make_project()
+
+        filename = "is-npm-1.0.0.tgz"
+        input_location = self.data / "scancode" / filename
+        project1.copy_input_from(input_location)
+
+        run = project1.add_pipeline(pipeline_name)
+        pipeline = run.make_pipeline_instance()
+
+        exitcode, out = pipeline.execute()
+        self.assertEqual(0, exitcode, msg=out)
+
+        expected_top_level_paths = ["is-npm-1.0.0.tgz", "is-npm-1.0.0.tgz-extract"]
+
+        top_level_resources = project1.codebaseresources.filter(parent_path="")
+        top_level_paths = [resource.path for resource in top_level_resources]
+
+        self.assertListEqual(top_level_paths, expected_top_level_paths)
+
+    def test_scanpipe_scan_codebase_creates_parent_path_field(self):
+        pipeline_name = "scan_codebase"
+        project1 = make_project()
+
+        filename = "is-npm-1.0.0.tgz"
+        input_location = self.data / "scancode" / filename
+        project1.copy_input_from(input_location)
+
+        run = project1.add_pipeline(pipeline_name)
+        pipeline = run.make_pipeline_instance()
+
+        exitcode, out = pipeline.execute()
+        self.assertEqual(0, exitcode, msg=out)
+
+        expected_top_level_paths = ["is-npm-1.0.0.tgz", "is-npm-1.0.0.tgz-extract"]
+        expected_nested_paths = [
+            "is-npm-1.0.0.tgz-extract/package/index.js",
+            "is-npm-1.0.0.tgz-extract/package/package.json",
+            "is-npm-1.0.0.tgz-extract/package/readme.md",
+        ]
+
+        top_level_resources = project1.codebaseresources.filter(parent_path="")
+        top_level_paths = [resource.path for resource in top_level_resources]
+
+        self.assertListEqual(top_level_paths, expected_top_level_paths)
+
+        nested_resources = project1.codebaseresources.filter(
+            parent_path="is-npm-1.0.0.tgz-extract/package"
+        )
+        nested_paths = [resource.path for resource in nested_resources]
+
+        self.assertListEqual(nested_paths, expected_nested_paths)
+
     def test_scanpipe_inspect_packages_creates_packages_npm(self):
         pipeline_name = "inspect_packages"
         project1 = make_project()
@@ -1209,7 +1263,7 @@ def test_scanpipe_rootfs_pipeline_integration(self):
         exitcode, out = pipeline.execute()
         self.assertEqual(0, exitcode, msg=out)
 
-        self.assertEqual(16, project1.codebaseresources.count())
+        self.assertEqual(17, project1.codebaseresources.count())
         self.assertEqual(2, project1.discoveredpackages.count())
         self.assertEqual(0, project1.discovereddependencies.count())
 