Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
765 changes: 666 additions & 99 deletions frontend/src/routes/_authenticated/admin/integrations/slack.tsx

Large diffs are not rendered by default.

94 changes: 86 additions & 8 deletions server/api/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1551,6 +1551,9 @@ export const StartGoogleIngestionApi = async (c: Context) => {
})
}
}
// API endpoint for starting resumable Slack channel ingestion
// Creates an ingestion record and starts background processing
// Returns immediate response while ingestion runs in sync-server
export const IngestMoreChannelApi = async (c: Context) => {
const { sub } = c.get(JwtPayloadKey)
// @ts-ignore
Expand All @@ -1563,27 +1566,102 @@ export const IngestMoreChannelApi = async (c: Context) => {
}

try {
// Validate user exists and has access
const userRes = await getUserByEmail(db, sub)
if (!userRes || !userRes.length) {
loggerWithChild({ email: sub }).error(
{ sub },
"No user found for sub in IngestMoreChannelApi",
)
throw new NoUserFound({})
}
const [user] = userRes

// Validate connector exists and user has access
const connector = await getConnector(db, payload.connectorId)
if (!connector) {
throw new HTTPException(404, { message: "Connector not found" })
}

// Import ingestion functions for database operations
const { createIngestion, hasActiveIngestion } = await import("@/db/ingestion")

// Prevent concurrent ingestions using database transaction to avoid race conditions
// This atomically checks for active ingestions and creates new one if none exist
const ingestion = await db.transaction(async (trx) => {
const hasActive = await hasActiveIngestion(trx, user.id, connector.id)
if (hasActive) {
throw new HTTPException(409, {
message: "An ingestion is already in progress for this connector. Please wait for it to complete or cancel it first."
})
}

// Create ingestion record with initial metadata for resumability
// All state needed for resuming is stored in the metadata field
return await createIngestion(trx, {
userId: user.id,
connectorId: connector.id,
workspaceId: connector.workspaceId,
status: "pending",
metadata: {
slack: {
// Data sent to frontend via WebSocket for progress display
websocketData: {
connectorId: connector.externalId,
progress: {
totalChannels: payload.channelsToIngest.length,
processedChannels: 0,
totalMessages: 0,
processedMessages: 0,
},
},
// Internal state data for resumability
ingestionState: {
channelsToIngest: payload.channelsToIngest,
startDate: payload.startDate,
endDate: payload.endDate,
includeBotMessage: payload.includeBotMessage,
currentChannelIndex: 0, // Resume from this channel
lastUpdated: new Date().toISOString(),
},
},
},
})
})

// Start background ingestion processing asynchronously
// Ingestion runs in sync-server while API returns immediately
const email = sub
const resp = await handleSlackChannelIngestion(
payload.connectorId,
handleSlackChannelIngestion(
connector.id,
payload.channelsToIngest,
payload.startDate,
payload.endDate,
email,
payload.includeBotMessage,
)
ingestion.id, // Pass ingestion ID for progress tracking
).catch((error) => {
loggerWithChild({ email: sub }).error(
error,
`Background Slack channel ingestion failed for connector ${connector.id}: ${getErrorMessage(error)}`,
)
})

// Return immediate success response to frontend
// Actual progress will be communicated via WebSocket
return c.json({
success: true,
message: "Successfully ingested the channels",
message: "Slack channel ingestion started.",
ingestionId: ingestion.id,
})
} catch (error) {
loggerWithChild({ email: sub }).error(
error,
"Failed to ingest Slack channels",
"Failed to start Slack channel ingestion",
)
return c.json({
success: false,
message: getErrorMessage(error),
if (error instanceof HTTPException) throw error
throw new HTTPException(500, {
message: `Failed to start Slack channel ingestion: ${getErrorMessage(error)}`,
})
}
}
Expand Down
Loading