diff --git a/server/api/files.ts b/server/api/files.ts
index de1ca43d9..17d0ac751 100644
--- a/server/api/files.ts
+++ b/server/api/files.ts
@@ -139,12 +139,11 @@ export const handleFileUpload = async (c: Context) => {
        `File "${file.name}" processed successfully for DataSource. Result: ${result.message}`,
      )
    } catch (error) {
-      const errorMessage =
-        isDataSourceError(error)
-          ? error.userMessage
-          : error instanceof Error
-            ? error.message
-            : "Unknown error during DataSource processing"
+      const errorMessage = isDataSourceError(error)
+        ? error.userMessage
+        : error instanceof Error
+          ? error.message
+          : "Unknown error during DataSource processing"
      loggerWithChild({ email: email }).error(
        error,
        `Error processing file "${file.name}" for DataSource`,
@@ -281,6 +280,16 @@ export const handleAttachmentUpload = async (c: Context) => {
      chunks_pos: chunks_pos,
      image_chunks: image_chunks,
      image_chunks_pos: image_chunks_pos,
+      chunks_map: chunks.map((_, index) => ({
+        chunk_index: index,
+        page_number: 0,
+        block_labels: [],
+      })),
+      image_chunks_map: image_chunks.map((_, index) => ({
+        chunk_index: index,
+        page_number: 0,
+        block_labels: [],
+      })),
      metadata: JSON.stringify({
        originalFileName: file.name,
        uploadedBy: email,
@@ -406,7 +415,10 @@ export const handleAttachmentServe = async (c: Context) => {

    // Set appropriate headers
    c.header("Content-Type", fileType || "application/octet-stream")
-    c.header("Content-Disposition", `inline; filename*=UTF-8''${encodeURIComponent(fileName || 'file')}`)
+    c.header(
+      "Content-Disposition",
+      `inline; filename*=UTF-8''${encodeURIComponent(fileName || "file")}`,
+    )
    c.header("Cache-Control", "public, max-age=31536000") // Cache for 1 year

    // Stream the file
diff --git a/server/api/knowledgeBase.ts b/server/api/knowledgeBase.ts
index 1710eb3cf..fe7485500 100644
--- a/server/api/knowledgeBase.ts
+++ b/server/api/knowledgeBase.ts
@@ -42,9 +42,9 @@ import type { CollectionItem, File as DbFile } from "@/db/schema"
 import { collectionItems, collections } from "@/db/schema"
 import { and, eq, isNull, sql } from "drizzle-orm"
 import { DeleteDocument, GetDocument } from "@/search/vespa"
-import { KbItemsSchema } from "@xyne/vespa-ts/types"
+import { ChunkMetadata, KbItemsSchema } from "@xyne/vespa-ts/types"
 import { boss, FileProcessingQueue } from "@/queue/api-server-queue"
-import crypto from "crypto"
+import * as crypto from "crypto"
 import {
   DATASOURCE_CONFIG,
   getBaseMimeType,
@@ -111,7 +111,7 @@ function getStoragePath(
   collectionId: string,
   storageKey: string,
   fileName: string,
-): string { 
+): string {
   const date = new Date()
   const year = date.getFullYear()
   const month = String(date.getMonth() + 1).padStart(2, "0")
@@ -189,13 +189,17 @@ export const CreateCollectionApi = async (c: Context) => {
    })

    // Queue after transaction commits to avoid race condition
-    await boss.send(FileProcessingQueue, {
-      collectionId: collection.id,
-      type: ProcessingJobType.COLLECTION
-    }, {
-      retryLimit: 3,
-      expireInHours: 12
-    })
+    await boss.send(
+      FileProcessingQueue,
+      {
+        collectionId: collection.id,
+        type: ProcessingJobType.COLLECTION,
+      },
+      {
+        retryLimit: 3,
+        expireInHours: 12,
+      },
+    )

    loggerWithChild({ email: userEmail }).info(
      `Created Collection: ${collection.id} for user ${userEmail}`,
@@ -678,13 +682,17 @@ export const CreateFolderApi = async (c: Context) => {
    })

    // Queue after transaction commits to avoid race condition
-    await boss.send(FileProcessingQueue, {
-      folderId: folder.id,
-      type: ProcessingJobType.FOLDER
-    }, {
-      retryLimit: 3,
-      expireInHours: 12
-    })
+    await boss.send(
+      FileProcessingQueue,
+      {
+        folderId: folder.id,
+        type: ProcessingJobType.FOLDER,
+      },
+      {
+        retryLimit: 3,
+        expireInHours: 12,
+      },
+    )

    loggerWithChild({ email: userEmail }).info(
      `Created folder: ${folder.id} in Collection: ${collectionId} with Vespa doc: ${folder.vespaDocId}`,
@@ -751,11 +759,11 @@ const uploadSessions = new Map<
 // Clean up old sessions (older than 1 hour)
 setInterval(() => {
   const now = Date.now()
-  for (const [sessionId, session] of uploadSessions.entries()) {
+  uploadSessions.forEach((session, sessionId) => {
     if (now - session.createdAt > 3600000) {
       uploadSessions.delete(sessionId)
     }
-  }
+  })
 }, 300000) // Run every 5 minutes

 // Helper function to ensure folder exists or create it
@@ -824,13 +832,17 @@ async function ensureFolderPath(
      })

      // Queue after transaction commits to avoid race condition
-      await boss.send(FileProcessingQueue, {
-        folderId: newFolder.id,
-        type: ProcessingJobType.FOLDER
-      }, {
-        retryLimit: 3,
-        expireInHours: 12
-      })
+      await boss.send(
+        FileProcessingQueue,
+        {
+          folderId: newFolder.id,
+          type: ProcessingJobType.FOLDER,
+        },
+        {
+          retryLimit: 3,
+          expireInHours: 12,
+        },
+      )

      currentFolderId = newFolder.id

@@ -1022,7 +1034,6 @@ export const UploadFilesApi = async (c: Context) => {

      // Parse the file path to extract folder structure
      const pathParts = filePath.split("/").filter((part) => part.length > 0)
      const originalFileName = pathParts.pop() || file.name // Get the actual filename
-
      // Skip if the filename is a system file (in case it comes from path)
      if (
@@ -1193,15 +1204,19 @@ export const UploadFilesApi = async (c: Context) => {
          },
          user.id,
          user.email,
-          `File uploaded successfully, queued for processing` // Initial status message
+          `File uploaded successfully, queued for processing`, // Initial status message
        )
      })

      // Queue after transaction commits to avoid race condition
-      await boss.send(FileProcessingQueue, { fileId: item.id, type: ProcessingJobType.FILE }, {
-        retryLimit: 3,
-        expireInHours: 12
-      })
+      await boss.send(
+        FileProcessingQueue,
+        { fileId: item.id, type: ProcessingJobType.FILE },
+        {
+          retryLimit: 3,
+          expireInHours: 12,
+        },
+      )

      uploadResults.push({
        success: true,
@@ -1232,11 +1247,10 @@ export const UploadFilesApi = async (c: Context) => {
        try {
          await unlink(storagePath)
        } catch (err) {
-          loggerWithChild({ email: userEmail, }).error(
-            error,
-            `Failed to clean up storage file`
-          );
-
+          loggerWithChild({ email: userEmail }).error(
+            error,
+            `Failed to clean up storage file`,
+          )
        }
      }

@@ -1575,9 +1589,24 @@ export const GetChunkContentApi = async (c: Context) => {
      throw new HTTPException(404, { message: "Document missing chunk data" })
    }

-    const index = resp.fields.chunks_pos.findIndex(
-      (pos: number) => pos === chunkIndex,
-    )
+    // Handle both legacy number[] format and new ChunkMetadata[] format
+    const index = resp.fields.chunks_pos.findIndex((pos: number | ChunkMetadata) => {
+      // If it's a number (legacy format), compare directly
+      if (typeof pos === "number") {
+        return pos === chunkIndex
+      }
+      // If it's a ChunkMetadata object, compare the index field
+      if (typeof pos === "object" && pos !== null) {
+        if (pos.chunk_index !== undefined) {
+          return pos.chunk_index === chunkIndex
+        } else {
+          loggerWithChild({ email: userEmail }).warn(
+            `Unexpected chunk position object format: ${JSON.stringify(pos)}`,
+          )
+        }
+      }
+      return false
+    })
    if (index === -1) {
      throw new HTTPException(404, { message: "Chunk index not found" })
    }
diff --git a/server/docxChunks.ts b/server/docxChunks.ts
index 35ee56406..004c23937 100644
--- a/server/docxChunks.ts
+++ b/server/docxChunks.ts
@@ -2624,7 +2624,7 @@ export async function extractTextAndImagesWithChunksFromDocx(
          `Reusing description for repeated image ${imagePath}`,
        )
      } else {
-        if(describeImages) {
+        if (describeImages) {
          description = await describeImageWithllm(imageBuffer)
        } else {
          description = "This is an image."
diff --git a/server/lib/chunkByOCR.ts b/server/lib/chunkByOCR.ts
new file mode 100644
index 000000000..855bd2dcb
--- /dev/null
+++ b/server/lib/chunkByOCR.ts
@@ -0,0 +1,795 @@
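+// chunkByOCR: builds text and image chunks (with per-chunk page/block metadata)
+// from the output of an external layout-parsing/OCR service. The service is
+// reached through a KServe/Triton-style v2 inference endpoint
+// (LAYOUT_PARSING_API_PATH below); the exact model behind it is deployment
+// configuration and is only assumed here to return `layoutParsingResults`
+// in the shape described by the types in this file.
+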
+import { promises as fsPromises } from "fs"
+import * as path from "path"
+import { PDFDocument } from "pdf-lib"
+import { getLogger } from "../logger"
+import { Subsystem, type ChunkMetadata } from "../types"
+import type { ProcessingResult } from "../services/fileProcessor"
+
+const Logger = getLogger(Subsystem.Integrations).child({
+  module: "chunkByOCR",
+})
+
+const DEFAULT_MAX_CHUNK_BYTES = 1024
+const DEFAULT_IMAGE_DIR = "downloads/xyne_images_db"
+const DEFAULT_LAYOUT_PARSING_BASE_URL = "http://localhost:8000"
+const LAYOUT_PARSING_API_PATH = "/v2/models/layout-parsing/infer"
+
+type LayoutParsingBlock = {
+  block_label?: string
+  block_content?: string
+  block_bbox?: number[]
+}
+
+type LayoutParsingMarkdown = {
+  text?: string
+  isStart?: boolean
+  isEnd?: boolean
+  images?: Record<string, string>
+}
+
+type LayoutParsingResult = {
+  prunedResult?: {
+    parsing_res_list?: LayoutParsingBlock[]
+  }
+  markdown?: LayoutParsingMarkdown
+}
+
+type LayoutParsingApiEnvelope = {
+  outputs?: Array<{
+    data?: string[]
+  }>
+}
+
+type LayoutParsingApiPayload = {
+  layoutParsingResults: LayoutParsingResult[]
+  dataInfo?: unknown
+}
+
+type TritonRequestPayload = {
+  inputs: Array<{
+    name: string
+    shape: number[]
+    datatype: string
+    data: string[]
+  }>
+  outputs: Array<{
+    name: string
+  }>
+}
+
+type ImageLookupEntry = {
+  base64: string
+  filePath: string
+}
+
+type ImageMetadata = {
+  fileName?: string
+  bboxKey?: string | null
+  pageIndex: number
+}
+
+type ImageBufferMap = Record<number, Buffer>
+type ImageMetadataMap = Record<number, ImageMetadata>
+
+type OcrBlock = {
+  block_label: string
+  block_content: string
+  block_bbox: number[]
+  image_index?: number
+}
+
+type OcrResponse = Record<string, OcrBlock[]>
+
+type GlobalSeq = {
+  value: number
+}
+
+function getByteLength(str: string): number {
+  return Buffer.byteLength(str, "utf8")
+}
+
+function splitText(text: string, maxBytes: number): string[] {
+  const sentences = text.match(/[^.!?]+[.!?]+|\S+/g) ?? []
+  const chunks: string[] = []
+  let currentChunk: string[] = []
+  let currentBytes = 0
+
+  for (const sentence of sentences) {
+    const sentenceBytes = getByteLength(sentence) + 1
+
+    if (currentBytes + sentenceBytes > maxBytes) {
+      if (currentChunk.length > 0) {
+        chunks.push(currentChunk.join(" "))
+      }
+      currentChunk = [sentence]
+      currentBytes = sentenceBytes
+    } else {
+      currentChunk.push(sentence)
+      currentBytes += sentenceBytes
+    }
+  }
+
+  if (currentChunk.length > 0) {
+    chunks.push(currentChunk.join(" "))
+  }
+
+  return chunks
+}
+
+function detectImageExtension(buffer: Buffer): string {
+  if (buffer.length >= 4) {
+    if (buffer[0] === 0xff && buffer[1] === 0xd8 && buffer[2] === 0xff) {
+      return "jpg"
+    }
+    if (
+      buffer[0] === 0x89 &&
+      buffer[1] === 0x50 &&
+      buffer[2] === 0x4e &&
+      buffer[3] === 0x47
+    ) {
+      return "png"
+    }
+    if (buffer[0] === 0x47 && buffer[1] === 0x49 && buffer[2] === 0x46) {
+      return "gif"
+    }
+    if (
+      buffer.slice(0, 4).toString("ascii") === "RIFF" &&
+      buffer.slice(8, 12).toString("ascii") === "WEBP"
+    ) {
+      return "webp"
+    }
+  }
+  return "jpg"
+}
+
+function sanitizeFileName(input: string): string {
+  const sanitized = input.replace(/[^a-zA-Z0-9._-]/g, "_")
+  return sanitized || "image"
+}
+
+function ensureUniqueFileName(name: string, usedNames: Set<string>): string {
+  if (!usedNames.has(name)) {
+    usedNames.add(name)
+    return name
+  }
+
+  const parsed = path.parse(name)
+  let counter = 1
+
+  while (true) {
+    const candidate = `${parsed.name}_${counter}${parsed.ext}`
+    if (!usedNames.has(candidate)) {
+      usedNames.add(candidate)
+      return candidate
+    }
+    counter += 1
+  }
+}
+
+function normalizeBBox(bbox?: number[]): string | null {
+  if (!Array.isArray(bbox) || bbox.length !== 4) {
+    return null
+  }
+  try {
+    return bbox.map((value) => Math.round(Number(value))).join("_")
+  } catch {
+    return null
+  }
+}
+
+function parseBBoxKeyFromImagePath(imagePath: string): string | null {
+  if (!imagePath) {
+    return null
+  }
+  const cleaned = imagePath.replace(/\\/g, "/")
+  const fileName = cleaned.split("/").pop()
+  if (!fileName) {
+    return null
+  }
+  const numbers = fileName.match(/\d+/g)
+  if (!numbers || numbers.length < 4) {
+    return null
+  }
+  return numbers.slice(-4).join("_")
+}
+
+function buildImageLookup(
+  images: Record<string, string>,
+): Map<string, ImageLookupEntry> {
+  const lookup = new Map<string, ImageLookupEntry>()
+
+  for (const [imgPath, base64Data] of Object.entries(images)) {
+    if (!base64Data) {
+      continue
+    }
+    const bboxKey = parseBBoxKeyFromImagePath(imgPath)
+    if (!bboxKey) {
+      continue
+    }
+    if (!lookup.has(bboxKey)) {
+      lookup.set(bboxKey, {
+        base64: base64Data,
+        filePath: imgPath,
+      })
+    }
+  }
+
+  return lookup
+}
+
+function transformBlockContent(label: string, content: string): string {
+  switch (label) {
+    case "header":
+    case "doc_title":
+      return content ? `# ${content}` : content
+    case "paragraph_title":
+      return content ? `## ${content}` : content
+    case "formula":
+      return content ? `$$${content}$$` : content
+    case "figure_title":
+      return content ? `\n${content}\n` : content
+    default:
+      return content
+  }
+}
+
+function normalizeBlockContent(block: OcrBlock): string {
+  const content = block.block_content ?? ""
+  if (!content.trim()) {
+    return ""
+  }
+
+  // if (block.block_label === "table") {
+  //   return content
+  //     .replace(/<\/(td|th)>/gi, " ")
+  //     .replace(/<\/tr>/gi, " \n ")
+  //     .replace(/<[^>]+>/g, " ")
+  //     .replace(/\s+/g, " ")
+  //     .trim()
+  // }
+
+  if (block.block_label === "figure_title") {
+    return content.trim()
+  }
+
+  return content.replace(/\s+/g, " ").trim()
+}
+
+function deriveImageFileName(
+  preferredName: string | undefined,
+  bboxKey: string | null | undefined,
+  buffer: Buffer,
+  imageIndex: number,
+  pageIndex: number,
+): string {
+  const ext = detectImageExtension(buffer)
+
+  if (preferredName) {
+    const sanitized = sanitizeFileName(preferredName)
+    const parsed = path.parse(sanitized)
+
+    if (parsed.ext) {
+      const normalizedExt = parsed.ext.replace(/\./, "")
+      if (normalizedExt.toLowerCase() !== ext) {
+        return `${parsed.name || `image_${imageIndex}`}.${ext}`
+      }
+      return sanitized
+    }
+
+    return `${parsed.name || `image_${imageIndex}`}.${ext}`
+  }
+
+  if (bboxKey) {
+    return `img_in_image_box_${bboxKey}.${ext}`
+  }
+
+  return `page_${pageIndex + 1}_image_${imageIndex}.${ext}`
+}
+
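+// callLayoutParsingApi wraps the document bytes in a Triton-style inference
+// request: the file is base64-encoded into a single BYTES tensor named
+// "input", and the response is expected to carry a JSON string in
+// outputs[0].data[0] whose `result` field holds the layoutParsingResults.
+// Base URL, file type, visualization flag and timeout come from the
+// LAYOUT_PARSING_* environment variables, with defaults defined above.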
+async function callLayoutParsingApi(
+  buffer: Buffer,
+  fileName: string,
+): Promise<LayoutParsingApiPayload> {
+  const baseUrl = process.env.LAYOUT_PARSING_BASE_URL || DEFAULT_LAYOUT_PARSING_BASE_URL
+  const apiUrl = baseUrl + LAYOUT_PARSING_API_PATH
+  const fileType =
+    Number.parseInt(process.env.LAYOUT_PARSING_FILE_TYPE ?? "0", 10) || 0
+  const visualize = process.env.LAYOUT_PARSING_VISUALIZE === "false"
+  const timeoutMs = Number.parseInt(
+    process.env.LAYOUT_PARSING_TIMEOUT_MS ?? "120000",
+    10,
+  )
+
+  Logger.info("Calling layout parsing API", {
+    apiUrl,
+    fileName,
+    fileSize: buffer.length,
+  })
+
+  const inputPayload = {
+    file: buffer.toString("base64"),
+    fileType,
+    visualize,
+  }
+
+  const requestPayload: TritonRequestPayload = {
+    inputs: [
+      {
+        name: "input",
+        shape: [1, 1],
+        datatype: "BYTES",
+        data: [JSON.stringify(inputPayload)],
+      },
+    ],
+    outputs: [
+      {
+        name: "output",
+      },
+    ],
+  }
+
+  const controller = new AbortController()
+  const timer =
+    Number.isFinite(timeoutMs) && timeoutMs > 0
+      ? setTimeout(() => controller.abort(), timeoutMs)
+      : undefined
+
+  try {
+    const response = await fetch(apiUrl, {
+      method: "POST",
+      headers: {
+        "Content-Type": "application/json",
+      },
+      body: JSON.stringify(requestPayload),
+      signal: controller.signal,
+    })
+
+    if (!response.ok) {
+      const responseText = await response.text().catch(() => "")
+      throw new Error(
+        `Layout parsing API request failed (${response.status}): ${responseText.slice(0, 200)}`,
+      )
+    }
+
+    const envelope = (await response.json()) as LayoutParsingApiEnvelope
+
+    const outputPayload = envelope.outputs?.[0]?.data?.[0]
+    if (!outputPayload) {
+      throw new Error("Layout parsing API payload missing expected output data")
+    }
+
+    let parsed: unknown
+    try {
+      parsed = JSON.parse(outputPayload)
+    } catch (error) {
+      throw new Error(
+        `Failed to JSON.parse layout parsing payload: ${(error as Error).message}`,
+      )
+    }
+
+    const result = (parsed as { result?: LayoutParsingApiPayload }).result
+    if (!result) {
+      throw new Error("Layout parsing API response missing result field")
+    }
+
+    return result
+  } finally {
+    if (timer) {
+      clearTimeout(timer)
+    }
+  }
+}
+
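+// transformLayoutParsingResults flattens the per-page API output into the
+// intermediate OcrResponse shape used below: each page becomes a list of
+// OcrBlocks, and "image" blocks are matched to the base64 images returned in
+// markdown.images by comparing rounded bounding-box coordinates (the bbox key
+// encoded in the image path). Matched images get a sequential image_index and
+// are kept in memory as Buffers until chunkByOCR persists them.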
+function transformLayoutParsingResults(
+  layoutParsingResults: LayoutParsingResult[],
+): {
+  ocrResponse: OcrResponse
+  images: ImageBufferMap
+  imageMetadata: ImageMetadataMap
+} {
+  const ocrResponse: OcrResponse = {}
+  const images: ImageBufferMap = {}
+  const imageMetadata: ImageMetadataMap = {}
+  let nextImageIndex = 0
+
+  layoutParsingResults.forEach((layout, pageIndex) => {
+    const parsingResList = layout.prunedResult?.parsing_res_list ?? []
+    const markdownImages = layout.markdown?.images ?? {}
+    const imageLookup = buildImageLookup(markdownImages)
+    const usedImagePaths = new Set<string>()
+    const transformedBlocks: OcrBlock[] = []
+
+    for (const rawBlock of parsingResList) {
+      const blockLabel = rawBlock.block_label ?? "text"
+      const rawContent = rawBlock.block_content ?? ""
+      const transformedContent = transformBlockContent(blockLabel, rawContent)
+      const blockBBox = Array.isArray(rawBlock.block_bbox)
+        ? [...rawBlock.block_bbox]
+        : []
+
+      const transformedBlock: OcrBlock = {
+        block_label: blockLabel,
+        block_content: transformedContent,
+        block_bbox: blockBBox,
+      }
+
+      if (blockLabel === "image") {
+        const bboxKey = normalizeBBox(blockBBox)
+        const matchedImage = bboxKey ? imageLookup.get(bboxKey) : undefined
+
+        if (matchedImage && !usedImagePaths.has(matchedImage.filePath)) {
+          try {
+            const imageBuffer = Buffer.from(matchedImage.base64, "base64")
+            const imageIndex = nextImageIndex
+            nextImageIndex += 1
+
+            transformedBlock.image_index = imageIndex
+            images[imageIndex] = imageBuffer
+            imageMetadata[imageIndex] = {
+              fileName: path.basename(matchedImage.filePath),
+              bboxKey,
+              pageIndex,
+            }
+            usedImagePaths.add(matchedImage.filePath)
+          } catch (error) {
+            Logger.error("Failed to decode image from layout parsing result", {
+              error: (error as Error).message,
+              pageIndex,
+              bboxKey,
+            })
+          }
+        } else {
+          Logger.debug("No matching image found for block", {
+            pageIndex,
+            bboxKey,
+          })
+        }
+      }
+
+      transformedBlocks.push(transformedBlock)
+    }
+
+    ocrResponse[String(pageIndex)] = transformedBlocks
+  })
+
+  return { ocrResponse, images, imageMetadata }
+}
+
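+// Large PDFs are split into sub-documents of at most `maxPagesPerBatch` pages
+// (30 by default) with pdf-lib, so each request to the layout-parsing service
+// stays small; the per-batch results are later concatenated by
+// mergeLayoutParsingResults, which relies on array order alone for page
+// numbering.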
+async function splitPdfIntoBatches(
+  buffer: Buffer,
+  maxPagesPerBatch: number = 30,
+): Promise<Buffer[]> {
+  const srcPdf = await PDFDocument.load(buffer)
+  const totalPages = srcPdf.getPageCount()
+
+  if (totalPages <= maxPagesPerBatch) {
+    return [buffer]
+  }
+
+  Logger.info("Splitting large PDF into batches", {
+    totalPages,
+    maxPagesPerBatch,
+    estimatedBatches: Math.ceil(totalPages / maxPagesPerBatch),
+  })
+
+  const batches: Buffer[] = []
+
+  for (let startPage = 0; startPage < totalPages; startPage += maxPagesPerBatch) {
+    const endPage = Math.min(startPage + maxPagesPerBatch, totalPages)
+    const pageCount = endPage - startPage
+
+    // Create new PDF with subset of pages
+    const newPdf = await PDFDocument.create()
+    const pageIndices: number[] = []
+    for (let i = 0; i < pageCount; i++) {
+      pageIndices.push(startPage + i)
+    }
+
+    const copiedPages = await newPdf.copyPages(srcPdf, pageIndices)
+    for (const page of copiedPages) {
+      newPdf.addPage(page)
+    }
+
+    const pdfBytes = await newPdf.save()
+    batches.push(Buffer.from(pdfBytes))
+
+    Logger.info("Created PDF batch", {
+      batchIndex: batches.length,
+      startPage: startPage + 1,
+      endPage,
+      pagesInBatch: pageCount,
+      batchSizeBytes: pdfBytes.length,
+    })
+  }
+
+  return batches
+}
+
+function mergeLayoutParsingResults(
+  results: LayoutParsingApiPayload[],
+): LayoutParsingApiPayload {
+  const allLayoutResults: LayoutParsingResult[] = []
+  let pageOffset = 0
+
+  for (const result of results) {
+    const layoutResults = result.layoutParsingResults ?? []
+
+    // Adjust page indices to maintain correct ordering across batches
+    const adjustedResults = layoutResults.map((layout, localPageIndex) => ({
+      ...layout,
+      // We don't need to modify the layout structure itself since
+      // transformLayoutParsingResults handles page indexing correctly
+    }))
+
+    allLayoutResults.push(...adjustedResults)
+    pageOffset += layoutResults.length
+  }
+
+  return {
+    layoutParsingResults: allLayoutResults,
+    dataInfo: results[0]?.dataInfo,
+  }
+}
+
+export async function chunkByOCRFromBuffer(
+  buffer: Buffer,
+  fileName: string,
+  docId: string,
+): Promise<ProcessingResult> {
+  // Check if this is a PDF and handle batching if necessary
+  const isPdf = fileName.toLowerCase().endsWith('.pdf')
+  let finalApiResult: LayoutParsingApiPayload
+
+  if (isPdf) {
+    try {
+      const srcPdf = await PDFDocument.load(buffer)
+      const totalPages = srcPdf.getPageCount()
+
+      if (totalPages > 30) {
+        // Split PDF into batches and process each
+        const batches = await splitPdfIntoBatches(buffer, 30)
+        const batchResults: LayoutParsingApiPayload[] = []
+
+        for (let i = 0; i < batches.length; i++) {
+          const batch = batches[i]
+          Logger.info("Processing PDF batch", {
+            batchIndex: i + 1,
+            totalBatches: batches.length,
+            batchSizeBytes: batch.length,
+          })
+
+          const batchResult = await callLayoutParsingApi(batch, `${fileName}_batch_${i + 1}`)
+          batchResults.push(batchResult)
+        }
+
+        // Merge all batch results
+        finalApiResult = mergeLayoutParsingResults(batchResults)
+
+        Logger.info("Merged batch results", {
+          totalBatches: batches.length,
+          layoutResultsCount: finalApiResult.layoutParsingResults?.length || 0,
+        })
+      } else {
+        // Small PDF, process normally
+        finalApiResult = await callLayoutParsingApi(buffer, fileName)
+      }
+    } catch (error) {
+      Logger.warn("Failed to analyze PDF for batching, processing as single file", {
+        fileName,
+        error: (error as Error).message,
+      })
+      finalApiResult = await callLayoutParsingApi(buffer, fileName)
+    }
+  } else {
+    // Not a PDF, process normally
+    finalApiResult = await callLayoutParsingApi(buffer, fileName)
+  }
+
+  Logger.info("API result received", {
+    layoutResultsCount: finalApiResult.layoutParsingResults?.length || 0,
+  })
+
+  const layoutResults = finalApiResult.layoutParsingResults ?? []
+  if (layoutResults.length === 0) {
+    Logger.warn("Layout parsing API returned no results", { fileName })
+  }
+
+  const { ocrResponse, images, imageMetadata } =
+    transformLayoutParsingResults(layoutResults)
+  Logger.debug("Transformed layout results", {
+    ocrResponsePages: Object.keys(ocrResponse).length,
+    imagesCount: Object.keys(images).length,
+    imageMetadataCount: Object.keys(imageMetadata).length,
+  })
+
+  return chunkByOCR(docId, ocrResponse, images, imageMetadata)
+}
+
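+// chunkByOCR walks the blocks page by page and produces the ProcessingResult
+// consumed by the file processor: text blocks are accumulated into chunks of
+// at most OCR_MAX_CHUNK_BYTES (default 1024) bytes, image blocks become
+// separate image_chunks, and both draw from one global chunk_index sequence
+// (an image takes the next index when it is encountered; buffered text takes
+// its index when it is flushed on overflow or at the end), so chunks_map and
+// image_chunks_map describe one interleaved ordering.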
+export async function chunkByOCR(
+  docId: string,
+  ocrResponse: OcrResponse,
+  images: ImageBufferMap,
+  imageMetadata: ImageMetadataMap = {},
+): Promise<ProcessingResult> {
+  const chunks: string[] = []
+  const chunks_map: ChunkMetadata[] = []
+  const image_chunks: string[] = []
+  const image_chunks_map: ChunkMetadata[] = []
+
+  const globalSeq: GlobalSeq = { value: 0 }
+  const maxChunkBytes = Number.parseInt(
+    process.env.OCR_MAX_CHUNK_BYTES ?? "",
+    10,
+  )
+  const chunkSizeLimit =
+    Number.isFinite(maxChunkBytes) && maxChunkBytes > 0
+      ? maxChunkBytes
+      : DEFAULT_MAX_CHUNK_BYTES
+
+  let currentTextBuffer = ""
+  let currentBlockLabels: string[] = []
+  let lastPageNumber = -1
+
+  const imageBaseDir = path.resolve(process.env.IMAGE_DIR || DEFAULT_IMAGE_DIR)
+  const docImageDir = path.join(imageBaseDir, docId)
+  await fsPromises.mkdir(docImageDir, { recursive: true })
+  const savedImages = new Set<number>()
+  const usedFileNames = new Set<string>()
+
+  const addChunk = () => {
+    if (!currentTextBuffer.trim()) {
+      currentTextBuffer = ""
+      currentBlockLabels = []
+      return
+    }
+
+    const subChunks = splitText(currentTextBuffer, chunkSizeLimit)
+
+    for (let index = 0; index < subChunks.length; index += 1) {
+      let chunkContent = subChunks[index]
+      if (index > 0) {
+        chunkContent = `(continued) ${chunkContent}`
+      }
+
+      chunks.push(chunkContent)
+      chunks_map.push({
+        chunk_index: globalSeq.value,
+        page_number: lastPageNumber,
+        block_labels: Array.from(new Set(currentBlockLabels)),
+      })
+
+      globalSeq.value += 1
+    }
+
+    currentTextBuffer = ""
+    currentBlockLabels = []
+  }
+
+  const pageKeys = Object.keys(ocrResponse)
+    .map((key) => Number.parseInt(key, 10))
+    .filter((value) => !Number.isNaN(value))
+    .sort((a, b) => a - b)
+
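+  // Walk pages in numeric order. Image blocks are written to
+  // downloads/xyne_images_db/<docId>/ (IMAGE_DIR overrides the base path),
+  // recorded as image chunks, and referenced from the surrounding text via an
+  // [IMG#<image_index>] placeholder; all other blocks are normalized and
+  // appended to the running text buffer, which addChunk() flushes whenever
+  // the byte budget would be exceeded.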
" " : "") + normalizedText + currentBlockLabels.push(block.block_label) + } + } + } + + if (currentTextBuffer.trim()) { + Logger.debug("Adding final text chunk") + addChunk() + } + + const chunks_pos = chunks_map.map((metadata) => metadata.chunk_index) + const image_chunks_pos = image_chunks_map.map( + (metadata) => metadata.chunk_index, + ) + + Logger.info("Processing completed", { + totalTextChunks: chunks.length, + totalImageChunks: image_chunks.length, + totalChunksMetadata: chunks_map.length, + totalImageChunksMetadata: image_chunks_map.length, + finalGlobalSeq: globalSeq.value, + }) + + return { + chunks, + chunks_pos, + image_chunks, + image_chunks_pos, + chunks_map, + image_chunks_map, + } +} diff --git a/server/package.json b/server/package.json index d7795019c..b23101e85 100644 --- a/server/package.json +++ b/server/package.json @@ -73,7 +73,7 @@ "@types/json-schema": "^7.0.15", "@types/jszip": "^3.4.1", "@types/node": "^24.3.0", - "@xyne/vespa-ts": "^1.0.8", + "@xyne/vespa-ts": "1.0.10", "@xynehq/jaf": "^0.1.4", "ai": "^5.0.51", "arctic": "^3.3.0", diff --git a/server/queue/fileProcessor.ts b/server/queue/fileProcessor.ts index 4f8a818cb..3c8d69016 100644 --- a/server/queue/fileProcessor.ts +++ b/server/queue/fileProcessor.ts @@ -217,6 +217,8 @@ async function processFileJob(jobData: FileProcessingJob, startTime: number) { chunks_pos: processingResult.chunks_pos, image_chunks: processingResult.image_chunks, image_chunks_pos: processingResult.image_chunks_pos, + chunks_map: processingResult.chunks_map, + image_chunks_map: processingResult.image_chunks_map, metadata: JSON.stringify({ originalFileName: file.originalName || file.fileName, uploadedBy: file.uploadedByEmail || "system", @@ -325,6 +327,8 @@ async function processCollectionJob( image_chunks: [], chunks_pos: [], image_chunks_pos: [], + chunks_map: [], + image_chunks_map: [], metadata: JSON.stringify({ version: "1.0", lastModified: Date.now(), @@ -444,6 +448,8 @@ async function processFolderJob( image_chunks: [], chunks_pos: [], image_chunks_pos: [], + chunks_map: [], + image_chunks_map: [], metadata: JSON.stringify({ version: "1.0", lastModified: Date.now(), diff --git a/server/services/fileProcessor.ts b/server/services/fileProcessor.ts index 0c14f3c74..4206d8d00 100644 --- a/server/services/fileProcessor.ts +++ b/server/services/fileProcessor.ts @@ -2,9 +2,10 @@ import { getErrorMessage } from "@/utils" import { chunkDocument } from "@/chunks" // import { extractTextAndImagesWithChunksFromPDF } from "@/pdf -import { extractTextAndImagesWithChunksFromPDFviaGemini } from "@/lib/chunkPdfWithGemini" import { extractTextAndImagesWithChunksFromDocx } from "@/docxChunks" import { extractTextAndImagesWithChunksFromPptx } from "@/pptChunks" +import { chunkByOCRFromBuffer } from "@/lib/chunkByOCR" +import { type ChunkMetadata } from "@/types" import * as XLSX from "xlsx" import { getBaseMimeType, @@ -13,15 +14,23 @@ import { isDocxFile, isPptxFile, } from "@/integrations/dataSource/config" +import { getLogger, Subsystem } from "@/logger" + +const Logger = getLogger(Subsystem.Ingest).child({ + module: "fileProcessor", +}) export interface ProcessingResult { chunks: string[] chunks_pos: number[] image_chunks: string[] image_chunks_pos: number[] + chunks_map: ChunkMetadata[] + image_chunks_map: ChunkMetadata[] } export class FileProcessorService { + static async processFile( buffer: Buffer, mimeType: string, @@ -39,15 +48,12 @@ export class FileProcessorService { try { if (baseMimeType === "application/pdf") { - // Process PDF - 
-        // Process PDF
-        const result = await extractTextAndImagesWithChunksFromPDFviaGemini(
-          new Uint8Array(buffer),
-          vespaDocId,
-        )
-        chunks = result.text_chunks
-        chunks_pos = result.text_chunk_pos
-        image_chunks = result.image_chunks || []
-        image_chunks_pos = result.image_chunk_pos || []
+        // Redirect PDF processing to OCR
+        const result = await chunkByOCRFromBuffer(buffer, fileName, vespaDocId)
+
+
+
+        return result
      } else if (isDocxFile(baseMimeType)) {
        // Process DOCX
        const result = await extractTextAndImagesWithChunksFromDocx(
@@ -138,9 +144,9 @@
        }
      }
    } catch (error) {
-      console.warn(
-        `Failed to process file content for ${fileName}: ${getErrorMessage(error)}`,
-      )
+      // Log the processing failure with error details and context
+      Logger.error(error, `File processing failed for ${fileName} (${baseMimeType}, ${buffer.length} bytes)`)
+
      // Create basic chunk on processing error
      chunks = [
        `File: ${fileName}, Type: ${baseMimeType}, Size: ${buffer.length} bytes`,
      ]
      chunks_pos = [0]
    }

+    // For non-PDF files, create empty chunks_map and image_chunks_map for backward compatibility
+    const chunks_map: ChunkMetadata[] = chunks.map((_, index) => ({
+      chunk_index: index,
+      page_number: -1, // Default to page -1 for non-PDF files
+      block_labels: ["text"], // Default block label
+    }));
+
+    const image_chunks_map: ChunkMetadata[] = image_chunks.map((_, index) => ({
+      chunk_index: index, // Local indexing for image chunks array
+      page_number: -1, // Default to page -1 for non-PDF files
+      block_labels: ["image"], // Default block label
+    }));
+
    return {
      chunks,
      chunks_pos,
      image_chunks,
      image_chunks_pos,
+      chunks_map,
+      image_chunks_map,
    }
  }
}
diff --git a/server/types.ts b/server/types.ts
index c60bd2a57..9da0fbb68 100644
--- a/server/types.ts
+++ b/server/types.ts
@@ -600,3 +600,10 @@ export const UserMetadata = z.object({
 })

 export type UserMetadataType = z.infer<typeof UserMetadata>
+
+// ChunkMetadata type for OCR and file processing
+export type ChunkMetadata = {
+  chunk_index: number;
+  page_number: number;
+  block_labels: string[];
+};
diff --git a/server/vespa/schemas/kb_items.sd b/server/vespa/schemas/kb_items.sd
index a24ca8b03..6a0208fa9 100644
--- a/server/vespa/schemas/kb_items.sd
+++ b/server/vespa/schemas/kb_items.sd
@@ -1,4 +1,12 @@
 schema kb_items {
+
+
+  struct chunk_meta {
+    field chunk_index type int {}
+    field page_number type int {}
+    field block_labels type array<string> {}
+  }
+
   document kb_items {

     field docId type string {
@@ -62,6 +70,7 @@ schema kb_items {
      index { enable-bm25 }
    }

+
    field chunks_pos type array<int> {
      indexing: attribute | summary
    }
@@ -69,6 +78,19 @@ schema kb_items {
    field image_chunks_pos type array<int> {
      indexing: attribute | summary
    }
+
+
+    field chunks_map type array<chunk_meta> {
+      indexing: summary
+      struct-field chunk_index { indexing: attribute | summary }
+      struct-field page_number { indexing: attribute | summary }
+    }
+
+    field image_chunks_map type array<chunk_meta> {
+      indexing: summary
+      struct-field chunk_index { indexing: attribute | summary }
+      struct-field page_number { indexing: attribute | summary }
+    }

    field metadata type string {
      indexing: attribute | summary