Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions server/api/knowledgeBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import {
generateFolderVespaDocId,
generateCollectionVespaDocId,
getCollectionFilesVespaIds,
getCollectionItemsStatusByCollections,
markParentAsProcessing,
// Legacy aliases for backward compatibility
} from "@/db/knowledgeBase"
import { cleanUpAgentDb } from "@/db/agent"
Expand Down Expand Up @@ -1645,6 +1647,47 @@ export const GetFileContentApi = async (c: Context) => {
}
}

// Poll collection items status for multiple collections
// Poll collection items status for multiple collections.
// Accepts a JSON body of shape { collectionIds: string[] } and returns the
// upload status of every (non-deleted) item in those collections that the
// authenticated user owns. Ownership is enforced in the DB helper.
//
// Throws:
//   404 — authenticated email has no user record
//   400 — missing/invalid JSON body or collectionIds not a non-empty string[]
//   500 — unexpected failure while querying
export const PollCollectionsStatusApi = async (c: Context) => {
  const { sub: userEmail } = c.get(JwtPayloadKey)

  // Get user from database
  const users = await getUserByEmail(db, userEmail)
  if (!users || users.length === 0) {
    throw new HTTPException(404, { message: "User not found" })
  }
  const user = users[0]

  try {
    // A malformed JSON body is a client error — report 400, not 500.
    let body: unknown
    try {
      body = await c.req.json()
    } catch {
      throw new HTTPException(400, { message: "Invalid JSON body" })
    }

    const collectionIds = (body as { collectionIds?: unknown }).collectionIds

    // Validate the payload shape: a non-empty array of non-empty strings.
    // The original `as string[]` cast never checked element types.
    if (
      !Array.isArray(collectionIds) ||
      collectionIds.length === 0 ||
      !collectionIds.every((id) => typeof id === "string" && id.length > 0)
    ) {
      throw new HTTPException(400, {
        message: "collectionIds array is required",
      })
    }

    // Fetch items only for collections owned by the user (enforced in DB function)
    const items = await getCollectionItemsStatusByCollections(
      db,
      collectionIds,
      user.id,
    )

    return c.json({ items })
  } catch (error) {
    // Re-throw deliberate HTTP errors untouched so status codes survive.
    if (error instanceof HTTPException) throw error

    const errMsg = getErrorMessage(error)
    loggerWithChild({ email: userEmail }).error(
      error,
      `Failed to poll collections status: ${errMsg}`,
    )
    throw new HTTPException(500, {
      message: "Failed to poll collections status",
    })
  }
}

// Download file (supports all file types with true streaming and range requests)
export const DownloadFileApi = async (c: Context) => {
const { sub: userEmail } = c.get(JwtPayloadKey)
Expand Down
4 changes: 2 additions & 2 deletions server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ let aiProviderBaseUrl = ""
let isReasoning = false

// File processing worker configuration
let fileProcessingWorkerThreads = parseInt(process.env.FILE_PROCESSING_WORKER_THREADS || "1", 10)
let fileProcessingTeamSize = parseInt(process.env.FILE_PROCESSING_TEAM_SIZE || "1", 10)
let fileProcessingWorkerThreads = parseInt(process.env.FILE_PROCESSING_WORKER_THREADS || "4", 10)
let fileProcessingTeamSize = parseInt(process.env.FILE_PROCESSING_TEAM_SIZE || "4", 10)
let fastModelReasoning = false
let slackHost = process.env.SLACK_HOST
let VESPA_NAMESPACE = "my_content"
Expand Down
177 changes: 177 additions & 0 deletions server/db/knowledgeBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
import { createId } from "@paralleldrive/cuid2"
import type { TxnOrClient } from "@/types"
import { and, asc, desc, eq, isNull, sql, or, inArray } from "drizzle-orm"
import { UploadStatus } from "@/shared/types"

// Collection CRUD operations
export const createCollection = async (
Expand Down Expand Up @@ -446,6 +447,13 @@ export const createFolder = async (
// Update collection total count
await updateCollectionTotalCount(trx, collectionId, 1)

// Mark parent folder/collection as PROCESSING when folder is created
if (parentId) {
await markParentAsProcessing(trx, parentId, false)
} else {
await markParentAsProcessing(trx, collectionId, true)
}

return folder
}

Expand Down Expand Up @@ -528,6 +536,13 @@ export const createFileItem = async (
await updateParentFolderCounts(trx, parentId, 1)
}

// Mark parent folder/collection as PROCESSING when file is uploaded
if (parentId) {
await markParentAsProcessing(trx, parentId, false)
} else {
await markParentAsProcessing(trx, collectionId, true)
}

return item
}

Expand Down Expand Up @@ -736,6 +751,39 @@ export const getCollectionFilesVespaIds = async (
return resp
}

// Get collection items status for polling - with access control
export const getCollectionItemsStatusByCollections = async (
trx: TxnOrClient,
collectionIds: string[],
userId: number,
) => {
if (collectionIds.length === 0) {
return []
}

// Fetch items only for collections owned by the user
const items = await trx
.select({
id: collectionItems.id,
uploadStatus: collectionItems.uploadStatus,
statusMessage: collectionItems.statusMessage,
retryCount: collectionItems.retryCount,
collectionId: collectionItems.collectionId,
})
.from(collectionItems)
.innerJoin(collections, eq(collectionItems.collectionId, collections.id))
.where(
and(
inArray(collectionItems.collectionId, collectionIds),
eq(collections.ownerId, userId),
isNull(collectionItems.deletedAt),
isNull(collections.deletedAt),
),
)

return items
}

export const getCollectionFoldersItemIds = async (
collectionFoldersIds: string[],
trx: TxnOrClient,
Expand Down Expand Up @@ -828,6 +876,135 @@ export const generateCollectionVespaDocId = (): string => {
return `cl-${createId()}`
}

// Helper function to mark parent (folder/collection) as PROCESSING when new items are added
export const markParentAsProcessing = async (
trx: TxnOrClient,
parentId: string | null,
isCollection: boolean,
) => {
if (!parentId) return

const updateData = {
uploadStatus: UploadStatus.PROCESSING,
updatedAt: sql`NOW()`,
}

if (isCollection) {
// Update collection status
await trx
.update(collections)
.set(updateData)
.where(eq(collections.id, parentId))
} else {
// Update folder status
await trx
.update(collectionItems)
.set(updateData)
.where(eq(collectionItems.id, parentId))

// Recursively mark parent's parent as processing
const [folder] = await trx
.select({
parentId: collectionItems.parentId,
collectionId: collectionItems.collectionId,
})
.from(collectionItems)
.where(eq(collectionItems.id, parentId))

if (folder) {
// Recursively mark parent (either another folder or the collection)
await markParentAsProcessing(
trx,
folder.parentId || folder.collectionId,
!folder.parentId, // isCollection = true if no parentId
)
}
}
}

// Helper function to update parent (collection/folder) status based on children completion
export const updateParentStatus = async (
trx: TxnOrClient,
parentId: string | null,
isCollection: boolean,
) => {
if (!parentId) return

// Fetch children based on parent type
const children = await trx
.select({ uploadStatus: collectionItems.uploadStatus })
.from(collectionItems)
.where(
and(
isCollection
? eq(collectionItems.collectionId, parentId)
: eq(collectionItems.parentId, parentId),
isCollection ? isNull(collectionItems.parentId) : sql`true`,
isNull(collectionItems.deletedAt),
),
)

// Determine parent type name for logging
const parentType = isCollection ? "collection" : "folder"

// Handle case where parent has no children
if (children.length === 0) {
const updateData = {
uploadStatus: UploadStatus.COMPLETED,
statusMessage: `Successfully processed ${parentType}`,
updatedAt: sql`NOW()`,
}

if (isCollection) {
await trx.update(collections).set(updateData).where(eq(collections.id, parentId))
} else {
await trx.update(collectionItems).set(updateData).where(eq(collectionItems.id, parentId))
}
return
}

// Count completed and failed children
const completedCount = children.filter(
(c) => c.uploadStatus === UploadStatus.COMPLETED,
).length
const failedCount = children.filter(
(c) => c.uploadStatus === UploadStatus.FAILED,
).length

// Update if all children are either completed or failed
if (completedCount + failedCount === children.length) {
const updateData = {
uploadStatus: UploadStatus.COMPLETED,
statusMessage: `Successfully processed ${parentType}: ${completedCount} completed, ${failedCount} failed`,
updatedAt: sql`NOW()`,
}

if (isCollection) {
await trx.update(collections).set(updateData).where(eq(collections.id, parentId))
} else {
await trx.update(collectionItems).set(updateData).where(eq(collectionItems.id, parentId))

// For folders, recursively check the parent folder/collection
const [folder] = await trx
.select({
parentId: collectionItems.parentId,
collectionId: collectionItems.collectionId,
})
.from(collectionItems)
.where(eq(collectionItems.id, parentId))

if (folder) {
// Recursively update parent (either another folder or the collection)
await updateParentStatus(
trx,
folder.parentId || folder.collectionId,
!folder.parentId, // isCollection = true if no parentId
)
}
}
}
}

export const getRecordBypath = async (path: string, trx: TxnOrClient) => {
let collectionName: string
let directoryPath: string
Expand Down
50 changes: 27 additions & 23 deletions server/queue/fileProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import { Apps, KbItemsSchema, KnowledgeBaseEntity } from "@xyne/vespa-ts/types"
import { getBaseMimeType } from "@/integrations/dataSource/config"
import { db } from "@/db/client"
import { collectionItems, collections } from "@/db/schema"
import { eq } from "drizzle-orm"
import { eq, and, isNull } from "drizzle-orm"
import { readFile } from "node:fs/promises"
import { UploadStatus } from "@/shared/types"
import { updateParentStatus } from "@/db/knowledgeBase"

const Logger = getLogger(Subsystem.Queue)

Expand Down Expand Up @@ -39,6 +40,8 @@ async function handleRetryFailure(
entityId: string,
currentRetryCount: number,
errorMessage: string,
parentId?: string | null,
collectionId?: string,
) {
const newRetryCount = currentRetryCount + 1
const maxRetries = 3 // Match pg-boss retryLimit
Expand All @@ -62,6 +65,15 @@ async function handleRetryFailure(
.update(collectionItems)
.set(updateData)
.where(eq(collectionItems.id, entityId))

// If it's a file that failed, trigger parent status update
if (entityType === ProcessingJobType.FILE && parentId !== undefined && collectionId) {
if (parentId) {
await updateParentStatus(db, parentId, false)
} else {
await updateParentStatus(db, collectionId, true)
}
}
}
} else {
// Update retry count but keep status as 'processing' for retries
Expand Down Expand Up @@ -250,6 +262,13 @@ async function processFileJob(jobData: FileProcessingJob, startTime: number) {
})
.where(eq(collectionItems.id, fileId))

// Trigger parent status update after file completion
if (file.parentId) {
await updateParentStatus(db, file.parentId, false)
} else {
await updateParentStatus(db, file.collectionId, true)
}

const endTime = Date.now()
Logger.info(
`Successfully processed file: ${fileId} in ${endTime - startTime}ms`,
Expand All @@ -264,6 +283,8 @@ async function processFileJob(jobData: FileProcessingJob, startTime: number) {
fileId,
file.retryCount || 0,
errorMessage,
file.parentId,
file.collectionId,
)

throw error // Let pg-boss handle retries
Expand Down Expand Up @@ -344,19 +365,13 @@ async function processCollectionJob(
// Insert into Vespa
await insert(vespaDoc, KbItemsSchema)

// Update status to completed
await db
.update(collections)
.set({
uploadStatus: UploadStatus.COMPLETED,
statusMessage: `Successfully processed collection Vespa insertion`,
updatedAt: new Date(),
})
.where(eq(collections.id, collectionId))
// Keep collection in PROCESSING status
// It will be updated to COMPLETED only when child files/folders complete
// This prevents race condition where collection is marked complete before children are added

const endTime = Date.now()
Logger.info(
`Successfully processed collection Vespa insertion: ${collectionId} in ${endTime - startTime}ms`,
`Successfully processed collection Vespa insertion: ${collectionId} in ${endTime - startTime}ms (waiting for children to complete)`,
)
} catch (error) {
const errorMessage = getErrorMessage(error)
Expand Down Expand Up @@ -460,20 +475,9 @@ async function processFolderJob(

// Insert into Vespa
await insert(vespaDoc, KbItemsSchema)

// Update status to completed
await db
.update(collectionItems)
.set({
uploadStatus: UploadStatus.COMPLETED,
statusMessage: `Successfully processed folder Vespa insertion`,
updatedAt: new Date(),
})
.where(eq(collectionItems.id, folderId))

const endTime = Date.now()
Logger.info(
`Successfully processed folder Vespa insertion: ${folderId} in ${endTime - startTime}ms`,
`Successfully processed folder Vespa insertion: ${folderId} in ${endTime - startTime}ms (waiting for children to complete)`,
)
} catch (error) {
const errorMessage = getErrorMessage(error)
Expand Down
Loading
Loading