From 676cd8830876de8fd393876c90f1eef49a82d7c3 Mon Sep 17 00:00:00 2001 From: Aayushjshah <2001aayushshah@gmail.com> Date: Thu, 16 Oct 2025 12:08:04 +0530 Subject: [PATCH 1/7] feat: XYNE-189 instance based requests for large pdfswith retry /backoff --- server/lib/chunkByOCR.ts | 1093 ++++++++++++++++++++++++--- server/scripts/testOcrDispatcher.ts | 464 ++++++++++++ 2 files changed, 1470 insertions(+), 87 deletions(-) create mode 100644 server/scripts/testOcrDispatcher.ts diff --git a/server/lib/chunkByOCR.ts b/server/lib/chunkByOCR.ts index ce943447b..6be043d46 100644 --- a/server/lib/chunkByOCR.ts +++ b/server/lib/chunkByOCR.ts @@ -17,8 +17,25 @@ const DEFAULT_LAYOUT_PARSING_VISUALIZE = false const LAYOUT_PARSING_API_PATH = "/v2/models/layout-parsing/infer" const DEFAULT_MAX_PAGES_PER_LAYOUT_REQUEST = 100 const TEXT_CHUNK_OVERLAP_CHARS = 32 -const USE_SEQUENTIAL_BATCH_PROCESSING=true +// Configuration constants +const LAYOUT_PARSING_BASE_URL = process.env.LAYOUT_PARSING_BASE_URL || DEFAULT_LAYOUT_PARSING_BASE_URL +const LAYOUT_PARSING_TIMEOUT_MS = Number.parseInt(process.env.LAYOUT_PARSING_TIMEOUT_MS ?? "300000", 10) +const OCR_MAX_CHUNK_BYTES = Number.parseInt(process.env.OCR_MAX_CHUNK_BYTES ?? "", 10) +const IMAGE_DIR = process.env.IMAGE_DIR || DEFAULT_IMAGE_DIR +const LAYOUT_PARSING_MAX_PAGES_PER_REQUEST = Number.parseInt(process.env.LAYOUT_PARSING_MAX_PAGES_PER_REQUEST ?? "", 10) + +const DEFAULT_STATUS_ENDPOINT = "http://localhost:8081/instance_status" +const DEFAULT_POLL_INTERVAL_MS = 300 +const DEFAULT_REQUEST_TIMEOUT_MS = 120_000 +const DEFAULT_MAX_RETRIES = 2 +const DEFAULT_THRESHOLD_FOR_CONCURRENCY = 1 +const STATUS_FETCH_TIMEOUT_MS = 2_000 +const STATUS_FETCH_MAX_RETRIES = 3 +const BACKOFF_BASE_MS = 500 +const BACKOFF_FACTOR = 2 +const BACKOFF_MAX_MS = 8_000 +const BACKOFF_JITTER_RATIO = 0.2 type LayoutParsingBlock = { block_label?: string @@ -90,6 +107,778 @@ type GlobalSeq = { value: number } +type PdfPageBatch = { + buffer: Buffer + startPage: number + endPage: number +} + +export interface PdfOcrBatch { + id: string + fileName: string + startPage: number + endPage: number + pdfBuffer: Buffer +} + +export interface DispatchOptions { + statusEndpoint?: string + pollIntervalMs?: number + requestTimeoutMs?: number + maxRetries?: number + thresholdForConcurrency?: number + signal?: AbortSignal + logger?: Pick + metrics?: { + incr(name: string, tags?: Record): void + observe(name: string, value: number, tags?: Record): void + } + sendBatch?: ( + batch: PdfOcrBatch, + options: SendPdfBatchOptions, + ) => Promise +} + +export interface DispatchReport { + total: number + succeeded: number + failed: number + startedAt: number + endedAt: number + perItem: Array<{ + id: string + status: "ok" | "failed" + attempts: number + latencyMs: number + error?: string + }> +} + +type BatchDispatchResult = DispatchReport["perItem"][number] + +type InstanceStatusPayload = { + active_instances?: unknown + configured_instances?: unknown + idle_instances?: unknown + last_updated?: unknown +} + + + +const noopLogger: Pick = { + info() {}, + warn() {}, + error() {}, +} + +const noopMetrics = { + incr() {}, + observe() {}, +} + +type SendPdfBatchOptions = { + timeoutMs: number +} + +// Placeholder implementation for integrating with the OCR service. 
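+// The dispatcher relies on this sender throwing on non-2xx responses and on
+// timeout (AbortError), so failed batches can be retried with backoff.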
+// This uses the same endpoint as callLayoutParsingApi +async function sendPdfOcrBatch( + batch: PdfOcrBatch, + { timeoutMs }: SendPdfBatchOptions, +): Promise { + const baseUrl = LAYOUT_PARSING_BASE_URL.replace(/\/+$/, '') + const apiUrl = baseUrl + '/' + LAYOUT_PARSING_API_PATH.replace(/^\/+/, '') + + const controller = new AbortController() + const timer = setTimeout(() => controller.abort(), timeoutMs) + + try { + const response = await fetch(apiUrl, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ + // TODO: adjust payload to match OCR batch contract. + id: batch.id, + fileName: batch.fileName, + startPage: batch.startPage, + endPage: batch.endPage, + pdfBase64: batch.pdfBuffer.toString("base64"), + }), + signal: controller.signal, + }) + + if (!response.ok) { + const responseText = await response.text().catch(() => "") + throw new Error( + `OCR batch request failed (${response.status}): ${responseText.slice(0, 200)}`, + ) + } + + // TODO: handle the response payload if the OCR service returns useful data. + } catch (error) { + if ((error as Error).name === "AbortError") { + throw new Error("OCR batch request aborted due to timeout") + } + throw error + } finally { + clearTimeout(timer) + } +} + +function coerceStatusNumber(raw: unknown): number | undefined { + if (typeof raw === "number") { + if (Number.isFinite(raw)) { + return raw + } + return undefined + } + + if (typeof raw === "string") { + const trimmed = raw.trim() + if (trimmed.length === 0) { + return undefined + } + const parsed = Number(trimmed) + if (Number.isFinite(parsed)) { + return parsed + } + } + + return undefined +} + +function sanitizeIdleValue(raw: unknown): number { + const numericValue = coerceStatusNumber(raw) + if (numericValue === undefined) { + return 0 + } + if (numericValue <= 0) { + return 0 + } + return Math.floor(numericValue) +} + +function createAbortError(): Error { + const error = new Error("aborted") + error.name = "AbortError" + return error +} + +function isAbortError(error: unknown): boolean { + return ( + error instanceof Error && + (error.name === "AbortError" || error.message === "aborted") + ) +} + +function applyBackoffJitter(durationMs: number): number { + const jitter = + 1 + (Math.random() * 2 - 1) * Math.max(0, Math.min(1, BACKOFF_JITTER_RATIO)) + return Math.min(BACKOFF_MAX_MS, Math.round(durationMs * jitter)) +} + +async function sleep(ms: number, signal?: AbortSignal): Promise { + if (ms <= 0) { + if (signal?.aborted) { + throw createAbortError() + } + return + } + + if (signal?.aborted) { + throw createAbortError() + } + + await new Promise((resolve, reject) => { + let timer: ReturnType | null = null + const abortHandler = () => { + if (timer) { + clearTimeout(timer) + timer = null + } + if (signal) { + signal.removeEventListener("abort", abortHandler) + } + reject(createAbortError()) + } + + timer = setTimeout(() => { + if (signal) { + signal.removeEventListener("abort", abortHandler) + } + timer = null + resolve() + }, ms) + + if (signal) { + signal.addEventListener("abort", abortHandler) + } + }) +} + +async function fetchIdleInstances( + endpoint: string, + logger: Pick, + metrics: DispatchOptions["metrics"], +): Promise { + let attempt = 0 + let lastError: unknown + + while (attempt < STATUS_FETCH_MAX_RETRIES) { + attempt += 1 + const controller = new AbortController() + const timer = setTimeout(() => controller.abort(), STATUS_FETCH_TIMEOUT_MS) + + try { + const response = await fetch(endpoint, { + method: "GET", + signal: 
controller.signal, + }) + + clearTimeout(timer) + + if (!response.ok) { + throw new Error(`unexpected status ${response.status}`) + } + + const payload = (await response.json()) as InstanceStatusPayload + const idle = sanitizeIdleValue(payload.idle_instances) + const active = coerceStatusNumber(payload.active_instances) + const configured = coerceStatusNumber(payload.configured_instances) + const lastUpdated = coerceStatusNumber(payload.last_updated) + + metrics?.observe("ocr_dispatch.instances.idle", idle) + if (active !== undefined) { + metrics?.observe("ocr_dispatch.instances.active", active) + } + if (configured !== undefined) { + metrics?.observe("ocr_dispatch.instances.configured", configured) + } + if (lastUpdated !== undefined) { + metrics?.observe("ocr_dispatch.instances.last_updated", lastUpdated) + } + + // Success - return the idle count + if (attempt > 1) { + logger.info("Successfully fetched OCR instance status after retry", { + attempt, + idle, + endpoint, + }) + } + + return idle + } catch (error) { + lastError = error + clearTimeout(timer) + + const errorMessage = error instanceof Error ? error.message : String(error) + + if (attempt < STATUS_FETCH_MAX_RETRIES) { + const backoffMs = computeBackoffMs(attempt) + logger.warn( + `Failed to fetch OCR instance status (attempt ${attempt}/${STATUS_FETCH_MAX_RETRIES}), retrying in ${backoffMs}ms`, + { + error: errorMessage, + endpoint, + attempt, + backoffMs, + }, + ) + metrics?.incr("ocr_dispatch.status_fetch_retry") + + await sleep(backoffMs) + } else { + // Max retries exceeded + logger.error( + `Failed to fetch OCR instance status after ${STATUS_FETCH_MAX_RETRIES} attempts`, + { + error: errorMessage, + endpoint, + attempts: STATUS_FETCH_MAX_RETRIES, + }, + ) + metrics?.incr("ocr_dispatch.status_fetch_error") + + throw new Error( + `Instance status server unreachable after ${STATUS_FETCH_MAX_RETRIES} attempts: ${errorMessage}`, + ) + } + } + } + + // This should never be reached, but TypeScript needs it + const finalMessage = + lastError instanceof Error ? lastError.message : String(lastError ?? 
"unknown error") + throw new Error( + `Instance status server unreachable after ${STATUS_FETCH_MAX_RETRIES} attempts: ${finalMessage}`, + ) +} + +function computeBackoffMs(attempt: number): number { + const exponent = Math.max(0, attempt - 1) + const baseDelay = BACKOFF_BASE_MS * Math.pow(BACKOFF_FACTOR, exponent) + return applyBackoffJitter(baseDelay) +} + +async function processBatch( + batch: PdfOcrBatch, + options: { + requestTimeoutMs: number + maxRetries: number + logger: Pick + metrics: DispatchOptions["metrics"] + signal?: AbortSignal + sendBatch: ( + batch: PdfOcrBatch, + sendOptions: SendPdfBatchOptions, + ) => Promise + }, +): Promise { + const { + requestTimeoutMs, + maxRetries, + logger, + metrics = noopMetrics, + signal, + sendBatch, + } = options + + const totalAttemptsAllowed = Math.max(0, maxRetries) + 1 + let attempt = 0 + let lastError: unknown + + while (attempt < totalAttemptsAllowed) { + if (signal?.aborted) { + return { + id: batch.id, + status: "failed", + attempts: attempt, + latencyMs: 0, + error: "aborted", + } + } + + attempt += 1 + const attemptStartedAt = Date.now() + + try { + await sendBatch(batch, { timeoutMs: requestTimeoutMs }) + const latencyMs = Date.now() - attemptStartedAt + logger.info( + `[batch=${batch.id}] attempt=${attempt} latencyMs=${latencyMs} ok`, + ) + metrics.observe("ocr_dispatch.latency_ms", latencyMs, { + status: "ok", + }) + metrics.incr("ocr_dispatch.attempts", { status: "ok" }) + return { + id: batch.id, + status: "ok", + attempts: attempt, + latencyMs, + } + } catch (error) { + const latencyMs = Date.now() - attemptStartedAt + lastError = error + const errorMessage = error instanceof Error ? error.message : String(error) + + logger.warn( + `[batch=${batch.id}] attempt=${attempt} latencyMs=${latencyMs} error=${errorMessage}`, + ) + metrics.observe("ocr_dispatch.latency_ms", latencyMs, { + status: "failed", + }) + metrics.incr("ocr_dispatch.attempts", { status: "failed" }) + + if (attempt >= totalAttemptsAllowed) { + return { + id: batch.id, + status: "failed", + attempts: attempt, + latencyMs, + error: errorMessage, + } + } + + const backoffMs = computeBackoffMs(attempt) + logger.info( + `[batch=${batch.id}] attempt=${attempt} error=${errorMessage} backoffMs=${backoffMs}`, + ) + + try { + await sleep(backoffMs, signal) + } catch (abortError) { + if (isAbortError(abortError)) { + return { + id: batch.id, + status: "failed", + attempts: attempt, + latencyMs, + error: "aborted", + } + } + throw abortError + } + } + } + + const finalMessage = + lastError instanceof Error ? lastError.message : String(lastError ?? "") + return { + id: batch.id, + status: "failed", + attempts: totalAttemptsAllowed, + latencyMs: 0, + error: finalMessage || "failed", + } +} + +type InFlightState = { + batch: PdfOcrBatch + promise: Promise + done: boolean + result?: BatchDispatchResult +} + +function createInFlightState( + batch: PdfOcrBatch, + options: { + requestTimeoutMs: number + maxRetries: number + logger: Pick + metrics: DispatchOptions["metrics"] + signal?: AbortSignal + sendBatch: ( + batch: PdfOcrBatch, + sendOptions: SendPdfBatchOptions, + ) => Promise + }, +): InFlightState { + const state: InFlightState = { + batch, + promise: Promise.resolve({} as BatchDispatchResult), + done: false, + } + + state.promise = processBatch(batch, options) + .then((result) => { + state.done = true + state.result = result + return result + }) + .catch((error) => { + state.done = true + const errorMessage = error instanceof Error ? 
error.message : String(error) + const fallback: BatchDispatchResult = { + id: batch.id, + status: "failed", + attempts: 0, + latencyMs: 0, + error: errorMessage, + } + state.result = fallback + return fallback + }) + + return state +} + +async function waitForAtLeastOne( + inFlight: InFlightState[], +): Promise { + if (inFlight.length === 0) { + return [] + } + + await Promise.allSettled(inFlight.map((state) => state.promise)) + return inFlight +} + +function recordOutcome( + result: BatchDispatchResult, + perItem: DispatchReport["perItem"], + counters: { succeeded: number; failed: number }, +): void { + perItem.push(result) + if (result.status === "ok") { + counters.succeeded += 1 + } else { + counters.failed += 1 + } +} + +async function runSequentialDispatch( + batches: PdfOcrBatch[], + options: { + requestTimeoutMs: number + maxRetries: number + statusEndpoint: string + logger: Pick + metrics: DispatchOptions["metrics"] + signal?: AbortSignal + sendBatch: ( + batch: PdfOcrBatch, + sendOptions: SendPdfBatchOptions, + ) => Promise + }, + perItem: DispatchReport["perItem"], + counters: { succeeded: number; failed: number }, +): Promise { + const { logger, statusEndpoint, signal, sendBatch } = options + + const idle = await fetchIdleInstances(statusEndpoint, logger, options.metrics) + logger.info( + `[cycle=sequential] idle=${idle} remaining=${batches.length} inflight=0 dispatching=1`, + ) + + for (const batch of batches) { + if (signal?.aborted) { + recordOutcome( + { + id: batch.id, + status: "failed", + attempts: 0, + latencyMs: 0, + error: "aborted", + }, + perItem, + counters, + ) + continue + } + + const result = await processBatch(batch, { + ...options, + sendBatch, + }) + recordOutcome(result, perItem, counters) + } +} + +async function runConcurrentDispatch( + batches: PdfOcrBatch[], + options: { + requestTimeoutMs: number + maxRetries: number + statusEndpoint: string + pollIntervalMs: number + logger: Pick + metrics: DispatchOptions["metrics"] + signal?: AbortSignal + sendBatch: ( + batch: PdfOcrBatch, + sendOptions: SendPdfBatchOptions, + ) => Promise + }, + perItem: DispatchReport["perItem"], + counters: { succeeded: number; failed: number }, +): Promise { + const { + requestTimeoutMs, + maxRetries, + statusEndpoint, + pollIntervalMs, + logger, + metrics, + signal, + sendBatch, + } = options + + let inFlight: InFlightState[] = [] + const pendingQueue: PdfOcrBatch[] = [...batches] + let cycle = 0 + let aborted = signal?.aborted ?? false + + const onAbort = () => { + if (!aborted) { + aborted = true + logger.warn("Dispatch aborted by signal; draining in-flight tasks") + } + } + + signal?.addEventListener("abort", onAbort) + + try { + while (pendingQueue.length > 0 || inFlight.length > 0) { + cycle += 1 + + let idle = 0 + if (!aborted) { + idle = await fetchIdleInstances(statusEndpoint, logger, metrics) + } + + const remaining = pendingQueue.length + let toDispatch = 0 + if (!aborted && remaining > 0) { + toDispatch = idle > 0 ? Math.min(idle/2, remaining) : 1 + } + + logger.info( + `[cycle=${cycle}] idle=${idle} remaining=${remaining} inflight=${inFlight.length} dispatching=${toDispatch}${aborted ? 
" aborted=true" : ""}`, + ) + + for (let index = 0; index < toDispatch; index += 1) { + const nextBatch = pendingQueue.shift() + if (!nextBatch) { + break + } + const state = createInFlightState(nextBatch, { + requestTimeoutMs, + maxRetries, + logger, + metrics, + signal, + sendBatch, + }) + inFlight.push(state) + } + + if (inFlight.length === 0) { + if (aborted) { + while (pendingQueue.length > 0) { + const batch = pendingQueue.shift() + if (!batch) { + continue + } + recordOutcome( + { + id: batch.id, + status: "failed", + attempts: 0, + latencyMs: 0, + error: "aborted", + }, + perItem, + counters, + ) + } + } + break + } + + const completedStates = await waitForAtLeastOne(inFlight) + if (completedStates.length > 0) { + for (const state of completedStates) { + if (state.result) { + recordOutcome(state.result, perItem, counters) + } + } + inFlight = inFlight.filter((state) => !state.done) + } + + if (aborted) { + while (pendingQueue.length > 0) { + const batch = pendingQueue.shift() + if (!batch) { + continue + } + recordOutcome( + { + id: batch.id, + status: "failed", + attempts: 0, + latencyMs: 0, + error: "aborted", + }, + perItem, + counters, + ) + } + } + + if ( + (pendingQueue.length > 0 || inFlight.length > 0) && + !aborted && + pollIntervalMs > 0 + ) { + try { + await sleep(pollIntervalMs, signal) + } catch (error) { + if (isAbortError(error)) { + aborted = true + } else { + throw error + } + } + } + } + } finally { + signal?.removeEventListener("abort", onAbort) + } +} + +export async function dispatchOCRBatches( + batches: PdfOcrBatch[], + opts: DispatchOptions = {}, +): Promise { + const startedAt = Date.now() + const total = batches.length + + if (total === 0) { + return { + total: 0, + succeeded: 0, + failed: 0, + startedAt, + endedAt: startedAt, + perItem: [], + } + } + + const { + statusEndpoint = DEFAULT_STATUS_ENDPOINT, + pollIntervalMs = DEFAULT_POLL_INTERVAL_MS, + requestTimeoutMs = DEFAULT_REQUEST_TIMEOUT_MS, + maxRetries = DEFAULT_MAX_RETRIES, + thresholdForConcurrency = DEFAULT_THRESHOLD_FOR_CONCURRENCY, + signal, + logger: providedLogger, + metrics: providedMetrics, + sendBatch: providedSendBatch, + } = opts + + const logger = providedLogger ?? Logger ?? noopLogger + const metrics = providedMetrics ?? noopMetrics + const sendBatch = + providedSendBatch ?? 
+ ((batch: PdfOcrBatch, options: SendPdfBatchOptions) => + sendPdfOcrBatch(batch, options)) + + const perItem: DispatchReport["perItem"] = [] + const counters = { succeeded: 0, failed: 0 } + + const dispatchOptions = { + requestTimeoutMs, + maxRetries, + statusEndpoint, + pollIntervalMs, + logger, + metrics, + signal, + sendBatch, + } + + if (total <= thresholdForConcurrency) { + await runSequentialDispatch(batches, dispatchOptions, perItem, counters) + } else { + await runConcurrentDispatch(batches, dispatchOptions, perItem, counters) + } + + const endedAt = Date.now() + + return { + total, + succeeded: counters.succeeded, + failed: counters.failed, + startedAt, + endedAt, + perItem, + } +} + function looksLikePdf(buffer: Buffer, fileName: string): boolean { if (fileName.toLowerCase().endsWith(".pdf")) { return true @@ -322,15 +1111,12 @@ async function callLayoutParsingApi( buffer: Buffer, fileName: string, ): Promise { - const baseUrl = (process.env.LAYOUT_PARSING_BASE_URL || DEFAULT_LAYOUT_PARSING_BASE_URL).replace(/\/+$/, '') + const baseUrl = LAYOUT_PARSING_BASE_URL.replace(/\/+$/, '') const apiUrl = baseUrl + '/' + LAYOUT_PARSING_API_PATH.replace(/^\/+/, '') const fileType = DEFAULT_LAYOUT_PARSING_FILE_TYPE const visualize = DEFAULT_LAYOUT_PARSING_VISUALIZE - const timeoutMs = Number.parseInt( - process.env.LAYOUT_PARSING_TIMEOUT_MS ?? "300000", - 10, - ) + const timeoutMs = LAYOUT_PARSING_TIMEOUT_MS Logger.info("Calling layout parsing API", { apiUrl, @@ -505,12 +1291,18 @@ async function splitPdfIntoBatches( buffer: Buffer, maxPagesPerBatch: number = DEFAULT_MAX_PAGES_PER_LAYOUT_REQUEST, preloadedPdf?: PDFDocument, -): Promise { +): Promise { const sourcePdf = preloadedPdf ?? (await PDFDocument.load(buffer)) const totalPages = sourcePdf.getPageCount() if (totalPages <= maxPagesPerBatch) { - return [buffer] + return [ + { + buffer, + startPage: 1, + endPage: totalPages, + }, + ] } Logger.info("Splitting large PDF into batches", { @@ -519,11 +1311,13 @@ async function splitPdfIntoBatches( estimatedBatches: Math.ceil(totalPages / maxPagesPerBatch), }) - const batches: Buffer[] = [] + const batches: PdfPageBatch[] = [] for (let startPage = 0; startPage < totalPages; startPage += maxPagesPerBatch) { const endPage = Math.min(startPage + maxPagesPerBatch, totalPages) const pageCount = endPage - startPage + const startPageNumber = startPage + 1 + const endPageNumber = endPage // Create new PDF with subset of pages const newPdf = await PDFDocument.create() @@ -538,12 +1332,16 @@ async function splitPdfIntoBatches( } const pdfBytes = await newPdf.save() - batches.push(Buffer.from(pdfBytes)) + batches.push({ + buffer: Buffer.from(pdfBytes), + startPage: startPageNumber, + endPage: endPageNumber, + }) Logger.info("Created PDF batch", { batchIndex: batches.length, - startPage: startPage + 1, - endPage, + startPage: startPageNumber, + endPage: endPageNumber, pagesInBatch: pageCount, batchSizeBytes: pdfBytes.length, }) @@ -576,77 +1374,85 @@ function mergeLayoutParsingResults( } } -async function processBatchesConcurrently( - batches: Buffer[], +async function processPdfBatchesWithDispatcher( + batches: PdfPageBatch[], fileName: string, ): Promise { - Logger.info("Processing PDF batches concurrently", { + if (batches.length === 0) { + return [] + } + + if (batches.length === 1) { + const singleBatch = batches[0] + Logger.info("Processing single PDF batch via OCR pipeline", { + fileName, + startPage: singleBatch.startPage, + endPage: singleBatch.endPage, + pagesInBatch: singleBatch.endPage - 
singleBatch.startPage + 1, + }) + + const result = await callLayoutParsingApi( + singleBatch.buffer, + `${fileName}_pages_${singleBatch.startPage}-${singleBatch.endPage}`, + ) + + Logger.info("Completed single PDF batch", { + fileName, + layoutResultsCount: result.layoutParsingResults?.length ?? 0, + }) + + return [result] + } + + Logger.info("Dispatching PDF batches via OCR dispatcher", { fileName, totalBatches: batches.length, }) - return Promise.all( - batches.map(async (batch, index) => { - const batchIndex = index + 1 - Logger.info("Processing PDF batch", { - fileName, - batchIndex, - totalBatches: batches.length, - batchSizeBytes: batch.length, + const resultsByBatchId = new Map() + const dispatchBatches: PdfOcrBatch[] = batches.map((batch, index) => ({ + id: `${fileName}_batch_${index + 1}`, + fileName, + startPage: batch.startPage, + endPage: batch.endPage, + pdfBuffer: batch.buffer, + })) + + const dispatchOptions: DispatchOptions = { + logger: Logger, + sendBatch: async (batch) => { + const label = `${batch.fileName}_pages_${batch.startPage}-${batch.endPage}` + Logger.info("Sending PDF batch to layout parsing API", { + batchId: batch.id, + fileName: batch.fileName, + startPage: batch.startPage, + endPage: batch.endPage, + pagesInBatch: batch.endPage - batch.startPage + 1, }) - const batchResult = await callLayoutParsingApi( - batch, - `${fileName}_batch_${batchIndex}`, - ) + const result = await callLayoutParsingApi(batch.pdfBuffer, label) + resultsByBatchId.set(batch.id, result) Logger.info("Completed PDF batch", { - fileName, - batchIndex, - layoutResultsCount: - batchResult.layoutParsingResults?.length ?? 0, + batchId: batch.id, + layoutResultsCount: result.layoutParsingResults?.length ?? 0, }) - return batchResult - }), - ) -} - -async function processBatchesSequentially( - batches: Buffer[], - fileName: string, -): Promise { - Logger.info("Processing PDF batches sequentially", { - fileName, - totalBatches: batches.length, - }) + }, + } - const batchResults: LayoutParsingApiPayload[] = [] + const report = await dispatchOCRBatches(dispatchBatches, dispatchOptions) - for (let index = 0; index < batches.length; index++) { - const batch = batches[index] - const batchIndex = index + 1 - - Logger.info("Processing PDF batch sequentially", { - fileName, - batchIndex, - totalBatches: batches.length, - batchSizeBytes: batch.length, - }) - - const batchResult = await callLayoutParsingApi( - batch, - `${fileName}_batch_${batchIndex}`, + if (report.failed > 0) { + throw new Error( + `Failed to process ${report.failed} PDF batch(es) via OCR dispatcher`, ) - - Logger.info("Completed PDF batch sequentially", { - fileName, - batchIndex, - layoutResultsCount: - batchResult.layoutParsingResults?.length ?? 0, - }) - - batchResults.push(batchResult) } - return batchResults + return dispatchBatches.map((batch) => { + const batchResult = resultsByBatchId.get(batch.id) + if (!batchResult) { + throw new Error(`Missing OCR result for batch ${batch.id}`) + } + return batchResult + }) } export async function chunkByOCRFromBuffer( @@ -654,13 +1460,9 @@ export async function chunkByOCRFromBuffer( fileName: string, docId: string, ): Promise { - const maxPagesEnv = Number.parseInt( - process.env.LAYOUT_PARSING_MAX_PAGES_PER_REQUEST ?? "", - 10, - ) const maxPagesPerRequest = - Number.isFinite(maxPagesEnv) && maxPagesEnv > 0 - ? maxPagesEnv + Number.isFinite(LAYOUT_PARSING_MAX_PAGES_PER_REQUEST) && LAYOUT_PARSING_MAX_PAGES_PER_REQUEST > 0 + ? 
LAYOUT_PARSING_MAX_PAGES_PER_REQUEST : DEFAULT_MAX_PAGES_PER_LAYOUT_REQUEST let finalApiResult: LayoutParsingApiPayload @@ -686,12 +1488,12 @@ export async function chunkByOCRFromBuffer( fileName, totalPages, batches: batches.length, - processingMode: USE_SEQUENTIAL_BATCH_PROCESSING ? 'sequential' : 'concurrent', }) - const batchResults = USE_SEQUENTIAL_BATCH_PROCESSING - ? await processBatchesSequentially(batches, fileName) - : await processBatchesConcurrently(batches, fileName) + const batchResults = await processPdfBatchesWithDispatcher( + batches, + fileName, + ) finalApiResult = mergeLayoutParsingResults(batchResults) Logger.info("Merged batch results", { @@ -734,6 +1536,127 @@ export async function chunkByOCRFromBuffer( return chunkByOCR(docId, ocrResponse, images, imageMetadata) } +export async function demoPdfOcrDispatcher(): Promise { + const samplePdfPath = + "/Users/aayush.shah/Downloads/Tolkien-J.-The-lord-of-the-rings-HarperCollins-ebooks-2010.pdf" + + const pdfBuffer = await fsPromises.readFile(samplePdfPath) + const pdfDocument = await PDFDocument.load(pdfBuffer) + const pdfFileName = path.basename(samplePdfPath) + + const pageBatches = await splitPdfIntoBatches(pdfBuffer, 25, pdfDocument) + if (pageBatches.length === 0) { + console.warn("[demoPdfOcrDispatcher] No batches generated from sample PDF") + return + } + + const dispatchBatches: PdfOcrBatch[] = pageBatches.map((batch, index) => ({ + id: `demo-batch-${index + 1}`, + fileName: pdfFileName, + startPage: batch.startPage, + endPage: batch.endPage, + pdfBuffer: batch.buffer, + })) + + const sampleBatches = dispatchBatches.slice(0, Math.min(dispatchBatches.length, 6)) + + const scenarios: Array<{ label: string; idleSequence: number[] }> = [ + { label: "idle=5", idleSequence: Array(8).fill(5) }, + { label: "idle=8", idleSequence: Array(8).fill(8) }, + { label: "idle=1", idleSequence: Array(8).fill(1) }, + { label: "idle=0", idleSequence: Array(8).fill(0) }, + { label: "idle sequence 5โ†’8โ†’1โ†’0", idleSequence: [5, 8, 1, 0] }, + ] + + for (const scenario of scenarios) { + console.log(`\n[demoPdfOcrDispatcher] Scenario: ${scenario.label}`) + await runScenario(sampleBatches, scenario.idleSequence, scenario.label) + } +} + +async function runScenario( + batches: PdfOcrBatch[], + idleSequence: number[], + label: string, +): Promise { + let statusCallCount = 0 + const originalFetch = globalThis.fetch + + globalThis.fetch = async (input, init) => { + if (typeof input === "string" && input.includes("/instance_status")) { + const idle = + idleSequence[ + statusCallCount < idleSequence.length + ? statusCallCount + : idleSequence.length - 1 + ] ?? 0 + statusCallCount += 1 + + return new Response( + JSON.stringify({ + active_instances: Math.max(0, idle), + idle_instances: idle, + }), + { + status: 200, + headers: { "Content-Type": "application/json" }, + }, + ) + } + + throw new Error( + `[demoPdfOcrDispatcher] Unexpected fetch target: ${String(input)}`, + ) + } + + const logger: Pick = { + info(message?: unknown, meta?: unknown) { + console.log( + `[${label}] INFO ${String(message)}${ + meta ? ` ${JSON.stringify(meta)}` : "" + }`, + ) + }, + warn(message?: unknown, meta?: unknown) { + console.warn( + `[${label}] WARN ${String(message)}${ + meta ? ` ${JSON.stringify(meta)}` : "" + }`, + ) + }, + error(message?: unknown, meta?: unknown) { + console.error( + `[${label}] ERROR ${String(message)}${ + meta ? 
` ${JSON.stringify(meta)}` : "" + }`, + ) + }, + } + + try { + const report = await dispatchOCRBatches(batches, { + logger, + thresholdForConcurrency: 1, + pollIntervalMs: 50, + sendBatch: async (batch, { timeoutMs }) => { + console.log( + `[${label}] Dispatching ${batch.id} pages ${batch.startPage}-${batch.endPage} (timeout ${timeoutMs}ms)`, + ) + await sleep(40) + console.log( + `[${label}] Completed ${batch.id} pages ${batch.startPage}-${batch.endPage}`, + ) + }, + }) + + console.log( + `[${label}] Report total=${report.total} succeeded=${report.succeeded} failed=${report.failed}`, + ) + } finally { + globalThis.fetch = originalFetch + } +} + export async function chunkByOCR( docId: string, ocrResponse: OcrResponse, @@ -746,13 +1669,9 @@ export async function chunkByOCR( const image_chunks_map: ChunkMetadata[] = [] const globalSeq: GlobalSeq = { value: 0 } - const maxChunkBytes = Number.parseInt( - process.env.OCR_MAX_CHUNK_BYTES ?? "", - 1024, - ) const chunkSizeLimit = - Number.isFinite(maxChunkBytes) && maxChunkBytes > 0 - ? maxChunkBytes + Number.isFinite(OCR_MAX_CHUNK_BYTES) && OCR_MAX_CHUNK_BYTES > 0 + ? OCR_MAX_CHUNK_BYTES : DEFAULT_MAX_CHUNK_BYTES let currentTextBuffer = "" @@ -760,7 +1679,7 @@ export async function chunkByOCR( let currentPageNumbers: Set = new Set() let lastTextChunk: string | null = null - const imageBaseDir = path.resolve(process.env.IMAGE_DIR || DEFAULT_IMAGE_DIR) + const imageBaseDir = path.resolve(IMAGE_DIR) const docImageDir = path.join(imageBaseDir, docId) await fsPromises.mkdir(docImageDir, { recursive: true }) const savedImages = new Set() @@ -868,7 +1787,7 @@ export async function chunkByOCR( } const extension = detectImageExtension(imageBuffer) - const fileName = String(globalSeq.value) + "." + extension + const fileName = globalSeq.value.toString() + "." + extension const uniqueFileName = ensureUniqueFileName(fileName, usedFileNames) const imagePath = path.join(docImageDir, uniqueFileName) diff --git a/server/scripts/testOcrDispatcher.ts b/server/scripts/testOcrDispatcher.ts new file mode 100644 index 000000000..771473a8a --- /dev/null +++ b/server/scripts/testOcrDispatcher.ts @@ -0,0 +1,464 @@ +#!/usr/bin/env bun +/** + * Test file for OCR Dispatcher Logic + * + * This script tests the dispatch logic implementation in chunkByOCR.ts + * It mocks the instance status endpoint and PDF processing to demonstrate + * how batches are processed under various scenarios. 
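+ * Scenarios cover sequential dispatch, fluctuating idle capacity, retry/backoff,
+ * and an unreachable instance status server.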
+ * + * Run with: bun run server/scripts/testOcrDispatcher.ts + */ + +import { promises as fsPromises } from "fs" +import * as path from "path" +import { PDFDocument } from "pdf-lib" +import type { PdfOcrBatch, DispatchOptions, DispatchReport } from "../lib/chunkByOCR" + +// Import the dispatcher function +import { dispatchOCRBatches } from "../lib/chunkByOCR" + +// ============================================================================ +// MOCK CONFIGURATION +// ============================================================================ + +const MOCK_PDF_PATH = "/Users/aayush.shah/Downloads/Tolkien-J.-The-lord-of-the-rings-HarperCollins-ebooks-2010.pdf" +const PAGES_PER_BATCH = 100 +const MAX_BATCHES_TO_TEST = 100 + +// ============================================================================ +// HELPER FUNCTIONS +// ============================================================================ + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + +async function createMockBatches( + pdfPath: string, + pagesPerBatch: number, + maxBatches: number, +): Promise { + console.log(`\n๐Ÿ“„ Loading PDF: ${path.basename(pdfPath)}`) + + const pdfBuffer = await fsPromises.readFile(pdfPath) + const pdfDocument = await PDFDocument.load(pdfBuffer) + const totalPages = pdfDocument.getPageCount() + const pdfFileName = path.basename(pdfPath) + + console.log(` Total pages: ${totalPages}`) + console.log(` Pages per batch: ${pagesPerBatch}`) + + const batches: PdfOcrBatch[] = [] + + for (let startPage = 0; startPage < totalPages && batches.length < maxBatches; startPage += pagesPerBatch) { + const endPage = Math.min(startPage + pagesPerBatch, totalPages) + const pageCount = endPage - startPage + + // Create new PDF with subset of pages + const newPdf = await PDFDocument.create() + const pageIndices: number[] = [] + for (let i = 0; i < pageCount; i++) { + pageIndices.push(startPage + i) + } + + const copiedPages = await newPdf.copyPages(pdfDocument, pageIndices) + for (const page of copiedPages) { + newPdf.addPage(page) + } + + const pdfBytes = await newPdf.save() + + batches.push({ + id: `batch-${batches.length + 1}`, + fileName: pdfFileName, + startPage: startPage + 1, + endPage: endPage, + pdfBuffer: Buffer.from(pdfBytes), + }) + } + + console.log(` Created ${batches.length} batches\n`) + return batches +} + +// ============================================================================ +// MOCK INSTANCE STATUS ENDPOINT +// ============================================================================ + +type MockStatusConfig = { + idleSequence: number[] + label: string + serverDown?: boolean +} + +function createMockFetch(config: MockStatusConfig) { + let statusCallCount = 0 + const originalFetch = globalThis.fetch + + return { + install: () => { + globalThis.fetch = async (input, init) => { + if (typeof input === "string" && input.includes("/instance_status")) { + statusCallCount += 1 + + // Simulate server being unreachable + if (config.serverDown) { + throw new Error("ECONNREFUSED: Connection refused - instance status server is down") + } + + const idle = + config.idleSequence[ + statusCallCount < config.idleSequence.length + ? statusCallCount - 1 + : config.idleSequence.length - 1 + ] ?? 
0 + + return new Response( + JSON.stringify({ + active_instances: Math.max(0, 10 - idle), + configured_instances: 10, + idle_instances: idle, + last_updated: Date.now(), + }), + { + status: 200, + headers: { "Content-Type": "application/json" }, + }, + ) + } + + throw new Error( + `[testOcrDispatcher] Unexpected fetch target: ${String(input)}`, + ) + } + }, + restore: () => { + globalThis.fetch = originalFetch + }, + getCallCount: () => statusCallCount, + } +} + +// ============================================================================ +// MOCK BATCH PROCESSOR +// ============================================================================ + +type BatchProcessingStats = { + totalDispatched: number + totalCompleted: number + totalFailed: number + batchTimings: Array<{ + id: string + startTime: number + endTime: number + duration: number + }> +} + +function createMockBatchProcessor() { + const stats: BatchProcessingStats = { + totalDispatched: 0, + totalCompleted: 0, + totalFailed: 0, + batchTimings: [], + } + + const sendBatch = async ( + batch: PdfOcrBatch, + options: { timeoutMs: number }, + ): Promise => { + stats.totalDispatched += 1 + const startTime = Date.now() + + console.log( + ` ๐Ÿš€ Dispatching ${batch.id} (pages ${batch.startPage}-${batch.endPage}, timeout ${options.timeoutMs}ms)`, + ) + + // Simulate processing time (30-80ms) + const processingTime = 30 + Math.random() * 50 + await sleep(processingTime) + + // Simulate occasional failures (5% chance) + if (Math.random() < 0.05) { + stats.totalFailed += 1 + throw new Error(`Simulated processing failure for ${batch.id}`) + } + + const endTime = Date.now() + stats.totalCompleted += 1 + stats.batchTimings.push({ + id: batch.id, + startTime, + endTime, + duration: endTime - startTime, + }) + + console.log( + ` โœ… Completed ${batch.id} (${Math.round(processingTime)}ms)`, + ) + } + + return { sendBatch, stats } +} + +// ============================================================================ +// CUSTOM LOGGER +// ============================================================================ + +function createTestLogger(scenarioLabel: string) { + return { + info(message?: unknown, meta?: unknown) { + const metaStr = meta ? ` ${JSON.stringify(meta)}` : "" + console.log(` [${scenarioLabel}] โ„น๏ธ ${String(message)}${metaStr}`) + }, + warn(message?: unknown, meta?: unknown) { + const metaStr = meta ? ` ${JSON.stringify(meta)}` : "" + console.warn(` [${scenarioLabel}] โš ๏ธ ${String(message)}${metaStr}`) + }, + error(message?: unknown, meta?: unknown) { + const metaStr = meta ? ` ${JSON.stringify(meta)}` : "" + console.error(` [${scenarioLabel}] โŒ ${String(message)}${metaStr}`) + }, + } +} + +// ============================================================================ +// METRICS COLLECTOR +// ============================================================================ + +type MetricsData = { + counters: Record + observations: Array<{ + name: string + value: number + tags?: Record + }> +} + +function createMetricsCollector() { + const data: MetricsData = { + counters: {}, + observations: [], + } + + return { + incr(name: string, tags?: Record) { + const key = tags ? 
`${name}:${JSON.stringify(tags)}` : name + data.counters[key] = (data.counters[key] || 0) + 1 + }, + observe(name: string, value: number, tags?: Record) { + data.observations.push({ name, value, tags }) + }, + getData: () => data, + } +} + +// ============================================================================ +// TEST SCENARIOS +// ============================================================================ + +type TestScenario = { + name: string + description: string + idleSequence: number[] + thresholdForConcurrency: number + pollIntervalMs: number + maxRetries: number +} + +const TEST_SCENARIOS: TestScenario[] = [ + { + name: "Sequential Processing (Low Batch Count)", + description: "Tests sequential dispatch when batch count is below threshold", + idleSequence: [5], + thresholdForConcurrency: 100, + pollIntervalMs: 50, + maxRetries: 2, + }, + { + name: "Concurrent - High Idle Capacity", + description: "Tests concurrent dispatch with consistently high idle instances", + idleSequence: [8, 8, 8, 8, 8], + thresholdForConcurrency: 1, + pollIntervalMs: 50, + maxRetries: 2, + }, + { + name: "Concurrent - Low Idle Capacity", + description: "Tests concurrent dispatch with low idle instances", + idleSequence: [1, 1, 1, 1, 1], + thresholdForConcurrency: 1, + pollIntervalMs: 50, + maxRetries: 2, + }, + { + name: "Concurrent - No Idle Capacity", + description: "Tests behavior when no idle instances are available", + idleSequence: [0, 0, 0, 1, 2], + thresholdForConcurrency: 1, + pollIntervalMs: 50, + maxRetries: 2, + }, + { + name: "Concurrent - Fluctuating Capacity", + description: "Tests adaptive dispatch with varying idle instances", + idleSequence: [5, 8, 3, 1, 0, 2, 6, 4], + thresholdForConcurrency: 1, + pollIntervalMs: 50, + maxRetries: 2, + }, + { + name: "Retry Logic Test", + description: "Tests retry behavior with higher failure rate", + idleSequence: [5, 5, 5], + thresholdForConcurrency: 1, + pollIntervalMs: 50, + maxRetries: 3, + }, + { + name: "Instance Server Down", + description: "Tests behavior when instance status server is unreachable", + idleSequence: [], // Not used - server will be unreachable + thresholdForConcurrency: 1, + pollIntervalMs: 50, + maxRetries: 2, + }, +] + +// ============================================================================ +// RUN SCENARIO +// ============================================================================ + +async function runScenario( + scenario: TestScenario, + batches: PdfOcrBatch[], +): Promise { + console.log(`\n${"=".repeat(80)}`) + console.log(`๐Ÿงช TEST: ${scenario.name}`) + console.log(` ${scenario.description}`) + console.log(` Batches: ${batches.length}`) + console.log(` Idle sequence: [${scenario.idleSequence.join(", ")}]`) + console.log(` Threshold: ${scenario.thresholdForConcurrency}`) + console.log(`${"=".repeat(80)}\n`) + + // Check if this is the "Instance Server Down" scenario + const isServerDownScenario = scenario.name === "Instance Server Down" + + const mockFetch = createMockFetch({ + idleSequence: scenario.idleSequence, + label: scenario.name, + serverDown: isServerDownScenario, + }) + const mockProcessor = createMockBatchProcessor() + const logger = createTestLogger(scenario.name) + const metrics = createMetricsCollector() + + mockFetch.install() + + try { + const startTime = Date.now() + + const options: DispatchOptions = { + logger, + metrics, + sendBatch: mockProcessor.sendBatch, + thresholdForConcurrency: scenario.thresholdForConcurrency, + pollIntervalMs: scenario.pollIntervalMs, + maxRetries: 
scenario.maxRetries, + } + + const report: DispatchReport = await dispatchOCRBatches(batches, options) + + const endTime = Date.now() + const totalDuration = endTime - startTime + + // Print results + console.log(`\n๐Ÿ“Š RESULTS:`) + console.log(` Total batches: ${report.total}`) + console.log(` Succeeded: ${report.succeeded} โœ…`) + console.log(` Failed: ${report.failed} โŒ`) + console.log(` Total duration: ${totalDuration}ms`) + console.log(` Status endpoint calls: ${mockFetch.getCallCount()}`) + + // Print per-batch details + console.log(`\n๐Ÿ“‹ Per-Batch Details:`) + for (const item of report.perItem) { + const status = item.status === "ok" ? "โœ…" : "โŒ" + const error = item.error ? ` (${item.error})` : "" + console.log( + ` ${status} ${item.id}: ${item.attempts} attempt(s), ${item.latencyMs}ms${error}`, + ) + } + + // Print metrics summary + const metricsData = metrics.getData() + console.log(`\n๐Ÿ“ˆ Metrics:`) + console.log(` Counters:`, metricsData.counters) + + const latencies = metricsData.observations + .filter((o) => o.name === "ocr_dispatch.latency_ms") + .map((o) => o.value) + + if (latencies.length > 0) { + const avgLatency = latencies.reduce((a, b) => a + b, 0) / latencies.length + const minLatency = Math.min(...latencies) + const maxLatency = Math.max(...latencies) + console.log(` Latency: avg=${Math.round(avgLatency)}ms, min=${Math.round(minLatency)}ms, max=${Math.round(maxLatency)}ms`) + } + + // Print processing stats + console.log(`\n๐Ÿ”ง Processing Stats:`) + console.log(` Total dispatched: ${mockProcessor.stats.totalDispatched}`) + console.log(` Total completed: ${mockProcessor.stats.totalCompleted}`) + console.log(` Total failed: ${mockProcessor.stats.totalFailed}`) + + } catch (error) { + console.error(`\nโŒ Scenario failed with error:`, error) + } finally { + mockFetch.restore() + } +} + +// ============================================================================ +// MAIN EXECUTION +// ============================================================================ + +async function main() { + console.log(` +โ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•— +โ•‘ OCR DISPATCHER TEST SUITE โ•‘ +โ•‘ โ•‘ +โ•‘ This test suite verifies the dispatch logic implementation in โ•‘ +โ•‘ chunkByOCR.ts by running various scenarios with mocked dependencies. 
โ•‘ +โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ• +`) + + try { + // Create mock batches from the PDF + const batches = await createMockBatches( + MOCK_PDF_PATH, + PAGES_PER_BATCH, + MAX_BATCHES_TO_TEST, + ) + + // Run all test scenarios + for (const scenario of TEST_SCENARIOS) { + await runScenario(scenario, batches) // Use first 6 batches for testing + await sleep(100) // Small delay between scenarios + } + + console.log(`\n${"=".repeat(80)}`) + console.log(`โœ… All test scenarios completed successfully!`) + console.log(`${"=".repeat(80)}\n`) + + } catch (error) { + console.error(`\nโŒ Test suite failed:`, error) + process.exit(1) + } +} + +// Run the test suite +main().catch((error) => { + console.error("Fatal error:", error) + process.exit(1) +}) From 30c80a963968eaeb24467f69277ab0f4a3996bb4 Mon Sep 17 00:00:00 2001 From: Aayushjshah <2001aayushshah@gmail.com> Date: Thu, 16 Oct 2025 12:12:12 +0530 Subject: [PATCH 2/7] feat: XYN-203 enable for drive pdfs mail attachments etc --- server/integrations/dataSource/index.ts | 24 ++++++++++++------- server/integrations/google/worker-utils.ts | 12 ++++++---- .../microsoft/attachment-utils.ts | 11 +++++---- server/lib/chunkPdfWithGemini.ts | 3 --- 4 files changed, 29 insertions(+), 21 deletions(-) diff --git a/server/integrations/dataSource/index.ts b/server/integrations/dataSource/index.ts index ee3f6708c..b6a4de193 100644 --- a/server/integrations/dataSource/index.ts +++ b/server/integrations/dataSource/index.ts @@ -32,7 +32,7 @@ import { } from "./errors" import { describeImageWithllm } from "@/lib/describeImageWithllm" import { promises as fsPromises } from "fs" -import { extractTextAndImagesWithChunksFromPDFviaGemini } from "@/lib/chunkPdfWithGemini" +import { PdfProcessor } from "@/lib/pdfProcessor" import { extractTextAndImagesWithChunksFromDocx } from "@/docxChunks" import { extractTextAndImagesWithChunksFromPptx } from "@/pptChunks" import imageType from "image-type" @@ -208,9 +208,15 @@ const processPdfContent = async ( ): Promise => { try { const docId = `dsf-${createId()}` - const { text_chunks, image_chunks, text_chunk_pos, image_chunk_pos } = - await extractTextAndImagesWithChunksFromPDFviaGemini(pdfBuffer, docId) - if (text_chunks.length === 0 && image_chunks.length === 0) { + const result = await PdfProcessor.processWithFallback( + Buffer.from(pdfBuffer), + options.fileName, + docId, + true, + true, + ) + + if (result.chunks.length === 0 && result.image_chunks.length === 0) { throw new ContentExtractionError( "No chunks generated from PDF content", "PDF", @@ -218,12 +224,12 @@ const processPdfContent = async ( } return createVespaDataSourceFile( - text_chunks, + result.chunks, options, - "pdf_processing", - image_chunks, - text_chunk_pos, - image_chunk_pos, + result.processingMethod || "pdf_processing", + result.image_chunks, + result.chunks_pos, + result.image_chunks_pos, docId, ) } catch (error) { diff --git a/server/integrations/google/worker-utils.ts b/server/integrations/google/worker-utils.ts index 746dd8353..79088a3ce 100644 --- a/server/integrations/google/worker-utils.ts +++ b/server/integrations/google/worker-utils.ts @@ -18,7 +18,7 @@ import { import * as XLSX from "xlsx" import { extractTextAndImagesWithChunksFromDocx } from "@/docxChunks" import { extractTextAndImagesWithChunksFromPptx } from "@/pptChunks" -import { 
extractTextAndImagesWithChunksFromPDFviaGemini } from "@/lib/chunkPdfWithGemini" +import { PdfProcessor } from "@/lib/pdfProcessor" import { chunkSheetWithHeaders } from "@/sheetChunk" import { checkFileSize } from "../dataSource" @@ -49,12 +49,14 @@ const processPdfFile = async ( attachmentId: string, ): Promise => { try { - // Handle non-spreadsheet files as before - const pdfResult = await extractTextAndImagesWithChunksFromPDFviaGemini( - pdfBuffer, + const result = await PdfProcessor.processWithFallback( + Buffer.from(pdfBuffer), + `attachment-${attachmentId}`, attachmentId, + false, + false, ) - return pdfResult.text_chunks.filter((v) => v.trim()) + return result.chunks.filter((v) => v.trim()) } catch (error) { Logger.error(error, `Error processing PDF buffer`) return [] diff --git a/server/integrations/microsoft/attachment-utils.ts b/server/integrations/microsoft/attachment-utils.ts index 1b62276b2..16a4bf63c 100644 --- a/server/integrations/microsoft/attachment-utils.ts +++ b/server/integrations/microsoft/attachment-utils.ts @@ -16,7 +16,7 @@ import { import * as XLSX from "xlsx" import { extractTextAndImagesWithChunksFromDocx } from "@/docxChunks" import { extractTextAndImagesWithChunksFromPptx } from "@/pptChunks" -import { extractTextAndImagesWithChunksFromPDFviaGemini } from "@/lib/chunkPdfWithGemini" +import { PdfProcessor } from "@/lib/pdfProcessor" import { makeGraphApiCall, type MicrosoftGraphClient } from "./client" import { chunkSheetWithHeaders } from "@/sheetChunk" import { checkFileSize } from "../dataSource" @@ -49,11 +49,14 @@ const processPdfFile = async ( attachmentId: string, ): Promise => { try { - const pdfResult = await extractTextAndImagesWithChunksFromPDFviaGemini( - pdfBuffer, + const result = await PdfProcessor.processWithFallback( + Buffer.from(pdfBuffer), + `attachment-${attachmentId}`, attachmentId, + false, + false, ) - return pdfResult.text_chunks.filter((v) => v.trim()) + return result.chunks.filter((v) => v.trim()) } catch (error) { Logger.error(error, `Error processing PDF buffer`) return [] diff --git a/server/lib/chunkPdfWithGemini.ts b/server/lib/chunkPdfWithGemini.ts index 0627b394d..ae4dc535b 100644 --- a/server/lib/chunkPdfWithGemini.ts +++ b/server/lib/chunkPdfWithGemini.ts @@ -348,9 +348,6 @@ export function parseGeminiChunkBlocks(raw: string): string[] { } /** - * Gemini-backed PDF extractor that returns the same shape as - * extractTextAndImagesWithChunksFromPDF in server/pdfChunks.ts. - * * Notes: * - image_chunks and image_chunk_pos are intentionally empty. 
* - Maintains chunk positions sequentially (0..n-1), equivalent to From fba575a396877bc55411c5074da6757388a330f7 Mon Sep 17 00:00:00 2001 From: Aayushjshah <2001aayushshah@gmail.com> Date: Thu, 16 Oct 2025 13:15:02 +0530 Subject: [PATCH 3/7] feat: XYN-189 removed local testing function --- server/lib/chunkByOCR.ts | 156 +-------------------------------------- 1 file changed, 2 insertions(+), 154 deletions(-) diff --git a/server/lib/chunkByOCR.ts b/server/lib/chunkByOCR.ts index 6be043d46..ab8d220f7 100644 --- a/server/lib/chunkByOCR.ts +++ b/server/lib/chunkByOCR.ts @@ -586,7 +586,7 @@ function createInFlightState( return state } -async function waitForAtLeastOne( +async function waitForAll( inFlight: InFlightState[], ): Promise { if (inFlight.length === 0) { @@ -759,7 +759,7 @@ async function runConcurrentDispatch( break } - const completedStates = await waitForAtLeastOne(inFlight) + const completedStates = await waitForAll(inFlight) if (completedStates.length > 0) { for (const state of completedStates) { if (state.result) { @@ -1076,37 +1076,6 @@ function normalizeBlockContent(block: OcrBlock): string { return content.replace(/\s+/g, " ").trim() } -function deriveImageFileName( - preferredName: string | undefined, - bboxKey: string | null | undefined, - buffer: Buffer, - imageIndex: number, - pageIndex: number, -): string { - const ext = detectImageExtension(buffer) - - if (preferredName) { - const sanitized = sanitizeFileName(preferredName) - const parsed = path.parse(sanitized) - - if (parsed.ext) { - const normalizedExt = parsed.ext.replace(/\./, "") - if (normalizedExt.toLowerCase() !== ext) { - return `${parsed.name || `image_${imageIndex}`}.${ext}` - } - return sanitized - } - - return `${parsed.name || `image_${imageIndex}`}.${ext}` - } - - if (bboxKey) { - return `img_in_image_box_${bboxKey}.${ext}` - } - - return `page_${pageIndex + 1}_image_${imageIndex}.${ext}` -} - async function callLayoutParsingApi( buffer: Buffer, fileName: string, @@ -1536,127 +1505,6 @@ export async function chunkByOCRFromBuffer( return chunkByOCR(docId, ocrResponse, images, imageMetadata) } -export async function demoPdfOcrDispatcher(): Promise { - const samplePdfPath = - "/Users/aayush.shah/Downloads/Tolkien-J.-The-lord-of-the-rings-HarperCollins-ebooks-2010.pdf" - - const pdfBuffer = await fsPromises.readFile(samplePdfPath) - const pdfDocument = await PDFDocument.load(pdfBuffer) - const pdfFileName = path.basename(samplePdfPath) - - const pageBatches = await splitPdfIntoBatches(pdfBuffer, 25, pdfDocument) - if (pageBatches.length === 0) { - console.warn("[demoPdfOcrDispatcher] No batches generated from sample PDF") - return - } - - const dispatchBatches: PdfOcrBatch[] = pageBatches.map((batch, index) => ({ - id: `demo-batch-${index + 1}`, - fileName: pdfFileName, - startPage: batch.startPage, - endPage: batch.endPage, - pdfBuffer: batch.buffer, - })) - - const sampleBatches = dispatchBatches.slice(0, Math.min(dispatchBatches.length, 6)) - - const scenarios: Array<{ label: string; idleSequence: number[] }> = [ - { label: "idle=5", idleSequence: Array(8).fill(5) }, - { label: "idle=8", idleSequence: Array(8).fill(8) }, - { label: "idle=1", idleSequence: Array(8).fill(1) }, - { label: "idle=0", idleSequence: Array(8).fill(0) }, - { label: "idle sequence 5โ†’8โ†’1โ†’0", idleSequence: [5, 8, 1, 0] }, - ] - - for (const scenario of scenarios) { - console.log(`\n[demoPdfOcrDispatcher] Scenario: ${scenario.label}`) - await runScenario(sampleBatches, scenario.idleSequence, scenario.label) - } -} - -async 
function runScenario( - batches: PdfOcrBatch[], - idleSequence: number[], - label: string, -): Promise { - let statusCallCount = 0 - const originalFetch = globalThis.fetch - - globalThis.fetch = async (input, init) => { - if (typeof input === "string" && input.includes("/instance_status")) { - const idle = - idleSequence[ - statusCallCount < idleSequence.length - ? statusCallCount - : idleSequence.length - 1 - ] ?? 0 - statusCallCount += 1 - - return new Response( - JSON.stringify({ - active_instances: Math.max(0, idle), - idle_instances: idle, - }), - { - status: 200, - headers: { "Content-Type": "application/json" }, - }, - ) - } - - throw new Error( - `[demoPdfOcrDispatcher] Unexpected fetch target: ${String(input)}`, - ) - } - - const logger: Pick = { - info(message?: unknown, meta?: unknown) { - console.log( - `[${label}] INFO ${String(message)}${ - meta ? ` ${JSON.stringify(meta)}` : "" - }`, - ) - }, - warn(message?: unknown, meta?: unknown) { - console.warn( - `[${label}] WARN ${String(message)}${ - meta ? ` ${JSON.stringify(meta)}` : "" - }`, - ) - }, - error(message?: unknown, meta?: unknown) { - console.error( - `[${label}] ERROR ${String(message)}${ - meta ? ` ${JSON.stringify(meta)}` : "" - }`, - ) - }, - } - - try { - const report = await dispatchOCRBatches(batches, { - logger, - thresholdForConcurrency: 1, - pollIntervalMs: 50, - sendBatch: async (batch, { timeoutMs }) => { - console.log( - `[${label}] Dispatching ${batch.id} pages ${batch.startPage}-${batch.endPage} (timeout ${timeoutMs}ms)`, - ) - await sleep(40) - console.log( - `[${label}] Completed ${batch.id} pages ${batch.startPage}-${batch.endPage}`, - ) - }, - }) - - console.log( - `[${label}] Report total=${report.total} succeeded=${report.succeeded} failed=${report.failed}`, - ) - } finally { - globalThis.fetch = originalFetch - } -} - export async function chunkByOCR( docId: string, ocrResponse: OcrResponse, From f80d6fab6aa2e92b973a86a4138b5c1ba3865038 Mon Sep 17 00:00:00 2001 From: Aayushjshah <2001aayushshah@gmail.com> Date: Thu, 16 Oct 2025 13:18:46 +0530 Subject: [PATCH 4/7] feat: XYN-189 remved local file paths --- server/scripts/testOcrDispatcher.ts | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/server/scripts/testOcrDispatcher.ts b/server/scripts/testOcrDispatcher.ts index 771473a8a..9bc3acae0 100644 --- a/server/scripts/testOcrDispatcher.ts +++ b/server/scripts/testOcrDispatcher.ts @@ -6,7 +6,8 @@ * It mocks the instance status endpoint and PDF processing to demonstrate * how batches are processed under various scenarios. 
* - * Run with: bun run server/scripts/testOcrDispatcher.ts + * Usage: bun run server/scripts/testOcrDispatcher.ts + * Example: bun run server/scripts/testOcrDispatcher.ts ./sample.pdf */ import { promises as fsPromises } from "fs" @@ -21,7 +22,6 @@ import { dispatchOCRBatches } from "../lib/chunkByOCR" // MOCK CONFIGURATION // ============================================================================ -const MOCK_PDF_PATH = "/Users/aayush.shah/Downloads/Tolkien-J.-The-lord-of-the-rings-HarperCollins-ebooks-2010.pdf" const PAGES_PER_BATCH = 100 const MAX_BATCHES_TO_TEST = 100 @@ -433,10 +433,33 @@ async function main() { โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ• `) + // Parse command-line arguments + const args = process.argv.slice(2) + + if (args.length === 0) { + console.error(` +โŒ Error: PDF file path is required + +Usage: bun run server/scripts/testOcrDispatcher.ts +Example: bun run server/scripts/testOcrDispatcher.ts ./sample.pdf +`) + process.exit(1) + } + + const pdfPath = args[0] + + // Validate that the file exists + try { + await fsPromises.access(pdfPath) + } catch (error) { + console.error(`\nโŒ Error: PDF file not found at path: ${pdfPath}\n`) + process.exit(1) + } + try { // Create mock batches from the PDF const batches = await createMockBatches( - MOCK_PDF_PATH, + pdfPath, PAGES_PER_BATCH, MAX_BATCHES_TO_TEST, ) From 0fa6588f9d9964011f0d56867779a43566b70823 Mon Sep 17 00:00:00 2001 From: Aayushjshah <2001aayushshah@gmail.com> Date: Thu, 16 Oct 2025 14:10:35 +0530 Subject: [PATCH 5/7] Revert "feat: XYN-189 remved local file paths" This reverts commit f80d6fab6aa2e92b973a86a4138b5c1ba3865038. --- server/scripts/testOcrDispatcher.ts | 29 +++-------------------------- 1 file changed, 3 insertions(+), 26 deletions(-) diff --git a/server/scripts/testOcrDispatcher.ts b/server/scripts/testOcrDispatcher.ts index 9bc3acae0..771473a8a 100644 --- a/server/scripts/testOcrDispatcher.ts +++ b/server/scripts/testOcrDispatcher.ts @@ -6,8 +6,7 @@ * It mocks the instance status endpoint and PDF processing to demonstrate * how batches are processed under various scenarios. 
* - * Usage: bun run server/scripts/testOcrDispatcher.ts - * Example: bun run server/scripts/testOcrDispatcher.ts ./sample.pdf + * Run with: bun run server/scripts/testOcrDispatcher.ts */ import { promises as fsPromises } from "fs" @@ -22,6 +21,7 @@ import { dispatchOCRBatches } from "../lib/chunkByOCR" // MOCK CONFIGURATION // ============================================================================ +const MOCK_PDF_PATH = "/Users/aayush.shah/Downloads/Tolkien-J.-The-lord-of-the-rings-HarperCollins-ebooks-2010.pdf" const PAGES_PER_BATCH = 100 const MAX_BATCHES_TO_TEST = 100 @@ -433,33 +433,10 @@ async function main() { โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ• `) - // Parse command-line arguments - const args = process.argv.slice(2) - - if (args.length === 0) { - console.error(` -โŒ Error: PDF file path is required - -Usage: bun run server/scripts/testOcrDispatcher.ts -Example: bun run server/scripts/testOcrDispatcher.ts ./sample.pdf -`) - process.exit(1) - } - - const pdfPath = args[0] - - // Validate that the file exists - try { - await fsPromises.access(pdfPath) - } catch (error) { - console.error(`\nโŒ Error: PDF file not found at path: ${pdfPath}\n`) - process.exit(1) - } - try { // Create mock batches from the PDF const batches = await createMockBatches( - pdfPath, + MOCK_PDF_PATH, PAGES_PER_BATCH, MAX_BATCHES_TO_TEST, ) From ee84f74b81a71b6ff4e6a86c4908fa2ba9932893 Mon Sep 17 00:00:00 2001 From: Aayushjshah <2001aayushshah@gmail.com> Date: Thu, 16 Oct 2025 14:11:01 +0530 Subject: [PATCH 6/7] Revert "feat: XYN-189 removed local testing function" This reverts commit fba575a396877bc55411c5074da6757388a330f7. 
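
Restoring demoPdfOcrDispatcher keeps a local harness for exercising
dispatchOCRBatches against mocked /instance_status responses without a
running OCR cluster. A minimal sketch of how it could be invoked
(illustrative only; the entry-point file below is a hypothetical helper,
not something added by this series):

    // run-ocr-demo.ts (hypothetical helper)
    import { demoPdfOcrDispatcher } from "./server/lib/chunkByOCR"

    demoPdfOcrDispatcher().catch((error) => {
      console.error("[run-ocr-demo] demo failed:", error)
      process.exit(1)
    })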
--- server/lib/chunkByOCR.ts | 156 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 154 insertions(+), 2 deletions(-) diff --git a/server/lib/chunkByOCR.ts b/server/lib/chunkByOCR.ts index ab8d220f7..6be043d46 100644 --- a/server/lib/chunkByOCR.ts +++ b/server/lib/chunkByOCR.ts @@ -586,7 +586,7 @@ function createInFlightState( return state } -async function waitForAll( +async function waitForAtLeastOne( inFlight: InFlightState[], ): Promise { if (inFlight.length === 0) { @@ -759,7 +759,7 @@ async function runConcurrentDispatch( break } - const completedStates = await waitForAll(inFlight) + const completedStates = await waitForAtLeastOne(inFlight) if (completedStates.length > 0) { for (const state of completedStates) { if (state.result) { @@ -1076,6 +1076,37 @@ function normalizeBlockContent(block: OcrBlock): string { return content.replace(/\s+/g, " ").trim() } +function deriveImageFileName( + preferredName: string | undefined, + bboxKey: string | null | undefined, + buffer: Buffer, + imageIndex: number, + pageIndex: number, +): string { + const ext = detectImageExtension(buffer) + + if (preferredName) { + const sanitized = sanitizeFileName(preferredName) + const parsed = path.parse(sanitized) + + if (parsed.ext) { + const normalizedExt = parsed.ext.replace(/\./, "") + if (normalizedExt.toLowerCase() !== ext) { + return `${parsed.name || `image_${imageIndex}`}.${ext}` + } + return sanitized + } + + return `${parsed.name || `image_${imageIndex}`}.${ext}` + } + + if (bboxKey) { + return `img_in_image_box_${bboxKey}.${ext}` + } + + return `page_${pageIndex + 1}_image_${imageIndex}.${ext}` +} + async function callLayoutParsingApi( buffer: Buffer, fileName: string, @@ -1505,6 +1536,127 @@ export async function chunkByOCRFromBuffer( return chunkByOCR(docId, ocrResponse, images, imageMetadata) } +export async function demoPdfOcrDispatcher(): Promise { + const samplePdfPath = + "/Users/aayush.shah/Downloads/Tolkien-J.-The-lord-of-the-rings-HarperCollins-ebooks-2010.pdf" + + const pdfBuffer = await fsPromises.readFile(samplePdfPath) + const pdfDocument = await PDFDocument.load(pdfBuffer) + const pdfFileName = path.basename(samplePdfPath) + + const pageBatches = await splitPdfIntoBatches(pdfBuffer, 25, pdfDocument) + if (pageBatches.length === 0) { + console.warn("[demoPdfOcrDispatcher] No batches generated from sample PDF") + return + } + + const dispatchBatches: PdfOcrBatch[] = pageBatches.map((batch, index) => ({ + id: `demo-batch-${index + 1}`, + fileName: pdfFileName, + startPage: batch.startPage, + endPage: batch.endPage, + pdfBuffer: batch.buffer, + })) + + const sampleBatches = dispatchBatches.slice(0, Math.min(dispatchBatches.length, 6)) + + const scenarios: Array<{ label: string; idleSequence: number[] }> = [ + { label: "idle=5", idleSequence: Array(8).fill(5) }, + { label: "idle=8", idleSequence: Array(8).fill(8) }, + { label: "idle=1", idleSequence: Array(8).fill(1) }, + { label: "idle=0", idleSequence: Array(8).fill(0) }, + { label: "idle sequence 5โ†’8โ†’1โ†’0", idleSequence: [5, 8, 1, 0] }, + ] + + for (const scenario of scenarios) { + console.log(`\n[demoPdfOcrDispatcher] Scenario: ${scenario.label}`) + await runScenario(sampleBatches, scenario.idleSequence, scenario.label) + } +} + +async function runScenario( + batches: PdfOcrBatch[], + idleSequence: number[], + label: string, +): Promise { + let statusCallCount = 0 + const originalFetch = globalThis.fetch + + globalThis.fetch = async (input, init) => { + if (typeof input === "string" && input.includes("/instance_status")) { 
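+      // Mocked /instance_status handler: consume idleSequence one entry per
+      // call, then keep returning its last value once the sequence is
+      // exhausted, so each poll sees a deterministic idle-instance count.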
+ const idle = + idleSequence[ + statusCallCount < idleSequence.length + ? statusCallCount + : idleSequence.length - 1 + ] ?? 0 + statusCallCount += 1 + + return new Response( + JSON.stringify({ + active_instances: Math.max(0, idle), + idle_instances: idle, + }), + { + status: 200, + headers: { "Content-Type": "application/json" }, + }, + ) + } + + throw new Error( + `[demoPdfOcrDispatcher] Unexpected fetch target: ${String(input)}`, + ) + } + + const logger: Pick = { + info(message?: unknown, meta?: unknown) { + console.log( + `[${label}] INFO ${String(message)}${ + meta ? ` ${JSON.stringify(meta)}` : "" + }`, + ) + }, + warn(message?: unknown, meta?: unknown) { + console.warn( + `[${label}] WARN ${String(message)}${ + meta ? ` ${JSON.stringify(meta)}` : "" + }`, + ) + }, + error(message?: unknown, meta?: unknown) { + console.error( + `[${label}] ERROR ${String(message)}${ + meta ? ` ${JSON.stringify(meta)}` : "" + }`, + ) + }, + } + + try { + const report = await dispatchOCRBatches(batches, { + logger, + thresholdForConcurrency: 1, + pollIntervalMs: 50, + sendBatch: async (batch, { timeoutMs }) => { + console.log( + `[${label}] Dispatching ${batch.id} pages ${batch.startPage}-${batch.endPage} (timeout ${timeoutMs}ms)`, + ) + await sleep(40) + console.log( + `[${label}] Completed ${batch.id} pages ${batch.startPage}-${batch.endPage}`, + ) + }, + }) + + console.log( + `[${label}] Report total=${report.total} succeeded=${report.succeeded} failed=${report.failed}`, + ) + } finally { + globalThis.fetch = originalFetch + } +} + export async function chunkByOCR( docId: string, ocrResponse: OcrResponse, From 6b60ae4a04daadf65b738d770bdba85433107516 Mon Sep 17 00:00:00 2001 From: Aayushjshah <2001aayushshah@gmail.com> Date: Thu, 16 Oct 2025 18:53:42 +0530 Subject: [PATCH 7/7] Revert "feat: XYN-189 remved local file paths" This reverts commit f80d6fab6aa2e92b973a86a4138b5c1ba3865038. --- server/lib/chunkByOCR.ts | 1159 +++------------------------ server/scripts/testOcrDispatcher.ts | 464 ----------- 2 files changed, 131 insertions(+), 1492 deletions(-) delete mode 100644 server/scripts/testOcrDispatcher.ts diff --git a/server/lib/chunkByOCR.ts b/server/lib/chunkByOCR.ts index 6be043d46..63409cea2 100644 --- a/server/lib/chunkByOCR.ts +++ b/server/lib/chunkByOCR.ts @@ -17,25 +17,7 @@ const DEFAULT_LAYOUT_PARSING_VISUALIZE = false const LAYOUT_PARSING_API_PATH = "/v2/models/layout-parsing/infer" const DEFAULT_MAX_PAGES_PER_LAYOUT_REQUEST = 100 const TEXT_CHUNK_OVERLAP_CHARS = 32 - -// Configuration constants -const LAYOUT_PARSING_BASE_URL = process.env.LAYOUT_PARSING_BASE_URL || DEFAULT_LAYOUT_PARSING_BASE_URL -const LAYOUT_PARSING_TIMEOUT_MS = Number.parseInt(process.env.LAYOUT_PARSING_TIMEOUT_MS ?? "300000", 10) -const OCR_MAX_CHUNK_BYTES = Number.parseInt(process.env.OCR_MAX_CHUNK_BYTES ?? "", 10) -const IMAGE_DIR = process.env.IMAGE_DIR || DEFAULT_IMAGE_DIR -const LAYOUT_PARSING_MAX_PAGES_PER_REQUEST = Number.parseInt(process.env.LAYOUT_PARSING_MAX_PAGES_PER_REQUEST ?? 
"", 10) - -const DEFAULT_STATUS_ENDPOINT = "http://localhost:8081/instance_status" -const DEFAULT_POLL_INTERVAL_MS = 300 -const DEFAULT_REQUEST_TIMEOUT_MS = 120_000 -const DEFAULT_MAX_RETRIES = 2 -const DEFAULT_THRESHOLD_FOR_CONCURRENCY = 1 -const STATUS_FETCH_TIMEOUT_MS = 2_000 -const STATUS_FETCH_MAX_RETRIES = 3 -const BACKOFF_BASE_MS = 500 -const BACKOFF_FACTOR = 2 -const BACKOFF_MAX_MS = 8_000 -const BACKOFF_JITTER_RATIO = 0.2 +const USE_SEQUENTIAL_BATCH_PROCESSING = true type LayoutParsingBlock = { block_label?: string @@ -107,778 +89,6 @@ type GlobalSeq = { value: number } -type PdfPageBatch = { - buffer: Buffer - startPage: number - endPage: number -} - -export interface PdfOcrBatch { - id: string - fileName: string - startPage: number - endPage: number - pdfBuffer: Buffer -} - -export interface DispatchOptions { - statusEndpoint?: string - pollIntervalMs?: number - requestTimeoutMs?: number - maxRetries?: number - thresholdForConcurrency?: number - signal?: AbortSignal - logger?: Pick - metrics?: { - incr(name: string, tags?: Record): void - observe(name: string, value: number, tags?: Record): void - } - sendBatch?: ( - batch: PdfOcrBatch, - options: SendPdfBatchOptions, - ) => Promise -} - -export interface DispatchReport { - total: number - succeeded: number - failed: number - startedAt: number - endedAt: number - perItem: Array<{ - id: string - status: "ok" | "failed" - attempts: number - latencyMs: number - error?: string - }> -} - -type BatchDispatchResult = DispatchReport["perItem"][number] - -type InstanceStatusPayload = { - active_instances?: unknown - configured_instances?: unknown - idle_instances?: unknown - last_updated?: unknown -} - - - -const noopLogger: Pick = { - info() {}, - warn() {}, - error() {}, -} - -const noopMetrics = { - incr() {}, - observe() {}, -} - -type SendPdfBatchOptions = { - timeoutMs: number -} - -// Placeholder implementation for integrating with the OCR service. -// This uses the same endpoint as callLayoutParsingApi -async function sendPdfOcrBatch( - batch: PdfOcrBatch, - { timeoutMs }: SendPdfBatchOptions, -): Promise { - const baseUrl = LAYOUT_PARSING_BASE_URL.replace(/\/+$/, '') - const apiUrl = baseUrl + '/' + LAYOUT_PARSING_API_PATH.replace(/^\/+/, '') - - const controller = new AbortController() - const timer = setTimeout(() => controller.abort(), timeoutMs) - - try { - const response = await fetch(apiUrl, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ - // TODO: adjust payload to match OCR batch contract. - id: batch.id, - fileName: batch.fileName, - startPage: batch.startPage, - endPage: batch.endPage, - pdfBase64: batch.pdfBuffer.toString("base64"), - }), - signal: controller.signal, - }) - - if (!response.ok) { - const responseText = await response.text().catch(() => "") - throw new Error( - `OCR batch request failed (${response.status}): ${responseText.slice(0, 200)}`, - ) - } - - // TODO: handle the response payload if the OCR service returns useful data. 
- } catch (error) { - if ((error as Error).name === "AbortError") { - throw new Error("OCR batch request aborted due to timeout") - } - throw error - } finally { - clearTimeout(timer) - } -} - -function coerceStatusNumber(raw: unknown): number | undefined { - if (typeof raw === "number") { - if (Number.isFinite(raw)) { - return raw - } - return undefined - } - - if (typeof raw === "string") { - const trimmed = raw.trim() - if (trimmed.length === 0) { - return undefined - } - const parsed = Number(trimmed) - if (Number.isFinite(parsed)) { - return parsed - } - } - - return undefined -} - -function sanitizeIdleValue(raw: unknown): number { - const numericValue = coerceStatusNumber(raw) - if (numericValue === undefined) { - return 0 - } - if (numericValue <= 0) { - return 0 - } - return Math.floor(numericValue) -} - -function createAbortError(): Error { - const error = new Error("aborted") - error.name = "AbortError" - return error -} - -function isAbortError(error: unknown): boolean { - return ( - error instanceof Error && - (error.name === "AbortError" || error.message === "aborted") - ) -} - -function applyBackoffJitter(durationMs: number): number { - const jitter = - 1 + (Math.random() * 2 - 1) * Math.max(0, Math.min(1, BACKOFF_JITTER_RATIO)) - return Math.min(BACKOFF_MAX_MS, Math.round(durationMs * jitter)) -} - -async function sleep(ms: number, signal?: AbortSignal): Promise { - if (ms <= 0) { - if (signal?.aborted) { - throw createAbortError() - } - return - } - - if (signal?.aborted) { - throw createAbortError() - } - - await new Promise((resolve, reject) => { - let timer: ReturnType | null = null - const abortHandler = () => { - if (timer) { - clearTimeout(timer) - timer = null - } - if (signal) { - signal.removeEventListener("abort", abortHandler) - } - reject(createAbortError()) - } - - timer = setTimeout(() => { - if (signal) { - signal.removeEventListener("abort", abortHandler) - } - timer = null - resolve() - }, ms) - - if (signal) { - signal.addEventListener("abort", abortHandler) - } - }) -} - -async function fetchIdleInstances( - endpoint: string, - logger: Pick, - metrics: DispatchOptions["metrics"], -): Promise { - let attempt = 0 - let lastError: unknown - - while (attempt < STATUS_FETCH_MAX_RETRIES) { - attempt += 1 - const controller = new AbortController() - const timer = setTimeout(() => controller.abort(), STATUS_FETCH_TIMEOUT_MS) - - try { - const response = await fetch(endpoint, { - method: "GET", - signal: controller.signal, - }) - - clearTimeout(timer) - - if (!response.ok) { - throw new Error(`unexpected status ${response.status}`) - } - - const payload = (await response.json()) as InstanceStatusPayload - const idle = sanitizeIdleValue(payload.idle_instances) - const active = coerceStatusNumber(payload.active_instances) - const configured = coerceStatusNumber(payload.configured_instances) - const lastUpdated = coerceStatusNumber(payload.last_updated) - - metrics?.observe("ocr_dispatch.instances.idle", idle) - if (active !== undefined) { - metrics?.observe("ocr_dispatch.instances.active", active) - } - if (configured !== undefined) { - metrics?.observe("ocr_dispatch.instances.configured", configured) - } - if (lastUpdated !== undefined) { - metrics?.observe("ocr_dispatch.instances.last_updated", lastUpdated) - } - - // Success - return the idle count - if (attempt > 1) { - logger.info("Successfully fetched OCR instance status after retry", { - attempt, - idle, - endpoint, - }) - } - - return idle - } catch (error) { - lastError = error - clearTimeout(timer) - - 
const errorMessage = error instanceof Error ? error.message : String(error) - - if (attempt < STATUS_FETCH_MAX_RETRIES) { - const backoffMs = computeBackoffMs(attempt) - logger.warn( - `Failed to fetch OCR instance status (attempt ${attempt}/${STATUS_FETCH_MAX_RETRIES}), retrying in ${backoffMs}ms`, - { - error: errorMessage, - endpoint, - attempt, - backoffMs, - }, - ) - metrics?.incr("ocr_dispatch.status_fetch_retry") - - await sleep(backoffMs) - } else { - // Max retries exceeded - logger.error( - `Failed to fetch OCR instance status after ${STATUS_FETCH_MAX_RETRIES} attempts`, - { - error: errorMessage, - endpoint, - attempts: STATUS_FETCH_MAX_RETRIES, - }, - ) - metrics?.incr("ocr_dispatch.status_fetch_error") - - throw new Error( - `Instance status server unreachable after ${STATUS_FETCH_MAX_RETRIES} attempts: ${errorMessage}`, - ) - } - } - } - - // This should never be reached, but TypeScript needs it - const finalMessage = - lastError instanceof Error ? lastError.message : String(lastError ?? "unknown error") - throw new Error( - `Instance status server unreachable after ${STATUS_FETCH_MAX_RETRIES} attempts: ${finalMessage}`, - ) -} - -function computeBackoffMs(attempt: number): number { - const exponent = Math.max(0, attempt - 1) - const baseDelay = BACKOFF_BASE_MS * Math.pow(BACKOFF_FACTOR, exponent) - return applyBackoffJitter(baseDelay) -} - -async function processBatch( - batch: PdfOcrBatch, - options: { - requestTimeoutMs: number - maxRetries: number - logger: Pick - metrics: DispatchOptions["metrics"] - signal?: AbortSignal - sendBatch: ( - batch: PdfOcrBatch, - sendOptions: SendPdfBatchOptions, - ) => Promise - }, -): Promise { - const { - requestTimeoutMs, - maxRetries, - logger, - metrics = noopMetrics, - signal, - sendBatch, - } = options - - const totalAttemptsAllowed = Math.max(0, maxRetries) + 1 - let attempt = 0 - let lastError: unknown - - while (attempt < totalAttemptsAllowed) { - if (signal?.aborted) { - return { - id: batch.id, - status: "failed", - attempts: attempt, - latencyMs: 0, - error: "aborted", - } - } - - attempt += 1 - const attemptStartedAt = Date.now() - - try { - await sendBatch(batch, { timeoutMs: requestTimeoutMs }) - const latencyMs = Date.now() - attemptStartedAt - logger.info( - `[batch=${batch.id}] attempt=${attempt} latencyMs=${latencyMs} ok`, - ) - metrics.observe("ocr_dispatch.latency_ms", latencyMs, { - status: "ok", - }) - metrics.incr("ocr_dispatch.attempts", { status: "ok" }) - return { - id: batch.id, - status: "ok", - attempts: attempt, - latencyMs, - } - } catch (error) { - const latencyMs = Date.now() - attemptStartedAt - lastError = error - const errorMessage = error instanceof Error ? 
error.message : String(error) - - logger.warn( - `[batch=${batch.id}] attempt=${attempt} latencyMs=${latencyMs} error=${errorMessage}`, - ) - metrics.observe("ocr_dispatch.latency_ms", latencyMs, { - status: "failed", - }) - metrics.incr("ocr_dispatch.attempts", { status: "failed" }) - - if (attempt >= totalAttemptsAllowed) { - return { - id: batch.id, - status: "failed", - attempts: attempt, - latencyMs, - error: errorMessage, - } - } - - const backoffMs = computeBackoffMs(attempt) - logger.info( - `[batch=${batch.id}] attempt=${attempt} error=${errorMessage} backoffMs=${backoffMs}`, - ) - - try { - await sleep(backoffMs, signal) - } catch (abortError) { - if (isAbortError(abortError)) { - return { - id: batch.id, - status: "failed", - attempts: attempt, - latencyMs, - error: "aborted", - } - } - throw abortError - } - } - } - - const finalMessage = - lastError instanceof Error ? lastError.message : String(lastError ?? "") - return { - id: batch.id, - status: "failed", - attempts: totalAttemptsAllowed, - latencyMs: 0, - error: finalMessage || "failed", - } -} - -type InFlightState = { - batch: PdfOcrBatch - promise: Promise - done: boolean - result?: BatchDispatchResult -} - -function createInFlightState( - batch: PdfOcrBatch, - options: { - requestTimeoutMs: number - maxRetries: number - logger: Pick - metrics: DispatchOptions["metrics"] - signal?: AbortSignal - sendBatch: ( - batch: PdfOcrBatch, - sendOptions: SendPdfBatchOptions, - ) => Promise - }, -): InFlightState { - const state: InFlightState = { - batch, - promise: Promise.resolve({} as BatchDispatchResult), - done: false, - } - - state.promise = processBatch(batch, options) - .then((result) => { - state.done = true - state.result = result - return result - }) - .catch((error) => { - state.done = true - const errorMessage = error instanceof Error ? 
error.message : String(error) - const fallback: BatchDispatchResult = { - id: batch.id, - status: "failed", - attempts: 0, - latencyMs: 0, - error: errorMessage, - } - state.result = fallback - return fallback - }) - - return state -} - -async function waitForAtLeastOne( - inFlight: InFlightState[], -): Promise { - if (inFlight.length === 0) { - return [] - } - - await Promise.allSettled(inFlight.map((state) => state.promise)) - return inFlight -} - -function recordOutcome( - result: BatchDispatchResult, - perItem: DispatchReport["perItem"], - counters: { succeeded: number; failed: number }, -): void { - perItem.push(result) - if (result.status === "ok") { - counters.succeeded += 1 - } else { - counters.failed += 1 - } -} - -async function runSequentialDispatch( - batches: PdfOcrBatch[], - options: { - requestTimeoutMs: number - maxRetries: number - statusEndpoint: string - logger: Pick - metrics: DispatchOptions["metrics"] - signal?: AbortSignal - sendBatch: ( - batch: PdfOcrBatch, - sendOptions: SendPdfBatchOptions, - ) => Promise - }, - perItem: DispatchReport["perItem"], - counters: { succeeded: number; failed: number }, -): Promise { - const { logger, statusEndpoint, signal, sendBatch } = options - - const idle = await fetchIdleInstances(statusEndpoint, logger, options.metrics) - logger.info( - `[cycle=sequential] idle=${idle} remaining=${batches.length} inflight=0 dispatching=1`, - ) - - for (const batch of batches) { - if (signal?.aborted) { - recordOutcome( - { - id: batch.id, - status: "failed", - attempts: 0, - latencyMs: 0, - error: "aborted", - }, - perItem, - counters, - ) - continue - } - - const result = await processBatch(batch, { - ...options, - sendBatch, - }) - recordOutcome(result, perItem, counters) - } -} - -async function runConcurrentDispatch( - batches: PdfOcrBatch[], - options: { - requestTimeoutMs: number - maxRetries: number - statusEndpoint: string - pollIntervalMs: number - logger: Pick - metrics: DispatchOptions["metrics"] - signal?: AbortSignal - sendBatch: ( - batch: PdfOcrBatch, - sendOptions: SendPdfBatchOptions, - ) => Promise - }, - perItem: DispatchReport["perItem"], - counters: { succeeded: number; failed: number }, -): Promise { - const { - requestTimeoutMs, - maxRetries, - statusEndpoint, - pollIntervalMs, - logger, - metrics, - signal, - sendBatch, - } = options - - let inFlight: InFlightState[] = [] - const pendingQueue: PdfOcrBatch[] = [...batches] - let cycle = 0 - let aborted = signal?.aborted ?? false - - const onAbort = () => { - if (!aborted) { - aborted = true - logger.warn("Dispatch aborted by signal; draining in-flight tasks") - } - } - - signal?.addEventListener("abort", onAbort) - - try { - while (pendingQueue.length > 0 || inFlight.length > 0) { - cycle += 1 - - let idle = 0 - if (!aborted) { - idle = await fetchIdleInstances(statusEndpoint, logger, metrics) - } - - const remaining = pendingQueue.length - let toDispatch = 0 - if (!aborted && remaining > 0) { - toDispatch = idle > 0 ? Math.min(idle/2, remaining) : 1 - } - - logger.info( - `[cycle=${cycle}] idle=${idle} remaining=${remaining} inflight=${inFlight.length} dispatching=${toDispatch}${aborted ? 
" aborted=true" : ""}`, - ) - - for (let index = 0; index < toDispatch; index += 1) { - const nextBatch = pendingQueue.shift() - if (!nextBatch) { - break - } - const state = createInFlightState(nextBatch, { - requestTimeoutMs, - maxRetries, - logger, - metrics, - signal, - sendBatch, - }) - inFlight.push(state) - } - - if (inFlight.length === 0) { - if (aborted) { - while (pendingQueue.length > 0) { - const batch = pendingQueue.shift() - if (!batch) { - continue - } - recordOutcome( - { - id: batch.id, - status: "failed", - attempts: 0, - latencyMs: 0, - error: "aborted", - }, - perItem, - counters, - ) - } - } - break - } - - const completedStates = await waitForAtLeastOne(inFlight) - if (completedStates.length > 0) { - for (const state of completedStates) { - if (state.result) { - recordOutcome(state.result, perItem, counters) - } - } - inFlight = inFlight.filter((state) => !state.done) - } - - if (aborted) { - while (pendingQueue.length > 0) { - const batch = pendingQueue.shift() - if (!batch) { - continue - } - recordOutcome( - { - id: batch.id, - status: "failed", - attempts: 0, - latencyMs: 0, - error: "aborted", - }, - perItem, - counters, - ) - } - } - - if ( - (pendingQueue.length > 0 || inFlight.length > 0) && - !aborted && - pollIntervalMs > 0 - ) { - try { - await sleep(pollIntervalMs, signal) - } catch (error) { - if (isAbortError(error)) { - aborted = true - } else { - throw error - } - } - } - } - } finally { - signal?.removeEventListener("abort", onAbort) - } -} - -export async function dispatchOCRBatches( - batches: PdfOcrBatch[], - opts: DispatchOptions = {}, -): Promise { - const startedAt = Date.now() - const total = batches.length - - if (total === 0) { - return { - total: 0, - succeeded: 0, - failed: 0, - startedAt, - endedAt: startedAt, - perItem: [], - } - } - - const { - statusEndpoint = DEFAULT_STATUS_ENDPOINT, - pollIntervalMs = DEFAULT_POLL_INTERVAL_MS, - requestTimeoutMs = DEFAULT_REQUEST_TIMEOUT_MS, - maxRetries = DEFAULT_MAX_RETRIES, - thresholdForConcurrency = DEFAULT_THRESHOLD_FOR_CONCURRENCY, - signal, - logger: providedLogger, - metrics: providedMetrics, - sendBatch: providedSendBatch, - } = opts - - const logger = providedLogger ?? Logger ?? noopLogger - const metrics = providedMetrics ?? noopMetrics - const sendBatch = - providedSendBatch ?? 
- ((batch: PdfOcrBatch, options: SendPdfBatchOptions) => - sendPdfOcrBatch(batch, options)) - - const perItem: DispatchReport["perItem"] = [] - const counters = { succeeded: 0, failed: 0 } - - const dispatchOptions = { - requestTimeoutMs, - maxRetries, - statusEndpoint, - pollIntervalMs, - logger, - metrics, - signal, - sendBatch, - } - - if (total <= thresholdForConcurrency) { - await runSequentialDispatch(batches, dispatchOptions, perItem, counters) - } else { - await runConcurrentDispatch(batches, dispatchOptions, perItem, counters) - } - - const endedAt = Date.now() - - return { - total, - succeeded: counters.succeeded, - failed: counters.failed, - startedAt, - endedAt, - perItem, - } -} - function looksLikePdf(buffer: Buffer, fileName: string): boolean { if (fileName.toLowerCase().endsWith(".pdf")) { return true @@ -928,7 +138,10 @@ function trimChunkToByteLimit(content: string, byteLimit: number): string { let endIndex = content.length - while (endIndex > 0 && getByteLength(content.slice(0, endIndex)) > byteLimit) { + while ( + endIndex > 0 && + getByteLength(content.slice(0, endIndex)) > byteLimit + ) { endIndex -= 1 } @@ -1111,12 +324,17 @@ async function callLayoutParsingApi( buffer: Buffer, fileName: string, ): Promise { - const baseUrl = LAYOUT_PARSING_BASE_URL.replace(/\/+$/, '') - - const apiUrl = baseUrl + '/' + LAYOUT_PARSING_API_PATH.replace(/^\/+/, '') + const baseUrl = ( + process.env.LAYOUT_PARSING_BASE_URL || DEFAULT_LAYOUT_PARSING_BASE_URL + ).replace(/\/+$/, "") + + const apiUrl = baseUrl + "/" + LAYOUT_PARSING_API_PATH.replace(/^\/+/, "") const fileType = DEFAULT_LAYOUT_PARSING_FILE_TYPE const visualize = DEFAULT_LAYOUT_PARSING_VISUALIZE - const timeoutMs = LAYOUT_PARSING_TIMEOUT_MS + const timeoutMs = Number.parseInt( + process.env.LAYOUT_PARSING_TIMEOUT_MS ?? "300000", + 10, + ) Logger.info("Calling layout parsing API", { apiUrl, @@ -1193,17 +411,25 @@ async function callLayoutParsingApi( return result } catch (error) { // Log the layout parsing API failure with context - Logger.error(error, `Layout parsing API call failed for file: ${fileName}`, { - fileName, - fileSize: buffer.length, - apiUrl, - }) - + Logger.error( + error, + `Layout parsing API call failed for file: ${fileName}`, + { + fileName, + fileSize: buffer.length, + apiUrl, + }, + ) + // Re-throw with enhanced error message for better debugging if (error instanceof Error) { - throw new Error(`Layout parsing API failed for "${fileName}": ${error.message}`) + throw new Error( + `Layout parsing API failed for "${fileName}": ${error.message}`, + ) } else { - throw new Error(`Layout parsing API failed for "${fileName}": Unknown error occurred`) + throw new Error( + `Layout parsing API failed for "${fileName}": Unknown error occurred`, + ) } } finally { if (timer) { @@ -1291,18 +517,12 @@ async function splitPdfIntoBatches( buffer: Buffer, maxPagesPerBatch: number = DEFAULT_MAX_PAGES_PER_LAYOUT_REQUEST, preloadedPdf?: PDFDocument, -): Promise { +): Promise { const sourcePdf = preloadedPdf ?? 
(await PDFDocument.load(buffer)) const totalPages = sourcePdf.getPageCount() if (totalPages <= maxPagesPerBatch) { - return [ - { - buffer, - startPage: 1, - endPage: totalPages, - }, - ] + return [buffer] } Logger.info("Splitting large PDF into batches", { @@ -1311,13 +531,15 @@ async function splitPdfIntoBatches( estimatedBatches: Math.ceil(totalPages / maxPagesPerBatch), }) - const batches: PdfPageBatch[] = [] + const batches: Buffer[] = [] - for (let startPage = 0; startPage < totalPages; startPage += maxPagesPerBatch) { + for ( + let startPage = 0; + startPage < totalPages; + startPage += maxPagesPerBatch + ) { const endPage = Math.min(startPage + maxPagesPerBatch, totalPages) const pageCount = endPage - startPage - const startPageNumber = startPage + 1 - const endPageNumber = endPage // Create new PDF with subset of pages const newPdf = await PDFDocument.create() @@ -1332,16 +554,12 @@ async function splitPdfIntoBatches( } const pdfBytes = await newPdf.save() - batches.push({ - buffer: Buffer.from(pdfBytes), - startPage: startPageNumber, - endPage: endPageNumber, - }) + batches.push(Buffer.from(pdfBytes)) Logger.info("Created PDF batch", { batchIndex: batches.length, - startPage: startPageNumber, - endPage: endPageNumber, + startPage: startPage + 1, + endPage, pagesInBatch: pageCount, batchSizeBytes: pdfBytes.length, }) @@ -1361,7 +579,7 @@ function mergeLayoutParsingResults( // Adjust page indices to maintain correct ordering across batches const adjustedResults = layoutResults.map((layout, localPageIndex) => ({ ...layout, - // We don't need to modify the layout structure itself since + // We don't need to modify the layout structure itself since // transformLayoutParsingResults handles page indexing correctly })) @@ -1374,85 +592,75 @@ function mergeLayoutParsingResults( } } -async function processPdfBatchesWithDispatcher( - batches: PdfPageBatch[], +async function processBatchesConcurrently( + batches: Buffer[], fileName: string, ): Promise { - if (batches.length === 0) { - return [] - } - - if (batches.length === 1) { - const singleBatch = batches[0] - Logger.info("Processing single PDF batch via OCR pipeline", { - fileName, - startPage: singleBatch.startPage, - endPage: singleBatch.endPage, - pagesInBatch: singleBatch.endPage - singleBatch.startPage + 1, - }) - - const result = await callLayoutParsingApi( - singleBatch.buffer, - `${fileName}_pages_${singleBatch.startPage}-${singleBatch.endPage}`, - ) - - Logger.info("Completed single PDF batch", { - fileName, - layoutResultsCount: result.layoutParsingResults?.length ?? 
0, - }) - - return [result] - } - - Logger.info("Dispatching PDF batches via OCR dispatcher", { + Logger.info("Processing PDF batches concurrently", { fileName, totalBatches: batches.length, }) - const resultsByBatchId = new Map() - const dispatchBatches: PdfOcrBatch[] = batches.map((batch, index) => ({ - id: `${fileName}_batch_${index + 1}`, - fileName, - startPage: batch.startPage, - endPage: batch.endPage, - pdfBuffer: batch.buffer, - })) - - const dispatchOptions: DispatchOptions = { - logger: Logger, - sendBatch: async (batch) => { - const label = `${batch.fileName}_pages_${batch.startPage}-${batch.endPage}` - Logger.info("Sending PDF batch to layout parsing API", { - batchId: batch.id, - fileName: batch.fileName, - startPage: batch.startPage, - endPage: batch.endPage, - pagesInBatch: batch.endPage - batch.startPage + 1, + return Promise.all( + batches.map(async (batch, index) => { + const batchIndex = index + 1 + Logger.info("Processing PDF batch", { + fileName, + batchIndex, + totalBatches: batches.length, + batchSizeBytes: batch.length, }) - const result = await callLayoutParsingApi(batch.pdfBuffer, label) - resultsByBatchId.set(batch.id, result) + const batchResult = await callLayoutParsingApi( + batch, + `${fileName}_batch_${batchIndex}`, + ) Logger.info("Completed PDF batch", { - batchId: batch.id, - layoutResultsCount: result.layoutParsingResults?.length ?? 0, + fileName, + batchIndex, + layoutResultsCount: batchResult.layoutParsingResults?.length ?? 0, }) - }, - } + return batchResult + }), + ) +} - const report = await dispatchOCRBatches(dispatchBatches, dispatchOptions) +async function processBatchesSequentially( + batches: Buffer[], + fileName: string, +): Promise { + Logger.info("Processing PDF batches sequentially", { + fileName, + totalBatches: batches.length, + }) + + const batchResults: LayoutParsingApiPayload[] = [] + + for (let index = 0; index < batches.length; index++) { + const batch = batches[index] + const batchIndex = index + 1 + + Logger.info("Processing PDF batch sequentially", { + fileName, + batchIndex, + totalBatches: batches.length, + batchSizeBytes: batch.length, + }) - if (report.failed > 0) { - throw new Error( - `Failed to process ${report.failed} PDF batch(es) via OCR dispatcher`, + const batchResult = await callLayoutParsingApi( + batch, + `${fileName}_batch_${batchIndex}`, ) + + Logger.info("Completed PDF batch sequentially", { + fileName, + batchIndex, + layoutResultsCount: batchResult.layoutParsingResults?.length ?? 0, + }) + + batchResults.push(batchResult) } - return dispatchBatches.map((batch) => { - const batchResult = resultsByBatchId.get(batch.id) - if (!batchResult) { - throw new Error(`Missing OCR result for batch ${batch.id}`) - } - return batchResult - }) + return batchResults } export async function chunkByOCRFromBuffer( @@ -1460,9 +668,13 @@ export async function chunkByOCRFromBuffer( fileName: string, docId: string, ): Promise { + const maxPagesEnv = Number.parseInt( + process.env.LAYOUT_PARSING_MAX_PAGES_PER_REQUEST ?? "", + 10, + ) const maxPagesPerRequest = - Number.isFinite(LAYOUT_PARSING_MAX_PAGES_PER_REQUEST) && LAYOUT_PARSING_MAX_PAGES_PER_REQUEST > 0 - ? LAYOUT_PARSING_MAX_PAGES_PER_REQUEST + Number.isFinite(maxPagesEnv) && maxPagesEnv > 0 + ? maxPagesEnv : DEFAULT_MAX_PAGES_PER_LAYOUT_REQUEST let finalApiResult: LayoutParsingApiPayload @@ -1488,27 +700,31 @@ export async function chunkByOCRFromBuffer( fileName, totalPages, batches: batches.length, + processingMode: USE_SEQUENTIAL_BATCH_PROCESSING + ? 
"sequential" + : "concurrent", }) - const batchResults = await processPdfBatchesWithDispatcher( - batches, - fileName, - ) + const batchResults = USE_SEQUENTIAL_BATCH_PROCESSING + ? await processBatchesSequentially(batches, fileName) + : await processBatchesConcurrently(batches, fileName) finalApiResult = mergeLayoutParsingResults(batchResults) Logger.info("Merged batch results", { totalBatches: batches.length, - layoutResultsCount: - finalApiResult.layoutParsingResults?.length || 0, + layoutResultsCount: finalApiResult.layoutParsingResults?.length || 0, }) } else { finalApiResult = await callLayoutParsingApi(buffer, fileName) } } catch (error) { - Logger.warn("Failed to analyze PDF for batching, processing as single file", { - fileName, - error: (error as Error).message, - }) + Logger.warn( + "Failed to analyze PDF for batching, processing as single file", + { + fileName, + error: (error as Error).message, + }, + ) finalApiResult = await callLayoutParsingApi(buffer, fileName) } } else { @@ -1536,127 +752,6 @@ export async function chunkByOCRFromBuffer( return chunkByOCR(docId, ocrResponse, images, imageMetadata) } -export async function demoPdfOcrDispatcher(): Promise { - const samplePdfPath = - "/Users/aayush.shah/Downloads/Tolkien-J.-The-lord-of-the-rings-HarperCollins-ebooks-2010.pdf" - - const pdfBuffer = await fsPromises.readFile(samplePdfPath) - const pdfDocument = await PDFDocument.load(pdfBuffer) - const pdfFileName = path.basename(samplePdfPath) - - const pageBatches = await splitPdfIntoBatches(pdfBuffer, 25, pdfDocument) - if (pageBatches.length === 0) { - console.warn("[demoPdfOcrDispatcher] No batches generated from sample PDF") - return - } - - const dispatchBatches: PdfOcrBatch[] = pageBatches.map((batch, index) => ({ - id: `demo-batch-${index + 1}`, - fileName: pdfFileName, - startPage: batch.startPage, - endPage: batch.endPage, - pdfBuffer: batch.buffer, - })) - - const sampleBatches = dispatchBatches.slice(0, Math.min(dispatchBatches.length, 6)) - - const scenarios: Array<{ label: string; idleSequence: number[] }> = [ - { label: "idle=5", idleSequence: Array(8).fill(5) }, - { label: "idle=8", idleSequence: Array(8).fill(8) }, - { label: "idle=1", idleSequence: Array(8).fill(1) }, - { label: "idle=0", idleSequence: Array(8).fill(0) }, - { label: "idle sequence 5โ†’8โ†’1โ†’0", idleSequence: [5, 8, 1, 0] }, - ] - - for (const scenario of scenarios) { - console.log(`\n[demoPdfOcrDispatcher] Scenario: ${scenario.label}`) - await runScenario(sampleBatches, scenario.idleSequence, scenario.label) - } -} - -async function runScenario( - batches: PdfOcrBatch[], - idleSequence: number[], - label: string, -): Promise { - let statusCallCount = 0 - const originalFetch = globalThis.fetch - - globalThis.fetch = async (input, init) => { - if (typeof input === "string" && input.includes("/instance_status")) { - const idle = - idleSequence[ - statusCallCount < idleSequence.length - ? statusCallCount - : idleSequence.length - 1 - ] ?? 0 - statusCallCount += 1 - - return new Response( - JSON.stringify({ - active_instances: Math.max(0, idle), - idle_instances: idle, - }), - { - status: 200, - headers: { "Content-Type": "application/json" }, - }, - ) - } - - throw new Error( - `[demoPdfOcrDispatcher] Unexpected fetch target: ${String(input)}`, - ) - } - - const logger: Pick = { - info(message?: unknown, meta?: unknown) { - console.log( - `[${label}] INFO ${String(message)}${ - meta ? 
` ${JSON.stringify(meta)}` : "" - }`, - ) - }, - warn(message?: unknown, meta?: unknown) { - console.warn( - `[${label}] WARN ${String(message)}${ - meta ? ` ${JSON.stringify(meta)}` : "" - }`, - ) - }, - error(message?: unknown, meta?: unknown) { - console.error( - `[${label}] ERROR ${String(message)}${ - meta ? ` ${JSON.stringify(meta)}` : "" - }`, - ) - }, - } - - try { - const report = await dispatchOCRBatches(batches, { - logger, - thresholdForConcurrency: 1, - pollIntervalMs: 50, - sendBatch: async (batch, { timeoutMs }) => { - console.log( - `[${label}] Dispatching ${batch.id} pages ${batch.startPage}-${batch.endPage} (timeout ${timeoutMs}ms)`, - ) - await sleep(40) - console.log( - `[${label}] Completed ${batch.id} pages ${batch.startPage}-${batch.endPage}`, - ) - }, - }) - - console.log( - `[${label}] Report total=${report.total} succeeded=${report.succeeded} failed=${report.failed}`, - ) - } finally { - globalThis.fetch = originalFetch - } -} - export async function chunkByOCR( docId: string, ocrResponse: OcrResponse, @@ -1669,9 +764,13 @@ export async function chunkByOCR( const image_chunks_map: ChunkMetadata[] = [] const globalSeq: GlobalSeq = { value: 0 } + const maxChunkBytes = Number.parseInt( + process.env.OCR_MAX_CHUNK_BYTES ?? "", + 1024, + ) const chunkSizeLimit = - Number.isFinite(OCR_MAX_CHUNK_BYTES) && OCR_MAX_CHUNK_BYTES > 0 - ? OCR_MAX_CHUNK_BYTES + Number.isFinite(maxChunkBytes) && maxChunkBytes > 0 + ? maxChunkBytes : DEFAULT_MAX_CHUNK_BYTES let currentTextBuffer = "" @@ -1679,7 +778,7 @@ export async function chunkByOCR( let currentPageNumbers: Set = new Set() let lastTextChunk: string | null = null - const imageBaseDir = path.resolve(IMAGE_DIR) + const imageBaseDir = path.resolve(process.env.IMAGE_DIR || DEFAULT_IMAGE_DIR) const docImageDir = path.join(imageBaseDir, docId) await fsPromises.mkdir(docImageDir, { recursive: true }) const savedImages = new Set() @@ -1707,14 +806,18 @@ export async function chunkByOCR( const overlap = lastTextChunk.slice(-TEXT_CHUNK_OVERLAP_CHARS) if (overlap && !chunkContent.startsWith(overlap)) { const needsSeparator = - !/\s$/.test(overlap) && chunkContent.length > 0 && !/^\s/.test(chunkContent) + !/\s$/.test(overlap) && + chunkContent.length > 0 && + !/^\s/.test(chunkContent) chunkContent = `${overlap}${needsSeparator ? " " : ""}${chunkContent}` } } chunkContent = trimChunkToByteLimit(chunkContent, chunkSizeLimit) - const pageNumbersArray = Array.from(currentPageNumbers).sort((a, b) => a - b) + const pageNumbersArray = Array.from(currentPageNumbers).sort( + (a, b) => a - b, + ) const blockLabelsArray = Array.from(new Set(currentBlockLabels)) chunks.push(chunkContent) @@ -1787,7 +890,7 @@ export async function chunkByOCR( } const extension = detectImageExtension(imageBuffer) - const fileName = globalSeq.value.toString() + "." + extension + const fileName = String(globalSeq.value) + "." + extension const uniqueFileName = ensureUniqueFileName(fileName, usedFileNames) const imagePath = path.join(docImageDir, uniqueFileName) diff --git a/server/scripts/testOcrDispatcher.ts b/server/scripts/testOcrDispatcher.ts deleted file mode 100644 index 771473a8a..000000000 --- a/server/scripts/testOcrDispatcher.ts +++ /dev/null @@ -1,464 +0,0 @@ -#!/usr/bin/env bun -/** - * Test file for OCR Dispatcher Logic - * - * This script tests the dispatch logic implementation in chunkByOCR.ts - * It mocks the instance status endpoint and PDF processing to demonstrate - * how batches are processed under various scenarios. 
- * - * Run with: bun run server/scripts/testOcrDispatcher.ts - */ - -import { promises as fsPromises } from "fs" -import * as path from "path" -import { PDFDocument } from "pdf-lib" -import type { PdfOcrBatch, DispatchOptions, DispatchReport } from "../lib/chunkByOCR" - -// Import the dispatcher function -import { dispatchOCRBatches } from "../lib/chunkByOCR" - -// ============================================================================ -// MOCK CONFIGURATION -// ============================================================================ - -const MOCK_PDF_PATH = "/Users/aayush.shah/Downloads/Tolkien-J.-The-lord-of-the-rings-HarperCollins-ebooks-2010.pdf" -const PAGES_PER_BATCH = 100 -const MAX_BATCHES_TO_TEST = 100 - -// ============================================================================ -// HELPER FUNCTIONS -// ============================================================================ - -function sleep(ms: number): Promise { - return new Promise((resolve) => setTimeout(resolve, ms)) -} - -async function createMockBatches( - pdfPath: string, - pagesPerBatch: number, - maxBatches: number, -): Promise { - console.log(`\n๐Ÿ“„ Loading PDF: ${path.basename(pdfPath)}`) - - const pdfBuffer = await fsPromises.readFile(pdfPath) - const pdfDocument = await PDFDocument.load(pdfBuffer) - const totalPages = pdfDocument.getPageCount() - const pdfFileName = path.basename(pdfPath) - - console.log(` Total pages: ${totalPages}`) - console.log(` Pages per batch: ${pagesPerBatch}`) - - const batches: PdfOcrBatch[] = [] - - for (let startPage = 0; startPage < totalPages && batches.length < maxBatches; startPage += pagesPerBatch) { - const endPage = Math.min(startPage + pagesPerBatch, totalPages) - const pageCount = endPage - startPage - - // Create new PDF with subset of pages - const newPdf = await PDFDocument.create() - const pageIndices: number[] = [] - for (let i = 0; i < pageCount; i++) { - pageIndices.push(startPage + i) - } - - const copiedPages = await newPdf.copyPages(pdfDocument, pageIndices) - for (const page of copiedPages) { - newPdf.addPage(page) - } - - const pdfBytes = await newPdf.save() - - batches.push({ - id: `batch-${batches.length + 1}`, - fileName: pdfFileName, - startPage: startPage + 1, - endPage: endPage, - pdfBuffer: Buffer.from(pdfBytes), - }) - } - - console.log(` Created ${batches.length} batches\n`) - return batches -} - -// ============================================================================ -// MOCK INSTANCE STATUS ENDPOINT -// ============================================================================ - -type MockStatusConfig = { - idleSequence: number[] - label: string - serverDown?: boolean -} - -function createMockFetch(config: MockStatusConfig) { - let statusCallCount = 0 - const originalFetch = globalThis.fetch - - return { - install: () => { - globalThis.fetch = async (input, init) => { - if (typeof input === "string" && input.includes("/instance_status")) { - statusCallCount += 1 - - // Simulate server being unreachable - if (config.serverDown) { - throw new Error("ECONNREFUSED: Connection refused - instance status server is down") - } - - const idle = - config.idleSequence[ - statusCallCount < config.idleSequence.length - ? statusCallCount - 1 - : config.idleSequence.length - 1 - ] ?? 
0 - - return new Response( - JSON.stringify({ - active_instances: Math.max(0, 10 - idle), - configured_instances: 10, - idle_instances: idle, - last_updated: Date.now(), - }), - { - status: 200, - headers: { "Content-Type": "application/json" }, - }, - ) - } - - throw new Error( - `[testOcrDispatcher] Unexpected fetch target: ${String(input)}`, - ) - } - }, - restore: () => { - globalThis.fetch = originalFetch - }, - getCallCount: () => statusCallCount, - } -} - -// ============================================================================ -// MOCK BATCH PROCESSOR -// ============================================================================ - -type BatchProcessingStats = { - totalDispatched: number - totalCompleted: number - totalFailed: number - batchTimings: Array<{ - id: string - startTime: number - endTime: number - duration: number - }> -} - -function createMockBatchProcessor() { - const stats: BatchProcessingStats = { - totalDispatched: 0, - totalCompleted: 0, - totalFailed: 0, - batchTimings: [], - } - - const sendBatch = async ( - batch: PdfOcrBatch, - options: { timeoutMs: number }, - ): Promise => { - stats.totalDispatched += 1 - const startTime = Date.now() - - console.log( - ` ๐Ÿš€ Dispatching ${batch.id} (pages ${batch.startPage}-${batch.endPage}, timeout ${options.timeoutMs}ms)`, - ) - - // Simulate processing time (30-80ms) - const processingTime = 30 + Math.random() * 50 - await sleep(processingTime) - - // Simulate occasional failures (5% chance) - if (Math.random() < 0.05) { - stats.totalFailed += 1 - throw new Error(`Simulated processing failure for ${batch.id}`) - } - - const endTime = Date.now() - stats.totalCompleted += 1 - stats.batchTimings.push({ - id: batch.id, - startTime, - endTime, - duration: endTime - startTime, - }) - - console.log( - ` โœ… Completed ${batch.id} (${Math.round(processingTime)}ms)`, - ) - } - - return { sendBatch, stats } -} - -// ============================================================================ -// CUSTOM LOGGER -// ============================================================================ - -function createTestLogger(scenarioLabel: string) { - return { - info(message?: unknown, meta?: unknown) { - const metaStr = meta ? ` ${JSON.stringify(meta)}` : "" - console.log(` [${scenarioLabel}] โ„น๏ธ ${String(message)}${metaStr}`) - }, - warn(message?: unknown, meta?: unknown) { - const metaStr = meta ? ` ${JSON.stringify(meta)}` : "" - console.warn(` [${scenarioLabel}] โš ๏ธ ${String(message)}${metaStr}`) - }, - error(message?: unknown, meta?: unknown) { - const metaStr = meta ? ` ${JSON.stringify(meta)}` : "" - console.error(` [${scenarioLabel}] โŒ ${String(message)}${metaStr}`) - }, - } -} - -// ============================================================================ -// METRICS COLLECTOR -// ============================================================================ - -type MetricsData = { - counters: Record - observations: Array<{ - name: string - value: number - tags?: Record - }> -} - -function createMetricsCollector() { - const data: MetricsData = { - counters: {}, - observations: [], - } - - return { - incr(name: string, tags?: Record) { - const key = tags ? 
`${name}:${JSON.stringify(tags)}` : name - data.counters[key] = (data.counters[key] || 0) + 1 - }, - observe(name: string, value: number, tags?: Record) { - data.observations.push({ name, value, tags }) - }, - getData: () => data, - } -} - -// ============================================================================ -// TEST SCENARIOS -// ============================================================================ - -type TestScenario = { - name: string - description: string - idleSequence: number[] - thresholdForConcurrency: number - pollIntervalMs: number - maxRetries: number -} - -const TEST_SCENARIOS: TestScenario[] = [ - { - name: "Sequential Processing (Low Batch Count)", - description: "Tests sequential dispatch when batch count is below threshold", - idleSequence: [5], - thresholdForConcurrency: 100, - pollIntervalMs: 50, - maxRetries: 2, - }, - { - name: "Concurrent - High Idle Capacity", - description: "Tests concurrent dispatch with consistently high idle instances", - idleSequence: [8, 8, 8, 8, 8], - thresholdForConcurrency: 1, - pollIntervalMs: 50, - maxRetries: 2, - }, - { - name: "Concurrent - Low Idle Capacity", - description: "Tests concurrent dispatch with low idle instances", - idleSequence: [1, 1, 1, 1, 1], - thresholdForConcurrency: 1, - pollIntervalMs: 50, - maxRetries: 2, - }, - { - name: "Concurrent - No Idle Capacity", - description: "Tests behavior when no idle instances are available", - idleSequence: [0, 0, 0, 1, 2], - thresholdForConcurrency: 1, - pollIntervalMs: 50, - maxRetries: 2, - }, - { - name: "Concurrent - Fluctuating Capacity", - description: "Tests adaptive dispatch with varying idle instances", - idleSequence: [5, 8, 3, 1, 0, 2, 6, 4], - thresholdForConcurrency: 1, - pollIntervalMs: 50, - maxRetries: 2, - }, - { - name: "Retry Logic Test", - description: "Tests retry behavior with higher failure rate", - idleSequence: [5, 5, 5], - thresholdForConcurrency: 1, - pollIntervalMs: 50, - maxRetries: 3, - }, - { - name: "Instance Server Down", - description: "Tests behavior when instance status server is unreachable", - idleSequence: [], // Not used - server will be unreachable - thresholdForConcurrency: 1, - pollIntervalMs: 50, - maxRetries: 2, - }, -] - -// ============================================================================ -// RUN SCENARIO -// ============================================================================ - -async function runScenario( - scenario: TestScenario, - batches: PdfOcrBatch[], -): Promise { - console.log(`\n${"=".repeat(80)}`) - console.log(`๐Ÿงช TEST: ${scenario.name}`) - console.log(` ${scenario.description}`) - console.log(` Batches: ${batches.length}`) - console.log(` Idle sequence: [${scenario.idleSequence.join(", ")}]`) - console.log(` Threshold: ${scenario.thresholdForConcurrency}`) - console.log(`${"=".repeat(80)}\n`) - - // Check if this is the "Instance Server Down" scenario - const isServerDownScenario = scenario.name === "Instance Server Down" - - const mockFetch = createMockFetch({ - idleSequence: scenario.idleSequence, - label: scenario.name, - serverDown: isServerDownScenario, - }) - const mockProcessor = createMockBatchProcessor() - const logger = createTestLogger(scenario.name) - const metrics = createMetricsCollector() - - mockFetch.install() - - try { - const startTime = Date.now() - - const options: DispatchOptions = { - logger, - metrics, - sendBatch: mockProcessor.sendBatch, - thresholdForConcurrency: scenario.thresholdForConcurrency, - pollIntervalMs: scenario.pollIntervalMs, - maxRetries: 
scenario.maxRetries, - } - - const report: DispatchReport = await dispatchOCRBatches(batches, options) - - const endTime = Date.now() - const totalDuration = endTime - startTime - - // Print results - console.log(`\n๐Ÿ“Š RESULTS:`) - console.log(` Total batches: ${report.total}`) - console.log(` Succeeded: ${report.succeeded} โœ…`) - console.log(` Failed: ${report.failed} โŒ`) - console.log(` Total duration: ${totalDuration}ms`) - console.log(` Status endpoint calls: ${mockFetch.getCallCount()}`) - - // Print per-batch details - console.log(`\n๐Ÿ“‹ Per-Batch Details:`) - for (const item of report.perItem) { - const status = item.status === "ok" ? "โœ…" : "โŒ" - const error = item.error ? ` (${item.error})` : "" - console.log( - ` ${status} ${item.id}: ${item.attempts} attempt(s), ${item.latencyMs}ms${error}`, - ) - } - - // Print metrics summary - const metricsData = metrics.getData() - console.log(`\n๐Ÿ“ˆ Metrics:`) - console.log(` Counters:`, metricsData.counters) - - const latencies = metricsData.observations - .filter((o) => o.name === "ocr_dispatch.latency_ms") - .map((o) => o.value) - - if (latencies.length > 0) { - const avgLatency = latencies.reduce((a, b) => a + b, 0) / latencies.length - const minLatency = Math.min(...latencies) - const maxLatency = Math.max(...latencies) - console.log(` Latency: avg=${Math.round(avgLatency)}ms, min=${Math.round(minLatency)}ms, max=${Math.round(maxLatency)}ms`) - } - - // Print processing stats - console.log(`\n๐Ÿ”ง Processing Stats:`) - console.log(` Total dispatched: ${mockProcessor.stats.totalDispatched}`) - console.log(` Total completed: ${mockProcessor.stats.totalCompleted}`) - console.log(` Total failed: ${mockProcessor.stats.totalFailed}`) - - } catch (error) { - console.error(`\nโŒ Scenario failed with error:`, error) - } finally { - mockFetch.restore() - } -} - -// ============================================================================ -// MAIN EXECUTION -// ============================================================================ - -async function main() { - console.log(` -โ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•— -โ•‘ OCR DISPATCHER TEST SUITE โ•‘ -โ•‘ โ•‘ -โ•‘ This test suite verifies the dispatch logic implementation in โ•‘ -โ•‘ chunkByOCR.ts by running various scenarios with mocked dependencies. โ•‘ -โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ• -`) - - try { - // Create mock batches from the PDF - const batches = await createMockBatches( - MOCK_PDF_PATH, - PAGES_PER_BATCH, - MAX_BATCHES_TO_TEST, - ) - - // Run all test scenarios - for (const scenario of TEST_SCENARIOS) { - await runScenario(scenario, batches) // Use first 6 batches for testing - await sleep(100) // Small delay between scenarios - } - - console.log(`\n${"=".repeat(80)}`) - console.log(`โœ… All test scenarios completed successfully!`) - console.log(`${"=".repeat(80)}\n`) - - } catch (error) { - console.error(`\nโŒ Test suite failed:`, error) - process.exit(1) - } -} - -// Run the test suite -main().catch((error) => { - console.error("Fatal error:", error) - process.exit(1) -})
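
Note on the retry/backoff defaults used across this series (BACKOFF_BASE_MS = 500,
BACKOFF_FACTOR = 2, BACKOFF_MAX_MS = 8000, BACKOFF_JITTER_RATIO = 0.2): the sketch
below is illustrative only, mirrors computeBackoffMs/applyBackoffJitter, and is not
part of any patch in the series.

    // Approximate delay range per retry attempt under the defaults above.
    function sketchBackoffRangeMs(attempt: number): { minMs: number; maxMs: number } {
      const raw = 500 * Math.pow(2, Math.max(0, attempt - 1))
      return {
        minMs: Math.min(8000, Math.round(raw * 0.8)),
        maxMs: Math.min(8000, Math.round(raw * 1.2)),
      }
    }
    // attempt 1 -> 400..600ms, attempt 2 -> 800..1200ms, attempt 3 -> 1600..2400ms,
    // attempt 4 -> 3200..4800ms, attempt 5 -> 6400..8000ms, later attempts -> 8000ms (cap).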