
Conversation


@naSim087 naSim087 commented Sep 30, 2025

Description

Move ingestion endpoints from server.ts to sync-server.ts

Testing

Additional Notes

Summary by CodeRabbit

  • New Features

    • Start Ingestion button added to Google and Slack OAuth flows with loading/disabled states and success/error toasts; handles admin vs non-admin start paths.
  • Improvements

    • New "Ready for Ingestion" OAuth state shown in UI; manual ingestion available once ready.
    • OAuth tabs now receive connector context and connector selection tightened to avoid mismatches.
  • Stability

    • Ingestion and real-time messaging routed through a central sync server/WebSocket with authenticated proxying and improved logging.


coderabbitai bot commented Sep 30, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds a user-triggered "Start Ingestion" flow for OAuth connectors, a new OAuthReadyForIngestion status mapped from ConnectorStatus.Authenticated, moves Google ingestion start to a sync-server proxy with a main↔sync WebSocket bridge, and exposes new ingestion APIs, schemas, and proxy routing.

Changes

Cohort / File(s) Summary
Frontend: OAuth UI & routes
frontend/src/components/OAuthTab.tsx, frontend/src/routes/_authenticated/admin/integrations/google.tsx, frontend/src/routes/_authenticated/admin/integrations/slack.tsx, frontend/src/routes/_authenticated/integrations/google.tsx
Adds optional connectorId?: string to OAuthTab props; renders "Start Ingestion" UI when OAuthReadyForIngestion; implements handleStartIngestion with admin vs non-admin API routing, loading/toast/error handling; maps ConnectorStatus.Authenticated → OAuthReadyForIngestion; narrows Google connector lookup by app.
Frontend types
frontend/src/types.ts
Adds OAuthIntegrationStatus.OAuthReadyForIngestion.
Server API surface
server/api/admin.ts, server/api/oauth.ts, server/types.ts
Adds StartGoogleIngestionApi and startGoogleIngestionSchema (connectorId); OAuth callback now sets connector status to Authenticated and no longer auto-starts Google ingestion; Slack ingestion payload types adjusted to numeric connectorId.
Server integrations (Google/Slack)
server/integrations/google/index.ts, server/integrations/slack/index.ts
Persist connector status update to Connecting in DB before ingestion begins; small refactors and retry/backoff formatting preserved.
Metric stream & WebSocket bridge
server/integrations/metricStream.ts, server/server.ts, server/sync-server.ts
Adds internal /internal/sync-websocket guarded by a Bearer token; introduces proxyToSyncServer to forward ingestion requests to sync-server; sync-server connects to main server via WS and exposes ingestion endpoints; metricStream forwards via sync-server with lazy imports.
Shared/server types & config
server/shared/types.ts, server/config.ts, server/types.ts
Adds ConnectorStatus.Authenticated; adds AccessTokenCookie config export; exports startGoogleIngestionSchema.
Server routing & wiring
server/server.ts
Replaces direct per-user ingestion handlers with proxy forwarding to sync-server; adds SyncServerWsApp endpoint and wiring for forwarding sync messages to frontend connections.
Other minor changes
server/integrations/metricStream.ts (helpers), assorted formatting
Adds helper send/forward functions, lazy-loading to avoid cycles, logging and formatting tweaks.
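The admin vs non-admin routing described for handleStartIngestion can be sketched as a small helper; the endpoint paths below are illustrative assumptions, not the repository's actual route strings:

```typescript
// Sketch of the admin vs non-admin routing used by handleStartIngestion.
// The exact paths are assumptions for illustration only.
type Role = "admin" | "user"
type IngestApp = "google" | "slack"

function ingestionEndpoint(role: Role, app: IngestApp): string {
  // Admins hit the /admin-prefixed route; regular users hit the user-level route.
  return role === "admin"
    ? `/api/v1/admin/${app}/start_ingestion`
    : `/api/v1/${app}/start_ingestion`
}
```

The frontend would then POST the connectorId to whichever path this returns, with loading and toast state handled around the call.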

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor User
  participant FE as Frontend (OAuthTab)
  participant API as Main Server (admin API)
  participant Proxy as Main Server (proxyToSyncServer)
  participant SS as Sync Server
  participant GI as Google Integration Worker

  User->>FE: Click "Start Ingestion"
  FE->>API: POST /admin/start-google-ingestion { connectorId }
  API->>Proxy: proxyToSyncServer(request)
  Proxy->>SS: POST /sync/google/start_ingestion { connectorId }
  SS->>GI: handleGoogleOAuthIngestion(connectorId)
  GI->>GI: update connector status -> Connecting
  GI-->>SS: emit progress/events
  SS-->>API: 200 OK (ingestion started)
  API-->>FE: 200 OK
  Note over FE: set OAuthIntegrationStatus -> OAuthConnecting
sequenceDiagram
  autonumber
  participant SS as Sync Server
  participant MS as Main Server (/internal/sync-websocket)
  participant FE as Frontend WS

  rect rgba(230,240,255,0.4)
    SS->>MS: WS connect (Bearer METRICS_SECRET)
    MS-->>SS: accept (if authorized)
  end

  SS->>MS: sendWebsocketMessageToMainServer(msg, connectorId)
  MS->>FE: forward via wsConnections[connectorId]
  Note over MS,FE: delivery routed to browser client for that connectorId
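The per-connector delivery step in the diagram above can be sketched as follows; WsLike and the Map shape are structural stand-ins, not the actual wsConnections type in metricStream.ts:

```typescript
// Sketch of routing a sync-server message to the browser client for a connector.
interface WsLike {
  send(data: string): void
}

function forwardToFrontend(
  wsConnections: Map<string, WsLike>,
  connectorId: string,
  message: string,
): boolean {
  const ws = wsConnections.get(connectorId)
  if (!ws) return false // no browser client registered for this connector
  ws.send(message)
  return true
}
```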

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested reviewers

  • zereraz
  • kalpadhwaryu
  • devesh-juspay

Poem

I’m a rabbit clicking "Start" with cheer,
OAuth wakes and status hops near,
Sync-server hums and websockets sing,
Connectors wake as events take wing,
Ingestion starts — let carrots appear! 🐇✨

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title “feat: migrate ingestion APIs to sync-server” accurately and concisely captures the primary change of moving ingestion endpoints to the sync server, clearly reflecting the main objective without extraneous detail.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat/migrateIngestionToSyncServer




@gemini-code-assist

Summary of Changes

Hello @naSim087, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly refactors the application's architecture by separating data ingestion and synchronization logic into a dedicated sync-server. This move aims to improve the scalability and resilience of the main API server by offloading resource-intensive tasks. It also enhances the user experience by introducing a new 'ready for ingestion' state, allowing users to manually initiate data synchronization after completing OAuth authentication, and establishes a robust WebSocket communication channel between the sync server and the main server for real-time updates.

Highlights

  • Ingestion API Migration: All ingestion-related API endpoints for Slack and Google, including starting ingestion and syncing, have been migrated from the main server.ts to the dedicated sync-server.ts. The main server now proxies these requests to the sync server.
  • New OAuth Status for Manual Ingestion: A new OAuthReadyForIngestion status has been introduced in the frontend and a corresponding Authenticated status in the backend ConnectorStatus enum. This allows for a state where OAuth is complete, but data ingestion has not yet begun, enabling manual initiation.
  • Frontend 'Start Ingestion' Button: The frontend UI for Google and Slack integrations now displays a 'Start Ingestion' button when the OAuthReadyForIngestion status is detected, allowing users to manually trigger the data ingestion process after successful OAuth authentication.
  • Sync Server WebSocket Communication: A new internal WebSocket endpoint (/internal/sync-websocket) has been added to the main server.ts. The sync-server now establishes a WebSocket connection to this endpoint to forward real-time messages (e.g., ingestion progress) back to the main server, which then relays them to the appropriate frontend clients.
  • Sync Server Authentication: The sync-server now enforces JWT authentication for all its API routes, ensuring that only authenticated requests can trigger ingestion and sync operations.
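The Bearer-token guard described for the internal WebSocket endpoint reduces to a small check; the function name and signature below are assumptions for illustration:

```typescript
// Sketch of the shared-secret check guarding /internal/sync-websocket.
// Refuses the connection when the secret is unset rather than comparing
// against the string "undefined".
function isAuthorizedSyncSocket(
  authHeader: string | undefined,
  secret: string | undefined,
): boolean {
  if (!secret) return false
  return (
    !!authHeader &&
    authHeader.startsWith("Bearer ") &&
    authHeader.slice(7) === secret
  )
}
```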


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a significant architectural change by migrating ingestion-related APIs to a separate sync-server. This is a good move for offloading heavy processing from the main application server. The changes include proxying ingestion requests, establishing a WebSocket for progress reporting from the new server, and updating the frontend to support manual ingestion triggers. My review focuses on improving the new implementation's robustness and maintainability, highlighting a couple of high-severity issues related to the new server-to-server communication logic and security in the WebSocket handling. I've also included several medium-severity suggestions to enhance code clarity, type safety, and error handling in both the frontend and backend.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 15

🧹 Nitpick comments (5)
server/api/oauth.ts (1)

154-160: Consider migrating Microsoft ingestion for consistency.

Microsoft OAuth ingestion still runs automatically in the background while Google ingestion was moved to manual triggering. This creates an inconsistent user experience. Consider whether Microsoft should also use the manual ingestion flow.

server/integrations/metricStream.ts (1)

14-20: Remove unused sendWebsocketMessageDirect helper.

The function is declared in server/integrations/metricStream.ts but never referenced; delete it to eliminate dead code.

server/sync-server.ts (3)

50-50: Add success log in open handler.

The open event handler is empty. For observability, log a success message when the WebSocket connection is established.

-  mainServerWebSocket.on("open", () => {})
+  mainServerWebSocket.on("open", () => {
+    Logger.info("WebSocket connection to main server established")
+  })

140-141: Fix comment typo.

Line 140 has a duplicated // comment marker.

-// // Protected ingestion API routes - require JWT authentication
+// Protected ingestion API routes - require JWT authentication
 app.use("*", AuthMiddleware)

169-170: Inconsistent validation pattern for sync routes.

The Slack and Google ingestion routes (lines 144-167) use zValidator middleware for request validation, but these sync routes validate inside the handler functions (HandlePerUserSlackSync and HandlePerUserGoogleWorkSpaceSync parse the body with syncByMailSchema). For consistency and early validation, consider adding zValidator at the route level.

First, verify that syncByMailSchema is exported from @/types:

#!/bin/bash
# Description: Check if syncByMailSchema is exported from types

rg -n 'export.*syncByMailSchema' server/types.ts

If exported, apply this diff:

 import {
   ingestMoreChannelSchema,
   startSlackIngestionSchema,
   serviceAccountIngestMoreSchema,
+  syncByMailSchema,
 } from "@/types"

 // Sync APIs
-app.post("/syncSlackByMail", HandlePerUserSlackSync)
-app.post("/syncGoogleWorkSpaceByMail", HandlePerUserGoogleWorkSpaceSync)
+app.post("/syncSlackByMail", zValidator("json", syncByMailSchema), HandlePerUserSlackSync)
+app.post("/syncGoogleWorkSpaceByMail", zValidator("json", syncByMailSchema), HandlePerUserGoogleWorkSpaceSync)

Then update the handlers in server/api/admin.ts to use c.req.valid("json") instead of c.req.parseBody().

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5aeac64 and 3b3bac9.

📒 Files selected for processing (14)
  • frontend/src/components/OAuthTab.tsx (5 hunks)
  • frontend/src/routes/_authenticated/admin/integrations/google.tsx (3 hunks)
  • frontend/src/routes/_authenticated/admin/integrations/slack.tsx (3 hunks)
  • frontend/src/routes/_authenticated/integrations/google.tsx (1 hunks)
  • frontend/src/types.ts (1 hunks)
  • server/api/admin.ts (2 hunks)
  • server/api/oauth.ts (2 hunks)
  • server/integrations/google/index.ts (1 hunks)
  • server/integrations/metricStream.ts (1 hunks)
  • server/integrations/slack/index.ts (1 hunks)
  • server/server.ts (8 hunks)
  • server/shared/types.ts (1 hunks)
  • server/sync-server.ts (3 hunks)
  • server/types.ts (1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-07-04T07:00:46.837Z
Learnt from: naSim087
PR: xynehq/xyne#638
File: server/integrations/slack/index.ts:793-824
Timestamp: 2025-07-04T07:00:46.837Z
Learning: In server/integrations/slack/index.ts, the dual IngestionState instances follow a specific pattern: `ingestionState` stores the current state that gets updated during processing, while `ingestionOldState` stores the old/previously saved state used for resumption logic when restarting ingestion.

Applied to files:

  • server/integrations/slack/index.ts
  • frontend/src/routes/_authenticated/admin/integrations/slack.tsx
📚 Learning: 2025-08-11T14:10:56.008Z
Learnt from: naSim087
PR: xynehq/xyne#725
File: server/server.ts:784-811
Timestamp: 2025-08-11T14:10:56.008Z
Learning: In the xyne application (server/server.ts), the following endpoints are intentionally accessible to regular authenticated users (not just admins):
- POST /oauth/create - allows users to create OAuth providers
- POST /slack/ingest_more_channel - allows users to ingest Slack channels
- POST /slack/start_ingestion - allows users to start Slack ingestion
- DELETE /oauth/connector/delete - allows users to delete OAuth connectors
- POST /connector/update_status - allows users to update connector status
- GET /connectors/all - allows users to view all connectors
- GET /oauth/global-slack-provider - allows users to check for global Slack provider
These endpoints enable self-service integration management for users.

Applied to files:

  • server/server.ts
📚 Learning: 2025-06-10T05:40:04.427Z
Learnt from: naSim087
PR: xynehq/xyne#525
File: frontend/src/routes/_authenticated/admin/integrations/slack.tsx:141-148
Timestamp: 2025-06-10T05:40:04.427Z
Learning: In frontend/src/routes/_authenticated/admin/integrations/slack.tsx, the ConnectAction enum and related connectAction state (lines 141-148, 469-471) are intentionally kept for future development, even though they appear unused in the current implementation.

Applied to files:

  • frontend/src/routes/_authenticated/admin/integrations/slack.tsx
🧬 Code graph analysis (8)
server/integrations/google/index.ts (2)
server/db/client.ts (1)
  • db (15-15)
server/db/schema/connectors.ts (1)
  • connectors (60-109)
frontend/src/components/OAuthTab.tsx (2)
frontend/src/api.ts (1)
  • api (4-4)
server/utils.ts (1)
  • getErrorMessage (103-106)
server/integrations/slack/index.ts (1)
server/db/schema/connectors.ts (1)
  • connectors (60-109)
server/api/admin.ts (6)
server/db/user.ts (1)
  • getUserByEmail (147-156)
server/db/connector.ts (2)
  • db (3-3)
  • getConnectorByExternalId (369-403)
server/db/client.ts (1)
  • db (15-15)
server/integrations/google/index.ts (2)
  • loggerWithChild (147-149)
  • handleGoogleOAuthIngestion (861-1055)
server/errors/index.ts (1)
  • NoUserFound (322-327)
server/utils.ts (1)
  • getErrorMessage (103-106)
server/server.ts (1)
server/integrations/metricStream.ts (1)
  • wsConnections (8-8)
server/sync-server.ts (3)
server/logger/index.ts (3)
  • getLogger (36-93)
  • Subsystem (15-15)
  • LogMiddleware (125-190)
server/types.ts (4)
  • ingestMoreChannelSchema (560-565)
  • startSlackIngestionSchema (566-568)
  • serviceAccountIngestMoreSchema (259-271)
  • startGoogleIngestionSchema (569-571)
server/api/admin.ts (6)
  • IngestMoreChannelApi (1555-1588)
  • StartSlackIngestionApi (1449-1498)
  • ServiceAccountIngestMoreUsersApi (932-1021)
  • StartGoogleIngestionApi (1500-1554)
  • HandlePerUserSlackSync (2122-2157)
  • HandlePerUserGoogleWorkSpaceSync (2084-2120)
frontend/src/routes/_authenticated/admin/integrations/google.tsx (1)
frontend/src/api.ts (1)
  • wsClient (6-8)
server/integrations/metricStream.ts (2)
server/logger/index.ts (2)
  • getLogger (36-93)
  • Subsystem (15-15)
server/sync-server.ts (1)
  • sendWebsocketMessageToMainServer (67-97)
🔇 Additional comments (22)
frontend/src/routes/_authenticated/integrations/google.tsx (1)

84-86: LGTM! More specific connector filtering.

The narrowed lookup now filters by both app and authType, ensuring the correct Google Drive OAuth connector is selected. This prevents potential issues if multiple OAuth connectors exist.

frontend/src/types.ts (1)

54-54: LGTM! New ingestion-ready state added.

The OAuthReadyForIngestion state provides a clear intermediate step between OAuth connection and ingestion, enabling the new manual "Start Ingestion" flow. The placement in the enum logically reflects the connection sequence.

server/shared/types.ts (1)

139-139: LGTM! New Authenticated status added.

The Authenticated status provides a clear intermediate state in the OAuth flow, enabling the distinction between OAuth completion and ingestion start. The placement after NotConnected groups OAuth-specific statuses together.

Consider whether the order matters for your use case. If status progression is represented by enum order, placing Authenticated between Connecting and Connected might be more intuitive, though the current placement is also reasonable.

server/server.ts (3)

153-162: LGTM - imports aligned with new WebSocket routing.

The new imports support the WebSocket message forwarding and proxy functionality introduced in this PR.


1026-1034: Approve proxy integration for ingestion endpoints.

The migration of ingestion endpoints to the sync-server via proxy is correctly implemented.


1057-1060: Approve admin-specific sync endpoints.

These admin sync endpoints (syncGoogleWorkSpaceByMail, syncSlackByMail) are correctly proxied to the sync-server with admin authorization.

server/api/oauth.ts (1)

136-136: ConnectorStatus.Authenticated aligns with new manual ingestion flow
Server sets ConnectorStatus.Authenticated and both Slack and Google UIs map it to OAuthReadyForIngestion.

frontend/src/routes/_authenticated/admin/integrations/google.tsx (3)

1111-1112: LGTM! New OAuth status transition correctly implemented.

The addition of the Authenticated status mapping to OAuthReadyForIngestion properly supports the new manual ingestion flow introduced in this PR.


1320-1326: LGTM! ConnectorId correctly passed to OAuthTab.

The connectorId prop is properly derived from the OAuth connector and safely passed to OAuthTab with optional chaining. This enables the new ingestion flow introduced in this PR.


1132-1142: Confirm WebSocket query parameter key alignment

The server handler extracts jobId via c.req.param("jobId"), but the client is invoking $ws({ query: { id: … } }). You must verify that your $ws helper maps query.id to the jobId route parameter; otherwise, update the client calls to use query: { jobId: serviceAccountConnector.externalId } (and similarly for oauthConnector.id).

frontend/src/routes/_authenticated/admin/integrations/slack.tsx (3)

699-712: LGTM! Slack OAuth ingestion-ready UI properly implemented.

The new OAuthReadyForIngestion status handling follows the same pattern as the Google integration, providing a clear UI for starting manual ingestion with proper loading states.


794-795: LGTM! Status mapping consistent with Google integration.

The Authenticated to OAuthReadyForIngestion mapping correctly implements the new ingestion flow for Slack.


1026-1028: LGTM! Manual ingestion form availability correctly extended.

Allowing manual ingestion in both OAuthReadyForIngestion and OAuthConnected states provides better UX and aligns with the ingestion flow design.

server/api/admin.ts (1)

113-116: LGTM! Required imports added for Google ingestion.

The imports correctly bring in the necessary Google ingestion handlers for the new API endpoint.

server/integrations/metricStream.ts (2)

2-6: LGTM! Logger setup follows standard patterns.

The new imports and logger initialization are correctly implemented and follow the project's logging conventions.


22-37: LGTM! Dynamic import correctly avoids circular dependencies.

The sendWebsocketMessageViaSyncServerWS function properly uses dynamic imports to avoid circular dependency issues. The error handling with message loss logging is acceptable for this use case, as ingestion status updates are best-effort and the system maintains eventual consistency.

frontend/src/components/OAuthTab.tsx (4)

14-14: LGTM! New imports and props correctly added.

The new imports support the ingestion functionality, and the optional connectorId prop maintains backward compatibility while enabling the new flow.

Also applies to: 20-22, 31-31, 41-41


48-48: LGTM! Loading state properly initialized.

The isStartingIngestion state correctly tracks the ingestion start operation to prevent duplicate requests.


69-111: LGTM! Ingestion handler properly implemented.

The handleStartIngestion function correctly:

  • Validates required connectorId
  • Routes API calls based on user role
  • Handles success and error cases with appropriate user feedback
  • Ensures loading state is always reset via finally block

The status transition to OAuthConnecting on success is correct, as it reflects that the ingestion process has started.


138-156: LGTM! Ingestion-ready UI cleanly implemented.

The new UI for the OAuthReadyForIngestion state provides clear messaging and a properly disabled button during the ingestion start operation. The placement in the status flow is correct.

server/sync-server.ts (2)

34-34: LGTM!

Correctly typed the Hono app with JwtVariables to support JWT authentication middleware.


207-208: WebSocket connection may not be ready when endpoints are called.

The connectToMainServer() call is fire-and-forget, so the WebSocket may still be connecting when ingestion endpoints are invoked. The sendWebsocketMessageToMainServer function handles this by logging a warning and attempting reconnection, but be aware that early messages may be lost until the connection is established.

Consider adding a readiness check or delaying server start until the WebSocket is connected if message delivery is critical. Otherwise, ensure clients can tolerate initial message loss or implement the message queue suggested earlier.
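One way to implement the suggested readiness check is a small promise gate around the socket's open event. The interface below is a structural stand-in for the ws WebSocket type, and the timeout behavior is an assumption about what "critical delivery" would require:

```typescript
// Resolve once the socket emits "open", or reject after a timeout,
// so server start can be delayed until the bridge is usable.
interface OpenEmitter {
  on(event: "open", cb: () => void): void
}

function waitForOpen(socket: OpenEmitter, timeoutMs = 5000): Promise<void> {
  return new Promise((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error("WebSocket to main server not ready")),
      timeoutMs,
    )
    socket.on("open", () => {
      clearTimeout(timer)
      resolve()
    })
  })
}
```

A caller could then `await waitForOpen(mainServerWebSocket)` before registering the ingestion routes, or fall back to the message-queue approach if startup must not block.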


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (9)
server/server.ts (3)

1072-1080: Remove duplicate ingestion endpoint registrations.

Lines 1072-1080 duplicate the ingestion endpoints already registered at lines 1026-1034. The .basePath("/admin") at line 1049 means these routes are accessible at both /api/v1/slack/... and /api/v1/admin/slack/..., which is redundant.

Based on learnings, these endpoints are intentionally accessible to regular authenticated users, so the duplication in the admin section is unnecessary.

Apply this diff to remove the duplicates:

   .post(
     "/microsoft/service_account",
     zValidator("form", microsoftServiceSchema),
     AddServiceConnectionMicrosoft,
   )
-  .post("/slack/ingest_more_channel", (c) =>
-    proxyToSyncServer(c, "/slack/ingest_more_channel"),
-  )
-  .post("/slack/start_ingestion", (c) =>
-    proxyToSyncServer(c, "/slack/start_ingestion"),
-  )
-  .post("/google/start_ingestion", (c) =>
-    proxyToSyncServer(c, "/google/start_ingestion"),
-  )
   .delete(
     "/oauth/connector/delete",

1186-1236: Critical: WebSocket authentication vulnerability and missing validation.

The sync-server WebSocket endpoint has several security and reliability issues:

  1. Auth bypass: Lines 1200-1204 return empty handlers for unauthorized connections, but the WebSocket upgrade still succeeds. The onOpen callback executes even though it's empty, leaving an open connection.
  2. No connection limit: Multiple sync-servers could connect simultaneously.
  3. No message validation: Line 1213 parses JSON without schema validation.
  4. No rate limiting: A compromised sync-server could flood messages.

The authentication check should reject the upgrade before any handlers execute:

 export const SyncServerWsApp = app.get(
   "/internal/sync-websocket",
   upgradeWebSocket((c) => {
     // Verify authentication
     const authHeader = c.req.header("Authorization")
     const expectedSecret = process.env.METRICS_SECRET
 
     if (
       !authHeader ||
       !authHeader.startsWith("Bearer ") ||
       authHeader.slice(7) !== expectedSecret
     ) {
       Logger.warn("Unauthorized sync-server WebSocket connection attempt")
-      return {
-        onOpen() {
-          // Close immediately if unauthorized
-        },
-      }
+      throw new HTTPException(401, { message: "Unauthorized" })
     }

Additionally, consider adding:

  • Connection limit tracking (singleton sync-server connection)
  • Message schema validation with Zod
  • Basic rate limiting (messages per second)

Based on learnings.
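A minimal hand-rolled version of the suggested message validation looks like this; the real code would presumably reuse the project's Zod schemas, and the field names here are assumptions:

```typescript
// Parse and shape-check an incoming sync-server message before forwarding it.
// Returns null for malformed JSON or unexpected shapes instead of throwing.
interface SyncMessage {
  connectorId: string
  message: string
}

function parseSyncMessage(raw: string): SyncMessage | null {
  try {
    const data = JSON.parse(raw)
    if (
      typeof data === "object" &&
      data !== null &&
      typeof data.connectorId === "string" &&
      typeof data.message === "string"
    ) {
      return { connectorId: data.connectorId, message: data.message }
    }
    return null
  } catch {
    return null
  }
}
```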


1279-1319: Add timeout and make sync-server URL configurable.

The proxy function has several issues:

  1. Hardcoded localhost (line 1293): Won't work in containerized/distributed deployments
  2. No timeout: Fetch call can hang indefinitely
  3. Information disclosure (line 1309): Might expose internal error details

Apply these improvements:

 const proxyToSyncServer = async (c: Context, endpoint: string) => {
   try {
     // Get JWT token from cookie
     const token = getCookie(c, AccessTokenCookieName)
     if (!token) {
       throw new HTTPException(401, { message: "No authentication token" })
     }
 
     // Get request body
     const body = await c.req.json()
 
+    const controller = new AbortController()
+    const timeoutId = setTimeout(() => controller.abort(), 30000) // 30s timeout
+
     // Forward to sync server
-    const response = await fetch(
-      `http://localhost:${config.syncServerPort}${endpoint}`,
-      {
+    try {
+      const response = await fetch(
+        `${process.env.SYNC_SERVER_HOST || 'http://localhost'}:${config.syncServerPort}${endpoint}`,
+        {
-        method: "POST",
-        headers: {
-          "Content-Type": "application/json",
-          Cookie: `${AccessTokenCookieName}=${token}`,
-        },
-        body: JSON.stringify(body),
-      },
-    )
+          method: "POST",
+          headers: {
+            "Content-Type": "application/json",
+            Cookie: `${AccessTokenCookieName}=${token}`,
+          },
+          body: JSON.stringify(body),
+          signal: controller.signal,
+        },
+      )
+      clearTimeout(timeoutId)
 
-    if (!response.ok) {
-      const errorData = await response
-        .json()
-        .catch(() => ({ message: "Proxy request failed" }))
-      throw new HTTPException(response.status as any, {
-        message: errorData.message || "Proxy request failed",
-      })
-    }
+      if (!response.ok) {
+        Logger.warn(`Sync-server request failed: ${response.status}`)
+        throw new HTTPException(response.status as any, {
+          message: "Request to sync server failed",
+        })
+      }
 
-    return c.json(await response.json())
+      return c.json(await response.json())
+    } catch (fetchError) {
+      clearTimeout(timeoutId)
+      if (fetchError instanceof Error && fetchError.name === 'AbortError') {
+        Logger.error(`Sync-server request timeout for ${endpoint}`)
+        throw new HTTPException(504, { message: "Request timeout" })
+      }
+      throw fetchError
+    }
   } catch (error) {
     if (error instanceof HTTPException) throw error
     Logger.error(error, `Proxy request to ${endpoint} failed`)
-    throw new HTTPException(500, { message: "Proxy request failed" })
+    throw new HTTPException(500, { message: "Internal server error" })
   }
 }

Add to config.ts:

syncServerHost: process.env.SYNC_SERVER_HOST || 'http://localhost'
server/sync-server.ts (6)

24-25: Remove unused imports.

The db and getUserByEmail imports are not directly used in this file—the API handlers consume them internally.

Apply this diff:

-import { db } from "@/db/client"
-import { getUserByEmail } from "@/db/user"

41-64: Hardcoded localhost URL and missing secret validation remain.

The WebSocket URL is still hardcoded to localhost:${config.port}, which won't work in containerized or multi-host deployments. Additionally, METRICS_SECRET is read without validation—if undefined, the connection attempts with Authorization: Bearer undefined.


67-97: Messages are still lost when WebSocket is unavailable.

Lines 84-86 log and discard messages when the connection is down. For critical ingestion status updates, this data loss may prevent the UI from reflecting actual ingestion state.


88-95: Race condition in reconnect logic persists.

Both the error/close handlers (lines 56, 62) and this function (lines 93-94) can trigger connectToMainServer() concurrently, potentially creating duplicate connections or connection churn.


99-107: Validate ACCESS_TOKEN_SECRET at startup.

The non-null assertion on line 100 remains and will throw at runtime if the environment variable is missing.
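A fail-fast helper along these lines (hypothetical, not existing code) would replace the non-null assertion and surface missing configuration at startup rather than mid-request:

```typescript
// Read a required environment variable or throw with a clear message.
// The env parameter exists so the helper can be exercised without mutating process.env.
function requireEnv(
  name: string,
  env: Record<string, string | undefined> = process.env,
): string {
  const value = env[name]
  if (!value) {
    throw new Error(`Missing required environment variable: ${name}`)
  }
  return value
}
```

Calling `requireEnv("ACCESS_TOKEN_SECRET")` once at module load turns a silent runtime failure into an immediate, descriptive crash.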


176-178: Use ESM-compatible path resolution for Worker script.

The code still relies on __dirname, which is not available in ESM contexts under Bun.
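A common ESM-safe replacement derives the directory from import.meta.url; the worker filename below is a placeholder, not the actual script name:

```typescript
// ESM-compatible equivalent of __dirname for resolving the Worker script path.
import { fileURLToPath } from "node:url"
import path from "node:path"

const currentDir = path.dirname(fileURLToPath(import.meta.url))
// "worker.ts" is a placeholder; substitute the real worker script filename.
const workerScriptPath = path.join(currentDir, "worker.ts")
```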

🧹 Nitpick comments (2)
server/sync-server.ts (2)

169-170: Consider adding zValidator for consistency.

The sync endpoints (lines 169-170) perform validation internally via parseBody() and syncByMailSchema.parse(), while other endpoints use zValidator middleware. For consistency and early validation, consider adding zValidator("form", syncByMailSchema) to these routes.

Example for line 169:

-app.post("/syncSlackByMail", HandlePerUserSlackSync)
+app.post(
+  "/syncSlackByMail",
+  zValidator("form", syncByMailSchema),
+  HandlePerUserSlackSync
+)

207-208: Consider startup sequencing with main server.

connectToMainServer() is called during sync-server initialization, but there's no guarantee the main server (and its /internal/sync-websocket endpoint) is ready. In development or containerized environments where servers start concurrently, this may cause repeated connection failures until the main server is up.

Consider adding a startup delay or a readiness check:

  // Connect to main server WebSocket
+ // Delay connection to allow main server to start
+ setTimeout(connectToMainServer, 2000)
- connectToMainServer()

Or implement a more robust retry with backoff:

const connectWithBackoff = (retries = 0, maxRetries = 10) => {
  if (retries >= maxRetries) {
    Logger.error("Giving up on connecting to main server after max retries")
    return
  }
  // Exponential backoff, capped at 30s
  const delay = Math.min(1000 * Math.pow(2, retries), 30000)
  if (retries > 0) {
    Logger.info(`Retrying connection to main server (attempt ${retries + 1}/${maxRetries})`)
  }
  setTimeout(connectToMainServer, delay)
  // In the error/close handlers, call connectWithBackoff(retries + 1, maxRetries)
}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3b3bac9 and 6e499b4.

📒 Files selected for processing (8)
  • server/api/admin.ts (2 hunks)
  • server/api/oauth.ts (2 hunks)
  • server/config.ts (2 hunks)
  • server/integrations/google/index.ts (4 hunks)
  • server/integrations/metricStream.ts (1 hunks)
  • server/integrations/slack/index.ts (1 hunks)
  • server/server.ts (9 hunks)
  • server/sync-server.ts (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • server/integrations/google/index.ts
  • server/integrations/metricStream.ts
  • server/integrations/slack/index.ts
  • server/api/oauth.ts
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-11T14:10:56.008Z
Learnt from: naSim087
PR: xynehq/xyne#725
File: server/server.ts:784-811
Timestamp: 2025-08-11T14:10:56.008Z
Learning: In the xyne application (server/server.ts), the following endpoints are intentionally accessible to regular authenticated users (not just admins):
- POST /oauth/create - allows users to create OAuth providers
- POST /slack/ingest_more_channel - allows users to ingest Slack channels
- POST /slack/start_ingestion - allows users to start Slack ingestion
- DELETE /oauth/connector/delete - allows users to delete OAuth connectors
- POST /connector/update_status - allows users to update connector status
- GET /connectors/all - allows users to view all connectors
- GET /oauth/global-slack-provider - allows users to check for global Slack provider
These endpoints enable self-service integration management for users.

Applied to files:

  • server/server.ts
🧬 Code graph analysis (3)
server/api/admin.ts (5)
server/db/user.ts (1)
  • getUserByEmail (147-156)
server/db/connector.ts (2)
  • db (3-3)
  • getConnectorByExternalId (369-403)
server/db/client.ts (1)
  • db (15-15)
server/integrations/google/index.ts (2)
  • loggerWithChild (147-149)
  • handleGoogleOAuthIngestion (861-1059)
server/utils.ts (1)
  • getErrorMessage (103-106)
server/server.ts (1)
server/integrations/metricStream.ts (1)
  • wsConnections (17-17)
server/sync-server.ts (3)
server/logger/index.ts (3)
  • getLogger (36-93)
  • Subsystem (15-15)
  • LogMiddleware (125-190)
server/types.ts (4)
  • ingestMoreChannelSchema (560-565)
  • startSlackIngestionSchema (566-568)
  • serviceAccountIngestMoreSchema (259-271)
  • startGoogleIngestionSchema (569-571)
server/api/admin.ts (6)
  • IngestMoreChannelApi (1553-1586)
  • StartSlackIngestionApi (1449-1498)
  • ServiceAccountIngestMoreUsersApi (932-1021)
  • StartGoogleIngestionApi (1500-1552)
  • HandlePerUserSlackSync (2120-2155)
  • HandlePerUserGoogleWorkSpaceSync (2082-2118)
🔇 Additional comments (10)
server/config.ts (1)

59-59: LGTM! Centralized cookie name configuration.

The addition of AccessTokenCookie constant centralizes the access token cookie name, replacing hard-coded strings and improving maintainability.

Also applies to: 223-223

server/api/admin.ts (2)

113-116: LGTM! Necessary imports for Google ingestion.

The added imports for handleGoogleOAuthIngestion and handleGoogleServiceAccountIngestion support the new Google ingestion API endpoint.


1500-1552: LGTM! Follows established patterns for ingestion APIs.

The StartGoogleIngestionApi correctly mirrors the StartSlackIngestionApi pattern with appropriate error handling and fire-and-forget async ingestion.

Note: Past review comments have already flagged debug console.log statements and type assertion improvements that should be addressed.

server/server.ts (5)

153-162: LGTM! Necessary imports for WebSocket message forwarding.

The imports enable the sync-server to forward WebSocket messages to frontend connections through the metric stream infrastructure.


301-301: LGTM! Uses centralized config constant.

Correctly derives AccessTokenCookieName from config.AccessTokenCookie instead of hard-coding the string.


1026-1034: LGTM! Ingestion routes correctly proxied to sync-server.

The ingestion endpoints are appropriately forwarded to the sync-server using the proxy function.


1057-1060: LGTM! Admin sync routes proxied to sync-server.

The per-user sync routes are correctly forwarded to the sync-server.


1104-1106: LGTM! Service account ingestion proxied to sync-server.

The service account ingest more users endpoint is correctly proxied to the sync-server.

server/sync-server.ts (2)

34-34: LGTM: Proper typing for JWT context.

The Hono app is correctly typed with JwtVariables to enable type-safe JWT payload access.


140-170: LGTM: Protected endpoints properly configured.

The JWT middleware is correctly applied to all routes, and the Slack/Google ingestion endpoints use zValidator for request validation.

shivamashtikar previously approved these changes Sep 30, 2025

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (9)
server/types.ts (1)

563-565: Resolve the connectorId type inconsistency flagged in the previous review.

The startGoogleIngestionSchema still uses connectorId: z.string() (line 564) while startSlackIngestionSchema uses connectorId: z.number() (line 561). This inconsistency was flagged in the previous review and remains unresolved, which will cause runtime validation failures or integration issues when connectorId values are passed between systems expecting different types.

Apply this diff to align with the database schema and startSlackIngestionSchema:

 export const startGoogleIngestionSchema = z.object({
-  connectorId: z.string(),
+  connectorId: z.number(),
 })
server/sync-server.ts (5)

176-227: Use ESM-compatible path resolution for Worker.

The worker path uses __dirname (line 183) which isn't reliably available in ESM contexts. Bun supports import.meta.dir for directory resolution.

Apply this diff:

   Logger.info(`Starting ${workerCount} file processing worker threads...`)
   
   for (let i = 0; i < workerCount; i++) {
-    const fileProcessingWorker = new Worker(path.join(__dirname, "fileProcessingWorker.ts"))
+    const fileProcessingWorker = new Worker(path.join(import.meta.dir, "fileProcessingWorker.ts"))
     workerThreads.push(fileProcessingWorker)

And update the restart logic:

         Logger.info(`Restarting file processing worker thread ${i + 1}...`)
-        const newWorker = new Worker(path.join(__dirname, "fileProcessingWorker.ts"))
+        const newWorker = new Worker(path.join(import.meta.dir, "fileProcessingWorker.ts"))
         workerThreads[i] = newWorker

24-25: Remove unused imports.

The db and getUserByEmail imports aren't used directly in this file. The API handlers imported from @/api/admin use them internally, so these imports are unnecessary here.

Apply this diff:

-import { db } from "@/db/client"
-import { getUserByEmail } from "@/db/user"
 import type { JwtVariables } from "hono/jwt"

41-64: Make WebSocket URL configurable and validate secret.

The WebSocket connection has critical issues:

  1. Hardcoded localhost (line 42): Won't work in containerized or distributed deployments. The config variable mentioned in comments should be used.

  2. Missing secret validation (line 43): If METRICS_SECRET is undefined, the connection will use Authorization: Bearer undefined.

Apply this diff:

+const mainServerWsUrl = process.env.MAIN_SERVER_WS_URL || `ws://${process.env.MAIN_SERVER_HOST || 'localhost'}:${config.port}/internal/sync-websocket`
+const authSecret = process.env.METRICS_SECRET
+
+if (!authSecret) {
+  throw new Error("METRICS_SECRET environment variable is required for sync-server WebSocket connection")
+}
+
 const connectToMainServer = () => {
-  const mainServerUrl = `ws://localhost:${config.port}/internal/sync-websocket`
-  const authSecret = process.env.METRICS_SECRET
-  mainServerWebSocket = new WebSocket(mainServerUrl, {
+  mainServerWebSocket = new WebSocket(mainServerWsUrl, {
     headers: {
       Authorization: `Bearer ${authSecret}`,
     },
   })

67-97: Address message loss and race condition.

Two reliability issues remain:

  1. Messages are lost when WebSocket is unavailable (lines 84-86): Critical ingestion status updates are logged and discarded. Consider implementing an in-memory queue to buffer messages during connection outages.

  2. Race condition in reconnect logic (lines 88-95): Both error/close handlers (lines 56, 62) and this function can trigger connectToMainServer() concurrently, potentially creating duplicate connections or connection churn.

For the race condition, add a reconnection flag:

+let isReconnecting = false
+
 const connectToMainServer = () => {
+  if (isReconnecting) return
+  isReconnecting = true
+
   mainServerWebSocket = new WebSocket(mainServerWsUrl, {
     headers: {
       Authorization: `Bearer ${authSecret}`,
     },
   })

   mainServerWebSocket.on("open", () => {
+    isReconnecting = false
+    Logger.info("WebSocket connection to main server established")
   })

   mainServerWebSocket.on("error", (error) => {
     Logger.error(error, "WebSocket connection to main server failed")
     mainServerWebSocket = null
+    isReconnecting = false
     setTimeout(connectToMainServer, 5000)
   })

   mainServerWebSocket.on("close", () => {
+    Logger.info("WebSocket connection to main server closed")
     mainServerWebSocket = null
+    isReconnecting = false
     setTimeout(connectToMainServer, 5000)
   })
 }

And update the reconnect in sendWebsocketMessageToMainServer:

     if (
       !mainServerWebSocket ||
       mainServerWebSocket.readyState === WebSocket.CLOSED
     ) {
-      Logger.info("Attempting to reconnect to main server...")
-      connectToMainServer()
+      if (!isReconnecting) {
+        Logger.info("Attempting to reconnect to main server...")
+        connectToMainServer()
+      }
     }

99-107: Validate ACCESS_TOKEN_SECRET at startup.

The non-null assertion on line 100 will cause a runtime error if ACCESS_TOKEN_SECRET is missing. Validate the environment variable at module initialization.

Apply this diff:

 const accessTokenSecret = process.env.ACCESS_TOKEN_SECRET!
+if (!accessTokenSecret) {
+  throw new Error("ACCESS_TOKEN_SECRET environment variable is required")
+}
 const AccessTokenCookieName = config.AccessTokenCookie
server/server.ts (3)

1187-1195: Remove duplicate ingestion endpoint registrations.

Lines 1187-1195 duplicate the ingestion endpoints already registered at lines 1141-1149. The .basePath("/admin") at line 1164 means these routes are accessible at both /api/v1/slack/... and /api/v1/admin/slack/..., which is redundant.

Based on learnings, these endpoints are intentionally accessible to regular authenticated users, so the duplication in the admin section is unnecessary.

Apply this diff to remove the duplicates:

   .post(
     "/microsoft/service_account",
     zValidator("form", microsoftServiceSchema),
     AddServiceConnectionMicrosoft,
   )
-  .post("/slack/ingest_more_channel", (c) =>
-    proxyToSyncServer(c, "/slack/ingest_more_channel"),
-  )
-  .post("/slack/start_ingestion", (c) =>
-    proxyToSyncServer(c, "/slack/start_ingestion"),
-  )
-  .post("/google/start_ingestion", (c) =>
-    proxyToSyncServer(c, "/google/start_ingestion"),
-  )
   .delete(
     "/oauth/connector/delete",

1301-1351: Address WebSocket authentication and reliability issues.

The sync-server WebSocket endpoint has several issues previously flagged:

  1. Ineffective authentication (lines 1315-1319): Unauthorized connections return an empty onOpen handler but the WebSocket upgrade still succeeds. The connection should be rejected during the upgrade phase.

  2. No connection limit: Multiple sync-servers could connect simultaneously without restriction.

  3. Missing validation (line 1328): JSON.parse is called without schema validation on the message fields.

  4. No rate limiting: A compromised sync-server could flood messages.

Consider implementing these improvements:

  1. Reject unauthorized upgrades immediately by throwing HTTPException(401) instead of returning empty handlers
  2. Add connection limiting with a module-scoped flag/counter
  3. Validate message schema before accessing message and connectorId fields
  4. Implement rate limiting per connection
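
For item 4, one option is a per-connection token bucket; the capacity and refill rate below are illustrative, not taken from the PR:

```typescript
// Simple token bucket, one instance per sync-server connection.
// Parameters are illustrative defaults, not values from the codebase.
class TokenBucket {
  private tokens: number
  private lastRefill: number

  constructor(
    private capacity = 100, // burst size
    private refillPerSec = 50, // sustained messages/sec
    now: number = Date.now(),
  ) {
    this.tokens = capacity
    this.lastRefill = now
  }

  allow(now: number = Date.now()): boolean {
    // Refill proportionally to elapsed time, capped at capacity
    const elapsed = (now - this.lastRefill) / 1000
    this.tokens = Math.min(this.capacity, this.tokens + elapsed * this.refillPerSec)
    this.lastRefill = now
    if (this.tokens >= 1) {
      this.tokens -= 1
      return true
    }
    return false
  }
}
```

The onMessage handler would call `bucket.allow()` and drop the frame (or close the socket on repeated abuse) when it returns false.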

Generate a verification script to check if there are existing validation utilities:

#!/bin/bash
# Search for message validation patterns in WebSocket handlers
rg -n "WebSocket.*schema|zod.*parse" --type=ts -C3
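
If no shared utility turns up, a plain type guard over the expected fields (`message` and `connectorId`, both assumed to be strings here) is enough to reject malformed frames before they reach `wsConnections`:

```typescript
// Sketch of a type guard for frames arriving on the sync-server socket.
// The field names follow the handler described above; the string types
// are an assumption for illustration.
interface SyncMessage {
  message: string
  connectorId: string
}

function parseSyncMessage(raw: string): SyncMessage | null {
  let data: unknown
  try {
    data = JSON.parse(raw)
  } catch {
    return null // not valid JSON
  }
  if (typeof data !== "object" || data === null) return null
  const { message, connectorId } = data as Record<string, unknown>
  if (typeof message !== "string" || typeof connectorId !== "string") return null
  return { message, connectorId }
}
```

The handler would call this on every frame and ignore (and log) any `null` result instead of calling `JSON.parse` inline.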

1394-1434: Make sync-server URL configurable and add timeout.

The proxy function has issues previously flagged:

  1. Hardcoded localhost (line 1408): Won't work in containerized deployments (Docker, Kubernetes). The reviewer mentioned using xyne-app-sync in Docker environments.

  2. No timeout: The fetch call can hang indefinitely, blocking the request.

  3. Generic error messages: Consider whether internal error details should be exposed.

Apply these improvements:

 const proxyToSyncServer = async (c: Context, endpoint: string) => {
   try {
     // Get JWT token from cookie
     const token = getCookie(c, AccessTokenCookieName)
     if (!token) {
       throw new HTTPException(401, { message: "No authentication token" })
     }

     // Get request body
     const body = await c.req.json()

+    const controller = new AbortController()
+    const timeoutId = setTimeout(() => controller.abort(), 30000) // 30s timeout
+
     // Forward to sync server
-    const response = await fetch(
-      `http://localhost:${config.syncServerPort}${endpoint}`,
-      {
+    try {
+      const syncServerHost = process.env.SYNC_SERVER_HOST || 'http://localhost'
+      const response = await fetch(
+        `${syncServerHost}:${config.syncServerPort}${endpoint}`,
+        {
-        method: "POST",
-        headers: {
-          "Content-Type": "application/json",
-          Cookie: `${AccessTokenCookieName}=${token}`,
-        },
-        body: JSON.stringify(body),
-      },
-    )
+          method: "POST",
+          headers: {
+            "Content-Type": "application/json",
+            Cookie: `${AccessTokenCookieName}=${token}`,
+          },
+          body: JSON.stringify(body),
+          signal: controller.signal,
+        },
+      )
+      clearTimeout(timeoutId)

-    if (!response.ok) {
-      const errorData = await response
-        .json()
-        .catch(() => ({ message: "Proxy request failed" }))
-      throw new HTTPException(response.status as any, {
-        message: errorData.message || "Proxy request failed",
-      })
-    }
+      if (!response.ok) {
+        Logger.warn(`Sync-server request failed: ${response.status}`)
+        throw new HTTPException(response.status as any, {
+          message: "Request to sync server failed",
+        })
+      }

-    return c.json(await response.json())
+      return c.json(await response.json())
+    } catch (fetchError) {
+      clearTimeout(timeoutId)
+      if (fetchError instanceof Error && fetchError.name === 'AbortError') {
+        Logger.error(`Sync-server request timeout for ${endpoint}`)
+        throw new HTTPException(504, { message: "Request timeout" })
+      }
+      throw fetchError
+    }
   } catch (error) {
     if (error instanceof HTTPException) throw error
     Logger.error(error, `Proxy request to ${endpoint} failed`)
     throw new HTTPException(500, { message: "Internal server error" })
   }
 }

Add to your config:

syncServerHost: process.env.SYNC_SERVER_HOST
🧹 Nitpick comments (1)
server/shared/types.ts (1)

130-140: LGTM! Consider adding documentation for the new status.

The new Authenticated status is well-placed in the OAuth flow progression (NotConnected → Connecting → Authenticated → Connected). However, adding a JSDoc comment explaining when this status is used would help other developers understand the distinction between Authenticated and Connected.

Consider adding documentation:

 export enum ConnectorStatus {
   Connected = "connected",
   // Pending = 'pending',
   Connecting = "connecting",
-
   Paused = "paused",
   Failed = "failed",
   // for oauth we will default to this
   NotConnected = "not-connected",
+  // OAuth flow: user authenticated but ingestion not yet started
   Authenticated = "authenticated",
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6e499b4 and 238dde9.

📒 Files selected for processing (5)
  • server/config.ts (2 hunks)
  • server/server.ts (9 hunks)
  • server/shared/types.ts (1 hunks)
  • server/sync-server.ts (3 hunks)
  • server/types.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • server/config.ts
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-11T14:10:56.008Z
Learnt from: naSim087
PR: xynehq/xyne#725
File: server/server.ts:784-811
Timestamp: 2025-08-11T14:10:56.008Z
Learning: In the xyne application (server/server.ts), the following endpoints are intentionally accessible to regular authenticated users (not just admins):
- POST /oauth/create - allows users to create OAuth providers
- POST /slack/ingest_more_channel - allows users to ingest Slack channels
- POST /slack/start_ingestion - allows users to start Slack ingestion
- DELETE /oauth/connector/delete - allows users to delete OAuth connectors
- POST /connector/update_status - allows users to update connector status
- GET /connectors/all - allows users to view all connectors
- GET /oauth/global-slack-provider - allows users to check for global Slack provider
These endpoints enable self-service integration management for users.

Applied to files:

  • server/server.ts
🧬 Code graph analysis (2)
server/server.ts (1)
server/integrations/metricStream.ts (1)
  • wsConnections (17-17)
server/sync-server.ts (3)
server/logger/index.ts (3)
  • getLogger (36-93)
  • Subsystem (15-15)
  • LogMiddleware (125-190)
server/types.ts (4)
  • ingestMoreChannelSchema (554-559)
  • startSlackIngestionSchema (560-562)
  • serviceAccountIngestMoreSchema (259-271)
  • startGoogleIngestionSchema (563-565)
server/api/admin.ts (6)
  • IngestMoreChannelApi (1553-1586)
  • StartSlackIngestionApi (1449-1498)
  • ServiceAccountIngestMoreUsersApi (932-1021)
  • StartGoogleIngestionApi (1500-1552)
  • HandlePerUserSlackSync (2120-2155)
  • HandlePerUserGoogleWorkSpaceSync (2082-2118)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (5)
server/server.ts (2)

166-175: LGTM! Imports support the new WebSocket bridge.

The new imports are used appropriately:

  • wsConnections is used in the sync-server WebSocket handler (line 1331)
  • Other imports support the proxy functionality

314-314: LGTM! Cookie name is now configurable.

Using config.AccessTokenCookie instead of a hardcoded string improves maintainability and consistency.

server/sync-server.ts (3)

34-34: LGTM! Type-safe JWT variables.

Adding JwtVariables to the Hono app type provides type safety for JWT payload access and is consistent with the main server's pattern.


140-170: LGTM! Protected ingestion endpoints are well-structured.

The ingestion endpoints are properly:

  • Protected with JWT authentication
  • Validated with Zod schemas
  • Organized by integration (Slack, Google)

237-240: LGTM! WebSocket connection established at startup.

Connecting to the main server during initialization is appropriate, assuming the configuration and validation issues flagged earlier are addressed.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
server/sync-server.ts (1)

173-198: Broken merge leaves unterminated initSyncServer.

There’s a half-removed initSyncServer definition (lines 173‑184) that never closes its for loop or the function. That’s what TS is complaining about (TS1005: '}' expected). Drop this orphaned block and keep the new helper‑based implementation starting at startAndMonitorWorkers. As written, the file doesn’t compile.

♻️ Duplicate comments (3)
server/server.ts (1)

1308-1354: Unauthorized sync-server sockets stay open.

The auth guard returns a handler even when the bearer secret is wrong, so the upgrade completes and the unauthorized client keeps an idle connection contrary to the comment. Throw an HTTPException(401) (or call ws.close(1008, ...) inside onOpen) to actually reject the socket. This was flagged earlier and still needs fixing.

-    if (
-      !authHeader ||
-      !authHeader.startsWith("Bearer ") ||
-      authHeader.slice(7) !== expectedSecret
-    ) {
-      Logger.warn("Unauthorized sync-server WebSocket connection attempt")
-      return {
-        onOpen() {
-          // Close immediately if unauthorized
-        },
-      }
-    }
+    if (
+      !authHeader ||
+      !authHeader.startsWith("Bearer ") ||
+      authHeader.slice(7) !== expectedSecret
+    ) {
+      Logger.warn("Unauthorized sync-server WebSocket connection attempt")
+      throw new HTTPException(401, { message: "Unauthorized" })
+    }
server/sync-server.ts (2)

41-94: Make the main-server WS client configurable and guarded.

connectToMainServer still dials ws://localhost:${config.port} with Bearer undefined when METRICS_SECRET isn’t set, so we just spin on failed connections outside local dev. Please pull host/port from config (config.mainServerWsUrl or similar), validate METRICS_SECRET at startup, and ensure only one reconnect attempt runs at a time (see earlier feedback about isReconnecting).


99-107: Validate ACCESS_TOKEN_SECRET before wiring JWT.

Using process.env.ACCESS_TOKEN_SECRET! means we’ll pass undefined into the Hono JWT middleware if the env var is missing, crashing requests at runtime. Fail fast instead: read the env, throw a clear error when it’s absent, and only build AuthMiddleware with a verified string.

-const accessTokenSecret = process.env.ACCESS_TOKEN_SECRET!
+const accessTokenSecret = process.env.ACCESS_TOKEN_SECRET
+if (!accessTokenSecret) {
+  throw new Error("ACCESS_TOKEN_SECRET environment variable is required")
+}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 238dde9 and dbc5407.

📒 Files selected for processing (4)
  • server/config.ts (2 hunks)
  • server/server.ts (9 hunks)
  • server/sync-server.ts (3 hunks)
  • server/types.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • server/config.ts
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-11T14:10:56.008Z
Learnt from: naSim087
PR: xynehq/xyne#725
File: server/server.ts:784-811
Timestamp: 2025-08-11T14:10:56.008Z
Learning: In the xyne application (server/server.ts), the following endpoints are intentionally accessible to regular authenticated users (not just admins):
- POST /oauth/create - allows users to create OAuth providers
- POST /slack/ingest_more_channel - allows users to ingest Slack channels
- POST /slack/start_ingestion - allows users to start Slack ingestion
- DELETE /oauth/connector/delete - allows users to delete OAuth connectors
- POST /connector/update_status - allows users to update connector status
- GET /connectors/all - allows users to view all connectors
- GET /oauth/global-slack-provider - allows users to check for global Slack provider
These endpoints enable self-service integration management for users.

Applied to files:

  • server/server.ts
🧬 Code graph analysis (2)
server/server.ts (1)
server/integrations/metricStream.ts (1)
  • wsConnections (17-17)
server/sync-server.ts (3)
server/logger/index.ts (3)
  • getLogger (36-93)
  • Subsystem (15-15)
  • LogMiddleware (125-190)
server/types.ts (4)
  • ingestMoreChannelSchema (557-562)
  • startSlackIngestionSchema (563-565)
  • serviceAccountIngestMoreSchema (262-274)
  • startGoogleIngestionSchema (566-568)
server/api/admin.ts (5)
  • StartSlackIngestionApi (1449-1498)
  • ServiceAccountIngestMoreUsersApi (932-1021)
  • StartGoogleIngestionApi (1500-1552)
  • HandlePerUserSlackSync (2120-2155)
  • HandlePerUserGoogleWorkSpaceSync (2082-2118)
🪛 GitHub Actions: TypeScript Build Check
server/sync-server.ts

[error] 313-313: TS1005: '}' expected.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (6)
server/sync-server.ts (6)

172-248: Use ESM-compatible path resolution for Worker scripts.

The code uses __dirname (line 184) which is not ESM-compatible. In Bun with ESM modules, __dirname may not be defined or may resolve unexpectedly.

Use import.meta.dir (Bun-specific) or reconstruct __dirname via fileURLToPath:

Option 1: Use Bun's import.meta.dir:

-    const worker = new Worker(path.join(__dirname, workerScript))
+    const worker = new Worker(path.join(import.meta.dir, workerScript))

Option 2: ESM-compatible reconstruction:

Add at the top of the file:

+import { fileURLToPath } from "url"
+const __filename = fileURLToPath(import.meta.url)
+const __dirname = path.dirname(__filename)

24-25: Remove unused imports.

The imports db and getUserByEmail are not used in this file. They're used by the API handlers in @/api/admin, which import them directly.

Apply this diff:

-import { db } from "@/db/client"
-import { getUserByEmail } from "@/db/user"

41-64: Hardcoded localhost URL and missing secret validation.

The WebSocket URL is hardcoded to localhost (line 42), which won't work in containerized or distributed deployments. Additionally, METRICS_SECRET (line 43) is read without validation—if undefined, the connection will use Authorization: Bearer undefined.

Apply this diff to make the URL configurable and validate the secret:

+const mainServerWsUrl = process.env.MAIN_SERVER_WS_URL || `ws://localhost:${config.port}/internal/sync-websocket`
+const authSecret = process.env.METRICS_SECRET
+
+if (!authSecret) {
+  throw new Error("METRICS_SECRET environment variable is required for sync-server WebSocket connection")
+}
+
 const connectToMainServer = () => {
-  const mainServerUrl = `ws://localhost:${config.port}/internal/sync-websocket`
-  const authSecret = process.env.METRICS_SECRET
-  mainServerWebSocket = new WebSocket(mainServerUrl, {
+  mainServerWebSocket = new WebSocket(mainServerWsUrl, {

67-97: Messages are lost when WebSocket is unavailable.

When the WebSocket connection is down or not yet established, messages are logged and discarded (lines 84-86). For critical ingestion status updates, this results in data loss.

Consider implementing an in-memory queue or buffer to retry sending when the connection is re-established. Would you like me to suggest an implementation?


88-95: Race condition in reconnect logic.

Both the error/close handlers (lines 56, 62) and this function can trigger connectToMainServer() concurrently. If the connection drops while a message is being sent, multiple reconnection attempts may race, potentially creating duplicate connections or connection churn.

Add a reconnection flag to prevent concurrent reconnection attempts:

+let isReconnecting = false
+
 const connectToMainServer = () => {
+  if (isReconnecting) return
+  isReconnecting = true
+
   const mainServerUrl = `ws://localhost:${config.port}/internal/sync-websocket`
   const authSecret = process.env.METRICS_SECRET
   mainServerWebSocket = new WebSocket(mainServerUrl, {
     headers: {
       Authorization: `Bearer ${authSecret}`,
     },
   })

   mainServerWebSocket.on("open", () => {
+    isReconnecting = false
   })

   mainServerWebSocket.on("error", (error) => {
     Logger.error(error, "WebSocket connection to main server failed")
     mainServerWebSocket = null
+    isReconnecting = false
     setTimeout(connectToMainServer, 5000)
   })

   mainServerWebSocket.on("close", () => {
     mainServerWebSocket = null
+    isReconnecting = false
     setTimeout(connectToMainServer, 5000)
   })
 }

And update line 93-94:

-      Logger.info("Attempting to reconnect to main server...")
-      connectToMainServer()
+      if (!isReconnecting) {
+        Logger.info("Attempting to reconnect to main server...")
+        connectToMainServer()
+      }

99-107: Validate ACCESS_TOKEN_SECRET at startup.

The non-null assertion on line 100 will cause a runtime error if ACCESS_TOKEN_SECRET is missing. This should be validated at module load time.

Apply this diff:

 const accessTokenSecret = process.env.ACCESS_TOKEN_SECRET!
+if (!accessTokenSecret) {
+  throw new Error("ACCESS_TOKEN_SECRET environment variable is required")
+}
 const AccessTokenCookieName = config.AccessTokenCookie
🧹 Nitpick comments (1)
server/sync-server.ts (1)

169-170: Consider adding zValidator for consistency.

The sync endpoints (lines 169-170) don't use zValidator middleware, unlike the other ingestion endpoints. While the handlers validate internally using syncByMailSchema.parse(), adding zValidator middleware would make the validation strategy consistent across all endpoints.

Apply this diff if syncByMailSchema is exported from @/types:

+import { syncByMailSchema } from "@/types"
+
 // Sync APIs
-app.post("/syncSlackByMail", HandlePerUserSlackSync)
-app.post("/syncGoogleWorkSpaceByMail", HandlePerUserGoogleWorkSpaceSync)
+app.post("/syncSlackByMail", zValidator("form", syncByMailSchema), HandlePerUserSlackSync)
+app.post("/syncGoogleWorkSpaceByMail", zValidator("form", syncByMailSchema), HandlePerUserGoogleWorkSpaceSync)

Then remove the internal parsing/validation from the handlers.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between dbc5407 and 4a75fe0.

📒 Files selected for processing (1)
  • server/sync-server.ts (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
server/sync-server.ts (3)
server/logger/index.ts (3)
  • getLogger (36-93)
  • Subsystem (15-15)
  • LogMiddleware (125-190)
server/types.ts (4)
  • ingestMoreChannelSchema (557-562)
  • startSlackIngestionSchema (563-565)
  • serviceAccountIngestMoreSchema (262-274)
  • startGoogleIngestionSchema (566-568)
server/api/admin.ts (6)
  • IngestMoreChannelApi (1553-1586)
  • StartSlackIngestionApi (1449-1498)
  • ServiceAccountIngestMoreUsersApi (932-1021)
  • StartGoogleIngestionApi (1500-1552)
  • HandlePerUserSlackSync (2120-2155)
  • HandlePerUserGoogleWorkSpaceSync (2082-2118)
🔇 Additional comments (2)
server/sync-server.ts (2)

259-286: LGTM! Clean initialization sequence.

The initialization flow is well-structured:

  1. Worker threads started with proper monitoring
  2. Queue initialized in background with error handling
  3. WebSocket connection to main server established

The refactoring improves readability and maintainability.


34-34: Good addition of type safety.

Adding JwtVariables to the Hono app type ensures type-safe access to JWT payload via c.get(JwtPayloadKey) throughout the application.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (3)
server/types.ts (1)

566-568: Resolve the connectorId type inconsistency between Slack and Google schemas.

startSlackIngestionSchema uses connectorId: z.number() (line 563-565) while startGoogleIngestionSchema uses connectorId: z.string() (line 566-568). Although the implementation in server/api/admin.ts shows this is intentional (Slack uses internal numeric IDs with getConnector, Google uses external string IDs with getConnectorByExternalId), this inconsistency creates a confusing API surface and increases the risk of integration errors.

Consider standardizing both schemas to use the same ID type, preferably string externalIds for consistency with external-facing APIs.
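Until the schemas are unified, the mismatch can be papered over at the call site. An illustrative sketch (type names are stand-ins, not the actual xyne payload types) normalizing both shapes to a string id:

```typescript
// Slack currently carries a numeric internal id, Google a string
// externalId; normalizing to string narrows the API surface.
type SlackIngestionPayload = { connectorId: number }
type GoogleIngestionPayload = { connectorId: string }

function toExternalId(
  payload: SlackIngestionPayload | GoogleIngestionPayload,
): string {
  return String(payload.connectorId)
}

console.log(toExternalId({ connectorId: 42 }))      // "42"
console.log(toExternalId({ connectorId: "abc-1" })) // "abc-1"
```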

server/api/admin.ts (1)

1503-1503: Use type inference from Zod schema instead of type assertion.

The type assertion as { connectorId: string } bypasses type safety. Use z.infer<typeof startGoogleIngestionSchema> to ensure the type matches the schema definition automatically.

Apply this diff:

-  const payload = c.req.valid("json") as { connectorId: string }
+  const payload = c.req.valid("json") as z.infer<typeof startGoogleIngestionSchema>
server/server.ts (1)

1199-1207: Remove duplicate ingestion endpoint registrations.

Lines 1199-1207 duplicate the ingestion endpoints already registered at lines 1153-1161. The .basePath("/admin") at line 1176 means these routes are accessible at both /api/v1/slack/... and /api/v1/admin/slack/..., which is redundant.

Based on learnings, these endpoints are intentionally accessible to regular authenticated users, so the duplication in the admin section is unnecessary.

Apply this diff to remove the duplicates:

   .post(
     "/microsoft/service_account",
     zValidator("form", microsoftServiceSchema),
     AddServiceConnectionMicrosoft,
   )
-  .post("/slack/ingest_more_channel", (c) =>
-    proxyToSyncServer(c, "/slack/ingest_more_channel"),
-  )
-  .post("/slack/start_ingestion", (c) =>
-    proxyToSyncServer(c, "/slack/start_ingestion"),
-  )
-  .post("/google/start_ingestion", (c) =>
-    proxyToSyncServer(c, "/google/start_ingestion"),
-  )
   .delete(
     "/oauth/connector/delete",
🧹 Nitpick comments (2)
server/api/admin.ts (2)

1452-1452: Use type inference from Zod schema instead of type assertion.

The type assertion as { connectorId: number } bypasses type safety. Use z.infer<typeof startSlackIngestionSchema> to ensure the type matches the schema definition automatically.

Apply this diff:

-  const payload = c.req.valid("json") as { connectorId: number }
+  const payload = c.req.valid("json") as z.infer<typeof startSlackIngestionSchema>

1542-1542: Type error as unknown instead of any.

Using any for error types bypasses type checking. Type it as unknown and use the existing getErrorMessage helper function (already imported) to safely extract the error message.

Apply this diff:

-  } catch (error: any) {
+  } catch (error: unknown) {
     loggerWithChild({ email: sub }).error(
       error,
       `Error starting regular Google ingestion: ${getErrorMessage(error)}`,
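The `unknown`-plus-helper pattern is easy to demonstrate standalone. A sketch of a `getErrorMessage`-style helper (the signature is inferred from the review context; the real helper lives in server/utils.ts and may differ):

```typescript
// Safely extract a message from a caught value typed as unknown,
// covering Error instances and arbitrary thrown values.
function getErrorMessage(error: unknown): string {
  if (error instanceof Error) return error.message
  return String(error)
}

try {
  throw new Error("ingestion failed")
} catch (error: unknown) {
  console.log(getErrorMessage(error)) // "ingestion failed"
}
```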
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4a75fe0 and f609598.

📒 Files selected for processing (8)
  • server/api/admin.ts (5 hunks)
  • server/api/oauth.ts (2 hunks)
  • server/config.ts (2 hunks)
  • server/integrations/google/index.ts (4 hunks)
  • server/integrations/slack/index.ts (1 hunks)
  • server/server.ts (9 hunks)
  • server/shared/types.ts (1 hunks)
  • server/types.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • server/integrations/slack/index.ts
  • server/config.ts
  • server/shared/types.ts
  • server/api/oauth.ts
  • server/integrations/google/index.ts
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-11T14:10:56.008Z
Learnt from: naSim087
PR: xynehq/xyne#725
File: server/server.ts:784-811
Timestamp: 2025-08-11T14:10:56.008Z
Learning: In the xyne application (server/server.ts), the following endpoints are intentionally accessible to regular authenticated users (not just admins):
- POST /oauth/create - allows users to create OAuth providers
- POST /slack/ingest_more_channel - allows users to ingest Slack channels
- POST /slack/start_ingestion - allows users to start Slack ingestion
- DELETE /oauth/connector/delete - allows users to delete OAuth connectors
- POST /connector/update_status - allows users to update connector status
- GET /connectors/all - allows users to view all connectors
- GET /oauth/global-slack-provider - allows users to check for global Slack provider
These endpoints enable self-service integration management for users.

Applied to files:

  • server/server.ts
🧬 Code graph analysis (2)
server/server.ts (1)
server/integrations/metricStream.ts (1)
  • wsConnections (17-17)
server/api/admin.ts (6)
server/db/connector.ts (3)
  • getConnector (143-164)
  • db (3-3)
  • getConnectorByExternalId (382-416)
server/db/client.ts (1)
  • db (15-15)
server/db/user.ts (1)
  • getUserByEmail (148-157)
server/integrations/google/index.ts (2)
  • loggerWithChild (146-148)
  • handleGoogleOAuthIngestion (860-1058)
server/errors/index.ts (1)
  • NoUserFound (322-327)
server/utils.ts (1)
  • getErrorMessage (103-106)
🔇 Additional comments (2)
server/api/admin.ts (1)

1557-1557: LGTM! Consistent with Slack schema.

The payload type correctly uses connectorId: number to align with the updated ingestMoreChannelSchema that uses numeric connector IDs.

server/server.ts (1)

173-176: LGTM! Configuration centralized properly.

The imports for wsConnections and sendWebsocketMessage are correctly added, and using config.AccessTokenCookie centralizes the cookie name configuration.

Also applies to: 319-319

@shivamashtikar shivamashtikar merged commit f97ae06 into main Oct 14, 2025
4 checks passed
@shivamashtikar shivamashtikar deleted the feat/migrateIngestionToSyncServer branch October 14, 2025 12:29
kalpadhwaryu pushed a commit that referenced this pull request Oct 14, 2025
# [3.16.0](v3.15.0...v3.16.0) (2025-10-14)

### Features

* migrate ingestion APIs to sync-server ([#1014](#1014)) ([f97ae06](f97ae06))
