Skip to content

Commit 32a3d3b

Browse files
authored
Run email triggers in jobs (#1032)
1 parent c68ad82 commit 32a3d3b

File tree

5 files changed

+193
-64
lines changed

5 files changed

+193
-64
lines changed

packages/core/src/jobs/constants.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ export enum Jobs {
3434
autoScaleJob = 'autoScaleJob',
3535
updateMcpServerLastUsedJob = 'updateMcpServerLastUsedJob',
3636
scaleDownMcpServerJob = 'scaleDownMcpServerJob',
37+
runEmailTriggerJob = 'runEmailTriggerJob',
3738
}
3839

3940
export const QUEUES = {
@@ -53,6 +54,7 @@ export const QUEUES = {
5354
'requestDocumentSuggestionsJob',
5455
'checkScheduledDocumentTriggersJob',
5556
'processScheduledTriggerJob',
57+
'runEmailTriggerJob',
5658
],
5759
},
5860
[Queues.maintenanceQueue]: {
Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
11
import { checkScheduledDocumentTriggersJob } from './checkScheduledDocumentTriggersJob'
22
import { processScheduledTriggerJob } from './processScheduledTriggerJob'
3+
import { runEmailTriggerJob } from './runEmailTriggerJob'
34

4-
export { checkScheduledDocumentTriggersJob, processScheduledTriggerJob }
5+
export {
6+
checkScheduledDocumentTriggersJob,
7+
processScheduledTriggerJob,
8+
runEmailTriggerJob,
9+
}

packages/core/src/services/documentTriggers/handlers/email/getResponse.ts renamed to packages/core/src/jobs/job-definitions/documentTriggers/runEmailTriggerJob/getResponse.ts

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import {
1616
DocumentVersionsRepository,
1717
} from '../../../../repositories'
1818
import { database } from '../../../../client'
19-
import { runDocumentAtCommit } from '../../../commits/runDocumentAtCommit'
19+
import { runDocumentAtCommit } from '../../../../services/commits'
2020
import { unsafelyFindWorkspace } from '../../../../data-access'
2121
import { DocumentTrigger, Workspace } from '../../../../browser'
2222
import {
@@ -25,9 +25,9 @@ import {
2525
type AssistantMessage,
2626
type UserMessage,
2727
} from '@latitude-data/compiler'
28-
import { addMessages } from '../../../documentLogs'
29-
import { uploadFile } from '../../../files'
30-
import { EmailTriggerConfiguration } from '../../helpers/schema'
28+
import { addMessages } from '../../../../services/documentLogs'
29+
import { uploadFile } from '../../../../services/files'
30+
import { EmailTriggerConfiguration } from '../../../../services/documentTriggers/helpers/schema'
3131

3232
async function getNewTriggerResponse(
3333
{
@@ -216,7 +216,7 @@ export async function getEmailResponse(
216216
senderName,
217217
subject,
218218
body,
219-
attachments: attachedFiles,
219+
attachments,
220220
}: {
221221
documentUuid: string
222222
trigger: DocumentTrigger
@@ -226,7 +226,7 @@ export async function getEmailResponse(
226226
senderName: string | undefined
227227
subject: string
228228
body: string
229-
attachments?: File[]
229+
attachments?: PromptLFile[]
230230
},
231231
db = database,
232232
): PromisedResult<AssistantMessage, LatitudeError> {
@@ -235,13 +235,6 @@ export async function getEmailResponse(
235235
db,
236236
)) as Workspace
237237

238-
const attachmentsResult = await uploadAttachments({
239-
workspace,
240-
attachments: attachedFiles ?? [],
241-
})
242-
if (attachmentsResult.error) return attachmentsResult
243-
const attachments = attachmentsResult.unwrap()
244-
245238
const referencedLogResult = await findReferencedLog(
246239
{
247240
workspace,
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
import { Job } from 'bullmq'
2+
import { EMAIL_TRIGGER_DOMAIN } from '@latitude-data/constants'
3+
import { PromptLFile } from 'promptl-ai'
4+
import { unsafelyFindWorkspace } from '../../../../data-access'
5+
import { DocumentTrigger, HEAD_COMMIT, Workspace } from '../../../../browser'
6+
import {
7+
DocumentTriggersRepository,
8+
DocumentVersionsRepository,
9+
} from '../../../../repositories'
10+
import { getEmailResponse } from './getResponse'
11+
import { EmailTriggerConfiguration } from '../../../../services/documentTriggers/helpers/schema'
12+
import { LatitudeError, PromisedResult, Result } from '../../../../lib'
13+
import { DocumentTriggerMailer } from '../../../../mailers'
14+
15+
export type RunEmailTriggerJobData = {
16+
workspaceId: number
17+
triggerId: number
18+
recipient: string
19+
senderEmail: string
20+
senderName: string | undefined
21+
messageId?: string
22+
parentMessageIds?: string[]
23+
subject: string
24+
body: string
25+
attachments: PromptLFile[]
26+
}
27+
28+
async function getTriggerName(
29+
trigger: DocumentTrigger,
30+
): PromisedResult<string, LatitudeError> {
31+
const configName = (
32+
trigger.configuration as EmailTriggerConfiguration
33+
).name?.trim()
34+
if (configName?.length) {
35+
return Result.ok(configName)
36+
}
37+
38+
const docsScope = new DocumentVersionsRepository(trigger.workspaceId)
39+
const documentResult = await docsScope.getDocumentAtCommit({
40+
projectId: trigger.projectId,
41+
commitUuid: HEAD_COMMIT,
42+
documentUuid: trigger.documentUuid,
43+
})
44+
if (documentResult.error) return documentResult
45+
46+
const document = documentResult.unwrap()
47+
const docName = document.path.split('/').pop()!
48+
return Result.ok(docName)
49+
}
50+
51+
export const runEmailTriggerJob = async (job: Job<RunEmailTriggerJobData>) => {
52+
const {
53+
workspaceId,
54+
triggerId,
55+
senderEmail,
56+
senderName,
57+
messageId,
58+
parentMessageIds,
59+
subject,
60+
body,
61+
attachments,
62+
} = job.data
63+
64+
const workspace = (await unsafelyFindWorkspace(workspaceId)) as Workspace
65+
66+
const triggerScope = new DocumentTriggersRepository(workspace.id)
67+
const triggerResult = await triggerScope.find(triggerId)
68+
const trigger = triggerResult.unwrap()
69+
70+
const nameResult = await getTriggerName(trigger)
71+
const name = nameResult.unwrap()
72+
73+
const responseResult = await getEmailResponse({
74+
documentUuid: trigger.documentUuid,
75+
trigger,
76+
messageId,
77+
parentMessageIds,
78+
senderEmail,
79+
senderName,
80+
subject,
81+
body,
82+
attachments,
83+
})
84+
85+
const configuration = trigger.configuration as EmailTriggerConfiguration
86+
if (configuration.replyWithResponse === false) {
87+
return
88+
}
89+
90+
const from = `${JSON.stringify(name)} <${trigger.documentUuid}@${EMAIL_TRIGGER_DOMAIN}>`
91+
92+
const references = [
93+
...(parentMessageIds ?? []),
94+
...(messageId ? [messageId] : []),
95+
].join(' ')
96+
97+
const headers = messageId
98+
? {
99+
'In-Reply-To': messageId,
100+
References: references,
101+
// It seems like nodemailer-mailgun-transport is ignoring the "References" header.
102+
// Maybe some of these will work:
103+
'X-Mailgun-References': references,
104+
'h:References': references,
105+
'h:X-Mailgun-References': references,
106+
}
107+
: undefined
108+
109+
const mailer = new DocumentTriggerMailer(responseResult, {
110+
to: senderEmail,
111+
from,
112+
inReplyTo: messageId,
113+
references,
114+
subject: 'Re: ' + subject,
115+
headers,
116+
})
117+
118+
const result = await mailer.send()
119+
result.unwrap()
120+
}

packages/core/src/services/documentTriggers/handlers/email/index.ts

Lines changed: 59 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,14 @@ import {
1010
EMAIL_TRIGGER_DOMAIN,
1111
} from '@latitude-data/constants'
1212
import { database } from '../../../../client'
13-
import { DocumentTrigger, HEAD_COMMIT } from '../../../../browser'
14-
import { DocumentTriggerMailer } from '../../../../mailers'
15-
import { getEmailResponse } from './getResponse'
13+
import { DocumentTrigger, HEAD_COMMIT, Workspace } from '../../../../browser'
1614
import { DocumentVersionsRepository } from '../../../../repositories'
1715
import { EmailTriggerConfiguration } from '../../helpers/schema'
16+
import { PromptLFile } from 'promptl-ai'
17+
import { uploadFile } from '../../../files'
18+
import { unsafelyFindWorkspace } from '../../../../data-access'
19+
import { setupQueues } from '../../../../jobs'
20+
import { RunEmailTriggerJobData } from '../../../../jobs/job-definitions/documentTriggers/runEmailTriggerJob'
1821

1922
async function getTriggerName(
2023
trigger: DocumentTrigger,
@@ -67,6 +70,27 @@ export async function assertTriggerFilters({
6770
return Result.error(new BadRequestError('Sender is not in whitelist'))
6871
}
6972

73+
export async function uploadAttachments({
74+
workspace,
75+
attachments,
76+
}: {
77+
workspace: Workspace
78+
attachments: File[]
79+
}): PromisedResult<PromptLFile[], LatitudeError> {
80+
const results = await Promise.all(
81+
attachments.map(async (file) => {
82+
return await uploadFile({ file, workspace })
83+
}),
84+
)
85+
86+
const errors = results.filter((result) => result.error)
87+
if (errors.length) {
88+
return Result.error(errors[0]!.error! as LatitudeError)
89+
}
90+
91+
return Result.ok(results.map((result) => result.unwrap()))
92+
}
93+
7094
export async function handleEmailTrigger(
7195
{
7296
recipient,
@@ -110,56 +134,41 @@ export async function handleEmailTrigger(
110134
})
111135
if (assertFilterResult.error) return assertFilterResult
112136

113-
const name = await getTriggerName(trigger, db)
114-
if (name.error) return name
115-
116-
const responseResult = await getEmailResponse(
117-
{
118-
documentUuid: documentUuid!,
119-
trigger,
120-
messageId,
121-
parentMessageIds,
122-
senderEmail,
123-
senderName,
124-
subject,
125-
body,
126-
attachments,
127-
},
137+
const workspace = (await unsafelyFindWorkspace(
138+
trigger.workspaceId,
128139
db,
129-
)
140+
)) as Workspace
130141

131-
const configuration = trigger.configuration as EmailTriggerConfiguration
132-
if (configuration.replyWithResponse === false) return Result.nil()
133-
134-
const from = `${JSON.stringify(name.unwrap())} <${trigger.documentUuid}@${EMAIL_TRIGGER_DOMAIN}>`
135-
136-
const references = [
137-
...(parentMessageIds ?? []),
138-
...(messageId ? [messageId] : []),
139-
].join(' ')
140-
141-
const headers = messageId
142-
? {
143-
'In-Reply-To': messageId,
144-
References: references,
145-
// It seems like nodemailer-mailgun-transport is ignoring the "References" header.
146-
// Maybe some of these will work:
147-
'X-Mailgun-References': references,
148-
'h:References': references,
149-
'h:X-Mailgun-References': references,
150-
}
151-
: undefined
152-
153-
const mailer = new DocumentTriggerMailer(responseResult, {
154-
to: senderEmail,
155-
from,
156-
inReplyTo: messageId,
157-
references,
158-
subject: 'Re: ' + subject,
159-
headers,
142+
const nameResult = await getTriggerName(trigger, db)
143+
if (nameResult.error) return nameResult
144+
145+
const uploadResult = await uploadAttachments({
146+
workspace,
147+
attachments: attachments ?? [],
160148
})
149+
if (uploadResult.error) return uploadResult
150+
const uploadedFiles = uploadResult.unwrap()
151+
152+
const jobQueues = await setupQueues()
153+
154+
const runJobData: RunEmailTriggerJobData = {
155+
workspaceId: workspace.id,
156+
triggerId: trigger.id,
157+
recipient: recipient,
158+
senderEmail: senderEmail,
159+
senderName: senderName,
160+
messageId: messageId,
161+
parentMessageIds: parentMessageIds,
162+
subject: subject,
163+
body: body,
164+
attachments: uploadedFiles,
165+
}
166+
167+
const job =
168+
await jobQueues.defaultQueue.jobs.enqueueRunEmailTriggerJob(runJobData)
169+
if (!job.id) {
170+
return Result.error(new LatitudeError('Failed to enqueue job'))
171+
}
161172

162-
const sendResult = await mailer.send()
163-
if (sendResult.error) return sendResult
164173
return Result.nil()
165174
}

0 commit comments

Comments
 (0)