diff --git a/frontend/src/routes/_authenticated/admin/integrations/slack.tsx b/frontend/src/routes/_authenticated/admin/integrations/slack.tsx index b1ec200e0..527367f0a 100644 --- a/frontend/src/routes/_authenticated/admin/integrations/slack.tsx +++ b/frontend/src/routes/_authenticated/admin/integrations/slack.tsx @@ -177,6 +177,7 @@ type ManualIngestionFormData = { channelIds: string startDate: string endDate: string + includeBotMessage: boolean } interface SlackOAuthTabProps { @@ -1127,7 +1128,7 @@ const ManualIngestionForm = ({ // const startTimeRef = useRef(null) const form = useForm({ - defaultValues: { channelIds: "", startDate: "", endDate: "" }, + defaultValues: { channelIds: "", startDate: "", endDate: "", includeBotMessage: false }, onSubmit: async ({ value }) => { if (!connectorId) { toast({ @@ -1167,6 +1168,7 @@ const ManualIngestionForm = ({ channelsToIngest: channelIdsList, startDate: value.startDate, endDate: value.endDate, + includeBotMessage: value.includeBotMessage, }, }) : await api.slack.ingest_more_channel.$post({ @@ -1175,6 +1177,7 @@ const ManualIngestionForm = ({ channelsToIngest: channelIdsList, startDate: value.startDate, endDate: value.endDate, + includeBotMessage: value.includeBotMessage, }, }) @@ -1255,6 +1258,27 @@ const ManualIngestionForm = ({ )} /> +
+ ( + field.handleChange(e.target.checked)} + className="h-4 w-4 rounded border-gray-300 text-indigo-600 focus:ring-indigo-500" + /> + )} + /> + +
+ diff --git a/server/api/admin.ts b/server/api/admin.ts index b3cb0151b..3d3f8efdf 100644 --- a/server/api/admin.ts +++ b/server/api/admin.ts @@ -1502,6 +1502,7 @@ export const IngestMoreChannelApi = async (c: Context) => { channelsToIngest: string[] startDate: string endDate: string + includeBotMessage: boolean } try { @@ -1512,6 +1513,7 @@ export const IngestMoreChannelApi = async (c: Context) => { payload.startDate, payload.endDate, email, + payload.includeBotMessage, ) return c.json({ success: true, diff --git a/server/integrations/slack/channelIngest.ts b/server/integrations/slack/channelIngest.ts index 2383e2525..a3e909e01 100644 --- a/server/integrations/slack/channelIngest.ts +++ b/server/integrations/slack/channelIngest.ts @@ -342,6 +342,100 @@ export function formatSlackSpecialMentions( return formattedText } + +/** + * Processes user mentions in bot messages, replacing <@userId> with appropriate names + * Similar to formatSlackSpecialMentions but handles user mentions specifically + */ +export async function processBotMessageMentions( + text: string | undefined, + client: WebClient, + memberMap: Map, +): Promise { + if (!text) return "" + + let processedText = text + const mentionRegex = /<@([A-Z0-9]+)>/g + + // Find all user mentions + const mentions = text.match(mentionRegex) + if (!mentions) return processedText + + for (const mention of mentions) { + const userId = mention.slice(2, -1) // Remove <@ and > + + // Check if user is already in memberMap + let user = memberMap.get(userId) + + // If not in map, fetch user info + if (!user) { + try { + const userResponse = await client.users.info({ user: userId }) + if (userResponse.ok && userResponse.user) { + user = userResponse.user as User + memberMap.set(userId, user) + } + } catch (error) { + // If fetching fails, continue with original mention + continue + } + } + + if (user) { + let replacementName: string + + // Check if this is a bot user + if (user.is_bot) { + // For bots, use the bot's name or real_name + replacementName = + user.profile?.real_name || + user.real_name || + user.name || + "Unknown Bot" + } else { + // For regular users, use display_name or name + replacementName = + user.profile?.display_name || user.name || "Unknown User" + } + + // Replace the mention with @username format + processedText = processedText.replace(mention, `@${replacementName}`) + } + } + + return processedText +} + +/** + * Extracts and combines all text content from bot message blocks + */ +export function extractBotMessageText(message: SlackMessage): string { + let combinedText = "" + + // Add the main text field if it exists + if (message.text) { + combinedText += message.text + "\n" + } + + // Extract text from blocks + if (message.blocks) { + for (const block of message.blocks) { + if (block.type === "section" && block.text?.text) { + combinedText += block.text.text + "\n" + } + + if (block.type === "section" && block.fields) { + for (const field of block.fields) { + if (field.text) { + combinedText += field.text + "\n" + } + } + } + } + } + + return combinedText.trim() +} /** * Fetches all messages from a channel. * For each message that is a thread parent, it also fetches the thread replies. @@ -357,6 +451,7 @@ export async function insertChannelMessages( channelMap: Map, startDate: string, endDate: string, + includeBotMessages: boolean = false, ): Promise { let cursor: string | undefined = undefined @@ -414,37 +509,95 @@ export async function insertChannelMessages( text = formatSlackSpecialMentions(text, channelMap, channelId) message.text = text // Add the top-level message - if ( + // Check if message should be processed based on includeBotMessages flag + const isRegularMessage = message.type === "message" && !message.subtype && message.user && message.client_msg_id && message.text != "" - // memberMap[message.user] - ) { - // a deleted user's message could be there - if (!memberMap.get(message.user)) { - memberMap.set( - message.user, - ( - await client.users.info({ - user: message.user, - }) - ).user!, - ) - } + + const hasTextOrBlocks = + (message.text && message.text.trim().length > 0) || + (Array.isArray(message.blocks) && message.blocks.length > 0) + const isBotMessage = + includeBotMessages && + message.type === "message" && + message.bot_id && + hasTextOrBlocks + + if (isRegularMessage || isBotMessage) { message.mentions = mentions message.team = await getTeam(client, message) - // case to avoid bot messages - await insertChatMessage( - client, - message, - channelId, - memberMap.get(message.user!)?.profile?.display_name!, - memberMap.get(message.user!)?.name!, - memberMap.get(message.user!)?.profile?.image_192!, - ) + // Handle both regular user messages and bot messages + if (isBotMessage) { + // For bot messages, generate custom ID: channelId_ts_botid + const customBotId = `${channelId}_${message.ts}_${message.bot_id}` + // Temporarily set identifiers for bot messages + message.client_msg_id = message.client_msg_id || customBotId + // Ensure a userId-like value exists for downstream writes/attachments + message.user = + message.user || (message.bot_profile?.id as any) || message.bot_id + + // For bot messages, extract and combine all text from blocks + const combinedBotText = extractBotMessageText(message) + + // Process user mentions in the combined bot text + const processedBotText = await processBotMessageMentions( + combinedBotText, + client, + memberMap, + ) + + // Apply special mentions formatting + const finalBotText = formatSlackSpecialMentions( + processedBotText, + channelMap, + channelId, + ) + + message.text = finalBotText + // For bot messages, use bot profile information + const botName = message.bot_profile?.name || "Unknown Bot" + const botUsername = + message.bot_profile?.name || message.bot_id || "bot" + const botImage = + message.bot_profile?.icons?.image_72 || + message.bot_profile?.icons?.image_48 || + message.bot_profile?.icons?.image_36 || + "" + + await insertChatMessage( + client, + message, + channelId, + botName, + botUsername, + botImage, + ) + } else { + // For regular user messages, handle user info + if (message.user && !memberMap.get(message.user)) { + memberMap.set( + message.user, + ( + await client.users.info({ + user: message.user, + }) + ).user!, + ) + } + + await insertChatMessage( + client, + message, + channelId, + memberMap.get(message.user!)?.profile?.display_name!, + memberMap.get(message.user!)?.name!, + memberMap.get(message.user!)?.profile?.image_192!, + ) + } try { insertChatMessagesCount.inc({ conversation_id: channelId, @@ -486,14 +639,24 @@ export async function insertChannelMessages( const replies: (SlackMessage & { mentions?: string[] })[] = threadMessages.filter((msg) => msg.ts !== message.ts) for (const reply of replies) { - if ( + // Check if reply should be processed based on includeBotMessages flag + const isRegularReply = reply.type === "message" && !reply.subtype && reply.user && reply.client_msg_id && reply.text != "" - // memberMap[reply.user] - ) { + + const hasTextOrBlocks = + (reply.text && reply.text.trim().length > 0) || + (Array.isArray(reply.blocks) && reply.blocks.length > 0) + const isBotReply = + includeBotMessages && + reply.type === "message" && + reply.bot_id && + hasTextOrBlocks + + if (isRegularReply || isBotReply) { const mentions = extractUserIdsFromBlocks(reply) let text = reply.text if (mentions.length) { @@ -515,29 +678,78 @@ export async function insertChannelMessages( } } text = formatSlackSpecialMentions(text, channelMap, channelId) - if (!memberMap.get(reply.user)) { - memberMap.set( - reply.user, - ( - await client.users.info({ - user: reply.user, - }) - ).user!, - ) - } + reply.mentions = mentions reply.text = text - reply.team = await getTeam(client, reply) - await insertChatMessage( - client, - reply, - channelId, - memberMap.get(reply.user!)?.profile?.display_name!, - memberMap.get(reply.user!)?.name!, - memberMap.get(reply.user!)?.profile?.image_192!, - ) + // Handle both regular user replies and bot replies + if (isBotReply) { + // For bot replies, generate custom ID: channelId_ts_botid + const customBotId = `${channelId}_${reply.ts}_${reply.bot_id}` + reply.client_msg_id = reply.client_msg_id || customBotId + // Ensure a userId-like value exists for downstream writes/attachments + reply.user = + reply.user || (reply.bot_profile?.id as any) || reply.bot_id + + // For bot replies, extract and combine all text from blocks + const combinedBotText = extractBotMessageText(reply) + + // Process user mentions in the combined bot text + const processedBotText = await processBotMessageMentions( + combinedBotText, + client, + memberMap, + ) + + // Apply special mentions formatting + const finalBotText = formatSlackSpecialMentions( + processedBotText, + channelMap, + channelId, + ) + + reply.text = finalBotText + // For bot replies, use bot profile information + const botName = reply.bot_profile?.name || "Unknown Bot" + const botUsername = + reply.bot_profile?.name || reply.bot_id || "bot" + const botImage = + reply.bot_profile?.icons?.image_72 || + reply.bot_profile?.icons?.image_48 || + reply.bot_profile?.icons?.image_36 || + "" + + await insertChatMessage( + client, + reply, + channelId, + botName, + botUsername, + botImage, + ) + } else { + // For regular user replies, handle user info + if (reply.user && !memberMap.get(reply.user)) { + memberMap.set( + reply.user, + ( + await client.users.info({ + user: reply.user, + }) + ).user!, + ) + } + + await insertChatMessage( + client, + reply, + channelId, + memberMap.get(reply.user!)?.profile?.display_name!, + memberMap.get(reply.user!)?.name!, + memberMap.get(reply.user!)?.profile?.image_192!, + ) + } try { insertChatMessagesCount.inc({ conversation_id: channelId, @@ -913,6 +1125,7 @@ export const handleSlackChannelIngestion = async ( startDate: string, endDate: string, email: string, + includeBotMessages: boolean = false, ) => { try { const abortController = new AbortController() @@ -947,7 +1160,6 @@ export const handleSlackChannelIngestion = async ( const channelId = channelsToIngest[channel] // Get the channel ID string using the index from the for...in loop try { const response = await client.conversations.info({ channel: channelId }) - console.log(response.channel) if (response.ok && response.channel) { conversations.push(response.channel as Channel) } else { @@ -1104,6 +1316,7 @@ export const handleSlackChannelIngestion = async ( channelMap, startDate, endDate, + includeBotMessages, ) channelMessageInsertionDuration() insertChannelMessagesCount.inc({ diff --git a/server/scripts/slackChannelProcessing.ts b/server/scripts/slackChannelProcessing.ts new file mode 100644 index 000000000..28af8e09e --- /dev/null +++ b/server/scripts/slackChannelProcessing.ts @@ -0,0 +1,465 @@ +// Slack Channel Processing with re-ingestion for existing channels +import { getLogger } from "@/logger" +import { Subsystem } from "@/types" +import { Apps, chatContainerSchema } from "@xyne/vespa-ts/types" +import { db } from "@/db/client" +import pLimit from "p-limit" +import type { TxnOrClient } from "@/types" +import { syncJobs } from "@/db/schema" +import { eq } from "drizzle-orm" +import { z } from "zod" +import config, { NAMESPACE, CLUSTER } from "@/config" +import { handleSlackChannelIngestion } from "@/integrations/slack/channelIngest" +import { getConnectorByAppAndEmailId } from "@/db/connector" +import { AuthType } from "@/shared/types" + +const logger = getLogger(Subsystem.Api) + +// Types +export interface SlackChannelProcessingStats { + totalChannels: number + channelsProcessed: number + channelsWithoutValidUser: number + channelsAlreadyProcessed: number + errors: number + skippedChannels: number +} + +export interface SlackChannelProcessingOptions { + batchSize?: number + concurrency?: number + skipProcessed?: boolean + initialContinuationToken?: string +} + +const userSchema = z.object({ email: z.string() }) + +// VespaClient with visit functionality for slack containers +class SlackContainerVespaClient { + private vespaEndpoint: string + + constructor() { + this.vespaEndpoint = config.vespaEndpoint + } + + private async fetchWithRetry( + url: string, + options: RequestInit, + maxRetries = 3, + ): Promise { + let lastError: Error | null = null + + for (let attempt = 1; attempt <= maxRetries; attempt++) { + try { + const response = await fetch(url, options) + return response + } catch (error) { + lastError = error as Error + logger.warn(`Fetch attempt ${attempt} failed: ${lastError.message}`) + + if (attempt < maxRetries) { + const delay = Math.min(1000 * Math.pow(2, attempt - 1), 5000) + await new Promise((resolve) => setTimeout(resolve, delay)) + } + } + } + + throw lastError || new Error("Max retries exceeded") + } + + async visit(options: { + namespace?: string + schema?: string + continuation?: string + wantedDocumentCount?: number + fieldSet?: string + concurrency?: number + }): Promise<{ documents: any[]; continuation?: string }> { + const { + namespace = NAMESPACE, + schema = chatContainerSchema, + continuation, + wantedDocumentCount = 100, + fieldSet = "[all]", + concurrency = 1, + } = options + + const params = new URLSearchParams({ + wantedDocumentCount: wantedDocumentCount.toString(), + cluster: CLUSTER || "search", + selection: schema, + fieldSet: fieldSet, + concurrency: concurrency.toString(), + ...(continuation ? { continuation } : {}), + }) + + const url = `${this.vespaEndpoint}/document/v1/${namespace}/${schema}/docid?${params.toString()}` + + try { + logger.info(`Visiting slack containers: ${url}`) + if (continuation) { + logger.info(`Using continuation token: ${continuation}`) + } + + const response = await this.fetchWithRetry(url, { + method: "GET", + headers: { + Accept: "application/json", + }, + }) + + if (!response.ok) { + const errorText = await response.text() + throw new Error( + `Visit failed: ${response.status} ${response.statusText} - ${errorText}`, + ) + } + + const data = await response.json() + // Log the continuation token for recovery purposes + if (data.continuation) { + logger.info(`Received continuation token: ${data.continuation}`) + } else { + logger.info( + "No continuation token received - this might be the last batch", + ) + } + + return { + documents: data.documents || [], + continuation: data.continuation, + } + } catch (error) { + const errMessage = (error as Error).message + logger.error(error, `Error visiting slack containers: ${errMessage}`) + throw new Error(`Error visiting slack containers: ${errMessage}`) + } + } +} + +const slackContainerVespaClient = new SlackContainerVespaClient() + +// Fetch slack container documents with continuation using custom vespa client +export const fetchSlackContainerDocumentsWithContinuation = async ( + limit: number = 100, + continuation?: string, +): Promise<{ documents: any[]; continuation?: string }> => { + return await slackContainerVespaClient.visit({ + namespace: NAMESPACE, + schema: chatContainerSchema, + continuation: continuation, + wantedDocumentCount: limit, + fieldSet: "[all]", + concurrency: 1, + }) +} + +// Database Operations +async function getUsersWithSlackSyncJobs( + trx: TxnOrClient, + app: Apps, +): Promise> { + const jobs = await trx + .select({ email: syncJobs.email }) + .from(syncJobs) + .where(eq(syncJobs.app, app)) + + const users = z.array(userSchema).parse(jobs) + return new Set(users.map((user) => user.email)) +} + +// Get connector ID for a user with Slack connection +async function getSlackConnectorForUser(email: string): Promise { + try { + const connector = await getConnectorByAppAndEmailId( + db, + Apps.Slack, + AuthType.OAuth, + email, + ) + return connector?.id || null + } catch (error) { + logger.warn({ email, error }, "Failed to get Slack connector for user") + return null + } +} + +// Main function to process a slack channel document +async function processSlackChannel( + doc: any, + validUsers: Set, +): Promise<{ + processed: boolean + error?: Error + reason?: string +}> { + const docId = doc.fields.docId + + try { + // Get permissions array from the document + const permissions = doc.fields.permissions || [] + if (!permissions.length) { + logger.warn({ docId }, "Channel has no permissions, skipping") + return { processed: false, reason: "no_permissions" } + } + + // Find a user from permissions who has Slack connected + let selectedUser: string | null = null + let connectorId: number | null = null + + for (const email of permissions) { + if (validUsers.has(email)) { + const userConnectorId = await getSlackConnectorForUser(email) + if (userConnectorId) { + selectedUser = email + connectorId = userConnectorId + break + } + } + } + + if (!selectedUser || !connectorId) { + logger.warn( + { docId, permissions }, + "No valid user with Slack connector found in permissions", + ) + return { processed: false, reason: "no_valid_user_with_connector" } + } + + // Call channel ingestion with no time range (empty strings for start/end date) + await handleSlackChannelIngestion( + connectorId, + [docId], // Array with single channel ID + "", // No start date - full ingestion + "", // No end date - full ingestion + selectedUser, + false, + ) + + logger.info( + { + docId, + selectedUser, + connectorId, + }, + "Processed slack channel re-ingestion", + ) + + return { processed: true } + } catch (error) { + logger.error({ docId, error }, "Failed to process slack channel") + return { processed: false, error: error as Error } + } +} + +// Main Function to Process Slack Channels +export async function processSlackChannels( + app: Apps = Apps.Slack, + options: SlackChannelProcessingOptions = {}, +): Promise { + const { + batchSize = 100, + concurrency = 3, // Lower concurrency to avoid overwhelming Slack API + skipProcessed = true, + initialContinuationToken, + } = options + + logger.info( + { + app, + batchSize, + concurrency, + skipProcessed, + initialContinuationToken: initialContinuationToken ? "PROVIDED" : "NONE", + }, + "Starting slack channel processing", + ) + + if (initialContinuationToken) { + logger.info("=== RESUMING FROM CONTINUATION TOKEN ===") + logger.info("Initial continuation token:", initialContinuationToken) + logger.info("==========================================") + logger.info( + { initialContinuationToken }, + "Resuming from provided continuation token", + ) + } + + const stats: SlackChannelProcessingStats = { + totalChannels: 0, + channelsProcessed: 0, + channelsWithoutValidUser: 0, + channelsAlreadyProcessed: 0, + errors: 0, + skippedChannels: 0, + } + + let continuation: string | undefined = initialContinuationToken + + try { + // Get users with Slack sync jobs + const validUsers = await getUsersWithSlackSyncJobs(db, app) + + if (validUsers.size === 0) { + logger.warn("No users with Slack sync jobs found") + return stats + } + + logger.info( + { userCount: validUsers.size }, + "Found users with Slack sync jobs", + ) + + const processLimit = pLimit(concurrency) + let batchCounter = 0 + + // Process in batches using vespa.visit + do { + batchCounter++ + const batchId = `batch-${batchCounter}-${Date.now()}` + + logger.info(`=== BATCH ${batchCounter} CONTINUATION TOKEN ===`) + logger.info("Current continuation token:", continuation || "NONE") + logger.info("=========================================") + + const { documents, continuation: nextContinuation } = + await fetchSlackContainerDocumentsWithContinuation( + batchSize, + continuation, + ) + logger.info(`=== BATCH ${batchCounter} NEXT CONTINUATION TOKEN ===`) + logger.info("Next continuation token:", nextContinuation || "NONE") + logger.info("===============================================") + + continuation = nextContinuation + + if (documents.length === 0) break + + // Filter only Slack app containers + const slackChannels = documents.filter( + (doc) => + doc?.fields?.app === Apps.Slack && doc?.fields?.entity === "channel", + ) + + stats.totalChannels += slackChannels.length + + if (slackChannels.length === 0) { + logger.info( + { + batchId, + totalDocuments: documents.length, + slackChannels: slackChannels.length, + }, + "No Slack channels in batch, skipping", + ) + continue + } + + logger.info( + { + batchId, + totalDocuments: documents.length, + slackChannels: slackChannels.length, + continuationToken: continuation ? "HAS_TOKEN" : "NO_TOKEN", + }, + "Processing batch", + ) + + // Process channels concurrently + const results = await Promise.allSettled( + slackChannels.map((doc: any) => + processLimit(() => processSlackChannel(doc, validUsers)), + ), + ) + + // Update statistics + results.forEach((result) => { + if (result.status === "fulfilled") { + const { processed, error, reason } = result.value + if (processed) { + stats.channelsProcessed++ + } else if ( + reason === "no_permissions" || + reason === "no_valid_user_with_connector" + ) { + stats.channelsWithoutValidUser++ + } else { + stats.skippedChannels++ + } + if (error) stats.errors++ + } else { + stats.errors++ + logger.error(result.reason, "Processing failed") + } + }) + + logger.info( + { + batchId, + processed: slackChannels.length, + ...stats, + }, + "Batch completed", + ) + + if (continuation) { + logger.info(`=== BATCH ${batchCounter} COMPLETED - SAVE THIS TOKEN ===`) + logger.info("Continuation token for next run:", continuation) + logger.info("=================================================") + } + } while (continuation) + + logger.info( + { + ...stats, + }, + "Slack channel processing completed", + ) + + return stats + } catch (error) { + logger.info("=== ERROR OCCURRED - SAVE THIS TOKEN FOR RECOVERY ===") + logger.info("Last continuation token:", continuation || "NONE") + logger.info("====================================================") + + logger.error( + { error, lastContinuationToken: continuation }, + "Fatal error during slack channel processing", + ) + throw error + } +} + +// Single entry point function +export async function runSlackChannelProcessing( + continuationToken?: string, +): Promise { + logger.info( + continuationToken + ? `Resuming slack channel processing from token: ${continuationToken}` + : "Starting slack channel processing from the beginning", + ) + + const stats = await processSlackChannels(Apps.Slack, { + batchSize: 50, // Smaller batches for Slack to avoid rate limits + concurrency: 2, // Lower concurrency for Slack API + skipProcessed: false, // We want to re-process all channels + initialContinuationToken: continuationToken, + }) + + logger.info("Slack channel processing completed:", stats) + logger.info("Channels processed:", stats.channelsProcessed) +} + +// Run the processing if this file is executed directly +if (require.main === module) { + runSlackChannelProcessing() + .then(() => { + logger.info("Slack channel processing completed successfully") + process.exit(0) + }) + .catch((error) => { + logger.error({ error }, "Slack channel processing failed") + process.exit(1) + }) +} diff --git a/server/types.ts b/server/types.ts index a7bf2b193..6a7918d96 100644 --- a/server/types.ts +++ b/server/types.ts @@ -20,9 +20,9 @@ import type { } from "@xyne/vespa-ts/types" export enum ProcessingJobType { - FILE = 'file', - COLLECTION = 'collection', - FOLDER = 'folder' + FILE = "file", + COLLECTION = "collection", + FOLDER = "folder", } // type GoogleContacts = people_v1.Schema$Person @@ -559,6 +559,7 @@ export const ingestMoreChannelSchema = z.object({ channelsToIngest: z.array(z.string()), startDate: z.string(), endDate: z.string(), + includeBotMessage: z.boolean().optional().default(false), }) export const startSlackIngestionSchema = z.object({ connectorId: z.number(),