Skip to content
225 changes: 132 additions & 93 deletions server/api/chat/agents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,17 @@ import {
getConnectorByExternalId,
getConnectorByApp,
getConnectorById,
} from "@/db/connector";
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
import { SSEClientTransport, type SSEClientTransportOptions } from "@modelcontextprotocol/sdk/client/sse.js";
import { StreamableHTTPClientTransport, type StreamableHTTPClientTransportOptions } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
} from "@/db/connector"
import { Client } from "@modelcontextprotocol/sdk/client/index.js"
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js"
import {
SSEClientTransport,
type SSEClientTransportOptions,
} from "@modelcontextprotocol/sdk/client/sse.js"
import {
StreamableHTTPClientTransport,
type StreamableHTTPClientTransportOptions,
} from "@modelcontextprotocol/sdk/client/streamableHttp.js"
import {
Models,
QueryType,
Expand Down Expand Up @@ -187,6 +193,7 @@ import {
} from "./chat"
import { agentTools } from "./tools"
import { internalTools, mapGithubToolResponse } from "@/api/chat/mapper"
import { getRecordBypath } from "@/db/knowledgeBase"
const {
JwtPayloadKey,
chatHistoryPageSize,
Expand Down Expand Up @@ -1109,13 +1116,13 @@ export const MessageWithToolsApi = async (c: Context) => {
const client = new Client({
name: `connector-${connectorId}`,
version: connector.config.version,
});
})
try {
const loadedConfig = connector.config as {
url?: string;
command?: string;
args?: string[];
mode?: "sse" | "streamable-http";
url?: string
command?: string
args?: string[]
mode?: "sse" | "streamable-http"
version: string
}

Expand All @@ -1142,7 +1149,7 @@ export const MessageWithToolsApi = async (c: Context) => {
loadedHeaders["Authorization"] =
`Bearer ${connector.apiKey}`
}
loggerWithChild({ email: sub }).info(
loggerWithChild({ email: sub }).info(
`Connecting to MCP client at ${loadedUrl} with mode: ${loadedMode}`,
)

Expand All @@ -1153,15 +1160,18 @@ export const MessageWithToolsApi = async (c: Context) => {
headers: loadedHeaders,
},
}
await client.connect(
new StreamableHTTPClientTransport(new URL(loadedUrl), transportOptions),
)
} else {
await client.connect(
new StreamableHTTPClientTransport(
new URL(loadedUrl),
transportOptions,
),
)
} else {
// 'sse' mode
const transportOptions: SSEClientTransportOptions = {
requestInit: {
headers: loadedHeaders,
}
},
}
await client.connect(
new SSEClientTransport(
Expand All @@ -1183,7 +1193,7 @@ export const MessageWithToolsApi = async (c: Context) => {
)
} else {
throw new Error(
"Invalid MCP connector configuration: missing url or command."
"Invalid MCP connector configuration: missing url or command.",
)
}
} catch (error) {
Expand Down Expand Up @@ -2759,6 +2769,7 @@ export const AgentMessageApiRagOff = async (c: Context) => {
let email = ""
let workspaceId = ""
let via_apiKey = false
let body

try {
let jwtPayload
Expand All @@ -2768,17 +2779,19 @@ export const AgentMessageApiRagOff = async (c: Context) => {
if (jwtPayload) {
email = jwtPayload?.sub
workspaceId = jwtPayload?.workspaceId
// @ts-ignore
body = c.req.valid("query")
} else {
email = c.get("userEmail") ?? ""
workspaceId = c.get("workspaceId") ?? ""
via_apiKey = true
// @ts-ignore
body = c.req.valid("json")
}
loggerWithChild({ email: email }).info("AgentMessageApiRagOff..")
rootSpan.setAttribute("email", email)
rootSpan.setAttribute("workspaceId", workspaceId)

// @ts-ignore
const body = c.req.valid("query")
const attachmentMetadata = parseAttachmentMetadata(c)
const attachmentFileIds = attachmentMetadata.map(
(m: AttachmentMetadata) => m.fileId,
Expand Down Expand Up @@ -2823,7 +2836,6 @@ export const AgentMessageApiRagOff = async (c: Context) => {
}
message = decodeURIComponent(message)
rootSpan.setAttribute("message", message)

const isMsgWithContext = isMessageWithContext(message)
const extractedInfo = isMsgWithContext
? await extractFileIdsFromMessage(message, email)
Expand Down Expand Up @@ -3456,6 +3468,7 @@ export const AgentMessageApi = async (c: Context) => {
let email = ""
let workspaceId = ""
let via_apiKey = false
let body

try {
let jwtPayload
Expand All @@ -3466,17 +3479,19 @@ export const AgentMessageApi = async (c: Context) => {
if (jwtPayload) {
email = jwtPayload?.sub
workspaceId = jwtPayload?.workspaceId
// @ts-ignore
body = c.req.valid("query")
} else {
// fallback if JwtPayloadKey is not available
email = c.get("userEmail") ?? ""
workspaceId = c.get("workspaceId") ?? ""
via_apiKey = true
// @ts-ignore
body = c.req.valid("json")
}
rootSpan.setAttribute("email", email)
rootSpan.setAttribute("workspaceId", workspaceId)

// @ts-ignore
const body = c.req.valid("query")
const attachmentMetadata = parseAttachmentMetadata(c)
const attachmentFileIds = attachmentMetadata.map(
(m: AttachmentMetadata) => m.fileId,
Expand All @@ -3489,6 +3504,7 @@ export const AgentMessageApi = async (c: Context) => {
isReasoningEnabled,
agentId,
streamOff,
path,
}: MessageReqType = body
// const agentPrompt = agentId && isCuid(agentId) ? agentId : "";
const userAndWorkspace = await getUserAndWorkspaceByEmail(
Expand Down Expand Up @@ -3528,14 +3544,18 @@ export const AgentMessageApi = async (c: Context) => {
// Truncate table chats,connectors,nessages;
message = decodeURIComponent(message)
rootSpan.setAttribute("message", message)

const isMsgWithContext = isMessageWithContext(message)
const extractedInfo = isMsgWithContext
? await extractFileIdsFromMessage(message, email)
: {
totalValidFileIdsFromLinkCount: 0,
fileIds: [],
}
let ids
if (path) {
ids = await getRecordBypath(path, db)
}
const isMsgWithContext = isMessageWithContext(message) || (path && ids)
const extractedInfo =
isMsgWithContext || (path && ids)
? await extractFileIdsFromMessage(message, email, ids)
: {
totalValidFileIdsFromLinkCount: 0,
fileIds: [],
}
const fileIds = extractedInfo?.fileIds
const agentDocs = agentForDb?.docIds || []

Expand Down Expand Up @@ -4000,71 +4020,90 @@ export const AgentMessageApi = async (c: Context) => {
}
})

Logger.info(
"Checking if answer is in the conversation or a mandatory query rewrite is needed before RAG",
)
// Limit messages to last 5 for the first LLM call if it's a new chat
const limitedMessages = messagesWithNoErrResponse.slice(-8)

// Extract previous classification for pagination and follow-up queries
let previousClassification: QueryRouterLLMResponse | null = null
if (messages.length >= 2) {
const previousUserMessage = messages[messages.length - 2]
if (previousUserMessage?.queryRouterClassification && previousUserMessage.messageRole === "user") {
try {
const parsedClassification =
typeof previousUserMessage.queryRouterClassification === "string"
? JSON.parse(previousUserMessage.queryRouterClassification)
: previousUserMessage.queryRouterClassification
previousClassification = parsedClassification as QueryRouterLLMResponse
Logger.info(`Found previous classification in agents: ${JSON.stringify(previousClassification)}`)
} catch (error) {
Logger.error(`Error parsing previous classification in agents: ${error}`)
Logger.info(
"Checking if answer is in the conversation or a mandatory query rewrite is needed before RAG",
)
// Limit messages to last 5 for the first LLM call if it's a new chat
const limitedMessages = messagesWithNoErrResponse.slice(-8)

// Extract previous classification for pagination and follow-up queries
let previousClassification: QueryRouterLLMResponse | null = null
if (messages.length >= 2) {
const previousUserMessage = messages[messages.length - 2]
if (
previousUserMessage?.queryRouterClassification &&
previousUserMessage.messageRole === "user"
) {
try {
const parsedClassification =
typeof previousUserMessage.queryRouterClassification ===
"string"
? JSON.parse(
previousUserMessage.queryRouterClassification,
)
: previousUserMessage.queryRouterClassification
previousClassification =
parsedClassification as QueryRouterLLMResponse
Logger.info(
`Found previous classification in agents: ${JSON.stringify(previousClassification)}`,
)
} catch (error) {
Logger.error(
`Error parsing previous classification in agents: ${error}`,
)
}
}
}
}

const searchOrAnswerIterator =
generateSearchQueryOrAnswerFromConversation(message, ctx, {
modelId:
ragPipelineConfig[RagPipelineStages.AnswerOrSearch].modelId,
stream: true,
json: true,
reasoning:
userRequestsReasoning &&
ragPipelineConfig[RagPipelineStages.AnswerOrSearch].reasoning,
messages: limitedMessages,
agentPrompt: agentPromptForLLM,
}, undefined, previousClassification)

// TODO: for now if the answer is from the conversation itself we don't
// add any citations for it, we can refer to the original message for citations
// one more bug is now llm automatically copies the citation text sometimes without any reference
// leads to [NaN] in the answer
let currentAnswer = ""
let answer = ""
let citations = []
let imageCitations: any = []
let citationMap: Record<number, number> = {}
let queryFilters = {
apps: [],
entities: [],
startTime: "",
endTime: "",
count: 0,
sortDirection: "",
intent: {},
offset: 0,
}
let parsed = {
answer: "",
queryRewrite: "",
temporalDirection: null,
filter_query: "",
type: "",
intent: {},
filters: queryFilters,
}

const searchOrAnswerIterator =
generateSearchQueryOrAnswerFromConversation(
message,
ctx,
{
modelId:
ragPipelineConfig[RagPipelineStages.AnswerOrSearch]
.modelId,
stream: true,
json: true,
reasoning:
userRequestsReasoning &&
ragPipelineConfig[RagPipelineStages.AnswerOrSearch]
.reasoning,
messages: limitedMessages,
agentPrompt: agentPromptForLLM,
},
undefined,
previousClassification,
)

// TODO: for now if the answer is from the conversation itself we don't
// add any citations for it, we can refer to the original message for citations
// one more bug is now llm automatically copies the citation text sometimes without any reference
// leads to [NaN] in the answer
let currentAnswer = ""
let answer = ""
let citations = []
let imageCitations: any = []
let citationMap: Record<number, number> = {}
let queryFilters = {
apps: [],
entities: [],
startTime: "",
endTime: "",
count: 0,
sortDirection: "",
intent: {},
offset: 0,
}
let parsed = {
answer: "",
queryRewrite: "",
temporalDirection: null,
filter_query: "",
type: "",
intent: {},
filters: queryFilters,
}

let thinking = ""
let reasoning =
Expand Down
2 changes: 1 addition & 1 deletion server/api/chat/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ import {
getPublicAgentsByUser,
type SharedAgentUsageData,
} from "@/db/sharedAgentUsage"
import type { GroundingSupport } from "@google/genai"
import type {GroundingSupport } from "@google/genai"

const METADATA_NO_DOCUMENTS_FOUND = "METADATA_NO_DOCUMENTS_FOUND_INTERNAL"
const METADATA_FALLBACK_TO_RAG = "METADATA_FALLBACK_TO_RAG_INTERNAL"
Expand Down
Loading