Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions server/api/knowledgeBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1645,6 +1645,60 @@ export const GetFileContentApi = async (c: Context) => {
}
}

// Poll collection items status for multiple collections
// Poll collection items status for multiple collections
//
// POST handler: expects a JSON body of shape { collectionIds: string[] } and
// returns { items } where each item carries id, uploadStatus, statusMessage,
// retryCount and collectionId for every non-deleted item belonging to any of
// the requested collections. Intended for client-side polling of upload
// progress.
//
// Throws:
//   404 - authenticated user not found in the database
//   400 - collectionIds missing, not an array, or empty
//   500 - any other failure (logged with the user's email)
export const PollCollectionsStatusApi = async (c: Context) => {
  const { sub: userEmail } = c.get(JwtPayloadKey)

  // Get user from database
  const users = await getUserByEmail(db, userEmail)
  if (!users || users.length === 0) {
    throw new HTTPException(404, { message: "User not found" })
  }
  // NOTE(review): `user` is resolved but never used below — the query does not
  // restrict results to collections this user owns or can access, so any
  // authenticated user can poll any collectionIds. Confirm whether an
  // ownership/ACL filter is required here.
  const user = users[0]

  try {
    const body = await c.req.json()
    // NOTE(review): elements of collectionIds are not validated as strings;
    // non-string values flow straight into the SQL parameters below.
    const collectionIds = body.collectionIds as string[]

    if (!collectionIds || !Array.isArray(collectionIds) || collectionIds.length === 0) {
      throw new HTTPException(400, { message: "collectionIds array is required" })
    }

    // Fetch all items for the given collections.
    // Each id is bound as a parameter via sql`${id}`, so this is injection-safe.
    // NOTE(review): drizzle's inArray(collectionItems.collectionId, collectionIds)
    // would express this IN clause more idiomatically if it is imported in this file.
    const items = await db
      .select({
        id: collectionItems.id,
        uploadStatus: collectionItems.uploadStatus,
        statusMessage: collectionItems.statusMessage,
        retryCount: collectionItems.retryCount,
        collectionId: collectionItems.collectionId,
      })
      .from(collectionItems)
      .where(
        and(
          sql`${collectionItems.collectionId} IN (${sql.join(
            collectionIds.map((id) => sql`${id}`),
            sql`, `,
          )})`,
          // Soft-deleted items are excluded from polling results.
          isNull(collectionItems.deletedAt),
        ),
      )

    return c.json({ items })
  } catch (error) {
    // Re-throw deliberate HTTP errors (e.g. the 400 above) unchanged.
    if (error instanceof HTTPException) throw error

    const errMsg = getErrorMessage(error)
    loggerWithChild({ email: userEmail }).error(
      error,
      `Failed to poll collections status: ${errMsg}`,
    )
    throw new HTTPException(500, {
      message: "Failed to poll collections status",
    })
  }
}

// Download file (supports all file types with true streaming and range requests)
export const DownloadFileApi = async (c: Context) => {
const { sub: userEmail } = c.get(JwtPayloadKey)
Expand Down
4 changes: 2 additions & 2 deletions server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ let aiProviderBaseUrl = ""
let isReasoning = false

// File processing worker configuration
let fileProcessingWorkerThreads = parseInt(process.env.FILE_PROCESSING_WORKER_THREADS || "1", 10)
let fileProcessingTeamSize = parseInt(process.env.FILE_PROCESSING_TEAM_SIZE || "1", 10)
let fileProcessingWorkerThreads = parseInt(process.env.FILE_PROCESSING_WORKER_THREADS || "4", 10)
let fileProcessingTeamSize = parseInt(process.env.FILE_PROCESSING_TEAM_SIZE || "4", 10)
let fastModelReasoning = false
let slackHost = process.env.SLACK_HOST
let VESPA_NAMESPACE = "my_content"
Expand Down
84 changes: 84 additions & 0 deletions server/db/knowledgeBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
import { createId } from "@paralleldrive/cuid2"
import type { TxnOrClient } from "@/types"
import { and, asc, desc, eq, isNull, sql, or, inArray } from "drizzle-orm"
import { UploadStatus } from "@/shared/types"

// Collection CRUD operations
export const createCollection = async (
Expand Down Expand Up @@ -828,6 +829,89 @@ export const generateCollectionVespaDocId = (): string => {
return `cl-${createId()}`
}

// Helper function to update parent (collection/folder) status based on children completion
// Helper function to update parent (collection/folder) status based on children completion
//
// Re-evaluates a parent's uploadStatus from its direct children: once every
// non-deleted child is settled (COMPLETED or FAILED) — or the parent has no
// children at all — the parent is marked COMPLETED with a summary message.
// For folders, the check then propagates upward to the folder's own parent
// (another folder, or the owning collection) so ancestors settle too.
//
// Params:
//   trx          - transaction or client to run all queries/updates on
//   parentId     - id of the collection (isCollection=true) or folder
//                  (isCollection=false) to re-evaluate; no-op when null
//   isCollection - true when parentId refers to the collections table,
//                  false when it refers to a folder row in collectionItems
export const updateParentStatus = async (
  trx: TxnOrClient,
  parentId: string | null,
  isCollection: boolean,
) => {
  if (!parentId) return

  // Fetch direct children: for a collection, its top-level items (parentId
  // IS NULL); for a folder, items whose parentId matches. Soft-deleted
  // children are ignored.
  const children = await trx
    .select({ uploadStatus: collectionItems.uploadStatus })
    .from(collectionItems)
    .where(
      and(
        isCollection
          ? eq(collectionItems.collectionId, parentId)
          : eq(collectionItems.parentId, parentId),
        isCollection ? isNull(collectionItems.parentId) : sql`true`,
        isNull(collectionItems.deletedAt),
      ),
    )

  // Determine parent type name for logging
  const parentType = isCollection ? "collection" : "folder"

  // Count settled children
  const completedCount = children.filter(
    (c) => c.uploadStatus === UploadStatus.COMPLETED,
  ).length
  const failedCount = children.filter(
    (c) => c.uploadStatus === UploadStatus.FAILED,
  ).length

  // Still waiting: at least one child is neither completed nor failed.
  if (children.length > 0 && completedCount + failedCount !== children.length) {
    return
  }

  // All children settled (or none exist) — mark the parent COMPLETED.
  // NOTE(review): the parent is marked COMPLETED even when every child
  // FAILED; confirm that is intended rather than marking the parent FAILED.
  const statusMessage =
    children.length === 0
      ? `Successfully processed ${parentType}`
      : `Successfully processed ${parentType}: ${completedCount} completed, ${failedCount} failed`
  const updateData = {
    uploadStatus: UploadStatus.COMPLETED,
    statusMessage,
    updatedAt: sql`NOW()`,
  }

  if (isCollection) {
    // A collection is the root of the hierarchy — nothing to propagate.
    await trx.update(collections).set(updateData).where(eq(collections.id, parentId))
    return
  }

  await trx.update(collectionItems).set(updateData).where(eq(collectionItems.id, parentId))

  // Propagate upward: re-evaluate this folder's own parent (another folder
  // or the collection). This runs in the empty-children case too — the
  // previous version skipped propagation for empty folders, which could
  // leave ancestors stuck in PROCESSING.
  const [folder] = await trx
    .select({
      parentId: collectionItems.parentId,
      collectionId: collectionItems.collectionId,
    })
    .from(collectionItems)
    .where(eq(collectionItems.id, parentId))

  if (folder) {
    // isCollection = true when the folder sits directly under the collection
    await updateParentStatus(
      trx,
      folder.parentId || folder.collectionId,
      !folder.parentId,
    )
  }
}

export const getRecordBypath = async (path: string, trx: TxnOrClient) => {
let collectionName: string
let directoryPath: string
Expand Down
50 changes: 27 additions & 23 deletions server/queue/fileProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import { Apps, KbItemsSchema, KnowledgeBaseEntity } from "@xyne/vespa-ts/types"
import { getBaseMimeType } from "@/integrations/dataSource/config"
import { db } from "@/db/client"
import { collectionItems, collections } from "@/db/schema"
import { eq } from "drizzle-orm"
import { eq, and, isNull } from "drizzle-orm"
import { readFile } from "node:fs/promises"
import { UploadStatus } from "@/shared/types"
import { updateParentStatus } from "@/db/knowledgeBase"

const Logger = getLogger(Subsystem.Queue)

Expand Down Expand Up @@ -39,6 +40,8 @@ async function handleRetryFailure(
entityId: string,
currentRetryCount: number,
errorMessage: string,
parentId?: string | null,
collectionId?: string,
) {
const newRetryCount = currentRetryCount + 1
const maxRetries = 3 // Match pg-boss retryLimit
Expand All @@ -62,6 +65,15 @@ async function handleRetryFailure(
.update(collectionItems)
.set(updateData)
.where(eq(collectionItems.id, entityId))

// If it's a file that failed, trigger parent status update
if (entityType === ProcessingJobType.FILE && parentId !== undefined && collectionId) {
if (parentId) {
await updateParentStatus(db, parentId, false)
} else {
await updateParentStatus(db, collectionId, true)
}
}
}
} else {
// Update retry count but keep status as 'processing' for retries
Expand Down Expand Up @@ -250,6 +262,13 @@ async function processFileJob(jobData: FileProcessingJob, startTime: number) {
})
.where(eq(collectionItems.id, fileId))

// Trigger parent status update after file completion
if (file.parentId) {
await updateParentStatus(db, file.parentId, false)
} else {
await updateParentStatus(db, file.collectionId, true)
}

const endTime = Date.now()
Logger.info(
`Successfully processed file: ${fileId} in ${endTime - startTime}ms`,
Expand All @@ -264,6 +283,8 @@ async function processFileJob(jobData: FileProcessingJob, startTime: number) {
fileId,
file.retryCount || 0,
errorMessage,
file.parentId,
file.collectionId,
)

throw error // Let pg-boss handle retries
Expand Down Expand Up @@ -344,19 +365,13 @@ async function processCollectionJob(
// Insert into Vespa
await insert(vespaDoc, KbItemsSchema)

// Update status to completed
await db
.update(collections)
.set({
uploadStatus: UploadStatus.COMPLETED,
statusMessage: `Successfully processed collection Vespa insertion`,
updatedAt: new Date(),
})
.where(eq(collections.id, collectionId))
// Keep collection in PROCESSING status
// It will be updated to COMPLETED only when child files/folders complete
// This prevents race condition where collection is marked complete before children are added

const endTime = Date.now()
Logger.info(
`Successfully processed collection Vespa insertion: ${collectionId} in ${endTime - startTime}ms`,
`Successfully processed collection Vespa insertion: ${collectionId} in ${endTime - startTime}ms (waiting for children to complete)`,
)
} catch (error) {
const errorMessage = getErrorMessage(error)
Expand Down Expand Up @@ -460,20 +475,9 @@ async function processFolderJob(

// Insert into Vespa
await insert(vespaDoc, KbItemsSchema)

// Update status to completed
await db
.update(collectionItems)
.set({
uploadStatus: UploadStatus.COMPLETED,
statusMessage: `Successfully processed folder Vespa insertion`,
updatedAt: new Date(),
})
.where(eq(collectionItems.id, folderId))

const endTime = Date.now()
Logger.info(
`Successfully processed folder Vespa insertion: ${folderId} in ${endTime - startTime}ms`,
`Successfully processed folder Vespa insertion: ${folderId} in ${endTime - startTime}ms (waiting for children to complete)`,
)
} catch (error) {
const errorMessage = getErrorMessage(error)
Expand Down
2 changes: 2 additions & 0 deletions server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ import {
GetFileContentApi,
DownloadFileApi,
GetChunkContentApi,
PollCollectionsStatusApi,
} from "@/api/knowledgeBase"
import {
searchKnowledgeBaseSchema,
Expand Down Expand Up @@ -1114,6 +1115,7 @@ export const AppRoutes = app
zValidator("query", searchKnowledgeBaseSchema),
SearchKnowledgeBaseApi,
)
.post("/cl/poll-status", PollCollectionsStatusApi)
.get("/cl/:clId", GetCollectionApi)
.put("/cl/:clId", UpdateCollectionApi)
.delete("/cl/:clId", DeleteCollectionApi)
Expand Down
50 changes: 6 additions & 44 deletions server/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ import { getErrorMessage } from "@/utils"
import { boss } from "@/queue"
import { FileProcessingQueue } from "@/queue/api-server-queue"
import { processJob, type ProcessingJob } from "@/queue/fileProcessor"
import { db } from "@/db/client"
import { collectionItems } from "@/db/schema"
import { eq } from "drizzle-orm"
import { UploadStatus } from "@/shared/types"
import config from "@/config"

const Logger = getLogger(Subsystem.Queue)
Expand All @@ -26,48 +22,14 @@ export const initFileProcessingWorker = async () => {
const jobType = jobData.type || ProcessingJobType.FILE

Logger.info(`Processing ${jobType} job: ${JSON.stringify(jobData)}`)

// For file jobs, update status to processing (collections and folders don't need status updates)
if (jobType === ProcessingJobType.FILE) {
const fileId = (jobData as any).fileId

// Get file info from database
const fileItem = await db
.select({ name: collectionItems.name })
.from(collectionItems)
.where(eq(collectionItems.id, fileId))
.limit(1)

const fileName = fileItem[0]?.name || 'Unknown'

// Update status to 'processing'
await db
.update(collectionItems)
.set({
uploadStatus: UploadStatus.PROCESSING,
statusMessage: `Processing file: ${fileName}`,
updatedAt: new Date()
})
.where(eq(collectionItems.id, fileId))
}


// Process the job using the unified processor
// The processJob function handles all status updates internally:
// - Sets status to PROCESSING
// - Sets status to COMPLETED after success
// - Calls updateParentStatus to check parent completion
await processJob(job as { data: ProcessingJob })

// For file jobs, update status to completed
if (jobType === ProcessingJobType.FILE) {
const fileId = (jobData as any).fileId

await db
.update(collectionItems)
.set({
uploadStatus: UploadStatus.COMPLETED,
statusMessage: 'File processed successfully',
updatedAt: new Date()
})
.where(eq(collectionItems.id, fileId))
}


Logger.info(`✅ ${jobType} job processed successfully`)

} catch (error) {
Expand Down
Loading