diff --git a/src/api.ts b/src/api.ts
index 13bed68..dd4426c 100644
--- a/src/api.ts
+++ b/src/api.ts
@@ -399,6 +399,7 @@ export class API {
       return response.data;
     } catch (e) {
+      console.error(e);
       if (e instanceof AxiosError) {
         throw new Error(JSON.stringify(e.response?.data.errors));
       } else {
@@ -426,6 +427,7 @@ export class API {
       return response.data;
     } catch (e) {
+      console.error(e);
       if (e instanceof AxiosError) {
         throw new Error(JSON.stringify(e.response?.data));
       } else {
@@ -696,70 +698,60 @@ export class API {
     orderBy?: GenerationsOrderBy;
   }): Promise<PaginatedResponse<Generation>> {
     const query = `
-    query GetGenerations(
-      $after: ID,
-      $before: ID,
-      $cursorAnchor: DateTime,
-      $filters: [generationsInputType!],
-      $orderBy: GenerationsOrderByInput,
-      $first: Int,
-      $last: Int,
-      $projectId: String,
+    query GetGenerations(
+      $after: ID
+      $before: ID
+      $cursorAnchor: DateTime
+      $filters: [generationsInputType!]
+      $orderBy: GenerationsOrderByInput
+      $first: Int
+      $last: Int
+      $projectId: String
     ) {
-      generations(
-        after: $after,
-        before: $before,
-        cursorAnchor: $cursorAnchor,
-        filters: $filters,
-        orderBy: $orderBy,
-        first: $first,
-        last: $last,
-        projectId: $projectId,
-        ) {
+      generations(
+        after: $after
+        before: $before
+        cursorAnchor: $cursorAnchor
+        filters: $filters
+        orderBy: $orderBy
+        first: $first
+        last: $last
+        projectId: $projectId
+      ) {
       pageInfo {
-          startCursor
-          endCursor
-          hasNextPage
-          hasPreviousPage
+        startCursor
+        endCursor
+        hasNextPage
+        hasPreviousPage
       }
       totalCount
       edges {
-          cursor
-          node {
-            id
-            projectId
-            prompt
-            completion
-            createdAt
-            provider
-            model
-            variables
-            messages
-            messageCompletion
-            tools
-            settings
-            stepId
-            tokenCount
-            duration
-            inputTokenCount
-            outputTokenCount
-            ttFirstToken
-            duration
-            tokenThroughputInSeconds
-            error
-            type
-            tags
-            step {
-              threadId
-              thread {
-                participant {
-                  identifier
-                }
-              }
-            }
-          }
-        }
+        cursor
+        node {
+          id
+          projectId
+          prompt
+          completion
+          createdAt
+          provider
+          model
+          variables
+          messages
+          messageCompletion
+          tools
+          settings
+          tokenCount
+          duration
+          inputTokenCount
+          outputTokenCount
+          ttFirstToken
+          tokenThroughputInSeconds
+          error
+          type
+          tags
+        }
       }
+      }
   }`;

    const result = await this.makeGqlCall(query, variables);
diff --git a/src/instrumentation/index.ts b/src/instrumentation/index.ts
index 6308d09..526bb2f 100644
--- a/src/instrumentation/index.ts
+++ b/src/instrumentation/index.ts
@@ -1,20 +1,15 @@
-import { LiteralClient, Step, Thread } from '..';
+import { LiteralClient } from '..';

 import { LiteralCallbackHandler } from './langchain';
 import { instrumentLlamaIndex, withThread } from './llamaindex';
-import instrumentOpenAI, {
-  InstrumentOpenAIOptions,
-  OpenAIOutput
-} from './openai';
+import instrumentOpenAI from './openai';
+import { InstrumentOpenAIOptions } from './openai';
 import { makeInstrumentVercelSDK } from './vercel-sdk';

 export type { InstrumentOpenAIOptions } from './openai';

 export default (client: LiteralClient) => ({
-  openai: (
-    output: OpenAIOutput,
-    parent?: Step | Thread,
-    options?: InstrumentOpenAIOptions
-  ) => instrumentOpenAI(client, output, parent, options),
+  openai: (options?: InstrumentOpenAIOptions) =>
+    instrumentOpenAI(client, options),
   langchain: {
     literalCallback: (threadId?: string) => {
       try {
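Note on the instrumentation API change above: `client.instrumentation.openai` no longer receives each call's output (plus an optional parent step or thread); it is now invoked once with options and patches the OpenAI SDK globally. A minimal usage sketch of the new shape; the package name `@literalai/client` and the env-var wiring are assumptions here, while the call pattern itself mirrors the updated tests further down:

```ts
import OpenAI from 'openai';
import { LiteralClient } from '@literalai/client'; // assumed package name

const literalClient = new LiteralClient(
  process.env.LITERAL_API_KEY,
  process.env.LITERAL_API_URL
);

// One-time setup: patches chat.completions.create, completions.create
// and images.generate on the OpenAI SDK.
literalClient.instrumentation.openai({ tags: ['my-feature'] });

const openai = new OpenAI();

// No per-call wrapping anymore: the patched create() reports the
// generation to Literal AI automatically.
await openai.chat.completions.create({
  model: 'gpt-3.5-turbo',
  messages: [{ role: 'user', content: 'What is the capital of Canada?' }]
});
```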
diff --git a/src/instrumentation/openai.ts b/src/instrumentation/openai.ts
index 6bdce4f..f489c95 100644
--- a/src/instrumentation/openai.ts
+++ b/src/instrumentation/openai.ts
@@ -18,26 +18,14 @@ import {
   Thread
 } from '..';

-const openaiReqs: Record<
-  string,
-  {
-    // Record the ID of the request
-    id: string;
-    // Record the start time of the request
-    start: number;
-    // Record the inputs of the request
-    inputs: Record<string, any>;
-    // Record the stream of the request if it's a streaming request
-    stream?: Stream<ChatCompletionChunk | Completion>;
-  }
-> = {};
-
 // Define a generic type for the original function to be wrapped
 type OriginalFunction<T extends any[], R> = (...args: T) => Promise<R>;

 // Utility function to wrap a method
 function wrapFunction<T extends any[], R>(
-  originalFunction: OriginalFunction<T, R>
+  originalFunction: OriginalFunction<T, R>,
+  client: LiteralClient,
+  options: InstrumentOpenAIOptions = {}
 ): OriginalFunction<T, R> {
   return async function (this: any, ...args: T): Promise<R> {
     const start = Date.now();
@@ -46,58 +34,57 @@ function wrapFunction(
     const result = await originalFunction.apply(this, args);

     if (result instanceof Stream) {
-      const streamResult = result as Stream<ChatCompletionChunk>;
-      // If it is a streaming request, we need to process the first token to get the id
-      // However we also need to tee the stream so that the end developer can process the stream
-      const [a, b] = streamResult.tee();
-      // Re split the stream to store a clean instance for final processing later on
-      const c = a.tee()[0];
-      let id;
-      // Iterate over the stream to find the first chunk and store the id
-      for await (const chunk of a) {
-        id = chunk.id;
-        if (!openaiReqs[id]) {
-          openaiReqs[id] = {
-            id,
-            inputs: args[0],
-            start,
-            stream: c
-          };
-          break;
-        }
-      }
-      // @ts-expect-error Hacky way to add the id to the stream
-      b.id = id;
+      const streamResult = result;
+      const [returnedResult, processedResult] = streamResult.tee();
+
+      await processOpenAIOutput(client, processedResult, {
+        ...options,
+        start,
+        inputs: args[0]
+      });

-      return b as any;
+      return returnedResult as R;
     } else {
-      const regularResult = result as ChatCompletion | Completion;
-      const id = regularResult.id;
-      openaiReqs[id] = {
-        id,
-        inputs: args[0],
-        start
-      };
+      await processOpenAIOutput(client, result as ChatCompletion | Completion, {
+        ...options,
+        start,
+        inputs: args[0]
+      });
+
       return result;
     }
   };
 }

-// Patching the chat.completions.create function
-const originalChatCompletionsCreate = OpenAI.Chat.Completions.prototype.create;
-OpenAI.Chat.Completions.prototype.create = wrapFunction(
-  originalChatCompletionsCreate
-) as any;
-
-// Patching the completions.create function
-const originalCompletionsCreate = OpenAI.Completions.prototype.create;
-OpenAI.Completions.prototype.create = wrapFunction(
-  originalCompletionsCreate
-) as any;
-
-// Patching the completions.create function
-const originalImagesGenerate = OpenAI.Images.prototype.generate;
-OpenAI.Images.prototype.generate = wrapFunction(originalImagesGenerate) as any;
+function instrumentOpenAI(
+  client: LiteralClient,
+  options: InstrumentOpenAIOptions = {}
+) {
+  // Patching the chat.completions.create function
+  const originalChatCompletionsCreate =
+    OpenAI.Chat.Completions.prototype.create;
+  OpenAI.Chat.Completions.prototype.create = wrapFunction(
+    originalChatCompletionsCreate,
+    client,
+    options
+  ) as any;
+
+  // Patching the completions.create function
+  const originalCompletionsCreate = OpenAI.Completions.prototype.create;
+  OpenAI.Completions.prototype.create = wrapFunction(
+    originalCompletionsCreate,
+    client,
+    options
+  ) as any;
+
+  // Patching the images.generate function
+  const originalImagesGenerate = OpenAI.Images.prototype.generate;
+  OpenAI.Images.prototype.generate = wrapFunction(
+    originalImagesGenerate,
+    client,
+    options
+  ) as any;
+}

 function processChatDelta(
   newDelta: ChatCompletionChunk.Choice.Delta,
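The rewritten wrapper drops the module-level `openaiReqs` registry: `start` and the request `inputs` now travel through the closure, and a streaming result is `tee()`-ed exactly once, with one branch consumed for logging and the other returned to the caller. A standalone sketch of that pattern, assuming the openai v4 SDK's `Stream`; the `logChunks` consumer is hypothetical, standing in for `processOpenAIOutput`:

```ts
import { Stream } from 'openai/streaming';
import type { ChatCompletionChunk } from 'openai/resources/chat/completions';

// Hypothetical consumer standing in for processOpenAIOutput.
async function logChunks(stream: AsyncIterable<ChatCompletionChunk>) {
  for await (const chunk of stream) {
    console.log(chunk.choices[0]?.delta?.content ?? '');
  }
}

async function handleStream(
  result: Stream<ChatCompletionChunk>
): Promise<Stream<ChatCompletionChunk>> {
  // tee() yields two independent iterators over the same chunks; each
  // branch buffers internally, so draining one does not starve the other.
  const [returned, processed] = result.tee();
  await logChunks(processed);
  // The caller still gets a fully usable stream.
  return returned;
}
```

Note the trade-off the diff takes here: the logging branch is awaited before the caller's branch is returned, which relies on `tee()` buffering the chunks.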
@@ -296,22 +283,49 @@ export interface InstrumentOpenAIOptions {
   tags?: Maybe<string[]>;
 }

-const instrumentOpenAI = async (
+export interface ProcessOpenAIOutput extends InstrumentOpenAIOptions {
+  start: number;
+  inputs: Record<string, any>;
+}
+
+function isStream(obj: any): boolean {
+  return (
+    obj !== null &&
+    typeof obj === 'object' &&
+    typeof obj.pipe === 'function' &&
+    typeof obj.on === 'function' &&
+    typeof obj.read === 'function'
+  );
+}
+
+const processOpenAIOutput = async (
   client: LiteralClient,
   output: OpenAIOutput,
-  parent?: Step | Thread,
-  options: InstrumentOpenAIOptions = {}
+  { start, tags, inputs }: ProcessOpenAIOutput
 ) => {
-  //@ts-expect-error - This is a hacky way to get the id from the stream
-  const outputId = output.id;
-  const { stream, start, inputs } = openaiReqs[outputId];
   const baseGeneration = {
     provider: 'openai',
     model: inputs.model,
     settings: getSettings(inputs),
-    tags: options.tags
+    tags: tags
   };

+  let threadFromStore: Thread | null = null;
+  try {
+    threadFromStore = client.getCurrentThread();
+  } catch (error) {
+    // Ignore error thrown if getCurrentThread is called outside of a context
+  }
+
+  let stepFromStore: Step | null = null;
+  try {
+    stepFromStore = client.getCurrentStep();
+  } catch (error) {
+    // Ignore error thrown if getCurrentStep is called outside of a context
+  }
+
+  const parent = stepFromStore || threadFromStore;
+
   if ('data' in output) {
     // Image Generation
@@ -322,14 +336,16 @@ const instrumentOpenAI = async (
       output: output,
       startTime: new Date(start).toISOString(),
       endTime: new Date().toISOString(),
-      tags: options.tags
+      tags: tags
     };

     const step = parent
       ? parent.step(stepData)
       : client.step({ ...stepData, type: 'run' });
     await step.send();
-  } else if (output instanceof Stream) {
+  } else if (output instanceof Stream || isStream(output)) {
+    const stream = output as Stream<ChatCompletionChunk | Completion>;
+
     if (!stream) {
       throw new Error('Stream not found');
     }
@@ -460,8 +476,6 @@ const instrumentOpenAI = async (
       }
     }
   }
-
-  delete openaiReqs[outputId];
 };

 export default instrumentOpenAI;
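With `processOpenAIOutput` resolving its parent from the client's ambient context (`getCurrentThread`/`getCurrentStep`, with thrown errors deliberately swallowed when no context exists) instead of an explicit `parent` argument, where a generation lands is determined by where the OpenAI call happens. A sketch of both situations, reusing the `literalClient`/`openai` setup from the earlier note and mirroring the tests below:

```ts
// Outside any wrapper: getCurrentThread()/getCurrentStep() throw, the
// errors are ignored, and the generation is logged standalone.
await openai.chat.completions.create({
  model: 'gpt-3.5-turbo',
  messages: [{ role: 'user', content: 'Hello!' }]
});

// Inside wrappers: `stepFromStore || threadFromStore` means the current
// step wins over the thread, so this generation is parented to 'my-run'.
await literalClient.thread({ name: 'my-thread' }).wrap(async () => {
  await literalClient.run({ name: 'my-run' }).wrap(async () => {
    await openai.chat.completions.create({
      model: 'gpt-3.5-turbo',
      messages: [{ role: 'user', content: 'Hello!' }]
    });
  });
});
```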
diff --git a/tests/integration/openai.test.ts b/tests/integration/openai.test.ts
index a7231cf..652e48e 100644
--- a/tests/integration/openai.test.ts
+++ b/tests/integration/openai.test.ts
@@ -1,130 +1,404 @@
 import 'dotenv/config';
 import OpenAI from 'openai';
+import { PassThrough } from 'stream';
+import { v4 as uuidv4 } from 'uuid';

-import { LiteralClient } from '../../src';
+import {
+  ChatGeneration,
+  LiteralClient,
+  Maybe,
+  OmitUtils,
+  Step
+} from '../../src';
+
+const url = process.env.LITERAL_API_URL;
+const apiKey = process.env.LITERAL_API_KEY;
+
+if (!url || !apiKey) {
+  throw new Error('Missing environment variables');
+}
+
+const openai = new OpenAI({ apiKey: 'an-ocean-of-noise' });

 // Skip for the CI
-describe.skip('OpenAI Instrumentation', () => {
-  let client: LiteralClient;
+describe('OpenAI Instrumentation', () => {
+  // Mock OpenAI Calls
+  beforeAll(() => {
+    /* @ts-expect-error the mock is incomplete but that's OK */
+    OpenAI.Chat.Completions.prototype.create = jest.fn(
+      ({ stream }: { stream: boolean }) => {
+        if (stream) {
+          const generationId = uuidv4();
+          const stream = new PassThrough({ objectMode: true });

-  beforeAll(function () {
-    const url = process.env.LITERAL_API_URL;
-    const apiKey = process.env.LITERAL_API_KEY;
+          stream.write({
+            id: generationId,
+            object: 'chat.completion.chunk',
+            choices: [
+              {
+                delta: { role: 'assistant', content: 'Ottawa' },
+                index: 0,
+                finish_reason: null
+              }
+            ]
+          });

-    if (!url || !apiKey) {
-      throw new Error('Missing environment variables');
-    }
+          stream.write({
+            id: generationId,
+            object: 'chat.completion.chunk',
+            choices: [
+              {
+                delta: { role: 'assistant', content: ' is' },
+                index: 0,
+                finish_reason: null
+              }
+            ]
+          });

-    client = new LiteralClient(apiKey, url);
-  });
+          stream.end({
+            id: generationId,
+            object: 'chat.completion.chunk',
+            choices: [
+              {
+                delta: { role: 'assistant', content: ' the capital of Canada' },
+                index: 0,
+                finish_reason: 'stop'
+              }
+            ]
+          });

-  it('should monitor simple generation', async () => {
-    const spy = jest.spyOn(client.api, 'createGeneration');
+          return stream;
+        }

-    const openai = new OpenAI();
+        return Promise.resolve({
+          id: uuidv4(),
+          object: 'chat.completion',
+          choices: [
+            {
+              message: {
+                role: 'assistant',
+                content: 'Ottawa is the capital of Canada'
+              }
+            }
+          ]
+        });
+      }
+    );

-    const response = await openai.chat.completions.create({
-      model: 'gpt-3.5-turbo',
-      messages: [
-        { role: 'system', content: 'You are a helpful assistant.' },
-        { role: 'user', content: 'What is the capital of Canada?' }
-      ]
+    /* @ts-expect-error the mock is incomplete but that's OK */
+    OpenAI.Images.prototype.generate = jest.fn(() => {
+      return Promise.resolve({
+        data: [{ url: 'https://example.com/image.png' }]
+      });
     });
+  });
+
+  describe('Streamed chat generation', () => {
+    let step: Maybe<Step>;
+    let generationFromStep: OmitUtils<ChatGeneration>;

-    await client.instrumentation.openai(response);
+    beforeAll(async () => {
+      const testId = uuidv4();

-    expect(response.choices[0].message.content).toBeTruthy();
+      const client = new LiteralClient(apiKey, url);
+      client.instrumentation.openai({ tags: [testId] });

-    expect(spy).toHaveBeenCalledWith(
-      expect.objectContaining({
-        type: 'CHAT',
-        provider: 'openai',
-        model: 'gpt-3.5-turbo-0125',
+      await openai.chat.completions.create({
+        model: 'gpt-3.5-turbo',
         messages: [
-          { content: 'You are a helpful assistant.', role: 'system' },
-          { content: 'What is the capital of Canada?', role: 'user' }
+          { role: 'system', content: 'You are a helpful assistant.' },
+          { role: 'user', content: 'What is the capital of Canada?' }
         ],
-        messageCompletion: response.choices[0].message,
-        tokenCount: expect.any(Number)
-      })
-    );
-  });
+        stream: true
+      });

-  it('should monitor streamed generation', async () => {
-    const spy = jest.spyOn(client.api, 'createGeneration');
+      const {
+        data: [generation]
+      } = await client.api.getGenerations({
+        filters: [
+          {
+            field: 'tags',
+            operator: 'in',
+            value: [testId]
+          }
+        ]
+      });

-    const openai = new OpenAI();
+      step = await client.api.getStep(generation.id);
+      generationFromStep = step!.generation!;
+    });

-    const response = await openai.chat.completions.create({
-      model: 'gpt-3.5-turbo',
-      messages: [
-        { role: 'system', content: 'You are a helpful assistant.' },
-        { role: 'user', content: 'What is the capital of Switzerland?' }
-      ],
-      stream: true
+    it('should create a generation with no thread or parent', async () => {
+      expect(step?.threadId).toBeNull();
+      expect(step?.parentId).toBeNull();
+      expect(step?.type).toBe('llm');
     });

-    await client.instrumentation.openai(response);
+    it("should log a generation's input & output", async () => {
+      expect(generationFromStep.messages).toEqual([
+        {
+          role: 'system',
+          content: 'You are a helpful assistant.'
+        },
+        {
+          role: 'user',
+          content: 'What is the capital of Canada?'
+        }
+      ]);
+      expect(generationFromStep.messageCompletion).toEqual({
+        role: 'assistant',
+        content: 'Ottawa is the capital of Canada'
+      });
+    });

-    let resultText = '';
-    // use stream as an async iterable:
-    for await (const chunk of response) {
-      resultText += chunk.choices[0].delta.content ?? '';
-    }
+    it("should log a generation's settings", async () => {
+      expect(generationFromStep.provider).toBe('openai');
+      expect(generationFromStep.model).toContain('gpt-3.5-turbo');
+      expect(generationFromStep.tokenCount).toEqual(expect.any(Number));
+      expect(generationFromStep.inputTokenCount).toEqual(expect.any(Number));
+      expect(generationFromStep.outputTokenCount).toEqual(expect.any(Number));
+    });
+  });
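The tests above and below share one retrieval pattern: since the patched SDK logs through a shared client, each test instruments with a freshly generated UUID tag and then queries the Literal AI API for the step or generation carrying that tag. Extracted as a sketch; the `fetchGenerationByTag` helper is hypothetical and not part of the diff, and it reuses the module-level `apiKey`/`url`:

```ts
import { v4 as uuidv4 } from 'uuid';

import { LiteralClient } from '../../src';

// Hypothetical helper: isolate one test's generation via its unique tag.
async function fetchGenerationByTag(client: LiteralClient, testId: string) {
  const {
    data: [generation]
  } = await client.api.getGenerations({
    filters: [{ field: 'tags', operator: 'in', value: [testId] }]
  });
  return generation;
}

// Usage: tag at instrumentation time, filter at assertion time.
const testId = uuidv4();
const client = new LiteralClient(apiKey, url);
client.instrumentation.openai({ tags: [testId] });
```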

-    expect(resultText).toBeTruthy();
+  describe('Outside of a thread or step wrapper', () => {
+    describe('Simple chat generation', () => {
+      let step: Maybe<Step>;
+      let generationFromStep: OmitUtils<ChatGeneration>;
+      let response: OpenAI.ChatCompletion;

-    expect(spy).toHaveBeenCalledWith(
-      expect.objectContaining({
-        type: 'CHAT',
-        provider: 'openai',
-        model: 'gpt-3.5-turbo-0125',
-        messages: [
-          { content: 'You are a helpful assistant.', role: 'system' },
-          { content: 'What is the capital of Switzerland?', role: 'user' }
-        ],
-        messageCompletion: {
+      beforeAll(async () => {
+        const testId = uuidv4();
+
+        const client = new LiteralClient(apiKey, url);
+
+        client.instrumentation.openai({ tags: [testId] });
+
+        response = await openai.chat.completions.create({
+          model: 'gpt-3.5-turbo',
+          messages: [
+            { role: 'system', content: 'You are a helpful assistant.' },
+            { role: 'user', content: 'What is the capital of Canada?' }
+          ]
+        });
+
+        const {
+          data: [generation]
+        } = await client.api.getGenerations({
+          filters: [
+            {
+              field: 'tags',
+              operator: 'in',
+              value: [testId]
+            }
+          ]
+        });
+
+        step = await client.api.getStep(generation.id);
+        generationFromStep = step!.generation!;
+      });
+
+      it('should create a generation with no thread or parent', async () => {
+        expect(step?.threadId).toBeNull();
+        expect(step?.parentId).toBeNull();
+        expect(step?.type).toBe('llm');
+      });
+
+      it("should log a generation's input & output", async () => {
+        expect(generationFromStep.messages).toEqual([
+          {
+            role: 'system',
+            content: 'You are a helpful assistant.'
+          },
+          {
+            role: 'user',
+            content: 'What is the capital of Canada?'
+          }
+        ]);
+        expect(generationFromStep.messageCompletion).toEqual({
           role: 'assistant',
-          content: resultText
-        },
-        duration: expect.any(Number),
-        ttFirstToken: expect.any(Number),
-        outputTokenCount: expect.any(Number),
-        tokenThroughputInSeconds: expect.any(Number)
-      })
-    );
+          content: response.choices[0].message.content
+        });
+      });
+
+      it("should log a generation's settings", async () => {
+        expect(generationFromStep.provider).toBe('openai');
+        expect(generationFromStep.model).toContain('gpt-3.5-turbo');
+        expect(generationFromStep.tokenCount).toEqual(expect.any(Number));
+        expect(generationFromStep.inputTokenCount).toEqual(expect.any(Number));
+        expect(generationFromStep.outputTokenCount).toEqual(expect.any(Number));
+      });
+    });
+
+    describe('Image generation', () => {
+      it('should monitor image generation', async () => {
+        const testId = uuidv4();
+
+        const client = new LiteralClient(apiKey, url);
+        client.instrumentation.openai({ tags: [testId] });
+
+        const response = await openai.images.generate({
+          prompt: 'A painting of a rose in the style of Picasso.',
+          model: 'dall-e-2',
+          size: '256x256',
+          n: 1
+        });
+
+        const {
+          data: [step]
+        } = await client.api.getSteps({
+          first: 1,
+          filters: [
+            {
+              field: 'tags',
+              operator: 'in',
+              value: [testId]
+            }
+          ]
+        });
+
+        expect(step?.threadId).toBeNull();
+        expect(step?.parentId).toBeNull();
+        expect(step?.type).toBe('run');
+
+        expect(step?.output?.data[0].url).toEqual(response.data[0].url);
+      }, 30000);
+    });
   });

-  it('should monitor image generation', async () => {
-    const spy = jest.spyOn(client.api, 'sendSteps');
+  describe('Inside of a thread or step wrapper', () => {
+    it('logs the generation inside its thread and parent', async () => {
+      const testId = uuidv4();
+
+      const client = new LiteralClient(apiKey, url);
+      client.instrumentation.openai({ tags: [testId] });
+
+      let threadId: Maybe<string>;
+      let parentId: Maybe<string>;

-    const openai = new OpenAI();
+      await client.thread({ name: 'openai' }).wrap(async () => {
+        threadId = client.getCurrentThread().id;
+        return client.run({ name: 'openai' }).wrap(async () => {
+          parentId = client.getCurrentStep().id;

-    const response = await openai.images.generate({
-      prompt: 'A painting of a rose in the style of Picasso.',
-      model: 'dall-e-2',
-      size: '256x256',
-      n: 1
+          await openai.chat.completions.create({
+            model: 'gpt-3.5-turbo',
+            messages: [
+              { role: 'system', content: 'You are a helpful assistant.' },
+              { role: 'user', content: 'What is the capital of Canada?' }
+            ]
+          });
+        });
+      });
+
+      const {
+        data: [step]
+      } = await client.api.getSteps({
+        first: 1,
+        filters: [
+          {
+            field: 'tags',
+            operator: 'in',
+            value: [testId]
+          }
+        ]
+      });
+
+      expect(step?.threadId).toBe(threadId);
+      expect(step?.parentId).toBe(parentId);
     });

-    await client.instrumentation.openai(response);
+    it("doesn't mix up threads and steps", async () => {
+      const testId = uuidv4();
+
+      const client = new LiteralClient(apiKey, url);
+      client.instrumentation.openai({ tags: [testId] });
+
+      const firstThreadId = uuidv4();
+      const secondThreadId = uuidv4();
+
+      let firstStep: Maybe<Step>;
+      let secondStep: Maybe<Step>;
+
+      await Promise.all([
+        client
+          .thread({ id: firstThreadId, name: 'Thread 1' })
+          .wrap(async () => {
+            return client
+              .step({ name: 'Step 1', type: 'assistant_message' })
+              .wrap(async () => {
+                firstStep = client.getCurrentStep();

-    expect(response.data[0].url).toBeTruthy();
+                return openai.chat.completions.create({
+                  model: 'gpt-3.5-turbo',
+                  messages: [
+                    { role: 'system', content: 'You are a helpful assistant.' },
+                    { role: 'user', content: 'What is the capital of Canada?' }
+                  ]
+                });
+              });
+          }),
+        client
+          .thread({ id: secondThreadId, name: 'Thread 2' })
+          .wrap(async () => {
+            return client
+              .step({ name: 'Step 2', type: 'assistant_message' })
+              .wrap(async () => {
+                secondStep = client.getCurrentStep();

-    expect(spy).toHaveBeenCalledWith(
-      expect.arrayContaining([
-        expect.objectContaining({
-          name: 'dall-e-2',
-          type: 'run',
-          input: {
-            model: 'dall-e-2',
-            prompt: 'A painting of a rose in the style of Picasso.',
-            size: '256x256',
-            n: 1
+                return openai.chat.completions.create({
+                  model: 'gpt-3.5-turbo',
+                  messages: [
+                    { role: 'system', content: 'You are a helpful assistant.' },
+                    { role: 'user', content: 'What is the capital of Canada?' }
+                  ]
+                });
+              });
+          })
+      ]);
+
+      const {
+        data: [firstGeneration]
+      } = await client.api.getSteps({
+        first: 1,
+        filters: [
+          {
+            field: 'threadId',
+            operator: 'eq',
+            value: firstThreadId!
           },
-          output: response
-        })
-      ])
-    );
+          {
+            field: 'tags',
+            operator: 'in',
+            value: [testId]
+          }
+        ]
+      });
+
+      const {
+        data: [secondGeneration]
+      } = await client.api.getSteps({
+        first: 1,
+        filters: [
+          {
+            field: 'threadId',
+            operator: 'eq',
+            value: secondThreadId!
+          },
+          {
+            field: 'tags',
+            operator: 'in',
+            value: [testId]
+          }
+        ]
+      });
+
+      expect(firstStep?.threadId).toEqual(firstThreadId);
+      expect(secondStep?.threadId).toEqual(secondThreadId);
+
+      expect(firstGeneration?.threadId).toEqual(firstThreadId);
+      expect(firstGeneration?.parentId).toEqual(firstStep?.id);
+      expect(secondGeneration?.threadId).toEqual(secondThreadId);
+      expect(secondGeneration?.parentId).toEqual(secondStep?.id);
+    }, 30000);
   });
 });
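A closing note on the mock design at the top of this test file: the stubbed `chat.completions.create` returns a Node `PassThrough` in object mode rather than an instance of the OpenAI SDK's `Stream`, so it fails the `instanceof Stream` check in the instrumentation; this appears to be what the duck-typed `isStream()` fallback in `processOpenAIOutput` is there to accept. A small sketch of the distinction:

```ts
import { PassThrough } from 'stream';
import { Stream } from 'openai/streaming';

const mocked = new PassThrough({ objectMode: true });

console.log(mocked instanceof Stream); // false: not an OpenAI Stream

// The shape isStream() checks for (pipe/on/read) is satisfied, and a
// PassThrough is also async-iterable, so the instrumentation's
// `for await` consumption still works on the mock.
console.log(
  typeof mocked.pipe === 'function' &&
    typeof mocked.on === 'function' &&
    typeof mocked.read === 'function'
); // true
```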