Skip to content

Commit a9a937d

Browse files
authored
Implement example of streamText fallback using an AI data stream (#2068)
1 parent 14e081e commit a9a937d

File tree

1 file changed

+353
-2
lines changed
  • references/d3-chat/src/trigger

1 file changed

+353
-2
lines changed

references/d3-chat/src/trigger/chat.ts

Lines changed: 353 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,16 @@
11
import { anthropic } from "@ai-sdk/anthropic";
22
import { openai } from "@ai-sdk/openai";
33
import { ai } from "@trigger.dev/sdk/ai";
4-
import { logger, metadata, schemaTask, tasks, wait } from "@trigger.dev/sdk/v3";
4+
import { logger, metadata, runs, schemaTask, tasks, wait } from "@trigger.dev/sdk/v3";
55
import { sql } from "@vercel/postgres";
6-
import { streamText, TextStreamPart, tool } from "ai";
6+
import {
7+
CoreMessage,
8+
createDataStream,
9+
DataStreamWriter,
10+
streamText,
11+
TextStreamPart,
12+
tool,
13+
} from "ai";
714
import { nanoid } from "nanoid";
815
import { z } from "zod";
916
import { sendSQLApprovalMessage } from "../lib/slack";
@@ -267,3 +274,347 @@ export const interruptibleChat = schemaTask({
267274
}
268275
},
269276
});
277+
278+
async function createStreamWithProvider(params: {
279+
model: ReturnType<typeof anthropic> | ReturnType<typeof openai>;
280+
messages: CoreMessage[];
281+
message_request_id: string;
282+
chat_id: string;
283+
userId?: string;
284+
}) {
285+
const { model, messages, message_request_id, chat_id, userId } = params;
286+
287+
return new Promise<string>((resolve, reject) => {
288+
const dataStreamResponse = createDataStream({
289+
execute: async (dataStream) => {
290+
const result = streamText({
291+
model,
292+
system: "This is the system prompt, please be nice.",
293+
messages,
294+
maxSteps: 20,
295+
toolCallStreaming: true,
296+
onError: (error) => {
297+
logger.error("Error in chatStream task (streamText)", {
298+
error: error instanceof Error ? error.message : "Unknown error",
299+
stack: error instanceof Error ? error.stack : undefined,
300+
provider: model.provider,
301+
});
302+
reject(error);
303+
},
304+
onChunk: async (chunk) => {
305+
console.log("Chunk:", chunk);
306+
},
307+
onFinish: async ({ response, reasoning }) => {
308+
metadata.flush();
309+
logger.info("AI stream finished", {
310+
chat_id,
311+
userId,
312+
messageCount: response.messages.length,
313+
provider: model.provider,
314+
});
315+
316+
if (userId) {
317+
try {
318+
// Pretend to save messages
319+
await new Promise((resolve) => setTimeout(resolve, 1000));
320+
321+
logger.info("Successfully saved AI response messages", {
322+
chat_id,
323+
userId,
324+
messageCount: response.messages.length,
325+
message: JSON.stringify(response.messages, null, 2),
326+
provider: model.provider,
327+
});
328+
} catch (error) {
329+
logger.error("Failed to save AI response messages", {
330+
error: error instanceof Error ? error.message : "Unknown error",
331+
stack: error instanceof Error ? error.stack : undefined,
332+
chat_id,
333+
userId,
334+
provider: model.provider,
335+
});
336+
}
337+
}
338+
},
339+
});
340+
341+
result.consumeStream();
342+
343+
result.mergeIntoDataStream(dataStream, {
344+
sendReasoning: true,
345+
});
346+
},
347+
onError: (error) => {
348+
logger.error("Error in chatStream task (createDataStream)", {
349+
error: error instanceof Error ? error.message : "Unknown error",
350+
stack: error instanceof Error ? error.stack : undefined,
351+
provider: model.provider,
352+
});
353+
reject(error);
354+
return error instanceof Error ? error.message : String(error);
355+
},
356+
});
357+
358+
// Process the stream
359+
(async () => {
360+
try {
361+
const stream = await metadata.stream("dataStream", dataStreamResponse);
362+
let fullResponse = "";
363+
364+
for await (const chunk of stream) {
365+
fullResponse += chunk;
366+
}
367+
368+
// Only resolve if we haven't rejected due to an error
369+
resolve(fullResponse);
370+
} catch (error) {
371+
reject(error);
372+
}
373+
})();
374+
});
375+
}
376+
377+
export const chatStream = schemaTask({
378+
id: "chat-stream",
379+
description: "Stream data from the AI SDK and use tools",
380+
schema: z.object({
381+
chat_id: z.string().default("chat"),
382+
messages: z.array(z.unknown()).describe("Array of chat messages"),
383+
message_request_id: z.string().describe("Unique identifier for the message request"),
384+
model: z.string().default("claude-3-7-sonnet-20250219"),
385+
userId: z.string().optional().describe("User ID for authentication"),
386+
existingProject: z.boolean().default(false).describe("Whether the project already exists"),
387+
}),
388+
machine: "large-2x",
389+
run: async ({ chat_id, messages, model, userId, message_request_id }) => {
390+
logger.info("Running chat stream", {
391+
chat_id,
392+
messages,
393+
model,
394+
userId,
395+
message_request_id,
396+
});
397+
398+
try {
399+
// First try with Anthropic
400+
return await createStreamWithProvider({
401+
model: anthropic(model),
402+
messages: messages as CoreMessage[],
403+
message_request_id,
404+
chat_id,
405+
userId,
406+
});
407+
} catch (error) {
408+
logger.info("Anthropic stream failed, falling back to OpenAI", {
409+
error: error instanceof Error ? error.message : "Unknown error",
410+
stack: error instanceof Error ? error.stack : undefined,
411+
chat_id,
412+
userId,
413+
message_request_id,
414+
});
415+
416+
try {
417+
// Fallback to OpenAI
418+
return await createStreamWithProvider({
419+
model: openai("gpt-4"),
420+
messages: messages as CoreMessage[],
421+
message_request_id,
422+
chat_id,
423+
userId,
424+
});
425+
} catch (fallbackError) {
426+
logger.error("Both Anthropic and OpenAI streams failed", {
427+
error: fallbackError instanceof Error ? fallbackError.message : "Unknown error",
428+
stack: fallbackError instanceof Error ? fallbackError.stack : undefined,
429+
chat_id,
430+
userId,
431+
message_request_id,
432+
});
433+
throw fallbackError;
434+
}
435+
}
436+
},
437+
});
438+
439+
export const chatStreamCaller = schemaTask({
440+
id: "chat-stream-caller",
441+
description: "Call the chat stream",
442+
schema: z.object({
443+
prompt: z.string().describe("The prompt to chat with the AI"),
444+
}),
445+
run: async ({ prompt }, { ctx }) => {
446+
const result = await chatStream.trigger({
447+
messages: [
448+
{
449+
role: "user",
450+
content: prompt,
451+
},
452+
],
453+
message_request_id: ctx.run.id,
454+
});
455+
456+
const stream = await runs.fetchStream(result.id, "dataStream");
457+
458+
for await (const chunk of stream) {
459+
console.log("Chunk:", chunk);
460+
}
461+
462+
return result;
463+
},
464+
});
465+
466+
export const streamFetcher = schemaTask({
467+
id: "stream-fetcher",
468+
description: "Fetch a stream",
469+
schema: z.object({
470+
runId: z.string().describe("The run ID to fetch the stream for"),
471+
streamId: z.string().describe("The stream ID to fetch"),
472+
}),
473+
run: async ({ runId, streamId }) => {
474+
const result = await runs.fetchStream(runId, streamId);
475+
476+
for await (const chunk of result) {
477+
console.log("Chunk:", chunk);
478+
}
479+
480+
return result;
481+
},
482+
});
483+
484+
export const chatStream2 = schemaTask({
485+
id: "chat-stream-2",
486+
description: "Stream data from the AI SDK and use tools",
487+
schema: z.object({
488+
chat_id: z.string().default("chat"),
489+
messages: z.array(z.unknown()).describe("Array of chat messages"),
490+
message_request_id: z.string().describe("Unique identifier for the message request"),
491+
model: z.string().default("claude-3-7-sonnet-20250219"),
492+
userId: z.string().optional().describe("User ID for authentication"),
493+
existingProject: z.boolean().default(false).describe("Whether the project already exists"),
494+
}),
495+
machine: "large-2x",
496+
run: async ({ chat_id, messages, model, userId, message_request_id }) => {
497+
logger.info("Running chat stream", {
498+
chat_id,
499+
messages,
500+
model,
501+
userId,
502+
message_request_id,
503+
});
504+
505+
const dataStreamResponse = createDataStream({
506+
execute: async (dataStream) => {
507+
streamTextWithModel(
508+
dataStream,
509+
anthropic(model),
510+
messages as CoreMessage[],
511+
chat_id,
512+
openai("gpt-4"),
513+
userId
514+
);
515+
},
516+
});
517+
518+
const stream = await metadata.stream("dataStream", dataStreamResponse);
519+
520+
for await (const chunk of stream) {
521+
console.log("Chunk:", chunk);
522+
}
523+
},
524+
});
525+
526+
function streamTextWithModel(
527+
dataStream: DataStreamWriter,
528+
model: ReturnType<typeof anthropic> | ReturnType<typeof openai>,
529+
messages: CoreMessage[],
530+
chat_id: string,
531+
fallbackModel?: ReturnType<typeof anthropic> | ReturnType<typeof openai>,
532+
userId?: string
533+
) {
534+
const result = streamText({
535+
model,
536+
system: "This is the system prompt, please be nice.",
537+
messages,
538+
maxSteps: 20,
539+
toolCallStreaming: true,
540+
onError: (error) => {
541+
logger.error("Error in chatStream task (streamText)", {
542+
error: error instanceof Error ? error.message : "Unknown error",
543+
stack: error instanceof Error ? error.stack : undefined,
544+
provider: model.provider,
545+
});
546+
547+
if (fallbackModel) {
548+
streamTextWithModel(dataStream, fallbackModel, messages, chat_id, undefined, userId);
549+
}
550+
},
551+
onChunk: async (chunk) => {
552+
console.log("Chunk:", chunk);
553+
},
554+
onFinish: async ({ response, reasoning }) => {
555+
metadata.flush();
556+
logger.info("AI stream finished", {
557+
chat_id,
558+
userId,
559+
messageCount: response.messages.length,
560+
provider: model.provider,
561+
});
562+
563+
if (userId) {
564+
try {
565+
// Pretend to save messages
566+
await new Promise((resolve) => setTimeout(resolve, 1000));
567+
568+
logger.info("Successfully saved AI response messages", {
569+
chat_id,
570+
userId,
571+
messageCount: response.messages.length,
572+
message: JSON.stringify(response.messages, null, 2),
573+
provider: model.provider,
574+
});
575+
} catch (error) {
576+
logger.error("Failed to save AI response messages", {
577+
error: error instanceof Error ? error.message : "Unknown error",
578+
stack: error instanceof Error ? error.stack : undefined,
579+
chat_id,
580+
userId,
581+
provider: model.provider,
582+
});
583+
}
584+
}
585+
},
586+
});
587+
588+
result.consumeStream();
589+
590+
result.mergeIntoDataStream(dataStream, {
591+
sendReasoning: true,
592+
});
593+
}
594+
595+
export const chatStreamCaller2 = schemaTask({
596+
id: "chat-stream-caller-2",
597+
description: "Call the chat stream",
598+
schema: z.object({
599+
prompt: z.string().describe("The prompt to chat with the AI"),
600+
}),
601+
run: async ({ prompt }, { ctx }) => {
602+
const result = await chatStream2.trigger({
603+
messages: [
604+
{
605+
role: "user",
606+
content: prompt,
607+
},
608+
],
609+
message_request_id: ctx.run.id,
610+
});
611+
612+
const stream = await runs.fetchStream(result.id, "dataStream");
613+
614+
for await (const chunk of stream) {
615+
console.log("Chunk:", chunk);
616+
}
617+
618+
return result;
619+
},
620+
});

0 commit comments

Comments
 (0)