diff --git a/frontend/bun.lockb b/frontend/bun.lockb index 64cdb8771..e6a3b31aa 100755 Binary files a/frontend/bun.lockb and b/frontend/bun.lockb differ diff --git a/frontend/src/routes/_authenticated/admin/integrations/microsoft.tsx b/frontend/src/routes/_authenticated/admin/integrations/microsoft.tsx index 1b945a238..881d6e22d 100644 --- a/frontend/src/routes/_authenticated/admin/integrations/microsoft.tsx +++ b/frontend/src/routes/_authenticated/admin/integrations/microsoft.tsx @@ -42,16 +42,15 @@ const submitOAuthForm = async ( navigate: UseNavigateResult, userRole: UserRole, ) => { - // Role-based API routing - const isAdmin = - userRole === UserRole.Admin || userRole === UserRole.SuperAdmin + // Map authType to isServiceAuth boolean + const isServiceAuth = value.authType === "appOnly" - const response = isAdmin - ? await api.admin.oauth.create.$post({ + const response = isServiceAuth + ? await api.admin.microsoft.service_account.$post({ form: { clientId: value.clientId, clientSecret: value.clientSecret, - scopes: value.scopes, + tenantId: value.tenantId, app: Apps.MicrosoftDrive, }, }) @@ -59,7 +58,7 @@ const submitOAuthForm = async ( form: { clientId: value.clientId, clientSecret: value.clientSecret, - scopes: value.scopes, + scopes: [value.scopes, ""], app: Apps.MicrosoftDrive, }, }) @@ -81,28 +80,51 @@ const submitOAuthForm = async ( type OAuthFormData = { clientId: string clientSecret: string - scopes: string[] + scopes: string + tenantId: string + authType: "delegated" | "appOnly" } export const OAuthForm = ({ onSuccess, userRole, -}: { onSuccess: any; userRole: UserRole }) => { + setOAuthIntegrationStatus, +}: { + onSuccess: any + userRole: UserRole + setOAuthIntegrationStatus: (status: OAuthIntegrationStatus) => void +}) => { const { toast } = useToast() const navigate = useNavigate() const form = useForm({ defaultValues: { clientId: "", clientSecret: "", - scopes: [], + tenantId: "", + scopes: "https://graph.microsoft.com/.default", + authType: "delegated", }, onSubmit: async ({ value }) => { try { await submitOAuthForm(value, navigate, userRole) - toast({ - title: "Microsoft OAuth integration added", - description: "Perform OAuth to add the data", - }) + + // Handle different flows based on authentication type + if (value.authType === "appOnly") { + // For app-only (service account), skip OAuth redirect and directly start connecting + toast({ + title: "Microsoft service account integration created", + description: "Starting data ingestion...", + }) + setOAuthIntegrationStatus(OAuthIntegrationStatus.OAuthConnecting) + } else { + // For delegated, show OAuth message and wait for OAuth redirect + toast({ + title: "Microsoft OAuth integration added", + description: "Perform OAuth to add the data", + }) + setOAuthIntegrationStatus(OAuthIntegrationStatus.OAuth) + } + onSuccess() } catch (error) { toast({ @@ -172,27 +194,106 @@ export const OAuthForm = ({ (!value ? "scopes are required" : undefined), - }} children={(field) => ( <> field.handleChange(e.target.value.split(","))} - placeholder="Enter OAuth scopes" + disabled={true} + placeholder="https://graph.microsoft.com/.default" + className="bg-gray-50 dark:bg-gray-800 text-gray-500 dark:text-gray-400" /> - {field.state.meta.isTouched && field.state.meta.errors.length ? ( -

- {field.state.meta.errors.join(", ")} -

- ) : null} )} /> + {/* Only show Authentication Type selection for Admin/SuperAdmin users */} + {(userRole === UserRole.Admin || userRole === UserRole.SuperAdmin) && ( + <> + + ( +
+
+ + field.handleChange( + e.target.value as "delegated" | "appOnly", + ) + } + className="h-4 w-4" + /> + +
+
+ + field.handleChange( + e.target.value as "delegated" | "appOnly", + ) + } + className="h-4 w-4" + /> + +
+
+ )} + /> + + {/* Only show Tenant ID when App-only is selected */} + + authTypeField.state.value === "appOnly" && ( + <> + + + !value ? "Tenant ID is required" : undefined, + }} + children={(field) => ( + <> + field.handleChange(e.target.value)} + placeholder="Enter tenant ID" + /> + {field.state.meta.isTouched && + field.state.meta.errors.length ? ( +

+ {field.state.meta.errors.join(", ")} +

+ ) : null} + + )} + /> + + ) + } + /> + + )} + ) @@ -342,10 +443,11 @@ export const MicrosoftOAuthTab = ({ if (oauthIntegrationStatus === OAuthIntegrationStatus.Provider) { return ( - setOAuthIntegrationStatus(OAuthIntegrationStatus.OAuth) - } + onSuccess={() => { + // This will be overridden by the form's own logic based on authType + }} userRole={userRole} + setOAuthIntegrationStatus={setOAuthIntegrationStatus} /> ) } @@ -372,7 +474,7 @@ export const MicrosoftOAuthTab = ({ <> - Microsoft OAuth + Microsoft Auth {oauthIntegrationStatus === OAuthIntegrationStatus.OAuthConnected ? ( diff --git a/server/api/admin.ts b/server/api/admin.ts index 03be954a1..7ef909af8 100644 --- a/server/api/admin.ts +++ b/server/api/admin.ts @@ -48,6 +48,8 @@ import { MCPConnectorMode, MCPClientConfig, Subsystem, + type microsoftService, + type MicrosoftServiceCredentials, // Added for tool status updates updateToolsStatusSchema, // Added for tool status updates type userRoleChange, } from "@/types" @@ -74,8 +76,18 @@ import { Slack, MicrosoftEntraId, } from "arctic" -import type { SelectOAuthProvider, SelectUser } from "@/db/schema" -import { users, chats, messages, agents } from "@/db/schema" // Add database schema imports +import type { + SelectConnector, + SelectOAuthProvider, + SelectUser, +} from "@/db/schema" +import { + users, + chats, + messages, + agents, + selectConnectorSchema, +} from "@/db/schema" // Add database schema imports import { getErrorMessage, IsGoogleApp, @@ -117,6 +129,11 @@ import { import { zValidator } from "@hono/zod-validator" import { handleSlackChanges } from "@/integrations/slack/sync" import { getAgentByExternalIdWithPermissionCheck } from "@/db/agent" +import { ClientSecretCredential } from "@azure/identity" +import { Client as GraphClient } from "@microsoft/microsoft-graph-client" +import type { AuthenticationProvider } from "@microsoft/microsoft-graph-client" +import { handleMicrosoftServiceAccountIngestion } from "@/integrations/microsoft" +import { CustomServiceAuthProvider } from "@/integrations/microsoft/utils" import { KbItemsSchema, type VespaSchema } from "@xyne/vespa-ts" import { GetDocument } from "@/search/vespa" import { getCollectionFilesVespaIds } from "@/db/knowledgeBase" @@ -444,6 +461,100 @@ export const CreateOAuthProvider = async (c: Context) => { }) } +export const AddServiceConnectionMicrosoft = async (c: Context) => { + const { sub, workspaceId } = c.get(JwtPayloadKey) + loggerWithChild({ email: sub }).info("AddServiceConnectionMicrosoft") + const email = sub + const userRes = await getUserByEmail(db, email) + if (!userRes || !userRes.length) { + throw new NoUserFound({}) + } + const [user] = userRes + // @ts-ignore + const form: microsoftService = c.req.valid("form") + + let { clientId, clientSecret, tenantId } = form + let scopes = ["https://graph.microsoft.com/.default"] + const app = Apps.MicrosoftSharepoint + + if (!clientId || !clientSecret || !tenantId) { + throw new HTTPException(400, { + message: "Client ID, Client Secret, and Tenant ID are required", + }) + } + + try { + const authProvider = new CustomServiceAuthProvider( + tenantId, + clientId, + clientSecret, + ) + + const accessToken = await authProvider.getAccessTokenWithExpiry() + const expiresAt = new Date(accessToken.expiresOnTimestamp) + + const credentialsData: MicrosoftServiceCredentials = { + tenantId, + clientId, + clientSecret, + scopes, + access_token: accessToken.token, + expires_at: expiresAt.toISOString(), + } + + const res = await insertConnector( + db, + user.workspaceId, + user.id, + user.workspaceExternalId, + `${app}-${ConnectorType.SaaS}-${AuthType.ServiceAccount}`, + ConnectorType.SaaS, + AuthType.ServiceAccount, + app, + {}, + JSON.stringify(credentialsData), // Store the validated credentials + email, // Use current user's email as subject email + null, + null, + ConnectorStatus.Connected, // Set as connected since we validated the connection + ) + + const connector = selectConnectorSchema.parse(res) + + if (!connector) { + throw new ConnectorNotCreated({}) + } + await handleMicrosoftServiceAccountIngestion(email, connector) + + loggerWithChild({ email: sub }).info( + `Microsoft service account connector created with ID: ${connector.externalId}`, + ) + + return c.json({ + success: true, + message: "connection created and job enqueued", + id: connector.externalId, + expiresAt: expiresAt.toISOString(), + }) + } catch (error) { + const errMessage = getErrorMessage(error) + loggerWithChild({ email: email }).error( + error, + `${new AddServiceConnectionError({ + cause: error as Error, + })} \n : ${errMessage} : ${(error as Error).stack}`, + ) + + if (error instanceof HTTPException) { + throw error + } + + throw new HTTPException(500, { + message: "Error creating Microsoft service account connection", + }) + } +} + export const AddServiceConnection = async (c: Context) => { const { sub, workspaceId } = c.get(JwtPayloadKey) loggerWithChild({ email: sub }).info("AddServiceConnection") diff --git a/server/bun.lockb b/server/bun.lockb index 1421bd1b1..dfc064608 100755 Binary files a/server/bun.lockb and b/server/bun.lockb differ diff --git a/server/db/connector.ts b/server/db/connector.ts index 7c40b12d4..504fbd538 100644 --- a/server/db/connector.ts +++ b/server/db/connector.ts @@ -11,7 +11,11 @@ import { type SelectConnector, type SelectOAuthProvider, } from "./schema" -import type { OAuthCredentials, TxnOrClient } from "@/types" // ConnectorType removed +import type { + MicrosoftServiceCredentials, + OAuthCredentials, + TxnOrClient, +} from "@/types" // ConnectorType removed import { Subsystem } from "@/types" import { and, eq } from "drizzle-orm" import { Apps, AuthType, ConnectorStatus, ConnectorType } from "@/shared/types" // ConnectorType added @@ -31,6 +35,8 @@ import { getOAuthProviderByConnectorId } from "@/db/oauthProvider" import { getErrorMessage } from "@/utils" import { syncJobs, syncHistory } from "@/db/schema" import { scopes } from "@/integrations/microsoft/config" +import { CustomServiceAuthProvider } from "@/integrations/microsoft/utils" +import { date } from "zod" const Logger = getLogger(Subsystem.Db).child({ module: "connector" }) export const insertConnector = async ( @@ -157,17 +163,15 @@ export const getConnector = async ( } } -const IsTokenExpired = ( +const IsExpired = ( app: Apps, - oauthCredentials: OAuthCredentials, + expiresAt: Date, bufferInSeconds: number, ): boolean => { if (IsGoogleApp(app) || IsMicrosoftApp(app)) { - const tokens = oauthCredentials.data const now: Date = new Date() - // make the type as Date, currently the date is stringified - const expirationTime = new Date(tokens.accessTokenExpiresAt).getTime() const currentTime = now.getTime() + const expirationTime = new Date(expiresAt).getTime() return currentTime + bufferInSeconds * 1000 > expirationTime } return false @@ -207,9 +211,9 @@ export const getOAuthConnectorWithCredentials = async ( // google tokens have expiry of 1 hour // 5 minutes before expiry we refresh them if ( - IsTokenExpired( + IsExpired( oauthRes.app, - oauthRes.oauthCredentials as OAuthCredentials, + (oauthRes.oauthCredentials as OAuthCredentials).data.accessTokenExpiresAt, 5 * 60, ) ) { @@ -299,6 +303,69 @@ export const getOAuthConnectorWithCredentials = async ( return oauthRes } +export const getMicrosoftAuthConnectorWithCredentials = async ( + trx: TxnOrClient, + connectorId: number, +): Promise => { + const res = await trx + .select() + .from(connectors) + .where( + and( + eq(connectors.id, connectorId), + eq(connectors.authType, AuthType.ServiceAccount), + ), + ) + .limit(1) + + if (!res.length) { + throw new NoOauthConnectorFound({ + message: `Could not get the oauth connector with id: ${connectorId}`, + }) + } + + let authRes: SelectConnector = selectConnectorSchema.parse(res[0]) + + if (!authRes.credentials) { + throw new MissingOauthConnectorCredentialsError({}) + } + // parse the string + const credentials: MicrosoftServiceCredentials = JSON.parse( + authRes.credentials as string, + ) + + if (IsExpired(authRes.app, new Date(credentials.expires_at), 5 * 60)) { + // token is expired. We should get new tokens + // update it in place + if (IsMicrosoftApp(authRes.app)) { + const authProvider = new CustomServiceAuthProvider( + credentials.tenantId, + credentials.clientId, + credentials.clientSecret, + ) + + const accessToken = await authProvider.getAccessTokenWithExpiry() + credentials.access_token = accessToken.token + credentials.expires_at = new Date( + accessToken.expiresOnTimestamp, + ).toISOString() + + authRes = await updateConnector(trx, authRes.id, { + credentials: JSON.stringify(credentials), + }) + Logger.info(`Microsoft connector successfully updated: ${authRes.id}`) + } else { + Logger.error( + `Token has to refresh but ${authRes.app} app not yet supported`, + ) + throw new Error( + `Token has to refresh but ${authRes.app} app not yet supported`, + ) + } + } + return authRes +} + export const getConnectorByExternalId = async ( trx: TxnOrClient, connectorId: string, diff --git a/server/integrations/microsoft/client.ts b/server/integrations/microsoft/client.ts index 1a6fb84a4..6a9f5b420 100644 --- a/server/integrations/microsoft/client.ts +++ b/server/integrations/microsoft/client.ts @@ -22,25 +22,46 @@ class CustomAuthProvider implements AuthenticationProvider { export interface MicrosoftGraphClient { client: Client accessToken: string - refreshToken: string + refreshToken?: string // Only for delegated clients + tenantId?: string // Only for service clients clientId: string clientSecret: string betaClient: Client - authProvider: CustomAuthProvider // Helper methods to get updated tokens after refresh getCurrentTokens(): { accessToken: string - refreshToken: string + refreshToken?: string expiresAt?: Date } } +export const updateMicrosoftGraphClient = ( + graphClient: MicrosoftGraphClient, + accessToken: string, + refreshToken?: string, +) => { + graphClient.accessToken = accessToken + if (refreshToken) graphClient.refreshToken = refreshToken + + const authProvider = new CustomAuthProvider(accessToken) + + graphClient.client = Client.initWithMiddleware({ + authProvider, + defaultVersion: "v1.0", + }) + graphClient.betaClient = Client.initWithMiddleware({ + authProvider, + defaultVersion: "beta", + }) +} + // Create Microsoft Graph client similar to Google's pattern export const createMicrosoftGraphClient = ( accessToken: string, - refreshToken: string, clientId: string, clientSecret: string, + refreshToken?: string, + tenantId?: string, tokenExpiresAt?: Date, ): MicrosoftGraphClient => { const authProvider = new CustomAuthProvider(accessToken) @@ -58,10 +79,10 @@ export const createMicrosoftGraphClient = ( client, accessToken, refreshToken, + tenantId, clientId, clientSecret, betaClient, - authProvider, getCurrentTokens() { return { accessToken, @@ -76,15 +97,16 @@ export const createMicrosoftGraphClient = ( export const makeGraphApiCall = async ( graphClient: MicrosoftGraphClient, endpoint: string, - options?: any, ): Promise => { return retryWithBackoff( async () => { - const result = await graphClient.client.api(endpoint).get(options) + const result = await graphClient.client.api(endpoint).get() return result }, `Making Microsoft Graph API call to ${endpoint}`, Apps.MicrosoftDrive, + 1, + graphClient, ) } export const makeBetaGraphApiCall = async ( @@ -99,6 +121,8 @@ export const makeBetaGraphApiCall = async ( }, `Making Microsoft Graph API call to ${endpoint}`, Apps.MicrosoftDrive, + 1, + graphClient, ) } export const makeGraphApiCallWithHeaders = async ( @@ -121,6 +145,8 @@ export const makeGraphApiCallWithHeaders = async ( }, `Making Microsoft Graph API call to ${endpoint} with headers`, Apps.MicrosoftDrive, + 1, + graphClient, ) } @@ -148,6 +174,8 @@ export const makePagedGraphApiCall = async ( }, `Making paginated Microsoft Graph API call to ${nextLink}`, Apps.MicrosoftDrive, + 1, + graphClient, ) if (response.value) { @@ -162,14 +190,17 @@ export const makePagedGraphApiCall = async ( // Download file from Microsoft Graph export async function downloadFileFromGraph( - graphClient: Client, + graphClient: MicrosoftGraphClient, fileId: string, + driveId?: string, ): Promise { try { - const response = await graphClient - .api(`/me/drive/items/${fileId}/content`) - .get() + let endpoint: string + + if (driveId) endpoint = `drives/${driveId}/items/${fileId}/content` + else endpoint = `me/drive/items/${fileId}/content` + const response = await makeGraphApiCall(graphClient, endpoint) return await streamToBuffer(response) } catch (error) { throw new Error(`Failed to download file ${fileId}: ${error}`) diff --git a/server/integrations/microsoft/config.ts b/server/integrations/microsoft/config.ts index 3f5c6bc8e..238d4e0c8 100644 --- a/server/integrations/microsoft/config.ts +++ b/server/integrations/microsoft/config.ts @@ -1,10 +1,6 @@ // Microsoft Graph API scopes for OAuth export const scopes = [ - "https://graph.microsoft.com/Files.Read.All", - "https://graph.microsoft.com/Mail.Read", - "https://graph.microsoft.com/Calendars.Read", - "https://graph.microsoft.com/Contacts.Read", - "https://graph.microsoft.com/User.Read", + "https://graph.microsoft.com/.default", "offline_access", // Required for refresh tokens ] diff --git a/server/integrations/microsoft/index.ts b/server/integrations/microsoft/index.ts index 8f7432a6d..5d93bafb7 100644 --- a/server/integrations/microsoft/index.ts +++ b/server/integrations/microsoft/index.ts @@ -1,10 +1,13 @@ import { Subsystem, SyncCron, + type MicrosoftServiceCredentials, type OAuthCredentials, + type SaaSJob, type SaaSOAuthJob, } from "@/types" -import { getOAuthConnectorWithCredentials } from "@/db/connector" +import { getConnector, getOAuthConnectorWithCredentials } from "@/db/connector" +import { v4 as uuidv4 } from "uuid" import { insertUser, insertWithRetry } from "@/search/vespa" import { db } from "@/db/client" import { @@ -62,17 +65,20 @@ import { makePagedGraphApiCall, type MicrosoftGraphClient, } from "./client" -import type { Client } from "@microsoft/microsoft-graph-client" +import { Client } from "@microsoft/microsoft-graph-client" import type { DriveItem } from "@microsoft/microsoft-graph-types" import { handleOutlookIngestion } from "./outlook" import { getUniqueEmails } from "../google" import { htmlToText } from "html-to-text" +import type { InvokeModelResponseFilterSensitiveLog } from "@aws-sdk/client-bedrock-runtime" +import { + discoverSharePointSites, + discoverSiteDrives, + processSiteDrives, +} from "./sharepoint" +import { getFilePermissions, processFileContent, loggerWithChild } from "./utils" const Logger = getLogger(Subsystem.Integrations).child({ module: "microsoft" }) -export const loggerWithChild = getLoggerWithChild(Subsystem.Integrations, { - module: "microsoft", -}) - export const getTextFromEventDescription = (description: string): string => { return htmlToText(description, { wordwrap: 130 }) } @@ -734,80 +740,6 @@ const insertFilesForUser = async ( } } -async function getFilePermissions( - client: MicrosoftGraphClient, - fileId: string, -): Promise { - try { - const response = await makeGraphApiCall( - client, - `/me/drive/items/${fileId}/permissions`, - ) - - const emails = new Set() - - if (response.value && Array.isArray(response.value)) { - for (const permission of response.value) { - // Skip link-only permissions (no user identities) - if ( - permission.link && - !permission.grantedToV2 && - !permission.grantedToIdentitiesV2 - ) { - continue - } - - // grantedToV2 (modern single user) - if (permission.grantedToV2?.siteUser?.email) { - emails.add(permission.grantedToV2.siteUser.email) - } else if (permission.grantedToV2?.user?.email) { - emails.add(permission.grantedToV2.user.email) - } else if (permission.grantedToV2?.user?.userPrincipalName) { - emails.add(permission.grantedToV2.user.userPrincipalName) - } - - // grantedToIdentitiesV2 (modern multiple users) - if (Array.isArray(permission.grantedToIdentitiesV2)) { - for (const identity of permission.grantedToIdentitiesV2) { - if (identity.siteUser?.email) { - emails.add(identity.siteUser.email) - } else if (identity.user?.email) { - emails.add(identity.user.email) - } else if (identity.user?.userPrincipalName) { - emails.add(identity.user.userPrincipalName) - } - } - } - - // grantedTo (legacy single user) - if (permission.grantedTo?.user?.email) { - emails.add(permission.grantedTo.user.email) - } else if (permission.grantedTo?.user?.userPrincipalName) { - emails.add(permission.grantedTo.user.userPrincipalName) - } - - // grantedToIdentities (legacy multiple users) - if (Array.isArray(permission.grantedToIdentities)) { - for (const identity of permission.grantedToIdentities) { - if (identity.user?.email) { - emails.add(identity.user.email) - } else if (identity.user?.userPrincipalName) { - emails.add(identity.user.userPrincipalName) - } - } - } - } - } - - return [...emails] // convert Set to array - } catch (error) { - Logger.warn( - `Failed to get permissions for file ${fileId}: ${(error as Error).message}`, - ) - return [] - } -} - // Get all OneDrive files and folders using delta API async function getAllOneDriveFiles( client: MicrosoftGraphClient, @@ -878,6 +810,115 @@ async function getAllOneDriveFiles( } } +export const handleMicrosoftServiceAccountIngestion = async ( + email: string, + connector: SelectConnector, +) => { + const jobId = uuidv4() + loggerWithChild({ email: email! }).info( + `handleMicrosoftServiceAccountIngestion starting with jobId: ${jobId}`, + ) + const tracker = new Tracker(Apps.MicrosoftSharepoint, AuthType.ServiceAccount) + + try { + const credentials: MicrosoftServiceCredentials = JSON.parse( + connector.credentials as string, + ) + + const graphClient = createMicrosoftGraphClient( + credentials.access_token, + credentials.clientId, + credentials.clientSecret, + undefined, + credentials.tenantId, + ) + + const interval = setInterval(() => { + sendWebsocketMessage( + JSON.stringify({ + progress: tracker.getProgress(), + userStats: tracker.getServiceAccountProgress().userStats, + startTime: tracker.getStartTime(), + }), + connector!.externalId, + ) + }, 4000) + + //Discover all SharePoint sites + let sites = await discoverSharePointSites(graphClient, email!) + + //For each site, discover all drives + const siteDrives = await discoverSiteDrives(graphClient, sites, email!) + + // Step 3: Process each drive and collect delta tokens + const deltaLinks = await processSiteDrives( + graphClient, + siteDrives, + email!, + tracker, + ) + + // const driveTokens = {} + setTimeout(() => { + clearInterval(interval) + }, 8000) + + //Store sync jobs with delta tokens for each drive + await db.transaction(async (trx) => { + await trx + .update(connectors) + .set({ + status: ConnectorStatus.Connected, + }) + .where(eq(connectors.id, connector!.id)) + + // Create sync job with all drive delta tokens + await insertSyncJob(trx, { + workspaceId: connector!.workspaceId, + workspaceExternalId: connector!.workspaceExternalId, + app: Apps.MicrosoftSharepoint, + connectorId: connector!.id, + authType: AuthType.ServiceAccount, + config: { + deltaLinks, // Store all drive delta Links as a record + type: "microsoftSharepointDeltaTokens", + lastSyncedAt: new Date().toISOString(), + }, + email: email!, + type: SyncCron.ChangeToken, + status: SyncJobStatus.NotStarted, + }) + + loggerWithChild({ email: email! }).info( + `Microsoft SharePoint service account ingestion completed (jobId: ${jobId})`, + ) + closeWs(connector!.externalId) + }) + } catch (error) { + loggerWithChild({ email: email! }).error( + error, + `handleMicrosoftServiceAccountIngestion (jobId: ${jobId}) failed: ${(error as Error).message}`, + ) + + if (connector) { + await db + .update(connectors) + .set({ + status: ConnectorStatus.Failed, + }) + .where(eq(connectors.id, connector.id)) + closeWs(connector.externalId) + } + + throw new CouldNotFinishJobSuccessfully({ + message: `Could not finish Microsoft SharePoint service account ingestion (jobId: ${jobId})`, + integration: Apps.MicrosoftSharepoint, + entity: "sites and drives", + cause: error as Error, + }) + } +} + // Main Microsoft OAuth ingestion handler export const handleMicrosoftOAuthIngestion = async (data: SaaSOAuthJob) => { const logger = loggerWithChild({ email: data.email }) @@ -1034,289 +1075,3 @@ export const handleMicrosoftOAuthIngestion = async (data: SaaSOAuthJob) => { }) } } - -// Download directory setup -export const downloadDir = path.resolve(__dirname, "../../downloads") - -if (process.env.NODE_ENV !== "production") { - const init = () => { - if (!fs.existsSync(downloadDir)) { - fs.mkdirSync(downloadDir, { recursive: true }) - } - } - init() -} - -// Helper function to delete files -export const deleteDocument = async (filePath: string) => { - try { - await unlink(filePath) - Logger.debug(`File at ${filePath} deleted successfully`) - } catch (err) { - Logger.error( - err, - `Error deleting file at ${filePath}: ${err} ${(err as Error).stack}`, - err, - ) - throw new DeleteDocumentError({ - message: "Error in the catch of deleting file", - cause: err as Error, - integration: Apps.MicrosoftDrive, - entity: DriveEntity.PDF, - }) - } -} - -// Helper function for safer PDF loading -export async function safeLoadPDF(pdfPath: string): Promise { - try { - const loader = new PDFLoader(pdfPath) - return await loader.load() - } catch (error) { - const { name, message } = error as Error - if ( - message.includes("PasswordException") || - name.includes("PasswordException") - ) { - Logger.warn("Password protected PDF, skipping") - } else { - Logger.error(error, `PDF load error: ${error}`) - } - return [] - } -} - -// Process Microsoft PDF files (similar to googlePDFsVespa) -async function processMicrosoftPDFs( - graphClient: Client, - pdfFiles: DriveItem[], - userEmail: string, -): Promise { - const results: VespaFileWithDrivePermission[] = [] - - for (const file of pdfFiles) { - try { - // Download PDF content - const pdfBuffer = await downloadFileFromGraph(graphClient, file.id!) - - // Save temporarily (reuse Google's download directory pattern) - const pdfFileName = `${hashPdfFilename(`${userEmail}_${file.id}_${file.name}`)}.pdf` - const pdfPath = `${downloadDir}/${pdfFileName}` - - // Write buffer to file - await fs.promises.writeFile(pdfPath, new Uint8Array(pdfBuffer)) - - // Use existing PDF processing utilities - const docs = await safeLoadPDF(pdfPath) - if (!docs || docs.length === 0) { - await deleteDocument(pdfPath) - continue - } - - // Use existing chunking utilities - const chunks = docs.flatMap((doc: Document) => - chunkDocument(doc.pageContent), - ) - - // Cleanup - await deleteDocument(pdfPath) - - // Create Vespa document structure - results.push({ - title: file.name!, - url: file.webUrl ?? "", - app: Apps.MicrosoftDrive, - docId: file.id!, - parentId: file.parentReference?.id ?? null, - owner: "", // Extract from file.createdBy if available - photoLink: "", - ownerEmail: userEmail, - entity: DriveEntity.PDF, - chunks: chunks.map((c: any) => c.chunk), - permissions: [], // Process file.permissions if available - mimeType: file.file?.mimeType ?? "", - metadata: JSON.stringify({ - parentPath: file.parentReference?.path, - size: file.size, - }), - createdAt: new Date(file.createdDateTime!).getTime(), - updatedAt: new Date(file.lastModifiedDateTime!).getTime(), - }) - } catch (error) { - console.error(`Error processing PDF ${file.name}:`, error) - continue - } - } - - return results -} - -// Process Microsoft Word documents -async function processMicrosoftWord( - graphClient: Client, - wordFiles: DriveItem[], - userEmail: string, -): Promise { - const results: VespaFileWithDrivePermission[] = [] - - for (const file of wordFiles) { - try { - // Download DOCX content - const docxBuffer = await downloadFileFromGraph(graphClient, file.id!) - - // Use existing DOCX processing utilities from server/docxChunks.ts - const extractedContent = - await extractTextAndImagesWithChunksFromDocx(docxBuffer) - - results.push({ - title: file.name!, - url: file.webUrl ?? "", - app: Apps.MicrosoftDrive, - docId: file.id!, - parentId: file.parentReference?.id ?? null, - owner: "", - photoLink: "", - ownerEmail: userEmail, - entity: DriveEntity.Docs, // Reuse Google's entity types - chunks: extractedContent.text_chunks || [], // Use text_chunks property - permissions: [], - mimeType: file.file?.mimeType ?? "", - metadata: JSON.stringify({ - parentPath: file.parentReference?.path, - size: file.size, - images: extractedContent.image_chunks?.length || 0, // Use image_chunks property - }), - createdAt: new Date(file.createdDateTime!).getTime(), - updatedAt: new Date(file.lastModifiedDateTime!).getTime(), - }) - } catch (error) { - console.error(`Error processing Word document ${file.name}:`, error) - continue - } - } - - return results -} - -// Process Microsoft Excel files -async function processMicrosoftExcel( - graphClient: Client, - excelFiles: DriveItem[], - userEmail: string, -): Promise { - const results: VespaFileWithDrivePermission[] = [] - - for (const file of excelFiles) { - try { - // Use Microsoft Graph API to get workbook data - const workbook = await graphClient - .api(`/me/drive/items/${file.id}/workbook/worksheets`) - .get() - - const chunks: string[] = [] - - for (const worksheet of workbook.value) { - try { - // Get worksheet data - const worksheetData = await graphClient - .api( - `/me/drive/items/${file.id}/workbook/worksheets/${worksheet.id}/usedRange`, - ) - .get() - - if (worksheetData.values) { - // Process similar to Google Sheets - filter textual content - const textualContent = worksheetData.values - .flat() - .filter( - (cell: any) => - cell && typeof cell === "string" && isNaN(Number(cell)), - ) - .join(" ") - - if (textualContent.length > 0) { - const worksheetChunks = chunkDocument(textualContent) - chunks.push(...worksheetChunks.map((c: any) => c.chunk)) - } - } - } catch (worksheetError) { - console.error( - `Error processing worksheet ${worksheet.name}:`, - worksheetError, - ) - continue - } - } - - results.push({ - title: file.name!, - url: file.webUrl ?? "", - app: Apps.MicrosoftDrive, - docId: file.id!, - parentId: file.parentReference?.id ?? null, - owner: "", - photoLink: "", - ownerEmail: userEmail, - entity: DriveEntity.Sheets, - chunks, - permissions: [], - mimeType: file.file?.mimeType ?? "", - metadata: JSON.stringify({ - parentPath: file.parentReference?.path, - size: file.size, - worksheetCount: workbook.value?.length || 0, - }), - createdAt: new Date(file.createdDateTime!).getTime(), - updatedAt: new Date(file.lastModifiedDateTime!).getTime(), - }) - } catch (error) { - console.error(`Error processing Excel file ${file.name}:`, error) - continue - } - } - - return results -} - -async function processFileContent( - graphClient: MicrosoftGraphClient, - file: DriveItem, - userEmail: string, -): Promise { - const mimeType = file.file?.mimeType - - try { - switch (mimeType) { - case "application/pdf": - const pdfResults = await processMicrosoftPDFs( - graphClient.client, - [file], - userEmail, - ) - return pdfResults[0]?.chunks || [] - - case "application/vnd.openxmlformats-officedocument.wordprocessingml.document": - const wordResults = await processMicrosoftWord( - graphClient.client, - [file], - userEmail, - ) - return wordResults[0]?.chunks || [] - - case "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": - const excelResults = await processMicrosoftExcel( - graphClient.client, - [file], - userEmail, - ) - return excelResults[0]?.chunks || [] - - default: - // For unsupported file types, return empty chunks (metadata only) - return [] - } - } catch (error) { - console.error(`Error processing file content for ${file.name}:`, error) - return [] // Fallback to metadata only on error - } -} diff --git a/server/integrations/microsoft/sharepoint/index.ts b/server/integrations/microsoft/sharepoint/index.ts new file mode 100644 index 000000000..ace27f9d7 --- /dev/null +++ b/server/integrations/microsoft/sharepoint/index.ts @@ -0,0 +1,251 @@ +import { Tracker, StatType } from "@/integrations/tracker" +import { insertWithRetry } from "@/search/vespa" +import { DriveEntity, fileSchema } from "@xyne/vespa-ts" +import { Apps } from "@/shared/types" +import { makeGraphApiCall, type MicrosoftGraphClient } from "../client" +import { getEntityFromMimeType,loggerWithChild, getFilePermissionsSharepoint, processFileContent } from "../utils" +import type { Drive, DriveItem, Site } from "@microsoft/microsoft-graph-types" +import type { drive_v3 } from "googleapis" + +// Function to discover all SharePoint sites +export const discoverSharePointSites = async ( + client: MicrosoftGraphClient, + userEmail: string, +): Promise> => { + try { + loggerWithChild({ email: userEmail }).info( + "Discovering SharePoint sites...", + ) + + const sites: Array = [] + + // Get all sites the service account has access to + let nextLink: string | undefined = + "/sites?$select=id,name,webUrl,displayName" + + while (nextLink) { + const response = await makeGraphApiCall(client, nextLink) + + if (response.value && Array.isArray(response.value)) { + for (const site of response.value) { + sites.push(site as Site) + } + } + + nextLink = response["@odata.nextLink"] + } + + return sites + } catch (error) { + loggerWithChild({ email: userEmail }).error( + error, + `Failed to discover SharePoint sites: ${error}`, + ) + throw error + } +} + +// Function to discover all drives for each site +export const discoverSiteDrives = async ( + client: MicrosoftGraphClient, + sites: Array, + userEmail: string, +): Promise> => { + try { + loggerWithChild({ email: userEmail }).info( + "Discovering drives for each site...", + ) + + const siteDrives: Array = [] + + for (const site of sites) { + try { + loggerWithChild({ email: userEmail }).info( + `Discovering drives for site: ${site.name} (${site.id})`, + ) + + // Get all drives for this site + const response = await makeGraphApiCall( + client, + `/sites/${site.id}/drives?$select=id,name,driveType,webUrl,sharepointIds`, + ) + + if (response.value && Array.isArray(response.value)) { + for (const drive of response.value) { + try { + siteDrives.push(drive as Drive) + + loggerWithChild({ email: userEmail }).info( + `Found Drive: ${drive.name} (${drive.id}) from site ${site.name}`, + ) + } catch (error) { + loggerWithChild({ email: userEmail }).warn( + `Drive ${drive.name} in site ${site.name} does not support delta sync, skipping: ${error}`, + ) + } + } + } + } catch (error) { + loggerWithChild({ email: userEmail }).error( + error, + `Failed to get drives for site ${site.name}: ${error}`, + ) + // Continue with other sites even if one fails + } + } + + loggerWithChild({ email: userEmail }).info( + `Discovered ${siteDrives.length} drives across ${sites.length} sites`, + ) + + return siteDrives + } catch (error) { + loggerWithChild({ email: userEmail }).error( + error, + `Failed to discover site drives: ${error}`, + ) + throw error + } +} + +// Function to process each drive and collect delta tokens +export const processSiteDrives = async ( + client: MicrosoftGraphClient, + siteDrives: Array, + userEmail: string, + tracker?: Tracker, +): Promise> => { + try { + loggerWithChild({ email: userEmail }).info( + `Processing ${siteDrives.length} drives for initial sync and delta token collection`, + ) + + const deltaLinks: Record = {} + + let totalFiles = 0 + + for (const siteDrive of siteDrives) { + try { + if (!siteDrive.sharePointIds?.siteId || !siteDrive.id) { + loggerWithChild({ email: userEmail }).warn( + `Skipping drive ${siteDrive.name} - missing sharePointIds or id`, + ) + continue + } + loggerWithChild({ email: userEmail }).info( + `Processing drive: ${siteDrive.name} from site: ${siteDrive.name}`, + ) + + let deltaLink = "" + let driveFileCount = 0 + + // Use delta API for initial sync to get all files and the delta token + let nextLink: string | undefined = + `/sites/${siteDrive.sharePointIds?.siteId}/drives/${siteDrive.id}/root/delta?$select=id,name,size,createdDateTime,lastModifiedDateTime,webUrl,file,folder,parentReference,createdBy,lastModifiedBy,@microsoft.graph.downloadUrl,deleted` + + while (nextLink) { + const response = await makeGraphApiCall(client, nextLink) + + if (response.value && Array.isArray(response.value)) { + for (const item of response.value) { + try { + let permissions: string[] = [] + if (siteDrive.id) { + permissions = await getFilePermissionsSharepoint( + client, + item.id, + siteDrive.id, + ) + } + + const fileToBeIngested = { + title: item.name ?? "", + url: item.webUrl ?? "", + app: Apps.MicrosoftSharepoint, + docId: item.id, + parentId: item.parentReference?.id ?? null, + owner: item.createdBy?.user?.displayName ?? userEmail, + photoLink: "", + ownerEmail: userEmail, + entity: getEntityFromMimeType(item.file?.mimeType), + chunks: await processFileContent(client, item, userEmail), + permissions, + mimeType: item.file?.mimeType ?? "application/octet-stream", + metadata: JSON.stringify({ + size: item.size, + downloadUrl: item["@microsoft.graph.downloadUrl"], + siteId: siteDrive.sharePointIds?.siteId, + driveId: siteDrive.id, + driveName: siteDrive.name, + driveType: siteDrive.driveType, + parentId: item.parentReference?.id ?? "", + parentPath: item.parentReference?.path ?? "/", + eTag: item.eTag ?? "", + }), + createdAt: new Date(item.createdDateTime).getTime(), + updatedAt: new Date(item.lastModifiedDateTime).getTime(), + } + + await insertWithRetry(fileToBeIngested, fileSchema) + tracker?.updateUserStats(userEmail, StatType.Drive, 1) + driveFileCount++ + totalFiles++ + + if (driveFileCount % 100 === 0) { + loggerWithChild({ email: userEmail }).info( + `Processed ${driveFileCount} files from drive: ${siteDrive.name}`, + ) + } + } catch (error) { + loggerWithChild({ email: userEmail }).error( + error, + `Error processing file ${item.id} from drive ${siteDrive.name}: ${(error as Error).message}`, + ) + } + } + } + + // Check for pagination and delta token + if (response["@odata.nextLink"]) { + nextLink = response["@odata.nextLink"] + } else { + // Final response should contain delta token + deltaLink = response["@odata.deltaLink"] || "" + nextLink = undefined + } + } + + // Store the delta token for this drive + if (deltaLink && siteDrive.id) { + deltaLinks[`${siteDrive.sharePointIds?.siteId}::${siteDrive.id}`] = + deltaLink + + loggerWithChild({ email: userEmail }).info( + `Stored delta token for drive ${siteDrive.name} (${siteDrive.id}): processed ${driveFileCount} files`, + ) + } else { + loggerWithChild({ email: userEmail }).warn( + `No delta token received for drive ${siteDrive.name} (${siteDrive.id})`, + ) + } + } catch (error) { + loggerWithChild({ email: userEmail }).error( + error, + `Error processing drive ${siteDrive.name} from site ${siteDrive.sharePointIds?.siteId}: ${(error as Error).message}`, + ) + } + } + + loggerWithChild({ email: userEmail }).info( + `Completed processing ${siteDrives.length} drives. Total files processed: ${totalFiles}. Delta tokens collected for ${Object.keys(deltaLinks).length} drives.`, + ) + + return deltaLinks + } catch (error) { + loggerWithChild({ email: userEmail }).error( + error, + `Failed to process site drives: ${error}`, + ) + throw error + } +} diff --git a/server/integrations/microsoft/sync.ts b/server/integrations/microsoft/sync.ts index 5cfb4ebc7..7579be17d 100644 --- a/server/integrations/microsoft/sync.ts +++ b/server/integrations/microsoft/sync.ts @@ -1,11 +1,15 @@ import { Subsystem, SyncCron, + type MicrosoftServiceCredentials, type OAuthCredentials, type SyncConfig, } from "@/types" import PgBoss from "pg-boss" -import { getOAuthConnectorWithCredentials } from "@/db/connector" +import { + getMicrosoftAuthConnectorWithCredentials, + getOAuthConnectorWithCredentials, +} from "@/db/connector" import { DeleteDocument, getDocumentOrNull, @@ -51,12 +55,14 @@ import { getEventStartTime, getJoiningLink, insertContact, - loggerWithChild, } from "./index" import { MAX_ONEDRIVE_FILE_SIZE, skipMailExistCheck } from "./config" import { + getFilePermissionsSharepoint, MicrosoftMimeType, microsoftMimeTypeMap, + processFileContent, + loggerWithChild, type OneDriveFile, } from "./utils" import { chunkDocument } from "@/chunks" @@ -74,6 +80,11 @@ import { getDocumentOrSpreadsheet, } from "../google/sync" import { DriveMime } from "../google/utils" +import { + discoverSharePointSites, + discoverSiteDrives, + processSiteDrives, +} from "./sharepoint" const Logger = getLogger(Subsystem.Integrations).child({ module: "microsoft-sync", @@ -85,7 +96,9 @@ const validateMicrosoftCredentials = () => { const clientSecret = process.env.MICROSOFT_CLIENT_SECRET if (!clientId || !clientSecret) { - Logger.warn("Microsoft integration disabled: MICROSOFT_CLIENT_ID and/or MICROSOFT_CLIENT_SECRET environment variables not set") + Logger.warn( + "Microsoft integration disabled: MICROSOFT_CLIENT_ID and/or MICROSOFT_CLIENT_SECRET environment variables not set", + ) return { clientId: null, clientSecret: null } } @@ -116,6 +129,12 @@ type MicrosoftCalendarChangeToken = { lastSyncedAt: Date } +type MicrosoftSharepointChangeToken = { + type: "microsoftSharepointDeltaTokens" + deltaLinks: Record + lastSyncedAt: Date +} + // TODO: change summary to json // and store all the structured details type ChangeStats = { @@ -162,11 +181,7 @@ export const getOneDriveDelta = async ( let nextDeltaTokenUrl: string | null = null while (endpoint) { - const response = await retryWithBackoff( - () => makeGraphApiCall(graphClient, endpoint), - `Fetching OneDrive delta changes`, - Apps.MicrosoftDrive, - ) + const response = await makeGraphApiCall(graphClient, endpoint) if (Array.isArray(response.value)) { changes.push(...response.value) } @@ -281,10 +296,7 @@ const extractOneDriveFileContent = async ( if (mimeType === MicrosoftMimeType.WordDocumentModern) { try { Logger.info(`Processing DOCX file: ${file.name}`) - const fileBuffer = await downloadFileFromGraph( - graphClient.client, - file.id, - ) + const fileBuffer = await downloadFileFromGraph(graphClient, file.id) const docxResult = await extractTextAndImagesWithChunksFromDocx( new Uint8Array(fileBuffer), file.id, @@ -301,10 +313,7 @@ const extractOneDriveFileContent = async ( if (mimeType === MicrosoftMimeType.ExcelSpreadsheetModern) { try { Logger.info(`Processing XLSX file: ${file.name}`) - const fileBuffer = await downloadFileFromGraph( - graphClient.client, - file.id, - ) + const fileBuffer = await downloadFileFromGraph(graphClient, file.id) const sheetsData = await processSpreadsheetFileWithSheetInfo( fileBuffer, file.name, @@ -370,7 +379,7 @@ const extractPDFContent = async ( try { // Download the PDF file - const fileBuffer = await downloadFileFromGraph(graphClient.client, file.id) + const fileBuffer = await downloadFileFromGraph(graphClient, file.id) await fs.writeFile(tempFilePath, new Uint8Array(fileBuffer)) // Extract text using PDFLoader @@ -1138,6 +1147,298 @@ const syncMicrosoftContacts = async ( return stats } +const updateSharepointDeltaLinks = async ( + graphClient: MicrosoftGraphClient, + deltaLinks: Record, + email: string, +): Promise<{ + deletedDrives: string[] + newDeltaLinks: Record +}> => { + let sites = await discoverSharePointSites(graphClient, email) + let drives = await discoverSiteDrives(graphClient, sites, email) + + const driveSet = new Set( + drives.map((drive) => `${drive.sharePointIds?.siteId}::${drive.id}`), + ) + + let deletedDrives: string[] = [] + + //Filters out newly added drives + drives = drives.filter( + (drive) => + drive.id && + !(`${drive.sharePointIds?.siteId}::${drive.id}` in deltaLinks), + ) + + //Filters out deleted drives + for (const key in deltaLinks) { + if (!driveSet.has(key)) { + deletedDrives.push(key) + delete deltaLinks[key] + } + } + + //TODO: remove files from deleted drives + // await deleteDrives(graphClient, deletedDrives, email) + + //perform initial ingestion of new drives + const newDeltaLinks = await processSiteDrives(graphClient, drives, email) + + for (const [key, value] of Object.entries(newDeltaLinks)) + deltaLinks[key] = value + + return { deletedDrives, newDeltaLinks } +} + +const handleSharepointChanges = async ( + graphClient: MicrosoftGraphClient, + deltaLinks: Record, + email: string, +): Promise<{ + stats: ChangeStats + changesExist: boolean +}> => { + const stats = newStats() + let changesExist = false + + // For tracking newly added or removed drives + const { deletedDrives, newDeltaLinks } = await updateSharepointDeltaLinks( + graphClient, + deltaLinks, + email, + ) + + if (deletedDrives.length > 0) { + stats.summary += `Removed ${deletedDrives.length} SharePoint drives: ${deletedDrives.join(", ")}\n` + changesExist = true + } + + if (Object.keys(newDeltaLinks).length > 0) { + stats.summary += `Discovered ${Object.keys(newDeltaLinks).length} new SharePoint drives\n` + changesExist = true + } + + // Process delta changes for each existing drive + for (const [driveKey, deltaToken] of Object.entries(deltaLinks)) { + try { + // Skip newly added drives + if (driveKey in newDeltaLinks) { + continue + } + + loggerWithChild({ email }).info( + `Processing delta changes for drive: ${driveKey}`, + ) + + // Extract siteId and driveId from the composite key + const [siteId, driveId] = driveKey.split("::") + if (!siteId || !driveId) { + Logger.warn(`Invalid drive key format: ${driveKey}`) + continue + } + + const { driveStats, newDeltaLink } = + await processSharePointDriveDeltaChanges( + graphClient, + siteId, + driveId, + deltaToken, + email, + ) + if (newDeltaLink !== "") { + //update with new Link + deltaLinks[driveKey] = newDeltaLink + } + + // Merge drive stats into overall stats + stats.added += driveStats.added + stats.updated += driveStats.updated + stats.removed += driveStats.removed + stats.summary += driveStats.summary + + if ( + driveStats.added > 0 || + driveStats.updated > 0 || + driveStats.removed > 0 + ) { + changesExist = true + } + + loggerWithChild({ email }).info( + `Processed drive ${driveKey}: ${driveStats.added} added, ${driveStats.updated} updated, ${driveStats.removed} removed`, + ) + } catch (error) { + Logger.error( + error, + `Error processing SharePoint drive ${driveKey}: ${error}`, + ) + stats.summary += `Error processing drive ${driveKey}: ${getErrorMessage(error)}\n` + } + } + + return { stats, changesExist } +} + +// Process delta changes for a specific SharePoint drive +const processSharePointDriveDeltaChanges = async ( + graphClient: MicrosoftGraphClient, + siteId: string, + driveId: string, + deltaLink: string, + email: string, +): Promise<{ driveStats: ChangeStats; newDeltaLink: string }> => { + const stats = newStats() + let newDeltaLink: string = "" + + try { + // Use delta token to get changes for this specific drive + let nextLink: string | undefined = deltaLink + + loggerWithChild({ email }).info( + `Fetching delta changes from: ${nextLink.substring(0, 100)}...`, + ) + + while (nextLink) { + const response = await makeGraphApiCall(graphClient, nextLink) + + if (response.value && Array.isArray(response.value)) { + for (const item of response.value) { + try { + if (item.deleted || item["@removed"]) { + // Handle deleted files + await handleSharePointFileDelete(item.id, email, stats) + } else if (item.file) { + // Handle added/updated files (skip folders) + await handleSharePointFileChange( + graphClient, + item, + siteId, + driveId, + email, + stats, + ) + } + } catch (itemError) { + Logger.error( + itemError, + `Error processing SharePoint item ${item.id}: ${itemError}`, + ) + } + } + } + + // Check for pagination + if (response["@odata.nextLink"]) { + nextLink = response["@odata.nextLink"] + } else if (response["@odata.deltaLink"]) { + newDeltaLink = response["@odata.deltaLink"] + nextLink = undefined + } else { + nextLink = undefined + } + } + } catch (error) { + Logger.error( + error, + `Error processing delta changes for drive ${driveId} in site ${siteId}`, + ) + throw error + } + + return { + driveStats: stats, + newDeltaLink, + } +} + +// Handle SharePoint file deletion +const handleSharePointFileDelete = async ( + fileId: string, + email: string, + stats: ChangeStats, +): Promise => { + try { + const existingDoc = await getDocumentOrNull(fileSchema, fileId) + if (existingDoc) { + // "deleted or @removed" in delta implies the file was deleted, not just permission changes. + // Safe to delete from Vespa regardless of ACLs or user-specific permissions. + await DeleteDocument(fileId, fileSchema) + stats.removed += 1 + stats.summary += `Deleted SharePoint file ${fileId}\n` + } else { + throw new Error("File not found in vespa") + } + } catch (error) { + Logger.error(error, `Error deleting SharePoint file ${fileId}: ${error}`) + } +} + +// Handle SharePoint file addition/update +const handleSharePointFileChange = async ( + graphClient: MicrosoftGraphClient, + item: any, + siteId: string, + driveId: string, + email: string, + stats: ChangeStats, +): Promise => { + try { + const fileId = item.id + const existingDoc = await getDocumentOrNull(fileSchema, fileId) + + // Get file permissions + const permissions: string[] = await getFilePermissionsSharepoint( + graphClient, + fileId, + driveId, + ) + + // Process file content + const chunks = await processFileContent(graphClient, item, email) + + // Create Vespa file object + const vespaFile = { + title: item.name ?? "", + url: item.webUrl ?? "", + app: Apps.MicrosoftSharepoint, + docId: fileId, + parentId: item.parentReference?.id ?? null, + owner: item.createdBy?.user?.displayName ?? email, + photoLink: "", + ownerEmail: email, + entity: DriveEntity.Misc, + chunks, + permissions, + mimeType: item.file?.mimeType ?? "application/octet-stream", + metadata: JSON.stringify({ + size: item.size, + downloadUrl: item["@microsoft.graph.downloadUrl"], + siteId: siteId, + driveId: driveId, + parentId: item.parentReference?.id ?? "", + parentPath: item.parentReference?.path ?? "/", + eTag: item.eTag ?? "", + }), + createdAt: new Date(item.createdDateTime).getTime(), + updatedAt: new Date(item.lastModifiedDateTime).getTime(), + } + + // Insert into Vespa + await insertWithRetry(vespaFile, fileSchema) + + if (existingDoc) { + stats.updated += 1 + stats.summary += `Updated SharePoint file ${fileId}\n` + } else { + stats.added += 1 + stats.summary += `Added SharePoint file ${fileId}\n` + } + } catch (error) { + Logger.error(error, `Error processing SharePoint file ${item.id}: ${error}`) + } +} + // Main Microsoft OAuth changes handler export const handleMicrosoftOAuthChanges = async ( boss: PgBoss, @@ -1145,7 +1446,9 @@ export const handleMicrosoftOAuthChanges = async ( ) => { // Skip if Microsoft credentials are not configured if (!MICROSOFT_CLIENT_ID || !MICROSOFT_CLIENT_SECRET) { - Logger.warn("Skipping Microsoft sync job - Microsoft integration not configured") + Logger.warn( + "Skipping Microsoft sync job - Microsoft integration not configured", + ) return } @@ -1170,9 +1473,9 @@ export const handleMicrosoftOAuthChanges = async ( // Create Microsoft Graph client const graphClient = createMicrosoftGraphClient( oauthTokens.access_token, - oauthTokens.refresh_token, MICROSOFT_CLIENT_ID, MICROSOFT_CLIENT_SECRET, + oauthTokens.refresh_token, ) let config: MicrosoftDriveChangeToken = @@ -1313,9 +1616,9 @@ export const handleMicrosoftOAuthChanges = async ( const graphClient = createMicrosoftGraphClient( oauthTokens.access_token, - oauthTokens.refresh_token, MICROSOFT_CLIENT_ID, MICROSOFT_CLIENT_SECRET, + oauthTokens.refresh_token, ) let { @@ -1419,9 +1722,9 @@ export const handleMicrosoftOAuthChanges = async ( const graphClient = createMicrosoftGraphClient( oauthTokens.access_token, - oauthTokens.refresh_token, MICROSOFT_CLIENT_ID, MICROSOFT_CLIENT_SECRET, + oauthTokens.refresh_token, ) let { @@ -1505,3 +1808,105 @@ export const handleMicrosoftOAuthChanges = async ( } } } + +export async function handleMicrosoftServiceAccountChanges() { + Logger.info("handleMicrosoftServiceAccountChanges") + const syncJobs = await getAppSyncJobs( + db, + Apps.MicrosoftSharepoint, + AuthType.ServiceAccount, + ) + + for (const syncJob of syncJobs) { + let stats = newStats() + try { + const connector = await getMicrosoftAuthConnectorWithCredentials( + db, + syncJob.connectorId, + ) + + const authTokens = JSON.parse( + connector.credentials as string, + ) as MicrosoftServiceCredentials + + const graphClient = createMicrosoftGraphClient( + authTokens.access_token, + authTokens.clientId, + authTokens.clientSecret, + undefined, + authTokens.tenantId, + ) + + let config: MicrosoftSharepointChangeToken = + syncJob.config as MicrosoftSharepointChangeToken + + //handles delta changes and updates deltaLinks in-place + const { stats: changeStats, changesExist } = + await handleSharepointChanges( + graphClient, + config.deltaLinks, + syncJob.email, + ) + + if (changesExist) { + const newConfig: MicrosoftSharepointChangeToken = { + type: "microsoftSharepointDeltaTokens", + deltaLinks: config.deltaLinks, + lastSyncedAt: new Date(), + } + + // Update sync job and create sync history + await db.transaction(async (trx) => { + await updateSyncJob(trx, syncJob.id, { + config: newConfig, + lastRanOn: new Date(), + status: SyncJobStatus.Successful, + }) + + await insertSyncHistory(trx, { + workspaceId: syncJob.workspaceId, + workspaceExternalId: syncJob.workspaceExternalId, + dataAdded: changeStats.added, + dataDeleted: changeStats.removed, + dataUpdated: changeStats.updated, + authType: AuthType.ServiceAccount, + summary: { description: changeStats.summary }, + errorMessage: "", + app: Apps.MicrosoftSharepoint, + status: SyncJobStatus.Successful, + config: { + ...newConfig, + lastSyncedAt: newConfig.lastSyncedAt.toISOString(), + }, + type: SyncCron.ChangeToken, + lastRanOn: new Date(), + }) + }) + + Logger.info(`SharePoint changes synced: ${JSON.stringify(changeStats)}`) + } else { + Logger.info("No SharePoint changes to sync") + } + } catch (error) { + const errorMessage = getErrorMessage(error) + Logger.error(error, `SharePoint sync failed: ${errorMessage}`) + + // Insert failed sync history + await insertSyncHistory(db, { + workspaceId: syncJob.workspaceId, + workspaceExternalId: syncJob.workspaceExternalId, + dataAdded: stats.added, + dataDeleted: stats.removed, + dataUpdated: stats.updated, + authType: AuthType.ServiceAccount, + summary: { description: stats.summary }, + errorMessage, + app: Apps.MicrosoftSharepoint, + status: SyncJobStatus.Failed, + config: syncJob.config, + type: SyncCron.ChangeToken, + lastRanOn: new Date(), + }) + } + } +} diff --git a/server/integrations/microsoft/utils.ts b/server/integrations/microsoft/utils.ts index 1188e4f27..a0560faf7 100644 --- a/server/integrations/microsoft/utils.ts +++ b/server/integrations/microsoft/utils.ts @@ -1,13 +1,76 @@ -import { getLogger } from "@/logger" +import { getLogger, getLoggerWithChild } from "@/logger" import { Subsystem } from "@/types" import type { MicrosoftGraphClient } from "./client" -import { makeGraphApiCall } from "./client" +import { downloadFileFromGraph, makeGraphApiCall } from "./client" import { Apps, DriveEntity } from "@/shared/types" import { chunkDocument } from "@/chunks" import { MAX_ONEDRIVE_FILE_SIZE } from "./config" +import type { + AuthenticationProvider, + Client, +} from "@microsoft/microsoft-graph-client" +import { ClientSecretCredential, type AccessToken } from "@azure/identity" +import type { Permission, DriveItem } from "@microsoft/microsoft-graph-types" +import { extractTextAndImagesWithChunksFromDocx } from "@/docxChunks" +import { hashPdfFilename } from "@/utils" +import type { VespaFileWithDrivePermission } from "@xyne/vespa-ts/types" +import fs from "node:fs" +import path from "node:path" +import { unlink } from "node:fs/promises" +import type { Document } from "@langchain/core/documents" +import { PDFLoader } from "@langchain/community/document_loaders/fs/pdf" + +export const loggerWithChild = getLoggerWithChild(Subsystem.Integrations, { + module: "microsoft", +}) const Logger = getLogger(Subsystem.Integrations).child({ module: "microsoft" }) +// Download directory setup +export const downloadDir = path.resolve(__dirname, "../../downloads") + +if (process.env.NODE_ENV !== "production") { + const init = () => { + if (!fs.existsSync(downloadDir)) { + fs.mkdirSync(downloadDir, { recursive: true }) + } + } + init() +} + +// Helper function to delete files +export const deleteDocument = async (filePath: string) => { + try { + await unlink(filePath) + Logger.debug(`File at ${filePath} deleted successfully`) + } catch (err) { + Logger.error( + err, + `Error deleting file at ${filePath}: ${err} ${(err as Error).stack}`, + err, + ) + throw new Error(`Error deleting file: ${(err as Error).message}`) + } +} + +// Helper function for safer PDF loading +export async function safeLoadPDF(pdfPath: string): Promise { + try { + const loader = new PDFLoader(pdfPath) + return await loader.load() + } catch (error) { + const { name, message } = error as Error + if ( + message.includes("PasswordException") || + name.includes("PasswordException") + ) { + Logger.warn("Password protected PDF, skipping") + } else { + Logger.error(error, `PDF load error: ${error}`) + } + return [] + } +} export enum MicrosoftMimeType { WordDocumentModern = "application/vnd.openxmlformats-officedocument.wordprocessingml.document", ExcelSpreadsheetModern = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", @@ -36,6 +99,39 @@ export const MimeMapForContent: Record = { [MicrosoftMimeType.PowerPointPresentationLegacy]: true, } +// Custom authentication provider for Microsoft Graph using ClientSecretCredential +export class CustomServiceAuthProvider implements AuthenticationProvider { + private credential: ClientSecretCredential + + constructor(tenantId: string, clientId: string, clientSecret: string) { + this.credential = new ClientSecretCredential( + tenantId, + clientId, + clientSecret, + ) + } + + async getAccessToken(): Promise { + const tokenResponse = await this.credential.getToken( + "https://graph.microsoft.com/.default", + ) + if (!tokenResponse) { + throw new Error("Failed to get access token") + } + return tokenResponse.token + } + + async getAccessTokenWithExpiry(): Promise { + const tokenResponse = await this.credential.getToken( + "https://graph.microsoft.com/.default", + ) + if (!tokenResponse) { + throw new Error("Failed to get access token") + } + return tokenResponse + } +} + // Get OneDrive file metadata export const getOneDriveFile = async ( graphClient: MicrosoftGraphClient, @@ -88,7 +184,6 @@ export const getOneDriveFileContent = async ( return [] } } - // Convert OneDrive permissions to email list export const toOneDrivePermissionsList = ( permissions: any[] | undefined, @@ -183,6 +278,374 @@ export const shouldProcessChange = (change: any): boolean => { return true } +export async function getFilePermissionsSharepoint( + client: MicrosoftGraphClient, + fileId: string, + driveId: string, +): Promise { + const endpoint = `drives/${driveId}/items/${fileId}/permissions` + + try { + const response = await makeGraphApiCall(client, endpoint) + const permissions: string[] = [] + + if (response.value && Array.isArray(response.value)) { + const permissionsList = response.value as Permission[] + for (const permission of permissionsList) { + // Check for individual user permissions + if (permission.grantedToV2?.siteUser?.loginName) { + const loginName = permission.grantedToV2.siteUser.loginName + // Extract email from loginName format: "i:0#.f|membership|email@domain.com" + const emailMatch = loginName.match(/\|([^|]+@[^|]+)$/) + if (emailMatch && emailMatch[1]) { + permissions.push(emailMatch[1]) + } + } + + // For site groups, we could add the group name as a permission identifier + // if (permission.grantedToV2?.siteGroup?.displayName) { + // permissions.push(`group:${permission.grantedToV2.siteGroup.displayName}`) + // } + } + } + + // Remove duplicates and return + return Array.from(new Set(permissions)) + } catch (error) { + loggerWithChild({ email: "system" }).error( + error, + `Error fetching SharePoint file permissions for ${fileId}: ${(error as Error).message}`, + ) + return [] + } +} + +//Get permissions for a file in one-drive or sharepoint +export async function getFilePermissions( + client: MicrosoftGraphClient, + fileId: string, +): Promise { + try { + const endpoint = `me/drive/items/${fileId}/permissions` + + const response = await makeGraphApiCall(client, endpoint) + + const emails = new Set() + + if (response.value && Array.isArray(response.value)) { + for (const permission of response.value) { + // Skip link-only permissions (no user identities) + if ( + permission.link && + !permission.grantedToV2 && + !permission.grantedToIdentitiesV2 + ) { + continue + } + + // grantedToV2 (modern single user) + if (permission.grantedToV2?.siteUser?.email) { + emails.add(permission.grantedToV2.siteUser.email) + } else if (permission.grantedToV2?.user?.email) { + emails.add(permission.grantedToV2.user.email) + } else if (permission.grantedToV2?.user?.userPrincipalName) { + emails.add(permission.grantedToV2.user.userPrincipalName) + } + + // grantedToIdentitiesV2 (modern multiple users) + if (Array.isArray(permission.grantedToIdentitiesV2)) { + for (const identity of permission.grantedToIdentitiesV2) { + if (identity.siteUser?.email) { + emails.add(identity.siteUser.email) + } else if (identity.user?.email) { + emails.add(identity.user.email) + } else if (identity.user?.userPrincipalName) { + emails.add(identity.user.userPrincipalName) + } + } + } + + // grantedTo (legacy single user) + if (permission.grantedTo?.user?.email) { + emails.add(permission.grantedTo.user.email) + } else if (permission.grantedTo?.user?.userPrincipalName) { + emails.add(permission.grantedTo.user.userPrincipalName) + } + + // grantedToIdentities (legacy multiple users) + if (Array.isArray(permission.grantedToIdentities)) { + for (const identity of permission.grantedToIdentities) { + if (identity.user?.email) { + emails.add(identity.user.email) + } else if (identity.user?.userPrincipalName) { + emails.add(identity.user.userPrincipalName) + } + } + } + } + } + + return [...emails] // convert Set to array + } catch (error) { + Logger.warn( + `Failed to get permissions for file ${fileId}: ${(error as Error).message}`, + ) + return [] + } +} + +// Process Microsoft PDF files (similar to googlePDFsVespa) +async function processMicrosoftPDFs( + graphClient: MicrosoftGraphClient, + pdfFiles: DriveItem[], + userEmail: string, +): Promise { + const results: VespaFileWithDrivePermission[] = [] + + for (const file of pdfFiles) { + try { + // Download PDF content + const pdfBuffer = await downloadFileFromGraph( + graphClient, + file.id!, + file.parentReference?.driveId!, + ) + + // Save temporarily (reuse Google's download directory pattern) + const pdfFileName = `${hashPdfFilename(`${userEmail}_${file.id}_${file.name}`)}.pdf` + const pdfPath = `${downloadDir}/${pdfFileName}` + + // Write buffer to file + await fs.promises.writeFile(pdfPath, new Uint8Array(pdfBuffer)) + + // Use existing PDF processing utilities + const docs = await safeLoadPDF(pdfPath) + if (!docs || docs.length === 0) { + await deleteDocument(pdfPath) + continue + } + + // Use existing chunking utilities + const chunks = docs.flatMap((doc: Document) => + chunkDocument(doc.pageContent), + ) + + // Cleanup + await deleteDocument(pdfPath) + + // Create Vespa document structure + results.push({ + title: file.name!, + url: file.webUrl ?? "", + app: Apps.MicrosoftDrive, + docId: file.id!, + parentId: file.parentReference?.id ?? null, + owner: "", // Extract from file.createdBy if available + photoLink: "", + ownerEmail: userEmail, + entity: DriveEntity.PDF, + chunks: chunks.map((c: any) => c.chunk), + permissions: [], // Process file.permissions if available + mimeType: file.file?.mimeType ?? "", + metadata: JSON.stringify({ + parentPath: file.parentReference?.path, + size: file.size, + }), + createdAt: new Date(file.createdDateTime!).getTime(), + updatedAt: new Date(file.lastModifiedDateTime!).getTime(), + }) + } catch (error) { + console.error(`Error processing PDF ${file.name}:`, error) + continue + } + } + + return results +} + +// Process Microsoft Word documents +async function processMicrosoftWord( + graphClient: MicrosoftGraphClient, + wordFiles: DriveItem[], + userEmail: string, +): Promise { + const results: VespaFileWithDrivePermission[] = [] + + for (const file of wordFiles) { + try { + // Download DOCX content + const docxBuffer = await downloadFileFromGraph( + graphClient, + file.id!, + file.parentReference?.driveId!, + ) + + // Use existing DOCX processing utilities from server/docxChunks.ts + const extractedContent = + await extractTextAndImagesWithChunksFromDocx(docxBuffer) + + results.push({ + title: file.name!, + url: file.webUrl ?? "", + app: graphClient.refreshToken + ? Apps.MicrosoftDrive + : Apps.MicrosoftSharepoint, + docId: file.id!, + parentId: file.parentReference?.id ?? null, + owner: "", + photoLink: "", + ownerEmail: userEmail, + entity: DriveEntity.WordDocument, // Reuse Google's entity types + chunks: extractedContent.text_chunks || [], // Use text_chunks property + permissions: [], + mimeType: file.file?.mimeType ?? "", + metadata: JSON.stringify({ + parentPath: file.parentReference?.path, + size: file.size, + images: extractedContent.image_chunks?.length || 0, // Use image_chunks property + }), + createdAt: new Date(file.createdDateTime!).getTime(), + updatedAt: new Date(file.lastModifiedDateTime!).getTime(), + }) + } catch (error) { + console.error(`Error processing Word document ${file.name}:`, error) + continue + } + } + + return results +} + +// Process Microsoft Excel files +// TODO: failing for huge excel files +async function processMicrosoftExcel( + graphClient: Client, + excelFiles: DriveItem[], + userEmail: string, +): Promise { + const results: VespaFileWithDrivePermission[] = [] + + for (const file of excelFiles) { + try { + const base = file.parentReference?.driveId + ? `/drives/${file.parentReference.driveId}/items` + : `/me/drive/items` + + // Use Microsoft Graph API to get workbook data + const workbook = await graphClient + .api(`${base}/${file.id}/workbook/worksheets`) + .get() + + const chunks: string[] = [] + + for (const worksheet of workbook.value) { + try { + // Get worksheet data + const worksheetData = await graphClient + .api( + `${base}/${file.id}/workbook/worksheets/${worksheet.id}/usedRange`, + ) + .get() + + if (worksheetData.values) { + // Process similar to Google Sheets - filter textual content + const textualContent = worksheetData.values + .flat() + .filter( + (cell: any) => + cell && typeof cell === "string" && isNaN(Number(cell)), + ) + .join(" ") + + if (textualContent.length > 0) { + const worksheetChunks = chunkDocument(textualContent) + chunks.push(...worksheetChunks.map((c: any) => c.chunk)) + } + } + } catch (worksheetError) { + console.error( + `Error processing worksheet ${worksheet.name}:`, + worksheetError, + ) + continue + } + } + + results.push({ + title: file.name!, + url: file.webUrl ?? "", + app: Apps.MicrosoftDrive, + docId: file.id!, + parentId: file.parentReference?.id ?? null, + owner: "", + photoLink: "", + ownerEmail: userEmail, + entity: DriveEntity.ExcelSpreadsheet, + chunks, + permissions: [], + mimeType: file.file?.mimeType ?? "", + metadata: JSON.stringify({ + parentPath: file.parentReference?.path, + size: file.size, + worksheetCount: workbook.value?.length || 0, + }), + createdAt: new Date(file.createdDateTime!).getTime(), + updatedAt: new Date(file.lastModifiedDateTime!).getTime(), + }) + } catch (error) { + console.error(`Error processing Excel file ${file.name}:`, error) + continue + } + } + + return results +} + +export async function processFileContent( + graphClient: MicrosoftGraphClient, + file: DriveItem, + userEmail: string, +): Promise { + const mimeType = file.file?.mimeType + + try { + switch (mimeType) { + case "application/pdf": + const pdfResults = await processMicrosoftPDFs( + graphClient, + [file], + userEmail, + ) + return pdfResults[0]?.chunks || [] + + case "application/vnd.openxmlformats-officedocument.wordprocessingml.document": + const wordResults = await processMicrosoftWord( + graphClient, + [file], + userEmail, + ) + return wordResults[0]?.chunks || [] + + case "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": + //TODO: breaking for huge excel files, response limit reached + // const excelResults = await processMicrosoftExcel( + // graphClient.client, + // [file], + // userEmail, + // ) + // return excelResults[0]?.chunks || [] + return [] + + default: + // For unsupported file types, return empty chunks (metadata only) + return [] + } + } catch (error) { + console.error(`Error processing file content for ${file.name}:`, error) + return [] // Fallback to metadata only on error + } +} + // Log sync progress export const logSyncProgress = ( operation: string, diff --git a/server/package.json b/server/package.json index abc884857..6fb138406 100644 --- a/server/package.json +++ b/server/package.json @@ -51,6 +51,7 @@ "@aws-sdk/client-bedrock": "^3.797.0", "@aws-sdk/client-bedrock-runtime": "^3.797.0", "@aws-sdk/client-ses": "^3.879.0", + "@azure/identity": "^4.12.0", "@azure/msal-node": "^3.7.3", "@google-cloud/vertexai": "^1.10.0", "@google/genai": "^1.9.0", diff --git a/server/queue/index.ts b/server/queue/index.ts index 61e461820..91ba44c1b 100644 --- a/server/queue/index.ts +++ b/server/queue/index.ts @@ -24,7 +24,10 @@ import { syncJobSuccess, } from "@/metrics/sync/sync-metrics" import { Auth } from "googleapis" -import { handleMicrosoftOAuthChanges } from "@/integrations/microsoft/sync" +import { + handleMicrosoftOAuthChanges, + handleMicrosoftServiceAccountChanges, +} from "@/integrations/microsoft/sync" const Logger = getLogger(Subsystem.Queue) const JobExpiryHours = config.JobExpiryHours @@ -218,6 +221,7 @@ const initWorkers = async () => { const startTime = Date.now() try { await handleGoogleServiceAccountChanges(boss, job) + await handleMicrosoftServiceAccountChanges() const endTime = Date.now() syncJobSuccess.inc( { diff --git a/server/server.ts b/server/server.ts index ed6b87747..ac6b1d47c 100644 --- a/server/server.ts +++ b/server/server.ts @@ -41,6 +41,7 @@ import { deleteUserDataSchema, ingestMoreChannelSchema, startSlackIngestionSchema, + microsoftServiceSchema, UserRoleChangeSchema, } from "@/types" import { @@ -73,6 +74,7 @@ import { adminQuerySchema, userAgentLeaderboardQuerySchema, agentAnalysisQuerySchema, + AddServiceConnectionMicrosoft, UpdateUser, HandlePerUserSlackSync, HandlePerUserGoogleWorkSpaceSync, @@ -1057,6 +1059,11 @@ export const AppRoutes = app zValidator("form", createOAuthProvider), CreateOAuthProvider, ) + .post( + "/microsoft/service_account", + zValidator("form", microsoftServiceSchema), + AddServiceConnectionMicrosoft, + ) .post( "/slack/ingest_more_channel", zValidator("json", ingestMoreChannelSchema), diff --git a/server/types.ts b/server/types.ts index 0cc9638a8..5037914a1 100644 --- a/server/types.ts +++ b/server/types.ts @@ -225,6 +225,13 @@ export const createOAuthProvider = z } }) +export const microsoftServiceSchema = z.object({ + clientId: z.string(), + clientSecret: z.string(), + tenantId: z.string(), + app: z.nativeEnum(Apps), +}) + export const deleteConnectorSchema = z.object({ connectorId: z.string(), }) @@ -285,6 +292,7 @@ export const deleteUserDataSchema = z.object({ export type DeleteUserDataPayload = z.infer export type OAuthProvider = z.infer +export type microsoftService = z.infer export type SaaSJob = { connectorId: number @@ -373,6 +381,11 @@ const MicrosoftCalendarDeltaTokenSchema = z.object({ calendarDeltaToken: z.string(), lastSyncedAt: z.coerce.date(), }) +const MicrosoftSharepointDeltaTokenSchema = z.object({ + type: z.literal("microsoftSharepointDeltaTokens"), + deltaLinks: z.record(z.string(), z.string()).optional(), + lastSyncedAt: z.coerce.date(), +}) const ChangeTokenSchema = z.discriminatedUnion("type", [ DefaultTokenSchema, @@ -382,6 +395,7 @@ const ChangeTokenSchema = z.discriminatedUnion("type", [ MicrosoftDriveDeltaTokenSchema, MicrosoftOutlookDeltaTokenSchema, MicrosoftCalendarDeltaTokenSchema, + MicrosoftSharepointDeltaTokenSchema, ]) // Define UpdatedAtVal schema @@ -444,6 +458,15 @@ export type GoogleServiceAccount = { private_key: string } +export type MicrosoftServiceCredentials = { + tenantId: string + clientId: string + clientSecret: string + scopes: string[] + access_token: string + expires_at: string +} + export enum MessageTypes { JwtParams = "JwtParams", } diff --git a/server/utils.ts b/server/utils.ts index c96a058ce..b3d20fcb3 100644 --- a/server/utils.ts +++ b/server/utils.ts @@ -5,10 +5,24 @@ import fs from "node:fs/promises" import { getLogger } from "@/logger" import { Subsystem } from "@/types" import { stopwords as englishStopwords } from "@orama/stopwords/english" -import { Apps } from "@xyne/vespa-ts/types" -import type { OAuth2Client } from "google-auth-library" +import { OAuth2Client } from "google-auth-library" +import { Apps, AuthType } from "./shared/types" import crypto from "node:crypto" import type { QueryRouterResponse, TemporalClassifier } from "@/ai/types" +import { + Client, + CustomAuthenticationProvider, +} from "@microsoft/microsoft-graph-client" +import { OAuthClientInformationFullSchema } from "@modelcontextprotocol/sdk/shared/auth.js" +import { + updateMicrosoftGraphClient, + type MicrosoftClient, + type MicrosoftGraphClient, +} from "./integrations/microsoft/client" +import { CustomServiceAuthProvider } from "./integrations/microsoft/utils" +import { scopes } from "./integrations/microsoft/config" +import { MicrosoftEntraId } from "arctic" +import config from "./config" const Logger = getLogger(Subsystem.Utils) @@ -134,7 +148,7 @@ export const retryWithBackoff = async ( context: string, app: Apps, retries = 0, - googleOauth2Client?: OAuth2Client, + authClient?: OAuth2Client | MicrosoftGraphClient, ): Promise => { try { return await fn() // Attempt the function @@ -180,19 +194,64 @@ export const retryWithBackoff = async ( )}ms (Attempt ${retries + 1}/${MAX_RETRIES})`, ) await delay(waitTime) - return retryWithBackoff(fn, context, app, retries + 1, googleOauth2Client) // Retry recursively - } else if (error.code === 401 && retries < MAX_RETRIES) { - if (IsGoogleApp(app)) { - Logger.info(`401 encountered, refreshing OAuth access token...`) - const { credentials } = await googleOauth2Client?.refreshAccessToken()! - googleOauth2Client?.setCredentials(credentials) - return retryWithBackoff( - fn, - context, - app, - retries + 1, - googleOauth2Client, - ) + return retryWithBackoff(fn, context, app, retries + 1, authClient) // Retry recursively + } else if ( + authClient && + (error.code === 401 || error.code === "InvalidAuthenticationToken") && + retries < MAX_RETRIES + ) { + if (authClient instanceof OAuth2Client) { + if (IsGoogleApp(app)) { + Logger.info(`401 encountered, refreshing OAuth access token...`) + const { credentials } = await authClient?.refreshAccessToken()! + authClient?.setCredentials(credentials) + return retryWithBackoff( + fn, + context, + app, + retries + 1, + authClient, + ) + } + else { + throw new Error("Provided AppType is not google") + } + } else if (IsMicrosoftApp(app)) { + if (authClient.refreshToken) { + // OAuth/Delegated authentication + const microsoft = new MicrosoftEntraId( + "common", + authClient.clientId, + authClient.clientSecret, + `${config.host}/oauth/callback`, + ) + + const refreshedTokens = await microsoft.refreshAccessToken( + authClient.refreshToken, + scopes, + ) + updateMicrosoftGraphClient( + authClient, + refreshedTokens.accessToken(), + refreshedTokens.refreshToken(), + ) + } else if (authClient.tenantId) { + // Service/app-only authentication + const authProvider = new CustomServiceAuthProvider( + authClient.tenantId, + authClient.clientId, + authClient.clientSecret, + ) + updateMicrosoftGraphClient( + authClient, + await authProvider.getAccessToken(), + ) + } else { + throw new Error( + "Not enough credentials provided for getting access token after expiry", + ) + } + return retryWithBackoff(fn, context, app, retries + 1, authClient) } else { throw new Error("401 error for unsupported app") } @@ -254,7 +313,8 @@ export const IsMicrosoftApp = (app: Apps) => { return ( app === Apps.MicrosoftDrive || app === Apps.MicrosoftOutlook || - app === Apps.MicrosoftCalendar + app === Apps.MicrosoftCalendar || + app === Apps.MicrosoftSharepoint ) }