
Session Container not created for other consistency levels #41920

Open
@tvaron3

Description

  • Package Name: azure-cosmos
  • Package Version:
  • Operating System:
  • Python Version:

Describe the bug
Here is a stack trace for the `NoneType` error caused by the session container being empty: the session container is only created when the account's consistency level is Session, so for any other consistency level `client.session` is `None` when the retry path tries to clear the session token.
```
client = <azure.cosmos._cosmos_client_connection.CosmosClientConnection object at 0x118c9b240>
global_endpoint_manager = <azure.cosmos._global_partition_endpoint_manager_circuit_breaker._GlobalPartitionEndpointManagerForCircuitBreaker object at 0x118c9b350>
function = <function _Request at 0x118914c20>
args = (<azure.cosmos._request_object.RequestObject object at 0x118e7c050>, <azure.cosmos.documents.ConnectionPolicy object a...er%20f2e1ec53-e0d8-4f73-8db9-533b404df9a2/docs/test_item_no_preferred_locations2cf3b662-fe17-4b7c-a3e4-42680a9823c4/'>)
kwargs = {}, pk_range_wrapper = None
endpointDiscovery_retry_policy = <azure.cosmos._endpoint_discovery_retry_policy.EndpointDiscoveryRetryPolicy object at 0x118e863d0>
database_account_retry_policy = <azure.cosmos._database_account_retry_policy.DatabaseAccountRetryPolicy object at 0x118e717f0>
resourceThrottle_retry_policy = <azure.cosmos._resource_throttle_retry_policy.ResourceThrottleRetryPolicy object at 0x118e71400>
defaultRetry_policy = <azure.cosmos._default_retry_policy.DefaultRetryPolicy object at 0x118e718d0>
sessionRetry_policy = <azure.cosmos._session_retry_policy._SessionRetryPolicy object at 0x118e16fd0>
partition_key_range_gone_retry_policy = <azure.cosmos._gone_retry_policy.PartitionKeyRangeGoneRetryPolicy object at 0x118e71940>
timeout_failover_retry_policy = <azure.cosmos._timeout_failover_retry_policy._TimeoutFailoverRetryPolicy object at 0x118e86350>
service_response_retry_policy = <azure.cosmos._service_response_retry_policy.ServiceResponseRetryPolicy object at 0x118e17070>
service_request_retry_policy = <azure.cosmos._service_request_retry_policy.ServiceRequestRetryPolicy object at 0x118e17390>

def Execute(client, global_endpoint_manager, function, *args, **kwargs): # pylint: disable=too-many-locals
    """Executes the function with passed parameters applying all retry policies

    :param object client:
        Document client instance
    :param object global_endpoint_manager:
        Instance of _GlobalEndpointManager class
    :param function function:
        Function to be called wrapped with retries
    :param list args:
    :returns: the result of running the passed in function as a (result, headers) tuple
    :rtype: tuple of (dict, dict)
    """
    pk_range_wrapper = None
    if args and global_endpoint_manager.is_circuit_breaker_applicable(args[0]):
        pk_range_wrapper = global_endpoint_manager.create_pk_range_wrapper(args[0])
    # instantiate all retry policies here to be applied for each request execution
    endpointDiscovery_retry_policy = _endpoint_discovery_retry_policy.EndpointDiscoveryRetryPolicy(
        client.connection_policy, global_endpoint_manager, *args
    )
    database_account_retry_policy = _database_account_retry_policy.DatabaseAccountRetryPolicy(
        client.connection_policy
    )
    resourceThrottle_retry_policy = _resource_throttle_retry_policy.ResourceThrottleRetryPolicy(
        client.connection_policy.RetryOptions.MaxRetryAttemptCount,
        client.connection_policy.RetryOptions.FixedRetryIntervalInMilliseconds,
        client.connection_policy.RetryOptions.MaxWaitTimeInSeconds,
    )
    defaultRetry_policy = _default_retry_policy.DefaultRetryPolicy(*args)

    sessionRetry_policy = _session_retry_policy._SessionRetryPolicy(
        client.connection_policy.EnableEndpointDiscovery, global_endpoint_manager, pk_range_wrapper, *args
    )

    partition_key_range_gone_retry_policy = _gone_retry_policy.PartitionKeyRangeGoneRetryPolicy(client, *args)

    timeout_failover_retry_policy = _timeout_failover_retry_policy._TimeoutFailoverRetryPolicy(
        client.connection_policy, global_endpoint_manager, pk_range_wrapper, *args
    )
    service_response_retry_policy = _service_response_retry_policy.ServiceResponseRetryPolicy(
        client.connection_policy, global_endpoint_manager, pk_range_wrapper, *args,
    )
    service_request_retry_policy = _service_request_retry_policy.ServiceRequestRetryPolicy(
        client.connection_policy, global_endpoint_manager, pk_range_wrapper, *args,
    )
    # HttpRequest we would need to modify for Container Recreate Retry Policy
    request = None
    if args and len(args) > 3:
        # Reference HttpRequest instance in args
        request = args[3]
        container_recreate_retry_policy = _container_recreate_retry_policy.ContainerRecreateRetryPolicy(
            client, client._container_properties_cache, request, *args)
    else:
        container_recreate_retry_policy = _container_recreate_retry_policy.ContainerRecreateRetryPolicy(
            client, client._container_properties_cache, None, *args)

    while True:
        client_timeout = kwargs.get('timeout')
        start_time = time.time()
        try:
            if args:
                result = ExecuteFunction(function, global_endpoint_manager, *args, **kwargs)
                global_endpoint_manager.record_success(args[0])
            else:
                result = ExecuteFunction(function, *args, **kwargs)
            if not client.last_response_headers:
                client.last_response_headers = {}

            # setting the throttle related response headers before returning the result
            client.last_response_headers[
                HttpHeaders.ThrottleRetryCount
            ] = resourceThrottle_retry_policy.current_retry_attempt_count
            client.last_response_headers[
                HttpHeaders.ThrottleRetryWaitTimeInMs
            ] = resourceThrottle_retry_policy.cumulative_wait_time_in_milliseconds
            # TODO: It is better to raise Exceptions manually in the method related to the request,
            #  a rework of retry would be needed to be able to retry exceptions raised that way.
            #  for now raising a manual exception here should allow it to be retried.
            # If container does not have throughput, results will return empty list.
            # We manually raise a 404. We raise it here, so we can handle it in retry utilities.
            if result and isinstance(result[0], dict) and 'Offers' in result[0] and \
                    not result[0]['Offers'] and request.method == 'POST':
                # Grab the link used for getting throughput properties to add to message.
                link = json.loads(request.body)["parameters"][0]["value"]
                raise exceptions.CosmosResourceNotFoundError(
                    status_code=StatusCodes.NOT_FOUND,
                    message="Could not find ThroughputProperties for container " + link,
                    sub_status_code=SubStatusCodes.THROUGHPUT_OFFER_NOT_FOUND)
            return result
        except exceptions.CosmosHttpResponseError as e:
            if request and _has_database_account_header(request.headers):
                retry_policy = database_account_retry_policy
            # Re-assign retry policy based on error code
            elif e.status_code == StatusCodes.FORBIDDEN and e.sub_status in\
                    [SubStatusCodes.DATABASE_ACCOUNT_NOT_FOUND, SubStatusCodes.WRITE_FORBIDDEN]:
                retry_policy = endpointDiscovery_retry_policy
            elif e.status_code == StatusCodes.TOO_MANY_REQUESTS:
                retry_policy = resourceThrottle_retry_policy
            elif (
                e.status_code == StatusCodes.NOT_FOUND
                and e.sub_status
                and e.sub_status == SubStatusCodes.READ_SESSION_NOTAVAILABLE
            ):
                retry_policy = sessionRetry_policy
            elif exceptions._partition_range_is_gone(e):
                retry_policy = partition_key_range_gone_retry_policy
            elif exceptions._container_recreate_exception(e):
                retry_policy = container_recreate_retry_policy
                # Before we retry if retry policy is container recreate, we need refresh the cache of the
                # container properties and pass in the new RID in the headers.
                client._refresh_container_properties_cache(retry_policy.container_link)
                if e.sub_status != SubStatusCodes.COLLECTION_RID_MISMATCH and retry_policy.check_if_rid_different(
                        retry_policy.container_link, client._container_properties_cache, retry_policy.container_rid):
                    retry_policy.refresh_container_properties_cache = False
                else:
                    cached_container = client._container_properties_cache[retry_policy.container_link]
                    # If partition key value was previously extracted from the document definition
                    # reattempt to extract partition key with updated partition key definition
                    if retry_policy.should_extract_partition_key(cached_container):
                        new_partition_key = retry_policy._extract_partition_key(
                            client, container_cache=cached_container, body=request.body
                        )
                        request.headers[HttpHeaders.PartitionKey] = new_partition_key
                    # If getting throughput, we have to replace the container link received from stale cache
                    # with refreshed cache
                    if retry_policy.should_update_throughput_link(request.body, cached_container):
                        new_body = retry_policy._update_throughput_link(request.body)
                        request.body = new_body

                    retry_policy.container_rid = cached_container["_rid"]
                    request.headers[retry_policy._intended_headers] = retry_policy.container_rid
            elif e.status_code == StatusCodes.REQUEST_TIMEOUT or e.status_code >= StatusCodes.INTERNAL_SERVER_ERROR:
                if args:
                    # record the failure for circuit breaker tracking
                    global_endpoint_manager.record_failure(args[0])
                retry_policy = timeout_failover_retry_policy
            else:
                retry_policy = defaultRetry_policy

            # If none of the retry policies applies or there is no retry needed, set the
            # throttle related response headers and re-throw the exception back arg[0]
            # is the request. It needs to be modified for write forbidden exception
            if not retry_policy.ShouldRetry(e):
                if not client.last_response_headers:
                    client.last_response_headers = {}
                client.last_response_headers[
                    HttpHeaders.ThrottleRetryCount
                ] = resourceThrottle_retry_policy.current_retry_attempt_count
                client.last_response_headers[
                    HttpHeaders.ThrottleRetryWaitTimeInMs
                ] = resourceThrottle_retry_policy.cumulative_wait_time_in_milliseconds
                if args and args[0].should_clear_session_token_on_session_read_failure:
                  client.session.clear_session_token(client.last_response_headers)

E AttributeError: 'NoneType' object has no attribute 'clear_session_token'
```
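
The failure is on the last line of the trace: `client.session` is only initialized when the account uses Session consistency, yet `clear_session_token` is called unconditionally on the session-read-failure path. A minimal sketch of a guard that would avoid the crash, replacing the last two lines of the function above (an illustration under that assumption, not the shipped fix):

```python
# Hypothetical guard (sketch, not the actual patch): client.session is None
# for accounts whose consistency level is not Session, so check it before
# clearing the session token.
if args and args[0].should_clear_session_token_on_session_read_failure:
    if client.session is not None:
        client.session.clear_session_token(client.last_response_headers)
```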

To Reproduce
Run TestPreferredLocations.test_read_no_preferred_locations_with_errors against an account whose consistency level is not Session.
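
For reference, the test can be invoked directly with pytest; the file path below assumes the usual azure-sdk-for-python repo layout and is a guess, so adjust it to wherever TestPreferredLocations actually lives:

```
pytest sdk/cosmos/azure-cosmos/tests/test_preferred_locations.py::TestPreferredLocations::test_read_no_preferred_locations_with_errors
```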

Labels: Client (This issue points to a problem in the data-plane of the library.), Cosmos