Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 70 additions & 40 deletions server/api/knowledgeBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,16 @@ import type { CollectionItem, File as DbFile } from "@/db/schema"
import { collectionItems, collections } from "@/db/schema"
import { and, eq, isNull, sql } from "drizzle-orm"
import { DeleteDocument, GetDocument } from "@/search/vespa"
import { KbItemsSchema } from "@xyne/vespa-ts/types"
import { ChunkMetadata, KbItemsSchema } from "@xyne/vespa-ts/types"
import { boss, FileProcessingQueue } from "@/queue/api-server-queue"
import crypto from "crypto"
import * as crypto from "crypto"
import {
DATASOURCE_CONFIG,
getBaseMimeType,
} from "@/integrations/dataSource/config"
import { getAuth, safeGet } from "./agent"
import { ApiKeyScopes, UploadStatus } from "@/shared/types"
import { FileProcessorService } from "@/services/fileProcessor"

const loggerWithChild = getLoggerWithChild(Subsystem.Api, {
module: "knowledgeBaseService",
Expand Down Expand Up @@ -111,7 +112,7 @@ function getStoragePath(
collectionId: string,
storageKey: string,
fileName: string,
): string {
): string {
const date = new Date()
const year = date.getFullYear()
const month = String(date.getMonth() + 1).padStart(2, "0")
Expand Down Expand Up @@ -189,13 +190,17 @@ export const CreateCollectionApi = async (c: Context) => {
})

// Queue after transaction commits to avoid race condition
await boss.send(FileProcessingQueue, {
collectionId: collection.id,
type: ProcessingJobType.COLLECTION
}, {
retryLimit: 3,
expireInHours: 12
})
await boss.send(
FileProcessingQueue,
{
collectionId: collection.id,
type: ProcessingJobType.COLLECTION,
},
{
retryLimit: 3,
expireInHours: 12,
},
)
loggerWithChild({ email: userEmail }).info(
`Created Collection: ${collection.id} for user ${userEmail}`,
)
Expand Down Expand Up @@ -678,13 +683,17 @@ export const CreateFolderApi = async (c: Context) => {
})

// Queue after transaction commits to avoid race condition
await boss.send(FileProcessingQueue, {
folderId: folder.id,
type: ProcessingJobType.FOLDER
}, {
retryLimit: 3,
expireInHours: 12
})
await boss.send(
FileProcessingQueue,
{
folderId: folder.id,
type: ProcessingJobType.FOLDER,
},
{
retryLimit: 3,
expireInHours: 12,
},
)

loggerWithChild({ email: userEmail }).info(
`Created folder: ${folder.id} in Collection: ${collectionId} with Vespa doc: ${folder.vespaDocId}`,
Expand Down Expand Up @@ -751,11 +760,11 @@ const uploadSessions = new Map<
// Clean up old sessions (older than 1 hour)
setInterval(() => {
const now = Date.now()
for (const [sessionId, session] of uploadSessions.entries()) {
uploadSessions.forEach((session, sessionId) => {
if (now - session.createdAt > 3600000) {
uploadSessions.delete(sessionId)
}
}
})
}, 300000) // Run every 5 minutes

// Helper function to ensure folder exists or create it
Expand Down Expand Up @@ -824,13 +833,17 @@ async function ensureFolderPath(
})

// Queue after transaction commits to avoid race condition
await boss.send(FileProcessingQueue, {
folderId: newFolder.id,
type: ProcessingJobType.FOLDER
}, {
retryLimit: 3,
expireInHours: 12
})
await boss.send(
FileProcessingQueue,
{
folderId: newFolder.id,
type: ProcessingJobType.FOLDER,
},
{
retryLimit: 3,
expireInHours: 12,
},
)

currentFolderId = newFolder.id

Expand Down Expand Up @@ -1022,7 +1035,6 @@ export const UploadFilesApi = async (c: Context) => {
// Parse the file path to extract folder structure
const pathParts = filePath.split("/").filter((part) => part.length > 0)
const originalFileName = pathParts.pop() || file.name // Get the actual filename


// Skip if the filename is a system file (in case it comes from path)
if (
Expand Down Expand Up @@ -1193,15 +1205,19 @@ export const UploadFilesApi = async (c: Context) => {
},
user.id,
user.email,
`File uploaded successfully, queued for processing` // Initial status message
`File uploaded successfully, queued for processing`, // Initial status message
)
})

// Queue after transaction commits to avoid race condition
await boss.send(FileProcessingQueue, { fileId: item.id, type: ProcessingJobType.FILE }, {
retryLimit: 3,
expireInHours: 12
})
await boss.send(
FileProcessingQueue,
{ fileId: item.id, type: ProcessingJobType.FILE },
{
retryLimit: 3,
expireInHours: 12,
},
)

uploadResults.push({
success: true,
Expand Down Expand Up @@ -1232,11 +1248,10 @@ export const UploadFilesApi = async (c: Context) => {
try {
await unlink(storagePath)
} catch (err) {
loggerWithChild({ email: userEmail, }).error(
error,
`Failed to clean up storage file`
);

loggerWithChild({ email: userEmail }).error(
error,
`Failed to clean up storage file`,
)
}
}

Expand Down Expand Up @@ -1575,9 +1590,24 @@ export const GetChunkContentApi = async (c: Context) => {
throw new HTTPException(404, { message: "Document missing chunk data" })
}

const index = resp.fields.chunks_pos.findIndex(
(pos: number) => pos === chunkIndex,
)
// Handle both legacy number[] format and new ChunkMetadata[] format
const index = resp.fields.chunks_pos.findIndex((pos: number | ChunkMetadata) => {
// If it's a number (legacy format), compare directly
if (typeof pos === "number") {
return pos === chunkIndex
}
// If it's a ChunkMetadata object, compare the index field
if (typeof pos === "object") {
if (pos.chunk_index !== undefined) {
return pos.chunk_index === chunkIndex
} else {
loggerWithChild({ email: userEmail }).warn(
`Unexpected chunk position object format: ${JSON.stringify(pos)}`,
)
}
}
return false
})
if (index === -1) {
throw new HTTPException(404, { message: "Chunk index not found" })
}
Expand Down
2 changes: 1 addition & 1 deletion server/docxChunks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2624,7 +2624,7 @@ export async function extractTextAndImagesWithChunksFromDocx(
`Reusing description for repeated image ${imagePath}`,
)
} else {
if(describeImages) {
if (describeImages) {
description = await describeImageWithllm(imageBuffer)
} else {
description = "This is an image."
Expand Down
Loading
Loading