Skip to content

Added AZURE_DEFAULT_CREDENTIAL_ALLOW_LIST support #39520

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/identity/azure-identity/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Features Added

- Added `subscription` parameter to `AzureCliCredential` to specify the subscription to use when authenticating with the Azure CLI. ([#37994](https://github.com/Azure/azure-sdk-for-python/pull/37994))
- Added support for environment variable `AZURE_DEFAULT_CREDENTIAL_ALLOW_LIST` and argument `default_credential_allow_list` for customizing the behavior of `DefaultAzureCredential`. ([#39520](https://github.com/Azure/azure-sdk-for-python/pull/39520))

### Breaking Changes

Expand Down
109 changes: 72 additions & 37 deletions sdk/identity/azure-identity/azure/identity/_credentials/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# ------------------------------------
import logging
import os
from typing import List, Any, Optional, cast
from typing import List, Any, Optional, cast, Dict

from azure.core.credentials import AccessToken, AccessTokenInfo, TokenRequestOptions, SupportsTokenInfo, TokenCredential
from .._constants import EnvironmentVariables
Expand All @@ -23,6 +23,17 @@
_LOGGER = logging.getLogger(__name__)


VALID_CREDENTIALS = [
"DEVELOPER_CLI",
"WORKLOAD_IDENTITY",
"CLI",
"ENVIRONMENT",
"MANAGED_IDENTITY",
"POWERSHELL",
"SHARED_CACHE",
]


class DefaultAzureCredential(ChainedTokenCredential):
"""A credential capable of handling most Azure SDK authentication scenarios. For more information, See
`Usage guidance for DefaultAzureCredential
Expand All @@ -48,22 +59,11 @@ class DefaultAzureCredential(ChainedTokenCredential):
:keyword str authority: Authority of a Microsoft Entra endpoint, for example 'login.microsoftonline.com',
the authority for Azure Public Cloud (which is the default). :class:`~azure.identity.AzureAuthorityHosts`
defines authorities for other clouds. Managed identities ignore this because they reside in a single cloud.
:keyword bool exclude_workload_identity_credential: Whether to exclude the workload identity from the credential.
Defaults to **False**.
:keyword bool exclude_developer_cli_credential: Whether to exclude the Azure Developer CLI
from the credential. Defaults to **False**.
:keyword bool exclude_cli_credential: Whether to exclude the Azure CLI from the credential. Defaults to **False**.
:keyword bool exclude_environment_credential: Whether to exclude a service principal configured by environment
variables from the credential. Defaults to **False**.
:keyword bool exclude_managed_identity_credential: Whether to exclude managed identity from the credential.
Defaults to **False**.
:keyword bool exclude_powershell_credential: Whether to exclude Azure PowerShell. Defaults to **False**.
:keyword bool exclude_visual_studio_code_credential: Whether to exclude stored credential from VS Code.
Defaults to **True**.
:keyword bool exclude_shared_token_cache_credential: Whether to exclude the shared token cache. Defaults to
**False**.
:keyword bool exclude_interactive_browser_credential: Whether to exclude interactive browser authentication (see
:class:`~azure.identity.InteractiveBrowserCredential`). Defaults to **True**.
:keyword str default_credential_allow_list: A semicolon-separated list of credential names.
The default is to try all available credentials. If this is set, only the credentials in the list are tried.
e.g. "ENVIRONMENT;CLI;MANAGED_IDENTITY" will only try EnvironmentCredential, AzureCliCredential, and
ManagedIdentityCredential. All valid credential names are "DEVELOPER_CLI", "WORKLOAD_IDENTITY", "CLI",
"ENVIRONMENT", "MANAGED_IDENTITY", "POWERSHELL" and "SHARED_CACHE".
:keyword str interactive_browser_tenant_id: Tenant ID to use when authenticating a user through
:class:`~azure.identity.InteractiveBrowserCredential`. Defaults to the value of environment variable
AZURE_TENANT_ID, if any. If unspecified, users will authenticate in their home tenants.
Expand Down Expand Up @@ -96,11 +96,12 @@ class DefaultAzureCredential(ChainedTokenCredential):
:caption: Create a DefaultAzureCredential.
"""

def __init__(self, **kwargs: Any) -> None: # pylint: disable=too-many-statements, too-many-locals
def __init__(self, **kwargs: Any) -> None:
# pylint: disable=too-many-statements, too-many-locals
if "tenant_id" in kwargs:
raise TypeError("'tenant_id' is not supported in DefaultAzureCredential.")

authority = kwargs.pop("authority", None)
authority: Optional[str] = kwargs.pop("authority", None)

vscode_tenant_id = kwargs.pop(
"visual_studio_code_tenant_id", os.environ.get(EnvironmentVariables.AZURE_TENANT_ID)
Expand Down Expand Up @@ -144,45 +145,58 @@ def __init__(self, **kwargs: Any) -> None: # pylint: disable=too-many-statement
exclude_powershell_credential = kwargs.pop("exclude_powershell_credential", False)

credentials: List[SupportsTokenInfo] = []
avail_credentials: Dict[str, Any] = {}
within_dac.set(True)

if not exclude_environment_credential:
credentials.append(EnvironmentCredential(authority=authority, _within_dac=True, **kwargs))
env_cred = EnvironmentCredential(authority=authority, _within_dac=True, **kwargs)
avail_credentials["ENVIRONMENT"] = env_cred
credentials.append(env_cred)
if not exclude_workload_identity_credential:
if all(os.environ.get(var) for var in EnvironmentVariables.WORKLOAD_IDENTITY_VARS):
client_id = workload_identity_client_id
credentials.append(
WorkloadIdentityCredential(
client_id=cast(str, client_id),
tenant_id=workload_identity_tenant_id,
token_file_path=os.environ[EnvironmentVariables.AZURE_FEDERATED_TOKEN_FILE],
**kwargs
)
workload_cred = WorkloadIdentityCredential(
client_id=cast(str, client_id),
tenant_id=workload_identity_tenant_id,
token_file_path=os.environ[EnvironmentVariables.AZURE_FEDERATED_TOKEN_FILE],
**kwargs,
)
avail_credentials["WORKLOAD_IDENTITY"] = workload_cred
credentials.append(workload_cred)
if not exclude_managed_identity_credential:
credentials.append(
ManagedIdentityCredential(
client_id=managed_identity_client_id,
_exclude_workload_identity_credential=exclude_workload_identity_credential,
**kwargs
)
mi_cred = ManagedIdentityCredential(
client_id=managed_identity_client_id,
_exclude_workload_identity_credential=exclude_workload_identity_credential,
**kwargs,
)
avail_credentials["MANAGED_IDENTITY"] = mi_cred
credentials.append(mi_cred)
if not exclude_shared_token_cache_credential and SharedTokenCacheCredential.supported():
try:
# username and/or tenant_id are only required when the cache contains tokens for multiple identities
shared_cache = SharedTokenCacheCredential(
username=shared_cache_username, tenant_id=shared_cache_tenant_id, authority=authority, **kwargs
)
avail_credentials["SHARED_CACHE"] = shared_cache
credentials.append(shared_cache)
except Exception as ex: # pylint:disable=broad-except
_LOGGER.info("Shared token cache is unavailable: '%s'", ex)
if not exclude_visual_studio_code_credential:
credentials.append(VisualStudioCodeCredential(**vscode_args))
vscode_cred = VisualStudioCodeCredential(**vscode_args)
avail_credentials["VISUAL_STUDIO_CODE"] = vscode_cred
credentials.append(vscode_cred)
if not exclude_cli_credential:
credentials.append(AzureCliCredential(process_timeout=process_timeout))
cli_cred = AzureCliCredential(process_timeout=process_timeout)
avail_credentials["CLI"] = cli_cred
credentials.append(cli_cred)
if not exclude_powershell_credential:
credentials.append(AzurePowerShellCredential(process_timeout=process_timeout))
ps_cred = AzurePowerShellCredential(process_timeout=process_timeout)
avail_credentials["POWERSHELL"] = ps_cred
credentials.append(ps_cred)
if not exclude_developer_cli_credential:
credentials.append(AzureDeveloperCliCredential(process_timeout=process_timeout))
dev_cli_cred = AzureDeveloperCliCredential(process_timeout=process_timeout)
avail_credentials["DEVELOPER_CLI"] = dev_cli_cred
credentials.append(dev_cli_cred)
if not exclude_interactive_browser_credential:
if interactive_browser_client_id:
credentials.append(
Expand All @@ -192,6 +206,12 @@ def __init__(self, **kwargs: Any) -> None: # pylint: disable=too-many-statement
)
else:
credentials.append(InteractiveBrowserCredential(tenant_id=interactive_browser_tenant_id, **kwargs))
default_credential_allow_list = kwargs.pop(
"default_credential_allow_list", os.environ.get("AZURE_DEFAULT_CREDENTIAL_ALLOW_LIST")
)
if default_credential_allow_list:
default_credential_allow_list = default_credential_allow_list.upper()
credentials = parse_azure_dac(default_credential_allow_list, avail_credentials)
within_dac.set(False)
super(DefaultAzureCredential, self).__init__(*credentials)

Expand Down Expand Up @@ -256,3 +276,18 @@ def get_token_info(self, *scopes: str, options: Optional[TokenRequestOptions] =
token_info = cast(SupportsTokenInfo, super()).get_token_info(*scopes, options=options)
within_dac.set(False)
return token_info


def parse_azure_dac(az_dac, avail_credentials):
striped_az_dac = az_dac.strip()
if striped_az_dac.endswith(";"):
striped_az_dac = striped_az_dac[:-1]
creds = [cred.strip() for cred in striped_az_dac.split(";")]
credentials = []

for cred in creds:
if cred not in VALID_CREDENTIALS:
raise ValueError(f"Invalid credential '{cred}' in AZURE_DEFAULT_CREDENTIAL_ALLOW_LIST")
credentials.append(avail_credentials.get(cred))

return credentials
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
# ------------------------------------
import logging
import os
from typing import List, Optional, Any, cast
from typing import List, Optional, Any, cast, Dict

from azure.core.credentials import AccessToken, AccessTokenInfo, TokenRequestOptions
from azure.core.credentials_async import AsyncTokenCredential, AsyncSupportsTokenInfo
from ..._constants import EnvironmentVariables
from ..._internal import get_default_authority, normalize_authority, within_dac
from ..._credentials.default import parse_azure_dac
from .azure_cli import AzureCliCredential
from .azd_cli import AzureDeveloperCliCredential
from .azure_powershell import AzurePowerShellCredential
Expand Down Expand Up @@ -48,20 +49,11 @@ class DefaultAzureCredential(ChainedTokenCredential):
:keyword str authority: Authority of a Microsoft Entra endpoint, for example 'login.microsoftonline.com',
the authority for Azure Public Cloud (which is the default). :class:`~azure.identity.AzureAuthorityHosts`
defines authorities for other clouds. Managed identities ignore this because they reside in a single cloud.
:keyword bool exclude_workload_identity_credential: Whether to exclude the workload identity from the credential.
Defaults to **False**.
:keyword bool exclude_developer_cli_credential: Whether to exclude the Azure Developer CLI
from the credential. Defaults to **False**.
:keyword bool exclude_cli_credential: Whether to exclude the Azure CLI from the credential. Defaults to **False**.
:keyword bool exclude_environment_credential: Whether to exclude a service principal configured by environment
variables from the credential. Defaults to **False**.
:keyword bool exclude_powershell_credential: Whether to exclude Azure PowerShell. Defaults to **False**.
:keyword bool exclude_visual_studio_code_credential: Whether to exclude stored credential from VS Code.
Defaults to **True**.
:keyword bool exclude_managed_identity_credential: Whether to exclude managed identity from the credential.
Defaults to **False**.
:keyword bool exclude_shared_token_cache_credential: Whether to exclude the shared token cache. Defaults to
**False**.
:keyword str default_credential_allow_list: A semicolon-separated list of credential names.
The default is to try all available credentials. If this is set, only the credentials in the list are tried.
e.g. "ENVIRONMENT;CLI;MANAGED_IDENTITY" will only try EnvironmentCredential, AzureCliCredential, and
ManagedIdentityCredential. All valid credential names are "DEVELOPER_CLI", "WORKLOAD_IDENTITY", "CLI",
"ENVIRONMENT", "MANAGED_IDENTITY", "POWERSHELL" and "SHARED_CACHE".
:keyword str managed_identity_client_id: The client ID of a user-assigned managed identity. Defaults to the value
of the environment variable AZURE_CLIENT_ID, if any. If not specified, a system-assigned identity will be used.
:keyword str workload_identity_client_id: The client ID of an identity assigned to the pod. Defaults to the value
Expand Down Expand Up @@ -89,11 +81,12 @@ class DefaultAzureCredential(ChainedTokenCredential):
:caption: Create a DefaultAzureCredential.
"""

def __init__(self, **kwargs: Any) -> None: # pylint: disable=too-many-statements
def __init__(self, **kwargs: Any) -> None:
# pylint: disable=too-many-statements, too-many-locals
if "tenant_id" in kwargs:
raise TypeError("'tenant_id' is not supported in DefaultAzureCredential.")

authority = kwargs.pop("authority", None)
authority: Optional[str] = kwargs.pop("authority", None)

vscode_tenant_id = kwargs.pop(
"visual_studio_code_tenant_id", os.environ.get(EnvironmentVariables.AZURE_TENANT_ID)
Expand Down Expand Up @@ -135,45 +128,63 @@ def __init__(self, **kwargs: Any) -> None: # pylint: disable=too-many-statement
exclude_powershell_credential = kwargs.pop("exclude_powershell_credential", False)

credentials: List[AsyncSupportsTokenInfo] = []
avail_credentials: Dict[str, Any] = {}
within_dac.set(True)
if not exclude_environment_credential:
credentials.append(EnvironmentCredential(authority=authority, _within_dac=True, **kwargs))
env_cred = EnvironmentCredential(authority=authority, _within_dac=True, **kwargs)
avail_credentials["ENVIRONMENT"] = env_cred
credentials.append(env_cred)
if not exclude_workload_identity_credential:
if all(os.environ.get(var) for var in EnvironmentVariables.WORKLOAD_IDENTITY_VARS):
client_id = workload_identity_client_id
credentials.append(
WorkloadIdentityCredential(
client_id=cast(str, client_id),
tenant_id=workload_identity_tenant_id,
token_file_path=os.environ[EnvironmentVariables.AZURE_FEDERATED_TOKEN_FILE],
**kwargs
)
)
if not exclude_managed_identity_credential:
credentials.append(
ManagedIdentityCredential(
client_id=managed_identity_client_id,
_exclude_workload_identity_credential=exclude_workload_identity_credential,
workload_cred = WorkloadIdentityCredential(
client_id=cast(str, client_id),
tenant_id=workload_identity_tenant_id,
token_file_path=os.environ[EnvironmentVariables.AZURE_FEDERATED_TOKEN_FILE],
**kwargs
)
avail_credentials["WORKLOAD_IDENTITY"] = workload_cred
credentials.append(workload_cred)
if not exclude_managed_identity_credential:
mi_cred = ManagedIdentityCredential(
client_id=managed_identity_client_id,
_exclude_workload_identity_credential=exclude_workload_identity_credential,
**kwargs
)
avail_credentials["MANAGED_IDENTITY"] = mi_cred
credentials.append(mi_cred)
if not exclude_shared_token_cache_credential and SharedTokenCacheCredential.supported():
try:
# username and/or tenant_id are only required when the cache contains tokens for multiple identities
shared_cache = SharedTokenCacheCredential(
username=shared_cache_username, tenant_id=shared_cache_tenant_id, authority=authority, **kwargs
)
avail_credentials["SHARED_CACHE"] = shared_cache
credentials.append(shared_cache)
except Exception as ex: # pylint:disable=broad-except
_LOGGER.info("Shared token cache is unavailable: '%s'", ex)
if not exclude_visual_studio_code_credential:
credentials.append(VisualStudioCodeCredential(**vscode_args))
vscode_cred = VisualStudioCodeCredential(**vscode_args)
avail_credentials["VISUAL_STUDIO_CODE"] = vscode_cred
credentials.append(vscode_cred)
if not exclude_cli_credential:
credentials.append(AzureCliCredential(process_timeout=process_timeout))
cli_cred = AzureCliCredential(process_timeout=process_timeout)
avail_credentials["CLI"] = cli_cred
credentials.append(cli_cred)
if not exclude_powershell_credential:
credentials.append(AzurePowerShellCredential(process_timeout=process_timeout))
ps_cred = AzurePowerShellCredential(process_timeout=process_timeout)
avail_credentials["POWERSHELL"] = ps_cred
credentials.append(ps_cred)
if not exclude_developer_cli_credential:
credentials.append(AzureDeveloperCliCredential(process_timeout=process_timeout))
dev_cli_cred = AzureDeveloperCliCredential(process_timeout=process_timeout)
avail_credentials["DEVELOPER_CLI"] = dev_cli_cred
credentials.append(dev_cli_cred)
default_credential_allow_list = kwargs.pop(
"default_credential_allow_list", os.environ.get("AZURE_DEFAULT_CREDENTIAL_ALLOW_LIST")
)
if default_credential_allow_list:
default_credential_allow_list = default_credential_allow_list.upper()
credentials = parse_azure_dac(default_credential_allow_list, avail_credentials)
within_dac.set(False)
super().__init__(*credentials)

Expand Down
Loading