feat: migrate ingestion APIs to sync-server #1014
Conversation
Note: other AI code review bot(s) were detected in this pull request, so CodeRabbit will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough
Adds a user-triggered "Start Ingestion" flow for OAuth connectors, a new OAuthReadyForIngestion status, maps ConnectorStatus.Authenticated, moves Google ingestion start to a sync-server proxy with a main↔sync WebSocket bridge, and exposes new ingestion APIs, schemas, and proxy routing.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
autonumber
actor User
participant FE as Frontend (OAuthTab)
participant API as Main Server (admin API)
participant Proxy as Main Server (proxyToSyncServer)
participant SS as Sync Server
participant GI as Google Integration Worker
User->>FE: Click "Start Ingestion"
FE->>API: POST /admin/start-google-ingestion { connectorId }
API->>Proxy: proxyToSyncServer(request)
Proxy->>SS: POST /sync/google/start_ingestion { connectorId }
SS->>GI: handleGoogleOAuthIngestion(connectorId)
GI->>GI: update connector status -> Connecting
GI-->>SS: emit progress/events
SS-->>API: 200 OK (ingestion started)
API-->>FE: 200 OK
Note over FE: set OAuthIntegrationStatus -> OAuthConnecting
```

```mermaid
sequenceDiagram
autonumber
participant SS as Sync Server
participant MS as Main Server (/internal/sync-websocket)
participant FE as Frontend WS
rect rgba(230,240,255,0.4)
SS->>MS: WS connect (Bearer METRICS_SECRET)
MS-->>SS: accept (if authorized)
end
SS->>MS: sendWebsocketMessageToMainServer(msg, connectorId)
MS->>FE: forward via wsConnections[connectorId]
Note over MS,FE: delivery routed to browser client for that connectorId
```
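The bridge in the second diagram is small in practice: the main server accepts one authenticated socket from the sync server and fans messages out to the browser client registered for each connector. The following TypeScript sketch illustrates that forwarding step only; the wsConnections map, message shape, and METRICS_SECRET usage are assumptions taken from the diagram, not the project's exact code.

```typescript
// Hypothetical sketch of the main-server side of the main↔sync WebSocket bridge.
import { WebSocketServer, WebSocket } from "ws"

// Browser connections keyed by connectorId (assumption: one socket per connector).
const wsConnections = new Map<string, WebSocket>()

const wss = new WebSocketServer({ port: 8081, path: "/internal/sync-websocket" })

wss.on("connection", (socket, request) => {
  // Reject sync-server connections that do not present the shared secret.
  const auth = request.headers["authorization"]
  if (auth !== `Bearer ${process.env.METRICS_SECRET}`) {
    socket.close(1008, "Unauthorized")
    return
  }

  socket.on("message", (raw) => {
    // Expected payload from the sync server: { connectorId, message }.
    const { connectorId, message } = JSON.parse(raw.toString())
    const frontendSocket = wsConnections.get(connectorId)
    // Forward to the browser client registered for that connector, if any.
    frontendSocket?.send(JSON.stringify(message))
  })
})
```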
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
Summary of Changes

Hello @naSim087, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly refactors the application's architecture by separating data ingestion and synchronization logic into a dedicated sync-server.

Highlights
Code Review

This pull request introduces a significant architectural change by migrating ingestion-related APIs to a separate sync-server. This is a good move for offloading heavy processing from the main application server. The changes include proxying ingestion requests, establishing a WebSocket for progress reporting from the new server, and updating the frontend to support manual ingestion triggers. My review focuses on improving the new implementation's robustness and maintainability, highlighting a couple of high-severity issues related to the new server-to-server communication logic and security in the WebSocket handling. I've also included several medium-severity suggestions to enhance code clarity, type safety, and error handling in both the frontend and backend.
Actionable comments posted: 15
🧹 Nitpick comments (5)
server/api/oauth.ts (1)
154-160: Consider migrating Microsoft ingestion for consistency.
Microsoft OAuth ingestion still runs automatically in the background while Google ingestion was moved to manual triggering. This creates an inconsistent user experience. Consider whether Microsoft should also use the manual ingestion flow.
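If that direction is taken, the Microsoft flow could mirror the Google route shape introduced in this PR. The snippet below is only an illustrative sketch: startMicrosoftIngestionSchema and the route path are hypothetical names, and no Microsoft ingestion handler is assumed to exist yet.

```typescript
// Hypothetical sketch only: mirrors the manual "start ingestion" route pattern.
import { Hono } from "hono"
import { zValidator } from "@hono/zod-validator"
import { z } from "zod"

const startMicrosoftIngestionSchema = z.object({
  connectorId: z.string(), // assumption: external connector id, as in the Google schema
})

const app = new Hono()

app.post(
  "/microsoft/start_ingestion",
  zValidator("json", startMicrosoftIngestionSchema),
  async (c) => {
    const { connectorId } = c.req.valid("json")
    // Fire-and-forget, like the Slack/Google ingestion APIs discussed in this review:
    // a hypothetical handleMicrosoftOAuthIngestion(connectorId) would run in the background.
    return c.json({ success: true, connectorId, message: "Ingestion started" })
  },
)
```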
server/integrations/metricStream.ts (1)
14-20: Remove unused sendWebsocketMessageDirect helper.
The function is declared in server/integrations/metricStream.ts but never referenced; delete it to eliminate dead code.
server/sync-server.ts (3)
50-50: Add success log in open handler.
The open event handler is empty. For observability, log a success message when the WebSocket connection is established.

- mainServerWebSocket.on("open", () => {})
+ mainServerWebSocket.on("open", () => {
+   Logger.info("WebSocket connection to main server established")
+ })
140-141: Fix comment typo.
Line 140 has an extra forward slash.

-// // Protected ingestion API routes - require JWT authentication
+// Protected ingestion API routes - require JWT authentication
 app.use("*", AuthMiddleware)
169-170: Inconsistent validation pattern for sync routes.
The Slack and Google ingestion routes (lines 144-167) use zValidator middleware for request validation, but these sync routes validate inside the handler functions (HandlePerUserSlackSync and HandlePerUserGoogleWorkSpaceSync parse the body with syncByMailSchema). For consistency and early validation, consider adding zValidator at the route level.

First, verify that syncByMailSchema is exported from @/types:

#!/bin/bash
# Description: Check if syncByMailSchema is exported from types
rg -n 'export.*syncByMailSchema' server/types.ts

If exported, apply this diff:

 import {
   ingestMoreChannelSchema,
   startSlackIngestionSchema,
   serviceAccountIngestMoreSchema,
+  syncByMailSchema,
 } from "@/types"

 // Sync APIs
-app.post("/syncSlackByMail", HandlePerUserSlackSync)
-app.post("/syncGoogleWorkSpaceByMail", HandlePerUserGoogleWorkSpaceSync)
+app.post("/syncSlackByMail", zValidator("json", syncByMailSchema), HandlePerUserSlackSync)
+app.post("/syncGoogleWorkSpaceByMail", zValidator("json", syncByMailSchema), HandlePerUserGoogleWorkSpaceSync)

Then update the handlers in server/api/admin.ts to use c.req.valid("json") instead of c.req.parseBody().
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (14)
frontend/src/components/OAuthTab.tsx (5 hunks)
frontend/src/routes/_authenticated/admin/integrations/google.tsx (3 hunks)
frontend/src/routes/_authenticated/admin/integrations/slack.tsx (3 hunks)
frontend/src/routes/_authenticated/integrations/google.tsx (1 hunks)
frontend/src/types.ts (1 hunks)
server/api/admin.ts (2 hunks)
server/api/oauth.ts (2 hunks)
server/integrations/google/index.ts (1 hunks)
server/integrations/metricStream.ts (1 hunks)
server/integrations/slack/index.ts (1 hunks)
server/server.ts (8 hunks)
server/shared/types.ts (1 hunks)
server/sync-server.ts (3 hunks)
server/types.ts (1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-07-04T07:00:46.837Z
Learnt from: naSim087
PR: xynehq/xyne#638
File: server/integrations/slack/index.ts:793-824
Timestamp: 2025-07-04T07:00:46.837Z
Learning: In server/integrations/slack/index.ts, the dual IngestionState instances follow a specific pattern: `ingestionState` stores the current state that gets updated during processing, while `ingestionOldState` stores the old/previously saved state used for resumption logic when restarting ingestion.
Applied to files:
server/integrations/slack/index.ts
frontend/src/routes/_authenticated/admin/integrations/slack.tsx
📚 Learning: 2025-08-11T14:10:56.008Z
Learnt from: naSim087
PR: xynehq/xyne#725
File: server/server.ts:784-811
Timestamp: 2025-08-11T14:10:56.008Z
Learning: In the xyne application (server/server.ts), the following endpoints are intentionally accessible to regular authenticated users (not just admins):
- POST /oauth/create - allows users to create OAuth providers
- POST /slack/ingest_more_channel - allows users to ingest Slack channels
- POST /slack/start_ingestion - allows users to start Slack ingestion
- DELETE /oauth/connector/delete - allows users to delete OAuth connectors
- POST /connector/update_status - allows users to update connector status
- GET /connectors/all - allows users to view all connectors
- GET /oauth/global-slack-provider - allows users to check for global Slack provider
These endpoints enable self-service integration management for users.
Applied to files:
server/server.ts
📚 Learning: 2025-06-10T05:40:04.427Z
Learnt from: naSim087
PR: xynehq/xyne#525
File: frontend/src/routes/_authenticated/admin/integrations/slack.tsx:141-148
Timestamp: 2025-06-10T05:40:04.427Z
Learning: In frontend/src/routes/_authenticated/admin/integrations/slack.tsx, the ConnectAction enum and related connectAction state (lines 141-148, 469-471) are intentionally kept for future development, even though they appear unused in the current implementation.
Applied to files:
frontend/src/routes/_authenticated/admin/integrations/slack.tsx
🧬 Code graph analysis (8)
server/integrations/google/index.ts (2)
- server/db/client.ts (1): db (15-15)
- server/db/schema/connectors.ts (1): connectors (60-109)

frontend/src/components/OAuthTab.tsx (2)
- frontend/src/api.ts (1): api (4-4)
- server/utils.ts (1): getErrorMessage (103-106)

server/integrations/slack/index.ts (1)
- server/db/schema/connectors.ts (1): connectors (60-109)

server/api/admin.ts (6)
- server/db/user.ts (1): getUserByEmail (147-156)
- server/db/connector.ts (2): db (3-3), getConnectorByExternalId (369-403)
- server/db/client.ts (1): db (15-15)
- server/integrations/google/index.ts (2): loggerWithChild (147-149), handleGoogleOAuthIngestion (861-1055)
- server/errors/index.ts (1): NoUserFound (322-327)
- server/utils.ts (1): getErrorMessage (103-106)

server/server.ts (1)
- server/integrations/metricStream.ts (1): wsConnections (8-8)

server/sync-server.ts (3)
- server/logger/index.ts (3): getLogger (36-93), Subsystem (15-15), LogMiddleware (125-190)
- server/types.ts (4): ingestMoreChannelSchema (560-565), startSlackIngestionSchema (566-568), serviceAccountIngestMoreSchema (259-271), startGoogleIngestionSchema (569-571)
- server/api/admin.ts (6): IngestMoreChannelApi (1555-1588), StartSlackIngestionApi (1449-1498), ServiceAccountIngestMoreUsersApi (932-1021), StartGoogleIngestionApi (1500-1554), HandlePerUserSlackSync (2122-2157), HandlePerUserGoogleWorkSpaceSync (2084-2120)

frontend/src/routes/_authenticated/admin/integrations/google.tsx (1)
- frontend/src/api.ts (1): wsClient (6-8)

server/integrations/metricStream.ts (2)
- server/logger/index.ts (2): getLogger (36-93), Subsystem (15-15)
- server/sync-server.ts (1): sendWebsocketMessageToMainServer (67-97)
🔇 Additional comments (22)
frontend/src/routes/_authenticated/integrations/google.tsx (1)
84-86: LGTM! More specific connector filtering.
The narrowed lookup now filters by both app and authType, ensuring the correct Google Drive OAuth connector is selected. This prevents potential issues if multiple OAuth connectors exist.

frontend/src/types.ts (1)
54-54: LGTM! New ingestion-ready state added.
The OAuthReadyForIngestion state provides a clear intermediate step between OAuth connection and ingestion, enabling the new manual "Start Ingestion" flow. The placement in the enum logically reflects the connection sequence.

server/shared/types.ts (1)
139-139: LGTM! New Authenticated status added.
The Authenticated status provides a clear intermediate state in the OAuth flow, enabling the distinction between OAuth completion and ingestion start. The placement after NotConnected groups OAuth-specific statuses together.
Consider whether the order matters for your use case. If status progression is represented by enum order, placing Authenticated between Connecting and Connected might be more intuitive, though the current placement is also reasonable.

server/server.ts (3)
153-162: LGTM - imports aligned with new WebSocket routing.
The new imports support the WebSocket message forwarding and proxy functionality introduced in this PR.

1026-1034: Approve proxy integration for ingestion endpoints.
The migration of ingestion endpoints to the sync-server via proxy is correctly implemented.

1057-1060: Approve admin-specific sync endpoints.
These admin sync endpoints (syncGoogleWorkSpaceByMail, syncSlackByMail) are correctly proxied to the sync-server with admin authorization.

server/api/oauth.ts (1)
136-136: ConnectorStatus.Authenticated aligns with new manual ingestion flow.
The server sets ConnectorStatus.Authenticated, and both the Slack and Google UIs map it to OAuthReadyForIngestion.

frontend/src/routes/_authenticated/admin/integrations/google.tsx (3)
1111-1112: LGTM! New OAuth status transition correctly implemented.
The addition of the Authenticated status mapping to OAuthReadyForIngestion properly supports the new manual ingestion flow introduced in this PR.

1320-1326: LGTM! ConnectorId correctly passed to OAuthTab.
The connectorId prop is properly derived from the OAuth connector and safely passed to OAuthTab with optional chaining. This enables the new ingestion flow introduced in this PR.

1132-1142: Confirm WebSocket query parameter key alignment.
The server handler extracts jobId via c.req.param("jobId"), but the client is invoking $ws({ query: { id: … } }). You must verify that your $ws helper maps query.id to the jobId route parameter; otherwise, update the client calls to use query: { jobId: serviceAccountConnector.externalId } (and similarly for oauthConnector.id).

frontend/src/routes/_authenticated/admin/integrations/slack.tsx (3)
699-712: LGTM! Slack OAuth ingestion-ready UI properly implemented.
The new OAuthReadyForIngestion status handling follows the same pattern as the Google integration, providing a clear UI for starting manual ingestion with proper loading states.

794-795: LGTM! Status mapping consistent with Google integration.
The Authenticated to OAuthReadyForIngestion mapping correctly implements the new ingestion flow for Slack.

1026-1028: LGTM! Manual ingestion form availability correctly extended.
Allowing manual ingestion in both OAuthReadyForIngestion and OAuthConnected states provides better UX and aligns with the ingestion flow design.

server/api/admin.ts (1)
113-116: LGTM! Required imports added for Google ingestion.
The imports correctly bring in the necessary Google ingestion handlers for the new API endpoint.
server/integrations/metricStream.ts (2)
2-6: LGTM! Logger setup follows standard patterns.
The new imports and logger initialization are correctly implemented and follow the project's logging conventions.

22-37: LGTM! Dynamic import correctly avoids circular dependencies.
The sendWebsocketMessageViaSyncServerWS function properly uses dynamic imports to avoid circular dependency issues. The error handling with message loss logging is acceptable for this use case, as ingestion status updates are best-effort and the system maintains eventual consistency.
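For readers unfamiliar with the pattern, a dynamic import defers module resolution to call time, which breaks an import cycle between two modules. A minimal, self-contained TypeScript sketch with generic module names (not this repository's files):

```typescript
// moduleA.ts — suppose moduleB also needs getStatus() from this file.
// A static "import { notify } from './moduleB'" here would complete a cycle A -> B -> A.

export function getStatus(): string {
  return "ok"
}

export async function reportStatus(): Promise<void> {
  // Resolve moduleB only when actually needed, so loading moduleA never
  // waits on moduleB and the circular dependency is avoided at startup.
  const { notify } = await import("./moduleB")
  notify(getStatus())
}
```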
frontend/src/components/OAuthTab.tsx (4)
14-14: LGTM! New imports and props correctly added.
The new imports support the ingestion functionality, and the optional connectorId prop maintains backward compatibility while enabling the new flow.
Also applies to: 20-22, 31-31, 41-41

48-48: LGTM! Loading state properly initialized.
The isStartingIngestion state correctly tracks the ingestion start operation to prevent duplicate requests.

69-111: LGTM! Ingestion handler properly implemented.
The handleStartIngestion function correctly:
- Validates required connectorId
- Routes API calls based on user role
- Handles success and error cases with appropriate user feedback
- Ensures loading state is always reset via finally block
The status transition to OAuthConnecting on success is correct, as it reflects that the ingestion process has started. A sketch of this handler shape follows below.
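As a rough illustration of that shape only (the endpoint path, state setters, and status string are assumptions, not the component's actual code):

```typescript
// Hypothetical sketch of a "start ingestion" handler with the properties listed above.
type StartIngestionDeps = {
  connectorId?: string
  setIsStartingIngestion: (v: boolean) => void
  setStatus: (s: string) => void
}

export async function startIngestion({
  connectorId,
  setIsStartingIngestion,
  setStatus,
}: StartIngestionDeps): Promise<void> {
  if (!connectorId) {
    console.error("Cannot start ingestion without a connectorId")
    return
  }
  setIsStartingIngestion(true) // prevent duplicate requests while one is in flight
  try {
    // Assumed endpoint; admin and non-admin users could be routed to different paths.
    const res = await fetch("/api/v1/admin/google/start_ingestion", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ connectorId }),
    })
    if (!res.ok) throw new Error(`Request failed with status ${res.status}`)
    // Reflect that ingestion has started; progress then arrives over the WebSocket.
    setStatus("OAuthConnecting")
  } catch (err) {
    console.error("Failed to start ingestion", err)
  } finally {
    setIsStartingIngestion(false) // always reset the loading state
  }
}
```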
138-156: LGTM! Ingestion-ready UI cleanly implemented.
The new UI for the OAuthReadyForIngestion state provides clear messaging and a properly disabled button during the ingestion start operation. The placement in the status flow is correct.

server/sync-server.ts (2)
34-34: LGTM!
Correctly typed the Hono app with JwtVariables to support JWT authentication middleware.

207-208: WebSocket connection may not be ready when endpoints are called.
The connectToMainServer() call is fire-and-forget, so the WebSocket may still be connecting when ingestion endpoints are invoked. The sendWebsocketMessageToMainServer function handles this by logging a warning and attempting reconnection, but be aware that early messages may be lost until the connection is established.
Consider adding a readiness check or delaying server start until the WebSocket is connected if message delivery is critical. Otherwise, ensure clients can tolerate initial message loss or implement the message queue suggested earlier.
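One possible shape for such a readiness check, as a sketch under the assumption that the sync server keeps a single outbound socket handle (the helper name is invented):

```typescript
// Hypothetical helper: resolve once the outbound WebSocket is OPEN, or fail after a timeout.
import WebSocket from "ws"

export function waitForMainServerSocket(
  socket: WebSocket,
  timeoutMs = 10_000,
): Promise<void> {
  if (socket.readyState === WebSocket.OPEN) return Promise.resolve()
  return new Promise((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error("Timed out waiting for main-server WebSocket")),
      timeoutMs,
    )
    socket.once("open", () => {
      clearTimeout(timer)
      resolve()
    })
    socket.once("error", (err) => {
      clearTimeout(timer)
      reject(err)
    })
  })
}

// Usage sketch: await waitForMainServerSocket(mainServerWebSocket) before serving ingestion routes.
```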
Actionable comments posted: 0
♻️ Duplicate comments (9)
server/server.ts (3)
1072-1080: Remove duplicate ingestion endpoint registrations.
Lines 1072-1080 duplicate the ingestion endpoints already registered at lines 1026-1034. The .basePath("/admin") at line 1049 means these routes are accessible at both /api/v1/slack/... and /api/v1/admin/slack/..., which is redundant.
Based on learnings, these endpoints are intentionally accessible to regular authenticated users, so the duplication in the admin section is unnecessary.
Apply this diff to remove the duplicates:

   .post(
     "/microsoft/service_account",
     zValidator("form", microsoftServiceSchema),
     AddServiceConnectionMicrosoft,
   )
-  .post("/slack/ingest_more_channel", (c) =>
-    proxyToSyncServer(c, "/slack/ingest_more_channel"),
-  )
-  .post("/slack/start_ingestion", (c) =>
-    proxyToSyncServer(c, "/slack/start_ingestion"),
-  )
-  .post("/google/start_ingestion", (c) =>
-    proxyToSyncServer(c, "/google/start_ingestion"),
-  )
   .delete(
     "/oauth/connector/delete",
1186-1236: Critical: WebSocket authentication vulnerability and missing validation.
The sync-server WebSocket endpoint has several security and reliability issues:
- Auth bypass: Lines 1200-1204 return empty handlers for unauthorized connections, but the WebSocket upgrade still succeeds. The onOpen callback executes even though it's empty, leaving an open connection.
- No connection limit: Multiple sync-servers could connect simultaneously.
- No message validation: Line 1213 parses JSON without schema validation.
- No rate limiting: A compromised sync-server could flood messages.
The authentication check should reject the upgrade before any handlers execute:

 export const SyncServerWsApp = app.get(
   "/internal/sync-websocket",
   upgradeWebSocket((c) => {
     // Verify authentication
     const authHeader = c.req.header("Authorization")
     const expectedSecret = process.env.METRICS_SECRET
     if (
       !authHeader ||
       !authHeader.startsWith("Bearer ") ||
       authHeader.slice(7) !== expectedSecret
     ) {
       Logger.warn("Unauthorized sync-server WebSocket connection attempt")
-      return {
-        onOpen() {
-          // Close immediately if unauthorized
-        },
-      }
+      throw new HTTPException(401, { message: "Unauthorized" })
     }

Additionally, consider adding:
- Connection limit tracking (singleton sync-server connection)
- Message schema validation with Zod
- Basic rate limiting (messages per second)
Based on learnings.
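On the schema-validation point in the list above, a minimal Zod sketch (the message shape is assumed from the sequence diagram, not taken from the actual handler):

```typescript
// Hypothetical schema for messages arriving from the sync server over the internal WebSocket.
import { z } from "zod"

const syncServerMessageSchema = z.object({
  connectorId: z.string(),        // assumption: connector id used as the routing key
  message: z.record(z.unknown()), // opaque payload forwarded to the browser client
})

export function parseSyncServerMessage(raw: string) {
  try {
    // safeParse reports schema mismatches without throwing; callers can log and drop bad frames.
    const result = syncServerMessageSchema.safeParse(JSON.parse(raw))
    return result.success ? result.data : null
  } catch {
    return null // frame was not valid JSON
  }
}
```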
1279-1319: Add timeout and make sync-server URL configurable.
The proxy function has several issues:
- Hardcoded localhost (line 1293): Won't work in containerized/distributed deployments
- No timeout: Fetch call can hang indefinitely
- Information disclosure (line 1309): Might expose internal error details
Apply these improvements:

 const proxyToSyncServer = async (c: Context, endpoint: string) => {
   try {
     // Get JWT token from cookie
     const token = getCookie(c, AccessTokenCookieName)
     if (!token) {
       throw new HTTPException(401, { message: "No authentication token" })
     }
     // Get request body
     const body = await c.req.json()
+    const controller = new AbortController()
+    const timeoutId = setTimeout(() => controller.abort(), 30000) // 30s timeout
     // Forward to sync server
-    const response = await fetch(
-      `http://localhost:${config.syncServerPort}${endpoint}`,
-      {
+    try {
+      const response = await fetch(
+        `${process.env.SYNC_SERVER_HOST || 'http://localhost'}:${config.syncServerPort}${endpoint}`,
+        {
-        method: "POST",
-        headers: {
-          "Content-Type": "application/json",
-          Cookie: `${AccessTokenCookieName}=${token}`,
-        },
-        body: JSON.stringify(body),
-      },
-    )
+          method: "POST",
+          headers: {
+            "Content-Type": "application/json",
+            Cookie: `${AccessTokenCookieName}=${token}`,
+          },
+          body: JSON.stringify(body),
+          signal: controller.signal,
+        },
+      )
+      clearTimeout(timeoutId)
-    if (!response.ok) {
-      const errorData = await response
-        .json()
-        .catch(() => ({ message: "Proxy request failed" }))
-      throw new HTTPException(response.status as any, {
-        message: errorData.message || "Proxy request failed",
-      })
-    }
+      if (!response.ok) {
+        Logger.warn(`Sync-server request failed: ${response.status}`)
+        throw new HTTPException(response.status as any, {
+          message: "Request to sync server failed",
+        })
+      }
-    return c.json(await response.json())
+      return c.json(await response.json())
+    } catch (fetchError) {
+      clearTimeout(timeoutId)
+      if (fetchError instanceof Error && fetchError.name === 'AbortError') {
+        Logger.error(`Sync-server request timeout for ${endpoint}`)
+        throw new HTTPException(504, { message: "Request timeout" })
+      }
+      throw fetchError
+    }
   } catch (error) {
     if (error instanceof HTTPException) throw error
     Logger.error(error, `Proxy request to ${endpoint} failed`)
-    throw new HTTPException(500, { message: "Proxy request failed" })
+    throw new HTTPException(500, { message: "Internal server error" })
   }
 }

Add to config.ts:
syncServerHost: process.env.SYNC_SERVER_HOST || 'http://localhost'

server/sync-server.ts (6)
24-25
: Remove unused imports.The
db
andgetUserByEmail
imports are not directly used in this file—the API handlers consume them internally.Apply this diff:
-import { db } from "@/db/client" -import { getUserByEmail } from "@/db/user"
41-64
: Hardcoded localhost URL and missing secret validation remain.The WebSocket URL is still hardcoded to
localhost:${config.port}
, which won't work in containerized or multi-host deployments. Additionally,METRICS_SECRET
is read without validation—if undefined, the connection attempts withAuthorization: Bearer undefined
.
67-97
: Messages are still lost when WebSocket is unavailable.Lines 84-86 log and discard messages when the connection is down. For critical ingestion status updates, this data loss may prevent the UI from reflecting actual ingestion state.
88-95
: Race condition in reconnect logic persists.Both the
error
/close
handlers (lines 56, 62) and this function (lines 93-94) can triggerconnectToMainServer()
concurrently, potentially creating duplicate connections or connection churn.
99-107
: Validate ACCESS_TOKEN_SECRET at startup.The non-null assertion on line 100 remains and will throw at runtime if the environment variable is missing.
176-178
: Use ESM-compatible path resolution for Worker script.The code still relies on
__dirname
, which is not available in ESM contexts under Bun.
🧹 Nitpick comments (2)
server/sync-server.ts (2)
169-170
: Consider adding zValidator for consistency.The sync endpoints (lines 169-170) perform validation internally via
parseBody()
andsyncByMailSchema.parse()
, while other endpoints usezValidator
middleware. For consistency and early validation, consider addingzValidator("form", syncByMailSchema)
to these routes.Example for line 169:
-app.post("/syncSlackByMail", HandlePerUserSlackSync) +app.post( + "/syncSlackByMail", + zValidator("form", syncByMailSchema), + HandlePerUserSlackSync +)
207-208
: Consider startup sequencing with main server.
connectToMainServer()
is called during sync-server initialization, but there's no guarantee the main server (and its/internal/sync-websocket
endpoint) is ready. In development or containerized environments where servers start concurrently, this may cause repeated connection failures until the main server is up.Consider adding a startup delay or a readiness check:
// Connect to main server WebSocket + // Delay connection to allow main server to start + setTimeout(connectToMainServer, 2000) - connectToMainServer()Or implement a more robust retry with backoff:
const connectWithBackoff = (retries = 0, maxRetries = 10) => { const delay = Math.min(1000 * Math.pow(2, retries), 30000) if (retries > 0) { Logger.info(`Retrying connection to main server (attempt ${retries + 1}/${maxRetries})`) } connectToMainServer() // Add logic in error handler to call connectWithBackoff with incremented retries }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
server/api/admin.ts (2 hunks)
server/api/oauth.ts (2 hunks)
server/config.ts (2 hunks)
server/integrations/google/index.ts (4 hunks)
server/integrations/metricStream.ts (1 hunks)
server/integrations/slack/index.ts (1 hunks)
server/server.ts (9 hunks)
server/sync-server.ts (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- server/integrations/google/index.ts
- server/integrations/metricStream.ts
- server/integrations/slack/index.ts
- server/api/oauth.ts
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-11T14:10:56.008Z
Learnt from: naSim087
PR: xynehq/xyne#725
File: server/server.ts:784-811
Timestamp: 2025-08-11T14:10:56.008Z
Learning: In the xyne application (server/server.ts), the following endpoints are intentionally accessible to regular authenticated users (not just admins):
- POST /oauth/create - allows users to create OAuth providers
- POST /slack/ingest_more_channel - allows users to ingest Slack channels
- POST /slack/start_ingestion - allows users to start Slack ingestion
- DELETE /oauth/connector/delete - allows users to delete OAuth connectors
- POST /connector/update_status - allows users to update connector status
- GET /connectors/all - allows users to view all connectors
- GET /oauth/global-slack-provider - allows users to check for global Slack provider
These endpoints enable self-service integration management for users.
Applied to files:
server/server.ts
🧬 Code graph analysis (3)
server/api/admin.ts (5)
server/db/user.ts (1)
getUserByEmail
(147-156)server/db/connector.ts (2)
db
(3-3)getConnectorByExternalId
(369-403)server/db/client.ts (1)
db
(15-15)server/integrations/google/index.ts (2)
loggerWithChild
(147-149)handleGoogleOAuthIngestion
(861-1059)server/utils.ts (1)
getErrorMessage
(103-106)
server/server.ts (1)
server/integrations/metricStream.ts (1)
wsConnections
(17-17)
server/sync-server.ts (3)
server/logger/index.ts (3)
getLogger
(36-93)Subsystem
(15-15)LogMiddleware
(125-190)server/types.ts (4)
ingestMoreChannelSchema
(560-565)startSlackIngestionSchema
(566-568)serviceAccountIngestMoreSchema
(259-271)startGoogleIngestionSchema
(569-571)server/api/admin.ts (6)
IngestMoreChannelApi
(1553-1586)StartSlackIngestionApi
(1449-1498)ServiceAccountIngestMoreUsersApi
(932-1021)StartGoogleIngestionApi
(1500-1552)HandlePerUserSlackSync
(2120-2155)HandlePerUserGoogleWorkSpaceSync
(2082-2118)
🔇 Additional comments (10)
server/config.ts (1)
59-59
: LGTM! Centralized cookie name configuration.The addition of
AccessTokenCookie
constant centralizes the access token cookie name, replacing hard-coded strings and improving maintainability.Also applies to: 223-223
server/api/admin.ts (2)
113-116
: LGTM! Necessary imports for Google ingestion.The added imports for
handleGoogleOAuthIngestion
andhandleGoogleServiceAccountIngestion
support the new Google ingestion API endpoint.
1500-1552
: LGTM! Follows established patterns for ingestion APIs.The
StartGoogleIngestionApi
correctly mirrors theStartSlackIngestionApi
pattern with appropriate error handling and fire-and-forget async ingestion.Note: Past review comments have already flagged debug console.log statements and type assertion improvements that should be addressed.
server/server.ts (5)
153-162
: LGTM! Necessary imports for WebSocket message forwarding.The imports enable the sync-server to forward WebSocket messages to frontend connections through the metric stream infrastructure.
301-301
: LGTM! Uses centralized config constant.Correctly derives
AccessTokenCookieName
fromconfig.AccessTokenCookie
instead of hard-coding the string.
1026-1034
: LGTM! Ingestion routes correctly proxied to sync-server.The ingestion endpoints are appropriately forwarded to the sync-server using the proxy function.
1057-1060
: LGTM! Admin sync routes proxied to sync-server.The per-user sync routes are correctly forwarded to the sync-server.
1104-1106
: LGTM! Service account ingestion proxied to sync-server.The service account ingest more users endpoint is correctly proxied to the sync-server.
server/sync-server.ts (2)
34-34
: LGTM: Proper typing for JWT context.The Hono app is correctly typed with
JwtVariables
to enable type-safe JWT payload access.
140-170
: LGTM: Protected endpoints properly configured.The JWT middleware is correctly applied to all routes, and the Slack/Google ingestion endpoints use
zValidator
for request validation.
Actionable comments posted: 0
♻️ Duplicate comments (9)
server/types.ts (1)
563-565: Resolve the connectorId type inconsistency flagged in the previous review.
The startGoogleIngestionSchema still uses connectorId: z.string() (line 564) while startSlackIngestionSchema uses connectorId: z.number() (line 561). This inconsistency was flagged in the previous review and remains unresolved, which will cause runtime validation failures or integration issues when connectorId values are passed between systems expecting different types.
Apply this diff to align with the database schema and startSlackIngestionSchema:

 export const startGoogleIngestionSchema = z.object({
-  connectorId: z.string(),
+  connectorId: z.number(),
 })

server/sync-server.ts (5)
176-227
: Use ESM-compatible path resolution for Worker.The worker path uses
__dirname
(line 183) which isn't reliably available in ESM contexts. Bun supportsimport.meta.dir
for directory resolution.Apply this diff:
Logger.info(`Starting ${workerCount} file processing worker threads...`) for (let i = 0; i < workerCount; i++) { - const fileProcessingWorker = new Worker(path.join(__dirname, "fileProcessingWorker.ts")) + const fileProcessingWorker = new Worker(path.join(import.meta.dir, "fileProcessingWorker.ts")) workerThreads.push(fileProcessingWorker)And update the restart logic:
Logger.info(`Restarting file processing worker thread ${i + 1}...`) - const newWorker = new Worker(path.join(__dirname, "fileProcessingWorker.ts")) + const newWorker = new Worker(path.join(import.meta.dir, "fileProcessingWorker.ts")) workerThreads[i] = newWorker
24-25
: Remove unused imports.The
db
andgetUserByEmail
imports aren't used directly in this file. The API handlers imported from@/api/admin
use them internally, so these imports are unnecessary here.Apply this diff:
-import { db } from "@/db/client" -import { getUserByEmail } from "@/db/user" import type { JwtVariables } from "hono/jwt"
41-64
: Make WebSocket URL configurable and validate secret.The WebSocket connection has critical issues:
Hardcoded localhost (line 42): Won't work in containerized or distributed deployments. The config variable mentioned in comments should be used.
Missing secret validation (line 43): If
METRICS_SECRET
is undefined, the connection will useAuthorization: Bearer undefined
.Apply this diff:
+const mainServerWsUrl = process.env.MAIN_SERVER_WS_URL || `ws://${process.env.MAIN_SERVER_HOST || 'localhost'}:${config.port}/internal/sync-websocket` +const authSecret = process.env.METRICS_SECRET + +if (!authSecret) { + throw new Error("METRICS_SECRET environment variable is required for sync-server WebSocket connection") +} + const connectToMainServer = () => { - const mainServerUrl = `ws://localhost:${config.port}/internal/sync-websocket` - const authSecret = process.env.METRICS_SECRET - mainServerWebSocket = new WebSocket(mainServerUrl, { + mainServerWebSocket = new WebSocket(mainServerWsUrl, { headers: { Authorization: `Bearer ${authSecret}`, }, })
67-97
: Address message loss and race condition.Two reliability issues remain:
Messages are lost when WebSocket is unavailable (lines 84-86): Critical ingestion status updates are logged and discarded. Consider implementing an in-memory queue to buffer messages during connection outages.
Race condition in reconnect logic (lines 88-95): Both error/close handlers (lines 56, 62) and this function can trigger
connectToMainServer()
concurrently, potentially creating duplicate connections or connection churn.For the race condition, add a reconnection flag:
+let isReconnecting = false + const connectToMainServer = () => { + if (isReconnecting) return + isReconnecting = true + mainServerWebSocket = new WebSocket(mainServerWsUrl, { headers: { Authorization: `Bearer ${authSecret}`, }, }) mainServerWebSocket.on("open", () => { + isReconnecting = false + Logger.info("WebSocket connection to main server established") }) mainServerWebSocket.on("error", (error) => { Logger.error(error, "WebSocket connection to main server failed") mainServerWebSocket = null + isReconnecting = false setTimeout(connectToMainServer, 5000) }) mainServerWebSocket.on("close", () => { + Logger.info("WebSocket connection to main server closed") mainServerWebSocket = null + isReconnecting = false setTimeout(connectToMainServer, 5000) }) }And update the reconnect in
sendWebsocketMessageToMainServer
:if ( !mainServerWebSocket || mainServerWebSocket.readyState === WebSocket.CLOSED ) { - Logger.info("Attempting to reconnect to main server...") - connectToMainServer() + if (!isReconnecting) { + Logger.info("Attempting to reconnect to main server...") + connectToMainServer() + } }
99-107
: Validate ACCESS_TOKEN_SECRET at startup.The non-null assertion on line 100 will cause a runtime error if
ACCESS_TOKEN_SECRET
is missing. Validate the environment variable at module initialization.Apply this diff:
const accessTokenSecret = process.env.ACCESS_TOKEN_SECRET! +if (!accessTokenSecret) { + throw new Error("ACCESS_TOKEN_SECRET environment variable is required") +} const AccessTokenCookieName = config.AccessTokenCookieserver/server.ts (3)
1187-1195
: Remove duplicate ingestion endpoint registrations.Lines 1187-1195 duplicate the ingestion endpoints already registered at lines 1141-1149. The
.basePath("/admin")
at line 1164 means these routes are accessible at both/api/v1/slack/...
and/api/v1/admin/slack/...
, which is redundant.Based on learnings, these endpoints are intentionally accessible to regular authenticated users, so the duplication in the admin section is unnecessary.
Apply this diff to remove the duplicates:
.post( "/microsoft/service_account", zValidator("form", microsoftServiceSchema), AddServiceConnectionMicrosoft, ) - .post("/slack/ingest_more_channel", (c) => - proxyToSyncServer(c, "/slack/ingest_more_channel"), - ) - .post("/slack/start_ingestion", (c) => - proxyToSyncServer(c, "/slack/start_ingestion"), - ) - .post("/google/start_ingestion", (c) => - proxyToSyncServer(c, "/google/start_ingestion"), - ) .delete( "/oauth/connector/delete",
1301-1351
: Address WebSocket authentication and reliability issues.The sync-server WebSocket endpoint has several issues previously flagged:
Ineffective authentication (lines 1315-1319): Unauthorized connections return an empty
onOpen
handler but the WebSocket upgrade still succeeds. The connection should be rejected during the upgrade phase.No connection limit: Multiple sync-servers could connect simultaneously without restriction.
Missing validation (line 1328):
JSON.parse
is called without schema validation on the message fields.No rate limiting: A compromised sync-server could flood messages.
Consider implementing these improvements:
- Reject unauthorized upgrades immediately by throwing
HTTPException(401)
instead of returning empty handlers- Add connection limiting with a module-scoped flag/counter
- Validate message schema before accessing
message
andconnectorId
fields- Implement rate limiting per connection
Generate a verification script to check if there are existing validation utilities:
#!/bin/bash # Search for message validation patterns in WebSocket handlers rg -n "WebSocket.*schema|zod.*parse" --type=ts -C3
1394-1434
: Make sync-server URL configurable and add timeout.The proxy function has issues previously flagged:
Hardcoded localhost (line 1408): Won't work in containerized deployments (Docker, Kubernetes). The reviewer mentioned using
xyne-app-sync
in Docker environments.No timeout: The fetch call can hang indefinitely, blocking the request.
Generic error messages: Consider whether internal error details should be exposed.
Apply these improvements:
const proxyToSyncServer = async (c: Context, endpoint: string) => { try { // Get JWT token from cookie const token = getCookie(c, AccessTokenCookieName) if (!token) { throw new HTTPException(401, { message: "No authentication token" }) } // Get request body const body = await c.req.json() + const controller = new AbortController() + const timeoutId = setTimeout(() => controller.abort(), 30000) // 30s timeout + // Forward to sync server - const response = await fetch( - `http://localhost:${config.syncServerPort}${endpoint}`, - { + try { + const syncServerHost = process.env.SYNC_SERVER_HOST || 'http://localhost' + const response = await fetch( + `${syncServerHost}:${config.syncServerPort}${endpoint}`, + { - method: "POST", - headers: { - "Content-Type": "application/json", - Cookie: `${AccessTokenCookieName}=${token}`, - }, - body: JSON.stringify(body), - }, - ) + method: "POST", + headers: { + "Content-Type": "application/json", + Cookie: `${AccessTokenCookieName}=${token}`, + }, + body: JSON.stringify(body), + signal: controller.signal, + }, + ) + clearTimeout(timeoutId) - if (!response.ok) { - const errorData = await response - .json() - .catch(() => ({ message: "Proxy request failed" })) - throw new HTTPException(response.status as any, { - message: errorData.message || "Proxy request failed", - }) - } + if (!response.ok) { + Logger.warn(`Sync-server request failed: ${response.status}`) + throw new HTTPException(response.status as any, { + message: "Request to sync server failed", + }) + } - return c.json(await response.json()) + return c.json(await response.json()) + } catch (fetchError) { + clearTimeout(timeoutId) + if (fetchError instanceof Error && fetchError.name === 'AbortError') { + Logger.error(`Sync-server request timeout for ${endpoint}`) + throw new HTTPException(504, { message: "Request timeout" }) + } + throw fetchError + } } catch (error) { if (error instanceof HTTPException) throw error Logger.error(error, `Proxy request to ${endpoint} failed`) throw new HTTPException(500, { message: "Internal server error" }) } }Add to your config:
syncServerHost: process.env.SYNC_SERVER_HOST
🧹 Nitpick comments (1)
server/shared/types.ts (1)
130-140
: LGTM! Consider adding documentation for the new status.The new
Authenticated
status is well-placed in the OAuth flow progression (NotConnected → Connecting → Authenticated → Connected). However, adding a JSDoc comment explaining when this status is used would help other developers understand the distinction betweenAuthenticated
andConnected
.Consider adding documentation:
export enum ConnectorStatus { Connected = "connected", // Pending = 'pending', Connecting = "connecting", - Paused = "paused", Failed = "failed", // for oauth we will default to this NotConnected = "not-connected", + // OAuth flow: user authenticated but ingestion not yet started Authenticated = "authenticated", }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
server/config.ts (2 hunks)
server/server.ts (9 hunks)
server/shared/types.ts (1 hunks)
server/sync-server.ts (3 hunks)
server/types.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- server/config.ts
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-11T14:10:56.008Z
Learnt from: naSim087
PR: xynehq/xyne#725
File: server/server.ts:784-811
Timestamp: 2025-08-11T14:10:56.008Z
Learning: In the xyne application (server/server.ts), the following endpoints are intentionally accessible to regular authenticated users (not just admins):
- POST /oauth/create - allows users to create OAuth providers
- POST /slack/ingest_more_channel - allows users to ingest Slack channels
- POST /slack/start_ingestion - allows users to start Slack ingestion
- DELETE /oauth/connector/delete - allows users to delete OAuth connectors
- POST /connector/update_status - allows users to update connector status
- GET /connectors/all - allows users to view all connectors
- GET /oauth/global-slack-provider - allows users to check for global Slack provider
These endpoints enable self-service integration management for users.
Applied to files:
server/server.ts
🧬 Code graph analysis (2)
server/server.ts (1)
server/integrations/metricStream.ts (1)
wsConnections
(17-17)
server/sync-server.ts (3)
server/logger/index.ts (3)
getLogger
(36-93)Subsystem
(15-15)LogMiddleware
(125-190)server/types.ts (4)
ingestMoreChannelSchema
(554-559)startSlackIngestionSchema
(560-562)serviceAccountIngestMoreSchema
(259-271)startGoogleIngestionSchema
(563-565)server/api/admin.ts (6)
IngestMoreChannelApi
(1553-1586)StartSlackIngestionApi
(1449-1498)ServiceAccountIngestMoreUsersApi
(932-1021)StartGoogleIngestionApi
(1500-1552)HandlePerUserSlackSync
(2120-2155)HandlePerUserGoogleWorkSpaceSync
(2082-2118)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (5)
server/server.ts (2)
166-175: LGTM! Imports support the new WebSocket bridge.
The new imports are used appropriately:
- wsConnections is used in the sync-server WebSocket handler (line 1331)
- Other imports support the proxy functionality

314-314: LGTM! Cookie name is now configurable.
Using config.AccessTokenCookie instead of a hardcoded string improves maintainability and consistency.

server/sync-server.ts (3)
34-34: LGTM! Type-safe JWT variables.
Adding JwtVariables to the Hono app type provides type safety for JWT payload access and is consistent with the main server's pattern.

140-170: LGTM! Protected ingestion endpoints are well-structured.
The ingestion endpoints are properly (see the sketch after this list):
- Protected with JWT authentication
- Validated with Zod schemas
- Organized by integration (Slack, Google)
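For illustration, a compact sketch of that pattern with Hono (the secret env-var name and route path are assumptions, not the repository's exact file):

```typescript
// Hypothetical sketch: a JWT-protected, Zod-validated ingestion route on a typed Hono app.
import { Hono } from "hono"
import { jwt } from "hono/jwt"
import type { JwtVariables } from "hono/jwt"
import { zValidator } from "@hono/zod-validator"
import { z } from "zod"

const app = new Hono<{ Variables: JwtVariables }>()

// Protected: every route below requires a valid JWT (secret source is an assumption).
app.use("*", jwt({ secret: process.env.ACCESS_TOKEN_SECRET ?? "" }))

// Validated: the body must match the schema before the handler runs.
const startIngestionSchema = z.object({ connectorId: z.number() })

app.post("/slack/start_ingestion", zValidator("json", startIngestionSchema), (c) => {
  const { connectorId } = c.req.valid("json")
  const payload = c.get("jwtPayload") // typed via JwtVariables
  return c.json({ started: true, connectorId, requestedBy: payload?.sub })
})

export default app
```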
237-240
: LGTM! WebSocket connection established at startup.
Connecting to the main server during initialization is appropriate, assuming the configuration and validation issues flagged earlier are addressed.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
server/sync-server.ts (1)
173-198: Broken merge leaves unterminated initSyncServer.
There’s a half-removed initSyncServer definition (lines 173‑184) that never closes its for loop or the function. That’s what TS is complaining about (TS1005: '}' expected). Drop this orphaned block and keep the new helper‑based implementation starting at startAndMonitorWorkers. As written, the file doesn’t compile.
♻️ Duplicate comments (3)
server/server.ts (1)
1308-1354: Unauthorized sync-server sockets stay open.
The auth guard returns a handler even when the bearer secret is wrong, so the upgrade completes and the unauthorized client keeps an idle connection contrary to the comment. Throw an HTTPException(401) (or call ws.close(1008, ...) inside onOpen) to actually reject the socket. This was flagged earlier and still needs fixing.

-  if (
-    !authHeader ||
-    !authHeader.startsWith("Bearer ") ||
-    authHeader.slice(7) !== expectedSecret
-  ) {
-    Logger.warn("Unauthorized sync-server WebSocket connection attempt")
-    return {
-      onOpen() {
-        // Close immediately if unauthorized
-      },
-    }
-  }
+  if (
+    !authHeader ||
+    !authHeader.startsWith("Bearer ") ||
+    authHeader.slice(7) !== expectedSecret
+  ) {
+    Logger.warn("Unauthorized sync-server WebSocket connection attempt")
+    throw new HTTPException(401, { message: "Unauthorized" })
+  }

server/sync-server.ts (2)
41-94: Make the main-server WS client configurable and guarded.
connectToMainServer still dials ws://localhost:${config.port} with Bearer undefined when METRICS_SECRET isn’t set, so we just spin on failed connections outside local dev. Please pull host/port from config (config.mainServerWsUrl or similar), validate METRICS_SECRET at startup, and ensure only one reconnect attempt runs at a time (see earlier feedback about isReconnecting).

99-107: Validate ACCESS_TOKEN_SECRET before wiring JWT.
Using process.env.ACCESS_TOKEN_SECRET! means we’ll pass undefined into the Hono JWT middleware if the env var is missing, crashing requests at runtime. Fail fast instead: read the env, throw a clear error when it’s absent, and only build AuthMiddleware with a verified string.

-const accessTokenSecret = process.env.ACCESS_TOKEN_SECRET!
+const accessTokenSecret = process.env.ACCESS_TOKEN_SECRET
+if (!accessTokenSecret) {
+  throw new Error("ACCESS_TOKEN_SECRET environment variable is required")
+}
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
server/config.ts (2 hunks)
server/server.ts (9 hunks)
server/sync-server.ts (3 hunks)
server/types.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- server/config.ts
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-11T14:10:56.008Z
Learnt from: naSim087
PR: xynehq/xyne#725
File: server/server.ts:784-811
Timestamp: 2025-08-11T14:10:56.008Z
Learning: In the xyne application (server/server.ts), the following endpoints are intentionally accessible to regular authenticated users (not just admins):
- POST /oauth/create - allows users to create OAuth providers
- POST /slack/ingest_more_channel - allows users to ingest Slack channels
- POST /slack/start_ingestion - allows users to start Slack ingestion
- DELETE /oauth/connector/delete - allows users to delete OAuth connectors
- POST /connector/update_status - allows users to update connector status
- GET /connectors/all - allows users to view all connectors
- GET /oauth/global-slack-provider - allows users to check for global Slack provider
These endpoints enable self-service integration management for users.
Applied to files:
server/server.ts
🧬 Code graph analysis (2)
server/server.ts (1)
server/integrations/metricStream.ts (1)
wsConnections
(17-17)
server/sync-server.ts (3)
server/logger/index.ts (3)
getLogger
(36-93)Subsystem
(15-15)LogMiddleware
(125-190)server/types.ts (4)
ingestMoreChannelSchema
(557-562)startSlackIngestionSchema
(563-565)serviceAccountIngestMoreSchema
(262-274)startGoogleIngestionSchema
(566-568)server/api/admin.ts (5)
StartSlackIngestionApi
(1449-1498)ServiceAccountIngestMoreUsersApi
(932-1021)StartGoogleIngestionApi
(1500-1552)HandlePerUserSlackSync
(2120-2155)HandlePerUserGoogleWorkSpaceSync
(2082-2118)
🪛 GitHub Actions: TypeScript Build Check
server/sync-server.ts
[error] 313-313: TS1005: '}' expected.
Actionable comments posted: 2
♻️ Duplicate comments (6)
server/sync-server.ts (6)
172-248
: Use ESM-compatible path resolution for Worker scripts.The code uses
__dirname
(line 184) which is not ESM-compatible. In Bun with ESM modules,__dirname
may not be defined or may resolve unexpectedly.Use
import.meta.dir
(Bun-specific) or reconstruct__dirname
viafileURLToPath
:Option 1: Use Bun's
import.meta.dir
:- const worker = new Worker(path.join(__dirname, workerScript)) + const worker = new Worker(path.join(import.meta.dir, workerScript))Option 2: ESM-compatible reconstruction:
Add at the top of the file:
+import { fileURLToPath } from "url" +const __filename = fileURLToPath(import.meta.url) +const __dirname = path.dirname(__filename)
24-25
: Remove unused imports.The imports
db
andgetUserByEmail
are not used in this file. They're used by the API handlers in@/api/admin
, which import them directly.Apply this diff:
-import { db } from "@/db/client" -import { getUserByEmail } from "@/db/user"
41-64
: Hardcoded localhost URL and missing secret validation.The WebSocket URL is hardcoded to
localhost
(line 42), which won't work in containerized or distributed deployments. Additionally,METRICS_SECRET
(line 43) is read without validation—if undefined, the connection will useAuthorization: Bearer undefined
.Apply this diff to make the URL configurable and validate the secret:
+const mainServerWsUrl = process.env.MAIN_SERVER_WS_URL || `ws://localhost:${config.port}/internal/sync-websocket` +const authSecret = process.env.METRICS_SECRET + +if (!authSecret) { + throw new Error("METRICS_SECRET environment variable is required for sync-server WebSocket connection") +} + const connectToMainServer = () => { - const mainServerUrl = `ws://localhost:${config.port}/internal/sync-websocket` - const authSecret = process.env.METRICS_SECRET - mainServerWebSocket = new WebSocket(mainServerUrl, { + mainServerWebSocket = new WebSocket(mainServerWsUrl, {
67-97
: Messages are lost when WebSocket is unavailable.When the WebSocket connection is down or not yet established, messages are logged and discarded (lines 84-86). For critical ingestion status updates, this results in data loss.
Consider implementing an in-memory queue or buffer to retry sending when the connection is re-established. Would you like me to suggest an implementation?
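A minimal version of such a buffer could look like the sketch below (an assumed shape only; the real implementation would need bounds tuning and wiring into the existing connect/send helpers):

```typescript
// Hypothetical bounded buffer for messages that could not be sent while the socket was down.
type PendingMessage = { connectorId: string; message: unknown }

const MAX_PENDING = 1000
const pending: PendingMessage[] = []

export function bufferMessage(msg: PendingMessage): void {
  if (pending.length >= MAX_PENDING) {
    pending.shift() // drop the oldest message rather than growing without bound
  }
  pending.push(msg)
}

// Call this from the WebSocket "open" handler to replay anything queued during the outage.
export function flushPending(send: (msg: PendingMessage) => void): void {
  while (pending.length > 0) {
    send(pending.shift()!)
  }
}
```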
88-95
: Race condition in reconnect logic.Both the
error
/close
handlers (lines 56, 62) and this function can triggerconnectToMainServer()
concurrently. If the connection drops while a message is being sent, multiple reconnection attempts may race, potentially creating duplicate connections or connection churn.Add a reconnection flag to prevent concurrent reconnection attempts:
+let isReconnecting = false + const connectToMainServer = () => { + if (isReconnecting) return + isReconnecting = true + const mainServerUrl = `ws://localhost:${config.port}/internal/sync-websocket` const authSecret = process.env.METRICS_SECRET mainServerWebSocket = new WebSocket(mainServerUrl, { headers: { Authorization: `Bearer ${authSecret}`, }, }) mainServerWebSocket.on("open", () => { + isReconnecting = false }) mainServerWebSocket.on("error", (error) => { Logger.error(error, "WebSocket connection to main server failed") mainServerWebSocket = null + isReconnecting = false setTimeout(connectToMainServer, 5000) }) mainServerWebSocket.on("close", () => { mainServerWebSocket = null + isReconnecting = false setTimeout(connectToMainServer, 5000) }) }And update line 93-94:
- Logger.info("Attempting to reconnect to main server...") - connectToMainServer() + if (!isReconnecting) { + Logger.info("Attempting to reconnect to main server...") + connectToMainServer() + }
99-107
: Validate ACCESS_TOKEN_SECRET at startup.The non-null assertion on line 100 will cause a runtime error if
ACCESS_TOKEN_SECRET
is missing. This should be validated at module load time.Apply this diff:
const accessTokenSecret = process.env.ACCESS_TOKEN_SECRET! +if (!accessTokenSecret) { + throw new Error("ACCESS_TOKEN_SECRET environment variable is required") +} const AccessTokenCookieName = config.AccessTokenCookie
🧹 Nitpick comments (1)
server/sync-server.ts (1)
169-170
: Consider adding zValidator for consistency.The sync endpoints (lines 169-170) don't use
zValidator
middleware, unlike the other ingestion endpoints. While the handlers validate internally usingsyncByMailSchema.parse()
, addingzValidator
middleware would make the validation strategy consistent across all endpoints.Apply this diff if
syncByMailSchema
is exported from@/types
:+import { syncByMailSchema } from "@/types" + // Sync APIs -app.post("/syncSlackByMail", HandlePerUserSlackSync) -app.post("/syncGoogleWorkSpaceByMail", HandlePerUserGoogleWorkSpaceSync) +app.post("/syncSlackByMail", zValidator("form", syncByMailSchema), HandlePerUserSlackSync) +app.post("/syncGoogleWorkSpaceByMail", zValidator("form", syncByMailSchema), HandlePerUserGoogleWorkSpaceSync)Then remove the internal parsing/validation from the handlers.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
server/sync-server.ts (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
server/sync-server.ts (3)
server/logger/index.ts (3)
getLogger
(36-93)Subsystem
(15-15)LogMiddleware
(125-190)server/types.ts (4)
ingestMoreChannelSchema
(557-562)startSlackIngestionSchema
(563-565)serviceAccountIngestMoreSchema
(262-274)startGoogleIngestionSchema
(566-568)server/api/admin.ts (6)
IngestMoreChannelApi
(1553-1586)StartSlackIngestionApi
(1449-1498)ServiceAccountIngestMoreUsersApi
(932-1021)StartGoogleIngestionApi
(1500-1552)HandlePerUserSlackSync
(2120-2155)HandlePerUserGoogleWorkSpaceSync
(2082-2118)
🔇 Additional comments (2)
server/sync-server.ts (2)
259-286
: LGTM! Clean initialization sequence.The initialization flow is well-structured:
- Worker threads started with proper monitoring
- Queue initialized in background with error handling
- WebSocket connection to main server established
The refactoring improves readability and maintainability.
34-34
: Good addition of type safety.Adding
JwtVariables
to the Hono app type ensures type-safe access to JWT payload viac.get(JwtPayloadKey)
throughout the application.
Actionable comments posted: 2
♻️ Duplicate comments (3)
server/types.ts (1)
566-568
: Resolve the connectorId type inconsistency between Slack and Google schemas.
startSlackIngestionSchema
usesconnectorId: z.number()
(line 563-565) whilestartGoogleIngestionSchema
usesconnectorId: z.string()
(line 566-568). Although the implementation in server/api/admin.ts shows this is intentional (Slack uses internal numeric IDs withgetConnector
, Google uses external string IDs withgetConnectorByExternalId
), this inconsistency creates a confusing API surface and increases the risk of integration errors.Consider standardizing both schemas to use the same ID type, preferably string externalIds for consistency with external-facing APIs.
server/api/admin.ts (1)
1503-1503
: Use type inference from Zod schema instead of type assertion.The type assertion
as { connectorId: string }
bypasses type safety. Usez.infer<typeof startGoogleIngestionSchema>
to ensure the type matches the schema definition automatically.Apply this diff:
- const payload = c.req.valid("json") as { connectorId: string } + const payload = c.req.valid("json") as z.infer<typeof startGoogleIngestionSchema>server/server.ts (1)
1199-1207
: Remove duplicate ingestion endpoint registrations.Lines 1199-1207 duplicate the ingestion endpoints already registered at lines 1153-1161. The
.basePath("/admin")
at line 1176 means these routes are accessible at both/api/v1/slack/...
and/api/v1/admin/slack/...
, which is redundant.Based on learnings, these endpoints are intentionally accessible to regular authenticated users, so the duplication in the admin section is unnecessary.
Apply this diff to remove the duplicates:
.post( "/microsoft/service_account", zValidator("form", microsoftServiceSchema), AddServiceConnectionMicrosoft, ) - .post("/slack/ingest_more_channel", (c) => - proxyToSyncServer(c, "/slack/ingest_more_channel"), - ) - .post("/slack/start_ingestion", (c) => - proxyToSyncServer(c, "/slack/start_ingestion"), - ) - .post("/google/start_ingestion", (c) => - proxyToSyncServer(c, "/google/start_ingestion"), - ) .delete( "/oauth/connector/delete",
🧹 Nitpick comments (2)
server/api/admin.ts (2)
1452-1452
: Use type inference from Zod schema instead of type assertion.The type assertion
as { connectorId: number }
bypasses type safety. Usez.infer<typeof startSlackIngestionSchema>
to ensure the type matches the schema definition automatically.Apply this diff:
- const payload = c.req.valid("json") as { connectorId: number } + const payload = c.req.valid("json") as z.infer<typeof startSlackIngestionSchema>
1542-1542
: Type error asunknown
instead ofany
.Using
any
for error types bypasses type checking. Type it asunknown
and use the existinggetErrorMessage
helper function (already imported) to safely extract the error message.Apply this diff:
- } catch (error: any) { + } catch (error: unknown) { loggerWithChild({ email: sub }).error( error, `Error starting regular Google ingestion: ${getErrorMessage(error)}`,
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
server/api/admin.ts (5 hunks)
server/api/oauth.ts (2 hunks)
server/config.ts (2 hunks)
server/integrations/google/index.ts (4 hunks)
server/integrations/slack/index.ts (1 hunks)
server/server.ts (9 hunks)
server/shared/types.ts (1 hunks)
server/types.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- server/integrations/slack/index.ts
- server/config.ts
- server/shared/types.ts
- server/api/oauth.ts
- server/integrations/google/index.ts
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-11T14:10:56.008Z
Learnt from: naSim087
PR: xynehq/xyne#725
File: server/server.ts:784-811
Timestamp: 2025-08-11T14:10:56.008Z
Learning: In the xyne application (server/server.ts), the following endpoints are intentionally accessible to regular authenticated users (not just admins):
- POST /oauth/create - allows users to create OAuth providers
- POST /slack/ingest_more_channel - allows users to ingest Slack channels
- POST /slack/start_ingestion - allows users to start Slack ingestion
- DELETE /oauth/connector/delete - allows users to delete OAuth connectors
- POST /connector/update_status - allows users to update connector status
- GET /connectors/all - allows users to view all connectors
- GET /oauth/global-slack-provider - allows users to check for global Slack provider
These endpoints enable self-service integration management for users.
Applied to files:
server/server.ts
🧬 Code graph analysis (2)
server/server.ts (1)
server/integrations/metricStream.ts (1)
wsConnections
(17-17)
server/api/admin.ts (6)
server/db/connector.ts (3)
getConnector
(143-164)db
(3-3)getConnectorByExternalId
(382-416)server/db/client.ts (1)
db
(15-15)server/db/user.ts (1)
getUserByEmail
(148-157)server/integrations/google/index.ts (2)
loggerWithChild
(146-148)handleGoogleOAuthIngestion
(860-1058)server/errors/index.ts (1)
NoUserFound
(322-327)server/utils.ts (1)
getErrorMessage
(103-106)
🔇 Additional comments (2)
server/api/admin.ts (1)
1557-1557
: LGTM! Consistent with Slack schema.The payload type correctly uses
connectorId: number
to align with the updatedingestMoreChannelSchema
that uses numeric connector IDs.server/server.ts (1)
173-176
: LGTM! Configuration centralized properly.The imports for
wsConnections
andsendWebsocketMessage
are correctly added, and usingconfig.AccessTokenCookie
centralizes the cookie name configuration.Also applies to: 319-319
# [3.16.0](v3.15.0...v3.16.0) (2025-10-14)

### Features

* migrate ingestion APIs to sync-server ([#1014](#1014)) ([f97ae06](f97ae06))
Description
Move ingestion endpoints from server.ts to sync-server.ts
Testing
Additional Notes
Summary by CodeRabbit
New Features
Improvements
Stability