Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES/5087.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added new `q_select` field to UpstreamPulp to allow for more advanced filtering on upstream distributions.
`pulp_label_select` has been removed and its values have been migrated to this new field.
Please upgrade every API worker before issuing a new replicate task to avoid unwanted behavior.
44 changes: 44 additions & 0 deletions pulpcore/app/migrations/0123_upstreampulp_q_select.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Generated by Django 4.2.14 on 2024-08-21 19:22

from django.db import migrations, models


def move_label_select_to_q(apps, schema_editor):
    """
    Copy each non-empty ``pulp_label_select`` value into the new ``q_select`` field.

    The old ``pulp_label_select`` column is deliberately left in place so that zero-downtime
    upgrades (ZDU) keep working: tasks dispatched by old code can still read it, while new
    tasks read ``q_select``. New UpstreamPulps only ever set ``q_select`` (the serializer no
    longer exposes ``pulp_label_select``), so a migration in the next breaking-change release
    can drop the old column.
    """
    UpstreamPulp = apps.get_model("core", "UpstreamPulp")
    candidates = UpstreamPulp.objects.filter(pulp_label_select__isnull=False)

    pending = []
    for server in candidates.iterator():
        label_select = server.pulp_label_select
        if not label_select:
            # Empty strings carry no filter, so there is nothing to migrate.
            continue
        # Wrap the old value in quotes (via repr) to form a `q` filter expression.
        server.q_select = f"pulp_label_select={label_select!r}"
        pending.append(server)
        # Write in chunks of 500 rows rather than one UPDATE per row.
        if len(pending) >= 500:
            UpstreamPulp.objects.bulk_update(pending, ["q_select"])
            pending = []

    # Flush whatever is left over from the final, partially filled chunk.
    if pending:
        UpstreamPulp.objects.bulk_update(pending, ["q_select"])


class Migration(migrations.Migration):
    """Add ``UpstreamPulp.q_select`` and populate it from existing ``pulp_label_select`` values."""

    dependencies = [("core", "0122_record_last_replication_timestamp")]

    operations = [
        # Schema change first, so the data step below has a column to write into.
        migrations.AddField(
            model_name="upstreampulp",
            name="q_select",
            field=models.TextField(null=True),
        ),
        # Data step: reversing is a no-op, and the step may be elided when squashing.
        migrations.RunPython(
            move_label_select_to_q,
            reverse_code=migrations.RunPython.noop,
            elidable=True,
        ),
    ]
2 changes: 2 additions & 0 deletions pulpcore/app/models/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ class UpstreamPulp(BaseModel, AutoAddObjPermsMixin):
username = EncryptedTextField(null=True)
password = EncryptedTextField(null=True)

# TODO: Remove this field in next breaking change release
pulp_label_select = models.TextField(null=True)
q_select = models.TextField(null=True)

last_replication = models.DateTimeField(null=True)

Expand Down
6 changes: 3 additions & 3 deletions pulpcore/app/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ def needs_update(fields_dict, model_instance):
needs_update = True
return needs_update

def upstream_distributions(self, labels=None):
if labels:
params = {"pulp_label_select": labels}
def upstream_distributions(self, q=None):
if q:
params = {"q": q}
else:
params = {}
offset = 0
Expand Down
17 changes: 11 additions & 6 deletions pulpcore/app/serializers/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
IdentityField,
ModelSerializer,
)


from pulpcore.app.models import UpstreamPulp


Expand Down Expand Up @@ -75,10 +73,10 @@ class UpstreamPulpSerializer(ModelSerializer, HiddenFieldsMixin):
help_text="Timestamp of the most recent update of the remote.", read_only=True
)

pulp_label_select = serializers.CharField(
q_select = serializers.CharField(
help_text=_(
"One or more comma separated labels that will be used to filter distributions on the "
'upstream Pulp. E.g. "foo=bar,key=val" or "foo,key"'
"Filter distributions on the upstream Pulp using complex filtering. E.g. "
'pulp_label_select="foo" OR pulp_label_select="key=val"',
),
allow_null=True,
allow_blank=True,
Expand All @@ -93,6 +91,13 @@ class UpstreamPulpSerializer(ModelSerializer, HiddenFieldsMixin):
read_only=True,
)

def validate_q_select(self, value):
"""Ensure we have a valid q_select expression."""
from pulpcore.app.viewsets import DistributionFilter

DistributionFilter().filters["q"].field.clean(value)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds much better targeted.
👍

return value

class Meta:
abstract = True
model = UpstreamPulp
Expand All @@ -109,6 +114,6 @@ class Meta:
"password",
"pulp_last_updated",
"hidden_fields",
"pulp_label_select",
"q_select",
"last_replication",
)
2 changes: 1 addition & 1 deletion pulpcore/app/tasks/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def replicate_distributions(server_pk):
supported_replicators.append(replicator)

for replicator in supported_replicators:
distros = replicator.upstream_distributions(labels=server.pulp_label_select)
distros = replicator.upstream_distributions(q=server.q_select)
distro_names = []
for distro in distros:
# Create remote
Expand Down
146 changes: 145 additions & 1 deletion pulpcore/tests/functional/api/test_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,6 @@ def test_replicate_rbac(
"domain": "default",
"username": bindings_cfg.username,
"password": bindings_cfg.password,
"pulp_label_select": str(uuid.uuid4()),
}
upstream_pulp = gen_object_with_cleanup(
pulpcore_bindings.UpstreamPulpsApi,
Expand Down Expand Up @@ -465,3 +464,148 @@ def test_replicate_rbac(
try_action(
dean, pulpcore_bindings.UpstreamPulpsApi, "partial_update", 403, upstream_pulp.pulp_href, {}
)


@pytest.fixture
def populate_upstream(
    domain_factory,
    file_bindings,
    file_repository_factory,
    file_remote_factory,
    file_distribution_factory,
    write_3_iso_file_fixture_data_factory,
    monitor_task,
):
    """Provide a factory that fills a new domain with synced, labelled file distributions."""

    def _populate_upstream(number):
        """
        Create ``number`` repository/remote/distribution triples in a new domain.

        Distribution ``i`` is named ``str(i)`` and labelled ``upstream=<i>`` plus an empty
        ``even`` or ``odd`` label. All sync tasks are started first and awaited afterwards.
        Returns the newly created domain.
        """
        upstream_domain = domain_factory()
        domain_name = upstream_domain.name
        sync_tasks = []
        for index in range(number):
            tag = str(index)
            parity = "odd" if index % 2 else "even"
            repository = file_repository_factory(pulp_domain=domain_name, autopublish=True)
            manifest = write_3_iso_file_fixture_data_factory(tag)
            remote = file_remote_factory(pulp_domain=domain_name, manifest_path=manifest)
            sync_response = file_bindings.RepositoriesFileApi.sync(
                repository.pulp_href, {"remote": remote.pulp_href}
            )
            sync_tasks.append(sync_response.task)
            file_distribution_factory(
                name=tag,
                pulp_domain=domain_name,
                repository=repository.pulp_href,
                pulp_labels={"upstream": tag, parity: ""},
            )
        # Wait only after every sync has been dispatched.
        for task in sync_tasks:
            monitor_task(task)
        return upstream_domain

    return _populate_upstream


@pytest.mark.parallel
def test_replicate_with_basic_q_select(
    domain_factory,
    populate_upstream,
    bindings_cfg,
    pulpcore_bindings,
    monitor_task_group,
    pulp_settings,
    gen_object_with_cleanup,
):
    """Replicate using simple ``pulp_label_select`` expressions passed through ``q_select``."""
    source_domain = populate_upstream(10)
    dest_domain = domain_factory()
    upstreams_api = pulpcore_bindings.UpstreamPulpsApi
    upstream = gen_object_with_cleanup(
        upstreams_api,
        {
            "name": str(uuid.uuid4()),
            "base_url": bindings_cfg.host,
            "api_root": pulp_settings.API_ROOT,
            "domain": source_domain.name,
            "username": bindings_cfg.username,
            "password": bindings_cfg.password,
        },
        pulp_domain=dest_domain.name,
    )

    def replicate_and_list():
        """Run a replicate task to completion and list the destination distributions."""
        response = upstreams_api.replicate(upstream.pulp_href)
        monitor_task_group(response.task_group)
        return pulpcore_bindings.DistributionsApi.list(pulp_domain=dest_domain.name)

    def replicate_with(expression):
        """Set ``q_select`` on the upstream, then replicate and list the result."""
        upstreams_api.partial_update(upstream.pulp_href, {"q_select": expression})
        return replicate_and_list()

    # Without any q_select, all 10 distributions are replicated.
    listing = replicate_and_list()
    assert listing.count == 10

    # Restrict replication to the distributions carrying the 'even' label.
    listing = replicate_with("pulp_label_select='even'")
    assert listing.count == 5
    assert {d.name for d in listing.results} == {"0", "2", "4", "6", "8"}

    # Restrict replication to a single distribution via its 'upstream' label value.
    listing = replicate_with("pulp_label_select='upstream=4'")
    assert listing.count == 1
    assert listing.results[0].name == "4"

    # Comma-separated labels inside one pulp_label_select are ANDed together.
    listing = replicate_with("pulp_label_select='even,upstream=0'")
    assert listing.count == 1
    assert listing.results[0].name == "0"


@pytest.mark.parallel
def test_replicate_with_complex_q_select(
domain_factory,
populate_upstream,
bindings_cfg,
pulpcore_bindings,
monitor_task_group,
pulp_settings,
gen_object_with_cleanup,
):
"""Test complex q_select replication."""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a test where we need to quote the value inside of the quoted pulp_label_select filter string?

Copy link
Contributor Author

@gerrod3 gerrod3 Sep 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have an example of what one might look like? I'm just wondering what users would be quoting in their labels.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing I learned is that users are creative...
Consider:
pulp file repository label set --repository test1 --key "asdf" --value "'asdf/+*&\""

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah maybe we shouldn't allow quotes in labels. The ambiguity of it is very annoying and what value does a user gain by being able to use them?

I've been struggling with trying to get your example to work with the q=pulp_label_select filter and I'm not sure it is possible with how query expressions work. If you can get it to work then I'll add a test for it, else I'll just leave this as an edge case that hopefully users will be smart enough to avoid.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So you are saying we cannot even select for these values today?
Can you add some sanitization for the label values to make sure we will never get there?
Maybe it's a separate PR, and I'm happy to have a migration fail in case a user has done that already.

source_domain = populate_upstream(10)
dest_domain = domain_factory()
upstream_body = {
"name": str(uuid.uuid4()),
"base_url": bindings_cfg.host,
"api_root": pulp_settings.API_ROOT,
"domain": source_domain.name,
"username": bindings_cfg.username,
"password": bindings_cfg.password,
"q_select": "pulp_label_select='upstream=1' OR pulp_label_select='upstream=2'",
}
upstream = gen_object_with_cleanup(
pulpcore_bindings.UpstreamPulpsApi, upstream_body, pulp_domain=dest_domain.name
)
# Run the replicate task and assert that two repos got synced
response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream.pulp_href)
monitor_task_group(response.task_group)
result = pulpcore_bindings.DistributionsApi.list(pulp_domain=dest_domain.name)
assert result.count == 2
assert {d.name for d in result.results} == {"1", "2"}

# Test odds but not seven
body = {"q_select": "pulp_label_select='odd' AND NOT pulp_label_select='upstream=7'"}
pulpcore_bindings.UpstreamPulpsApi.partial_update(upstream.pulp_href, body)
response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream.pulp_href)
monitor_task_group(response.task_group)
result = pulpcore_bindings.DistributionsApi.list(pulp_domain=dest_domain.name)
assert result.count == 4
assert {d.name for d in result.results} == {"1", "3", "5", "9"}

# Test we error when trying to provide an invalid q expression
body["q_select"] = "invalid='testing'"
with pytest.raises(pulpcore_bindings.ApiException) as e:
pulpcore_bindings.UpstreamPulpsApi.partial_update(upstream.pulp_href, body)
assert e.value.status == 400
assert e.value.body == '{"q_select":["Filter \'invalid\' does not exist."]}'

body["q_select"] = "name='hi' AND"
with pytest.raises(pulpcore_bindings.ApiException) as e:
pulpcore_bindings.UpstreamPulpsApi.partial_update(upstream.pulp_href, body)
assert e.value.status == 400
assert e.value.body == '{"q_select":["Syntax error in expression."]}'