diff --git a/server/ai/context.ts b/server/ai/context.ts
index 723041fb6..04b04d6d8 100644
--- a/server/ai/context.ts
+++ b/server/ai/context.ts
@@ -32,6 +32,152 @@ import {
   getSortedScoredImageChunks,
 } from "@xyne/vespa-ts/mappers"
 import type { UserMetadataType } from "@/types"
+import { querySheetChunks } from "@/lib/duckdb"
+import { chunkSheetWithHeaders } from "@/sheetChunk"
+
+// Utility function to extract the header from chunks and remove it from each chunk
+const extractHeaderAndDataChunks = (
+  chunks_summary: (string | { chunk: string; score: number; index: number })[] | undefined,
+  matchfeatures?: any
+): {
+  chunks_summary: (string | { chunk: string; score: number; index: number })[];
+  matchfeatures?: any;
+} => {
+  if (!chunks_summary || chunks_summary.length === 0) {
+    return { chunks_summary: [], matchfeatures };
+  }
+
+  // Find the header from the first chunk
+  let headerChunk = '';
+  if (chunks_summary.length > 0) {
+    const firstChunk = typeof chunks_summary[0] === "string" ? chunks_summary[0] : chunks_summary[0].chunk;
+    const lines = firstChunk.split('\n');
+    if (lines.length > 0 && lines[0].includes('\t')) {
+      headerChunk = lines[0]; // Extract the header line
+    }
+  }
+
+  // Process all chunks: remove header from each and keep only data rows
+  const processedChunks: (string | { chunk: string; score: number; index: number })[] = [];
+  let newMatchfeatures = matchfeatures;
+
+  // Add header as first chunk if found, using the same structure as original
+  if (headerChunk) {
+    if (typeof chunks_summary[0] === "string") {
+      processedChunks.push(headerChunk);
+    } else {
+      processedChunks.push({
+        chunk: headerChunk,
+        score: 1,
+        index: 0,
+      });
+    }
+
+    // Update matchfeatures to include the header chunk score
+    if (newMatchfeatures) {
+      const existingCells = newMatchfeatures.chunk_scores?.cells || {};
+      const scores = Object.values(existingCells) as number[];
+      const maxScore = scores.length > 0 ? Math.max(...scores) : 0;
+      // Create new chunk_scores that match the new chunks
+      const newChunkScores: Record<string, number> = {}
+      newChunkScores["0"] = maxScore + 1
+      Object.entries(existingCells).forEach(([idx, score]) => {
+        newChunkScores[(parseInt(idx) + 1).toString()] = score as number
+      })
+
+      newMatchfeatures = {
+        ...newMatchfeatures,
+        chunk_scores: {
+          cells: newChunkScores
+        }
+      };
+    }
+  }
+
+  // Process each original chunk: remove header and add data rows
+  for (let i = 0; i < chunks_summary.length; i++) {
+    const originalChunk = chunks_summary[i];
+    const chunkContent = typeof originalChunk === "string" ? originalChunk : originalChunk.chunk;
+    const lines = chunkContent.split('\n');
+
+    // Skip the first line (header) and keep only data rows
+    const dataRows = lines.slice(1).filter(line => line.trim().length > 0);
+    if (dataRows.length > 0) {
+      const dataContent = dataRows.join('\n');
+
+      if (typeof originalChunk === "string") {
+        processedChunks.push(dataContent);
+      } else {
+        processedChunks.push({
+          chunk: dataContent,
+          score: originalChunk.score,
+          index: originalChunk.index
+        });
+      }
+    }
+  }
+
+  return { chunks_summary: processedChunks, matchfeatures: newMatchfeatures };
+};
+
+// Utility function to process sheet queries for spreadsheet files
+const processSheetQuery = async (
+  chunks_summary: (string | { chunk: string; score: number; index: number })[] | undefined,
+  query: string,
+  matchfeatures: any
+): Promise<{
+  chunks_summary: { chunk: string; score: number; index: number }[];
+  matchfeatures: any;
+  maxSummaryChunks: number;
+} | null> => {
+  const duckDBResult = await querySheetChunks(
+    chunks_summary?.map((c) => typeof c === "string" ? c : c.chunk) || [],
+    query
+  )
+
+  // If the DuckDB query failed (null means not metric-related or SQL generation failed), return null to fall back to the original approach
+  if (!duckDBResult) {
+    return null;
+  }
+
+  // Create metadata chunk with query information (excluding data)
+  const metadataChunk = JSON.stringify({
+    assumptions: duckDBResult.assumptions,
+    schema_fragment: duckDBResult.schema_fragment
+  }, null, 2)
+
+  // Use chunkSheetWithHeaders to chunk the 2D array data
+  const dataChunks = chunkSheetWithHeaders(duckDBResult.data.rows, {headerRows: 1})
+
+  // Combine metadata chunk with data chunks
+  const allChunks = [metadataChunk, ...dataChunks]
+
+  const newChunksSummary = allChunks.map((c, idx) => ({chunk: c, score: 0, index: idx}))
+
+  // Update matchfeatures to correspond to the new chunks
+  let newMatchfeatures = matchfeatures
+  if (matchfeatures) {
+    // Create new chunk_scores that match the new chunks
+    const newChunkScores: Record<string, number> = {}
+    allChunks.forEach((_, idx) => {
+      newChunkScores[idx.toString()] = 0 // All new chunks get score 0
+    })
+
+    // Update the matchfeatures with new chunk_scores
+    newMatchfeatures = {
+      ...matchfeatures,
+      chunk_scores: {
+        cells: newChunkScores
+      }
+    }
+  }
+
+  return {
+    chunks_summary: newChunksSummary,
+    matchfeatures: newMatchfeatures,
+    maxSummaryChunks: allChunks.length
+  }
+}
 
 // Utility to capitalize the first letter of a string
 const capitalize = (str: string) => str.charAt(0).toUpperCase() + str.slice(1)
@@ -738,13 +884,43 @@ export const answerColoredContextMap = (
 }
 
 type AiContext = string
-export const answerContextMap = (
+export const answerContextMap = async (
   searchResult: VespaSearchResults,
   userMetadata: UserMetadataType,
   maxSummaryChunks?: number,
   isSelectedFiles?: boolean,
   isMsgWithSources?: boolean,
-): AiContext => {
+  query?: string,
+): Promise<AiContext> => {
+  if(searchResult.fields.sddocname === fileSchema || searchResult.fields.sddocname === dataSourceFileSchema || searchResult.fields.sddocname === KbItemsSchema || searchResult.fields.sddocname === mailAttachmentSchema) {
+    let mimeType
+    if(searchResult.fields.sddocname === mailAttachmentSchema) {
+      mimeType = searchResult.fields.fileType
+    } else {
+      mimeType = searchResult.fields.mimeType
+    }
+    if(mimeType === "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ||
+       mimeType === "application/vnd.ms-excel" ||
+       mimeType === "text/csv") {
+      const result = extractHeaderAndDataChunks(searchResult.fields.chunks_summary, searchResult.fields.matchfeatures);
+      searchResult.fields.chunks_summary = result.chunks_summary;
+      if (result.matchfeatures) {
+        searchResult.fields.matchfeatures = result.matchfeatures;
+      }
+
+      if (query) {
+        const sheetResult = await processSheetQuery(searchResult.fields.chunks_summary, query, searchResult.fields.matchfeatures)
+        if (sheetResult) {
+          const { chunks_summary, matchfeatures, maxSummaryChunks: newMaxSummaryChunks } = sheetResult
+          searchResult.fields.chunks_summary = chunks_summary
+          searchResult.fields.matchfeatures = matchfeatures
+          maxSummaryChunks = newMaxSummaryChunks
+        } else {
+          maxSummaryChunks = Math.min(searchResult.fields.chunks_summary?.length || 0, 100)
+        }
+      }
+    }
+  }
   if (searchResult.fields.sddocname === fileSchema) {
     return constructFileContext(
       searchResult.fields,
diff --git a/server/api/chat/agents.ts b/server/api/chat/agents.ts
index 701f05e1c..d4375c634 100644
--- a/server/api/chat/agents.ts
+++ b/server/api/chat/agents.ts
@@ -121,6 +121,7 @@ import { getModelValueFromLabel } from "@/ai/modelConfig"
 import {
   buildContext,
   buildUserQuery,
+  expandSheetIds,
   getThreadContext,
   isContextSelected,
   UnderstandMessageAndAnswer,
@@ -521,13 +522,21 @@ const checkAndYieldCitationsForAgent = async function* (
   }
 }
 
-const vespaResultToMinimalAgentFragment = (
+const vespaResultToMinimalAgentFragment = async (
   child: VespaSearchResult,
   idx: number,
   userMetadata: UserMetadataType,
-): MinimalAgentFragment => ({
+  query: string,
+): Promise<MinimalAgentFragment> => ({
   id: `${(child.fields as any)?.docId || `Frangment_id_${idx}`}`,
-  content: answerContextMap(child as VespaSearchResults, userMetadata, 0, true),
+  content: await answerContextMap(
+    child as VespaSearchResults,
+    userMetadata,
+    0,
+    true,
+    undefined,
+    query,
+  ),
   source: searchToCitation(child as VespaSearchResults),
   confidence: 1.0,
 })
@@ -836,7 +845,7 @@ export const MessageWithToolsApi = async (c: Context) => {
     .map((m) => m.fileId)
   const nonImageAttachmentFileIds = attachmentMetadata
     .filter((m) => !m.isImage)
-    .map((m) => m.fileId)
+    .flatMap((m) => expandSheetIds(m.fileId))
   let attachmentStorageError: Error | null = null
 
   const contextExtractionSpan = initSpan.startSpan("context_extraction")
@@ -1534,11 +1543,13 @@ export const MessageWithToolsApi = async (c: Context) => {
       if (results?.root?.children && results.root.children.length > 0) {
         const contextPromises = results?.root?.children?.map(
           async (v, i) => {
-            let content = answerContextMap(
+            let content = await answerContextMap(
               v as VespaSearchResults,
               userMetadata,
               0,
               true,
+              undefined,
+              message,
             )
             const chatContainerFields =
               isChatContainerFields(v.fields) &&
@@ -1608,22 +1619,24 @@ export const MessageWithToolsApi = async (c: Context) => {
             "\n" + buildContext(threadContexts, 10, userMetadata)
         }
 
-        gatheredFragments = results.root.children.map(
-          (child: VespaSearchResult, idx) =>
-            vespaResultToMinimalAgentFragment(child, idx, userMetadata),
+        gatheredFragments = await Promise.all(
+          results.root.children.map(
+            async (child: VespaSearchResult, idx) =>
+              await vespaResultToMinimalAgentFragment(child, idx, userMetadata, message),
+          )
         )
         if (chatContexts.length > 0) {
           gatheredFragments.push(
-            ...chatContexts.map((child, idx) =>
-              vespaResultToMinimalAgentFragment(child, idx, userMetadata),
-            ),
+            ...(await Promise.all(chatContexts.map(async (child, idx) =>
+              await vespaResultToMinimalAgentFragment(child, idx, userMetadata, message),
+            ))),
           )
         }
         if (threadContexts.length > 0) {
           gatheredFragments.push(
-            
...threadContexts.map((child, idx) => - vespaResultToMinimalAgentFragment(child, idx, userMetadata), - ), + ...(await Promise.all(threadContexts.map(async (child, idx) => + await vespaResultToMinimalAgentFragment(child, idx, userMetadata, message), + ))), ) } const parseSynthesisOutput = await performSynthesis( @@ -2896,9 +2909,9 @@ export const AgentMessageApiRagOff = async (c: Context) => { chunksSpan.end() if (allChunks?.root?.children) { const startIndex = 0 - fragments = allChunks.root.children.map((child, idx) => - vespaResultToMinimalAgentFragment(child, idx, userMetadata), - ) + fragments = await Promise.all(allChunks.root.children.map(async (child, idx) => + await vespaResultToMinimalAgentFragment(child, idx, userMetadata, message) + )) context = answerContextMapFromFragments( fragments, maxDefaultSummary, @@ -3166,9 +3179,9 @@ export const AgentMessageApiRagOff = async (c: Context) => { if (docIds.length > 0) { const allChunks = await GetDocumentsByDocIds(docIds, chunksSpan) if (allChunks?.root?.children) { - fragments = allChunks.root.children.map((child, idx) => - vespaResultToMinimalAgentFragment(child, idx, userMetadata), - ) + fragments = await Promise.all(allChunks.root.children.map(async (child, idx) => + await vespaResultToMinimalAgentFragment(child, idx, userMetadata, message), + )) context = answerContextMapFromFragments( fragments, maxDefaultSummary, @@ -3382,7 +3395,7 @@ export const AgentMessageApi = async (c: Context) => { .map((m) => m.fileId) const nonImageAttachmentFileIds = attachmentMetadata .filter((m) => !m.isImage) - .map((m) => m.fileId) + .flatMap((m) => expandSheetIds(m.fileId)) let attachmentStorageError: Error | null = null let { message, diff --git a/server/api/chat/chat.ts b/server/api/chat/chat.ts index 121b2bafe..14f527776 100644 --- a/server/api/chat/chat.ts +++ b/server/api/chat/chat.ts @@ -217,6 +217,27 @@ import { getChunkCountPerDoc } from "./chunk-selection" const METADATA_NO_DOCUMENTS_FOUND = "METADATA_NO_DOCUMENTS_FOUND_INTERNAL" const METADATA_FALLBACK_TO_RAG = "METADATA_FALLBACK_TO_RAG_INTERNAL" +export function expandSheetIds(fileId: string): string[] { + // Check if the fileId matches the pattern docId_sheet_number + const sheetMatch = fileId.match(/^(.+)_sheet_(\d+)$/) + + if (!sheetMatch) { + // Not a sheet ID, return as is + return [fileId] + } + + const [, docId, sheetNumberStr] = sheetMatch + const sheetNumber = parseInt(sheetNumberStr, 10) + // Generate IDs from docId_sheet_0 to docId_sheet_number + const expandedIds: string[] = [] + const upper = Number.isFinite(sheetNumber) ? 
sheetNumber : 1
+  for (let i = 0; i < upper; i++) {
+    expandedIds.push(`${docId}_sheet_${i}`)
+  }
+
+  return expandedIds
+}
+
 export async function resolveNamesToEmails(
   intent: Intent,
   email: string,
@@ -853,10 +874,20 @@ export const ChatDeleteApi = async (c: Context) => {
       for (const fileId of nonImageAttachmentFileIds) {
         try {
           // Delete from Vespa kb_items schema using the proper Vespa function
-          await DeleteDocument(fileId, KbItemsSchema)
-          loggerWithChild({ email: email }).info(
-            `Successfully deleted non-image attachment ${fileId} from Vespa kb_items schema`,
-          )
+          const vespaIds = expandSheetIds(fileId)
+          for (const id of vespaIds) {
+            try {
+              await DeleteDocument(id, KbItemsSchema)
+              loggerWithChild({ email }).info(
+                `Successfully deleted non-image attachment ${id} from Vespa kb_items schema`,
+              )
+            } catch (error) {
+              loggerWithChild({ email }).error(
+                `Failed to delete non-image attachment ${id} from Vespa kb_items schema`,
+                { error: getErrorMessage(error) }
+              )
+            }
+          }
         } catch (error) {
           const errorMessage = getErrorMessage(error)
           if (errorMessage.includes("404 Not Found")) {
@@ -1189,24 +1220,26 @@ export const replaceDocIdwithUserDocId = async (
   return userMap[email] ?? docId
 }
 
-export function buildContext(
+export async function buildContext(
   results: VespaSearchResult[],
   maxSummaryCount: number | undefined,
   userMetadata: UserMetadataType,
   startIndex: number = 0,
-): string {
-  return cleanContext(
-    results
-      ?.map(
-        (v, i) =>
-          `Index ${i + startIndex} \n ${answerContextMap(
-            v as VespaSearchResults,
-            userMetadata,
-            maxSummaryCount,
-          )}`,
-      )
-      ?.join("\n"),
+  builtUserQuery?: string,
+): Promise<string> {
+  const contextPromises = results?.map(
+    async (v, i) =>
+      `Index ${i + startIndex} \n ${await answerContextMap(
+        v as VespaSearchResults,
+        userMetadata,
+        maxSummaryCount,
+        undefined,
+        undefined,
+        builtUserQuery,
+      )}`,
   )
+  const contexts = await Promise.all(contextPromises || [])
+  return cleanContext(contexts.join("\n"))
 }
 
 async function* generateIterativeTimeFilterAndQueryRewrite(
@@ -1563,10 +1596,12 @@ async function* generateIterativeTimeFilterAndQueryRewrite(
       )
       vespaSearchSpan?.end()
 
-      const initialContext = buildContext(
+      const initialContext = await buildContext(
        results?.root?.children,
        maxSummaryCount,
        userMetadata,
+        0,
+        message,
      )
 
      const queryRewriteSpan = rewriteSpan?.startSpan("query_rewriter")
@@ -1695,11 +1730,7 @@ async function* generateIterativeTimeFilterAndQueryRewrite(
       )
       totalResultsSpan?.end()
       const contextSpan = querySpan?.startSpan("build_context")
-      const initialContext = buildContext(
-        totalResults,
-        maxSummaryCount,
-        userMetadata,
-      )
+      const initialContext = await buildContext(totalResults, maxSummaryCount, userMetadata, 0, message)
 
       const { imageFileNames } = extractImageFileNames(
         initialContext,
@@ -1887,11 +1918,12 @@ async function* generateIterativeTimeFilterAndQueryRewrite(
       pageSearchSpan?.end()
       const startIndex = isReasoning ? 
previousResultsLength : 0 const contextPromises = combinedSearchResponse?.map(async (v, i) => { - let content = answerContextMap( + let content = await answerContextMap( v as VespaSearchResults, userMetadata, i < chunksPerDocument.length ? chunksPerDocument[i] : 0, true, isMsgWithSources, + message, ) if ( v.fields && @@ -2828,11 +2861,12 @@ async function* generatePointQueryTimeExpansion( // Prepare context for LLM const contextSpan = iterationSpan?.startSpan("build_context") const startIndex = isReasoning ? previousResultsLength : 0 - const initialContext = buildContext( + const initialContext = await buildContext( combinedResults?.root?.children, maxSummaryCount, userMetadata, startIndex, + message, ) const { imageFileNames } = extractImageFileNames( @@ -2969,7 +3003,7 @@ async function* processResultsForMetadata( "Document chunk size", `full_context maxed to ${chunksCount}`, ) - const context = buildContext(items, chunksCount, userMetadata) + const context = await buildContext(items, chunksCount, userMetadata, 0, input) const { imageFileNames } = extractImageFileNames(context, items) const streamOptions = { stream: true, @@ -3320,7 +3354,7 @@ async function* generateMetadataQueryAnswer( ), ) - pageSpan?.setAttribute("context", buildContext(items, 20, userMetadata)) + pageSpan?.setAttribute("context", await buildContext(items, 20, userMetadata, 0, input)) if (!items.length) { loggerWithChild({ email: email }).info( `No documents found on iteration ${iteration}${ @@ -3497,7 +3531,7 @@ async function* generateMetadataQueryAnswer( ), ) - span?.setAttribute("context", buildContext(items, 20, userMetadata)) + span?.setAttribute("context", await buildContext(items, 20, userMetadata, 0, input)) span?.end() loggerWithChild({ email: email }).info( `Retrieved Documents : ${QueryType.GetItems} - ${items.length}`, @@ -3643,10 +3677,7 @@ async function* generateMetadataQueryAnswer( items.map((v: VespaSearchResult) => (v.fields as any).docId), ), ) - iterationSpan?.setAttribute( - `context`, - buildContext(items, 20, userMetadata), - ) + iterationSpan?.setAttribute(`context`, await buildContext(items, 20, userMetadata, 0, input)) iterationSpan?.end() loggerWithChild({ email: email }).info( @@ -4235,7 +4266,7 @@ export const MessageApi = async (c: Context) => { .map((m) => m.fileId) const nonImageAttachmentFileIds = attachmentMetadata .filter((m) => !m.isImage) - .map((m) => m.fileId) + .flatMap((m) => expandSheetIds(m.fileId)) if (agentPromptValue) { const userAndWorkspaceCheck = await getUserAndWorkspaceByEmail( @@ -4291,7 +4322,7 @@ export const MessageApi = async (c: Context) => { try { const resp = await getCollectionFilesVespaIds(JSON.parse(kbItems), db) fileIds = resp - .map((file) => file.vespaDocId || "") + .flatMap((file) => expandSheetIds(file.vespaDocId || "")) .filter((id) => id !== "") } catch { fileIds = [] diff --git a/server/api/chat/tools.ts b/server/api/chat/tools.ts index 29ad98862..8ff09441c 100644 --- a/server/api/chat/tools.ts +++ b/server/api/chat/tools.ts @@ -446,12 +446,12 @@ async function executeVespaSearch(options: UnifiedSearchOptions): Promise<{ return { result: "No results found.", contexts: [] } } - const fragments: MinimalAgentFragment[] = children.map((r) => { + const fragments: MinimalAgentFragment[] = await Promise.all(children.map(async (r) => { if (r.fields.sddocname === dataSourceFileSchema) { const fields = r.fields as VespaDataSourceFile return { id: `${fields.docId}`, - content: answerContextMap(r, userMetadata, maxDefaultSummary), + content: await 
answerContextMap(r, userMetadata, maxDefaultSummary),
        source: {
          docId: fields.docId,
          title: fields.fileName || "Untitled",
@@ -465,11 +465,11 @@ async function executeVespaSearch(options: UnifiedSearchOptions): Promise<{
      const citation = searchToCitation(r)
      return {
        id: `${citation.docId}`,
-        content: answerContextMap(r, userMetadata, maxDefaultSummary),
+        content: await answerContextMap(r, userMetadata, maxDefaultSummary),
        source: citation,
        confidence: r.relevance || 0.7,
      }
-    })
+    }))
 
    let summaryText = `Found ${fragments.length} results`
    if (query) summaryText += ` matching '${query}'`
@@ -1034,13 +1034,13 @@ export const getSlackThreads: AgentTool = {
            !!(item.fields && "sddocname" in item.fields),
        )
        if (threads.length > 0) {
-          const fragments: MinimalAgentFragment[] = threads.map(
-            (item: VespaSearchResults): MinimalAgentFragment => {
+          const fragments: MinimalAgentFragment[] = await Promise.all(threads.map(
+            async (item: VespaSearchResults): Promise<MinimalAgentFragment> => {
              const citation = searchToCitation(item)
              Logger.debug({ item }, "Processing item in metadata_retrieval tool")
              const content = item.fields
-                ? answerContextMap(item, userMetadata, maxDefaultSummary)
+                ? await answerContextMap(item, userMetadata, maxDefaultSummary)
                : `Context unavailable for ${citation.title || citation.docId}`
 
              return {
@@ -1050,7 +1050,7 @@ export const getSlackThreads: AgentTool = {
              confidence: item.relevance || 0.7, // Use item.relevance if available
            }
          },
-        )
+        ))
 
        let responseText = `Found ${fragments.length} Slack message${fragments.length !== 1 ? "s" : ""}`
        if (params.filter_query) {
@@ -1248,13 +1248,13 @@ export const getSlackMessagesFromUser: AgentTool = {
        )
 
        if (items.length > 0) {
-          const fragments: MinimalAgentFragment[] = items.map(
-            (item: VespaSearchResults): MinimalAgentFragment => {
+          const fragments: MinimalAgentFragment[] = await Promise.all(items.map(
+            async (item: VespaSearchResults): Promise<MinimalAgentFragment> => {
              const citation = searchToCitation(item)
              Logger.debug({ item }, "Processing item in metadata_retrieval tool")
              const content = item.fields
-                ? answerContextMap(item, userMetadata, maxDefaultSummary)
+                ? await answerContextMap(item, userMetadata, maxDefaultSummary)
                : `Context unavailable for ${citation.title || citation.docId}`
 
              return {
@@ -1264,7 +1264,7 @@ export const getSlackMessagesFromUser: AgentTool = {
              confidence: item.relevance || 0.7, // Use item.relevance if available
            }
          },
-        )
+        ))
 
        let responseText = `Found ${fragments.length} Slack message${fragments.length !== 1 ? "s" : ""}`
        if (params.filter_query) {
@@ -1488,13 +1488,13 @@ export const getSlackRelatedMessages: AgentTool = {
        }
 
        // Process results into fragments
-        const fragments: MinimalAgentFragment[] = items.map(
-          (item: VespaSearchResults): MinimalAgentFragment => {
+        const fragments: MinimalAgentFragment[] = await Promise.all(items.map(
+          async (item: VespaSearchResults): Promise<MinimalAgentFragment> => {
            const citation = searchToCitation(item)
            Logger.debug({ item }, "Processing Slack message item")
            const content = item.fields
-              ? answerContextMap(item, userMetadata, maxDefaultSummary)
+              ? await answerContextMap(item, userMetadata, maxDefaultSummary)
              : `Content unavailable for ${citation.title || citation.docId}`
 
            return {
@@ -1504,7 +1504,7 @@ export const getSlackRelatedMessages: AgentTool = {
            confidence: item.relevance || 0.7,
          }
        },
-      )
+      ))
 
      // Build response message
      let responseText = `Found ${fragments.length} Slack message${fragments.length !== 1 ? "s" : ""}`
      if (params.filter_query) {
@@ -1825,13 +1825,13 @@ export const getSlackMessagesFromChannel: AgentTool = {
          contexts: [],
        }
      }
-      const fragments: MinimalAgentFragment[] = items.map(
-        (item: VespaSearchResults): MinimalAgentFragment => {
+      const fragments: MinimalAgentFragment[] = await Promise.all(items.map(
+        async (item: VespaSearchResults): Promise<MinimalAgentFragment> => {
          const citation = searchToCitation(item)
          Logger.debug({ item }, "Processing item in metadata_retrieval tool")
          const content = item.fields
-            ? answerContextMap(item, userMetadata, maxDefaultSummary)
+            ? await answerContextMap(item, userMetadata, maxDefaultSummary)
            : `Context unavailable for ${citation.title || citation.docId}`
 
          return {
@@ -1841,7 +1841,7 @@ export const getSlackMessagesFromChannel: AgentTool = {
            confidence: item.relevance || 0.7, // Use item.relevance if available
          }
        },
-      )
+      ))
 
      let responseText = `Found ${fragments.length} Slack message${fragments.length !== 1 ? "s" : ""}`
      if (params.filter_query) {
@@ -2032,13 +2032,13 @@ export const getSlackMessagesFromTimeRange: AgentTool = {
          contexts: [],
        }
      }
-      const fragments: MinimalAgentFragment[] = items.map(
-        (item: VespaSearchResults): MinimalAgentFragment => {
+      const fragments: MinimalAgentFragment[] = await Promise.all(items.map(
+        async (item: VespaSearchResults): Promise<MinimalAgentFragment> => {
          const citation = searchToCitation(item)
          Logger.debug({ item }, "Processing item in metadata_retrieval tool")
          const content = item.fields
-            ? answerContextMap(item, userMetadata, maxDefaultSummary)
+            ? await answerContextMap(item, userMetadata, maxDefaultSummary)
            : `Context unavailable for ${citation.title || citation.docId}`
 
          return {
@@ -2048,7 +2048,7 @@ export const getSlackMessagesFromTimeRange: AgentTool = {
            confidence: item.relevance || 0.7, // Use item.relevance if available
          }
        },
-      )
+      ))
 
      let responseText = `Found ${fragments.length} Slack message${fragments.length !== 1 ? 
"s" : ""}` if (params.filter_query) { diff --git a/server/api/files.ts b/server/api/files.ts index 23e66e967..189e22e01 100644 --- a/server/api/files.ts +++ b/server/api/files.ts @@ -20,7 +20,7 @@ import { HTTPException } from "hono/http-exception" import { isValidFile, isImageFile } from "shared/fileUtils" import { generateThumbnail, getThumbnailPath } from "@/utils/image" import type { AttachmentMetadata } from "@/shared/types" -import { FileProcessorService } from "@/services/fileProcessor" +import { FileProcessorService, type SheetProcessingResult } from "@/services/fileProcessor" import { Apps, KbItemsSchema, KnowledgeBaseEntity } from "@xyne/vespa-ts/types" import { getBaseMimeType } from "@/integrations/dataSource/config" import { isDataSourceError } from "@/integrations/dataSource/errors" @@ -218,10 +218,11 @@ export const handleAttachmentUpload = async (c: Context) => { } const attachmentMetadata: AttachmentMetadata[] = [] - + for (const file of files) { const fileBuffer = await file.arrayBuffer() const fileId = `att_${crypto.randomUUID()}` + let vespaId = fileId const ext = file.name.split(".").pop()?.toLowerCase() || "" const fullFileName = `${0}.${ext}` const isImage = isImageFile(file.type) @@ -247,7 +248,7 @@ export const handleAttachmentUpload = async (c: Context) => { // For non-images: process through FileProcessorService and ingest into Vespa // Process the file content using FileProcessorService - const processingResult = await FileProcessorService.processFile( + const processingResults = await FileProcessorService.processFile( Buffer.from(fileBuffer), file.type, file.name, @@ -257,61 +258,73 @@ export const handleAttachmentUpload = async (c: Context) => { false, ) - // TODO: Ingest the processed content into Vespa - // This would typically involve calling your Vespa ingestion service - // For now, we'll log the processing result - loggerWithChild({ email }).info( - `Processed non-image file "${file.name}" with ${processingResult.chunks.length} text chunks and ${processingResult.image_chunks.length} image chunks`, - ) - - const { chunks, chunks_pos, image_chunks, image_chunks_pos } = - processingResult - - const vespaDoc = { - docId: fileId, - clId: "attachment", - itemId: fileId, - fileName: file.name, - app: Apps.KnowledgeBase as const, - entity: KnowledgeBaseEntity.Attachment, - description: "", - storagePath: "", - chunks: chunks, - chunks_pos: chunks_pos, - image_chunks: image_chunks, - image_chunks_pos: image_chunks_pos, - chunks_map: chunks.map((_, index) => ({ - chunk_index: index, - page_numbers: [0], - block_labels: [], - })), - image_chunks_map: image_chunks.map((_, index) => ({ - chunk_index: index, - page_numbers: [0], - block_labels: [], - })), - metadata: JSON.stringify({ - originalFileName: file.name, - uploadedBy: email, - chunksCount: chunks.length, - imageChunksCount: image_chunks.length, - processingMethod: getBaseMimeType(file.type || "text/plain"), - lastModified: Date.now(), - }), - createdBy: email, - duration: 0, - mimeType: getBaseMimeType(file.type || "text/plain"), - fileSize: file.size, - createdAt: Date.now(), - updatedAt: Date.now(), + if(processingResults.length > 0 && 'totalSheets' in processingResults[0]) { + vespaId = `${fileId}_sheet_${(processingResults[0] as SheetProcessingResult).totalSheets}` } + // Handle multiple processing results (e.g., for spreadsheets with multiple sheets) + for (const [resultIndex, processingResult] of processingResults.entries()) { + let docId = fileId + let fileName = file.name + + // For sheet processing 
results, append sheet information + if ('sheetName' in processingResult) { + const sheetResult = processingResult as SheetProcessingResult + fileName = processingResults.length > 1 + ? `${file.name} / ${sheetResult.sheetName}` + : file.name + docId = sheetResult.docId + } + + loggerWithChild({ email }).info( + `Processed non-image file "${fileName}" with ${processingResult.chunks.length} text chunks and ${processingResult.image_chunks.length} image chunks`, + ) - await insert(vespaDoc, KbItemsSchema) + const { chunks, chunks_pos, image_chunks, image_chunks_pos } = + processingResult + + const vespaDoc = { + docId: docId, + clId: "attachment", + itemId: docId, + fileName: fileName, + app: Apps.KnowledgeBase as const, + entity: KnowledgeBaseEntity.Attachment, + description: "", + storagePath: "", + chunks: chunks, + chunks_pos: chunks_pos, + image_chunks: image_chunks, + image_chunks_pos: image_chunks_pos, + chunks_map: processingResult.chunks_map, + image_chunks_map: processingResult.image_chunks_map, + metadata: JSON.stringify({ + originalFileName: file.name, + uploadedBy: email, + chunksCount: chunks.length, + imageChunksCount: image_chunks.length, + processingMethod: getBaseMimeType(file.type || "text/plain"), + lastModified: Date.now(), + ...(('sheetName' in processingResult) && { + sheetName: (processingResult as SheetProcessingResult).sheetName, + sheetIndex: (processingResult as SheetProcessingResult).sheetIndex, + totalSheets: (processingResult as SheetProcessingResult).totalSheets, + }), + }), + createdBy: email, + duration: 0, + mimeType: getBaseMimeType(file.type || "text/plain"), + fileSize: file.size, + createdAt: Date.now(), + updatedAt: Date.now(), + } + + await insert(vespaDoc, KbItemsSchema) + } } // Create attachment metadata const metadata: AttachmentMetadata = { - fileId, + fileId: vespaId, fileName: file.name, fileType: file.type, fileSize: file.size, @@ -321,13 +334,13 @@ export const handleAttachmentUpload = async (c: Context) => { ? path.relative(outputDir, thumbnailPath) : "", createdAt: new Date(), - url: `/api/v1/attachments/${fileId}`, + url: `/api/v1/attachments/${vespaId}`, } attachmentMetadata.push(metadata) loggerWithChild({ email }).info( - `Attachment "${file.name}" processed with ID ${fileId}${isImage ? " (saved to disk with thumbnail)" : " (processed and ingested into Vespa)"}`, + `Attachment "${file.name}" processed with ID ${vespaId}${isImage ? 
" (saved to disk with thumbnail)" : " (processed and ingested into Vespa)"}`, ) } catch (error) { // Cleanup: remove the directory if file write fails (only for images) diff --git a/server/api/knowledgeBase.ts b/server/api/knowledgeBase.ts index 23acbe31d..aea0684c2 100644 --- a/server/api/knowledgeBase.ts +++ b/server/api/knowledgeBase.ts @@ -58,6 +58,8 @@ import { } from "@/integrations/dataSource/config" import { getAuth, safeGet } from "./agent" import { ApiKeyScopes, UploadStatus } from "@/shared/types" +import { expandSheetIds } from "./chat/chat" +import { checkFileSize } from "@/integrations/dataSource" const EXTENSION_MIME_MAP: Record = { ".pdf": "application/pdf", @@ -101,7 +103,7 @@ const { JwtPayloadKey } = config // Storage configuration for Knowledge Base feature files const KB_STORAGE_ROOT = join(process.cwd(), "storage", "kb_files") -const MAX_FILE_SIZE = 100 * 1024 * 1024 // 100MB max file size +const MAX_FILE_SIZE = 100 // 100MB max file size const MAX_FILES_PER_REQUEST = 100 // Maximum files per upload request // Initialize storage directory for Knowledge Base files @@ -1194,12 +1196,14 @@ export const UploadFilesApi = async (c: Context) => { let storagePath = "" try { // Validate file size - if (file.size > MAX_FILE_SIZE) { + try{ + checkFileSize(file.size, MAX_FILE_SIZE) + } catch (error) { uploadResults.push({ success: false, fileName: file.name, parentId: targetParentId, - message: `Skipped: File too large (${Math.round(file.size / 1024 / 1024)}MB). Maximum size is ${Math.round(MAX_FILE_SIZE / 1024 / 1024)}MB`, + message: `Skipped: File too large (${Math.round(file.size / 1024 / 1024)}MB). Maximum size is ${MAX_FILE_SIZE}MB`, }) loggerWithChild({ email: userEmail }).info( `Skipped large file: ${file.name} (${file.size} bytes)`, @@ -1597,10 +1601,20 @@ export const DeleteItemApi = async (c: Context) => { try { // Delete from Vespa if (itemToDelete.vespaDocId) { - await DeleteDocument(itemToDelete.vespaDocId, KbItemsSchema) - loggerWithChild({ email: userEmail }).info( - `Deleted file from Vespa: ${itemToDelete.vespaDocId}`, - ) + const vespaDocIds = expandSheetIds(itemToDelete.vespaDocId) + for (const id of vespaDocIds) { + try { + await DeleteDocument(id, KbItemsSchema) + loggerWithChild({ email: userEmail }).info( + `Deleted file from Vespa: ${id}`, + ) + } catch (error) { + loggerWithChild({ email: userEmail }).error( + `Failed to delete file from Vespa: ${id}`, + { error: getErrorMessage(error) } + ) + } + } } } catch (error) { loggerWithChild({ email: userEmail }).warn( diff --git a/server/config.ts b/server/config.ts index a74a741f1..768c01373 100644 --- a/server/config.ts +++ b/server/config.ts @@ -48,6 +48,7 @@ let VertexRegion = "" let VertexAIModel = "" let aiProviderBaseUrl = "" let isReasoning = false +let sqlInferenceModel = "" // File processing worker configuration let fileProcessingWorkerThreads = parseInt(process.env.FILE_PROCESSING_WORKER_THREADS || "4", 10) @@ -74,6 +75,7 @@ if (process.env["AWS_ACCESS_KEY"] && process.env["AWS_SECRET_KEY"]) { AwsSecretKey = process.env["AWS_SECRET_KEY"] defaultFastModel = Models.Claude_3_5_Haiku defaultBestModel = Models.Claude_Sonnet_4 + sqlInferenceModel = Models.Claude_Sonnet_4 } else if (process.env["OPENAI_API_KEY"]) { if (process.env["BASE_URL"]) { if (!isURLValid(process.env["BASE_URL"])) { @@ -136,6 +138,7 @@ if (process.env["AWS_ACCESS_KEY"] && process.env["AWS_SECRET_KEY"]) { defaultBestModel = process.env["VERTEX_BEST_MODEL"] ? 
(process.env["VERTEX_BEST_MODEL"] as Models) : Models.Vertex_Claude_Sonnet_4 // Default best model + sqlInferenceModel = Models.Vertex_Claude_Sonnet_4 } let StartThinkingToken = "" let EndThinkingToken = "" @@ -187,6 +190,7 @@ export default { GeminiAIModel, GeminiApiKey, VertexAIModel, + sqlInferenceModel, VertexProjectId, VertexRegion, aiProviderBaseUrl, diff --git a/server/integrations/dataSource/config.ts b/server/integrations/dataSource/config.ts index b79b6180a..b835278bb 100644 --- a/server/integrations/dataSource/config.ts +++ b/server/integrations/dataSource/config.ts @@ -1,4 +1,3 @@ -import path from "path" export const DATASOURCE_CONFIG = { // File size limits @@ -18,19 +17,15 @@ export const DATASOURCE_CONFIG = { process.env.DATASOURCE_MAX_PPTX_TEXT_LEN || "300000", 10, ), + MAX_SPREADSHEET_FILE_SIZE_MB: parseInt( + process.env.DATASOURCE_MAX_SPREADSHEET_FILE_SIZE_MB || "10", + 10, + ), MAX_TEXT_FILE_SIZE_MB: parseInt( process.env.DATASOURCE_MAX_TEXT_FILE_SIZE_MB || "40", 10, ), MAX_CHUNK_SIZE: parseInt(process.env.DATASOURCE_MAX_CHUNK_SIZE || "512", 10), - MAX_ATTACHMENT_SHEET_ROWS: parseInt( - process.env.DATASOURCE_MAX_ATTACHMENT_SHEET_ROWS || "3000", - 10, - ), - MAX_ATTACHMENT_SHEET_TEXT_LEN: parseInt( - process.env.DATASOURCE_MAX_ATTACHMENT_SHEET_TEXT_LEN || "300000", - 10, - ), MAX_IMAGE_FILE_SIZE_MB: parseInt( process.env.DATASOURCE_MAX_IMAGE_FILE_SIZE_MB || "40", 10, diff --git a/server/integrations/dataSource/errors.ts b/server/integrations/dataSource/errors.ts index 639dbdd47..738bedfd4 100644 --- a/server/integrations/dataSource/errors.ts +++ b/server/integrations/dataSource/errors.ts @@ -92,10 +92,10 @@ export const createFileValidationError = (file: File): FileValidationError => { } export const createFileSizeError = ( - file: File, + size: number, maxSizeMB: number, ): FileSizeExceededError => { - const actualSizeMB = file.size / (1024 * 1024) + const actualSizeMB = size / (1024 * 1024) return new FileSizeExceededError(maxSizeMB, actualSizeMB) } diff --git a/server/integrations/dataSource/index.ts b/server/integrations/dataSource/index.ts index d186e5797..ee3f6708c 100644 --- a/server/integrations/dataSource/index.ts +++ b/server/integrations/dataSource/index.ts @@ -37,6 +37,7 @@ import { extractTextAndImagesWithChunksFromDocx } from "@/docxChunks" import { extractTextAndImagesWithChunksFromPptx } from "@/pptChunks" import imageType from "image-type" import { NAMESPACE } from "@/config" +import { chunkSheetWithHeaders } from "@/sheetChunk" const Logger = getLogger(Subsystem.Integrations).child({ module: "dataSourceIntegration", @@ -90,10 +91,10 @@ const validateFile = (file: File): void => { } } -const checkFileSize = (file: File, maxFileSizeMB: number): void => { - const fileSizeMB = file.size / (1024 * 1024) +export const checkFileSize = (size: number, maxFileSizeMB: number): void => { + const fileSizeMB = size / (1024 * 1024) if (fileSizeMB > maxFileSizeMB) { - throw createFileSizeError(file, maxFileSizeMB) + throw createFileSizeError(size, maxFileSizeMB) } } @@ -374,27 +375,8 @@ const processSpreadsheetFile = async ( const worksheet = workbook.Sheets[sheetName] if (!worksheet) continue - const sheetData: string[][] = XLSX.utils.sheet_to_json(worksheet, { - header: 1, - defval: "", - raw: false, - }) - - const validRows = sheetData.filter((row) => - row.some((cell) => cell && cell.toString().trim().length > 0), - ) - - if (validRows.length === 0) continue - - if (validRows?.length > DATASOURCE_CONFIG.MAX_ATTACHMENT_SHEET_ROWS) { - // If there are more rows 
than MAX_GD_SHEET_ROWS, still index it but with empty content - // Logger.warn( - // `Large no. of rows in ${spreadsheet.name} -> ${sheet.sheetTitle}, indexing with empty content`, - // ) - return [] - } - - const sheetChunks = chunkSheetRows(validRows) + // Use the new header-preserving chunking function + const sheetChunks = chunkSheetWithHeaders(worksheet) const filteredChunks = sheetChunks.filter( (chunk) => chunk.trim().length > 0, @@ -467,55 +449,6 @@ const processSpreadsheetFile = async ( } } -// Function to chunk sheet rows (simplified version of chunkFinalRows) -const chunkSheetRows = (allRows: string[][]): string[] => { - const chunks: string[] = [] - let currentChunk = "" - let totalTextLength = 0 - const MAX_CHUNK_SIZE = 512 - - for (const row of allRows) { - // Filter out numerical cells and empty strings, join textual cells - const textualCells = row - .filter( - (cell) => - cell && isNaN(Number(cell)) && cell.toString().trim().length > 0, - ) - .map((cell) => cell.toString().trim()) - - if (textualCells.length === 0) continue - - const rowText = textualCells.join(" ") - - // Check if adding this rowText would exceed the maximum text length - if ( - totalTextLength + rowText.length > - DATASOURCE_CONFIG.MAX_ATTACHMENT_SHEET_TEXT_LEN - ) { - // Logger.warn(`Text length excedded, indexing with empty content`) - // Return an empty array if the total text length exceeds the limit - return [] - } - - totalTextLength += rowText.length - - if ((currentChunk + " " + rowText).trim().length > MAX_CHUNK_SIZE) { - if (currentChunk.trim().length > 0) { - chunks.push(currentChunk.trim()) - } - currentChunk = rowText - } else { - currentChunk += (currentChunk ? " " : "") + rowText - } - } - - if (currentChunk.trim().length > 0) { - chunks.push(currentChunk.trim()) - } - - return chunks -} - // Main export function export const handleDataSourceFileUpload = async ( file: File, @@ -546,7 +479,7 @@ export const handleDataSourceFileUpload = async ( `LLM API endpoint is not set. 
Skipping image: ${options.fileName}`, ) } - checkFileSize(file, DATASOURCE_CONFIG.MAX_IMAGE_FILE_SIZE_MB) + checkFileSize(file.size, DATASOURCE_CONFIG.MAX_IMAGE_FILE_SIZE_MB) const imageBuffer = Buffer.from(await file.arrayBuffer()) const type = await imageType(new Uint8Array(imageBuffer)) if (!type || !DATASOURCE_CONFIG.SUPPORTED_IMAGE_TYPES.has(type.mime)) { @@ -581,25 +514,26 @@ export const handleDataSourceFileUpload = async ( } else { // Process based on file type if (mimeType === "application/pdf") { - checkFileSize(file, DATASOURCE_CONFIG.MAX_PDF_FILE_SIZE_MB) + checkFileSize(file.size, DATASOURCE_CONFIG.MAX_PDF_FILE_SIZE_MB) const fileBuffer = new Uint8Array(await file.arrayBuffer()) const processedFile = await processPdfContent(fileBuffer, options) processedFiles = [processedFile] } else if (isDocxFile(mimeType)) { - checkFileSize(file, DATASOURCE_CONFIG.MAX_DOCX_FILE_SIZE_MB) + checkFileSize(file.size, DATASOURCE_CONFIG.MAX_DOCX_FILE_SIZE_MB) const fileBuffer = new Uint8Array(await file.arrayBuffer()) const processedFile = await processDocxContent(fileBuffer, options) processedFiles = [processedFile] } else if (isPptxFile(mimeType)) { - checkFileSize(file, DATASOURCE_CONFIG.MAX_PPTX_FILE_SIZE_MB) + checkFileSize(file.size, DATASOURCE_CONFIG.MAX_PPTX_FILE_SIZE_MB) const fileBuffer = new Uint8Array(await file.arrayBuffer()) const processedFile = await processPptxContent(fileBuffer, options) processedFiles = [processedFile] } else if (isSheetFile(mimeType)) { + checkFileSize(file.size, DATASOURCE_CONFIG.MAX_SPREADSHEET_FILE_SIZE_MB) const fileBuffer = Buffer.from(await file.arrayBuffer()) processedFiles = await processSheetContent(fileBuffer, options) } else if (isTextFile(mimeType)) { - checkFileSize(file, DATASOURCE_CONFIG.MAX_TEXT_FILE_SIZE_MB) + checkFileSize(file.size, DATASOURCE_CONFIG.MAX_TEXT_FILE_SIZE_MB) const content = await file.text() const processedFile = await processTextContent(content, options) processedFiles = [processedFile] diff --git a/server/integrations/google/config.ts b/server/integrations/google/config.ts index b5ef691c0..8c97312ee 100644 --- a/server/integrations/google/config.ts +++ b/server/integrations/google/config.ts @@ -12,8 +12,7 @@ export const scopes = [ ] export const MAX_GD_PDF_SIZE = 15 // In MB -export const MAX_GD_SHEET_ROWS = 3000 -export const MAX_GD_SHEET_TEXT_LEN = 300000 +export const MAX_GD_SHEET_SIZE = 10 // In MB export const MAX_GD_SLIDES_TEXT_LEN = 300000 export const ServiceAccountUserConcurrency = 2 export const GoogleDocsConcurrency = 8 @@ -24,8 +23,7 @@ export const MAX_ATTACHMENT_PDF_SIZE = 15 export const MAX_ATTACHMENT_TEXT_SIZE = 10 export const MAX_ATTACHMENT_DOCX_SIZE = 15 export const MAX_ATTACHMENT_PPTX_SIZE = 15 -export const MAX_ATTACHMENT_SHEET_ROWS = 3000 -export const MAX_ATTACHMENT_SHEET_TEXT_LEN = 300000 +export const MAX_ATTACHMENT_SHEET_SIZE = 10 // if true will directly ingest the data without checking // if false will check for its existance in vespa diff --git a/server/integrations/google/index.ts b/server/integrations/google/index.ts index 3c086ea57..473494cff 100644 --- a/server/integrations/google/index.ts +++ b/server/integrations/google/index.ts @@ -100,8 +100,7 @@ import { unlink } from "node:fs/promises" import type { Document } from "@langchain/core/documents" import { MAX_GD_PDF_SIZE, - MAX_GD_SHEET_ROWS, - MAX_GD_SHEET_TEXT_LEN, + MAX_GD_SHEET_SIZE, MAX_GD_SLIDES_TEXT_LEN, PDFProcessingConcurrency, ServiceAccountUserConcurrency, @@ -1074,6 +1073,8 @@ import { totalAttachmentIngested, 
totalIngestedMails, } from "@/metrics/google/gmail-metrics" +import { chunkSheetWithHeaders } from "@/sheetChunk" +import { checkFileSize } from "../dataSource" const stats = z.object({ type: z.literal(WorkerResponseTypes.Stats), @@ -2180,62 +2181,23 @@ export const getSpreadsheet = async ( } } -// Function to chunk rows of text data into manageable batches -// Excludes numerical data, assuming users do not typically search by numbers -// Concatenates all textual cells in a row into a single string -// Adds rows' string data to a chunk until the 512-character limit is exceeded -// If adding a row exceeds the limit, the chunk is added to the next chunk -// Otherwise, the row is added to the current chunk -const chunkFinalRows = (allRows: string[][]): string[] => { - const chunks: string[] = [] - let currentChunk = "" - let totalTextLength = 0 - - for (const row of allRows) { - // Filter out numerical cells and empty strings - const textualCells = row.filter( - (cell) => isNaN(Number(cell)) && cell.trim().length > 0, - ) - - if (textualCells.length === 0) continue // Skip if no textual data - - const rowText = textualCells.join(" ") - - // Check if adding this rowText would exceed the maximum text length - if (totalTextLength + rowText.length > MAX_GD_SHEET_TEXT_LEN) { - // Logger.warn(`Text length excedded, indexing with empty content`) - // Return an empty array if the total text length exceeds the limit - return [] - } - - totalTextLength += rowText.length - - if ((currentChunk + " " + rowText).trim().length > 512) { - // Add the current chunk to the list and start a new chunk - if (currentChunk.trim().length > 0) { - chunks.push(currentChunk.trim()) - } - currentChunk = rowText - } else { - // Append the row text to the current chunk - currentChunk += " " + rowText - } - } - - if (currentChunk.trim().length > 0) { - // Add any remaining text as the last chunk - chunks.push(currentChunk.trim()) - } - - return chunks -} - export const getSheetsListFromOneSpreadsheet = async ( sheets: sheets_v4.Sheets, client: GoogleClient, spreadsheet: drive_v3.Schema$File, userEmail: string, ): Promise => { + // Early size check before fetching spreadsheet data + const sizeInBytes = spreadsheet.size ? parseInt(spreadsheet.size, 10) : 0 + try { + checkFileSize(sizeInBytes, MAX_GD_SHEET_SIZE) + } catch (error) { + loggerWithChild({ email: userEmail }).warn( + `Ignoring ${spreadsheet.name} as its size (${Math.round(sizeInBytes / 1024 / 1024)} MB) exceeds the limit of ${MAX_GD_SHEET_SIZE} MB`, + ) + return [] + } + const sheetsArr = [] try { const spreadSheetData = await getSpreadsheet( @@ -2280,17 +2242,7 @@ export const getSheetsListFromOneSpreadsheet = async ( continue } - let chunks: string[] = [] - - if (finalRows?.length > MAX_GD_SHEET_ROWS) { - // If there are more rows than MAX_GD_SHEET_ROWS, still index it but with empty content - // Logger.warn( - // `Large no. of rows in ${spreadsheet.name} -> ${sheet.sheetTitle}, indexing with empty content`, - // ) - chunks = [] - } else { - chunks = chunkFinalRows(finalRows) - } + const chunks: string[] = chunkSheetWithHeaders(finalRows) const sheetDataToBeIngested = { title: `${spreadsheet.name} / ${sheet?.sheetTitle}`, @@ -2965,12 +2917,13 @@ export async function* listFiles( client: GoogleClient, startDate?: string, endDate?: string, + q?: string, ): AsyncIterableIterator { const drive = google.drive({ version: "v3", auth: client }) let nextPageToken = "" // Build the query with date filters if provided - let query = "trashed = false" + let query = q ? 
`(${q}) and trashed = false` : "trashed = false" const dateFilters: string[] = [] if (startDate) { diff --git a/server/integrations/google/worker-utils.ts b/server/integrations/google/worker-utils.ts index e2c55bfa5..746dd8353 100644 --- a/server/integrations/google/worker-utils.ts +++ b/server/integrations/google/worker-utils.ts @@ -13,13 +13,14 @@ import { MAX_ATTACHMENT_TEXT_SIZE, MAX_ATTACHMENT_DOCX_SIZE, MAX_ATTACHMENT_PPTX_SIZE, - MAX_ATTACHMENT_SHEET_ROWS, - MAX_ATTACHMENT_SHEET_TEXT_LEN, + MAX_ATTACHMENT_SHEET_SIZE, } from "@/integrations/google/config" import * as XLSX from "xlsx" import { extractTextAndImagesWithChunksFromDocx } from "@/docxChunks" import { extractTextAndImagesWithChunksFromPptx } from "@/pptChunks" import { extractTextAndImagesWithChunksFromPDFviaGemini } from "@/lib/chunkPdfWithGemini" +import { chunkSheetWithHeaders } from "@/sheetChunk" +import { checkFileSize } from "../dataSource" const Logger = getLogger(Subsystem.Integrations).child({ module: "google" }) @@ -132,8 +133,9 @@ export const getGmailAttachmentChunks = async ( } if (mimeType === "application/pdf") { - const fileSizeMB = size.value / (1024 * 1024) - if (fileSizeMB > MAX_ATTACHMENT_PDF_SIZE) { + try { + checkFileSize(size.value, MAX_ATTACHMENT_PDF_SIZE) + } catch (error) { Logger.error( `Ignoring ${filename} as its more than ${MAX_ATTACHMENT_PDF_SIZE} MB`, ) @@ -153,8 +155,9 @@ export const getGmailAttachmentChunks = async ( "application/vnd.openxmlformats-officedocument.wordprocessingml.document" || mimeType === "application/msword" ) { - const fileSizeMB = size.value / (1024 * 1024) - if (fileSizeMB > MAX_ATTACHMENT_DOCX_SIZE) { + try { + checkFileSize(size.value, MAX_ATTACHMENT_DOCX_SIZE) + } catch (error) { Logger.error( `Ignoring ${filename} as its more than ${MAX_ATTACHMENT_DOCX_SIZE} MB`, ) @@ -174,8 +177,9 @@ export const getGmailAttachmentChunks = async ( "application/vnd.openxmlformats-officedocument.presentationml.presentation" || mimeType === "application/vnd.ms-powerpoint" ) { - const fileSizeMB = size.value / (1024 * 1024) - if (fileSizeMB > MAX_ATTACHMENT_PPTX_SIZE) { + try { + checkFileSize(size.value, MAX_ATTACHMENT_PPTX_SIZE) + } catch (error) { Logger.error( `Ignoring ${filename} as its more than ${MAX_ATTACHMENT_PPTX_SIZE} MB`, ) @@ -195,8 +199,9 @@ export const getGmailAttachmentChunks = async ( mimeType === "text/html" || mimeType === "text/markdown" ) { - const fileSizeMB = size.value / (1024 * 1024) - if (fileSizeMB > MAX_ATTACHMENT_TEXT_SIZE) { + try { + checkFileSize(size.value, MAX_ATTACHMENT_TEXT_SIZE) + } catch (error) { Logger.error( `Ignoring ${filename} as its more than ${MAX_ATTACHMENT_TEXT_SIZE} MB`, ) @@ -322,6 +327,14 @@ export const getGmailSpreadsheetSheets = async ( mimeType === "application/vnd.ms-excel" || mimeType === "text/csv" ) { + try { + checkFileSize(size.value, MAX_ATTACHMENT_SHEET_SIZE) + } catch (error) { + Logger.error( + `Ignoring ${filename} as its more than ${MAX_ATTACHMENT_SHEET_SIZE} MB`, + ) + return null + } const sheetsData = await processSpreadsheetFileWithSheetInfo( attachmentBuffer, filename, @@ -411,42 +424,7 @@ export const processSpreadsheetFileWithSheetInfo = async ( const worksheet = workbook.Sheets[sheetName] if (!worksheet) continue - // Get the range of the worksheet - const range = XLSX.utils.decode_range(worksheet["!ref"] || "A1") - const totalRows = range.e.r - range.s.r + 1 - - // Skip sheets with too many rows - if (totalRows > MAX_ATTACHMENT_SHEET_ROWS) { - Logger.warn( - `Sheet "${sheetName}" in ${filename} has 
${totalRows} rows (max: ${MAX_ATTACHMENT_SHEET_ROWS}), skipping`, - ) - continue - } - - // Convert sheet to JSON array of arrays with a row limit - const sheetData: string[][] = XLSX.utils.sheet_to_json(worksheet, { - header: 1, - defval: "", - raw: false, - range: 0, // Start from first row - blankrows: false, - }) - - // Clean and get valid rows - const validRows = sheetData.filter((row) => - row.some((cell) => cell && cell.toString().trim().length > 0), - ) - - if (validRows.length === 0) { - Logger.debug(`Sheet "${sheetName}" has no valid content, skipping`) - continue - } - - // Chunk the rows for this specific sheet - const sheetChunks = chunkSheetRows(validRows) - const filteredSheetChunks = sheetChunks.filter( - (chunk) => chunk.trim().length > 0, - ) + const filteredSheetChunks = chunkSheetWithHeaders(worksheet); if (filteredSheetChunks.length === 0) { Logger.debug( @@ -524,51 +502,3 @@ export const processSpreadsheetFileWithSheetInfo = async ( } } } - -// Function to chunk sheet rows (simplified version of chunkFinalRows) -const chunkSheetRows = (allRows: string[][]): string[] => { - const chunks: string[] = [] - let currentChunk = "" - let totalTextLength = 0 - const MAX_CHUNK_SIZE = 512 - - for (const row of allRows) { - // Filter out numerical cells and empty strings, join textual cells - const textualCells = row - .filter( - (cell) => - cell && isNaN(Number(cell)) && cell.toString().trim().length > 0, - ) - .map((cell) => cell.toString().trim()) - - if (textualCells.length === 0) continue - - const rowText = textualCells.join(" ") - - // Check if adding this rowText would exceed the maximum text length - if (totalTextLength + rowText.length > MAX_ATTACHMENT_SHEET_TEXT_LEN) { - Logger.warn( - `Text length exceeded for spreadsheet, stopping at ${totalTextLength} characters`, - ) - // If we have some chunks, return them; otherwise return empty - return chunks.length > 0 ? chunks : [] - } - - totalTextLength += rowText.length - - if ((currentChunk + " " + rowText).trim().length > MAX_CHUNK_SIZE) { - if (currentChunk.trim().length > 0) { - chunks.push(currentChunk.trim()) - } - currentChunk = rowText - } else { - currentChunk += (currentChunk ? 
" " : "") + rowText - } - } - - if (currentChunk.trim().length > 0) { - chunks.push(currentChunk.trim()) - } - - return chunks -} diff --git a/server/integrations/microsoft/attachment-utils.ts b/server/integrations/microsoft/attachment-utils.ts index 228e8087d..1b62276b2 100644 --- a/server/integrations/microsoft/attachment-utils.ts +++ b/server/integrations/microsoft/attachment-utils.ts @@ -11,14 +11,15 @@ import { MAX_ATTACHMENT_TEXT_SIZE, MAX_ATTACHMENT_DOCX_SIZE, MAX_ATTACHMENT_PPTX_SIZE, - MAX_ATTACHMENT_SHEET_ROWS, - MAX_ATTACHMENT_SHEET_TEXT_LEN, + MAX_ATTACHMENT_SHEET_SIZE, } from "@/integrations/google/config" import * as XLSX from "xlsx" import { extractTextAndImagesWithChunksFromDocx } from "@/docxChunks" import { extractTextAndImagesWithChunksFromPptx } from "@/pptChunks" import { extractTextAndImagesWithChunksFromPDFviaGemini } from "@/lib/chunkPdfWithGemini" import { makeGraphApiCall, type MicrosoftGraphClient } from "./client" +import { chunkSheetWithHeaders } from "@/sheetChunk" +import { checkFileSize } from "../dataSource" const Logger = getLogger(Subsystem.Integrations).child({ module: "microsoft-attachments", @@ -194,8 +195,9 @@ export const getOutlookAttachmentChunks = async ( // Process based on MIME type if (mimeType === "application/pdf") { - const fileSizeMB = size / (1024 * 1024) - if (fileSizeMB > MAX_ATTACHMENT_PDF_SIZE) { + try { + checkFileSize(size, MAX_ATTACHMENT_PDF_SIZE) + } catch (error) { Logger.error( `Ignoring ${filename} as its more than ${MAX_ATTACHMENT_PDF_SIZE} MB`, ) @@ -215,8 +217,9 @@ export const getOutlookAttachmentChunks = async ( "application/vnd.openxmlformats-officedocument.wordprocessingml.document" || mimeType === "application/msword" ) { - const fileSizeMB = size / (1024 * 1024) - if (fileSizeMB > MAX_ATTACHMENT_DOCX_SIZE) { + try { + checkFileSize(size, MAX_ATTACHMENT_DOCX_SIZE) + } catch (error) { Logger.error( `Ignoring ${filename} as its more than ${MAX_ATTACHMENT_DOCX_SIZE} MB`, ) @@ -236,8 +239,9 @@ export const getOutlookAttachmentChunks = async ( "application/vnd.openxmlformats-officedocument.presentationml.presentation" || mimeType === "application/vnd.ms-powerpoint" ) { - const fileSizeMB = size / (1024 * 1024) - if (fileSizeMB > MAX_ATTACHMENT_PPTX_SIZE) { + try { + checkFileSize(size, MAX_ATTACHMENT_PPTX_SIZE) + } catch (error) { Logger.error( `Ignoring ${filename} as its more than ${MAX_ATTACHMENT_PPTX_SIZE} MB`, ) @@ -257,8 +261,9 @@ export const getOutlookAttachmentChunks = async ( mimeType === "text/html" || mimeType === "text/markdown" ) { - const fileSizeMB = size / (1024 * 1024) - if (fileSizeMB > MAX_ATTACHMENT_TEXT_SIZE) { + try { + checkFileSize(size, MAX_ATTACHMENT_TEXT_SIZE) + } catch (error) { Logger.error( `Ignoring ${filename} as its more than ${MAX_ATTACHMENT_TEXT_SIZE} MB`, ) @@ -360,6 +365,14 @@ export const getOutlookSpreadsheetSheets = async ( mimeType === "application/vnd.ms-excel" || mimeType === "text/csv" ) { + try { + checkFileSize(size, MAX_ATTACHMENT_SHEET_SIZE) + } catch (error) { + Logger.error( + `Ignoring ${filename} as its more than ${MAX_ATTACHMENT_SHEET_SIZE} MB`, + ) + return null + } const sheetsData = await processSpreadsheetFileWithSheetInfo( attachmentBuffer, filename, @@ -441,39 +454,7 @@ export const processSpreadsheetFileWithSheetInfo = async ( const worksheet = workbook.Sheets[sheetName] if (!worksheet) continue - // Get the range of the worksheet - const range = XLSX.utils.decode_range(worksheet["!ref"] || "A1") - const totalRows = range.e.r - range.s.r + 1 - - // Skip sheets with 
too many rows - if (totalRows > MAX_ATTACHMENT_SHEET_ROWS) { - Logger.warn( - `Sheet "${sheetName}" in ${filename} has ${totalRows} rows (max: ${MAX_ATTACHMENT_SHEET_ROWS}), skipping`, - ) - continue - } - - // Convert sheet to JSON array of arrays with a row limit - const sheetData: string[][] = XLSX.utils.sheet_to_json(worksheet, { - header: 1, - defval: "", - raw: false, - range: 0, // Start from first row - blankrows: false, - }) - - // Clean and get valid rows - const validRows = sheetData.filter((row) => - row.some((cell) => cell && cell.toString().trim().length > 0), - ) - - if (validRows.length === 0) { - Logger.debug(`Sheet "${sheetName}" has no valid content, skipping`) - continue - } - - // Chunk the rows for this specific sheet - const sheetChunks = chunkSheetRows(validRows) + const sheetChunks = chunkSheetWithHeaders(worksheet) const filteredSheetChunks = sheetChunks.filter( (chunk) => chunk.trim().length > 0, ) @@ -555,54 +536,6 @@ export const processSpreadsheetFileWithSheetInfo = async ( } } -// Function to chunk sheet rows (simplified version of chunkFinalRows) -const chunkSheetRows = (allRows: string[][]): string[] => { - const chunks: string[] = [] - let currentChunk = "" - let totalTextLength = 0 - const MAX_CHUNK_SIZE = 512 - - for (const row of allRows) { - // Filter out numerical cells and empty strings, join textual cells - const textualCells = row - .filter( - (cell) => - cell && isNaN(Number(cell)) && cell.toString().trim().length > 0, - ) - .map((cell) => cell.toString().trim()) - - if (textualCells.length === 0) continue - - const rowText = textualCells.join(" ") - - // Check if adding this rowText would exceed the maximum text length - if (totalTextLength + rowText.length > MAX_ATTACHMENT_SHEET_TEXT_LEN) { - Logger.warn( - `Text length exceeded for spreadsheet, stopping at ${totalTextLength} characters`, - ) - // If we have some chunks, return them; otherwise return empty - return chunks.length > 0 ? chunks : [] - } - - totalTextLength += rowText.length - - if ((currentChunk + " " + rowText).trim().length > MAX_CHUNK_SIZE) { - if (currentChunk.trim().length > 0) { - chunks.push(currentChunk.trim()) - } - currentChunk = rowText - } else { - currentChunk += (currentChunk ? 
" " : "") + rowText - } - } - - if (currentChunk.trim().length > 0) { - chunks.push(currentChunk.trim()) - } - - return chunks -} - // Helper function to check if a file is a spreadsheet export const isSpreadsheetFile = (mimeType: string): boolean => { return ( diff --git a/server/integrations/ribbie/index.ts b/server/integrations/ribbie/index.ts index 6ed96f727..def6ff6b8 100644 --- a/server/integrations/ribbie/index.ts +++ b/server/integrations/ribbie/index.ts @@ -488,7 +488,7 @@ class RIBBIECircularDownloader { // STEP 3: Process PDF into chunks Logger.info('⚙️ Extracting text and chunks from PDF...'); - const processingResult = await FileProcessorService.processFile( + const processingResults = await FileProcessorService.processFile( pdfBuffer, 'application/pdf', fileName, @@ -498,6 +498,12 @@ class RIBBIECircularDownloader { false // Don't describe images ); + // For PDFs, we expect only one result, but handle array for consistency + const processingResult = processingResults[0]; + if (!processingResult) { + throw new Error('No processing result returned for PDF'); + } + Logger.info(`✅ Extracted ${processingResult.chunks.length} text chunks and ${processingResult.image_chunks.length} image chunks`); // STEP 4: Create proper storage path (following your app's pattern) @@ -749,4 +755,4 @@ if (import.meta.main) { } testCompleteFlow().catch(console.error); -} \ No newline at end of file +} diff --git a/server/lib/duckdb.ts b/server/lib/duckdb.ts new file mode 100644 index 000000000..c18ebdbc8 --- /dev/null +++ b/server/lib/duckdb.ts @@ -0,0 +1,223 @@ +import { Database } from "duckdb-async"; +import { getLogger } from "@/logger"; +import { Subsystem } from "@/types"; +import type { DuckDBResult } from "@/types"; +import { analyzeQueryAndGenerateSQL } from "./sqlInference"; +import { validateSQLQuery } from "./sqlValidator"; +import { writeFileSync, createWriteStream, promises as fs } from "fs"; +import { join } from "path"; +import { tmpdir, cpus } from "os"; + +const Logger = getLogger(Subsystem.Integrations).child({ + module: "duckdb", +}); + +export const querySheetChunks = async ( + sheetChunks: string[], + userQuery: string, +): Promise => { + if (!sheetChunks.length) { + return null; + } + + Logger.debug("Processing sheet chunks with DuckDB"); + + // Clean HTML tags from sheet chunks + const cleanedSheetChunks = sheetChunks.map(chunk => + chunk.replace(/<\/?hi>/g, '') + ); + + // Create a temporary CSV file using streaming for large data + const tmpDir = tmpdir() + const tempFilePath = join(tmpDir, `duckdb_temp_${Date.now()}.tsv`); + Logger.debug(`Writing ${cleanedSheetChunks.length} chunks to temporary file: ${tempFilePath}`); + + if (cleanedSheetChunks.length > 100) { + // Use streaming for large datasets + const ws = createWriteStream(tempFilePath, { encoding: "utf8" }); + for (const chunk of cleanedSheetChunks) { + ws.write(chunk); + ws.write('\n'); + } + await new Promise((resolve, reject) => { + ws.on('finish', resolve); + ws.on('error', reject); + ws.end(); + }); + Logger.debug("Large dataset written using streaming"); + } else { + // Use simple write for small datasets + const combinedData = cleanedSheetChunks.join('\n'); + writeFileSync(tempFilePath, combinedData); + Logger.debug("Small dataset written using simple write"); + } + + // Use on-disk DB and tune pragmas for large files + const dbPath = join(tmpDir, `xyne_${Date.now()}.duckdb`); + const db = await Database.create(dbPath); + const connection = await db.connect(); + + Logger.debug("Setting up DuckDB pragmas for 
large file processing"); + await connection.run(`PRAGMA temp_directory='${tmpDir}'`); + await connection.run(`PRAGMA threads=${Math.max(1, Math.floor(cpus().length / 2))}`); + await connection.run(`PRAGMA memory_limit='4GB'`); + + const tableName = `v_${Date.now().toString(36)}`; + const startTime = Date.now(); + + try { + Logger.debug(`Creating VIEW ${tableName} over CSV file: ${tempFilePath}`); + + // 1) Create a VIEW over the CSV (no materialization) + Logger.debug(`Escaped path: ${tempFilePath}`); + + try { + await connection.run(` + CREATE OR REPLACE VIEW ${tableName} AS + SELECT * FROM read_csv( + '${tempFilePath}', + delim='\t', + header=true, + quote='"', + escape='"', + null_padding=true, + ignore_errors=false, + strict_mode=false, + sample_size=100000 + ) + `); + Logger.debug(`VIEW ${tableName} created successfully`); + } catch (viewError) { + Logger.error(`Failed to create VIEW ${tableName}:`, viewError); + throw viewError; + } + + // 2) Get schema without loading all rows + Logger.debug(`Getting schema for ${tableName}`); + const schemaResult = await connection.all(`DESCRIBE ${tableName}`); + const schema = schemaResult + .map((col: any) => `${col.column_name}: ${col.column_type}`) + .join('\n'); + Logger.debug(`Schema obtained: ${schema}`); + + // 3) Get sample rows from the source (small scan only) + Logger.debug(`Getting sample rows from ${tableName}`); + const sampleRowsRes = await connection.all( + `SELECT * FROM ${tableName} LIMIT 5` + ); + Logger.debug(`Sample rows obtained: ${sampleRowsRes.length} rows`); + + // Build sample rows text for prompt + const sampleRowsHeader = schemaResult.map((c: any) => c.column_name).join('\t'); + const sampleRowsBody = sampleRowsRes + .map((r: any) => schemaResult.map((c: any) => String(r[c.column_name] ?? '')).join('\t')) + .join('\n'); + const sampleRows = `${sampleRowsHeader}\n${sampleRowsBody}`; + Logger.debug(`Sample rows text prepared: ${sampleRows.length} characters`); + + // 4) Generate SQL using the schema + samples + Logger.debug(`Generating SQL for query: ${userQuery}`); + const duckDBQuery = await analyzeQueryAndGenerateSQL( + userQuery, + tableName, + schema, + sampleRows, + ); + + if (!duckDBQuery) { + Logger.warn("Failed to generate DuckDB query, returning null"); + return null; + } + Logger.debug(`Generated SQL: ${duckDBQuery.sql}`); + + // Validate and sanitize the generated SQL using AST parsing + Logger.debug("Validating generated SQL for security and correctness"); + const validationResult = validateSQLQuery(duckDBQuery.sql, tableName, { + allowSubqueries: true, + allowJoins: false, + allowWindowFunctions: true, + allowCTEs: true, + }); + + if (!validationResult.isValid) { + Logger.error(`SQL validation failed: ${validationResult.error}`); + throw new Error(`SQL validation failed: ${validationResult.error}`); + } + + if (validationResult.warnings && validationResult.warnings.length > 0) { + Logger.warn(`SQL validation warnings: ${validationResult.warnings.join(", ")}`); + } + + const finalSQL = validationResult.sanitizedSQL || duckDBQuery.sql; + Logger.debug(`Final validated SQL: ${finalSQL}`); + + Logger.debug(`Executing DuckDB query: ${finalSQL}`); + const result = await connection.all(finalSQL); + const elapsedMs = Date.now() - startTime; + Logger.debug(`Query executed successfully, returned ${result.length} rows in ${elapsedMs}ms`); + + if (result.length === 0) { + Logger.warn("DuckDB query returned no results, returning null"); + return null; + } + + const columns = Object.keys(result[0] ?? 
{});
+    const rows = [columns, ...result.map((row: any) => Object.values(row))];
+
+    const resultPackage: DuckDBResult = {
+      user_question: userQuery,
+      sql: finalSQL, // Use the validated and sanitized SQL
+      execution_meta: {
+        row_count: result.length,
+        elapsed_ms: elapsedMs,
+        as_of: new Date().toISOString(),
+      },
+      schema_fragment: {
+        table: tableName, // it's a VIEW
+        columns: schemaResult.reduce((acc: Record<string, string>, col: any) => {
+          acc[col.column_name] = col.column_type;
+          return acc;
+        }, {}),
+      },
+      assumptions: [duckDBQuery.notes],
+      data: { rows },
+    };
+
+    Logger.debug("DuckDB processing completed successfully");
+    return resultPackage;
+  } catch (error) {
+    Logger.error("Error querying with DuckDB:", error);
+    return null;
+  } finally {
+    // Clean up
+    Logger.debug("Cleaning up DuckDB resources");
+    try {
+      if (connection) await connection.close();
+      if (db) await db.close();
+      Logger.debug("DuckDB connection and database closed");
+    } catch (e) {
+      Logger.warn("Error closing DuckDB resources:", e);
+    }
+
+    // Clean up temporary TSV file
+    try {
+      await fs.unlink(tempFilePath);
+      Logger.debug(`Temporary TSV file deleted: ${tempFilePath}`);
+    } catch (e) {
+      Logger.warn(`Failed to delete temporary TSV file ${tempFilePath}:`, e);
+    }
+
+    // Clean up temporary DuckDB file
+    try {
+      await fs.stat(dbPath);
+      await fs.unlink(dbPath);
+      Logger.debug(`Temporary DuckDB file deleted: ${dbPath}`);
+    } catch (e) {
+      if (e instanceof Error && 'code' in e && e.code === 'ENOENT') {
+        Logger.debug(`Temporary DuckDB file already removed: ${dbPath}`);
+      } else {
+        Logger.warn(`Failed to delete temporary DuckDB file ${dbPath}:`, e);
+      }
+    }
+  }
+};
diff --git a/server/lib/sqlInference.ts b/server/lib/sqlInference.ts
new file mode 100644
index 000000000..0dc47b688
--- /dev/null
+++ b/server/lib/sqlInference.ts
@@ -0,0 +1,129 @@
+import { getLogger } from "@/logger"
+import { Subsystem } from "@/types"
+import type { DuckDBQuery } from "@/types";
+import { getProviderByModel } from "@/ai/provider"
+import type { Models } from "@/ai/types"
+import { type Message } from "@aws-sdk/client-bedrock-runtime"
+import config from "@/config"
+
+const Logger = getLogger(Subsystem.Integrations).child({
+  module: "sqlInference",
+})
+
+/**
+ * Combined function that classifies if a query is metric-related and generates SQL if it is
+ * @param query The user's query to analyze
+ * @param tableName The name of the table to query
+ * @param schema The schema of the table
+ * @param fewShotSamples Example rows for few-shot learning
+ * @returns DuckDBQuery if metric-related, null if not
+ */
+export const analyzeQueryAndGenerateSQL = async (
+  query: string,
+  tableName: string,
+  schema: string,
+  fewShotSamples: string
+): Promise<DuckDBQuery | null> => {
+  const model : Models = config.sqlInferenceModel as Models
+  if (!model) {
+    Logger.warn("SQL inference model not set, returning null");
+    return null;
+  }
+  Logger.debug(`Analyzing query and generating SQL`);
+
+  const stripNoise = (s: string) => {
+    let t = s.trim();
+    // remove all code fences
+    t = t.replace(/```(?:json)?/gi, "").replace(/```/g, "");
+    // remove leading/trailing non-JSON text
+    const start = t.indexOf("{");
+    const end = t.lastIndexOf("}");
+    if (start !== -1 && end !== -1 && end > start) t = t.slice(start, end + 1);
+    return t.trim();
+  };
+
+  const prompt = `You are a query analyzer and DuckDB SQL generator.
+
+First, determine if the user is asking for metrics/statistics/numerical data.
+- Metric-related queries: count, counts, sums, averages, KPIs, financial figures, quantitative analysis +- Non-metric queries: descriptive information, definitions, qualitative info, names, categories, context, text-only attributes + +If the query is NOT metric-related, respond with: {"isMetric": false, "sql": null, "notes": "Query is not metric-related"} + +If the query IS metric-related, generate DuckDB SQL following this schema: +{ + "isMetric": true, + "sql": "SELECT ...", + "notes": "brief reasoning in 1-2 lines" +} + +Rules for SQL generation: +- Target database: DuckDB (SQL dialect = DuckDB) +- Use ONLY the provided schema and column names. Do NOT invent fields +- Output a SINGLE statement. No CTEs with CREATE/INSERT/UPDATE/DELETE. SELECT-only +- Disallow: INSTALL, LOAD, PRAGMA, COPY, EXPORT, ATTACH, DETACH, CALL, CREATE/ALTER/DROP, SET/RESET +- Output must be a single-line minified JSON object. Do NOT include markdown, code fences, comments, or any prose +- If ambiguous, choose the simplest interpretation and state the assumption in "notes" + +Context: +- User question: ${query} +- Available tables and columns with types and short descriptions: +table name: ${tableName} +schema: ${schema} +- Example rows (up to 5 per table; strings truncated): +${fewShotSamples}`; + + try { + const provider = getProviderByModel(model); + + const messages: Message[] = [ + { + role: "user", + content: [{ text: prompt }] + } + ] + + const modelParams = { + modelId: model, + temperature: 0.1, + max_new_tokens: 512, + stream: false, + systemPrompt: "You are a helpful assistant that analyzes queries and generates SQL when appropriate." + } + + const response = await provider.converse(messages, modelParams); + const responseText = response.text || ""; + + const cleaned = stripNoise(responseText); + let parsedResponse: { isMetric: boolean; sql: string | null; notes: string }; + + try { + parsedResponse = JSON.parse(cleaned); + } catch (e) { + Logger.error("Failed to parse cleaned LLM response as JSON", { cleaned }); + throw e; + } + + if (!parsedResponse.isMetric) { + Logger.debug(`Query is not metric-related: ${parsedResponse.notes}`); + return null; + } + + if (!parsedResponse.sql) { + Logger.warn("LLM indicated metric query but provided no SQL"); + return null; + } + + const result: DuckDBQuery = { + sql: parsedResponse.sql, + notes: parsedResponse.notes + }; + + return result; + } catch (error) { + Logger.error("Failed to analyze query and generate SQL:", error); + return null; + } +} + + diff --git a/server/lib/sqlValidator.ts b/server/lib/sqlValidator.ts new file mode 100644 index 000000000..a855f95a5 --- /dev/null +++ b/server/lib/sqlValidator.ts @@ -0,0 +1,319 @@ +import { Parser } from "node-sql-parser"; +import { getLogger } from "@/logger"; +import { Subsystem } from "@/types"; + +const Logger = getLogger(Subsystem.Integrations).child({ + module: "sqlValidator", +}); + +export interface SQLValidationResult { + isValid: boolean; + sanitizedSQL?: string; + error?: string; + warnings?: string[]; +} + +export interface SQLValidationOptions { + allowedViewName: string; + allowSubqueries?: boolean; + allowJoins?: boolean; + allowWindowFunctions?: boolean; + allowCTEs?: boolean; +} + +/** + * Comprehensive SQL validator using AST parsing for security and correctness + */ +export class SQLValidator { + private parser: Parser; + private options: SQLValidationOptions; + + constructor(options: SQLValidationOptions) { + this.parser = new Parser(); + this.options = { + allowSubqueries: true, + allowJoins: 
false, + allowWindowFunctions: true, + allowCTEs: true, + ...options, + }; + } + + /** + * Validates and sanitizes SQL query using AST analysis + */ + public validateSQL(sql: string): SQLValidationResult { + try { + Logger.debug(`Validating SQL: ${sql}`); + + // Parse SQL into AST + const ast = this.parseSQL(sql); + if (!ast) { + return { + isValid: false, + error: "Failed to parse SQL syntax", + }; + } + + // Check for multiple statements + if (Array.isArray(ast)) { + return { + isValid: false, + error: "Multiple statements not allowed", + }; + } + + // Validate statement type + const statementTypeValidation = this.validateStatementType(ast); + if (!statementTypeValidation.isValid) { + return statementTypeValidation; + } + + // Validate table access + const tableValidation = this.validateTableAccess(sql); + if (!tableValidation.isValid) { + return tableValidation; + } + + // Validate query structure + const structureValidation = this.validateQueryStructure(ast); + if (!structureValidation.isValid) { + return structureValidation; + } + + Logger.debug(`SQL validation successful: ${sql}`); + return { + isValid: true, + sanitizedSQL: sql, + warnings: this.collectWarnings(ast), + }; + } catch (error) { + Logger.error("SQL validation error:", error); + return { + isValid: false, + error: `Validation error: ${error instanceof Error ? error.message : String(error)}`, + }; + } + } + + private parseSQL(sql: string): any { + try { + return this.parser.astify(sql); + } catch (error) { + Logger.error("SQL parsing failed:", error); + return null; + } + } + + private validateStatementType(ast: any): SQLValidationResult { + const allowedTypes = ["select", "with"]; + + if (!allowedTypes.includes(ast.type?.toLowerCase())) { + return { + isValid: false, + error: `Statement type '${ast.type}' is not allowed. Only SELECT and WITH statements are permitted.`, + }; + } + + return { isValid: true }; + } + + private validateTableAccess(sql: string): SQLValidationResult { + try { + const tableList = this.parser.tableList(sql); + Logger.debug("Raw table list:", tableList); + const allowedViewName = this.options.allowedViewName.toLowerCase(); + + for (const table of tableList) { + // Extract the actual table name from the complex string format + const tableName = this.extractTableNameFromString(table); + Logger.debug(`Extracted table name: "${tableName}" from "${table}"`); + + if (tableName && tableName.toLowerCase() !== allowedViewName) { + return { + isValid: false, + error: `Access to table '${tableName}' is not allowed. 
Only '${this.options.allowedViewName}' is permitted.`, + }; + } + } + + return { isValid: true }; + } catch (error) { + Logger.error("Table access validation failed:", error); + return { + isValid: false, + error: "Failed to validate table access", + }; + } + } + + private extractTableNameFromString(tableString: string): string | null { + if (!tableString) return null; + + // Handle the format "select::null::table_name" or similar + const parts = tableString.split('::'); + if (parts.length >= 3) { + // Return the last part which should be the actual table name + return parts[parts.length - 1]; + } + + // If it's a simple table name, return as is + return tableString; + } + + private validateQueryStructure(ast: any): SQLValidationResult { + const warnings: string[] = []; + + // Check for subqueries + if (this.hasSubqueries(ast) && !this.options.allowSubqueries) { + return { + isValid: false, + error: "Subqueries are not allowed", + }; + } + + // Check for joins + if (this.hasJoins(ast) && !this.options.allowJoins) { + return { + isValid: false, + error: "Joins are not allowed", + }; + } + + // Check for window functions + if (this.hasWindowFunctions(ast) && !this.options.allowWindowFunctions) { + return { + isValid: false, + error: "Window functions are not allowed", + }; + } + + // Check for CTEs + if (ast.type === "with" && !this.options.allowCTEs) { + return { + isValid: false, + error: "Common Table Expressions (CTEs) are not allowed", + }; + } + + return { isValid: true, warnings }; + } + + private hasSubqueries(ast: any): boolean { + if (!ast) return false; + + // Check if any FROM clause contains a subquery + if (ast.from) { + for (const fromItem of Array.isArray(ast.from) ? ast.from : [ast.from]) { + if (fromItem.expr && fromItem.expr.type === "select") { + return true; + } + } + } + + // Check WHERE clause for subqueries + if (ast.where && this.hasSubqueryInExpression(ast.where)) { + return true; + } + + // Check HAVING clause for subqueries + if (ast.having && this.hasSubqueryInExpression(ast.having)) { + return true; + } + + return false; + } + + private hasSubqueryInExpression(expr: any): boolean { + if (!expr) return false; + + if (expr.type === "select") return true; + + if (expr.left && this.hasSubqueryInExpression(expr.left)) return true; + if (expr.right && this.hasSubqueryInExpression(expr.right)) return true; + if (expr.operand && this.hasSubqueryInExpression(expr.operand)) return true; + + return false; + } + + private hasJoins(ast: any): boolean { + if (!ast || !ast.from) return false; + + const fromItems = Array.isArray(ast.from) ? 
ast.from : [ast.from]; + return fromItems.some((item: any) => item.join); + } + + private hasWindowFunctions(ast: any): boolean { + if (!ast) return false; + + // Check SELECT columns for window functions + if (ast.columns) { + for (const column of ast.columns) { + if (this.hasWindowFunctionInExpression(column)) { + return true; + } + } + } + + return false; + } + + private hasWindowFunctionInExpression(expr: any): boolean { + if (!expr) return false; + + if (expr.type === "function" && expr.over) { + return true; + } + + if (expr.left && this.hasWindowFunctionInExpression(expr.left)) return true; + if (expr.right && this.hasWindowFunctionInExpression(expr.right)) return true; + if (expr.operand && this.hasWindowFunctionInExpression(expr.operand)) return true; + + return false; + } + + private astToSQL(ast: any): string | null { + try { + // Convert AST back to SQL without any modifications + return this.parser.sqlify(ast); + } catch (error) { + Logger.error("Failed to convert AST to SQL:", error); + return null; + } + } + + private collectWarnings(ast: any): string[] { + const warnings: string[] = []; + + // Check for potential performance issues + if (this.hasSubqueries(ast)) { + warnings.push("Query contains subqueries which may impact performance"); + } + + if (this.hasJoins(ast)) { + warnings.push("Query contains joins which may impact performance"); + } + + if (this.hasWindowFunctions(ast)) { + warnings.push("Query contains window functions which may impact performance"); + } + + return warnings; + } +} + +/** + * Convenience function to validate SQL with default options + */ +export function validateSQLQuery( + sql: string, + allowedViewName: string, + options?: Partial +): SQLValidationResult { + const validator = new SQLValidator({ + allowedViewName, + ...options, + }); + + return validator.validateSQL(sql); +} diff --git a/server/package.json b/server/package.json index ced1b0697..b2bf46fae 100644 --- a/server/package.json +++ b/server/package.json @@ -88,6 +88,7 @@ "cors": "^2.8.5", "drizzle-orm": "^0.44.5", "drizzle-zod": "^0.8.3", + "duckdb-async": "1.4.0", "fast-xml-parser": "^5.2.5", "file-type": "^21.0.0", "google-auth-library": "^9.14.0", @@ -102,6 +103,7 @@ "livekit-server-sdk": "^2.13.3", "llama3-tokenizer-js": "^1.2.0", "nanoid": "^5.1.5", + "node-sql-parser": "^5.3.12", "ollama": "^0.5.11", "openai": "^5.16.0", "ora": "^8.1.1", diff --git a/server/queue/fileProcessor.ts b/server/queue/fileProcessor.ts index 9dea88d8c..7d94f28f2 100644 --- a/server/queue/fileProcessor.ts +++ b/server/queue/fileProcessor.ts @@ -1,7 +1,7 @@ import { getLogger } from "@/logger" import { Subsystem, ProcessingJobType } from "@/types" import { getErrorMessage } from "@/utils" -import { FileProcessorService } from "@/services/fileProcessor" +import { FileProcessorService, type SheetProcessingResult } from "@/services/fileProcessor" import { insert } from "@/search/vespa" import { Apps, KbItemsSchema, KnowledgeBaseEntity } from "@xyne/vespa-ts/types" import { getBaseMimeType } from "@/integrations/dataSource/config" @@ -195,7 +195,7 @@ async function processFileJob(jobData: FileProcessingJob, startTime: number) { const fileBuffer = await readFile(file.storagePath) // Process file to extract content - const processingResult = await FileProcessorService.processFile( + const processingResults = await FileProcessorService.processFile( fileBuffer, file.mimeType || "application/octet-stream", file.fileName, @@ -203,60 +203,91 @@ async function processFileJob(jobData: FileProcessingJob, startTime: 
number) { file.storagePath, ) - // Create Vespa document with proper fileName (matching original logic) - const targetPath = file.path - - // Reconstruct the original filePath (full path from collection root) - const reconstructedFilePath = targetPath === "/" - ? file.fileName - : targetPath.substring(1) + file.fileName // Remove leading "/" and add filename - - const vespaFileName = - targetPath === "/" - ? file.collectionName + targetPath + reconstructedFilePath // Uses full path for root - : file.collectionName + targetPath + file.fileName // Uses filename for nested - - const vespaDoc = { - docId: file.vespaDocId, - clId: file.collectionId, - itemId: file.id, - fileName: vespaFileName, - app: Apps.KnowledgeBase as const, - entity: KnowledgeBaseEntity.File, - description: "", - storagePath: file.storagePath, - chunks: processingResult.chunks, - chunks_pos: processingResult.chunks_pos, - image_chunks: processingResult.image_chunks, - image_chunks_pos: processingResult.image_chunks_pos, - chunks_map: processingResult.chunks_map, - image_chunks_map: processingResult.image_chunks_map, - metadata: JSON.stringify({ - originalFileName: file.originalName || file.fileName, - uploadedBy: file.uploadedByEmail || "system", - chunksCount: processingResult.chunks.length, - imageChunksCount: processingResult.image_chunks.length, - processingMethod: getBaseMimeType(file.mimeType || "text/plain"), - lastModified: Date.now(), - }), - createdBy: file.uploadedByEmail || "system", - duration: 0, - mimeType: getBaseMimeType(file.mimeType || "text/plain"), - fileSize: file.fileSize || 0, - createdAt: Date.now(), - updatedAt: Date.now(), - clFd: file.parentId, + // Handle multiple processing results (e.g., for spreadsheets with multiple sheets) + let totalChunksCount = 0 + let newVespaDocId = "" + if(processingResults.length > 0 && 'totalSheets' in processingResults[0]) { + newVespaDocId = `${file.vespaDocId}_sheet_${(processingResults[0] as SheetProcessingResult).totalSheets}` + } else { + newVespaDocId = file.vespaDocId } + for (const [resultIndex, processingResult] of processingResults.entries()) { + // Create Vespa document with proper fileName (matching original logic) + const targetPath = file.path + + // Reconstruct the original filePath (full path from collection root) + const reconstructedFilePath = targetPath === "/" + ? file.fileName + : targetPath.substring(1) + file.fileName // Remove leading "/" and add filename + + let vespaFileName = + targetPath === "/" + ? file.collectionName + targetPath + reconstructedFilePath // Uses full path for root + : file.collectionName + targetPath + file.fileName // Uses filename for nested + + // For sheet processing results, append sheet information to fileName + let docId = file.vespaDocId + if ('sheetName' in processingResult) { + const sheetResult = processingResult as SheetProcessingResult + vespaFileName = processingResults.length > 1 + ? 
`${vespaFileName} / ${sheetResult.sheetName}` + : vespaFileName + docId = sheetResult.docId + } else if (processingResults.length > 1) { + // For non-sheet files with multiple results, append index + vespaFileName = `${vespaFileName} (${resultIndex + 1})` + docId = `${file.vespaDocId}_${resultIndex}` + } - // Insert into Vespa - await insert(vespaDoc, KbItemsSchema) + const vespaDoc = { + docId: docId, + clId: file.collectionId, + itemId: file.id, + fileName: vespaFileName, + app: Apps.KnowledgeBase as const, + entity: KnowledgeBaseEntity.File, + description: "", + storagePath: file.storagePath, + chunks: processingResult.chunks, + chunks_pos: processingResult.chunks_pos, + image_chunks: processingResult.image_chunks, + image_chunks_pos: processingResult.image_chunks_pos, + chunks_map: processingResult.chunks_map, + image_chunks_map: processingResult.image_chunks_map, + metadata: JSON.stringify({ + originalFileName: file.originalName || file.fileName, + uploadedBy: file.uploadedByEmail || "system", + chunksCount: processingResult.chunks.length, + imageChunksCount: processingResult.image_chunks.length, + processingMethod: getBaseMimeType(file.mimeType || "text/plain"), + lastModified: Date.now(), + ...(('sheetName' in processingResult) && { + sheetName: (processingResult as SheetProcessingResult).sheetName, + sheetIndex: (processingResult as SheetProcessingResult).sheetIndex, + totalSheets: (processingResult as SheetProcessingResult).totalSheets, + }), + }), + createdBy: file.uploadedByEmail || "system", + duration: 0, + mimeType: getBaseMimeType(file.mimeType || "text/plain"), + fileSize: file.fileSize || 0, + createdAt: Date.now(), + updatedAt: Date.now(), + clFd: file.parentId, + } + + // Insert into Vespa + await insert(vespaDoc, KbItemsSchema) + + totalChunksCount += processingResult.chunks.length + processingResult.image_chunks.length + } // Update status to completed - const chunksCount = - processingResult.chunks.length + processingResult.image_chunks.length + const chunksCount = totalChunksCount await db .update(collectionItems) .set({ + vespaDocId: newVespaDocId, uploadStatus: UploadStatus.COMPLETED, statusMessage: `Successfully processed: ${chunksCount} chunks extracted from ${file.fileName}`, processedAt: new Date(), diff --git a/server/services/fileProcessor.ts b/server/services/fileProcessor.ts index bdc363989..a55315356 100644 --- a/server/services/fileProcessor.ts +++ b/server/services/fileProcessor.ts @@ -6,6 +6,7 @@ import { extractTextAndImagesWithChunksFromDocx } from "@/docxChunks" import { extractTextAndImagesWithChunksFromPptx } from "@/pptChunks" import { chunkByOCRFromBuffer } from "@/lib/chunkByOCR" import { type ChunkMetadata } from "@/types" +import { chunkSheetWithHeaders } from "@/sheetChunk" import * as XLSX from "xlsx" import { getBaseMimeType, @@ -29,6 +30,15 @@ export interface ProcessingResult { image_chunks_map: ChunkMetadata[] } +export interface SheetProcessingResult extends ProcessingResult { + sheetName: string + sheetIndex: number + totalSheets: number + docId: string +} + +type ProcessingResultArray = (ProcessingResult | SheetProcessingResult)[] + export class FileProcessorService { static async processFile( @@ -39,7 +49,7 @@ export class FileProcessorService { storagePath?: string, extractImages: boolean = false, describeImages: boolean = false, - ): Promise { + ): Promise { const baseMimeType = getBaseMimeType(mimeType || "text/plain") let chunks: string[] = [] let chunks_pos: number[] = [] @@ -50,10 +60,7 @@ export class FileProcessorService { if 
(baseMimeType === "application/pdf") { // Redirect PDF processing to OCR const result = await chunkByOCRFromBuffer(buffer, fileName, vespaDocId) - - - - return result + return [result] } else if (isDocxFile(baseMimeType)) { // Process DOCX const result = await extractTextAndImagesWithChunksFromDocx( @@ -86,40 +93,51 @@ export class FileProcessorService { } else { workbook = XLSX.readFile(storagePath) } - const allChunks: string[] = [] - for (const sheetName of workbook.SheetNames) { + if (!workbook.SheetNames || workbook.SheetNames.length === 0) { + throw new Error("No worksheets found in spreadsheet") + } + + const sheetResults: SheetProcessingResult[] = [] + + for (const [sheetIndex, sheetName] of workbook.SheetNames.entries()) { const worksheet = workbook.Sheets[sheetName] if (!worksheet) continue - const sheetData: string[][] = XLSX.utils.sheet_to_json(worksheet, { - header: 1, - defval: "", - raw: false, - }) - - const validRows = sheetData.filter((row) => - row.some((cell) => cell && cell.toString().trim().length > 0), + // Use the same header-preserving chunking function as dataSource integration + const sheetChunks = chunkSheetWithHeaders(worksheet) + + const filteredChunks = sheetChunks.filter( + (chunk) => chunk.trim().length > 0, ) - for (const row of validRows) { - const textualCells = row - .filter( - (cell) => - cell && - isNaN(Number(cell)) && - cell.toString().trim().length > 0, - ) - .map((cell) => cell.toString().trim()) - - if (textualCells.length > 0) { - allChunks.push(textualCells.join(" ")) - } + // Skip sheets with no valid content + if (filteredChunks.length === 0) continue + + // Generate a unique docId for each sheet + const sheetDocId = `${vespaDocId}_sheet_${sheetIndex}` + + const sheetResult: SheetProcessingResult = { + chunks: filteredChunks, + chunks_pos: filteredChunks.map((_, idx) => idx), + image_chunks: [], + image_chunks_pos: [], + chunks_map: [], + image_chunks_map: [], + sheetName, + sheetIndex, + totalSheets: workbook.SheetNames.length, + docId: sheetDocId, } + + sheetResults.push(sheetResult) } - chunks = allChunks - chunks_pos = allChunks.map((_, idx) => idx) + if (sheetResults.length === 0) { + throw new Error("No valid content found in any worksheet") + } + + return sheetResults } else if (isTextFile(baseMimeType)) { // Process text file const content = buffer.toString("utf-8") @@ -165,13 +183,13 @@ export class FileProcessorService { block_labels: ["image"], // Default block label })); - return { + return [{ chunks, chunks_pos, image_chunks, image_chunks_pos, chunks_map, image_chunks_map, - } + }] } } diff --git a/server/sheetChunk.ts b/server/sheetChunk.ts new file mode 100644 index 000000000..968c0fb45 --- /dev/null +++ b/server/sheetChunk.ts @@ -0,0 +1,465 @@ +import * as XLSX from "xlsx" + +// Type checking utilities for spreadsheet data +function isTimestamp(value: any): boolean { + if (typeof value === 'string') { + // Check for ISO timestamp format (YYYY-MM-DDTHH:mm:ss.sssZ) + const timestampRegex = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d{3})?(Z|[+-]\d{2}:\d{2})?$/ + if (timestampRegex.test(value)) { + return !isNaN(Date.parse(value)) + } + + // Check for Unix timestamp (seconds or milliseconds since epoch) + const numValue = Number(value) + if (!isNaN(numValue)) { + // Unix timestamp should be reasonable (between 1970 and 2100) + const minTimestamp = 0 // 1970-01-01 in milliseconds (Unix epoch) + const maxTimestamp = 4102444800000 // 2100-01-01 in milliseconds + return numValue >= minTimestamp && numValue <= maxTimestamp + } + } + + 
// Check if it's a Date object + if (value instanceof Date) { + return !isNaN(value.getTime()) + } + + return false +} + +function isDate(value: any): boolean { + if (typeof value === 'string') { + // Check for date-only format (YYYY-MM-DD, MM/DD/YYYY, DD/MM/YYYY, etc.) + const dateRegex = /^\d{4}-\d{2}-\d{2}$|^\d{1,2}\/\d{1,2}\/\d{4}$|^\d{1,2}-\d{1,2}-\d{4}$/ + if (dateRegex.test(value)) { + return !isNaN(Date.parse(value)) + } + } + + if (value instanceof Date) { + return !isNaN(value.getTime()) + } + + return false +} + +function isTime(value: any): boolean { + if (typeof value === 'string') { + // Check for time-only format (HH:mm:ss, HH:mm, etc.) + const timeRegex = /^([01]?[0-9]|2[0-3]):[0-5][0-9](:[0-5][0-9])?$/ + return timeRegex.test(value) + } + + return false +} + +function isBoolean(value: any): boolean { + if (typeof value === 'boolean') { + return true + } + + if (typeof value === 'string') { + const lowerValue = value.toLowerCase().trim() + return ['true', 'false', 'yes', 'no', 'y', 'n'].includes(lowerValue) + } + + return false +} + +interface ChunkConfig { + maxChunkSize?: number + maxRowsPerChunk?: number + headerRows?: number +} + +interface ChunkingState { + headerRow: string + maxRowsPerChunk: number + maxChunkSize: number + columnCount: number +} + +interface ProcessedSheetData { + headerRow: string[] + dataRows: string[][] +} + +// XLSX Processing Functions + +function unmerge(sheet: XLSX.WorkSheet): void { + (sheet['!merges'] ?? []).forEach((rng) => { + const v = sheet[XLSX.utils.encode_cell({ r: rng.s.r, c: rng.s.c })]?.v + for (let R = rng.s.r; R <= rng.e.r; R++) { + for (let C = rng.s.c; C <= rng.e.c; C++) { + sheet[XLSX.utils.encode_cell({ r: R, c: C })] = { t: "s", v } + } + } + }) +} + +function buildHeaders(rows: any[][], headerRows = 1): { header: string[], dataRows: any[][] } { + if (rows.length === 0) { + return { header: [], dataRows: [] } + } + + const header = rows.slice(0, headerRows) + .reduce((acc, row) => + acc.map((prev, i) => `${prev}_${(row[i] ?? 
"").toString().trim()}`), + new Array(rows[0].length).fill("") + ) + .map(h => h.replace(/_{2,}/g, "_").replace(/^_+|_+$/g, "")) + + return { + header, + dataRows: rows.slice(headerRows) + } +} + +function guessHeaderRowsByDataTypes(rows: any[][], maxSearchRows = 3): number { + const isHeterogeneousRow = (row: any[]) => { + const types = row + .filter(cell => cell !== null && cell !== undefined && cell.toString().trim() !== '') + .map(cell => { + if (typeof cell === 'number' || !isNaN(Number(cell))) + return 'number' + if (isDate(cell)) + return 'date' + if (isTimestamp(cell)) + return 'timestamp' + if (isTime(cell)) + return 'time' + if (isBoolean(cell)) + return 'boolean' + return 'string' + }) + + const uniqueTypes = new Set(types) + return uniqueTypes.size >= 2 // Consider it heterogeneous if at least 2 types + } + + for (let i = 0; i < Math.min(maxSearchRows, rows.length); i++) { + if (isHeterogeneousRow(rows[i])) { + return i // rows before this are likely headers + } + } + + return 1 +} + + +function guessHeaderRowsByKeywords(rows: any[][], maxSearchRows = 3): number { + const headerKeywords = ['name', 'id', 'date', 'type', 'category', 'description', 'amount', 'total', 'value', 'region', 'country', 'state', 'city', 'zip', 'address', 'phone', 'email', 'website', 'url', 'link', 'title', 'subtitle', 'summary', 'description', 'notes', 'comments', 'remarks', 'details', 'information', 'data', 'statistics', 'metrics', 'measures'] + const lowerKeywords = headerKeywords.map(k => k.toLowerCase()) + + for (let i = 0; i < Math.min(maxSearchRows, rows.length); i++) { + const row = rows[i] + if (!row) continue + + const rowText = row.map(cell => (cell ?? '').toString().toLowerCase()) + + // Count how many cells contain header keywords + const keywordMatches = rowText.filter(cell => + lowerKeywords.some(kw => cell.includes(kw)) + ).length + + // Only consider it a header row if MOST cells contain keywords (not just one) + const totalCells = rowText.filter(cell => cell.trim().length > 0).length + if (totalCells > 0 && keywordMatches >= Math.ceil(totalCells * 0.6)) { + return i + 1 + } + } + return 1 +} + +function inferHeaderRows(input: XLSX.WorkSheet, rows: any[][], isDummyHeader = false): number { + let mergedHeaderRows = 1 + + // Check actual merged cells in XLSX + const merges = input['!merges'] ?? [] + let maxHeaderMergeRow = -1 + + merges.forEach(rng => { + // Only consider merges that START in the header area + if (rng.s.r < 4 && rng.s.r > maxHeaderMergeRow) { + maxHeaderMergeRow = rng.s.r + } + }) + mergedHeaderRows = maxHeaderMergeRow >= 0 ? maxHeaderMergeRow + 2 : 1 + mergedHeaderRows += isDummyHeader ? 1 : 0 + + if (rows.length === 0) return 1 + + const MAX_HEADER_ROWS = isDummyHeader ? 
4 : 3 + + // Heuristic 2: Analyze data type patterns + const dataTypeHeaderRows = guessHeaderRowsByDataTypes(rows, MAX_HEADER_ROWS) + + // Heuristic 3: Look for header keywords + const keywordHeaderRows = guessHeaderRowsByKeywords(rows, MAX_HEADER_ROWS) + + // Choose the maximum of these heuristics, but cap at reasonable limit + const inferredRows = Math.max(mergedHeaderRows, dataTypeHeaderRows, keywordHeaderRows, 1) + return Math.min(inferredRows, MAX_HEADER_ROWS) +} + +function processSheetData(input: XLSX.WorkSheet, headerRowsParam?: number): ProcessedSheetData { + let rows: any[][] = [] + try { + // Use sheet_to_json with proper options to preserve empty cells and formatting + rows = XLSX.utils.sheet_to_json(input, { + header: 1, // Generate array of arrays + raw: false, // Use formatted strings (not raw values) + defval: "", // Use empty string for null/undefined values + }) + } catch (error) { + console.error("Error converting sheet to JSON:", error) + return { headerRow: [], dataRows: [] } + } + + let headerRows = headerRowsParam ?? inferHeaderRows(input, rows) + + if (rows.length === 0) { + return { headerRow: [], dataRows: [] } + } + + const isHeaderValid = rows.slice(0, headerRows).every(row => isHeaderRowValid(row)) + if (!isHeaderValid) { + const maxColumns = Math.max(...rows.map(row => row.length)) + const header = Array.from({ length: maxColumns }, (_, i) => `C${i + 1}`) + rows = [header, ...rows] + headerRows = inferHeaderRows(input, rows, true) + } + + // Build composite headers and extract data normally + const result = buildHeaders(rows, headerRows) + const header = result.header + const dataRows = result.dataRows + + // Filter out completely empty rows BEFORE adding row IDs + const validDataRows = dataRows.filter(isRowValid) + + // Add row_id as first column and normalize data + const fullHeader = ["row_id", ...header] + const rowsWithId = validDataRows.map((row, index) => [ + (index + 1).toString(), + ...row.map(cell => (cell ?? 
"").toString()) + ]) + + // Clear references to help garbage collection + rows = [] + + return { + headerRow: fullHeader, + dataRows: rowsWithId + } +} + +// Helper Functions + +/** + * Calculates byte length of a string using UTF-8 encoding + */ +const getByteLength = (str: string): number => Buffer.byteLength(str, "utf8") + +/** + * Cleans illegal UTF-8 characters and normalizes line endings + */ +const cleanText = (str: string): string => { + const normalized = str.replace(/\r\n|\r/g, "\n") + return normalized.replace( + /[\u0000-\u0008\u000B-\u000C\u000E-\u001F\u007F-\u009F\uFDD0-\uFDEF\uFFFE\uFFFF]/g, + "", + ) +} + +/** + * Normalizes a row to ensure consistent column count and clean data + */ +function normalizeRow(row: string[], columnCount: number): string { + const normalizedCells: string[] = [] + + for (let i = 0; i < columnCount; i++) { + const cell = row[i] + if (cell === undefined || cell === null) { + normalizedCells.push("") + } else { + const cellStr = cell.toString() + const cleanedCell = cleanText(cellStr) + normalizedCells.push(cleanedCell) + } + } + + return normalizedCells.join("\t") +} + +/** + * Validates if a row contains meaningful content + */ +function isRowValid(row: string[]): boolean { + if (!Array.isArray(row) || row.length === 0) return false + + return row.some(cell => { + if (cell === undefined || cell === null || cell === "") return false + const cellStr = cell.toString().trim() + return cellStr.length > 0 + }) +} + +/** + * Validates if a header row has all cells filled (no empty, undefined, or null cells) + */ +function isHeaderRowValid(row: any[]): boolean { + if (!Array.isArray(row) || row.length === 0) return false + + return row.every(cell => { + if (cell === undefined || cell === null) return false + const cellStr = cell.toString().trim() + return cellStr.length > 0 + }) +} + +/** + * Truncates string to specified byte length while preserving character boundaries + */ +function truncateToByteLength(str: string, limit: number): string { + let bytes = 0 + let result = '' + + for (const char of str) { + const charBytes = getByteLength(char) + if (bytes + charBytes > limit) break + result += char + bytes += charBytes + } + + return result +} + +/** + * Creates chunks from data rows with size and row limits + */ +function createChunks(dataRows: string[][], state: ChunkingState): string[] { + const chunks: string[] = [] + let currentBatch: string[] = [] + + for (const row of dataRows) { + const normalizedRow = normalizeRow(row, state.columnCount) + + const potentialChunk = createChunkFromBatch( + [...currentBatch, normalizedRow], + state.headerRow + ) + + const wouldExceedRowLimit = currentBatch.length >= state.maxRowsPerChunk + const wouldExceedSizeLimit = getByteLength(potentialChunk) > state.maxChunkSize + + if ((wouldExceedRowLimit || wouldExceedSizeLimit) && currentBatch.length > 0) { + chunks.push(createChunkFromBatch(currentBatch, state.headerRow)) + + // Handle rows that exceed size limit + if (getByteLength(normalizedRow) > state.maxChunkSize) { + const truncatedRow = truncateToByteLength( + normalizedRow, + state.maxChunkSize - getByteLength(state.headerRow) - 1 + ) + chunks.push(createChunkFromBatch([truncatedRow], state.headerRow)) + currentBatch = [] + } else { + currentBatch = [normalizedRow] + } + } else { + currentBatch.push(normalizedRow) + } + } + + if (currentBatch.length > 0) { + chunks.push(createChunkFromBatch(currentBatch, state.headerRow)) + } + + return chunks +} + +/** + * Creates a single chunk from batch of rows and header + 
*/
+function createChunkFromBatch(batch: string[], headerRow: string): string {
+  if (batch.length === 0) return headerRow
+  return [headerRow, ...batch].join("\n")
+}
+
+function normalizeToWorksheet(input: string[][] | XLSX.WorkSheet): XLSX.WorkSheet {
+  if (Array.isArray(input)) {
+    return XLSX.utils.aoa_to_sheet(input)
+  }
+  return input
+}
+
+// Main Export Functions
+
+/**
+ * Chunks spreadsheet data with intelligent header preservation
+ * Applies smart processing to both XLSX WorkSheet objects and string[][] arrays
+ * - Smart header detection with multiple heuristics
+ * - Multi-row header flattening
+ * - Row ID addition for traceability
+ * - Merged cell handling (XLSX only)
+ * - Adaptive chunking for wide spreadsheets
+ */
+export function chunkSheetWithHeaders(
+  input: string[][] | XLSX.WorkSheet,
+  config?: ChunkConfig,
+): string[] {
+  let worksheet: XLSX.WorkSheet | null = null
+  let processedData: ProcessedSheetData | null = null
+
+  try {
+    // Process input with unified smart logic
+    worksheet = normalizeToWorksheet(input)
+    unmerge(worksheet)
+    processedData = processSheetData(worksheet, config?.headerRows)
+    const { headerRow, dataRows } = processedData
+
+    if (headerRow.length === 0) {
+      return []
+    }
+
+    // Configuration with sensible defaults
+    const maxChunkSize = config?.maxChunkSize ?? 1024
+    const maxRowsPerChunk = config?.maxRowsPerChunk ?? 10
+
+    const columnCount = headerRow.length
+
+    // Adaptive chunking for wide spreadsheets
+    const adaptiveMaxRowsPerChunk = columnCount > 15
+      ? Math.max(3, Math.floor(maxRowsPerChunk * 0.6))
+      : maxRowsPerChunk
+
+    const state: ChunkingState = {
+      headerRow: normalizeRow(headerRow, columnCount),
+      maxRowsPerChunk: adaptiveMaxRowsPerChunk,
+      maxChunkSize,
+      columnCount,
+    }
+
+    if (dataRows.length === 0) {
+      return [state.headerRow]
+    }
+
+    const chunks = createChunks(dataRows, state)
+
+    // Clear references to help garbage collection
+    processedData = null
+
+    return chunks
+  } finally {
+    // Clean up worksheet reference if it was created from array
+    if (Array.isArray(input) && worksheet) {
+      // Clear the worksheet to help garbage collection
+      const keys = Object.keys(worksheet)
+      for (const key of keys) {
+        delete worksheet[key]
+      }
+      worksheet = null
+    }
+  }
+}
diff --git a/server/types.ts b/server/types.ts
index 5ee42098e..a7bf2b193 100644
--- a/server/types.ts
+++ b/server/types.ts
@@ -610,3 +610,28 @@ export type ChunkMetadata = {
   page_numbers: number[];
   block_labels: string[];
 };
+
+// DuckDB related types
+export interface DuckDBQuery {
+  sql: string
+  notes: string
+}
+
+export interface DuckDBResult {
+  user_question: string
+  resolved_metric?: string
+  sql: string
+  execution_meta: {
+    row_count: number
+    elapsed_ms: number
+    as_of: string
+  }
+  schema_fragment?: {
+    table: string
+    columns: Record<string, string>
+  }
+  assumptions: string[]
+  data: {
+    rows: unknown[][]
+  }
+}
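
Usage sketch for the new header-preserving chunker in server/sheetChunk.ts. The function signature and ChunkConfig fields come from the patch above; the sample rows, config values, and logged output are illustrative only.

import * as XLSX from "xlsx"
import { chunkSheetWithHeaders } from "@/sheetChunk"

// Illustrative sample sheet: one header row followed by data rows.
const rows: string[][] = [
  ["Region", "Month", "Revenue"],
  ["EMEA", "2024-01", "1200"],
  ["EMEA", "2024-02", "1350"],
  ["APAC", "2024-01", "980"],
]

// Header rows are inferred by default; headerRows can be passed to skip inference.
const chunks = chunkSheetWithHeaders(rows, { maxRowsPerChunk: 2, headerRows: 1 })

// Every chunk starts with the flattened header plus the synthetic row_id column,
// so each chunk stays self-describing when retrieved on its own.
for (const chunk of chunks) {
  console.log(chunk.split("\n")[0]) // "row_id\tRegion\tMonth\tRevenue"
}

// The same function accepts an XLSX worksheet, e.g. one sheet of a parsed workbook.
const worksheet = XLSX.utils.aoa_to_sheet(rows)
console.log(chunkSheetWithHeaders(worksheet).length)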
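
A minimal sketch of the sheet-to-DuckDB flow in server/lib/duckdb.ts, assuming config.sqlInferenceModel points at a configured model and the corresponding LLM provider is reachable; otherwise querySheetChunks returns null and callers fall back to normal chunk retrieval. The TSV chunks and the question are made up for illustration, and the sketch deliberately passes chunks whose header line appears only once, since read_csv treats only the first line of the combined file as the header.

import { querySheetChunks } from "@/lib/duckdb"

async function runMetricQuestion() {
  // TSV-style chunks; querySheetChunks joins them with newlines into one temp file.
  const chunks = [
    "region\tmonth\trevenue",
    "EMEA\t2024-01\t1200\nEMEA\t2024-02\t1350",
    "APAC\t2024-01\t980",
  ]

  const result = await querySheetChunks(chunks, "What is the total revenue per region?")
  if (!result) {
    // Null means: no model configured, the question is not metric-related,
    // SQL generation/validation failed, or the query returned no rows.
    return
  }

  // data.rows is a 2D array whose first row holds the column names.
  const [header, ...values] = result.data.rows
  console.log(result.sql, result.execution_meta.row_count, result.execution_meta.elapsed_ms)
  console.log(result.assumptions.join("; "))
  console.log(header, values)
}

runMetricQuestion().catch(console.error)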
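
A sketch of the AST-based guardrail in server/lib/sqlValidator.ts. The view name and statements are hypothetical, and the expected outputs in the comments assume node-sql-parser classifies these statements in the usual way.

import { validateSQLQuery } from "@/lib/sqlValidator"

// Hypothetical view name, same shape as the v_<timestamp> views created in duckdb.ts.
const viewName = "v_m2example"

// A SELECT that only touches the allowed view should pass.
const ok = validateSQLQuery(
  "SELECT region, SUM(revenue) AS total FROM v_m2example GROUP BY region",
  viewName,
  { allowJoins: false },
)
console.log(ok.isValid, ok.warnings) // expected: true, []

// Non-SELECT statements, other tables, or disallowed constructs are rejected.
const bad = validateSQLQuery("DROP TABLE users", viewName)
console.log(bad.isValid, bad.error) // expected: false, "Statement type 'drop' is not allowed. ..."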
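
A sketch of consuming the new array return value of FileProcessorService.processFile, mirroring how the queue processor iterates per-sheet results. The parameter order is assumed from the calls in this patch (buffer, mimeType, fileName, vespaDocId, storagePath), and the path handling is illustrative.

import { readFile } from "node:fs/promises"
import { FileProcessorService, type SheetProcessingResult } from "@/services/fileProcessor"

async function indexLocalFile(path: string, mimeType: string, vespaDocId: string) {
  const buffer = await readFile(path)

  // One entry per sheet for spreadsheets, a single entry for every other type.
  const results = await FileProcessorService.processFile(
    buffer,
    mimeType,
    path, // fileName (illustrative)
    vespaDocId,
    path, // storagePath
  )

  for (const result of results) {
    if ("sheetName" in result) {
      const sheet = result as SheetProcessingResult
      console.log(
        `sheet ${sheet.sheetIndex + 1}/${sheet.totalSheets} "${sheet.sheetName}" -> ${sheet.chunks.length} chunks (docId ${sheet.docId})`,
      )
    } else {
      console.log(`${result.chunks.length} text chunks, ${result.image_chunks.length} image chunks`)
    }
  }
  return results
}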