Skip to content

Unhandled Exception in Session Management Causing Application Crash [ERROR] [EndpointDefinition] GetDataAsync failed for endpoint: https://api.refinitiv.com/auth/cloud-credentials/v1/. Data services unavailable. Session is closed #2

@ArturWincenciak

Description

@ArturWincenciak

We have encountered an unhandled exception in our application that caused a crash. The log below was recorded just before the crash, and it originates from your library:

[ERROR] [253] [EndpointDefinition] GetDataAsync failed for endpoint: https://api.refinitiv.com/auth/cloud-credentials/v1/. Data services unavailable.  Session is closed
Unhandled exception. System.InvalidOperationException: Data services unavailable.  Session is closed
   at Refinitiv.Data.Delivery.Request.EndpointDefinition.GetDataAsync(ISession session, Action`3 cb, CancellationToken cancellationToken)
   at Refinitiv.Data.Delivery.Queue.QueueNode.RefreshCloudCredentialsAsync()
   at Refinitiv.Data.Delivery.Queue.QueueNode.CloudRefreshTimerHandler(Object source, ElapsedEventArgs e)
   at System.Threading.Tasks.Task.<>c.<ThrowAsync>b__128_1(Object state)
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()

We are unable to catch this exception in our code, and we would like to request that you implement exception handling within your library to prevent such exceptions from crashing the entire application.

To provide more context, below is how we create the session, queue, and subscription:

public async Task Subscribe(Func<IQueueResponse, CancellationToken, Task> callback, CancellationToken ct)
{
    _session = PlatformSession.Definition()
        .AppKey(options.AppKey)
        .OAuthGrantType(new GrantPassword()
            .UserName(options.UserName)
            .Password(options.Password))
        .TakeSignonControl(true)
        .GetSession()
        .OnState((state, msg, _) =>
            logger.LogInformation("On session state changed: {state}. {msg}", state, msg))
        .OnEvent((eventCode, msg, _) =>
            logger.LogInformation("Event: {eventCode}. {msg}", eventCode, msg));

    var openSessionStatus = await _session.OpenAsync(ct);
    if (openSessionStatus != Session.State.Opened)
        logger.LogWarning("Session is not open, status: {status}", openSessionStatus);

    var queueDefinition = Queue.Definition(options.Endpoint);
    var queueManager = queueDefinition
        .CreateQueueManager()
        .OnError((err, _) => logger.LogError("Error: {error}", err));

    var queueCriteria = new JObject {{prop, value}};
    var queue = await queueManager.CreateQueueAsync(queueCriteria, Queue.CloudType.AWS, ct);

    var subscriber = queueDefinition.CreateAWSSubscriber(queue);
    var status = await subscriber.StartPollingAsync((response, _) =>
    {
        callback(response, ct).GetAwaiter().GetResult();
    });

    if (!status)
        logger.LogWarning("Start polling failed");
}

The synchronization point in our application is the subscriber.StartPollingAsync method call. This is where message processing begins, and we are able to wrap this line in a try { } catch { } block to catch exceptions, as shown below:

try
{
    await client.Subscribe(async (news, ct) =>
    {
        await pipeline.Handle(news, ct); // Perform business logic
    }, cancellationToken);
}
catch (Exception ex)
{
    switch (ex)
    {
        case AmazonSQSException sqsEx:
            // Log the exception
            // Close the current session and queue subscription
            // Re-create the session and re-subscribe to the queue
            break;
        default:
            // Log any other exceptions
            break;
    }
}

However, the exception that caused our application to crash was not caught. It appears to have originated elsewhere, leading to the application's unexpected and permanent termination.

That method _session.OpenAsync does not allow us to catch the [ERROR] [253] [EndpointDefinition] exception. If this method were blocking further processing, it would be possible to catch all exceptions related to the session implementation.

To give more context: in our application, we also make HTTP API calls in the following way:

try
{
    var response = await EndpointRequest.Definition(endpointUrl).GetDataAsync(ct);
    // Perform business logic
}
catch (Exception ex)
{
    // Log the exception
}

Under the hood, this static method EndpointRequest.Definition implicitly uses the previously created session.

As we understand, the session functions like a singleton that all parts of your library implicitly rely on. Given the log messages [ERROR] [253] [EndpointDefinition] that were recorded before the crash, it seems the problem might be related to the EndpointRequest.Definition method. However, we did not catch any exceptions log in the try { } catch { } block.

I suspect that the log [ERROR] [253] [EndpointDefinition] originates from the implementation of the EndpointRequest.Definition method. However, in this case, the method was not invoked by our logic but by some internal mechanism in the library responsible for session/token renewal.

I would also like to address the AmazonSQSException, which occasionally occurs in our application. Despite a correct setup, this exception still appears. As we understand it, your library handles the responsibility for token refresh logic, session management, and session recovery. However, we observe situations where this exception occurs, ranging from several times a day to once every few days. When it does happen, we close the old session, unsubscribe from the queue, clean up, and then re-create the session and re-subscribe to the queues. We would like to ask if this is the correct approach and if you could provide any guidance or best practices to avoid these exceptions.

If our approach is correct, this information might help identify areas in your library that could be improved.

Lastly, on a related but separate note, could you provide an overload of the subscriber.StartPollingAsync method that accepts an asynchronous callback, such as:

Task<bool> StartPollingAsync(Func<IQueueResponse, CancellationToken, Task> cb);

Currently, the only available method is:

Task<bool> StartPollingAsync(Action<IQueueResponse, IQueueSubscriber> cb);

This requires us to create a synchronization point for the callback using GetAwaiter and GetResult like this:

var status = await subscriber.StartPollingAsync((response, _) =>
{
    callback(response, ct).GetAwaiter().GetResult();
});

Providing an asynchronous version of this method would simplify our code and could help in better handling exceptions.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions