
.Net: [WIP] OTel model diagnostics: streaming APIs #6242


Merged
14 changes: 10 additions & 4 deletions dotnet/samples/Demos/TelemetryWithAppInsights/Program.cs
@@ -158,20 +158,26 @@ private static async Task RunHuggingFaceChatAsync(Kernel kernel)

private static async Task RunChatAsync(Kernel kernel)
{
+    // Using non-streaming to get the poem.
    var poem = await kernel.InvokeAsync<string>(
        "WriterPlugin",
        "ShortPoem",
        new KernelArguments { ["input"] = "Write a poem about John Doe." });
-    var translatedPoem = await kernel.InvokeAsync<string>(
+    Console.WriteLine($"Poem:\n{poem}\n\n");
+
+    // Use streaming to translate the poem.
+    Console.WriteLine("Translated Poem:");
+    await foreach (var update in kernel.InvokeStreamingAsync<string>(
        "WriterPlugin",
        "Translate",
        new KernelArguments
        {
            ["input"] = poem,
            ["language"] = "Italian"
-        });
-
-    Console.WriteLine($"Poem:\n{poem}\n\nTranslated Poem:\n{translatedPoem}");
+        }))
+    {
+        Console.Write(update);
+    }
}

private static Kernel GetKernel(ILoggerFactory loggerFactory)
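Note: the sample now exercises both the non-streaming and streaming paths, but nothing in it surfaces the spans this PR emits. A minimal sketch for watching them locally with an ActivityListener follows; the "Microsoft.SemanticKernel" source-name prefix is an assumption, not something this diff pins down.

```csharp
using System;
using System.Diagnostics;

var listener = new ActivityListener
{
    // Assumed prefix; adjust to the actual ActivitySource name used by ModelDiagnostics.
    ShouldListenTo = source => source.Name.StartsWith("Microsoft.SemanticKernel", StringComparison.Ordinal),
    Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
    ActivityStopped = activity =>
    {
        // For streaming calls the activity only stops after EndStreaming(),
        // so Duration covers the whole stream, not just the first chunk.
        Console.WriteLine($"{activity.DisplayName}: {activity.Duration.TotalMilliseconds:F0} ms");
        foreach (var tag in activity.Tags)
        {
            Console.WriteLine($"  {tag.Key} = {tag.Value}");
        }
    }
};
ActivitySource.AddActivityListener(listener);
```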
@@ -226,15 +226,33 @@ public async IAsyncEnumerable<StreamingChatMessageContent> StreamGenerateChatMes

    for (state.Iteration = 1; ; state.Iteration++)
    {
-        using var httpRequestMessage = await this.CreateHttpRequestAsync(state.GeminiRequest, this._chatStreamingEndpoint).ConfigureAwait(false);
-        using var response = await this.SendRequestAndGetResponseImmediatelyAfterHeadersReadAsync(httpRequestMessage, cancellationToken)
-            .ConfigureAwait(false);
-        using var responseStream = await response.Content.ReadAsStreamAndTranslateExceptionAsync()
-            .ConfigureAwait(false);
-
-        await foreach (var messageContent in this.GetStreamingChatMessageContentsOrPopulateStateForToolCallingAsync(state, responseStream, cancellationToken).ConfigureAwait(false))
+        using (var activity = ModelDiagnostics.StartCompletionActivity(
+            this._chatStreamingEndpoint, this._modelId, ModelProvider, chatHistory, executionSettings))
        {
-            yield return messageContent;
+            HttpResponseMessage httpResponseMessage;
+            Stream responseStream;
+            try
+            {
+                using var httpRequestMessage = await this.CreateHttpRequestAsync(state.GeminiRequest, this._chatStreamingEndpoint).ConfigureAwait(false);
+                // We cannot dispose these two objects before leaving the try-catch block because we still need them to read the response stream.
+                httpResponseMessage = await this.SendRequestAndGetResponseImmediatelyAfterHeadersReadAsync(httpRequestMessage, cancellationToken).ConfigureAwait(false);
+                responseStream = await httpResponseMessage.Content.ReadAsStreamAndTranslateExceptionAsync().ConfigureAwait(false);
+            }
+            catch (Exception ex)
+            {
+                activity?.SetError(ex);
+                throw;
+            }
+
+            await foreach (var messageContent in this.GetStreamingChatMessageContentsOrPopulateStateForToolCallingAsync(state, responseStream, cancellationToken).ConfigureAwait(false))
+            {
+                activity?.AddStreamingContent(messageContent);
+                yield return messageContent;
+            }
+
+            activity?.EndStreaming();
+            httpResponseMessage.Dispose();
+            responseStream.Dispose();
        }

        if (!state.AutoInvoke)
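Note: the try/catch wraps only the request setup because C# does not allow yield return inside a try block that has a catch clause, so errors thrown while the stream is being enumerated bypass SetError here. A toy illustration of the compiler rule (not code from this PR):

```csharp
using System;
using System.Collections.Generic;

static IEnumerable<int> Allowed()
{
    try
    {
        yield return 1; // fine: the enclosing try has only a finally clause
    }
    finally
    {
        Console.WriteLine("cleanup");
    }
}

// static IEnumerable<int> Rejected()
// {
//     try { yield return 1; }    // error CS1626: cannot yield a value in the
//     catch (Exception) { }      // body of a try block with a catch clause
// }
```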
@@ -169,18 +169,31 @@ public async IAsyncEnumerable<StreamingTextContent> StreamGenerateTextAsync(
    var request = this.CreateTextRequest(prompt, executionSettings);
    request.Stream = true;

-    using var httpRequestMessage = this.CreatePost(request, endpoint, this.ApiKey);
-
-    using var response = await this.SendRequestAndGetResponseImmediatelyAfterHeadersReadAsync(httpRequestMessage, cancellationToken)
-        .ConfigureAwait(false);
-
-    using var responseStream = await response.Content.ReadAsStreamAndTranslateExceptionAsync()
-        .ConfigureAwait(false);
+    using var activity = ModelDiagnostics.StartCompletionActivity(endpoint, modelId, this.ModelProvider, prompt, executionSettings);
+    HttpResponseMessage httpResponseMessage;
+    Stream responseStream;
+    try
+    {
+        using var httpRequestMessage = this.CreatePost(request, endpoint, this.ApiKey);
+        // We cannot dispose these two objects before leaving the try-catch block because we still need them to read the response stream.
+        httpResponseMessage = await this.SendRequestAndGetResponseImmediatelyAfterHeadersReadAsync(httpRequestMessage, cancellationToken).ConfigureAwait(false);
+        responseStream = await httpResponseMessage.Content.ReadAsStreamAndTranslateExceptionAsync().ConfigureAwait(false);
+    }
+    catch (Exception ex)
+    {
+        activity?.SetError(ex);
+        throw;
+    }

    await foreach (var streamingTextContent in this.ProcessTextResponseStreamAsync(responseStream, modelId, cancellationToken).ConfigureAwait(false))
    {
+        activity?.AddStreamingContent(streamingTextContent);
        yield return streamingTextContent;
    }
+
+    activity?.EndStreaming();
+    httpResponseMessage.Dispose();
+    responseStream.Dispose();
}

private async IAsyncEnumerable<StreamingTextContent> ProcessTextResponseStreamAsync(Stream stream, string modelId, [EnumeratorCancellation] CancellationToken cancellationToken)
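Note: one trade-off of the manual Dispose() calls on the success path is that if the consumer abandons the enumeration early, or the stream processing throws, neither object is disposed until finalization. A sketch (assumed endpoint and .NET 6+ APIs, not this PR's code) of a shape that guarantees disposal with try/finally, which is legal around yield return:

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Http;
using System.Runtime.CompilerServices;
using System.Threading;

internal static class StreamingSketch
{
    internal static async IAsyncEnumerable<string> StreamLinesAsync(
        HttpClient client,
        string url, // hypothetical endpoint, for illustration only
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        HttpResponseMessage? response = null;
        Stream? stream = null;
        try
        {
            try
            {
                response = await client.GetAsync(url, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
                stream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
            }
            catch (Exception)
            {
                // activity?.SetError(...) would slot in here, mirroring the PR
                throw;
            }

            using var reader = new StreamReader(stream);
            while (await reader.ReadLineAsync().ConfigureAwait(false) is { } line)
            {
                yield return line; // legal: the enclosing try has only a finally clause
            }
            // activity?.EndStreaming() would slot in here
        }
        finally
        {
            stream?.Dispose();   // runs even if the consumer stops enumerating early
            response?.Dispose();
        }
    }
}
```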
@@ -85,18 +85,31 @@ internal async IAsyncEnumerable<StreamingChatMessageContent> StreamCompleteChatM
    var request = this.CreateChatRequest(chatHistory, executionSettings);
    request.Stream = true;

-    using var httpRequestMessage = this._clientCore.CreatePost(request, endpoint, this._clientCore.ApiKey);
-
-    using var response = await this._clientCore.SendRequestAndGetResponseImmediatelyAfterHeadersReadAsync(httpRequestMessage, cancellationToken)
-        .ConfigureAwait(false);
-
-    using var responseStream = await response.Content.ReadAsStreamAndTranslateExceptionAsync()
-        .ConfigureAwait(false);
+    using var activity = ModelDiagnostics.StartCompletionActivity(endpoint, modelId, this._clientCore.ModelProvider, chatHistory, executionSettings);
+    HttpResponseMessage httpResponseMessage;
+    Stream responseStream;
+    try
+    {
+        using var httpRequestMessage = this._clientCore.CreatePost(request, endpoint, this._clientCore.ApiKey);
+        // We cannot dispose these two objects before leaving the try-catch block because we still need them to read the response stream.
+        httpResponseMessage = await this._clientCore.SendRequestAndGetResponseImmediatelyAfterHeadersReadAsync(httpRequestMessage, cancellationToken).ConfigureAwait(false);
+        responseStream = await httpResponseMessage.Content.ReadAsStreamAndTranslateExceptionAsync().ConfigureAwait(false);
+    }
+    catch (Exception ex)
+    {
+        activity?.SetError(ex);
+        throw;
+    }

    await foreach (var streamingChatContent in this.ProcessChatResponseStreamAsync(responseStream, modelId, cancellationToken).ConfigureAwait(false))
    {
+        activity?.AddStreamingContent(streamingChatContent);
        yield return streamingChatContent;
    }
+
+    activity?.EndStreaming();
+    httpResponseMessage.Dispose();
+    responseStream.Dispose();
}

internal async Task<IReadOnlyList<ChatMessageContent>> CompleteChatMessageAsync(
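Note: at this point the Gemini and both HuggingFace clients repeat the same per-chunk loop verbatim. If the pattern spreads to more connectors, it could be factored into a helper; a hypothetical sketch, assuming the PR's AddStreamingContent/EndStreaming extensions accept any StreamingKernelContent:

```csharp
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.SemanticKernel;

internal static class StreamingDiagnosticsExtensions
{
    // Hypothetical helper, not part of this PR: funnels every streamed item
    // through the activity before handing it to the caller, then closes out
    // the streaming span.
    internal static async IAsyncEnumerable<T> WithStreamingDiagnosticsAsync<T>(
        this IAsyncEnumerable<T> source,
        Activity? activity,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
        where T : StreamingKernelContent
    {
        await foreach (var content in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            activity?.AddStreamingContent(content); // PR's diagnostics extension
            yield return content;
        }

        activity?.EndStreaming();
    }
}
```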
72 changes: 52 additions & 20 deletions dotnet/src/Connectors/Connectors.OpenAI/AzureSdk/ClientCore.cs
@@ -119,13 +119,13 @@ internal ClientCore(ILogger? logger = null)
    /// <summary>
    /// Creates completions for the prompt and settings.
    /// </summary>
-    /// <param name="text">The prompt to complete.</param>
+    /// <param name="prompt">The prompt to complete.</param>
    /// <param name="executionSettings">Execution settings for the completion API.</param>
    /// <param name="kernel">The <see cref="Kernel"/> containing services, plugins, and other state for use throughout the operation.</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>Completions generated by the remote model</returns>
    internal async Task<IReadOnlyList<TextContent>> GetTextResultsAsync(
-        string text,
+        string prompt,
        PromptExecutionSettings? executionSettings,
        Kernel? kernel,
        CancellationToken cancellationToken = default)
@@ -134,11 +134,11 @@ internal async Task<IReadOnlyList<TextContent>> GetTextResultsAsync(

    ValidateMaxTokens(textExecutionSettings.MaxTokens);

-    var options = CreateCompletionsOptions(text, textExecutionSettings, this.DeploymentOrModelName);
+    var options = CreateCompletionsOptions(prompt, textExecutionSettings, this.DeploymentOrModelName);

    Completions? responseData = null;
    List<TextContent> responseContent;
-    using (var activity = ModelDiagnostics.StartCompletionActivity(this.Endpoint, this.DeploymentOrModelName, ModelProvider, text, executionSettings))
+    using (var activity = ModelDiagnostics.StartCompletionActivity(this.Endpoint, this.DeploymentOrModelName, ModelProvider, prompt, executionSettings))
    {
        try
        {
@@ -183,15 +183,30 @@ internal async IAsyncEnumerable<StreamingTextContent> GetStreamingTextContentsAs

    var options = CreateCompletionsOptions(prompt, textExecutionSettings, this.DeploymentOrModelName);

-    StreamingResponse<Completions>? response = await RunRequestAsync(() => this.Client.GetCompletionsStreamingAsync(options, cancellationToken)).ConfigureAwait(false);
+    using var activity = ModelDiagnostics.StartCompletionActivity(this.Endpoint, this.DeploymentOrModelName, ModelProvider, prompt, executionSettings);
+
+    StreamingResponse<Completions>? response;
+    try
+    {
+        response = await RunRequestAsync(() => this.Client.GetCompletionsStreamingAsync(options, cancellationToken)).ConfigureAwait(false);
+    }
+    catch (Exception ex)
+    {
+        activity?.SetError(ex);
+        throw;
+    }

    await foreach (Completions completions in response.ConfigureAwait(false))
    {
        foreach (Choice choice in completions.Choices)
        {
-            yield return new OpenAIStreamingTextContent(choice.Text, choice.Index, this.DeploymentOrModelName, choice, GetTextChoiceMetadata(completions, choice));
+            var openAIStreamingTextContent = new OpenAIStreamingTextContent(choice.Text, choice.Index, this.DeploymentOrModelName, choice, GetTextChoiceMetadata(completions, choice));
+            activity?.AddStreamingContent(openAIStreamingTextContent);
+            yield return openAIStreamingTextContent;
        }
    }
+
+    activity?.EndStreaming();
}

private static Dictionary<string, object?> GetTextChoiceMetadata(Completions completions, Choice choice)
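Note why `using var activity` is sufficient in GetStreamingTextContentsAsync: in an async iterator, pending using declarations run when the enumerator is disposed, so the span is still closed (albeit without EndStreaming's final bookkeeping) when a caller abandons the stream. A consumer-side sketch through the public ITextGenerationService surface, assuming a configured Kernel as in the sample:

```csharp
using System;
using Microsoft.SemanticKernel;
using Microsoft.SemanticKernel.TextGeneration;

// Abandoning a stream still ends the span: disposing the enumerator runs the
// iterator's pending 'using var' disposals, including the activity.
var textService = kernel.GetRequiredService<ITextGenerationService>();
await using (var enumerator = textService
    .GetStreamingTextContentsAsync("Write a haiku about telemetry.")
    .GetAsyncEnumerator())
{
    if (await enumerator.MoveNextAsync()) // first chunk arrives; activity is live
    {
        Console.Write(enumerator.Current.Text);
    }
} // enumerator disposed here -> iterator unwinds -> activity disposed
```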
@@ -613,9 +628,6 @@ internal async IAsyncEnumerable<OpenAIStreamingChatMessageContent> GetStreamingC

    for (int requestIndex = 1; ; requestIndex++)
    {
-        // Make the request.
-        var response = await RunRequestAsync(() => this.Client.GetChatCompletionsStreamingAsync(chatOptions, cancellationToken)).ConfigureAwait(false);
-
        // Reset state
        contentBuilder?.Clear();
        toolCallIdsByIndex?.Clear();
@@ -627,25 +639,45 @@ internal async IAsyncEnumerable<OpenAIStreamingChatMessageContent> GetStreamingC
        string? streamedName = null;
        ChatRole? streamedRole = default;
        CompletionsFinishReason finishReason = default;
-        await foreach (StreamingChatCompletionsUpdate update in response.ConfigureAwait(false))
+
+        // Make the request.
+        using (var activity = ModelDiagnostics.StartCompletionActivity(this.Endpoint, this.DeploymentOrModelName, ModelProvider, chat, executionSettings))
        {
-            metadata = GetResponseMetadata(update);
-            streamedRole ??= update.Role;
-            streamedName ??= update.AuthorName;
-            finishReason = update.FinishReason ?? default;
+            StreamingResponse<StreamingChatCompletionsUpdate>? response;
+            try
+            {
+                response = await RunRequestAsync(() => this.Client.GetChatCompletionsStreamingAsync(chatOptions, cancellationToken)).ConfigureAwait(false);
+            }
+            catch (Exception ex)
+            {
+                activity?.SetError(ex);
+                throw;
+            }

-            // If we're intending to invoke function calls, we need to consume that function call information.
-            if (autoInvoke)
+            await foreach (StreamingChatCompletionsUpdate update in response.ConfigureAwait(false))
            {
-                if (update.ContentUpdate is { Length: > 0 } contentUpdate)
+                metadata = GetResponseMetadata(update);
+                streamedRole ??= update.Role;
+                streamedName ??= update.AuthorName;
+                finishReason = update.FinishReason ?? default;
+
+                // If we're intending to invoke function calls, we need to consume that function call information.
+                if (autoInvoke)
                {
-                    (contentBuilder ??= new()).Append(contentUpdate);
+                    if (update.ContentUpdate is { Length: > 0 } contentUpdate)
+                    {
+                        (contentBuilder ??= new()).Append(contentUpdate);
+                    }
+
+                    OpenAIFunctionToolCall.TrackStreamingToolingUpdate(update.ToolCallUpdate, ref toolCallIdsByIndex, ref functionNamesByIndex, ref functionArgumentBuildersByIndex);
                }

-                OpenAIFunctionToolCall.TrackStreamingToolingUpdate(update.ToolCallUpdate, ref toolCallIdsByIndex, ref functionNamesByIndex, ref functionArgumentBuildersByIndex);
+                var openAIStreamingChatMessageContent = new OpenAIStreamingChatMessageContent(update, update.ChoiceIndex ?? 0, this.DeploymentOrModelName, metadata) { AuthorName = streamedName };
+                activity?.AddStreamingContent(openAIStreamingChatMessageContent);
+                yield return openAIStreamingChatMessageContent;
            }

-            yield return new OpenAIStreamingChatMessageContent(update, update.ChoiceIndex ?? 0, this.DeploymentOrModelName, metadata) { AuthorName = streamedName };
+            activity?.EndStreaming();
        }

        // If we don't have a function to invoke, we're done.
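Note: because the using block sits inside the `for (int requestIndex = 1; ...)` loop, each auto-invoked tool round trip gets its own completion activity rather than one span covering the whole exchange. A test could confirm the count with the same listener pattern as in the sample note above (source-name prefix again assumed, and the expected count holds only if the source emits no other activities):

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Counts completion activities across an auto-tool-invocation exchange.
int completionCount = 0;
var countingListener = new ActivityListener
{
    ShouldListenTo = s => s.Name.StartsWith("Microsoft.SemanticKernel", StringComparison.Ordinal),
    Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
    ActivityStopped = _ => Interlocked.Increment(ref completionCount)
};
ActivitySource.AddActivityListener(countingListener);

// ...run a streaming chat that triggers N tool calls...
// expected: N + 1 stopped activities — one per round trip plus the final answer
```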