Skip to content

fix: check for valid settings before creating EventStreams #1001

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions src/aap_eda/core/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,16 +286,29 @@ def valid_hash_format(fmt: str):
return fmt


def valid_webhook_auth_type(auth_type: str):
"""Check webhook auth type."""
if auth_type not in enums.WebhookAuthType.values():
def _validate_webhook_settings(auth_type: str):
"""Check webhook settings."""
if (
auth_type == enums.WebhookCredentialType.MTLS
and not settings.WEBHOOK_MTLS_BASE_URL
):
raise serializers.ValidationError(
(
f"EventStream of type {auth_type} cannot be used "
"because WEBHOOK_MTLS_BASE_URL is missing in settings."
)
)

if (
auth_type != enums.WebhookCredentialType.MTLS
and not settings.WEBHOOK_BASE_URL
):
raise serializers.ValidationError(
(
f"Invalid auth_type {auth_type} should "
f"be one of {enums.WebhookAuthType.values()}"
f"EventStream of type {auth_type} cannot be used "
"because WEBHOOK_BASE_URL is missing in settings."
)
)
return auth_type


def check_if_webhooks_exists(webhook_ids: list[int]) -> list[int]:
Expand Down Expand Up @@ -323,4 +336,5 @@ def check_credential_types_for_webhook(eda_credential_id: int) -> int:
f"The type of credential can only be one of {names}"
)

_validate_webhook_settings(name)
return eda_credential_id
23 changes: 9 additions & 14 deletions src/aap_eda/settings/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -741,20 +741,15 @@ def get_rulebook_process_log_level() -> RulebookProcessLogLevel:
PG_NOTIFY_DSN_SERVER = settings.get(
"PG_NOTIFY_DSN_SERVER", _DEFAULT_PG_NOTIFY_DSN_SERVER
)
SERVER_UUID = settings.get("SERVER_UUID", "abc-def-123-34567")
WEBHOOK_BASE_URL = (
settings.get(
"WEBHOOK_BASE_URL", f"https://ui.eda.local:8443/{SERVER_UUID}"
).strip("/")
+ "/"
)
WEBHOOK_MTLS_BASE_URL = (
settings.get(
"WEBHOOK_MTLS_BASE_URL",
f"https://ui.eda.local:8443/mtls/{SERVER_UUID}",
).strip("/")
+ "/"
)

WEBHOOK_BASE_URL = settings.get("WEBHOOK_BASE_URL", None)
if WEBHOOK_BASE_URL:
WEBHOOK_BASE_URL = WEBHOOK_BASE_URL.strip("/") + "/"

WEBHOOK_MTLS_BASE_URL = settings.get("WEBHOOK_MTLS_BASE_URL", None)
if WEBHOOK_MTLS_BASE_URL:
WEBHOOK_MTLS_BASE_URL = WEBHOOK_MTLS_BASE_URL.strip("/") + "/"

MAX_PG_NOTIFY_MESSAGE_SIZE = int(
settings.get("MAX_PG_NOTIFY_MESSAGE_SIZE", 6144)
)
82 changes: 79 additions & 3 deletions tests/integration/api/test_webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import pytest
import requests_mock
import yaml
from django.test import override_settings
from ecdsa import SigningKey
from ecdsa.util import sigencode_der
from rest_framework import status
Expand Down Expand Up @@ -908,6 +909,77 @@ def test_post_webhook_with_mtls(
assert response.status_code == auth_status


@pytest.mark.django_db
def test_post_webhook_with_mtls_missing_settings(
admin_client: APIClient,
preseed_credential_types,
):
header_key = "Subject"
inputs = {
"auth_type": "mtls",
"subject": "Subject",
"http_header_key": header_key,
}

obj = _create_webhook_credential(
admin_client, enums.WebhookCredentialType.MTLS.value, inputs
)

data_in = {
"name": "test-webhook-1",
"eda_credential_id": obj["id"],
"test_mode": True,
}
with override_settings(WEBHOOK_MTLS_BASE_URL=None):
response = admin_client.post(f"{api_url_v1}/webhooks/", data=data_in)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert response.json() == {
"eda_credential_id": [
(
"EventStream of type mTLS Webhook cannot be "
"used because WEBHOOK_MTLS_BASE_URL is "
"missing in settings."
)
]
}


@pytest.mark.django_db
def test_post_webhook_with_basic_auth_missing_settings(
admin_client: APIClient,
preseed_credential_types,
):
secret = secrets.token_hex(32)
username = "fred"
inputs = {
"auth_type": "basic",
"username": username,
"password": secret,
"http_header_key": "Authorization",
}

obj = _create_webhook_credential(
admin_client, enums.WebhookCredentialType.BASIC.value, inputs
)

data_in = {
"name": "test-webhook-1",
"eda_credential_id": obj["id"],
"test_mode": True,
}
with override_settings(WEBHOOK_BASE_URL=None):
response = admin_client.post(f"{api_url_v1}/webhooks/", data=data_in)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert response.json() == {
"eda_credential_id": [
(
"EventStream of type Basic Webhook cannot be used because "
"WEBHOOK_BASE_URL is missing in settings."
)
]
}


def _create_webhook_credential(
client: APIClient,
credential_type_name: str,
Expand All @@ -928,6 +1000,10 @@ def _create_webhook_credential(


def _create_webhook(client: APIClient, data_in: dict) -> models.Webhook:
response = client.post(f"{api_url_v1}/webhooks/", data=data_in)
assert response.status_code == status.HTTP_201_CREATED
return models.Webhook.objects.get(id=response.data["id"])
with override_settings(
WEBHOOK_BASE_URL="https://www.example.com/",
WEBHOOK_MTLS_BASE_URL="https://www.example.com/",
):
response = client.post(f"{api_url_v1}/webhooks/", data=data_in)
assert response.status_code == status.HTTP_201_CREATED
return models.Webhook.objects.get(id=response.data["id"])
16 changes: 9 additions & 7 deletions tests/integration/dab_rbac/test_crud_permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from ansible_base.rbac import permission_registry
from ansible_base.rbac.models import DABPermission, RoleDefinition
from django.contrib.contenttypes.models import ContentType
from django.test import override_settings
from django.urls.exceptions import NoReverseMatch
from rest_framework.reverse import reverse

Expand Down Expand Up @@ -56,11 +57,11 @@ def test_add_permissions(
pytest.skip("Model has no add permission")

url = reverse(f"{get_basename(model)}-list")

response = user_client.post(url, data=post_data)
prior_ct = model.objects.count()
assert response.status_code == 403, response.data
assert model.objects.count() == prior_ct # assure nothing was created
with override_settings(WEBHOOK_BASE_URL="https://www.example.com/"):
response = user_client.post(url, data=post_data)
prior_ct = model.objects.count()
assert response.status_code == 403, response.data
assert model.objects.count() == prior_ct # assure nothing was created

# Figure out the parent object if we can
parent_field_name = permission_registry.get_parent_fd_name(model)
Expand Down Expand Up @@ -109,8 +110,9 @@ def test_add_permissions(
related_perm = "view"
give_obj_perm(default_user, related_obj, related_perm)

response = user_client.post(url, data=post_data, format="json")
assert response.status_code == 201, response.data
with override_settings(WEBHOOK_BASE_URL="https://www.example.com/"):
response = user_client.post(url, data=post_data, format="json")
assert response.status_code == 201, response.data

if model.objects.count() == 1:
obj = model.objects.first()
Expand Down
7 changes: 5 additions & 2 deletions tests/integration/dab_rbac/test_organization.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import pytest
from django.apps import apps
from django.core.exceptions import FieldDoesNotExist
from django.test import override_settings
from django.urls.exceptions import NoReverseMatch
from rest_framework.reverse import reverse

Expand Down Expand Up @@ -51,7 +52,8 @@ def test_create_with_default_org(cls_factory, model, admin_client, request):
except NoReverseMatch:
pytest.skip("Not testing model for now")

response = admin_client.post(url, data=post_data, format="json")
with override_settings(WEBHOOK_BASE_URL="https://www.example.com/"):
response = admin_client.post(url, data=post_data, format="json")

if response.status_code == 405:
pytest.skip("Not testing model not allowing creation for now")
Expand Down Expand Up @@ -87,7 +89,8 @@ def test_create_with_custom_org(
except NoReverseMatch:
pytest.skip("Not testing model with no list view for now")

response = superuser_client.post(url, data=post_data, format="json")
with override_settings(WEBHOOK_BASE_URL="https://www.example.com/"):
response = superuser_client.post(url, data=post_data, format="json")

if response.status_code == 405:
pytest.skip("Not testing model not allowing creation for now")
Expand Down
Loading