Skip to content

Conversation

MohdShoaib-18169
Copy link
Contributor

@MohdShoaib-18169 MohdShoaib-18169 commented Oct 14, 2025

…sume the channel ingestion

Description

Testing

Additional Notes

Summary by CodeRabbit

  • New Features

    • Slack UI: start, pause, resume, cancel ingestions; resume interrupted ingests; "ingest more channels" without restarting.
    • Admin: endpoint to delete a user’s data and an API to create resumable "ingest more channels" jobs.
  • Improvements

    • Live ingestion status with polling, progress bars, channel/message counts, current-channel details, and user-facing toasts.
    • Resumable background ingestions with duplicate-run prevention, lifecycle controls, automatic status refresh, and persistent progress for reliable resumes.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Oct 14, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds DB-backed, resumable Slack channel ingestion end-to-end: DB schema and accessors, ingestion lifecycle APIs (status/pause/resume/cancel/start), background resumable ingestion logic with metadata persistence, main-server proxy routing, and frontend polling/UI controls integrated into the Slack admin route and ManualIngestionForm.

Changes

Cohort / File(s) Summary
Frontend: Slack admin & form
frontend/src/routes/_authenticated/admin/integrations/slack.tsx, frontend/src/routes/_authenticated/admin/integrations/ManualIngestionForm*
Adds ingestion state and polling, guarded fetch/status handlers (fetchIngestionStatus, startChannelIngestion, resumeIngestion, pauseIngestion, cancelIngestion), UI wiring for ingestion status/progress, new icons/components, wsClient wiring placeholder, and expands ManualIngestionFormProps to accept ingestion props/handlers.
API: Ingestion lifecycle & admin
server/api/ingestion.ts, server/api/admin.ts
New ingestion APIs and zod schemas: GetIngestionStatusApi, CancelIngestionApi, PauseIngestionApi, ResumeIngestionApi; adds IngestMoreChannelApi and AdminDeleteUserData admin endpoint. Implements auth/ownership checks, state transitions, resume triggering, and validation.
DB: Schema & accessors
server/db/schema/ingestions.ts, server/db/schema/index.ts, server/db/ingestion.ts
Adds ingestion_status enum and ingestions table with Slack-specific metadata zod schemas; exports types (InsertIngestion, SelectIngestion, IngestionStatus, SlackIngestionMetadata); implements DB helpers (createIngestion, getActiveIngestionForUser, updateIngestionStatus, updateIngestionMetadata, getIngestionById, hasActiveIngestion) and re-exports schema.
Slack ingestion logic (worker)
server/integrations/slack/channelIngest.ts
Converts WS-based progress to DB-persisted resumable ingestion: adds optional ingestionId, resume-from-state logic, periodic metadata/progress persistence, per-message counters, cancellation/pause checks, and final status updates. Updates insertChannelMessages and handleSlackChannelIngestion signatures to support counters, checks, and resumability.
Server routing & proxying
server/server.ts, server/sync-server.ts
Registers ingestion endpoints on sync-server (GET /ingestion/status, POST /ingestion/cancel

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor UI as Frontend (Slack Admin)
  participant Main as Main Server
  participant Sync as Sync Server
  participant DB as Database
  participant Worker as Ingest Worker
  participant Slack as Slack API

  UI->>Main: POST /admin/ingest-more-channel (start)
  Main->>DB: createIngestion(status=pending, metadata)
  Main->>Worker: start handleSlackChannelIngestion(..., ingestionId)
  Main-->>UI: 200 {ingestionId}

  loop Polling
    UI->>Main: GET /ingestion/status?connectorId=...
    Main->>Sync: proxy GET /ingestion/status
    Sync->>DB: fetch active/latest ingestion
    DB-->>Sync: ingestion payload
    Sync-->>Main: status payload
    Main-->>UI: status payload
  end

  rect rgba(200,240,200,0.25)
    Worker->>DB: persist progress & metadata
    Worker->>Slack: fetch channels/messages
    alt paused / cancelled
      Worker->>DB: updateIngestionStatus(paused/cancelled)
    else completed
      Worker->>DB: updateIngestionStatus(completed)
    end
  end
Loading
sequenceDiagram
  autonumber
  actor UI as Frontend
  participant Sync as Sync Server
  participant DB as Database
  participant Worker as Ingest Worker

  UI->>Sync: POST /ingestion/pause {ingestionId}
  Sync->>DB: updateIngestionStatus(paused)
  Sync-->>UI: 200

  UI->>Sync: POST /ingestion/resume {ingestionId}
  Sync->>DB: getIngestionById(ingestionId)
  Sync->>Worker: handleSlackChannelIngestion(..., ingestionId)
  Sync-->>UI: 200

  UI->>Sync: POST /ingestion/cancel {ingestionId}
  Sync->>DB: updateIngestionStatus(cancelled)
  Sync-->>UI: 200
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested reviewers

  • zereraz
  • shivamashtikar
  • kalpadhwaryu
  • junaid-shirur
  • devesh-juspay

Poem

I nibble at code with twitching nose,
I store each hop where progress grows.
Pause, resume, or cancel a run—
I hop through channels till all is done.
DB carrots found, ingestion complete! 🥕🐇

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title clearly identifies the primary feature—adding pause and resume capabilities to Slack channel ingestion—using a concise, conventional-commit style prefix. It succinctly captures the main change in the pull request without extraneous detail. This makes it immediately understandable to teammates scanning commit history.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch slack-channel-resumability

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 164093c and 758bb26.

📒 Files selected for processing (1)
  • server/integrations/slack/channelIngest.ts (19 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-07-04T07:00:46.837Z
Learnt from: naSim087
PR: xynehq/xyne#638
File: server/integrations/slack/index.ts:793-824
Timestamp: 2025-07-04T07:00:46.837Z
Learning: In server/integrations/slack/index.ts, the dual IngestionState instances follow a specific pattern: `ingestionState` stores the current state that gets updated during processing, while `ingestionOldState` stores the old/previously saved state used for resumption logic when restarting ingestion.

Applied to files:

  • server/integrations/slack/channelIngest.ts
🧬 Code graph analysis (1)
server/integrations/slack/channelIngest.ts (4)
server/integrations/slack/index.ts (3)
  • extractUserIdsFromBlocks (250-270)
  • formatSlackSpecialMentions (271-298)
  • getConversationUsers (636-678)
server/db/ingestion.ts (3)
  • getIngestionById (110-121)
  • updateIngestionStatus (52-85)
  • updateIngestionMetadata (90-105)
server/db/connector.ts (2)
  • db (3-3)
  • getOAuthConnectorWithCredentials (195-317)
server/metrics/slack/slack-metrics.ts (2)
  • totalConversationsSkipped (109-113)
  • totalConversationsToBeInserted (93-97)
🔇 Additional comments (1)
server/integrations/slack/channelIngest.ts (1)

1268-1305: Resumability implementation looks solid.

The resumability logic properly addresses all previously flagged issues:

  • includeBotMessage is correctly restored from metadata (line 1287-1288) and persisted (lines 1429, 1880)
  • Resume index calculation properly maps channels to avoid skipping (lines 1487-1496)
  • Interval is hoisted and cleaned up in finally block (lines 1224, 1951)
  • Abort handling returns null instead of throwing errors (line 1573)
  • Message counters are tracked and restored from saved state

The implementation provides robust end-to-end resumability with proper state persistence.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @MohdShoaib-18169, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request delivers a critical enhancement to the Slack channel ingestion feature by making it resumable and controllable. It provides users with the flexibility to manage their data imports more effectively, allowing them to pause and restart processes without losing progress. The changes span both frontend and backend, introducing new database structures, API endpoints, and UI components to support this robust functionality.

Highlights

  • Resumable Slack Channel Ingestion: Introduced the ability to pause, resume, cancel, and delete Slack channel ingestion processes, significantly improving the user experience for long-running data imports.
  • Persistent Ingestion State Management: Implemented a new database table (ingestions) to store the state and progress of each ingestion, enabling true resumability and robust tracking even if the server restarts.
  • Enhanced Frontend UI for Ingestion Control: Added new UI elements in the Slack integration settings to display real-time ingestion status, progress bars, and interactive buttons for managing ingestion lifecycle (pause, play, cancel, delete).
  • API Endpoints for Ingestion Lifecycle: Created a dedicated set of API endpoints (/api/v1/ingestion/status, /cancel, /pause, /resume, /delete) to manage the ingestion processes from the frontend, ensuring proper authorization and data validation.
  • Shift from WebSockets to Database Polling for Slack Progress: Transitioned Slack channel ingestion progress updates from real-time WebSockets to a database polling mechanism, where the frontend periodically fetches the latest status and progress from the backend.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a significant feature for Slack integration: resumable channel ingestion. The changes include a new database table to track ingestion state, new APIs for managing the ingestion lifecycle (pause, resume, cancel, delete), and a polling mechanism on the frontend to display real-time progress. The implementation is comprehensive, but I've identified a critical runtime error in the ingestion logic that needs to be addressed, as well as a high-severity bug in the frontend related to React hooks that could lead to incorrect behavior. Additionally, there are several medium-severity issues concerning code quality, leftover debug code, and data consistency that should be reviewed.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
server/integrations/slack/channelIngest.ts (1)

448-509: Fix pause-check integration in message loop

checkCancellationOrPause is referenced inside insertChannelMessages, but that helper is defined inside handleSlackChannelIngestion’s scope. TypeScript rightfully fails to compile (TS2304). At the same time, the early-exit path currently returns an object even though the function promises void, which triggers the second pipeline error. Please pass the checker down from the caller and exit with a bare return.

 export async function insertChannelMessages(
   email: string,
   client: WebClient,
   channelId: string,
   abortController: AbortController,
   memberMap: Map<string, User>,
   tracker: Tracker,
   timestamp: string = "0",
   channelMap: Map<string, string>,
   startDate: string,
   endDate: string,
   includeBotMessages: boolean = false,
-  messageCounters?: { totalMessages: { value: number }, processedMessages: { value: number } },
+  messageCounters?: { totalMessages: { value: number }, processedMessages: { value: number } },
+  checkCancellationOrPause?: () => Promise<{ shouldStop: boolean; isPaused: boolean }>,
 ): Promise<void> {
@@
-        const { shouldStop } = await checkCancellationOrPause()
-        if (shouldStop) {
+        if (checkCancellationOrPause) {
+          const { shouldStop } = await checkCancellationOrPause()
+          if (shouldStop) {
             loggerWithChild({ email }).info(`Message processing stopped due to pause/cancellation/deletion in channel ${channelId}`)
-          return { messages: [], files: [], channel: channelId }
+            return
+          }
         }

Remember to pass the new callback when you invoke insertChannelMessages inside handleSlackChannelIngestion.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 836c107 and e25e9ef.

📒 Files selected for processing (9)
  • frontend/src/routes/_authenticated/admin/integrations/slack.tsx (10 hunks)
  • server/api/admin.ts (2 hunks)
  • server/api/ingestion.ts (1 hunks)
  • server/db/ingestion.ts (1 hunks)
  • server/db/schema/index.ts (1 hunks)
  • server/db/schema/ingestions.ts (1 hunks)
  • server/integrations/slack/channelIngest.ts (12 hunks)
  • server/server.ts (3 hunks)
  • server/sync-server.ts (9 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: naSim087
PR: xynehq/xyne#638
File: server/integrations/slack/index.ts:793-824
Timestamp: 2025-07-04T07:00:46.837Z
Learning: In server/integrations/slack/index.ts, the dual IngestionState instances follow a specific pattern: `ingestionState` stores the current state that gets updated during processing, while `ingestionOldState` stores the old/previously saved state used for resumption logic when restarting ingestion.
📚 Learning: 2025-07-04T07:00:46.837Z
Learnt from: naSim087
PR: xynehq/xyne#638
File: server/integrations/slack/index.ts:793-824
Timestamp: 2025-07-04T07:00:46.837Z
Learning: In server/integrations/slack/index.ts, the dual IngestionState instances follow a specific pattern: `ingestionState` stores the current state that gets updated during processing, while `ingestionOldState` stores the old/previously saved state used for resumption logic when restarting ingestion.

Applied to files:

  • server/sync-server.ts
  • server/db/schema/ingestions.ts
  • server/integrations/slack/channelIngest.ts
  • server/api/ingestion.ts
  • server/db/ingestion.ts
  • frontend/src/routes/_authenticated/admin/integrations/slack.tsx
📚 Learning: 2025-06-10T05:40:04.427Z
Learnt from: naSim087
PR: xynehq/xyne#525
File: frontend/src/routes/_authenticated/admin/integrations/slack.tsx:141-148
Timestamp: 2025-06-10T05:40:04.427Z
Learning: In frontend/src/routes/_authenticated/admin/integrations/slack.tsx, the ConnectAction enum and related connectAction state (lines 141-148, 469-471) are intentionally kept for future development, even though they appear unused in the current implementation.

Applied to files:

  • frontend/src/routes/_authenticated/admin/integrations/slack.tsx
🧬 Code graph analysis (8)
server/sync-server.ts (1)
server/api/ingestion.ts (10)
  • getIngestionStatusSchema (27-29)
  • GetIngestionStatusApi (50-92)
  • cancelIngestionSchema (31-33)
  • CancelIngestionApi (97-142)
  • pauseIngestionSchema (43-45)
  • PauseIngestionApi (147-192)
  • resumeIngestionSchema (35-37)
  • ResumeIngestionApi (197-288)
  • deleteIngestionSchema (39-41)
  • DeleteIngestionApi (293-339)
server/db/schema/ingestions.ts (3)
server/db/schema/users.ts (1)
  • users (27-57)
server/db/schema/connectors.ts (1)
  • connectors (60-109)
server/db/schema/workspaces.ts (1)
  • workspaces (7-22)
server/integrations/slack/channelIngest.ts (3)
server/db/ingestion.ts (3)
  • getIngestionById (110-121)
  • updateIngestionStatus (52-85)
  • updateIngestionMetadata (90-105)
server/db/connector.ts (2)
  • db (3-3)
  • getOAuthConnectorWithCredentials (195-317)
server/metrics/slack/slack-metrics.ts (2)
  • totalConversationsSkipped (109-113)
  • totalConversationsToBeInserted (93-97)
server/server.ts (1)
server/api/ingestion.ts (5)
  • getIngestionStatusSchema (27-29)
  • cancelIngestionSchema (31-33)
  • pauseIngestionSchema (43-45)
  • resumeIngestionSchema (35-37)
  • deleteIngestionSchema (39-41)
server/api/ingestion.ts (5)
server/logger/index.ts (3)
  • getLogger (36-93)
  • Subsystem (15-15)
  • getLoggerWithChild (192-200)
server/db/user.ts (1)
  • getUserByEmail (148-157)
server/db/connector.ts (1)
  • getConnector (143-164)
server/db/ingestion.ts (4)
  • getActiveIngestionForUser (29-47)
  • getIngestionById (110-121)
  • updateIngestionStatus (52-85)
  • deleteIngestion (126-133)
server/integrations/slack/channelIngest.ts (1)
  • handleSlackChannelIngestion (1159-1588)
server/api/admin.ts (6)
server/db/user.ts (1)
  • getUserByEmail (148-157)
server/db/connector.ts (2)
  • db (3-3)
  • getConnector (143-164)
server/errors/index.ts (1)
  • NoUserFound (322-327)
server/db/ingestion.ts (2)
  • hasActiveIngestion (138-155)
  • createIngestion (15-24)
server/integrations/slack/channelIngest.ts (1)
  • handleSlackChannelIngestion (1159-1588)
server/utils.ts (1)
  • getErrorMessage (103-106)
server/db/ingestion.ts (2)
server/types.ts (1)
  • TxnOrClient (319-319)
server/db/schema/ingestions.ts (4)
  • InsertIngestion (146-146)
  • SelectIngestion (147-147)
  • ingestions (39-87)
  • IngestionStatus (148-148)
frontend/src/routes/_authenticated/admin/integrations/slack.tsx (5)
server/db/schema/ingestions.ts (1)
  • IngestionStatus (148-148)
frontend/src/hooks/use-toast.ts (1)
  • toast (201-201)
server/utils.ts (1)
  • getErrorMessage (103-106)
frontend/src/api.ts (1)
  • api (5-5)
server/db/ingestion.ts (1)
  • deleteIngestion (126-133)
🪛 GitHub Actions: TypeScript Build Check
server/integrations/slack/channelIngest.ts

[error] 501-501: TS2304: Cannot find name 'checkCancellationOrPause'.


[error] 504-504: TS2322: Type '{ messages: never[]; files: never[]; channel: string; }' is not assignable to type 'void'.

server/api/ingestion.ts

[error] 23-23: TS2551: Property 'API' does not exist on type 'typeof Subsystem'. Did you mean 'Api'?


[error] 24-24: TS2551: Property 'API' does not exist on type 'typeof Subsystem'. Did you mean 'Api'?

server/api/admin.ts

[error] 1581-1581: TS2345: Argument of type 'number' is not assignable to parameter of type 'string'.


[error] 1634-1634: TS2345: Argument of type 'number' is not assignable to parameter of type 'string'.

@MohdShoaib-18169 MohdShoaib-18169 force-pushed the slack-channel-resumability branch from e25e9ef to d480bc4 Compare October 15, 2025 20:03
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

♻️ Duplicate comments (7)
frontend/src/routes/_authenticated/admin/integrations/slack.tsx (3)

593-597: Typo: “Slack OAuthA” → “Slack OAuth”

Already flagged earlier; please correct.

-              <CardTitle>Slack OAuthA</CardTitle>
+              <CardTitle>Slack OAuth</CardTitle>

807-904: Remove noisy console logs before merge

These console.log statements should be removed or gated behind dev flags to avoid noisy prod logs.


1576-1578: Normalize progress source (backend) to one field

Reading progress from three locations signals backend inconsistency. Prefer a single canonical field.

server/api/admin.ts (1)

1581-1584: Remove parseInt on numeric connectorId (build breaker)

payload.connectorId is already a number; parseInt causes TS errors. Forward the number (or use connector.id).

-    const connector = await getConnector(db, parseInt(payload.connectorId))
+    const connector = await getConnector(db, payload.connectorId)-    handleSlackChannelIngestion(
-      parseInt(payload.connectorId),
+    handleSlackChannelIngestion(
+      connector.id,

Also applies to: 1634-1641

server/api/ingestion.ts (2)

48-51: Address the @ts-ignore with proper typing.

As noted in the previous review, @ts-ignore directives can be avoided by properly typing the Hono context.


23-25: Fix TypeScript error: use Subsystem.Api instead of Subsystem.API.

The Subsystem enum does not have an API member. Change both references to Subsystem.Api.

Apply this diff to fix the TypeScript error:

-const Logger = getLogger(Subsystem.API)
-const loggerWithChild = getLoggerWithChild(Subsystem.API)
+const Logger = getLogger(Subsystem.Api)
+const loggerWithChild = getLoggerWithChild(Subsystem.Api)
server/integrations/slack/channelIngest.ts (1)

1359-1367: Add includeBotMessages to ingestion state for resumability.

As noted in the previous review, the includeBotMessages flag is not persisted in the ingestionState, so resumed runs will default to false and skip bot messages even if they were originally included.

Apply this diff:

               ingestionState: {
                 currentChannelId: existingChannelsToIngest[processedChannels],
                 channelsToIngest: existingChannelsToIngest,
                 startDate: existingStartDate,
                 endDate: existingEndDate,
+                includeBotMessages: includeBotMessages,
                 currentChannelIndex: processedChannels,
                 lastMessageTimestamp: globalLastMessageTimestamp,
                 lastUpdated: new Date().toISOString(),
               },
🧹 Nitpick comments (10)
frontend/src/routes/_authenticated/admin/integrations/slack.tsx (3)

794-795: Fix browser typing for interval ref

Using NodeJS.Timeout in a React browser app can cause TS/type issues. Use ReturnType.

-  const pollingIntervalRef = useRef<NodeJS.Timeout | null>(null)
+  const pollingIntervalRef = useRef<ReturnType<typeof setInterval> | null>(null)

946-1047: Unify on typed API client instead of raw fetch

These ingestion calls bypass the existing typed api client. Prefer api.* helpers for consistency (auth, error handling), e.g., api.ingestion.resume.$post(…) if exposed, or add typed routes.


1553-1571: Avoid non-null assertions on connectorId in action handlers

connectorId! may throw if undefined. Guard or early-return.

- onClick={() => pauseIngestion(ingestion.id, connectorId!)}
+ onClick={() => connectorId && pauseIngestion(ingestion.id, connectorId)}

Also applies to: 1652-1669

server/db/schema/ingestions.ts (1)

85-87: Add index for frequent status lookups (userId, connectorId, status)

getActiveIngestionForUser filters by userId+connectorId+status; add an index to avoid scans at scale.

 import {
   serial,
   pgTable,
   text,
   integer,
   timestamp,
   jsonb,
   pgEnum,
+  index,
 } from "drizzle-orm/pg-core"
 …
 export const ingestions = pgTable(
   "ingestions",
   {
     …
   },
-  // No unique constraints - application logic will handle preventing concurrent active ingestions
-)
+  (t) => ({
+    // Accelerate active-ingestion queries
+    userConnectorStatusIdx: index("ing_user_connector_status_idx").on(
+      t.userId,
+      t.connectorId,
+      t.status,
+    ),
+  }),
+)
server/server.ts (1)

1434-1488: Add timeout/abort to sync-server proxy calls

Protect upstream calls with an AbortController (e.g., 30s) to avoid hanging requests on sync-server issues.

-    const response = await fetch(url, requestConfig)
+    const ac = new AbortController()
+    const t = setTimeout(() => ac.abort(), 30_000)
+    const response = await fetch(url, { ...requestConfig, signal: ac.signal })
+    clearTimeout(t)
server/db/ingestion.ts (3)

37-43: Prefer Drizzle's inArray for type safety.

The raw SQL for status filtering bypasses Drizzle's type system and is prone to typos or mismatches with the schema enum.

Apply this diff to use Drizzle's type-safe operator:

+import { eq, and, sql, inArray } from "drizzle-orm"
+
 export const getActiveIngestionForUser = async (
   txn: TxnOrClient,
   userId: number,
   connectorId: number
 ): Promise<SelectIngestion | null> => {
   const result = await txn
     .select()
     .from(ingestions)
     .where(
       and(
         eq(ingestions.userId, userId),
         eq(ingestions.connectorId, connectorId),
-        sql`status IN ('in_progress', 'paused', 'failed')`
+        inArray(ingestions.status, ['in_progress', 'paused', 'failed'])
       )
     )
     .limit(1)
   
   return (result[0] as SelectIngestion) || null
 }

90-93: Consider typing the metadata parameter.

The metadata parameter is typed as any, which bypasses type checking. Consider using the metadata shape from the schema or a generic type parameter.

Apply this diff to improve type safety:

 export const updateIngestionMetadata = async (
   txn: TxnOrClient,
   ingestionId: number,
-  metadata: any
+  metadata: unknown
 ): Promise<SelectIngestion> => {

Using unknown requires explicit type checking before use, providing better safety than any.


135-141: Prefer Drizzle's inArray for consistency.

Similar to getActiveIngestionForUser, use Drizzle's type-safe operator instead of raw SQL.

Apply this diff:

   const result = await txn
     .select({ count: sql<number>`count(*)` })
     .from(ingestions)
     .where(
       and(
         eq(ingestions.userId, userId),
         eq(ingestions.connectorId, connectorId),
-        sql`status IN ('pending', 'in_progress', 'paused', 'failed')`
+        inArray(ingestions.status, ['pending', 'in_progress', 'paused', 'failed'])
       )
     )
server/integrations/slack/channelIngest.ts (1)

505-676: Consider reducing logging verbosity for production.

The message processing loop contains extensive debug-style logging (e.g., "Processing message X/Y", "Found N mentions", "Successfully fetched user info", etc.). While helpful during development and initial rollout, consider:

  1. Using debug-level logs for detailed operation tracking
  2. Keeping only error and key milestone logs at info level
  3. Adding a configurable log level for this feature

This will reduce log volume and costs in production while maintaining debuggability when needed.

server/api/ingestion.ts (1)

260-285: Align variable naming with the ingestion function signature.
Rename the local includeBotMessage constant to includeBotMessages (plural) when reading from state and passing it to handleSlackChannelIngestion to match its parameter name.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e25e9ef and d480bc4.

📒 Files selected for processing (9)
  • frontend/src/routes/_authenticated/admin/integrations/slack.tsx (10 hunks)
  • server/api/admin.ts (2 hunks)
  • server/api/ingestion.ts (1 hunks)
  • server/db/ingestion.ts (1 hunks)
  • server/db/schema/index.ts (1 hunks)
  • server/db/schema/ingestions.ts (1 hunks)
  • server/integrations/slack/channelIngest.ts (19 hunks)
  • server/server.ts (3 hunks)
  • server/sync-server.ts (9 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • server/db/schema/index.ts
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: naSim087
PR: xynehq/xyne#638
File: server/integrations/slack/index.ts:793-824
Timestamp: 2025-07-04T07:00:46.837Z
Learning: In server/integrations/slack/index.ts, the dual IngestionState instances follow a specific pattern: `ingestionState` stores the current state that gets updated during processing, while `ingestionOldState` stores the old/previously saved state used for resumption logic when restarting ingestion.
📚 Learning: 2025-07-04T07:00:46.837Z
Learnt from: naSim087
PR: xynehq/xyne#638
File: server/integrations/slack/index.ts:793-824
Timestamp: 2025-07-04T07:00:46.837Z
Learning: In server/integrations/slack/index.ts, the dual IngestionState instances follow a specific pattern: `ingestionState` stores the current state that gets updated during processing, while `ingestionOldState` stores the old/previously saved state used for resumption logic when restarting ingestion.

Applied to files:

  • server/db/ingestion.ts
  • server/integrations/slack/channelIngest.ts
  • frontend/src/routes/_authenticated/admin/integrations/slack.tsx
  • server/api/ingestion.ts
  • server/db/schema/ingestions.ts
📚 Learning: 2025-06-10T05:40:04.427Z
Learnt from: naSim087
PR: xynehq/xyne#525
File: frontend/src/routes/_authenticated/admin/integrations/slack.tsx:141-148
Timestamp: 2025-06-10T05:40:04.427Z
Learning: In frontend/src/routes/_authenticated/admin/integrations/slack.tsx, the ConnectAction enum and related connectAction state (lines 141-148, 469-471) are intentionally kept for future development, even though they appear unused in the current implementation.

Applied to files:

  • frontend/src/routes/_authenticated/admin/integrations/slack.tsx
🧬 Code graph analysis (8)
server/server.ts (1)
server/api/ingestion.ts (4)
  • getIngestionStatusSchema (28-30)
  • cancelIngestionSchema (32-34)
  • pauseIngestionSchema (41-43)
  • resumeIngestionSchema (36-38)
server/api/admin.ts (6)
server/db/user.ts (1)
  • getUserByEmail (148-157)
server/db/connector.ts (2)
  • db (3-3)
  • getConnector (143-164)
server/errors/index.ts (1)
  • NoUserFound (322-327)
server/db/ingestion.ts (2)
  • hasActiveIngestion (127-144)
  • createIngestion (15-24)
server/integrations/slack/channelIngest.ts (1)
  • handleSlackChannelIngestion (1196-1773)
server/utils.ts (1)
  • getErrorMessage (103-106)
server/db/ingestion.ts (2)
server/types.ts (1)
  • TxnOrClient (319-319)
server/db/schema/ingestions.ts (4)
  • InsertIngestion (146-146)
  • SelectIngestion (147-147)
  • ingestions (39-87)
  • IngestionStatus (148-148)
server/integrations/slack/channelIngest.ts (3)
server/integrations/slack/index.ts (3)
  • extractUserIdsFromBlocks (250-270)
  • formatSlackSpecialMentions (271-298)
  • getConversationUsers (636-678)
server/db/ingestion.ts (3)
  • getIngestionById (110-121)
  • updateIngestionStatus (52-85)
  • updateIngestionMetadata (90-105)
server/db/connector.ts (2)
  • db (3-3)
  • getOAuthConnectorWithCredentials (195-317)
server/sync-server.ts (1)
server/api/ingestion.ts (8)
  • getIngestionStatusSchema (28-30)
  • GetIngestionStatusApi (48-109)
  • cancelIngestionSchema (32-34)
  • CancelIngestionApi (114-159)
  • pauseIngestionSchema (41-43)
  • PauseIngestionApi (164-209)
  • resumeIngestionSchema (36-38)
  • ResumeIngestionApi (214-305)
frontend/src/routes/_authenticated/admin/integrations/slack.tsx (9)
server/db/schema/ingestions.ts (1)
  • IngestionStatus (148-148)
frontend/src/components/ui/card.tsx (1)
  • CardTitle (76-76)
frontend/src/hooks/use-toast.ts (1)
  • toast (201-201)
server/utils.ts (1)
  • getErrorMessage (103-106)
frontend/src/api.ts (1)
  • api (5-5)
frontend/src/components/ui/button.tsx (1)
  • Button (57-57)
frontend/src/components/ui/progress.tsx (1)
  • Progress (26-26)
frontend/src/components/ui/label.tsx (1)
  • Label (24-24)
frontend/src/components/ui/input.tsx (1)
  • Input (25-25)
server/api/ingestion.ts (6)
server/logger/index.ts (3)
  • getLogger (36-93)
  • Subsystem (15-15)
  • getLoggerWithChild (192-200)
server/db/user.ts (1)
  • getUserByEmail (148-157)
server/db/connector.ts (1)
  • getConnector (143-164)
server/db/ingestion.ts (3)
  • getActiveIngestionForUser (29-47)
  • getIngestionById (110-121)
  • updateIngestionStatus (52-85)
server/db/schema/ingestions.ts (2)
  • ingestions (39-87)
  • SelectIngestion (147-147)
server/integrations/slack/channelIngest.ts (1)
  • handleSlackChannelIngestion (1196-1773)
server/db/schema/ingestions.ts (3)
server/db/schema/users.ts (1)
  • users (27-57)
server/db/schema/connectors.ts (1)
  • connectors (60-109)
server/db/schema/workspaces.ts (1)
  • workspaces (7-22)
🔇 Additional comments (1)
server/db/ingestion.ts (1)

58-66: Clarify the startedAt condition.

The condition status === "in_progress" && !errorMessage (line 64) is unclear. If the intent is to set startedAt only on the first transition to in_progress (not when resuming), this should be documented or the condition should check if startedAt is already set.

Please verify: should startedAt be set every time status becomes in_progress, or only the first time? If the latter, consider checking whether startedAt is already null before setting it.

Additionally, consider typing updateData explicitly instead of using any for better type safety:

-  const updateData: any = {
+  const updateData: Partial<typeof ingestions.$inferInsert> = {
     status,
     updatedAt: sql`NOW()`,
   }

@MohdShoaib-18169 MohdShoaib-18169 force-pushed the slack-channel-resumability branch from d480bc4 to f4c113d Compare October 16, 2025 14:10
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

🧹 Nitpick comments (5)
server/db/schema/ingestions.ts (1)

37-87: Add composite index for active‑ingestion checks

Queries repeatedly filter by userId+connectorId+status. Add an index to prevent seq scans at scale.

-import {
+import {
   serial,
   pgTable,
   text,
   integer,
   timestamp,
   jsonb,
   pgEnum,
+  index,
 } from "drizzle-orm/pg-core"
@@
-export const ingestions = pgTable(
-  "ingestions",
-  {
+export const ingestions = pgTable(
+  "ingestions",
+  {
     // Primary key for the ingestion record
     id: serial("id").notNull().primaryKey(),
@@
   },
-  // No unique constraints - application logic will handle preventing concurrent active ingestions
-)
+  // No unique constraints - application logic will handle preventing concurrent active ingestions
+  (t) => ({
+    userConnectorStatusIdx: index("ingestions_user_connector_status_idx").on(
+      t.userId,
+      t.connectorId,
+      t.status,
+    ),
+  }),
+)
server/db/ingestion.ts (1)

123-144: Revisit “active” definition (include/exclude failed)

Including failed in “active” blocks restarts until manual intervention. If you want quick retries, exclude 'failed' here (keep it in getActiveIngestionForUser for resume UI).

server/integrations/slack/channelIngest.ts (2)

1316-1324: Avoid dynamic import inside the interval

Minor perf: import getIngestionById once outside the timer to avoid repeat awaits (module cache still incurs async overhead).

-          const { getIngestionById } = await import("@/db/ingestion")
-          const currentIngestion = await getIngestionById(db, ingestionId!)
+          // Move this import to function top scope:
+          // import { getIngestionById } from "@/db/ingestion"
+          const currentIngestion = await getIngestionById(db, ingestionId!)

1377-1395: Use for…of when iterating arrays

for…in over arrays iterates keys as strings. Prefer for…of to avoid subtle index coercions.

-    for (const channel in existingChannelsToIngest) {
-      const channelId = existingChannelsToIngest[channel]
+    for (const channelId of existingChannelsToIngest) {
       try {
         const response = await client.conversations.info({ channel: channelId })
frontend/src/routes/_authenticated/admin/integrations/slack.tsx (1)

1670-1678: "Resume" button is semantically incorrect for cancelled ingestions.

Line 1673 shows a "Resume" button for both failed and cancelled ingestions. While "Resume" makes sense for failed ingestions (retry from where it failed), it's semantically incorrect for cancelled ingestions since the user intentionally stopped them.

Consider using different button text based on the status:

  <Button
    size="sm"
    variant="outline"
    onClick={() => resumeIngestion(ingestion.id, connectorId!)}
    disabled={ingestionLoading}
  >
-   <RotateCcw className="h-3 w-3 mr-1" />
-   Resume
+   {ingestion.status === 'failed' ? (
+     <>
+       <RotateCcw className="h-3 w-3 mr-1" />
+       Retry
+     </>
+   ) : (
+     <>
+       <Play className="h-3 w-3 mr-1" />
+       Restart
+     </>
+   )}
  </Button>
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d480bc4 and f4c113d.

📒 Files selected for processing (9)
  • frontend/src/routes/_authenticated/admin/integrations/slack.tsx (9 hunks)
  • server/api/admin.ts (2 hunks)
  • server/api/ingestion.ts (1 hunks)
  • server/db/ingestion.ts (1 hunks)
  • server/db/schema/index.ts (1 hunks)
  • server/db/schema/ingestions.ts (1 hunks)
  • server/integrations/slack/channelIngest.ts (19 hunks)
  • server/server.ts (3 hunks)
  • server/sync-server.ts (8 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • server/api/ingestion.ts
  • server/sync-server.ts
  • server/db/schema/index.ts
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: naSim087
PR: xynehq/xyne#638
File: server/integrations/slack/index.ts:793-824
Timestamp: 2025-07-04T07:00:46.837Z
Learning: In server/integrations/slack/index.ts, the dual IngestionState instances follow a specific pattern: `ingestionState` stores the current state that gets updated during processing, while `ingestionOldState` stores the old/previously saved state used for resumption logic when restarting ingestion.
📚 Learning: 2025-07-04T07:00:46.837Z
Learnt from: naSim087
PR: xynehq/xyne#638
File: server/integrations/slack/index.ts:793-824
Timestamp: 2025-07-04T07:00:46.837Z
Learning: In server/integrations/slack/index.ts, the dual IngestionState instances follow a specific pattern: `ingestionState` stores the current state that gets updated during processing, while `ingestionOldState` stores the old/previously saved state used for resumption logic when restarting ingestion.

Applied to files:

  • server/db/ingestion.ts
  • frontend/src/routes/_authenticated/admin/integrations/slack.tsx
  • server/integrations/slack/channelIngest.ts
  • server/db/schema/ingestions.ts
📚 Learning: 2025-06-10T05:40:04.427Z
Learnt from: naSim087
PR: xynehq/xyne#525
File: frontend/src/routes/_authenticated/admin/integrations/slack.tsx:141-148
Timestamp: 2025-06-10T05:40:04.427Z
Learning: In frontend/src/routes/_authenticated/admin/integrations/slack.tsx, the ConnectAction enum and related connectAction state (lines 141-148, 469-471) are intentionally kept for future development, even though they appear unused in the current implementation.

Applied to files:

  • frontend/src/routes/_authenticated/admin/integrations/slack.tsx
🧬 Code graph analysis (6)
server/db/ingestion.ts (2)
server/types.ts (1)
  • TxnOrClient (319-321)
server/db/schema/ingestions.ts (4)
  • InsertIngestion (146-146)
  • SelectIngestion (147-147)
  • ingestions (39-87)
  • IngestionStatus (148-154)
frontend/src/routes/_authenticated/admin/integrations/slack.tsx (3)
server/db/schema/ingestions.ts (1)
  • IngestionStatus (148-154)
server/utils.ts (1)
  • getErrorMessage (103-106)
frontend/src/api.ts (1)
  • api (5-5)
server/integrations/slack/channelIngest.ts (3)
server/integrations/slack/index.ts (3)
  • extractUserIdsFromBlocks (250-270)
  • formatSlackSpecialMentions (271-298)
  • getConversationUsers (636-678)
server/db/ingestion.ts (3)
  • getIngestionById (110-121)
  • updateIngestionStatus (52-85)
  • updateIngestionMetadata (90-105)
server/db/connector.ts (2)
  • db (3-3)
  • getOAuthConnectorWithCredentials (195-317)
server/api/admin.ts (6)
server/db/user.ts (1)
  • getUserByEmail (148-157)
server/db/connector.ts (2)
  • db (3-3)
  • getConnector (143-164)
server/errors/index.ts (1)
  • NoUserFound (322-327)
server/db/ingestion.ts (2)
  • hasActiveIngestion (127-144)
  • createIngestion (15-24)
server/integrations/slack/channelIngest.ts (1)
  • handleSlackChannelIngestion (1196-1785)
server/utils.ts (1)
  • getErrorMessage (103-106)
server/server.ts (1)
server/api/ingestion.ts (4)
  • getIngestionStatusSchema (28-30)
  • cancelIngestionSchema (32-34)
  • pauseIngestionSchema (41-43)
  • resumeIngestionSchema (36-38)
server/db/schema/ingestions.ts (3)
server/db/schema/users.ts (1)
  • users (27-57)
server/db/schema/connectors.ts (1)
  • connectors (60-109)
server/db/schema/workspaces.ts (1)
  • workspaces (7-22)
🪛 GitHub Actions: TypeScript Build Check
server/integrations/slack/channelIngest.ts

[error] 1319-1319: bunx tsc -b: TS2345: Argument of type 'number | undefined' is not assignable to parameter of type 'number'. Type 'undefined' is not assignable to type 'number'.


[error] 1780-1781: bunx tsc -b: TS2304: Cannot find name 'interval'.

🔇 Additional comments (3)
server/server.ts (2)

1177-1197: LGTM: ingestion management routes and method-aware proxy

GET/POST routing with zod validation and cookie-forwarding proxy looks solid.


1443-1497: LGTM: proxyToSyncServer GET/POST handling

Query propagation for GET and JSON forwarding for POST are handled cleanly; cookie-based auth preserved.

frontend/src/routes/_authenticated/admin/integrations/slack.tsx (1)

1563-1602: Inaccurate recommendation to remove fallback logic
The backend only delivers progress via metadata.slack.websocketData.progress (and legacy metadata.slack.ingestionState); ingestion.progress isn’t populated. The existing fallbacks are required.

Likely an incorrect or invalid review comment.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
server/integrations/slack/channelIngest.ts (3)

1819-1827: Await the connector status transaction

The transaction isn’t awaited; the function may return before the update commits.

-    db.transaction(async (trx) => {
+    await db.transaction(async (trx) => {
       await trx
         .update(connectors)
         .set({
           status: ConnectorStatus.Connected,
           state: JSON.stringify({}),
         })
         .where(eq(connectors.id, connector.id))
     })

706-714: Avoid duplicate replies on resume and honor pause during replies

  • Thread fetch ignores timestamp, so resume can reprocess old replies.
  • No pause/cancel check inside replies loop.

Apply:

-          const threadMessages: SlackMessage[] = await fetchThreadMessages(
+          const threadMessages: SlackMessage[] = await fetchThreadMessages(
             client,
             channelId,
             message.thread_ts,
+            timestamp, // restrict replies to resume boundary
           )
@@
-          for (const reply of replies) {
+          for (const reply of replies) {
+            if (checkCancellationOrPause) {
+              const { shouldStop } = await checkCancellationOrPause()
+              if (shouldStop) {
+                loggerWithChild({ email }).info(
+                  `Reply processing stopped due to pause/cancellation/deletion in channel ${channelId}`,
+                )
+                return
+              }
+            }

Also applies to: 723-741


1153-1155: Fix reaction count reduce precedence/nullability

Ensure count is safely summed.

-      reactions: message.reactions?.reduce((acc, curr) => {
-        return acc + (curr as Reaction).count! || 0
-      }, 0),
+      reactions: message.reactions?.reduce((acc, curr) => {
+        const c = (curr as Reaction).count ?? 0
+        return acc + c
+      }, 0),
♻️ Duplicate comments (1)
server/integrations/slack/channelIngest.ts (1)

1353-1441: Interval hoisting/cleanup + ingestionId narrowing look good

The interval is hoisted and cleared in finally; ingestionId is narrowed before use in the interval callback. This addresses prior TS/build and leak issues.

🧹 Nitpick comments (4)
server/integrations/slack/channelIngest.ts (4)

1443-1460: Use for...of when iterating arrays

for...in iterates keys (and inherited props). Prefer for...of for array values.

-    for (const channel in existingChannelsToIngest) {
-      const channelId = existingChannelsToIngest[channel] // Get the channel ID string using the index from the for...in loop
+    for (const channelId of existingChannelsToIngest) {

503-506: Reduce hot‑loop log volume and PII exposure

  • Many info logs inside per‑message/reply loops; can flood logs and slow ingestion.
  • Email logged in every line; consider redacting or downgrading to debug.

Suggestions:

  • Gate with a verbose/debug flag or lower to debug.
  • Redact emails (e.g., hash or mask user part) at info level.

Also applies to: 561-567, 719-721, 852-855


211-225: Remove unused helper

replaceMentionsIfPresent isn’t used. Delete or integrate to avoid dead code.


245-246: Typo in log message

“data rage” → “date range”.

-    `starting the slack ingestion for data rage ${startDate} -> ${endDate}`,
+    `starting the slack ingestion for date range ${startDate} -> ${endDate}`,
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f4c113d and beab827.

📒 Files selected for processing (1)
  • server/integrations/slack/channelIngest.ts (19 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: naSim087
PR: xynehq/xyne#638
File: server/integrations/slack/index.ts:793-824
Timestamp: 2025-07-04T07:00:46.837Z
Learning: In server/integrations/slack/index.ts, the dual IngestionState instances follow a specific pattern: `ingestionState` stores the current state that gets updated during processing, while `ingestionOldState` stores the old/previously saved state used for resumption logic when restarting ingestion.
📚 Learning: 2025-07-04T07:00:46.837Z
Learnt from: naSim087
PR: xynehq/xyne#638
File: server/integrations/slack/index.ts:793-824
Timestamp: 2025-07-04T07:00:46.837Z
Learning: In server/integrations/slack/index.ts, the dual IngestionState instances follow a specific pattern: `ingestionState` stores the current state that gets updated during processing, while `ingestionOldState` stores the old/previously saved state used for resumption logic when restarting ingestion.

Applied to files:

  • server/integrations/slack/channelIngest.ts
🧬 Code graph analysis (1)
server/integrations/slack/channelIngest.ts (2)
server/integrations/slack/index.ts (4)
  • extractUserIdsFromBlocks (250-270)
  • formatSlackSpecialMentions (271-298)
  • getConversationUsers (636-678)
  • insertChannelMessages (303-534)
server/db/ingestion.ts (3)
  • getIngestionById (110-121)
  • updateIngestionStatus (52-85)
  • updateIngestionMetadata (90-105)

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
server/api/admin.ts (1)

1565-1571: Validate non-empty channelsToIngest
Add a check after parsing the payload in server/api/admin.ts:

   const payload = c.req.valid("json") as {
     connectorId: number
     channelsToIngest: string[]
     startDate: string
     endDate: string
     includeBotMessage: boolean
   }

+  if (payload.channelsToIngest.length === 0) {
+    throw new HTTPException(400, {
+      message: "At least one channel must be specified for ingestion"
+    })
+  }
server/integrations/slack/channelIngest.ts (1)

694-855: Pause/cancel check missing inside thread reply loop

You added checkCancellationOrPause before each top-level message, but thread replies bypass it. A pause/cancel issued while processing replies will be ignored until the whole thread finishes, defeating resumability guarantees (esp. for long threads). Please call checkCancellationOrPause inside the replies loop (mirroring the top-level message check) so we exit promptly when paused/cancelled.

           for (const reply of replies) {
+            if (checkCancellationOrPause) {
+              const { shouldStop } = await checkCancellationOrPause()
+              if (shouldStop) {
+                loggerWithChild({ email }).info(
+                  `Reply processing stopped due to pause/cancellation/deletion in channel ${channelId}`,
+                )
+                return
+              }
+            }
♻️ Duplicate comments (1)
frontend/src/routes/_authenticated/admin/integrations/slack.tsx (1)

44-103: Type duplication issue persists from previous review.

These local type definitions (DbIngestionStatus, SlackIngestionMetadata, IngestionData, IngestionStatusResponse, ProgressData) duplicate backend types and increase maintenance burden. As flagged in a previous review, these should be imported from the backend schema to ensure type consistency.

Based on relevant snippets, SlackIngestionMetadata is exported from server/db/schema/ingestions.ts (line 154). Import and reuse backend types instead of redefining them here.

🧹 Nitpick comments (3)
server/integrations/slack/channelIngest.ts (1)

494-688: Excessive per-message logging will bury real issues

The new per-message logs (Processing message…, “Formatting special mentions…”, “Completed text processing”, etc.) run once for every Slack message and reply. At ingestion scale (tens of thousands of events) this will flood structured logs, drive costs up, and hide real errors. Please drop these debug logs or lower them behind a trace flag before merging.

frontend/src/routes/_authenticated/admin/integrations/slack.tsx (2)

1554-1774: Consider refactoring for improved maintainability.

The renderIngestionStatus function is quite long (220 lines) and calls getProgressData multiple times (lines 1621, 1673, 1718, 1760). Consider these improvements:

  1. Memoize progress data: Extract getProgressData(ingestion) once at the start of the function and reuse the result.
  2. Break down by status: Extract status-specific rendering into smaller helper functions (e.g., renderInProgressStatus, renderPausedStatus, etc.).

Example refactor to memoize progress data:

 const renderIngestionStatus = () => {
   // ... loading and error checks ...
   
   if (!ingestionStatus?.hasActiveIngestion) {
     return null
   }

   const ingestion = ingestionStatus.ingestion!
+  const progressData = getProgressData(ingestion)
   
   if (ingestion.status === 'in_progress') {
     return (
       <div className="mb-4 p-4 bg-blue-50 dark:bg-blue-900/20 border border-blue-200 dark:border-blue-800 rounded-lg">
         {/* ... */}
         {(() => {
-          const progressData = getProgressData(ingestion);
           if (progressData && (progressData.totalChannels || progressData.processedChannels)) {
             // ... use progressData ...

Apply similar changes to other status checks that use getProgressData.


1781-1872: Consider simplifying the form visibility condition.

The conditional logic at line 1781 is quite complex and hard to read at a glance:

{!ingestionStatus?.hasActiveIngestion || !['in_progress', 'paused', 'failed'].includes(ingestionStatus?.ingestion?.status || '') ? (

Consider extracting this into a clearer constant:

+ const shouldShowForm = !ingestionStatus?.hasActiveIngestion || 
+   !ingestionStatus?.ingestion?.status ||
+   !['in_progress', 'paused', 'failed'].includes(ingestionStatus.ingestion.status)
+
  return (
    <div className="space-y-4">
      {renderIngestionStatus()}
      
-     {!ingestionStatus?.hasActiveIngestion || !['in_progress', 'paused', 'failed'].includes(ingestionStatus?.ingestion?.status || '') ? (
+     {shouldShowForm ? (
        <form

This makes the intent clearer: "show the form unless there's an active ingestion that's currently in progress, paused, or failed."

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between beab827 and 0a57de9.

📒 Files selected for processing (4)
  • frontend/src/routes/_authenticated/admin/integrations/slack.tsx (9 hunks)
  • server/api/admin.ts (3 hunks)
  • server/api/ingestion.ts (1 hunks)
  • server/integrations/slack/channelIngest.ts (19 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • server/api/ingestion.ts
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-07-04T07:00:46.837Z
Learnt from: naSim087
PR: xynehq/xyne#638
File: server/integrations/slack/index.ts:793-824
Timestamp: 2025-07-04T07:00:46.837Z
Learning: In server/integrations/slack/index.ts, the dual IngestionState instances follow a specific pattern: `ingestionState` stores the current state that gets updated during processing, while `ingestionOldState` stores the old/previously saved state used for resumption logic when restarting ingestion.

Applied to files:

  • server/integrations/slack/channelIngest.ts
  • frontend/src/routes/_authenticated/admin/integrations/slack.tsx
📚 Learning: 2025-06-10T05:40:04.427Z
Learnt from: naSim087
PR: xynehq/xyne#525
File: frontend/src/routes/_authenticated/admin/integrations/slack.tsx:141-148
Timestamp: 2025-06-10T05:40:04.427Z
Learning: In frontend/src/routes/_authenticated/admin/integrations/slack.tsx, the ConnectAction enum and related connectAction state (lines 141-148, 469-471) are intentionally kept for future development, even though they appear unused in the current implementation.

Applied to files:

  • frontend/src/routes/_authenticated/admin/integrations/slack.tsx
🧬 Code graph analysis (3)
server/api/admin.ts (5)
server/db/user.ts (1)
  • getUserByEmail (148-157)
server/db/connector.ts (2)
  • db (3-3)
  • getConnector (143-164)
server/errors/index.ts (1)
  • NoUserFound (322-327)
server/db/ingestion.ts (2)
  • hasActiveIngestion (127-144)
  • createIngestion (15-24)
server/integrations/slack/channelIngest.ts (1)
  • handleSlackChannelIngestion (1214-1944)
server/integrations/slack/channelIngest.ts (3)
server/integrations/slack/index.ts (4)
  • extractUserIdsFromBlocks (250-270)
  • formatSlackSpecialMentions (271-298)
  • getConversationUsers (636-678)
  • insertChannelMessages (303-534)
server/db/ingestion.ts (3)
  • getIngestionById (110-121)
  • updateIngestionStatus (52-85)
  • updateIngestionMetadata (90-105)
server/integrations/slack/sync.ts (1)
  • insertChannelMessages (105-321)
frontend/src/routes/_authenticated/admin/integrations/slack.tsx (3)
server/db/schema/ingestions.ts (1)
  • SlackIngestionMetadata (155-155)
frontend/src/api.ts (1)
  • api (5-5)
server/utils.ts (1)
  • getErrorMessage (103-106)
🔇 Additional comments (10)
server/api/admin.ts (4)

1471-1474: LGTM: Security check implemented correctly.

The connector ownership validation prevents cross-tenant access by verifying both userId and workspaceId match before allowing ingestion. This addresses the security concern raised in previous reviews.


1574-1594: LGTM: User and connector validation with proper authorization.

The validation flow correctly:

  • Retrieves user from JWT token
  • Validates connector existence
  • Enforces tenant isolation by checking both userId and workspaceId

This prevents unauthorized cross-tenant ingestion access.


1608-1638: LGTM: Comprehensive metadata structure for resumability.

The metadata design is well-structured:

  • websocketData.progress: Frontend progress tracking (total/processed channels and messages)
  • ingestionState: All parameters needed to resume (channels, dates, current position)

This enables pause/resume functionality effectively.


1640-1656: Approve with note: Async ingestion with proper error logging.

The background ingestion pattern is correct:

  • Immediate return to user with ingestion ID
  • Async processing with .catch() for error logging
  • No blocking of API response

The relevant code snippet confirms handleSlackChannelIngestion updates the status early, preventing indefinite "pending" states on failures.

frontend/src/routes/_authenticated/admin/integrations/slack.tsx (6)

850-866: LGTM! Clean polling setup.

The ingestion state management and polling setup is well-structured. Using refs for pollingIntervalRef, previousStatusRef, and isFetchingRef correctly avoids stale closure issues.


868-958: LGTM! Previous review issues resolved.

The fetchIngestionStatus function correctly addresses all previous review concerns:

  • ✅ Uses typed API client (api.ingestion.status.$get) instead of raw fetch
  • ✅ Checks status existence before includes() to avoid empty string matching
  • ✅ Uses refs (pollingIntervalRef.current, currentConnectorId) to avoid stale closures
  • ✅ Correct dependency array with only stopPolling

The implementation is now robust and safe.


961-998: LGTM! Correctly uses typed API client and dependencies.

The startChannelIngestion function correctly addresses previous review concerns by including fetchIngestionStatus in the dependency array and using the typed API client for role-based routing.


1139-1151: LGTM! Correct effect dependencies and cleanup.

The effects correctly include fetchIngestionStatus in dependencies and properly clean up polling on unmount. This addresses the previous review concern about missing dependencies.


1486-1494: LGTM! Well-designed props interface.

The enhanced ManualIngestionFormProps interface clearly exposes ingestion management functionality with appropriate types and separation of concerns.


1001-1092: Keep .toString() for ingestionId The backend Zod schemas for resume, cancel, and pause ingestion use ingestionId: z.string(), so sending a string is required; no changes needed.

Likely an incorrect or invalid review comment.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
server/integrations/slack/channelIngest.ts (1)

723-861: Add cancellation/pause check inside thread reply processing loop.

Thread replies are processed in a loop without checking for pause/cancellation between replies. For threads with many replies, this delays responsiveness when users attempt to pause or cancel ingestion.

Insert a cancellation check at the start of the reply loop:

           for (const reply of replies) {
+            // Check for pause/cancellation before processing each reply
+            if (checkCancellationOrPause) {
+              const { shouldStop } = await checkCancellationOrPause()
+              if (shouldStop) {
+                loggerWithChild({ email }).info(
+                  `Reply processing stopped due to pause/cancellation in channel ${channelId}`
+                )
+                return
+              }
+            }
+
             // Check if reply should be processed based on includeBotMessages flag
             const isRegularReply =
🧹 Nitpick comments (4)
server/integrations/slack/channelIngest.ts (4)

504-505: Consider reducing log verbosity for per-message processing.

Logging detailed information for every message processed will generate excessive log volume in channels with thousands of messages. Consider logging at intervals (e.g., every 100 messages) or using debug level.

For example, replace per-message logs with periodic logging:

-        loggerWithChild({ email }).info(
-          `Message counter update: added ${response.messages.length} messages, total now ${messageCounters.totalMessages.value} (was ${previousTotal})`
-        )
+        if (messageCounters.totalMessages.value % 100 === 0) {
+          loggerWithChild({ email }).info(
+            `Message counter update: total now ${messageCounters.totalMessages.value}`
+          )
+        }

Also applies to: 562-563, 675-675, 690-690, 720-721, 853-854


1479-1484: Simplify redundant existence check condition.

The filter condition can be simplified for clarity:

     const conversationsToInsert = conversations.filter(
       (conversation) =>
-        (existenceMap[conversation.id!] &&
-          !existenceMap[conversation.id!].exists) ||
-        !existenceMap[conversation.id!],
+        !existenceMap[conversation.id!]?.exists
     )

1511-1513: Tracker current/total mismatch when resuming.

Setting tracker.setCurrent(resumeFromChannelIndex) uses the index from the original channel list, while tracker.setTotal(conversationsToInsert.length) uses the filtered count. When resuming from index 5 where 3 channels were already ingested, the tracker might display misleading progress (e.g., 5/7 instead of 2/7).

Consider adjusting to track within the filtered list:

-    tracker.setCurrent(resumeFromChannelIndex)
-    tracker.setTotal(conversationsToInsert.length)
+    tracker.setCurrent(0) // Starting from beginning of filtered list
+    tracker.setTotal(conversationsToProcess.length)

1520-1521: Optimize inefficient indexOf lookup in loop.

Calling existingChannelsToIngest.indexOf(conversation.id!) for each conversation is O(n) per iteration. For large channel lists, this becomes inefficient.

Create an index map once before the loop:

+    // Create index map for efficient lookup
+    const channelIndexMap = new Map<string, number>()
+    existingChannelsToIngest.forEach((id, idx) => channelIndexMap.set(id, idx))
+
     for (const conversation of conversationsToProcess) {
       // Update conversationIndex to match position in existingChannelsToIngest
-      conversationIndex = existingChannelsToIngest.indexOf(conversation.id!)
+      conversationIndex = channelIndexMap.get(conversation.id!) ?? conversationIndex
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0a57de9 and 164093c.

📒 Files selected for processing (2)
  • server/api/admin.ts (2 hunks)
  • server/integrations/slack/channelIngest.ts (19 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • server/api/admin.ts
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-07-04T07:00:46.837Z
Learnt from: naSim087
PR: xynehq/xyne#638
File: server/integrations/slack/index.ts:793-824
Timestamp: 2025-07-04T07:00:46.837Z
Learning: In server/integrations/slack/index.ts, the dual IngestionState instances follow a specific pattern: `ingestionState` stores the current state that gets updated during processing, while `ingestionOldState` stores the old/previously saved state used for resumption logic when restarting ingestion.

Applied to files:

  • server/integrations/slack/channelIngest.ts
🧬 Code graph analysis (1)
server/integrations/slack/channelIngest.ts (3)
server/integrations/slack/index.ts (4)
  • extractUserIdsFromBlocks (250-270)
  • formatSlackSpecialMentions (271-298)
  • getConversationUsers (636-678)
  • insertChannelMessages (303-534)
server/db/ingestion.ts (3)
  • getIngestionById (110-121)
  • updateIngestionStatus (52-85)
  • updateIngestionMetadata (90-105)
server/db/connector.ts (2)
  • db (3-3)
  • getOAuthConnectorWithCredentials (195-317)

@shivamashtikar shivamashtikar merged commit 4219d8b into main Oct 17, 2025
4 checks passed
@shivamashtikar shivamashtikar deleted the slack-channel-resumability branch October 17, 2025 13:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants