-
Notifications
You must be signed in to change notification settings - Fork 56
feat(slack-channel-resumability): Added functionality to pause and re… #1107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. WalkthroughAdds DB-backed, resumable Slack channel ingestion end-to-end: DB schema and accessors, ingestion lifecycle APIs (status/pause/resume/cancel/start), background resumable ingestion logic with metadata persistence, main-server proxy routing, and frontend polling/UI controls integrated into the Slack admin route and ManualIngestionForm. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor UI as Frontend (Slack Admin)
participant Main as Main Server
participant Sync as Sync Server
participant DB as Database
participant Worker as Ingest Worker
participant Slack as Slack API
UI->>Main: POST /admin/ingest-more-channel (start)
Main->>DB: createIngestion(status=pending, metadata)
Main->>Worker: start handleSlackChannelIngestion(..., ingestionId)
Main-->>UI: 200 {ingestionId}
loop Polling
UI->>Main: GET /ingestion/status?connectorId=...
Main->>Sync: proxy GET /ingestion/status
Sync->>DB: fetch active/latest ingestion
DB-->>Sync: ingestion payload
Sync-->>Main: status payload
Main-->>UI: status payload
end
rect rgba(200,240,200,0.25)
Worker->>DB: persist progress & metadata
Worker->>Slack: fetch channels/messages
alt paused / cancelled
Worker->>DB: updateIngestionStatus(paused/cancelled)
else completed
Worker->>DB: updateIngestionStatus(completed)
end
end
sequenceDiagram
autonumber
actor UI as Frontend
participant Sync as Sync Server
participant DB as Database
participant Worker as Ingest Worker
UI->>Sync: POST /ingestion/pause {ingestionId}
Sync->>DB: updateIngestionStatus(paused)
Sync-->>UI: 200
UI->>Sync: POST /ingestion/resume {ingestionId}
Sync->>DB: getIngestionById(ingestionId)
Sync->>Worker: handleSlackChannelIngestion(..., ingestionId)
Sync-->>UI: 200
UI->>Sync: POST /ingestion/cancel {ingestionId}
Sync->>DB: updateIngestionStatus(cancelled)
Sync-->>UI: 200
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: CHILL Plan: Pro 📒 Files selected for processing (1)
🧰 Additional context used🧠 Learnings (1)📚 Learning: 2025-07-04T07:00:46.837Z
Applied to files:
🧬 Code graph analysis (1)server/integrations/slack/channelIngest.ts (4)
🔇 Additional comments (1)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Summary of ChangesHello @MohdShoaib-18169, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request delivers a critical enhancement to the Slack channel ingestion feature by making it resumable and controllable. It provides users with the flexibility to manage their data imports more effectively, allowing them to pause and restart processes without losing progress. The changes span both frontend and backend, introducing new database structures, API endpoints, and UI components to support this robust functionality. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a significant feature for Slack integration: resumable channel ingestion. The changes include a new database table to track ingestion state, new APIs for managing the ingestion lifecycle (pause, resume, cancel, delete), and a polling mechanism on the frontend to display real-time progress. The implementation is comprehensive, but I've identified a critical runtime error in the ingestion logic that needs to be addressed, as well as a high-severity bug in the frontend related to React hooks that could lead to incorrect behavior. Additionally, there are several medium-severity issues concerning code quality, leftover debug code, and data consistency that should be reviewed.
frontend/src/routes/_authenticated/admin/integrations/slack.tsx
Outdated
Show resolved
Hide resolved
frontend/src/routes/_authenticated/admin/integrations/slack.tsx
Outdated
Show resolved
Hide resolved
frontend/src/routes/_authenticated/admin/integrations/slack.tsx
Outdated
Show resolved
Hide resolved
frontend/src/routes/_authenticated/admin/integrations/slack.tsx
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
server/integrations/slack/channelIngest.ts (1)
448-509
: Fix pause-check integration in message loop
checkCancellationOrPause
is referenced insideinsertChannelMessages
, but that helper is defined insidehandleSlackChannelIngestion
’s scope. TypeScript rightfully fails to compile (TS2304
). At the same time, the early-exit path currently returns an object even though the function promisesvoid
, which triggers the second pipeline error. Please pass the checker down from the caller and exit with a barereturn
.export async function insertChannelMessages( email: string, client: WebClient, channelId: string, abortController: AbortController, memberMap: Map<string, User>, tracker: Tracker, timestamp: string = "0", channelMap: Map<string, string>, startDate: string, endDate: string, includeBotMessages: boolean = false, - messageCounters?: { totalMessages: { value: number }, processedMessages: { value: number } }, + messageCounters?: { totalMessages: { value: number }, processedMessages: { value: number } }, + checkCancellationOrPause?: () => Promise<{ shouldStop: boolean; isPaused: boolean }>, ): Promise<void> { @@ - const { shouldStop } = await checkCancellationOrPause() - if (shouldStop) { + if (checkCancellationOrPause) { + const { shouldStop } = await checkCancellationOrPause() + if (shouldStop) { loggerWithChild({ email }).info(`Message processing stopped due to pause/cancellation/deletion in channel ${channelId}`) - return { messages: [], files: [], channel: channelId } + return + } }Remember to pass the new callback when you invoke
insertChannelMessages
insidehandleSlackChannelIngestion
.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
frontend/src/routes/_authenticated/admin/integrations/slack.tsx
(10 hunks)server/api/admin.ts
(2 hunks)server/api/ingestion.ts
(1 hunks)server/db/ingestion.ts
(1 hunks)server/db/schema/index.ts
(1 hunks)server/db/schema/ingestions.ts
(1 hunks)server/integrations/slack/channelIngest.ts
(12 hunks)server/server.ts
(3 hunks)server/sync-server.ts
(9 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: naSim087
PR: xynehq/xyne#638
File: server/integrations/slack/index.ts:793-824
Timestamp: 2025-07-04T07:00:46.837Z
Learning: In server/integrations/slack/index.ts, the dual IngestionState instances follow a specific pattern: `ingestionState` stores the current state that gets updated during processing, while `ingestionOldState` stores the old/previously saved state used for resumption logic when restarting ingestion.
📚 Learning: 2025-07-04T07:00:46.837Z
Learnt from: naSim087
PR: xynehq/xyne#638
File: server/integrations/slack/index.ts:793-824
Timestamp: 2025-07-04T07:00:46.837Z
Learning: In server/integrations/slack/index.ts, the dual IngestionState instances follow a specific pattern: `ingestionState` stores the current state that gets updated during processing, while `ingestionOldState` stores the old/previously saved state used for resumption logic when restarting ingestion.
Applied to files:
server/sync-server.ts
server/db/schema/ingestions.ts
server/integrations/slack/channelIngest.ts
server/api/ingestion.ts
server/db/ingestion.ts
frontend/src/routes/_authenticated/admin/integrations/slack.tsx
📚 Learning: 2025-06-10T05:40:04.427Z
Learnt from: naSim087
PR: xynehq/xyne#525
File: frontend/src/routes/_authenticated/admin/integrations/slack.tsx:141-148
Timestamp: 2025-06-10T05:40:04.427Z
Learning: In frontend/src/routes/_authenticated/admin/integrations/slack.tsx, the ConnectAction enum and related connectAction state (lines 141-148, 469-471) are intentionally kept for future development, even though they appear unused in the current implementation.
Applied to files:
frontend/src/routes/_authenticated/admin/integrations/slack.tsx
🧬 Code graph analysis (8)
server/sync-server.ts (1)
server/api/ingestion.ts (10)
getIngestionStatusSchema
(27-29)GetIngestionStatusApi
(50-92)cancelIngestionSchema
(31-33)CancelIngestionApi
(97-142)pauseIngestionSchema
(43-45)PauseIngestionApi
(147-192)resumeIngestionSchema
(35-37)ResumeIngestionApi
(197-288)deleteIngestionSchema
(39-41)DeleteIngestionApi
(293-339)
server/db/schema/ingestions.ts (3)
server/db/schema/users.ts (1)
users
(27-57)server/db/schema/connectors.ts (1)
connectors
(60-109)server/db/schema/workspaces.ts (1)
workspaces
(7-22)
server/integrations/slack/channelIngest.ts (3)
server/db/ingestion.ts (3)
getIngestionById
(110-121)updateIngestionStatus
(52-85)updateIngestionMetadata
(90-105)server/db/connector.ts (2)
db
(3-3)getOAuthConnectorWithCredentials
(195-317)server/metrics/slack/slack-metrics.ts (2)
totalConversationsSkipped
(109-113)totalConversationsToBeInserted
(93-97)
server/server.ts (1)
server/api/ingestion.ts (5)
getIngestionStatusSchema
(27-29)cancelIngestionSchema
(31-33)pauseIngestionSchema
(43-45)resumeIngestionSchema
(35-37)deleteIngestionSchema
(39-41)
server/api/ingestion.ts (5)
server/logger/index.ts (3)
getLogger
(36-93)Subsystem
(15-15)getLoggerWithChild
(192-200)server/db/user.ts (1)
getUserByEmail
(148-157)server/db/connector.ts (1)
getConnector
(143-164)server/db/ingestion.ts (4)
getActiveIngestionForUser
(29-47)getIngestionById
(110-121)updateIngestionStatus
(52-85)deleteIngestion
(126-133)server/integrations/slack/channelIngest.ts (1)
handleSlackChannelIngestion
(1159-1588)
server/api/admin.ts (6)
server/db/user.ts (1)
getUserByEmail
(148-157)server/db/connector.ts (2)
db
(3-3)getConnector
(143-164)server/errors/index.ts (1)
NoUserFound
(322-327)server/db/ingestion.ts (2)
hasActiveIngestion
(138-155)createIngestion
(15-24)server/integrations/slack/channelIngest.ts (1)
handleSlackChannelIngestion
(1159-1588)server/utils.ts (1)
getErrorMessage
(103-106)
server/db/ingestion.ts (2)
server/types.ts (1)
TxnOrClient
(319-319)server/db/schema/ingestions.ts (4)
InsertIngestion
(146-146)SelectIngestion
(147-147)ingestions
(39-87)IngestionStatus
(148-148)
frontend/src/routes/_authenticated/admin/integrations/slack.tsx (5)
server/db/schema/ingestions.ts (1)
IngestionStatus
(148-148)frontend/src/hooks/use-toast.ts (1)
toast
(201-201)server/utils.ts (1)
getErrorMessage
(103-106)frontend/src/api.ts (1)
api
(5-5)server/db/ingestion.ts (1)
deleteIngestion
(126-133)
🪛 GitHub Actions: TypeScript Build Check
server/integrations/slack/channelIngest.ts
[error] 501-501: TS2304: Cannot find name 'checkCancellationOrPause'.
[error] 504-504: TS2322: Type '{ messages: never[]; files: never[]; channel: string; }' is not assignable to type 'void'.
server/api/ingestion.ts
[error] 23-23: TS2551: Property 'API' does not exist on type 'typeof Subsystem'. Did you mean 'Api'?
[error] 24-24: TS2551: Property 'API' does not exist on type 'typeof Subsystem'. Did you mean 'Api'?
server/api/admin.ts
[error] 1581-1581: TS2345: Argument of type 'number' is not assignable to parameter of type 'string'.
[error] 1634-1634: TS2345: Argument of type 'number' is not assignable to parameter of type 'string'.
e25e9ef
to
d480bc4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
♻️ Duplicate comments (7)
frontend/src/routes/_authenticated/admin/integrations/slack.tsx (3)
593-597
: Typo: “Slack OAuthA” → “Slack OAuth”Already flagged earlier; please correct.
- <CardTitle>Slack OAuthA</CardTitle> + <CardTitle>Slack OAuth</CardTitle>
807-904
: Remove noisy console logs before mergeThese console.log statements should be removed or gated behind dev flags to avoid noisy prod logs.
1576-1578
: Normalize progress source (backend) to one fieldReading progress from three locations signals backend inconsistency. Prefer a single canonical field.
server/api/admin.ts (1)
1581-1584
: Remove parseInt on numeric connectorId (build breaker)payload.connectorId is already a number; parseInt causes TS errors. Forward the number (or use connector.id).
- const connector = await getConnector(db, parseInt(payload.connectorId)) + const connector = await getConnector(db, payload.connectorId) … - handleSlackChannelIngestion( - parseInt(payload.connectorId), + handleSlackChannelIngestion( + connector.id,Also applies to: 1634-1641
server/api/ingestion.ts (2)
48-51
: Address the@ts-ignore
with proper typing.As noted in the previous review,
@ts-ignore
directives can be avoided by properly typing the Hono context.
23-25
: Fix TypeScript error: useSubsystem.Api
instead ofSubsystem.API
.The
Subsystem
enum does not have anAPI
member. Change both references toSubsystem.Api
.Apply this diff to fix the TypeScript error:
-const Logger = getLogger(Subsystem.API) -const loggerWithChild = getLoggerWithChild(Subsystem.API) +const Logger = getLogger(Subsystem.Api) +const loggerWithChild = getLoggerWithChild(Subsystem.Api)server/integrations/slack/channelIngest.ts (1)
1359-1367
: AddincludeBotMessages
to ingestion state for resumability.As noted in the previous review, the
includeBotMessages
flag is not persisted in theingestionState
, so resumed runs will default tofalse
and skip bot messages even if they were originally included.Apply this diff:
ingestionState: { currentChannelId: existingChannelsToIngest[processedChannels], channelsToIngest: existingChannelsToIngest, startDate: existingStartDate, endDate: existingEndDate, + includeBotMessages: includeBotMessages, currentChannelIndex: processedChannels, lastMessageTimestamp: globalLastMessageTimestamp, lastUpdated: new Date().toISOString(), },
🧹 Nitpick comments (10)
frontend/src/routes/_authenticated/admin/integrations/slack.tsx (3)
794-795
: Fix browser typing for interval refUsing NodeJS.Timeout in a React browser app can cause TS/type issues. Use ReturnType.
- const pollingIntervalRef = useRef<NodeJS.Timeout | null>(null) + const pollingIntervalRef = useRef<ReturnType<typeof setInterval> | null>(null)
946-1047
: Unify on typed API client instead of raw fetchThese ingestion calls bypass the existing typed api client. Prefer api.* helpers for consistency (auth, error handling), e.g., api.ingestion.resume.$post(…) if exposed, or add typed routes.
1553-1571
: Avoid non-null assertions on connectorId in action handlersconnectorId! may throw if undefined. Guard or early-return.
- onClick={() => pauseIngestion(ingestion.id, connectorId!)} + onClick={() => connectorId && pauseIngestion(ingestion.id, connectorId)}Also applies to: 1652-1669
server/db/schema/ingestions.ts (1)
85-87
: Add index for frequent status lookups (userId, connectorId, status)getActiveIngestionForUser filters by userId+connectorId+status; add an index to avoid scans at scale.
import { serial, pgTable, text, integer, timestamp, jsonb, pgEnum, + index, } from "drizzle-orm/pg-core" … export const ingestions = pgTable( "ingestions", { … }, - // No unique constraints - application logic will handle preventing concurrent active ingestions -) + (t) => ({ + // Accelerate active-ingestion queries + userConnectorStatusIdx: index("ing_user_connector_status_idx").on( + t.userId, + t.connectorId, + t.status, + ), + }), +)server/server.ts (1)
1434-1488
: Add timeout/abort to sync-server proxy callsProtect upstream calls with an AbortController (e.g., 30s) to avoid hanging requests on sync-server issues.
- const response = await fetch(url, requestConfig) + const ac = new AbortController() + const t = setTimeout(() => ac.abort(), 30_000) + const response = await fetch(url, { ...requestConfig, signal: ac.signal }) + clearTimeout(t)server/db/ingestion.ts (3)
37-43
: Prefer Drizzle'sinArray
for type safety.The raw SQL for status filtering bypasses Drizzle's type system and is prone to typos or mismatches with the schema enum.
Apply this diff to use Drizzle's type-safe operator:
+import { eq, and, sql, inArray } from "drizzle-orm" + export const getActiveIngestionForUser = async ( txn: TxnOrClient, userId: number, connectorId: number ): Promise<SelectIngestion | null> => { const result = await txn .select() .from(ingestions) .where( and( eq(ingestions.userId, userId), eq(ingestions.connectorId, connectorId), - sql`status IN ('in_progress', 'paused', 'failed')` + inArray(ingestions.status, ['in_progress', 'paused', 'failed']) ) ) .limit(1) return (result[0] as SelectIngestion) || null }
90-93
: Consider typing the metadata parameter.The
metadata
parameter is typed asany
, which bypasses type checking. Consider using the metadata shape from the schema or a generic type parameter.Apply this diff to improve type safety:
export const updateIngestionMetadata = async ( txn: TxnOrClient, ingestionId: number, - metadata: any + metadata: unknown ): Promise<SelectIngestion> => {Using
unknown
requires explicit type checking before use, providing better safety thanany
.
135-141
: Prefer Drizzle'sinArray
for consistency.Similar to
getActiveIngestionForUser
, use Drizzle's type-safe operator instead of raw SQL.Apply this diff:
const result = await txn .select({ count: sql<number>`count(*)` }) .from(ingestions) .where( and( eq(ingestions.userId, userId), eq(ingestions.connectorId, connectorId), - sql`status IN ('pending', 'in_progress', 'paused', 'failed')` + inArray(ingestions.status, ['pending', 'in_progress', 'paused', 'failed']) ) )server/integrations/slack/channelIngest.ts (1)
505-676
: Consider reducing logging verbosity for production.The message processing loop contains extensive debug-style logging (e.g., "Processing message X/Y", "Found N mentions", "Successfully fetched user info", etc.). While helpful during development and initial rollout, consider:
- Using debug-level logs for detailed operation tracking
- Keeping only error and key milestone logs at info level
- Adding a configurable log level for this feature
This will reduce log volume and costs in production while maintaining debuggability when needed.
server/api/ingestion.ts (1)
260-285
: Align variable naming with the ingestion function signature.
Rename the localincludeBotMessage
constant toincludeBotMessages
(plural) when reading from state and passing it tohandleSlackChannelIngestion
to match its parameter name.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
frontend/src/routes/_authenticated/admin/integrations/slack.tsx
(10 hunks)server/api/admin.ts
(2 hunks)server/api/ingestion.ts
(1 hunks)server/db/ingestion.ts
(1 hunks)server/db/schema/index.ts
(1 hunks)server/db/schema/ingestions.ts
(1 hunks)server/integrations/slack/channelIngest.ts
(19 hunks)server/server.ts
(3 hunks)server/sync-server.ts
(9 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- server/db/schema/index.ts
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: naSim087
PR: xynehq/xyne#638
File: server/integrations/slack/index.ts:793-824
Timestamp: 2025-07-04T07:00:46.837Z
Learning: In server/integrations/slack/index.ts, the dual IngestionState instances follow a specific pattern: `ingestionState` stores the current state that gets updated during processing, while `ingestionOldState` stores the old/previously saved state used for resumption logic when restarting ingestion.
📚 Learning: 2025-07-04T07:00:46.837Z
Learnt from: naSim087
PR: xynehq/xyne#638
File: server/integrations/slack/index.ts:793-824
Timestamp: 2025-07-04T07:00:46.837Z
Learning: In server/integrations/slack/index.ts, the dual IngestionState instances follow a specific pattern: `ingestionState` stores the current state that gets updated during processing, while `ingestionOldState` stores the old/previously saved state used for resumption logic when restarting ingestion.
Applied to files:
server/db/ingestion.ts
server/integrations/slack/channelIngest.ts
frontend/src/routes/_authenticated/admin/integrations/slack.tsx
server/api/ingestion.ts
server/db/schema/ingestions.ts
📚 Learning: 2025-06-10T05:40:04.427Z
Learnt from: naSim087
PR: xynehq/xyne#525
File: frontend/src/routes/_authenticated/admin/integrations/slack.tsx:141-148
Timestamp: 2025-06-10T05:40:04.427Z
Learning: In frontend/src/routes/_authenticated/admin/integrations/slack.tsx, the ConnectAction enum and related connectAction state (lines 141-148, 469-471) are intentionally kept for future development, even though they appear unused in the current implementation.
Applied to files:
frontend/src/routes/_authenticated/admin/integrations/slack.tsx
🧬 Code graph analysis (8)
server/server.ts (1)
server/api/ingestion.ts (4)
getIngestionStatusSchema
(28-30)cancelIngestionSchema
(32-34)pauseIngestionSchema
(41-43)resumeIngestionSchema
(36-38)
server/api/admin.ts (6)
server/db/user.ts (1)
getUserByEmail
(148-157)server/db/connector.ts (2)
db
(3-3)getConnector
(143-164)server/errors/index.ts (1)
NoUserFound
(322-327)server/db/ingestion.ts (2)
hasActiveIngestion
(127-144)createIngestion
(15-24)server/integrations/slack/channelIngest.ts (1)
handleSlackChannelIngestion
(1196-1773)server/utils.ts (1)
getErrorMessage
(103-106)
server/db/ingestion.ts (2)
server/types.ts (1)
TxnOrClient
(319-319)server/db/schema/ingestions.ts (4)
InsertIngestion
(146-146)SelectIngestion
(147-147)ingestions
(39-87)IngestionStatus
(148-148)
server/integrations/slack/channelIngest.ts (3)
server/integrations/slack/index.ts (3)
extractUserIdsFromBlocks
(250-270)formatSlackSpecialMentions
(271-298)getConversationUsers
(636-678)server/db/ingestion.ts (3)
getIngestionById
(110-121)updateIngestionStatus
(52-85)updateIngestionMetadata
(90-105)server/db/connector.ts (2)
db
(3-3)getOAuthConnectorWithCredentials
(195-317)
server/sync-server.ts (1)
server/api/ingestion.ts (8)
getIngestionStatusSchema
(28-30)GetIngestionStatusApi
(48-109)cancelIngestionSchema
(32-34)CancelIngestionApi
(114-159)pauseIngestionSchema
(41-43)PauseIngestionApi
(164-209)resumeIngestionSchema
(36-38)ResumeIngestionApi
(214-305)
frontend/src/routes/_authenticated/admin/integrations/slack.tsx (9)
server/db/schema/ingestions.ts (1)
IngestionStatus
(148-148)frontend/src/components/ui/card.tsx (1)
CardTitle
(76-76)frontend/src/hooks/use-toast.ts (1)
toast
(201-201)server/utils.ts (1)
getErrorMessage
(103-106)frontend/src/api.ts (1)
api
(5-5)frontend/src/components/ui/button.tsx (1)
Button
(57-57)frontend/src/components/ui/progress.tsx (1)
Progress
(26-26)frontend/src/components/ui/label.tsx (1)
Label
(24-24)frontend/src/components/ui/input.tsx (1)
Input
(25-25)
server/api/ingestion.ts (6)
server/logger/index.ts (3)
getLogger
(36-93)Subsystem
(15-15)getLoggerWithChild
(192-200)server/db/user.ts (1)
getUserByEmail
(148-157)server/db/connector.ts (1)
getConnector
(143-164)server/db/ingestion.ts (3)
getActiveIngestionForUser
(29-47)getIngestionById
(110-121)updateIngestionStatus
(52-85)server/db/schema/ingestions.ts (2)
ingestions
(39-87)SelectIngestion
(147-147)server/integrations/slack/channelIngest.ts (1)
handleSlackChannelIngestion
(1196-1773)
server/db/schema/ingestions.ts (3)
server/db/schema/users.ts (1)
users
(27-57)server/db/schema/connectors.ts (1)
connectors
(60-109)server/db/schema/workspaces.ts (1)
workspaces
(7-22)
🔇 Additional comments (1)
server/db/ingestion.ts (1)
58-66
: Clarify thestartedAt
condition.The condition
status === "in_progress" && !errorMessage
(line 64) is unclear. If the intent is to setstartedAt
only on the first transition toin_progress
(not when resuming), this should be documented or the condition should check ifstartedAt
is already set.Please verify: should
startedAt
be set every time status becomesin_progress
, or only the first time? If the latter, consider checking whetherstartedAt
is already null before setting it.Additionally, consider typing
updateData
explicitly instead of usingany
for better type safety:- const updateData: any = { + const updateData: Partial<typeof ingestions.$inferInsert> = { status, updatedAt: sql`NOW()`, }
…sume the channel ingestion-1
d480bc4
to
f4c113d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 7
🧹 Nitpick comments (5)
server/db/schema/ingestions.ts (1)
37-87
: Add composite index for active‑ingestion checksQueries repeatedly filter by userId+connectorId+status. Add an index to prevent seq scans at scale.
-import { +import { serial, pgTable, text, integer, timestamp, jsonb, pgEnum, + index, } from "drizzle-orm/pg-core" @@ -export const ingestions = pgTable( - "ingestions", - { +export const ingestions = pgTable( + "ingestions", + { // Primary key for the ingestion record id: serial("id").notNull().primaryKey(), @@ }, - // No unique constraints - application logic will handle preventing concurrent active ingestions -) + // No unique constraints - application logic will handle preventing concurrent active ingestions + (t) => ({ + userConnectorStatusIdx: index("ingestions_user_connector_status_idx").on( + t.userId, + t.connectorId, + t.status, + ), + }), +)server/db/ingestion.ts (1)
123-144
: Revisit “active” definition (include/exclude failed)Including failed in “active” blocks restarts until manual intervention. If you want quick retries, exclude 'failed' here (keep it in getActiveIngestionForUser for resume UI).
server/integrations/slack/channelIngest.ts (2)
1316-1324
: Avoid dynamic import inside the intervalMinor perf: import getIngestionById once outside the timer to avoid repeat awaits (module cache still incurs async overhead).
- const { getIngestionById } = await import("@/db/ingestion") - const currentIngestion = await getIngestionById(db, ingestionId!) + // Move this import to function top scope: + // import { getIngestionById } from "@/db/ingestion" + const currentIngestion = await getIngestionById(db, ingestionId!)
1377-1395
: Use for…of when iterating arraysfor…in over arrays iterates keys as strings. Prefer for…of to avoid subtle index coercions.
- for (const channel in existingChannelsToIngest) { - const channelId = existingChannelsToIngest[channel] + for (const channelId of existingChannelsToIngest) { try { const response = await client.conversations.info({ channel: channelId })frontend/src/routes/_authenticated/admin/integrations/slack.tsx (1)
1670-1678
: "Resume" button is semantically incorrect for cancelled ingestions.Line 1673 shows a "Resume" button for both failed and cancelled ingestions. While "Resume" makes sense for failed ingestions (retry from where it failed), it's semantically incorrect for cancelled ingestions since the user intentionally stopped them.
Consider using different button text based on the status:
<Button size="sm" variant="outline" onClick={() => resumeIngestion(ingestion.id, connectorId!)} disabled={ingestionLoading} > - <RotateCcw className="h-3 w-3 mr-1" /> - Resume + {ingestion.status === 'failed' ? ( + <> + <RotateCcw className="h-3 w-3 mr-1" /> + Retry + </> + ) : ( + <> + <Play className="h-3 w-3 mr-1" /> + Restart + </> + )} </Button>
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
frontend/src/routes/_authenticated/admin/integrations/slack.tsx
(9 hunks)server/api/admin.ts
(2 hunks)server/api/ingestion.ts
(1 hunks)server/db/ingestion.ts
(1 hunks)server/db/schema/index.ts
(1 hunks)server/db/schema/ingestions.ts
(1 hunks)server/integrations/slack/channelIngest.ts
(19 hunks)server/server.ts
(3 hunks)server/sync-server.ts
(8 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- server/api/ingestion.ts
- server/sync-server.ts
- server/db/schema/index.ts
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: naSim087
PR: xynehq/xyne#638
File: server/integrations/slack/index.ts:793-824
Timestamp: 2025-07-04T07:00:46.837Z
Learning: In server/integrations/slack/index.ts, the dual IngestionState instances follow a specific pattern: `ingestionState` stores the current state that gets updated during processing, while `ingestionOldState` stores the old/previously saved state used for resumption logic when restarting ingestion.
📚 Learning: 2025-07-04T07:00:46.837Z
Learnt from: naSim087
PR: xynehq/xyne#638
File: server/integrations/slack/index.ts:793-824
Timestamp: 2025-07-04T07:00:46.837Z
Learning: In server/integrations/slack/index.ts, the dual IngestionState instances follow a specific pattern: `ingestionState` stores the current state that gets updated during processing, while `ingestionOldState` stores the old/previously saved state used for resumption logic when restarting ingestion.
Applied to files:
server/db/ingestion.ts
frontend/src/routes/_authenticated/admin/integrations/slack.tsx
server/integrations/slack/channelIngest.ts
server/db/schema/ingestions.ts
📚 Learning: 2025-06-10T05:40:04.427Z
Learnt from: naSim087
PR: xynehq/xyne#525
File: frontend/src/routes/_authenticated/admin/integrations/slack.tsx:141-148
Timestamp: 2025-06-10T05:40:04.427Z
Learning: In frontend/src/routes/_authenticated/admin/integrations/slack.tsx, the ConnectAction enum and related connectAction state (lines 141-148, 469-471) are intentionally kept for future development, even though they appear unused in the current implementation.
Applied to files:
frontend/src/routes/_authenticated/admin/integrations/slack.tsx
🧬 Code graph analysis (6)
server/db/ingestion.ts (2)
server/types.ts (1)
TxnOrClient
(319-321)server/db/schema/ingestions.ts (4)
InsertIngestion
(146-146)SelectIngestion
(147-147)ingestions
(39-87)IngestionStatus
(148-154)
frontend/src/routes/_authenticated/admin/integrations/slack.tsx (3)
server/db/schema/ingestions.ts (1)
IngestionStatus
(148-154)server/utils.ts (1)
getErrorMessage
(103-106)frontend/src/api.ts (1)
api
(5-5)
server/integrations/slack/channelIngest.ts (3)
server/integrations/slack/index.ts (3)
extractUserIdsFromBlocks
(250-270)formatSlackSpecialMentions
(271-298)getConversationUsers
(636-678)server/db/ingestion.ts (3)
getIngestionById
(110-121)updateIngestionStatus
(52-85)updateIngestionMetadata
(90-105)server/db/connector.ts (2)
db
(3-3)getOAuthConnectorWithCredentials
(195-317)
server/api/admin.ts (6)
server/db/user.ts (1)
getUserByEmail
(148-157)server/db/connector.ts (2)
db
(3-3)getConnector
(143-164)server/errors/index.ts (1)
NoUserFound
(322-327)server/db/ingestion.ts (2)
hasActiveIngestion
(127-144)createIngestion
(15-24)server/integrations/slack/channelIngest.ts (1)
handleSlackChannelIngestion
(1196-1785)server/utils.ts (1)
getErrorMessage
(103-106)
server/server.ts (1)
server/api/ingestion.ts (4)
getIngestionStatusSchema
(28-30)cancelIngestionSchema
(32-34)pauseIngestionSchema
(41-43)resumeIngestionSchema
(36-38)
server/db/schema/ingestions.ts (3)
server/db/schema/users.ts (1)
users
(27-57)server/db/schema/connectors.ts (1)
connectors
(60-109)server/db/schema/workspaces.ts (1)
workspaces
(7-22)
🪛 GitHub Actions: TypeScript Build Check
server/integrations/slack/channelIngest.ts
[error] 1319-1319: bunx tsc -b: TS2345: Argument of type 'number | undefined' is not assignable to parameter of type 'number'. Type 'undefined' is not assignable to type 'number'.
[error] 1780-1781: bunx tsc -b: TS2304: Cannot find name 'interval'.
🔇 Additional comments (3)
server/server.ts (2)
1177-1197
: LGTM: ingestion management routes and method-aware proxyGET/POST routing with zod validation and cookie-forwarding proxy looks solid.
1443-1497
: LGTM: proxyToSyncServer GET/POST handlingQuery propagation for GET and JSON forwarding for POST are handled cleanly; cookie-based auth preserved.
frontend/src/routes/_authenticated/admin/integrations/slack.tsx (1)
1563-1602
: Inaccurate recommendation to remove fallback logic
The backend only delivers progress viametadata.slack.websocketData.progress
(and legacymetadata.slack.ingestionState
);ingestion.progress
isn’t populated. The existing fallbacks are required.Likely an incorrect or invalid review comment.
frontend/src/routes/_authenticated/admin/integrations/slack.tsx
Outdated
Show resolved
Hide resolved
frontend/src/routes/_authenticated/admin/integrations/slack.tsx
Outdated
Show resolved
Hide resolved
frontend/src/routes/_authenticated/admin/integrations/slack.tsx
Outdated
Show resolved
Hide resolved
frontend/src/routes/_authenticated/admin/integrations/slack.tsx
Outdated
Show resolved
Hide resolved
frontend/src/routes/_authenticated/admin/integrations/slack.tsx
Outdated
Show resolved
Hide resolved
…sume the channel ingestion-2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
server/integrations/slack/channelIngest.ts (3)
1819-1827
: Await the connector status transactionThe transaction isn’t awaited; the function may return before the update commits.
- db.transaction(async (trx) => { + await db.transaction(async (trx) => { await trx .update(connectors) .set({ status: ConnectorStatus.Connected, state: JSON.stringify({}), }) .where(eq(connectors.id, connector.id)) })
706-714
: Avoid duplicate replies on resume and honor pause during replies
- Thread fetch ignores
timestamp
, so resume can reprocess old replies.- No pause/cancel check inside replies loop.
Apply:
- const threadMessages: SlackMessage[] = await fetchThreadMessages( + const threadMessages: SlackMessage[] = await fetchThreadMessages( client, channelId, message.thread_ts, + timestamp, // restrict replies to resume boundary ) @@ - for (const reply of replies) { + for (const reply of replies) { + if (checkCancellationOrPause) { + const { shouldStop } = await checkCancellationOrPause() + if (shouldStop) { + loggerWithChild({ email }).info( + `Reply processing stopped due to pause/cancellation/deletion in channel ${channelId}`, + ) + return + } + }Also applies to: 723-741
1153-1155
: Fix reaction count reduce precedence/nullabilityEnsure count is safely summed.
- reactions: message.reactions?.reduce((acc, curr) => { - return acc + (curr as Reaction).count! || 0 - }, 0), + reactions: message.reactions?.reduce((acc, curr) => { + const c = (curr as Reaction).count ?? 0 + return acc + c + }, 0),
♻️ Duplicate comments (1)
server/integrations/slack/channelIngest.ts (1)
1353-1441
: Interval hoisting/cleanup + ingestionId narrowing look goodThe interval is hoisted and cleared in finally;
ingestionId
is narrowed before use in the interval callback. This addresses prior TS/build and leak issues.
🧹 Nitpick comments (4)
server/integrations/slack/channelIngest.ts (4)
1443-1460
: Usefor...of
when iterating arrays
for...in
iterates keys (and inherited props). Preferfor...of
for array values.- for (const channel in existingChannelsToIngest) { - const channelId = existingChannelsToIngest[channel] // Get the channel ID string using the index from the for...in loop + for (const channelId of existingChannelsToIngest) {
503-506
: Reduce hot‑loop log volume and PII exposure
- Many info logs inside per‑message/reply loops; can flood logs and slow ingestion.
- Email logged in every line; consider redacting or downgrading to debug.
Suggestions:
- Gate with a verbose/debug flag or lower to debug.
- Redact emails (e.g., hash or mask user part) at info level.
Also applies to: 561-567, 719-721, 852-855
211-225
: Remove unused helper
replaceMentionsIfPresent
isn’t used. Delete or integrate to avoid dead code.
245-246
: Typo in log message“data rage” → “date range”.
- `starting the slack ingestion for data rage ${startDate} -> ${endDate}`, + `starting the slack ingestion for date range ${startDate} -> ${endDate}`,
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
server/integrations/slack/channelIngest.ts
(19 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: naSim087
PR: xynehq/xyne#638
File: server/integrations/slack/index.ts:793-824
Timestamp: 2025-07-04T07:00:46.837Z
Learning: In server/integrations/slack/index.ts, the dual IngestionState instances follow a specific pattern: `ingestionState` stores the current state that gets updated during processing, while `ingestionOldState` stores the old/previously saved state used for resumption logic when restarting ingestion.
📚 Learning: 2025-07-04T07:00:46.837Z
Learnt from: naSim087
PR: xynehq/xyne#638
File: server/integrations/slack/index.ts:793-824
Timestamp: 2025-07-04T07:00:46.837Z
Learning: In server/integrations/slack/index.ts, the dual IngestionState instances follow a specific pattern: `ingestionState` stores the current state that gets updated during processing, while `ingestionOldState` stores the old/previously saved state used for resumption logic when restarting ingestion.
Applied to files:
server/integrations/slack/channelIngest.ts
🧬 Code graph analysis (1)
server/integrations/slack/channelIngest.ts (2)
server/integrations/slack/index.ts (4)
extractUserIdsFromBlocks
(250-270)formatSlackSpecialMentions
(271-298)getConversationUsers
(636-678)insertChannelMessages
(303-534)server/db/ingestion.ts (3)
getIngestionById
(110-121)updateIngestionStatus
(52-85)updateIngestionMetadata
(90-105)
…sume the channel ingestion-3
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
server/api/admin.ts (1)
1565-1571
: Validate non-emptychannelsToIngest
Add a check after parsing the payload inserver/api/admin.ts
:const payload = c.req.valid("json") as { connectorId: number channelsToIngest: string[] startDate: string endDate: string includeBotMessage: boolean } + if (payload.channelsToIngest.length === 0) { + throw new HTTPException(400, { + message: "At least one channel must be specified for ingestion" + }) + }server/integrations/slack/channelIngest.ts (1)
694-855
: Pause/cancel check missing inside thread reply loopYou added
checkCancellationOrPause
before each top-level message, but thread replies bypass it. A pause/cancel issued while processing replies will be ignored until the whole thread finishes, defeating resumability guarantees (esp. for long threads). Please callcheckCancellationOrPause
inside the replies loop (mirroring the top-level message check) so we exit promptly when paused/cancelled.for (const reply of replies) { + if (checkCancellationOrPause) { + const { shouldStop } = await checkCancellationOrPause() + if (shouldStop) { + loggerWithChild({ email }).info( + `Reply processing stopped due to pause/cancellation/deletion in channel ${channelId}`, + ) + return + } + }
♻️ Duplicate comments (1)
frontend/src/routes/_authenticated/admin/integrations/slack.tsx (1)
44-103
: Type duplication issue persists from previous review.These local type definitions (
DbIngestionStatus
,SlackIngestionMetadata
,IngestionData
,IngestionStatusResponse
,ProgressData
) duplicate backend types and increase maintenance burden. As flagged in a previous review, these should be imported from the backend schema to ensure type consistency.Based on relevant snippets,
SlackIngestionMetadata
is exported fromserver/db/schema/ingestions.ts
(line 154). Import and reuse backend types instead of redefining them here.
🧹 Nitpick comments (3)
server/integrations/slack/channelIngest.ts (1)
494-688
: Excessive per-message logging will bury real issuesThe new per-message logs (
Processing message…
, “Formatting special mentions…”, “Completed text processing”, etc.) run once for every Slack message and reply. At ingestion scale (tens of thousands of events) this will flood structured logs, drive costs up, and hide real errors. Please drop these debug logs or lower them behind a trace flag before merging.frontend/src/routes/_authenticated/admin/integrations/slack.tsx (2)
1554-1774
: Consider refactoring for improved maintainability.The
renderIngestionStatus
function is quite long (220 lines) and callsgetProgressData
multiple times (lines 1621, 1673, 1718, 1760). Consider these improvements:
- Memoize progress data: Extract
getProgressData(ingestion)
once at the start of the function and reuse the result.- Break down by status: Extract status-specific rendering into smaller helper functions (e.g.,
renderInProgressStatus
,renderPausedStatus
, etc.).Example refactor to memoize progress data:
const renderIngestionStatus = () => { // ... loading and error checks ... if (!ingestionStatus?.hasActiveIngestion) { return null } const ingestion = ingestionStatus.ingestion! + const progressData = getProgressData(ingestion) if (ingestion.status === 'in_progress') { return ( <div className="mb-4 p-4 bg-blue-50 dark:bg-blue-900/20 border border-blue-200 dark:border-blue-800 rounded-lg"> {/* ... */} {(() => { - const progressData = getProgressData(ingestion); if (progressData && (progressData.totalChannels || progressData.processedChannels)) { // ... use progressData ...Apply similar changes to other status checks that use
getProgressData
.
1781-1872
: Consider simplifying the form visibility condition.The conditional logic at line 1781 is quite complex and hard to read at a glance:
{!ingestionStatus?.hasActiveIngestion || !['in_progress', 'paused', 'failed'].includes(ingestionStatus?.ingestion?.status || '') ? (Consider extracting this into a clearer constant:
+ const shouldShowForm = !ingestionStatus?.hasActiveIngestion || + !ingestionStatus?.ingestion?.status || + !['in_progress', 'paused', 'failed'].includes(ingestionStatus.ingestion.status) + return ( <div className="space-y-4"> {renderIngestionStatus()} - {!ingestionStatus?.hasActiveIngestion || !['in_progress', 'paused', 'failed'].includes(ingestionStatus?.ingestion?.status || '') ? ( + {shouldShowForm ? ( <formThis makes the intent clearer: "show the form unless there's an active ingestion that's currently in progress, paused, or failed."
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
frontend/src/routes/_authenticated/admin/integrations/slack.tsx
(9 hunks)server/api/admin.ts
(3 hunks)server/api/ingestion.ts
(1 hunks)server/integrations/slack/channelIngest.ts
(19 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- server/api/ingestion.ts
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-07-04T07:00:46.837Z
Learnt from: naSim087
PR: xynehq/xyne#638
File: server/integrations/slack/index.ts:793-824
Timestamp: 2025-07-04T07:00:46.837Z
Learning: In server/integrations/slack/index.ts, the dual IngestionState instances follow a specific pattern: `ingestionState` stores the current state that gets updated during processing, while `ingestionOldState` stores the old/previously saved state used for resumption logic when restarting ingestion.
Applied to files:
server/integrations/slack/channelIngest.ts
frontend/src/routes/_authenticated/admin/integrations/slack.tsx
📚 Learning: 2025-06-10T05:40:04.427Z
Learnt from: naSim087
PR: xynehq/xyne#525
File: frontend/src/routes/_authenticated/admin/integrations/slack.tsx:141-148
Timestamp: 2025-06-10T05:40:04.427Z
Learning: In frontend/src/routes/_authenticated/admin/integrations/slack.tsx, the ConnectAction enum and related connectAction state (lines 141-148, 469-471) are intentionally kept for future development, even though they appear unused in the current implementation.
Applied to files:
frontend/src/routes/_authenticated/admin/integrations/slack.tsx
🧬 Code graph analysis (3)
server/api/admin.ts (5)
server/db/user.ts (1)
getUserByEmail
(148-157)server/db/connector.ts (2)
db
(3-3)getConnector
(143-164)server/errors/index.ts (1)
NoUserFound
(322-327)server/db/ingestion.ts (2)
hasActiveIngestion
(127-144)createIngestion
(15-24)server/integrations/slack/channelIngest.ts (1)
handleSlackChannelIngestion
(1214-1944)
server/integrations/slack/channelIngest.ts (3)
server/integrations/slack/index.ts (4)
extractUserIdsFromBlocks
(250-270)formatSlackSpecialMentions
(271-298)getConversationUsers
(636-678)insertChannelMessages
(303-534)server/db/ingestion.ts (3)
getIngestionById
(110-121)updateIngestionStatus
(52-85)updateIngestionMetadata
(90-105)server/integrations/slack/sync.ts (1)
insertChannelMessages
(105-321)
frontend/src/routes/_authenticated/admin/integrations/slack.tsx (3)
server/db/schema/ingestions.ts (1)
SlackIngestionMetadata
(155-155)frontend/src/api.ts (1)
api
(5-5)server/utils.ts (1)
getErrorMessage
(103-106)
🔇 Additional comments (10)
server/api/admin.ts (4)
1471-1474
: LGTM: Security check implemented correctly.The connector ownership validation prevents cross-tenant access by verifying both userId and workspaceId match before allowing ingestion. This addresses the security concern raised in previous reviews.
1574-1594
: LGTM: User and connector validation with proper authorization.The validation flow correctly:
- Retrieves user from JWT token
- Validates connector existence
- Enforces tenant isolation by checking both userId and workspaceId
This prevents unauthorized cross-tenant ingestion access.
1608-1638
: LGTM: Comprehensive metadata structure for resumability.The metadata design is well-structured:
websocketData.progress
: Frontend progress tracking (total/processed channels and messages)ingestionState
: All parameters needed to resume (channels, dates, current position)This enables pause/resume functionality effectively.
1640-1656
: Approve with note: Async ingestion with proper error logging.The background ingestion pattern is correct:
- Immediate return to user with ingestion ID
- Async processing with
.catch()
for error logging- No blocking of API response
The relevant code snippet confirms
handleSlackChannelIngestion
updates the status early, preventing indefinite "pending" states on failures.frontend/src/routes/_authenticated/admin/integrations/slack.tsx (6)
850-866
: LGTM! Clean polling setup.The ingestion state management and polling setup is well-structured. Using refs for
pollingIntervalRef
,previousStatusRef
, andisFetchingRef
correctly avoids stale closure issues.
868-958
: LGTM! Previous review issues resolved.The
fetchIngestionStatus
function correctly addresses all previous review concerns:
- ✅ Uses typed API client (
api.ingestion.status.$get
) instead of raw fetch- ✅ Checks status existence before
includes()
to avoid empty string matching- ✅ Uses refs (
pollingIntervalRef.current
,currentConnectorId
) to avoid stale closures- ✅ Correct dependency array with only
stopPolling
The implementation is now robust and safe.
961-998
: LGTM! Correctly uses typed API client and dependencies.The
startChannelIngestion
function correctly addresses previous review concerns by includingfetchIngestionStatus
in the dependency array and using the typed API client for role-based routing.
1139-1151
: LGTM! Correct effect dependencies and cleanup.The effects correctly include
fetchIngestionStatus
in dependencies and properly clean up polling on unmount. This addresses the previous review concern about missing dependencies.
1486-1494
: LGTM! Well-designed props interface.The enhanced
ManualIngestionFormProps
interface clearly exposes ingestion management functionality with appropriate types and separation of concerns.
1001-1092
: Keep.toString()
for ingestionId The backend Zod schemas for resume, cancel, and pause ingestion useingestionId: z.string()
, so sending a string is required; no changes needed.Likely an incorrect or invalid review comment.
…sume the channel ingestion-4
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
server/integrations/slack/channelIngest.ts (1)
723-861
: Add cancellation/pause check inside thread reply processing loop.Thread replies are processed in a loop without checking for pause/cancellation between replies. For threads with many replies, this delays responsiveness when users attempt to pause or cancel ingestion.
Insert a cancellation check at the start of the reply loop:
for (const reply of replies) { + // Check for pause/cancellation before processing each reply + if (checkCancellationOrPause) { + const { shouldStop } = await checkCancellationOrPause() + if (shouldStop) { + loggerWithChild({ email }).info( + `Reply processing stopped due to pause/cancellation in channel ${channelId}` + ) + return + } + } + // Check if reply should be processed based on includeBotMessages flag const isRegularReply =
🧹 Nitpick comments (4)
server/integrations/slack/channelIngest.ts (4)
504-505
: Consider reducing log verbosity for per-message processing.Logging detailed information for every message processed will generate excessive log volume in channels with thousands of messages. Consider logging at intervals (e.g., every 100 messages) or using debug level.
For example, replace per-message logs with periodic logging:
- loggerWithChild({ email }).info( - `Message counter update: added ${response.messages.length} messages, total now ${messageCounters.totalMessages.value} (was ${previousTotal})` - ) + if (messageCounters.totalMessages.value % 100 === 0) { + loggerWithChild({ email }).info( + `Message counter update: total now ${messageCounters.totalMessages.value}` + ) + }Also applies to: 562-563, 675-675, 690-690, 720-721, 853-854
1479-1484
: Simplify redundant existence check condition.The filter condition can be simplified for clarity:
const conversationsToInsert = conversations.filter( (conversation) => - (existenceMap[conversation.id!] && - !existenceMap[conversation.id!].exists) || - !existenceMap[conversation.id!], + !existenceMap[conversation.id!]?.exists )
1511-1513
: Tracker current/total mismatch when resuming.Setting
tracker.setCurrent(resumeFromChannelIndex)
uses the index from the original channel list, whiletracker.setTotal(conversationsToInsert.length)
uses the filtered count. When resuming from index 5 where 3 channels were already ingested, the tracker might display misleading progress (e.g., 5/7 instead of 2/7).Consider adjusting to track within the filtered list:
- tracker.setCurrent(resumeFromChannelIndex) - tracker.setTotal(conversationsToInsert.length) + tracker.setCurrent(0) // Starting from beginning of filtered list + tracker.setTotal(conversationsToProcess.length)
1520-1521
: Optimize inefficient indexOf lookup in loop.Calling
existingChannelsToIngest.indexOf(conversation.id!)
for each conversation is O(n) per iteration. For large channel lists, this becomes inefficient.Create an index map once before the loop:
+ // Create index map for efficient lookup + const channelIndexMap = new Map<string, number>() + existingChannelsToIngest.forEach((id, idx) => channelIndexMap.set(id, idx)) + for (const conversation of conversationsToProcess) { // Update conversationIndex to match position in existingChannelsToIngest - conversationIndex = existingChannelsToIngest.indexOf(conversation.id!) + conversationIndex = channelIndexMap.get(conversation.id!) ?? conversationIndex
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
server/api/admin.ts
(2 hunks)server/integrations/slack/channelIngest.ts
(19 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- server/api/admin.ts
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-07-04T07:00:46.837Z
Learnt from: naSim087
PR: xynehq/xyne#638
File: server/integrations/slack/index.ts:793-824
Timestamp: 2025-07-04T07:00:46.837Z
Learning: In server/integrations/slack/index.ts, the dual IngestionState instances follow a specific pattern: `ingestionState` stores the current state that gets updated during processing, while `ingestionOldState` stores the old/previously saved state used for resumption logic when restarting ingestion.
Applied to files:
server/integrations/slack/channelIngest.ts
🧬 Code graph analysis (1)
server/integrations/slack/channelIngest.ts (3)
server/integrations/slack/index.ts (4)
extractUserIdsFromBlocks
(250-270)formatSlackSpecialMentions
(271-298)getConversationUsers
(636-678)insertChannelMessages
(303-534)server/db/ingestion.ts (3)
getIngestionById
(110-121)updateIngestionStatus
(52-85)updateIngestionMetadata
(90-105)server/db/connector.ts (2)
db
(3-3)getOAuthConnectorWithCredentials
(195-317)
…sume the channel ingestion-5
…sume the channel ingestion
Description
Testing
Additional Notes
Summary by CodeRabbit
New Features
Improvements