diff --git a/frontend/src/routes/_authenticated/admin/integrations/slack.tsx b/frontend/src/routes/_authenticated/admin/integrations/slack.tsx index 5539a9ffe..c91803fef 100644 --- a/frontend/src/routes/_authenticated/admin/integrations/slack.tsx +++ b/frontend/src/routes/_authenticated/admin/integrations/slack.tsx @@ -1,13 +1,13 @@ import { createFileRoute, useRouterState } from "@tanstack/react-router" -import { useEffect, useRef, useState } from "react" +import { useEffect, useRef, useState, useCallback } from "react" import { useForm } from "@tanstack/react-form" import { toast, useToast } from "@/hooks/use-toast" import { useNavigate } from "@tanstack/react-router" import { Input } from "@/components/ui/input" import { Label } from "@/components/ui/label" import { Button } from "@/components/ui/button" -import { Pencil, ArrowLeft } from "lucide-react" -import { api } from "@/api" +import { Pencil, ArrowLeft, Square, RotateCcw, Pause, Play, RefreshCw } from "lucide-react" +import { api, wsClient } from "@/api" import { getErrorMessage } from "@/lib/utils" import { Apps, AuthType, IngestionType, UserRole } from "shared/types" import { PublicUser, PublicWorkspace } from "shared/types" @@ -40,7 +40,88 @@ import { TableHeader, TableRow, } from "@/components/ui/table" -import { wsClient } from "@/api" // ensure wsClient is imported +import { Progress } from "@/components/ui/progress" +// Define ingestion status type locally to avoid importing from server schema +type DbIngestionStatus = "started" | "in_progress" | "completed" | "failed" | "cancelled" | "paused" + +// Slack-specific metadata structure matching backend response +interface SlackIngestionMetadata { + slack?: { + websocketData?: { + progress?: { + totalChannels: number; + processedChannels: number; + currentChannel: string; + totalMessages: number; + processedMessages: number; + }; + connectorId?: string; + }; + ingestionState?: { + endDate?: string; + startDate?: string; + lastUpdated: string; + channelsToIngest?: string[]; + currentChannelId?: string; + includeBotMessage?: boolean; + currentChannelIndex?: number; + lastMessageTimestamp?: string; + }; + }; +} + +// Complete ingestion object matching backend response +interface IngestionData { + id: number; + userId: number; + connectorId: number; + workspaceId: number; + status: DbIngestionStatus; + metadata: SlackIngestionMetadata; + errorMessage?: string | null; + startedAt?: string | null; + completedAt?: string | null; + createdAt: string; + updatedAt: string; +} + +// Main response type matching backend API response +interface IngestionStatusResponse { + success: boolean; + hasActiveIngestion: boolean; + ingestion?: IngestionData; +} + +// Frontend-specific progress data structure for UI display +interface ProgressData { + totalChannels?: number; + processedChannels?: number; + currentChannel?: string; + totalMessages?: number; + processedMessages?: number; +} + + + +// Helper function to safely extract progress data from ingestion metadata +const getProgressData = (ingestion: IngestionData): ProgressData => { + // Try to get progress from multiple sources + const progress = ingestion.metadata?.slack?.websocketData?.progress; + const state = ingestion.metadata?.slack?.ingestionState; + + return { + totalChannels: progress?.totalChannels, + processedChannels: progress?.processedChannels, + currentChannel: progress?.currentChannel, + totalMessages: progress?.totalMessages, + processedMessages: progress?.processedMessages, + // Fallback to state data if progress is not available + ...(!progress && state && { + totalChannels: state.channelsToIngest?.length, + processedChannels: state.currentChannelIndex, + }) + }; +} export const updateConnectorStatus = async ( connectorId: string, @@ -766,6 +847,251 @@ export const Slack = ({ const [isRegularIngestionActive, setIsRegularIngestionActive] = useState(false) + // Enhanced ingestion management state + const [ingestionStatus, setIngestionStatus] = useState(null) + const [ingestionLoading, setIngestionLoading] = useState(true) + const [ingestionError, setIngestionError] = useState(null) + + // Refs for polling management + const pollingIntervalRef = useRef(null) + const previousStatusRef = useRef() + const isFetchingRef = useRef(false) + + // Simple polling functions for ingestion status + const stopPolling = useCallback(() => { + if (pollingIntervalRef.current) { + clearInterval(pollingIntervalRef.current); + pollingIntervalRef.current = null; + } + }, []); + + const fetchIngestionStatus = useCallback(async (connectorId: string) => { + // Prevent concurrent calls + if (isFetchingRef.current) { + return; + } + + isFetchingRef.current = true; + try { + setIngestionError(null); + + const response = await api.ingestion.status.$get({ + query: { connectorId } + }); + + if (!response.ok) { + throw new Error('Failed to fetch ingestion status'); + } + + const data = await response.json(); + const currentStatus = data.ingestion?.status; + + // Show notifications for status changes (but not on initial page load) + if (currentStatus !== previousStatusRef.current) { + const isInitialLoad = previousStatusRef.current === undefined; + previousStatusRef.current = currentStatus; + + // Only show completion toast for fresh completions, not on page refresh/load + if (currentStatus === 'completed' && !isInitialLoad) { + toast({ + title: "Ingestion Completed", + description: "Slack channel ingestion has finished successfully!", + duration: 10000, + }); + stopPolling(); + } else if (currentStatus === 'failed') { + toast({ + title: "Ingestion Failed", + description: data.ingestion?.errorMessage || "Ingestion failed with an unknown error", + variant: "destructive", + duration: 10000, + }); + stopPolling(); + } else if (currentStatus === 'cancelled' && !isInitialLoad) { + toast({ + title: "Ingestion Cancelled", + description: "Ingestion was cancelled by user", + duration: 10000, + }); + stopPolling(); + } + } + + // Start/stop polling based on status - Fixed: More explicit status checking to avoid empty string issues + const shouldPoll = data.hasActiveIngestion && + data.ingestion?.status && + ['pending', 'in_progress'].includes(data.ingestion.status); + + + // Fixed: Use ref to check current polling state instead of potentially stale state + const isCurrentlyPolling = pollingIntervalRef.current !== null; + + if (shouldPoll && !isCurrentlyPolling) { + // Always clear any existing interval first to prevent duplicates + if (pollingIntervalRef.current) { + clearInterval(pollingIntervalRef.current); + } + // Fixed: Capture connectorId in closure to avoid stale values + const currentConnectorId = connectorId; + pollingIntervalRef.current = setInterval(() => { + fetchIngestionStatus(currentConnectorId); + }, 10000); // Poll every 10 seconds + } else if (!shouldPoll && isCurrentlyPolling) { + stopPolling(); + } else if (!shouldPoll && pollingIntervalRef.current) { + // Force stop polling even if isPolling state is wrong + stopPolling(); + } + + setIngestionStatus(data); + setIngestionLoading(false); + + return data; + } catch (err) { + setIngestionError(getErrorMessage(err)); + setIngestionLoading(false); + stopPolling(); + throw err; + } finally { + isFetchingRef.current = false; + } + }, [stopPolling]); // Fixed: Removed isPolling from dependencies to avoid stale closure issues + + // Enhanced start ingestion function + const startChannelIngestion = useCallback(async (connectorId: string, channelsToIngest: string[], startDate: string, endDate: string, includeBotMessage: boolean) => { + try { + setIsManualIngestionActive(true); + + const isAdmin = user.role === UserRole.Admin || user.role === UserRole.SuperAdmin; + const response = isAdmin + ? await api.admin.slack.ingest_more_channel.$post({ + json: { connectorId, channelsToIngest, startDate, endDate, includeBotMessage }, + }) + : await api.slack.ingest_more_channel.$post({ + json: { connectorId, channelsToIngest, startDate, endDate, includeBotMessage }, + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(errorText); + } + + toast({ + title: "Ingestion Started", + description: "Slack channel ingestion has been initiated successfully.", + duration: 20000, + }); + + // Refresh status and start polling + await fetchIngestionStatus(connectorId); + } catch (err) { + toast({ + title: "Failed to Start Ingestion", + description: getErrorMessage(err), + variant: "destructive", + duration: 20000, + }); + throw err; + } finally { + setIsManualIngestionActive(false); + } + }, [user.role, fetchIngestionStatus]); // Fixed: Include fetchIngestionStatus in dependencies + + // Resume ingestion function + const resumeIngestion = useCallback(async (ingestionId: number, connectorId: string) => { + try { + const response = await api.ingestion.resume.$post({ + json: { ingestionId: ingestionId.toString() } + }); + + if (!response.ok) { + const errorData = await response.json().catch(() => ({})); + throw new Error(errorData.message || 'Failed to resume ingestion'); + } + + toast({ + title: "Ingestion Resumed", + description: "Ingestion has been resumed from where it left off.", + duration: 20000, + }); + + await fetchIngestionStatus(connectorId); + } catch (err) { + toast({ + title: "Failed to Resume Ingestion", + description: getErrorMessage(err), + variant: "destructive", + duration: 20000, + }); + throw err; + } + }, [fetchIngestionStatus]); // Fixed: Include fetchIngestionStatus in dependencies + + // Cancel ingestion function + const cancelIngestion = useCallback(async (ingestionId: number, connectorId: string) => { + try { + const response = await api.ingestion.cancel.$post({ + json: { ingestionId: ingestionId.toString() } + }); + + if (!response.ok) { + const errorData = await response.json().catch(() => ({})); + throw new Error(errorData.message || 'Failed to cancel ingestion'); + } + + toast({ + title: "Ingestion Cancelled", + description: "Ingestion is being cancelled...", + duration: 20000, + }); + + // Force stop polling before checking status + stopPolling(); + await fetchIngestionStatus(connectorId); + } catch (err) { + toast({ + title: "Failed to Cancel Ingestion", + description: getErrorMessage(err), + variant: "destructive", + duration: 20000, + }); + throw err; + } + }, [stopPolling, fetchIngestionStatus]); // Fixed: Include dependencies + + // Pause ingestion function + const pauseIngestion = useCallback(async (ingestionId: number, connectorId: string) => { + try { + const response = await api.ingestion.pause.$post({ + json: { ingestionId: ingestionId.toString() } + }); + + if (!response.ok) { + const errorData = await response.json().catch(() => ({})); + throw new Error(errorData.message || 'Failed to pause ingestion'); + } + + toast({ + title: "Ingestion Paused", + description: "Ingestion has been paused and can be resumed later.", + duration: 20000, + }); + + // Force stop polling before checking status + stopPolling(); + await fetchIngestionStatus(connectorId); + } catch (err) { + toast({ + title: "Failed to Pause Ingestion", + description: getErrorMessage(err), + variant: "destructive", + duration: 20000, + }); + throw err; + } + }, [stopPolling, fetchIngestionStatus]); // Fixed: Include dependencies + + const { isPending, data, refetch } = useQuery({ queryKey: ["all-connectors"], queryFn: async (): Promise => { @@ -809,6 +1135,21 @@ export const Slack = ({ const slackConnector = data?.find( (v) => v.app === Apps.Slack && v.authType === AuthType.OAuth, ) + + // Initialize ingestion status checking when connector is available + useEffect(() => { + if (slackConnector?.cId) { + fetchIngestionStatus(slackConnector.cId.toString()); + } + }, [slackConnector?.cId, fetchIngestionStatus]); // Fixed: Include fetchIngestionStatus in dependencies + + // Cleanup polling on component unmount + useEffect(() => { + return () => { + stopPolling(); + }; + }, [stopPolling]); + useEffect(() => { let socket: WebSocket | null = null if (!isPending && data && data.length > 0) { @@ -1034,6 +1375,14 @@ export const Slack = ({ slackProgress={slackProgress} slackUserStats={slackUserPartialIngestionStats} userRole={user.role} + ingestionStatus={ingestionStatus} + ingestionLoading={ingestionLoading} + ingestionError={ingestionError} + startChannelIngestion={startChannelIngestion} + resumeIngestion={resumeIngestion} + pauseIngestion={pauseIngestion} + cancelIngestion={cancelIngestion} + fetchIngestionStatus={fetchIngestionStatus} /> ) : (

Please connect Slack OAuth to enable manual ingestion.

@@ -1134,6 +1483,15 @@ interface ManualIngestionFormProps { slackProgress: number slackUserStats: { [email: string]: any } userRole: PublicUser["role"] + // Enhanced ingestion management props + ingestionStatus: IngestionStatusResponse | null + ingestionLoading: boolean + ingestionError: string | null + startChannelIngestion: (connectorId: string, channelsToIngest: string[], startDate: string, endDate: string, includeBotMessage: boolean) => Promise + resumeIngestion: (ingestionId: number, connectorId: string) => Promise + pauseIngestion: (ingestionId: number, connectorId: string) => Promise + cancelIngestion: (ingestionId: number, connectorId: string) => Promise + fetchIngestionStatus: (connectorId: string) => Promise } const ManualIngestionForm = ({ @@ -1143,6 +1501,15 @@ const ManualIngestionForm = ({ slackProgress, slackUserStats, userRole, + // Enhanced ingestion management props + ingestionStatus, + ingestionLoading, + ingestionError, + startChannelIngestion, + resumeIngestion, + pauseIngestion, + cancelIngestion, + fetchIngestionStatus, }: ManualIngestionFormProps) => { const { toast } = useToast() // const startTimeRef = useRef(null) @@ -1159,8 +1526,6 @@ const ManualIngestionForm = ({ return } - setIsManualIngestionActive(true) - try { const channelIdsList = value.channelIds .split(",") @@ -1173,110 +1538,293 @@ const ManualIngestionForm = ({ description: "Please provide at least one valid channel ID.", variant: "destructive", }) - setIsManualIngestionActive(false) - return - } - - // Role-based API routing - const isAdmin = - userRole === UserRole.Admin || userRole === UserRole.SuperAdmin - - const response = isAdmin - ? await api.admin.slack.ingest_more_channel.$post({ - json: { - connectorId: connectorId, - channelsToIngest: channelIdsList, - startDate: value.startDate, - endDate: value.endDate, - includeBotMessage: value.includeBotMessage, - }, - }) - : await api.slack.ingest_more_channel.$post({ - json: { - connectorId: connectorId, - channelsToIngest: channelIdsList, - startDate: value.startDate, - endDate: value.endDate, - includeBotMessage: value.includeBotMessage, - }, - }) - - if (!response.ok) { - const errorText = await response.text() - toast({ - title: "Ingestion Failed", - description: `Error: ${errorText}`, - variant: "destructive", - }) - setIsManualIngestionActive(false) return } - toast({ - title: "Manual Ingestion Started", - description: "Ingestion process initiated successfully.", - }) - + // Use the enhanced start ingestion function + await startChannelIngestion(connectorId, channelIdsList, value.startDate, value.endDate, value.includeBotMessage) form.reset() - setIsManualIngestionActive(false) } catch (error) { - toast({ - title: "An error occurred", - description: `Error: ${getErrorMessage(error)}`, - variant: "destructive", - }) - setIsManualIngestionActive(false) + // Error handling is done in the startChannelIngestion function } }, }) + // Render ingestion status if there's an active or previous ingestion + const renderIngestionStatus = () => { + if (ingestionLoading) { + return ( +
+
+
+ Checking ingestion status... +
+
+ ) + } + + if (ingestionError) { + return ( +
+
+
Error: {ingestionError}
+ +
+
+ ) + } + + if (!ingestionStatus?.hasActiveIngestion) { + return null // No status to show + } + + const ingestion = ingestionStatus.ingestion! + + if (ingestion.status === 'in_progress') { + return ( +
+
+
+ Ingestion In Progress +
+ + +
+
+ + {(() => { + // Extract progress from metadata if available + const progressData = getProgressData(ingestion); + + if (progressData && (progressData.totalChannels || progressData.processedChannels)) { + return ( +
+
+
+ Channels + {progressData.processedChannels || 0} / {progressData.totalChannels || 0} +
+ 0 ? ((progressData.processedChannels || 0) / (progressData.totalChannels ?? 0)) * 100 : 0} + className="h-2" + /> +
+ +
+
+ Messages Processed + {progressData.processedMessages || 0} +
+
+ + {progressData.currentChannel && ( +
+ Current: {progressData.currentChannel} +
+ )} +
+ ); + } else { + return ( +
+ Progress information not available yet... +
+ ); + } + })()} +
+
+ ) + } + + if (ingestion.status === 'paused') { + return ( +
+
+
+
+ Ingestion Paused +
+ {(() => { + const progressData = getProgressData(ingestion); + if (progressData && (progressData.totalChannels || progressData.processedChannels || progressData.processedMessages)) { + return ( +
+ Progress: {progressData.processedChannels || 0} / {progressData.totalChannels || 0} channels, {progressData.processedMessages || 0} messages processed +
+ ); + } + return null; + })()} +
+
+ + +
+
+
+ ) + } + + if (['failed', 'cancelled'].includes(ingestion.status)) { + return ( +
+
+
+
+ Ingestion {ingestion.status === 'failed' ? 'Failed' : 'Cancelled'} +
+ {(() => { + const progressData = getProgressData(ingestion); + return progressData && (progressData.totalChannels || progressData.processedChannels) && ( +
+ Progress: {progressData.processedChannels || 0} / {progressData.totalChannels || 0} channels +
+ ); + })()} +
+
+ + {ingestion.status === 'failed' && ( + + )} +
+
+
+ ) + } + + if (ingestion.status === 'completed') { + return ( +
+
+
+
Ingestion Completed
+ {(() => { + const progressData = getProgressData(ingestion); + return progressData && (progressData.processedChannels || progressData.processedMessages) && ( +
+ Processed {progressData.processedChannels || 0} channels with {progressData.processedMessages || 0} messages +
+ ); + })()} +
+
+
+ ) + } + + return null + } + return ( -
{ - e.preventDefault() - form.handleSubmit() - }} - className="grid w-full max-w-sm items-center gap-1.5" - > - - ( - field.handleChange(e.target.value)} - placeholder="e.g., C123,C456" +
+ {renderIngestionStatus()} + + {/* Only show form if no active ingestion */} + {!ingestionStatus?.hasActiveIngestion || !['in_progress', 'paused', 'failed'].includes(ingestionStatus?.ingestion?.status || '') ? ( + { + e.preventDefault() + form.handleSubmit() + }} + className="grid w-full max-w-sm items-center gap-1.5" + > + + ( + field.handleChange(e.target.value)} + placeholder="e.g., C123,C456" + /> + )} /> - )} - /> - - ( - field.handleChange(e.target.value)} + + ( + field.handleChange(e.target.value)} + /> + )} /> - )} - /> - - ( - field.handleChange(e.target.value)} + + ( + field.handleChange(e.target.value)} + /> + )} /> - )} - />
- - + + + ) : null} +
) } diff --git a/server/api/admin.ts b/server/api/admin.ts index 767e39130..6d7069131 100644 --- a/server/api/admin.ts +++ b/server/api/admin.ts @@ -1551,6 +1551,9 @@ export const StartGoogleIngestionApi = async (c: Context) => { }) } } +// API endpoint for starting resumable Slack channel ingestion +// Creates an ingestion record and starts background processing +// Returns immediate response while ingestion runs in sync-server export const IngestMoreChannelApi = async (c: Context) => { const { sub } = c.get(JwtPayloadKey) // @ts-ignore @@ -1563,27 +1566,102 @@ export const IngestMoreChannelApi = async (c: Context) => { } try { + // Validate user exists and has access + const userRes = await getUserByEmail(db, sub) + if (!userRes || !userRes.length) { + loggerWithChild({ email: sub }).error( + { sub }, + "No user found for sub in IngestMoreChannelApi", + ) + throw new NoUserFound({}) + } + const [user] = userRes + + // Validate connector exists and user has access + const connector = await getConnector(db, payload.connectorId) + if (!connector) { + throw new HTTPException(404, { message: "Connector not found" }) + } + + // Import ingestion functions for database operations + const { createIngestion, hasActiveIngestion } = await import("@/db/ingestion") + + // Prevent concurrent ingestions using database transaction to avoid race conditions + // This atomically checks for active ingestions and creates new one if none exist + const ingestion = await db.transaction(async (trx) => { + const hasActive = await hasActiveIngestion(trx, user.id, connector.id) + if (hasActive) { + throw new HTTPException(409, { + message: "An ingestion is already in progress for this connector. Please wait for it to complete or cancel it first." + }) + } + + // Create ingestion record with initial metadata for resumability + // All state needed for resuming is stored in the metadata field + return await createIngestion(trx, { + userId: user.id, + connectorId: connector.id, + workspaceId: connector.workspaceId, + status: "pending", + metadata: { + slack: { + // Data sent to frontend via WebSocket for progress display + websocketData: { + connectorId: connector.externalId, + progress: { + totalChannels: payload.channelsToIngest.length, + processedChannels: 0, + totalMessages: 0, + processedMessages: 0, + }, + }, + // Internal state data for resumability + ingestionState: { + channelsToIngest: payload.channelsToIngest, + startDate: payload.startDate, + endDate: payload.endDate, + includeBotMessage: payload.includeBotMessage, + currentChannelIndex: 0, // Resume from this channel + lastUpdated: new Date().toISOString(), + }, + }, + }, + }) + }) + + // Start background ingestion processing asynchronously + // Ingestion runs in sync-server while API returns immediately const email = sub - const resp = await handleSlackChannelIngestion( - payload.connectorId, + handleSlackChannelIngestion( + connector.id, payload.channelsToIngest, payload.startDate, payload.endDate, email, payload.includeBotMessage, - ) + ingestion.id, // Pass ingestion ID for progress tracking + ).catch((error) => { + loggerWithChild({ email: sub }).error( + error, + `Background Slack channel ingestion failed for connector ${connector.id}: ${getErrorMessage(error)}`, + ) + }) + + // Return immediate success response to frontend + // Actual progress will be communicated via WebSocket return c.json({ success: true, - message: "Successfully ingested the channels", + message: "Slack channel ingestion started.", + ingestionId: ingestion.id, }) } catch (error) { loggerWithChild({ email: sub }).error( error, - "Failed to ingest Slack channels", + "Failed to start Slack channel ingestion", ) - return c.json({ - success: false, - message: getErrorMessage(error), + if (error instanceof HTTPException) throw error + throw new HTTPException(500, { + message: `Failed to start Slack channel ingestion: ${getErrorMessage(error)}`, }) } } diff --git a/server/api/ingestion.ts b/server/api/ingestion.ts new file mode 100644 index 000000000..0e21fbf13 --- /dev/null +++ b/server/api/ingestion.ts @@ -0,0 +1,316 @@ +// API endpoints for managing resumable Slack channel ingestion lifecycle +// Provides cancel, resume, delete, and status operations for ingestion records + +import type { Context } from "hono" +import { HTTPException } from "hono/http-exception" +import { z } from "zod" +import { eq, and, sql } from "drizzle-orm" +import { db } from "@/db/client" +import { getUserByEmail } from "@/db/user" +import { getConnector } from "@/db/connector" +import { + getActiveIngestionForUser, + getIngestionById, + updateIngestionStatus, +} from "@/db/ingestion" +import { ingestions, type SelectIngestion } from "@/db/schema/ingestions" +import { getLogger, getLoggerWithChild } from "@/logger" +import { Subsystem } from "@/types" +import { getErrorMessage } from "@/utils" +import config from "@/config" +import { handleSlackChannelIngestion } from "@/integrations/slack/channelIngest" + +const { JwtPayloadKey } = config +const Logger = getLogger(Subsystem.Api) +const loggerWithChild = getLoggerWithChild(Subsystem.Api) + +// Zod validation schemas for API request parameters +export const getIngestionStatusSchema = z.object({ + connectorId: z.string(), +}) + +export const cancelIngestionSchema = z.object({ + ingestionId: z.string(), +}) + +export const resumeIngestionSchema = z.object({ + ingestionId: z.string(), +}) + + +export const pauseIngestionSchema = z.object({ + ingestionId: z.string(), +}) + +// API endpoint to check current ingestion status for a connector +// Used by frontend to determine what UI to show when user visits page +// Returns ingestion details including progress and current state +export const GetIngestionStatusApi = async (c: Context) => { + const { sub } = c.get(JwtPayloadKey) + // @ts-ignore - Payload is validated at the endpoint declaration level in sync-server using zValidator + const { connectorId } = c.req.valid("query") + + try { + // Validate user authentication + const userRes = await getUserByEmail(db, sub) + if (!userRes || !userRes.length) { + throw new HTTPException(404, { message: "User not found" }) + } + const [user] = userRes + + // Validate connector exists and user has access + const connector = await getConnector(db, parseInt(connectorId)) + if (!connector) { + throw new HTTPException(404, { message: "Connector not found" }) + } + + // SECURITY: Ensure connector belongs to current user/workspace to prevent cross-tenant access + if (connector.userId !== user.id || connector.workspaceId !== user.workspaceId) { + throw new HTTPException(403, { message: "Forbidden: connector does not belong to you" }) + } + + // Check for any active ingestion for this user+connector + const activeIngestion = await getActiveIngestionForUser( + db, + user.id, + connector.id + ) + + // If no active ingestion, check for recent completed/cancelled ingestion for status display + let ingestionToReturn = activeIngestion + if (!activeIngestion) { + const recentIngestion = await db + .select() + .from(ingestions) + .where( + and( + eq(ingestions.userId, user.id), + eq(ingestions.connectorId, connector.id), + sql`status IN ('completed', 'cancelled')` + ) + ) + .orderBy(sql`updated_at DESC`) + .limit(1) + + ingestionToReturn = (recentIngestion[0] as SelectIngestion) || null + } + + // Return status info for frontend UI state management + return c.json({ + success: true, + hasActiveIngestion: !!activeIngestion, + ingestion: ingestionToReturn, + }) + } catch (error) { + loggerWithChild({ email: sub }).error( + error, + "Failed to get ingestion status" + ) + if (error instanceof HTTPException) throw error + throw new HTTPException(500, { + message: `Failed to get ingestion status: ${getErrorMessage(error)}`, + }) + } +} + +// API endpoint to cancel a currently running ingestion +// Sets status to 'cancelled' and stops processing +// Only works for in_progress ingestions +export const CancelIngestionApi = async (c: Context) => { + const { sub } = c.get(JwtPayloadKey) + // @ts-ignore - Payload is validated at the endpoint declaration level in sync-server using zValidator + const payload = c.req.valid("json") as { ingestionId: string } + + try { + // Validate user authentication + const userRes = await getUserByEmail(db, sub) + if (!userRes || !userRes.length) { + throw new HTTPException(404, { message: "User not found" }) + } + const [user] = userRes + + // Find the ingestion record + const ingestion = await getIngestionById(db, parseInt(payload.ingestionId)) + if (!ingestion) { + throw new HTTPException(404, { message: "Ingestion not found" }) + } + + // Security check - ensure user owns this ingestion + if (ingestion.userId !== user.id) { + throw new HTTPException(403, { message: "Access denied" }) + } + + // Business rule - can only cancel active ingestions (in_progress, failed, paused) + if (!["in_progress", "failed", "paused"].includes(ingestion.status)) { + throw new HTTPException(400, { + message: "Can only cancel in-progress, failed, or paused ingestions", + }) + } + + // Update status to cancelled - sync-server will detect this and stop + await updateIngestionStatus(db, parseInt(payload.ingestionId), "cancelled") + + return c.json({ + success: true, + message: "Ingestion cancelled successfully", + }) + } catch (error) { + loggerWithChild({ email: sub }).error(error, "Failed to cancel ingestion") + if (error instanceof HTTPException) throw error + throw new HTTPException(500, { + message: `Failed to cancel ingestion: ${getErrorMessage(error)}`, + }) + } +} + +// API endpoint to pause a currently running ingestion +// Sets status to 'paused' and stops processing +// Only works for in_progress ingestions +export const PauseIngestionApi = async (c: Context) => { + const { sub } = c.get(JwtPayloadKey) + // @ts-ignore - Payload is validated at the endpoint declaration level in sync-server using zValidator + const payload = c.req.valid("json") as { ingestionId: string } + + try { + // Validate user authentication + const userRes = await getUserByEmail(db, sub) + if (!userRes || !userRes.length) { + throw new HTTPException(404, { message: "User not found" }) + } + const [user] = userRes + + // Find the ingestion record + const ingestion = await getIngestionById(db, parseInt(payload.ingestionId)) + if (!ingestion) { + throw new HTTPException(404, { message: "Ingestion not found" }) + } + + // Security check - ensure user owns this ingestion + if (ingestion.userId !== user.id) { + throw new HTTPException(403, { message: "Access denied" }) + } + + // Business rule - can only pause active ingestions + if (ingestion.status !== "in_progress") { + throw new HTTPException(400, { + message: "Can only pause in-progress ingestions", + }) + } + + // Update status to paused - ingestion process will detect this and pause + await updateIngestionStatus(db, parseInt(payload.ingestionId), "paused") + + return c.json({ + success: true, + message: "Ingestion paused successfully", + }) + } catch (error) { + loggerWithChild({ email: sub }).error(error, "Failed to pause ingestion") + if (error instanceof HTTPException) throw error + throw new HTTPException(500, { + message: `Failed to pause ingestion: ${getErrorMessage(error)}`, + }) + } +} + +// API endpoint to resume a failed or cancelled ingestion +// Extracts stored parameters from metadata and restarts processing +// Continues from where it left off using stored state +export const ResumeIngestionApi = async (c: Context) => { + const { sub } = c.get(JwtPayloadKey) + // @ts-ignore - Payload is validated at the endpoint declaration level in sync-server using zValidator + const payload = c.req.valid("json") as { ingestionId: string } + + try { + // Validate user authentication + const userRes = await getUserByEmail(db, sub) + if (!userRes || !userRes.length) { + throw new HTTPException(404, { message: "User not found" }) + } + const [user] = userRes + + // Find the ingestion record to resume + const ingestion = await getIngestionById(db, parseInt(payload.ingestionId)) + if (!ingestion) { + throw new HTTPException(404, { message: "Ingestion not found" }) + } + + // Security check - ensure user owns this ingestion + if (ingestion.userId !== user.id) { + throw new HTTPException(403, { message: "Access denied" }) + } + + // Business rule - can only resume failed, cancelled, or paused ingestions + if (!["failed", "cancelled", "paused"].includes(ingestion.status)) { + throw new HTTPException(400, { + message: "Can only resume failed, cancelled, or paused ingestions", + }) + } + + // Get connector details for resuming + const connector = await getConnector(db, ingestion.connectorId) + if (!connector) { + throw new HTTPException(404, { message: "Connector not found" }) + } + + // SECURITY: Additional check - ensure connector belongs to current user/workspace (defense in depth) + if (connector.userId !== user.id || connector.workspaceId !== user.workspaceId) { + throw new HTTPException(403, { message: "Forbidden: connector does not belong to you" }) + } + + // Extract original ingestion parameters from stored metadata + // This is key to resumability - all state is preserved in metadata + const metadata = ingestion.metadata as any + if (!metadata?.slack?.ingestionState) { + throw new HTTPException(400, { + message: "Ingestion metadata not found or invalid", + }) + } + + const state = metadata.slack.ingestionState + const channelsToIngest = state.channelsToIngest + const startDate = state.startDate + const endDate = state.endDate + const includeBotMessage = state.includeBotMessage || false + + // Validate the stored parameters are still valid + if (!channelsToIngest || !Array.isArray(channelsToIngest)) { + throw new HTTPException(400, { + message: "Invalid channels data in ingestion metadata", + }) + } + + // Reset status to pending to prepare for processing + await updateIngestionStatus(db, parseInt(payload.ingestionId), "pending") + + // Restart the ingestion with the original parameters + // The processing function will detect the existing state and resume + handleSlackChannelIngestion( + connector.id, + channelsToIngest, + startDate, + endDate, + sub, + includeBotMessage, + parseInt(payload.ingestionId) + ).catch((error) => { + loggerWithChild({ email: sub }).error( + error, + `Background Slack channel ingestion resume failed for ingestion ${payload.ingestionId}: ${getErrorMessage(error)}` + ) + }) + + return c.json({ + success: true, + message: "Ingestion resumed successfully", + ingestionId: parseInt(payload.ingestionId), + }) + } catch (error) { + loggerWithChild({ email: sub }).error(error, "Failed to resume ingestion") + if (error instanceof HTTPException) throw error + throw new HTTPException(500, { + message: `Failed to resume ingestion: ${getErrorMessage(error)}`, + }) + } +} + diff --git a/server/db/ingestion.ts b/server/db/ingestion.ts new file mode 100644 index 000000000..5e30e4694 --- /dev/null +++ b/server/db/ingestion.ts @@ -0,0 +1,144 @@ +// Database functions for managing resumable Slack channel ingestion records +// Provides CRUD operations and business logic for the ingestions table + +import { eq, and, sql } from "drizzle-orm" +import { + ingestions, + type InsertIngestion, + type SelectIngestion, + type IngestionStatus +} from "./schema/ingestions" +import type { TxnOrClient } from "@/types" + +// Creates a new ingestion record in the database +// Returns the created record with generated ID and timestamps +export const createIngestion = async ( + txn: TxnOrClient, + data: InsertIngestion +): Promise => { + const [result] = await txn + .insert(ingestions) + .values(data) + .returning() + return result as SelectIngestion +} + +// Finds any active (in_progress, paused, or failed) ingestion for a specific user and connector +// Used to check if an ingestion is already running before starting a new one +// Returns null if no active ingestion exists +export const getActiveIngestionForUser = async ( + txn: TxnOrClient, + userId: number, + connectorId: number +): Promise => { + const result = await txn + .select() + .from(ingestions) + .where( + and( + eq(ingestions.userId, userId), + eq(ingestions.connectorId, connectorId), + sql`status IN ('in_progress', 'paused', 'failed')` + ) + ) + .limit(1) + + return (result[0] as SelectIngestion) || null +} + +// Updates the status of an ingestion with automatic timestamp management +// Handles setting startedAt, completedAt, and error messages based on status +// Used throughout the ingestion lifecycle to track progress +export const updateIngestionStatus = async ( + txn: TxnOrClient, + ingestionId: number, + status: IngestionStatus, + errorMessage?: string +): Promise => { + const updateData: any = { + status, + updatedAt: sql`NOW()`, + } + + // Set startedAt timestamp when ingestion begins processing + if (status === "in_progress" && !errorMessage) { + updateData.startedAt = sql`NOW()` + } + + // Set completedAt timestamp when ingestion finishes (any final state) + if (status === "completed" || status === "failed" || status === "cancelled") { + updateData.completedAt = sql`NOW()` + } + + // Store error message for failed ingestions + if (errorMessage) { + updateData.errorMessage = errorMessage + } + + const [result] = await txn + .update(ingestions) + .set(updateData) + .where(eq(ingestions.id, ingestionId)) + .returning() + + return result as SelectIngestion +} + +// Updates the metadata field of an ingestion record +// Used to store progress, WebSocket data, and resumability state +// Called frequently during ingestion to persist current state +export const updateIngestionMetadata = async ( + txn: TxnOrClient, + ingestionId: number, + metadata: any +): Promise => { + const [result] = await txn + .update(ingestions) + .set({ + metadata, + updatedAt: sql`NOW()`, + }) + .where(eq(ingestions.id, ingestionId)) + .returning() + + return result as SelectIngestion +} + +// Retrieves a specific ingestion record by its ID +// Used for loading ingestion details for resume, cancel, and status operations +// Returns null if ingestion doesn't exist +export const getIngestionById = async ( + txn: TxnOrClient, + ingestionId: number +): Promise => { + const result = await txn + .select() + .from(ingestions) + .where(eq(ingestions.id, ingestionId)) + .limit(1) + + return (result[0] as SelectIngestion) || null +} + + +// Fast check to see if user has any active ingestions for a connector +// Used to prevent concurrent ingestions and enforce business rules +// Returns boolean - true if any pending, in_progress, paused, or failed ingestion exists +export const hasActiveIngestion = async ( + txn: TxnOrClient, + userId: number, + connectorId: number +): Promise => { + const result = await txn + .select({ count: sql`count(*)` }) + .from(ingestions) + .where( + and( + eq(ingestions.userId, userId), + eq(ingestions.connectorId, connectorId), + sql`status IN ('pending', 'in_progress', 'paused', 'failed')` + ) + ) + + return Number(result[0].count) > 0 +} \ No newline at end of file diff --git a/server/db/schema/index.ts b/server/db/schema/index.ts index bbc332c1c..5e5d9fc20 100644 --- a/server/db/schema/index.ts +++ b/server/db/schema/index.ts @@ -17,6 +17,7 @@ export * from "@/db/schema/knowledgeBase" export * from "@/db/schema/apiKey" export * from "@/db/schema/workflows" export * from "@/db/schema/calls" +export * from "@/db/schema/ingestions" // Export combined types import type { PublicUser, SelectUser } from "@/db/schema/users" diff --git a/server/db/schema/ingestions.ts b/server/db/schema/ingestions.ts new file mode 100644 index 000000000..ffa1d28d3 --- /dev/null +++ b/server/db/schema/ingestions.ts @@ -0,0 +1,155 @@ +// Database schema for resumable Slack channel ingestion system +// This file defines the ingestions table and related types for managing +// long-running ingestion processes with resumability support + +import { sql } from "drizzle-orm" +import { + serial, + pgTable, + text, + integer, + timestamp, + jsonb, + pgEnum, +} from "drizzle-orm/pg-core" +import { createInsertSchema, createSelectSchema } from "drizzle-zod" +import { z } from "zod" +import { workspaces } from "./workspaces" +import { users } from "./users" +import { connectors } from "./connectors" + +// Enum defining all possible states of an ingestion process +// - pending: Ingestion created but not yet started +// - in_progress: Currently running ingestion +// - paused: Ingestion temporarily paused by user +// - completed: Successfully finished ingestion +// - failed: Ingestion stopped due to error +// - cancelled: User manually cancelled the ingestion +export const ingestionStatusEnum = pgEnum("ingestion_status", [ + "pending", + "in_progress", + "paused", + "completed", + "failed", + "cancelled" +]) + +// Main ingestions table that tracks all ingestion processes +// Each row represents one ingestion job with its current state and progress +export const ingestions = pgTable( + "ingestions", + { + // Primary key for the ingestion record + id: serial("id").notNull().primaryKey(), + + // Foreign key to users table - identifies who started this ingestion + userId: integer("user_id") + .notNull() + .references(() => users.id), + + // Foreign key to connectors table - identifies which Slack workspace + connectorId: integer("connector_id") + .notNull() + .references(() => connectors.id), + + // Foreign key to workspaces table - identifies the workspace context + workspaceId: integer("workspace_id") + .notNull() + .references(() => workspaces.id), + + // Current status of the ingestion process (see enum above) + status: ingestionStatusEnum("status").notNull().default("pending"), + + // JSONB field storing all ingestion state and progress data + // Contains WebSocket data, channel progress, resumability state, etc. + metadata: jsonb("metadata").notNull().default(sql`'{}'::jsonb`), + + // Optional error message when ingestion fails + errorMessage: text("error_message"), + + // Timestamp when ingestion actually started processing + startedAt: timestamp("started_at", { withTimezone: true }), + + // Timestamp when ingestion finished (completed, failed, or cancelled) + completedAt: timestamp("completed_at", { withTimezone: true }), + + // Timestamp when ingestion record was created + createdAt: timestamp("created_at", { withTimezone: true }) + .notNull() + .default(sql`NOW()`), + + // Timestamp when ingestion record was last updated + updatedAt: timestamp("updated_at", { withTimezone: true }) + .notNull() + .default(sql`NOW()`), + }, + // No unique constraints - application logic will handle preventing concurrent active ingestions +) + +// Zod schema defining the structure of Slack-specific metadata stored in JSONB +// This contains all the data needed for resumability and WebSocket communication +export const slackIngestionMetadataSchema = z.object({ + // Data sent over WebSocket to frontend for real-time progress updates + websocketData: z.object({ + // Connector ID for WebSocket message routing + connectorId: z.string(), + // Currently processing channel ID (for display) + currentChannelId: z.string().optional(), + // Progress information shown to user + progress: z.object({ + totalChannels: z.number().optional(), // Total channels to process + processedChannels: z.number().optional(), // Channels completed so far + currentChannel: z.string().optional(), // Name of current channel + totalMessages: z.number().optional(), // Total messages found + processedMessages: z.number().optional(), // Messages processed so far + }).optional(), + }), + + // Internal state data used for resuming interrupted ingestions + ingestionState: z.object({ + // Current channel being processed (for resumability) + currentChannelId: z.string().optional(), + // Last message timestamp processed (for resumability) + lastMessageTs: z.string().optional(), + // When this state was last updated + lastUpdated: z.string(), + + // Original ingestion parameters (needed for resuming) + channelsToIngest: z.array(z.string()).optional(), // List of channel IDs to process + startDate: z.string().optional(), // Date range start + endDate: z.string().optional(), // Date range end + includeBotMessage: z.boolean().optional(), // Whether to include bot messages + currentChannelIndex: z.number().optional(), // Index in channelsToIngest array + }).optional(), +}) + +// Generic metadata schema that can support different integration types +// Currently only supports Slack, but extensible for future integrations +export const ingestionMetadataSchema = z.object({ + slack: slackIngestionMetadataSchema.optional(), +}) + +// Zod schemas for type-safe database operations +// These provide runtime validation and TypeScript types + +// Schema for inserting new ingestion records +export const insertIngestionSchema = createInsertSchema(ingestions, { + metadata: ingestionMetadataSchema.optional(), +}) + +// Schema for selecting/reading ingestion records +export const selectIngestionSchema = createSelectSchema(ingestions, { + metadata: ingestionMetadataSchema.optional(), +}) + +// TypeScript types exported for use throughout the application +export type InsertIngestion = z.infer +export type SelectIngestion = z.infer +export type IngestionStatus = + | "pending" + | "in_progress" + | "paused" + | "completed" + | "failed" + | "cancelled" +export type SlackIngestionMetadata = z.infer \ No newline at end of file diff --git a/server/integrations/slack/channelIngest.ts b/server/integrations/slack/channelIngest.ts index a3e909e01..5a65ea60e 100644 --- a/server/integrations/slack/channelIngest.ts +++ b/server/integrations/slack/channelIngest.ts @@ -1,3 +1,7 @@ +// Resumable Slack channel ingestion implementation +// Provides full resumability with progress tracking and WebSocket communication +// Supports stopping and starting from the exact point where ingestion was interrupted + import { getOAuthConnectorWithCredentials } from "@/db/connector" import { connectors, @@ -49,7 +53,7 @@ import type { Team } from "@slack/web-api/dist/types/response/TeamInfoResponse" import type { User } from "@slack/web-api/dist/types/response/UsersInfoResponse" import { count, eq } from "drizzle-orm" import { StatType, Tracker } from "@/integrations/tracker" -import { sendWebsocketMessage } from "../metricStream" +// Removed WebSocket import - now using database-only approach for progress updates import { AuthType, ConnectorStatus, @@ -452,8 +456,18 @@ export async function insertChannelMessages( startDate: string, endDate: string, includeBotMessages: boolean = false, + messageCounters?: { + totalMessages: { value: number } + processedMessages: { value: number } + }, + checkCancellationOrPause?: () => Promise<{ + shouldStop: boolean + isPaused: boolean + }>, + onLastTimestampUpdate?: (timestamp: string) => void, ): Promise { let cursor: string | undefined = undefined + let lastProcessedTimestamp: string = timestamp let replyCount = 0 @@ -481,16 +495,53 @@ export async function insertChannelMessages( { conversation_id: channelId ?? "", email: email }, response.messages?.length, ) + + // Update total message count + if (messageCounters) { + const previousTotal = messageCounters.totalMessages.value + messageCounters.totalMessages.value += response.messages.length + loggerWithChild({ email }).info( + `Message counter update: added ${response.messages.length} messages, total now ${messageCounters.totalMessages.value} (was ${previousTotal})`, + ) + } + + let messageIndex = 0 for (const message of response.messages as (SlackMessage & { mentions: string[] })[]) { + messageIndex++ + // Processing message ${messageIndex}/${response.messages.length} + // Check for pause/cancellation/deletion before processing each message + // Check for pause/cancellation/deletion before processing each message + if (checkCancellationOrPause) { + try { + // Check cancellation status + const { shouldStop } = await checkCancellationOrPause() + // Cancellation check complete + if (shouldStop) { + loggerWithChild({ email }).info( + `Message processing stopped due to pause/cancellation/deletion in channel ${channelId}`, + ) + return + } + } catch (error) { + loggerWithChild({ email }).error( + `❌ Error in checkCancellationOrPause for message ${messageIndex}: ${error}`, + ) + throw error + } + } + // replace user id with username + // Extract mentions const mentions = extractUserIdsFromBlocks(message) + // Found ${mentions.length} mentions let text = message.text if (mentions.length) { for (const m of mentions) { if (!memberMap.get(m)) { + // Fetching user info for mention memberMap.set( m, ( @@ -499,6 +550,7 @@ export async function insertChannelMessages( }) ).user!, ) + // Successfully fetched user info } text = text?.replace( `<@${m}>`, @@ -506,8 +558,12 @@ export async function insertChannelMessages( ) } } + loggerWithChild({ email }).info( + `πŸ”„ Formatting special mentions for message ${messageIndex}`, + ) text = formatSlackSpecialMentions(text, channelMap, channelId) message.text = text + // Completed text processing // Add the top-level message // Check if message should be processed based on includeBotMessages flag const isRegularMessage = @@ -527,6 +583,7 @@ export async function insertChannelMessages( hasTextOrBlocks if (isRegularMessage || isBotMessage) { + // Insert message ${messageIndex} message.mentions = mentions message.team = await getTeam(client, message) @@ -611,11 +668,27 @@ export async function insertChannelMessages( `Error inserting chat message`, ) } + + // Update processed message count + if (messageCounters) { + messageCounters.processedMessages.value += 1 + // Successfully processed message + } + tracker.updateUserStats(email, StatType.Slack_Message, 1) } else { + // Skipping message (subtype: ${message.subtype}) subtypes.add(message.subtype) } + // Update last processed timestamp for resumption + lastProcessedTimestamp = message.ts! + if (onLastTimestampUpdate) { + onLastTimestampUpdate(lastProcessedTimestamp) + } + + // Completed processing message ${messageIndex} + // If the message is a thread parent (its thread_ts equals its ts) and it has replies, fetch them. if ( message.thread_ts && @@ -638,6 +711,15 @@ export async function insertChannelMessages( // Exclude the parent message (already added) const replies: (SlackMessage & { mentions?: string[] })[] = threadMessages.filter((msg) => msg.ts !== message.ts) + + // Add thread replies to total message count + if (messageCounters && replies.length > 0) { + const previousTotal = messageCounters.totalMessages.value + messageCounters.totalMessages.value += replies.length + loggerWithChild({ email }).info( + `Thread replies counter update: added ${replies.length} replies, total now ${messageCounters.totalMessages.value} (was ${previousTotal})`, + ) + } for (const reply of replies) { // Check if reply should be processed based on includeBotMessages flag const isRegularReply = @@ -763,6 +845,15 @@ export async function insertChannelMessages( `Error inserting chat message`, ) } + + // Update processed message count for replies + if (messageCounters) { + messageCounters.processedMessages.value += 1 + loggerWithChild({ email }).info( + `Processed reply counter: ${messageCounters.processedMessages.value}`, + ) + } + tracker.updateUserStats(email, StatType.Slack_Message_Reply, 1) } else { subtypes.add(reply.subtype) @@ -1039,9 +1130,7 @@ export const insertChatMessage = async ( channelId, ) attachmentIds.push(file.id!) - Logger.info( - `Inserted attachment ${file.id} for message ${message.client_msg_id}`, - ) + // Successfully inserted attachment } catch (error) { Logger.error(`Error inserting attachment ${file.id}: ${error}`) } @@ -1119,6 +1208,9 @@ export const insertMember = async (member: Member) => { ) } +// Main function for resumable Slack channel ingestion +// Processes specified channels with full resumability support +// Tracks progress and communicates via WebSocket for real-time updates export const handleSlackChannelIngestion = async ( connectorId: number, channelsToIngest: string[], @@ -1126,14 +1218,92 @@ export const handleSlackChannelIngestion = async ( endDate: string, email: string, includeBotMessages: boolean = false, + ingestionId?: number, // Optional for resumability tracking ) => { + let ingestionRecord: any = null + let interval: ReturnType | undefined try { const abortController = new AbortController() + + // Function to check if ingestion has been cancelled, paused, or deleted + const checkCancellationOrPause = async (): Promise<{ + shouldStop: boolean + isPaused: boolean + }> => { + if (!ingestionId) return { shouldStop: false, isPaused: false } + try { + const { getIngestionById } = await import("@/db/ingestion") + const currentIngestion = await getIngestionById(db, ingestionId) + if (!currentIngestion) { + // Ingestion was deleted + return { shouldStop: true, isPaused: false } + } + const isCancelled = currentIngestion?.status === "cancelled" + const isPaused = currentIngestion?.status === "paused" + const isFailed = currentIngestion?.status === "failed" + const isCompleted = currentIngestion?.status === "completed" + const shouldStop = isCancelled || isPaused || isFailed || isCompleted + loggerWithChild({ email }).info( + `Status check: ingestionId=${ingestionId}, status=${currentIngestion?.status}, isCancelled=${isCancelled}, isPaused=${isPaused}`, + ) + return { shouldStop, isPaused } + } catch (error) { + loggerWithChild({ email }).warn( + "Failed to check ingestion status:", + error, + ) + return { shouldStop: false, isPaused: false } + } + } const connector: SelectConnector = await getOAuthConnectorWithCredentials( db, connectorId, ) + // Import ingestion database functions for progress tracking + const { updateIngestionStatus, updateIngestionMetadata, getIngestionById } = + await import("@/db/ingestion") + + // Initialize resumability variables with defaults + let resumeFromChannelIndex = 0 + let existingChannelsToIngest = channelsToIngest + let existingStartDate = startDate + let existingEndDate = endDate + let existingIncludeBotMessages = includeBotMessages + + if (ingestionId) { + // Check if this is a resume operation by examining existing state + const existingIngestion = await getIngestionById(db, ingestionId) + if (existingIngestion && existingIngestion.status === "pending") { + // Extract resumability state from stored metadata + // This is the key to resuming from exactly where we left off + const metadata = existingIngestion.metadata as any + if (metadata?.slack?.ingestionState) { + const state = metadata.slack.ingestionState + resumeFromChannelIndex = state.currentChannelIndex || 0 // Resume from this channel + existingChannelsToIngest = state.channelsToIngest || channelsToIngest + existingStartDate = state.startDate || startDate + existingEndDate = state.endDate || endDate + existingIncludeBotMessages = + state.includeBotMessage ?? includeBotMessages + + loggerWithChild({ email }).info( + `Resuming Slack channel ingestion from channel index ${resumeFromChannelIndex} of ${existingChannelsToIngest.length}`, + ) + } + } + + // Update database status to indicate active processing + ingestionRecord = await updateIngestionStatus( + db, + ingestionId, + "in_progress", + ) + loggerWithChild({ email }).info( + `Started Slack channel ingestion with ID: ${ingestionId}`, + ) + } + const { accessToken } = connector.oauthCredentials as { accessToken: string } @@ -1143,21 +1313,138 @@ export const handleSlackChannelIngestion = async ( const tracker = new Tracker(Apps.Slack, AuthType.OAuth) const team = await safeGetTeamInfo(client) const channelMap = new Map() - const interval = setInterval(() => { - sendWebsocketMessage( - JSON.stringify({ - IngestionType: IngestionType.partialIngestion, - progress: tracker.getProgress(), - userStats: tracker.getOAuthProgress().userStats, - startTime: tracker.getStartTime(), - }), - connector?.externalId, + + let processedChannels = resumeFromChannelIndex + let globalLastMessageTimestamp = "0" // Track last processed message timestamp globally + + // Restore last message timestamp if resuming + if ( + ingestionRecord?.metadata?.slack?.ingestionState?.lastMessageTimestamp + ) { + globalLastMessageTimestamp = + ingestionRecord.metadata.slack.ingestionState.lastMessageTimestamp + loggerWithChild({ email }).info( + `πŸ“ Restored global last message timestamp: ${globalLastMessageTimestamp}`, ) - }, 4000) + } + + // Initialize message counters - restore from saved metadata if resuming + let savedMessageCounters = { totalMessages: 0, processedMessages: 0 } + if (ingestionRecord?.metadata?.slack?.websocketData?.progress) { + savedMessageCounters = { + totalMessages: + ingestionRecord.metadata.slack.websocketData.progress.totalMessages || + 0, + processedMessages: + ingestionRecord.metadata.slack.websocketData.progress + .processedMessages || 0, + } + loggerWithChild({ email }).info( + `πŸ“Š Resuming with saved counters: total=${savedMessageCounters.totalMessages}, processed=${savedMessageCounters.processedMessages}`, + ) + } else { + loggerWithChild({ email }).info( + `πŸ“Š Starting fresh counters: total=0, processed=0`, + ) + } + + const messageCounters = { + totalMessages: { value: savedMessageCounters.totalMessages }, + processedMessages: { value: savedMessageCounters.processedMessages }, + } + + // Set up periodic progress updates every 4 seconds + // This ensures real-time frontend updates and persistent resumability state + interval = setInterval(async () => { + loggerWithChild({ email }).info( + `Periodic check running for ingestion ${ingestionId}`, + ) + loggerWithChild({ email }).info( + `Message counters: total=${messageCounters.totalMessages.value}, processed=${messageCounters.processedMessages.value}`, + ) + // Check for cancellation/pause and abort if requested + const { shouldStop, isPaused } = await checkCancellationOrPause() + if (shouldStop) { + if (isPaused) { + loggerWithChild({ email }).info( + `Ingestion ${ingestionId} was paused, stopping process`, + ) + } else { + // Get the actual status to log the correct message + if (ingestionId !== undefined) { + const { getIngestionById } = await import("@/db/ingestion") + const currentIngestion = await getIngestionById(db, ingestionId) + const status = currentIngestion?.status || "unknown" + loggerWithChild({ email }).info( + `Ingestion ${ingestionId} was ${status}, stopping process`, + ) + } else { + loggerWithChild({ email }).info( + `Ingestion was cancelled/stopped, stopping process`, + ) + } + } + abortController.abort() + clearInterval(interval) + return + } + const progressData = { + IngestionType: IngestionType.partialIngestion, + progress: tracker.getProgress(), + userStats: tracker.getOAuthProgress().userStats, + startTime: tracker.getStartTime(), + // Enhanced progress information for resumable ingestion UI + channelProgress: { + totalChannels: existingChannelsToIngest.length, + processedChannels, + currentChannel: + channelMap.get(existingChannelsToIngest[processedChannels]) || "", + totalMessages: messageCounters.totalMessages.value, + processedMessages: messageCounters.processedMessages.value, + }, + ingestionId, + } + + // Progress updates now handled via database polling - no WebSocket needed + // Frontend will get this data when it polls /api/ingestion/status + // sendWebsocketMessage call removed - using database-only approach + + // Persist current state to database for resumability + // Critical: this allows resuming from exact same point if interrupted + if (ingestionId && ingestionRecord) { + try { + await updateIngestionMetadata(db, ingestionId, { + slack: { + // Data sent to frontend for progress display + websocketData: { + connectorId: connector.externalId, + progress: progressData.channelProgress, + }, + // Internal state data for resuming interrupted ingestions + ingestionState: { + currentChannelId: existingChannelsToIngest[processedChannels], + channelsToIngest: existingChannelsToIngest, + startDate: existingStartDate, + endDate: existingEndDate, + includeBotMessage: existingIncludeBotMessages, + currentChannelIndex: processedChannels, // Key for resumability + lastMessageTimestamp: globalLastMessageTimestamp, // Key for message-level resumability + lastUpdated: new Date().toISOString(), + }, + }, + }) + } catch (metadataError) { + loggerWithChild({ email }).warn( + "Failed to update ingestion metadata:", + metadataError, + ) + } + } + }, 4000) // Update every 4 seconds for good UX without overwhelming the system const conversations: Channel[] = [] - for (const channel in channelsToIngest) { - const channelId = channelsToIngest[channel] // Get the channel ID string using the index from the for...in loop + for (const channel in existingChannelsToIngest) { + const channelId = existingChannelsToIngest[channel] // Get the channel ID string using the index from the for...in loop try { const response = await client.conversations.info({ channel: channelId }) if (response.ok && response.channel) { @@ -1190,54 +1477,163 @@ export const handleSlackChannelIngestion = async ( chatContainerSchema, conversations.map((c) => c.id!), ) - const conversationsToInsert = conversations - // .filter( - // (conversation) => - // (existenceMap[conversation.id!] && - // !existenceMap[conversation.id!].exists) || - // !existenceMap[conversation.id!], - // ) + const conversationsToInsert = conversations.filter( + (conversation) => + (existenceMap[conversation.id!] && + !existenceMap[conversation.id!].exists) || + !existenceMap[conversation.id!], + ) + + // Fix: Build conversationsToProcess from original ordering to prevent skipping channels + // resumeFromChannelIndex refers to position in existingChannelsToIngest, not conversationsToInsert + const channelsToResume = existingChannelsToIngest.slice( + resumeFromChannelIndex, + ) + const conversationsToProcess = channelsToResume + .map((channelId) => + conversationsToInsert.find((conv) => conv.id === channelId), + ) + .filter(Boolean) as typeof conversationsToInsert + + loggerWithChild({ email }).info( + `Processing ${conversationsToProcess.length} channels (skipping ${resumeFromChannelIndex} already processed)`, + ) + loggerWithChild({ email: email }).info( - `conversations to insert ${conversationsToInsert.length} and skipping ${conversations.length - conversationsToInsert.length}`, + `conversations to insert ${conversationsToProcess.length} (skipping ${resumeFromChannelIndex} already processed) and skipping ${conversations.length - conversationsToInsert.length} existing`, ) + // Fix: Correct skip count calculation - don't double-count resume offset + // conversations.length - conversationsToInsert.length = already ingested + // resumeFromChannelIndex = channels processed in previous runs totalConversationsSkipped.inc( { team_id: team.id ?? team.name ?? "", email: email }, - conversations.length - conversationsToInsert.length, + conversations.length - + conversationsToInsert.length + + resumeFromChannelIndex, ) const user = await getAuthenticatedUserId(client) const teamMap = new Map() teamMap.set(team.id!, team) const memberMap = new Map() - tracker.setCurrent(0) + tracker.setCurrent(resumeFromChannelIndex) tracker.setTotal(conversationsToInsert.length) - let conversationIndex = 0 + let conversationIndex = resumeFromChannelIndex totalConversationsToBeInserted.inc( { team_id: team.id ?? team.name ?? "", email: email }, conversationsToInsert.length, ) // can be done concurrently, but can cause issues with ratelimits - for (const conversation of conversationsToInsert) { + for (const conversation of conversationsToProcess) { + // Update conversationIndex to match position in existingChannelsToIngest + conversationIndex = existingChannelsToIngest.indexOf(conversation.id!) + loggerWithChild({ email }).info( + `Processing channel ${conversationIndex + 1}/${existingChannelsToIngest.length}: ${conversation.name}`, + ) + + // Check for cancellation/pause before processing each conversation + if (abortController.signal.aborted) { + loggerWithChild({ email }).info( + `Conversation processing aborted for ingestion ${ingestionId}`, + ) + return + } + + // Update processedChannels for progress tracking + processedChannels = conversationIndex + loggerWithChild({ email }).info( + `Updated processedChannels to ${processedChannels} for conversation ${conversation.name}`, + ) + + loggerWithChild({ email }).info( + `Starting member fetching for conversation ${conversation.name}`, + ) const memberIds = await getConversationUsers( user, client, conversation, email, ) + loggerWithChild({ email }).info( + `Found ${memberIds.length} member IDs for conversation ${conversation.name}`, + ) + const membersToFetch = memberIds.filter((m: string) => !memberMap.get(m)) + loggerWithChild({ email }).info( + `Need to fetch ${membersToFetch.length} new members for ${conversation.name}`, + ) + const concurrencyLimit = pLimit(5) const memberPromises = membersToFetch.map((memberId: string) => - concurrencyLimit(() => client.users.info({ user: memberId })), + concurrencyLimit(async () => { + // Check abort signal before each individual API call + if (abortController.signal.aborted) { + loggerWithChild({ email }).info( + `Aborting member fetch due to cancellation`, + ) + return null // Return null to signal abort without error + } + return client.users.info({ user: memberId }) + }), + ) + + loggerWithChild({ email }).info( + `Starting member fetch for ${memberPromises.length} members`, ) - const members: User[] = (await Promise.all(memberPromises)) - .map((userResp) => { - if (userResp.user) { - memberMap.set(userResp.user.id!, userResp.user) - return userResp.user as User + + // Check for abort before starting member fetching + if (abortController.signal.aborted) { + loggerWithChild({ email }).info( + `Member fetching aborted for ingestion ${ingestionId}`, + ) + return + } + + // Use Promise.allSettled to handle individual promise failures due to abort + const memberResults = await Promise.allSettled(memberPromises) + const members: User[] = memberResults + .filter( + (result): result is PromiseFulfilledResult => + result.status === "fulfilled", + ) + .map((result) => { + // Handle null values from aborted calls + if (result.value?.user && result.value) { + memberMap.set(result.value.user.id!, result.value.user) + return result.value.user as User } + return undefined }) .filter((user) => !!user) - // check if already exists + + // Check if we were aborted during member fetching + if (abortController.signal.aborted) { + loggerWithChild({ email }).info( + `Member fetching aborted during processing for ingestion ${ingestionId}`, + ) + return + } + loggerWithChild({ email }).info( + `πŸ‘₯ [CHANNEL ${conversationIndex + 1}] Completed member fetching, got ${members.length} valid members for ${conversation.name} (ID: ${conversation.id})`, + ) + + // Check for abort after member fetching completes + if (abortController.signal.aborted) { + loggerWithChild({ email }).info( + `Aborting after member fetching completed for ingestion ${ingestionId}`, + ) + return + } + let memberIndex = 0 for (const member of members) { + memberIndex++ + // Check for abort during member processing + if (abortController.signal.aborted) { + loggerWithChild({ email }).info( + `Member processing aborted for ingestion ${ingestionId}`, + ) + return + } + // team first time encountering if (!teamMap.get(member.team_id!)) { const teamResp: TeamInfoResponse = await client.team.info({ @@ -1255,7 +1651,7 @@ export const handleSlackChannelIngestion = async ( } catch (error) { loggerWithChild({ email: email ?? "" }).error( error, - `Error inserting member`, + `Error inserting team`, ) ingestedTeamErrorTotalCount.inc({ email_domain: teamResp.team!.email_domain, @@ -1305,6 +1701,33 @@ export const handleSlackChannelIngestion = async ( status: OperationStatus.Success, }) try { + // Get last processed message timestamp for resumption + let resumeTimestamp = "0" // Default to start from beginning + if ( + ingestionRecord?.metadata?.slack?.ingestionState + ?.lastMessageTimestamp && + conversationIndex === resumeFromChannelIndex + ) { + resumeTimestamp = + ingestionRecord.metadata.slack.ingestionState.lastMessageTimestamp + loggerWithChild({ email }).info( + `πŸ“ Resuming channel ${conversation.name} from timestamp: ${resumeTimestamp}`, + ) + } else { + loggerWithChild({ email }).info( + `πŸ“ Starting channel ${conversation.name} from beginning (timestamp: 0)`, + ) + } + + // Track the last processed message timestamp for resumption + let currentLastTimestamp = resumeTimestamp + + loggerWithChild({ email }).info( + `πŸ“¨ [CHANNEL ${conversationIndex + 1}] About to call insertChannelMessages for conversation ${conversation.name} (ID: ${conversation.id})`, + ) + loggerWithChild({ email }).info( + `πŸ“Š [CHANNEL ${conversationIndex + 1}] Current message counters before processing: total=${messageCounters.totalMessages.value}, processed=${messageCounters.processedMessages.value}`, + ) await insertChannelMessages( email, client, @@ -1312,11 +1735,27 @@ export const handleSlackChannelIngestion = async ( abortController, memberMap, tracker, - "0", + resumeTimestamp, channelMap, - startDate, - endDate, - includeBotMessages, + existingStartDate, + existingEndDate, + existingIncludeBotMessages, + messageCounters, + checkCancellationOrPause, + (timestamp: string) => { + // Update the last processed timestamp for this channel + currentLastTimestamp = timestamp + globalLastMessageTimestamp = timestamp // Update global variable for metadata storage + loggerWithChild({ email }).info( + `πŸ“ [CHANNEL ${conversationIndex + 1}] Updated last processed timestamp: ${timestamp} for ${conversation.name}`, + ) + }, + ) + loggerWithChild({ email }).info( + `βœ… [CHANNEL ${conversationIndex + 1}] Completed insertChannelMessages for conversation ${conversation.name} (ID: ${conversation.id})`, + ) + loggerWithChild({ email }).info( + `πŸ“Š [CHANNEL ${conversationIndex + 1}] Message counters after processing ${conversation.name}: total=${messageCounters.totalMessages.value}, processed=${messageCounters.processedMessages.value}`, ) channelMessageInsertionDuration() insertChannelMessagesCount.inc({ @@ -1328,7 +1767,7 @@ export const handleSlackChannelIngestion = async ( tracker.updateUserStats(email, StatType.Slack_Conversation, 1) } catch (error) { loggerWithChild({ email: email }).error( - "Error inserting Channel Messages", + `Error inserting Channel Messages for ${conversation.name}: ${error}`, ) insertChannelMessagesErrorCount.inc({ conversation_id: conversation.id ?? "", @@ -1352,8 +1791,10 @@ export const handleSlackChannelIngestion = async ( status: OperationStatus.Success, email: email, }) - conversationIndex++ tracker.setCurrent(conversationIndex) + loggerWithChild({ email }).info( + `Completed processing conversation ${conversation.name} (${conversationIndex + 1}/${existingChannelsToIngest.length})`, + ) } catch (error) { loggerWithChild({ email: email }).error(`Error inserting Conversation`) insertConversationErrorCount.inc({ @@ -1364,10 +1805,32 @@ export const handleSlackChannelIngestion = async ( }) } } - setTimeout(() => { - clearInterval(interval) - }, 8000) + loggerWithChild({ email }).info( + `Successfully processed all ${conversationsToInsert.length} channels. Total messages: ${messageCounters.totalMessages.value}, Processed: ${messageCounters.processedMessages.value}`, + ) + + // Check if ingestion was actually completed or just paused/cancelled + const finalStatus = await checkCancellationOrPause() + loggerWithChild({ email }).info( + `πŸ” Final status check: shouldStop=${finalStatus.shouldStop}, isPaused=${finalStatus.isPaused}`, + ) + + if (finalStatus.shouldStop && finalStatus.isPaused) { + loggerWithChild({ email }).info( + `⏸️ Ingestion was paused - keeping status as 'paused', not completing`, + ) + return // Exit without marking as completed + } + + if (finalStatus.shouldStop && !finalStatus.isPaused) { + loggerWithChild({ email }).info( + `❌ Ingestion was cancelled - keeping status as 'cancelled', not completing`, + ) + return // Exit without marking as completed + } + + // Update connector status only for actual completion db.transaction(async (trx) => { await trx .update(connectors) @@ -1377,8 +1840,119 @@ export const handleSlackChannelIngestion = async ( }) .where(eq(connectors.id, connector.id)) }) + + // Mark ingestion as successfully completed (only if not paused/cancelled) + loggerWithChild({ email }).info( + `🎯 Reached completion section! About to mark ingestion ${ingestionId} as completed`, + ) + if (ingestionId) { + try { + // Update database status to completed + loggerWithChild({ email }).info( + `πŸ”„ Calling updateIngestionStatus with ingestionId=${ingestionId}, status=completed`, + ) + await updateIngestionStatus(db, ingestionId, "completed") + + // Update metadata with final message counters for frontend display + const finalProgressData = { + totalChannels: channelsToIngest.length, + processedChannels: channelsToIngest.length, // All channels completed + currentChannel: "", // Completed + totalMessages: messageCounters.totalMessages.value, + processedMessages: messageCounters.processedMessages.value, + } + + loggerWithChild({ email }).info( + `πŸ“Š Updating final metadata: total=${finalProgressData.totalMessages}, processed=${finalProgressData.processedMessages}`, + ) + await updateIngestionMetadata(db, ingestionId, { + slack: { + websocketData: { + connectorId: connector.externalId, + progress: finalProgressData, + }, + ingestionState: { + channelsToIngest: existingChannelsToIngest, + currentChannelIndex: existingChannelsToIngest.length, // Completed all + currentChannelId: null, // No current channel + startDate: existingStartDate, + endDate: existingEndDate, + includeBotMessage: existingIncludeBotMessages, + lastUpdated: new Date().toISOString(), + }, + }, + }) + + loggerWithChild({ email }).info( + `βœ… SUCCESS: Completed Slack channel ingestion with ID: ${ingestionId}`, + ) + + // Completion notification now handled via database status + // Frontend will detect completion via polling /api/ingestion/status + loggerWithChild({ email }).info( + `πŸš€ Slack channel ingestion completed - status updated in database for polling detection`, + ) + } catch (completionError) { + loggerWithChild({ email }).error( + "Failed to mark ingestion as completed:", + completionError, + ) + } + } } catch (error) { loggerWithChild({ email: email }).error(error) + + // Handle ingestion failure by updating database and notifying frontend + if (ingestionId) { + try { + const { updateIngestionStatus, getIngestionById } = await import( + "@/db/ingestion" + ) + + // Check current status before overwriting - preserve cancellation/pause states + const currentIngestion = await getIngestionById(db, ingestionId) + const currentStatus = currentIngestion?.status + + // Only mark as failed if not already cancelled, paused, or completed + if ( + currentStatus && + !["cancelled", "paused", "completed"].includes(currentStatus) + ) { + await updateIngestionStatus( + db, + ingestionId, + "failed", + (error as Error).message, + ) + loggerWithChild({ email }).error( + `Failed Slack channel ingestion with ID: ${ingestionId}`, + ) + + // Failure notification now handled via database status + // Frontend will detect failure via polling /api/ingestion/status + loggerWithChild({ email }).error( + `Slack channel ingestion failed - status updated in database for polling detection`, + ) + } else { + loggerWithChild({ email }).info( + `Ingestion ${ingestionId} error occurred but preserving existing status: ${currentStatus}`, + ) + } + } catch (failureError) { + loggerWithChild({ email }).error( + "Failed to mark ingestion as failed:", + failureError, + ) + } + } + } finally { + // Always clear the interval regardless of how the function exits + if (interval) { + clearInterval(interval) + loggerWithChild({ email }).info( + `Cleared periodic progress interval for ingestion ${ingestionId}`, + ) + } } } diff --git a/server/server.ts b/server/server.ts index 5529a0904..41a8d225d 100644 --- a/server/server.ts +++ b/server/server.ts @@ -112,6 +112,12 @@ import { GetUserApiKeys, DeleteUserApiKey, } from "@/api/auth" +import { + getIngestionStatusSchema, + cancelIngestionSchema, + pauseIngestionSchema, + resumeIngestionSchema, +} from "@/api/ingestion" import { SearchWorkspaceUsersApi, searchUsersSchema } from "@/api/users" import { InitiateCallApi, @@ -1168,6 +1174,27 @@ export const AppRoutes = app .post("/google/start_ingestion", (c) => proxyToSyncServer(c, "/google/start_ingestion"), ) + // Ingestion Management APIs - new polling-based approach for Slack channel ingestion + .get( + "/ingestion/status", + zValidator("query", getIngestionStatusSchema), + (c) => proxyToSyncServer(c, "/ingestion/status", "GET"), + ) + .post( + "/ingestion/cancel", + zValidator("json", cancelIngestionSchema), + (c) => proxyToSyncServer(c, "/ingestion/cancel"), + ) + .post( + "/ingestion/pause", + zValidator("json", pauseIngestionSchema), + (c) => proxyToSyncServer(c, "/ingestion/pause"), + ) + .post( + "/ingestion/resume", + zValidator("json", resumeIngestionSchema), + (c) => proxyToSyncServer(c, "/ingestion/resume"), + ) .delete( "/oauth/connector/delete", zValidator("form", deleteConnectorSchema), @@ -1413,7 +1440,7 @@ app .delete("/cl/:clId/items/:itemId", DeleteItemApi) // Delete Item in KB // Proxy function to forward ingestion API calls to sync server -const proxyToSyncServer = async (c: Context, endpoint: string) => { +const proxyToSyncServer = async (c: Context, endpoint: string, method: string = "POST") => { try { // Get JWT token from cookie const token = getCookie(c, AccessTokenCookieName) @@ -1421,21 +1448,36 @@ const proxyToSyncServer = async (c: Context, endpoint: string) => { throw new HTTPException(401, { message: "No authentication token" }) } - // Get request body - const body = await c.req.json() + // Prepare URL - for GET requests, add query parameters + let url = `http://localhost:${config.syncServerPort}${endpoint}` + if (method === "GET") { + const urlObj = new URL(url) + const queryParams = c.req.query() + Object.keys(queryParams).forEach(key => { + if (queryParams[key]) { + urlObj.searchParams.set(key, queryParams[key]) + } + }) + url = urlObj.toString() + } + + // Prepare request configuration + const requestConfig: RequestInit = { + method, + headers: { + "Content-Type": "application/json", + Cookie: `${AccessTokenCookieName}=${token}`, + }, + } + + // Add body for non-GET requests + if (method !== "GET") { + const body = await c.req.json() + requestConfig.body = JSON.stringify(body) + } // Forward to sync server - const response = await fetch( - `http://localhost:${config.syncServerPort}${endpoint}`, - { - method: "POST", - headers: { - "Content-Type": "application/json", - Cookie: `${AccessTokenCookieName}=${token}`, - }, - body: JSON.stringify(body), - }, - ) + const response = await fetch(url, requestConfig) if (!response.ok) { const errorData = await response diff --git a/server/sync-server.ts b/server/sync-server.ts index 1c54ab6fc..f5dc3e5ca 100644 --- a/server/sync-server.ts +++ b/server/sync-server.ts @@ -16,6 +16,16 @@ import { HandlePerUserGoogleWorkSpaceSync, StartGoogleIngestionApi, } from "@/api/admin" +import { + GetIngestionStatusApi, + CancelIngestionApi, + PauseIngestionApi, + ResumeIngestionApi, + getIngestionStatusSchema, + cancelIngestionSchema, + pauseIngestionSchema, + resumeIngestionSchema, +} from "@/api/ingestion" import { ingestMoreChannelSchema, startSlackIngestionSchema, @@ -25,9 +35,9 @@ import { db } from "@/db/client" import { getUserByEmail } from "@/db/user" import type { JwtVariables } from "hono/jwt" import type { Context, Next } from "hono" -import WebSocket from "ws" import { Worker } from "worker_threads" import path from "path" +import WebSocket from "ws" const Logger = getLogger(Subsystem.SyncServer) @@ -36,6 +46,7 @@ const app = new Hono<{ Variables: JwtVariables }>() const honoMiddlewareLogger = LogMiddleware(Subsystem.SyncServer) // WebSocket connection to main server for forwarding stats +// Note: Slack channel ingestion uses database polling, other integrations use WebSocket let mainServerWebSocket: WebSocket | null = null let reconnectAttempts = 0 let reconnectTimeout: ReturnType | null = null @@ -146,6 +157,7 @@ const scheduleReconnect = () => { } // Function to send WebSocket message to main server +// Note: Slack channel ingestion bypasses this and uses database polling instead export const sendWebsocketMessageToMainServer = ( message: string, connectorId: string, @@ -156,6 +168,7 @@ export const sendWebsocketMessageToMainServer = ( ) { try { mainServerWebSocket.send(JSON.stringify({ message, connectorId })) + Logger.debug(`WebSocket message sent for connector ${connectorId}`) } catch (error) { Logger.error( error, @@ -178,6 +191,8 @@ export const sendWebsocketMessageToMainServer = ( } } + + // JWT Authentication middleware const accessTokenSecret = process.env.ACCESS_TOKEN_SECRET! const AccessTokenCookieName = config.AccessTokenCookie @@ -251,6 +266,33 @@ app.post( app.post("/syncSlackByMail", HandlePerUserSlackSync) app.post("/syncGoogleWorkSpaceByMail", HandlePerUserGoogleWorkSpaceSync) +// Ingestion Management APIs +app.get( + "/ingestion/status", + zValidator("query", getIngestionStatusSchema), + GetIngestionStatusApi, +) + +app.post( + "/ingestion/cancel", + zValidator("json", cancelIngestionSchema), + CancelIngestionApi, +) + +app.post( + "/ingestion/pause", + zValidator("json", pauseIngestionSchema), + PauseIngestionApi, +) + +app.post( + "/ingestion/resume", + zValidator("json", resumeIngestionSchema), + ResumeIngestionApi, +) + + + const startAndMonitorWorkers = ( workerScript: string, workerType: string, @@ -362,8 +404,15 @@ export const initSyncServer = async () => { Logger.error(error, "Failed to initialize queue system") }) - // Connect to main server WebSocket - connectToMainServer() + // Connect to main server via WebSocket for progress updates + // Note: Slack channel ingestion uses database polling, other integrations use WebSocket + try { + connectToMainServer() + Logger.info("WebSocket connection to main server initiated") + } catch (error) { + Logger.error(error, "Failed to connect to main server via WebSocket") + } + Logger.info("Sync Server initialization completed") }