-
Notifications
You must be signed in to change notification settings - Fork 4.2k
feat(api,worker): skip sending messages outside of the subscribers schedule fixes NV-6618 #9126
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
1039b96
8cc0e4a
6b6a5ff
4faf036
fc0191a
d9b463c
592725e
f8f2a6d
ae05b01
4a3cd91
2ea98ee
fdd6624
50414a9
d49cb44
ee09c0f
1377585
a19b983
90f7cc7
cf683a8
02969f9
c277fb9
a3e2d61
81ea9d0
7bbd110
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -92,3 +92,4 @@ IS_TRACE_LOGS_ENABLED=true | |
| IS_STEP_RUN_LOGS_WRITE_ENABLED=true | ||
| IS_WORKFLOW_RUN_LOGS_WRITE_ENABLED=true | ||
| IS_NOTIFICATION_SEVERITY_ENABLED=true | ||
| IS_SUBSCRIBERS_SCHEDULE_ENABLED=true | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for the worker logic to work when running e2e tests |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,9 @@ import { | |
| CreateExecutionDetails, | ||
| CreateExecutionDetailsCommand, | ||
| DetailEnum, | ||
| FeatureFlagsService, | ||
| GetSubscriberSchedule, | ||
| GetSubscriberScheduleCommand, | ||
| getJobDigest, | ||
| Instrument, | ||
| InstrumentUsecase, | ||
|
|
@@ -12,8 +15,20 @@ import { | |
| WorkflowRunService, | ||
| WorkflowRunStatusEnum, | ||
| } from '@novu/application-generic'; | ||
| import { JobEntity, JobRepository, JobStatusEnum, NotificationRepository } from '@novu/dal'; | ||
| import { ExecutionDetailsSourceEnum, ExecutionDetailsStatusEnum, StepTypeEnum } from '@novu/shared'; | ||
| import { | ||
| JobEntity, | ||
| JobRepository, | ||
| JobStatusEnum, | ||
| NotificationEntity, | ||
| NotificationRepository, | ||
| SubscriberRepository, | ||
| } from '@novu/dal'; | ||
| import { | ||
| ExecutionDetailsSourceEnum, | ||
| ExecutionDetailsStatusEnum, | ||
| FeatureFlagsKeysEnum, | ||
| StepTypeEnum, | ||
| } from '@novu/shared'; | ||
| import { setUser } from '@sentry/node'; | ||
| import { EXCEPTION_MESSAGE_ON_WEBHOOK_FILTER, PlatformException, shouldHaltOnStepFailure } from '../../../shared/utils'; | ||
| import { AddJob } from '../add-job'; | ||
|
|
@@ -23,6 +38,7 @@ import { SendMessageStatus } from '../send-message/send-message-type.usecase'; | |
| import { SetJobAsFailedCommand } from '../update-job-status/set-job-as.command'; | ||
| import { SetJobAsFailed } from '../update-job-status/set-job-as-failed.usecase'; | ||
| import { RunJobCommand } from './run-job.command'; | ||
| import { isWithinSchedule } from './schedule-validator'; | ||
|
|
||
| const nr = require('newrelic'); | ||
|
|
||
|
|
@@ -41,7 +57,10 @@ export class RunJob { | |
| private stepRunRepository: StepRunRepository, | ||
| private workflowRunService: WorkflowRunService, | ||
| private createExecutionDetails: CreateExecutionDetails, | ||
| private logger: PinoLogger | ||
| private getSubscriberSchedule: GetSubscriberSchedule, | ||
| private logger: PinoLogger, | ||
| private subscriberRepository: SubscriberRepository, | ||
| private featureFlagsService: FeatureFlagsService | ||
| ) { | ||
| this.logger.setContext(this.constructor.name); | ||
| } | ||
|
|
@@ -90,7 +109,7 @@ export class RunJob { | |
| }); | ||
|
|
||
| let shouldQueueNextJob = true; | ||
| let error: any; | ||
| let error: Error | undefined; | ||
|
|
||
| try { | ||
| await this.jobRepository.updateStatus(job._environmentId, job._id, JobStatusEnum.RUNNING); | ||
|
|
@@ -106,6 +125,63 @@ export class RunJob { | |
| throw new PlatformException(`Notification with id ${job._notificationId} not found`); | ||
| } | ||
|
|
||
| const isSubscribersScheduleEnabled = await this.featureFlagsService.getFlag({ | ||
| key: FeatureFlagsKeysEnum.IS_SUBSCRIBERS_SCHEDULE_ENABLED, | ||
| defaultValue: false, | ||
| organization: { _id: job._organizationId }, | ||
| environment: { _id: job._environmentId }, | ||
| }); | ||
|
|
||
| if (isSubscribersScheduleEnabled && !this.shouldSkipScheduleCheck(job, notification)) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the |
||
| const schedule = await this.getSubscriberSchedule.execute( | ||
| GetSubscriberScheduleCommand.create({ | ||
| environmentId: job._environmentId, | ||
| organizationId: job._organizationId, | ||
| _subscriberId: job._subscriberId, | ||
| }) | ||
| ); | ||
|
|
||
| const subscriber = await this.subscriberRepository.findOne( | ||
| { | ||
| _id: job._subscriberId, | ||
| _environmentId: job._environmentId, | ||
| _organizationId: job._organizationId, | ||
| }, | ||
| 'timezone', | ||
| { readPreference: 'secondaryPreferred' } | ||
| ); | ||
|
Comment on lines
+136
to
+152
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just a general note, if the subscriber is refetched again in the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Those two were made to be lightweight:
|
||
|
|
||
| if (!isWithinSchedule(schedule, new Date(), subscriber?.timezone)) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If not within the schedule (outside), then cancel the job and provide the step_run and trace details |
||
| this.logger.info( | ||
| { | ||
| jobId: job._id, | ||
| subscriberId: job.subscriberId, | ||
| stepType: job.type, | ||
| }, | ||
| "The step was skipped as it fell outside the subscriber's schedule" | ||
| ); | ||
|
|
||
| await this.jobRepository.updateStatus(job._environmentId, job._id, JobStatusEnum.CANCELED); | ||
|
|
||
| await this.stepRunRepository.create(job, { | ||
| status: JobStatusEnum.CANCELED, | ||
| }); | ||
|
|
||
| await this.createExecutionDetails.execute( | ||
| CreateExecutionDetailsCommand.create({ | ||
| ...CreateExecutionDetailsCommand.getDetailsFromJob(job), | ||
| detail: DetailEnum.SKIPPED_STEP_OUTSIDE_OF_THE_SCHEDULE, | ||
| source: ExecutionDetailsSourceEnum.INTERNAL, | ||
| status: ExecutionDetailsStatusEnum.SUCCESS, | ||
| isTest: false, | ||
| isRetry: false, | ||
| }) | ||
| ); | ||
|
|
||
| return; | ||
| } | ||
| } | ||
|
|
||
| if (this.isUnsnoozeJob(job)) { | ||
| await this.processUnsnoozeJob.execute( | ||
| ProcessUnsnoozeJobCommand.create({ | ||
|
|
@@ -191,15 +267,15 @@ export class RunJob { | |
| status: JobStatusEnum.CANCELED, | ||
| }); | ||
| } | ||
| } catch (caughtError: any) { | ||
| error = caughtError; | ||
| } catch (caughtError: unknown) { | ||
| error = caughtError as Error; | ||
| await this.stepRunRepository.create(job, { | ||
| status: JobStatusEnum.FAILED, | ||
| errorCode: 'execution_error', | ||
| errorMessage: caughtError.message, | ||
| errorMessage: error.message, | ||
| }); | ||
|
|
||
| if (shouldHaltOnStepFailure(job) && !this.shouldBackoff(caughtError)) { | ||
| if (shouldHaltOnStepFailure(job) && !this.shouldBackoff(error)) { | ||
| await this.jobRepository.cancelPendingJobs({ | ||
| transactionId: job.transactionId, | ||
| _environmentId: job._environmentId, | ||
|
|
@@ -208,7 +284,7 @@ export class RunJob { | |
| }); | ||
| } | ||
|
|
||
| if (shouldHaltOnStepFailure(job) || this.shouldBackoff(caughtError)) { | ||
| if (shouldHaltOnStepFailure(job) || this.shouldBackoff(error)) { | ||
| shouldQueueNextJob = false; | ||
| } | ||
| throw caughtError; | ||
|
|
@@ -316,7 +392,7 @@ export class RunJob { | |
| subscriberId: nextJob._subscriberId, | ||
| }); | ||
| } | ||
| } catch (error: any) { | ||
| } catch (error: unknown) { | ||
| if (!nextJob) { | ||
| // Fallback: update workflow run status if nextJob is unexpectedly missing | ||
| // (should not occur due to prior nextJob check in loop) | ||
|
|
@@ -336,10 +412,10 @@ export class RunJob { | |
| organizationId: nextJob._organizationId, | ||
| userId: nextJob._userId, | ||
| }), | ||
| error | ||
| error as Error | ||
| ); | ||
|
|
||
| if (shouldHaltOnStepFailure(nextJob) && !this.shouldBackoff(error)) { | ||
| if (shouldHaltOnStepFailure(nextJob) && !this.shouldBackoff(error as Error)) { | ||
| // Update workflow run status based on step runs when halting on step failure | ||
| await this.workflowRunService.updateDeliveryLifecycle({ | ||
| notificationId: nextJob._notificationId, | ||
|
|
@@ -356,7 +432,7 @@ export class RunJob { | |
| }); | ||
| } | ||
|
|
||
| if (shouldHaltOnStepFailure(nextJob) || this.shouldBackoff(error)) { | ||
| if (shouldHaltOnStepFailure(nextJob) || this.shouldBackoff(error as Error)) { | ||
| return; | ||
| } | ||
|
|
||
|
|
@@ -369,7 +445,7 @@ export class RunJob { | |
| } | ||
| } | ||
|
|
||
| private assignLogger(job) { | ||
| private assignLogger(job: JobEntity) { | ||
| try { | ||
| if (this.logger) { | ||
| this.logger.assign({ | ||
|
|
@@ -442,4 +518,20 @@ export class RunJob { | |
| public shouldBackoff(error: Error): boolean { | ||
| return error?.message?.includes(EXCEPTION_MESSAGE_ON_WEBHOOK_FILTER); | ||
| } | ||
|
|
||
| private shouldSkipScheduleCheck(job: JobEntity, notification: NotificationEntity): boolean { | ||
| // always deliver in-app messages or critical messages | ||
| // let trigger,digest and delay finish their execution | ||
| if ( | ||
| job.type === StepTypeEnum.TRIGGER || | ||
| job.type === StepTypeEnum.IN_APP || | ||
| job.type === StepTypeEnum.DELAY || | ||
| job.type === StepTypeEnum.DIGEST || | ||
| notification.critical | ||
| ) { | ||
| return true; | ||
| } | ||
|
|
||
| return false; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved to the application-generic