Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
1039b96
feat(api-service): subscribers schedule api
LetItRock Sep 5, 2025
8cc0e4a
Merge branch 'next' into nv-6614-subscribers-schedule-api
LetItRock Sep 5, 2025
6b6a5ff
chore(api-service): remove unused import
LetItRock Sep 5, 2025
4faf036
chore(api-service): fix function name
LetItRock Sep 5, 2025
fc0191a
feat(js): schedule sub module
LetItRock Sep 5, 2025
d9b463c
Merge branch 'next' into nv-6614-subscribers-schedule-api
LetItRock Sep 8, 2025
592725e
chore(api-service): fixed failing unit tests
LetItRock Sep 8, 2025
f8f2a6d
Merge branch 'nv-6614-subscribers-schedule-api' into nv-6615-inbox-js…
LetItRock Sep 8, 2025
ae05b01
Merge branch 'next' into nv-6615-inbox-js-global-preference-schedule
LetItRock Sep 8, 2025
4a3cd91
feat(js): inbox subscribers schedule
LetItRock Sep 9, 2025
2ea98ee
Merge branch 'next' into nv-6615-inbox-js-global-preference-schedule
LetItRock Sep 9, 2025
fdd6624
Merge branch 'nv-6615-inbox-js-global-preference-schedule' into nv-66…
LetItRock Sep 9, 2025
50414a9
chore(root): satisfy cspell
LetItRock Sep 10, 2025
d49cb44
feat(react,js): default schedule and useSchedule hook
LetItRock Sep 10, 2025
ee09c0f
feat(dashboard): allow updating subscribers schedule
LetItRock Sep 11, 2025
1377585
Merge branch 'next' into nv-6616-inbox-schedule
LetItRock Sep 11, 2025
a19b983
Merge branch 'nv-6616-inbox-schedule' into nv-6616-react-use-schedule…
LetItRock Sep 11, 2025
90f7cc7
Merge branch 'nv-6616-react-use-schedule-and-default-schedule' into n…
LetItRock Sep 11, 2025
cf683a8
chore(dashboard): remove console.log
LetItRock Sep 11, 2025
02969f9
Merge branch 'next' into nv-6617-dashboard-subscribers-schedule
LetItRock Sep 11, 2025
c277fb9
chore(dashboard): suggestions from the pr
LetItRock Sep 11, 2025
a3e2d61
feat(api,worker): skip sending messages outside of the subscribers sc…
LetItRock Sep 11, 2025
81ea9d0
Merge branch 'next' into nv-6617-dashboard-subscribers-schedule
LetItRock Sep 11, 2025
7bbd110
Merge branch 'nv-6617-dashboard-subscribers-schedule' into nv-6618-sk…
LetItRock Sep 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
638 changes: 638 additions & 0 deletions apps/api/src/app/events/e2e/trigger-event.e2e.ts

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion apps/api/src/app/inbox/usecases/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
GetSubscriberSchedule,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to the application-generic

GetSubscriberTemplatePreference,
GetWorkflowByIdsUseCase,
MessageInteractionService,
Expand All @@ -11,7 +12,6 @@ import { GenerateUniqueApiKey } from '../../environments-v1/usecases/generate-un
import { ParseEventRequest } from '../../events/usecases/parse-event-request';
import { VerifyPayload } from '../../events/usecases/verify-payload';
import { GetSubscriberGlobalPreference } from '../../subscribers/usecases/get-subscriber-global-preference';
import { GetSubscriberSchedule } from '../../subscribers/usecases/get-subscriber-schedule';
import { BulkUpdatePreferences } from './bulk-update-preferences/bulk-update-preferences.usecase';
import { DeleteAllNotifications } from './delete-all-notifications/delete-all-notifications.usecase';
import { DeleteManyNotifications } from './delete-many-notifications/delete-many-notifications.usecase';
Expand Down
2 changes: 1 addition & 1 deletion apps/api/src/app/inbox/usecases/session/session.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
AnalyticsService,
CreateOrUpdateSubscriberUseCase,
FeatureFlagsService,
GetSubscriberSchedule,
PinoLogger,
SelectIntegration,
UpsertControlValuesUseCase,
Expand All @@ -24,7 +25,6 @@ import { AuthService } from '../../../auth/services/auth.service';
import { GenerateUniqueApiKey } from '../../../environments-v1/usecases/generate-unique-api-key/generate-unique-api-key.usecase';
import { CreateNovuIntegrations } from '../../../integrations/usecases/create-novu-integrations/create-novu-integrations.usecase';
import { GetOrganizationSettings } from '../../../organization/usecases/get-organization-settings/get-organization-settings.usecase';
import { GetSubscriberSchedule } from '../../../subscribers/usecases/get-subscriber-schedule/get-subscriber-schedule.usecase';
import { SubscriberSessionResponseDto } from '../../dtos/subscriber-session-response.dto';
import { AnalyticsEventsEnum } from '../../utils';
import * as encryption from '../../utils/encryption';
Expand Down
6 changes: 2 additions & 4 deletions apps/api/src/app/inbox/usecases/session/session.usecase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {
CreateOrUpdateSubscriberUseCase,
encryptApiKey,
FeatureFlagsService,
GetSubscriberSchedule,
GetSubscriberScheduleCommand,
generateTimestampHex,
LogDecorator,
PinoLogger,
Expand Down Expand Up @@ -59,10 +61,6 @@ import { GetOrganizationSettingsCommand } from '../../../organization/usecases/g
import { GetOrganizationSettings } from '../../../organization/usecases/get-organization-settings/get-organization-settings.usecase';
import { ScheduleDto } from '../../../shared/dtos/schedule';
import { isHmacValid } from '../../../shared/helpers/is-valid-hmac';
import {
GetSubscriberSchedule,
GetSubscriberScheduleCommand,
} from '../../../subscribers/usecases/get-subscriber-schedule';
import { SubscriberDto, SubscriberSessionRequestDto } from '../../dtos/subscriber-session-request.dto';
import { SubscriberSessionResponseDto } from '../../dtos/subscriber-session-response.dto';
import { AnalyticsEventsEnum } from '../../utils';
Expand Down
1 change: 1 addition & 0 deletions apps/worker/src/.env.test
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,4 @@ IS_TRACE_LOGS_ENABLED=true
IS_STEP_RUN_LOGS_WRITE_ENABLED=true
IS_WORKFLOW_RUN_LOGS_WRITE_ENABLED=true
IS_NOTIFICATION_SEVERITY_ENABLED=true
IS_SUBSCRIBERS_SCHEDULE_ENABLED=true
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the worker logic to work when running e2e tests

120 changes: 106 additions & 14 deletions apps/worker/src/app/workflow/usecases/run-job/run-job.usecase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ import {
CreateExecutionDetails,
CreateExecutionDetailsCommand,
DetailEnum,
FeatureFlagsService,
GetSubscriberSchedule,
GetSubscriberScheduleCommand,
getJobDigest,
Instrument,
InstrumentUsecase,
Expand All @@ -12,8 +15,20 @@ import {
WorkflowRunService,
WorkflowRunStatusEnum,
} from '@novu/application-generic';
import { JobEntity, JobRepository, JobStatusEnum, NotificationRepository } from '@novu/dal';
import { ExecutionDetailsSourceEnum, ExecutionDetailsStatusEnum, StepTypeEnum } from '@novu/shared';
import {
JobEntity,
JobRepository,
JobStatusEnum,
NotificationEntity,
NotificationRepository,
SubscriberRepository,
} from '@novu/dal';
import {
ExecutionDetailsSourceEnum,
ExecutionDetailsStatusEnum,
FeatureFlagsKeysEnum,
StepTypeEnum,
} from '@novu/shared';
import { setUser } from '@sentry/node';
import { EXCEPTION_MESSAGE_ON_WEBHOOK_FILTER, PlatformException, shouldHaltOnStepFailure } from '../../../shared/utils';
import { AddJob } from '../add-job';
Expand All @@ -23,6 +38,7 @@ import { SendMessageStatus } from '../send-message/send-message-type.usecase';
import { SetJobAsFailedCommand } from '../update-job-status/set-job-as.command';
import { SetJobAsFailed } from '../update-job-status/set-job-as-failed.usecase';
import { RunJobCommand } from './run-job.command';
import { isWithinSchedule } from './schedule-validator';

const nr = require('newrelic');

Expand All @@ -41,7 +57,10 @@ export class RunJob {
private stepRunRepository: StepRunRepository,
private workflowRunService: WorkflowRunService,
private createExecutionDetails: CreateExecutionDetails,
private logger: PinoLogger
private getSubscriberSchedule: GetSubscriberSchedule,
private logger: PinoLogger,
private subscriberRepository: SubscriberRepository,
private featureFlagsService: FeatureFlagsService
) {
this.logger.setContext(this.constructor.name);
}
Expand Down Expand Up @@ -90,7 +109,7 @@ export class RunJob {
});

let shouldQueueNextJob = true;
let error: any;
let error: Error | undefined;

try {
await this.jobRepository.updateStatus(job._environmentId, job._id, JobStatusEnum.RUNNING);
Expand All @@ -106,6 +125,63 @@ export class RunJob {
throw new PlatformException(`Notification with id ${job._notificationId} not found`);
}

const isSubscribersScheduleEnabled = await this.featureFlagsService.getFlag({
key: FeatureFlagsKeysEnum.IS_SUBSCRIBERS_SCHEDULE_ENABLED,
defaultValue: false,
organization: { _id: job._organizationId },
environment: { _id: job._environmentId },
});

if (isSubscribersScheduleEnabled && !this.shouldSkipScheduleCheck(job, notification)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the trigger, in-app, digest, delay and critical workflows will be "skipped" when checking the schedule

const schedule = await this.getSubscriberSchedule.execute(
GetSubscriberScheduleCommand.create({
environmentId: job._environmentId,
organizationId: job._organizationId,
_subscriberId: job._subscriberId,
})
);

const subscriber = await this.subscriberRepository.findOne(
{
_id: job._subscriberId,
_environmentId: job._environmentId,
_organizationId: job._organizationId,
},
'timezone',
{ readPreference: 'secondaryPreferred' }
);
Comment on lines +136 to +152
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a general note, if the subscriber is refetched again in the schedule usecase, or soemwhere up the scope, better to reuse it to avoid a duplicate db call if possible. (not sure if it's the case here)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those two were made to be lightweight:

  • the getSubscriberSchedule only uses one request to read preferences, nothing more
  • the this.subscriberRepository.findOne is done to secondary replica and only fetch timezone


if (!isWithinSchedule(schedule, new Date(), subscriber?.timezone)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If not within the schedule (outside), then cancel the job and provide the step_run and trace details

this.logger.info(
{
jobId: job._id,
subscriberId: job.subscriberId,
stepType: job.type,
},
"The step was skipped as it fell outside the subscriber's schedule"
);

await this.jobRepository.updateStatus(job._environmentId, job._id, JobStatusEnum.CANCELED);

await this.stepRunRepository.create(job, {
status: JobStatusEnum.CANCELED,
});

await this.createExecutionDetails.execute(
CreateExecutionDetailsCommand.create({
...CreateExecutionDetailsCommand.getDetailsFromJob(job),
detail: DetailEnum.SKIPPED_STEP_OUTSIDE_OF_THE_SCHEDULE,
source: ExecutionDetailsSourceEnum.INTERNAL,
status: ExecutionDetailsStatusEnum.SUCCESS,
isTest: false,
isRetry: false,
})
);

return;
}
}

if (this.isUnsnoozeJob(job)) {
await this.processUnsnoozeJob.execute(
ProcessUnsnoozeJobCommand.create({
Expand Down Expand Up @@ -191,15 +267,15 @@ export class RunJob {
status: JobStatusEnum.CANCELED,
});
}
} catch (caughtError: any) {
error = caughtError;
} catch (caughtError: unknown) {
error = caughtError as Error;
await this.stepRunRepository.create(job, {
status: JobStatusEnum.FAILED,
errorCode: 'execution_error',
errorMessage: caughtError.message,
errorMessage: error.message,
});

if (shouldHaltOnStepFailure(job) && !this.shouldBackoff(caughtError)) {
if (shouldHaltOnStepFailure(job) && !this.shouldBackoff(error)) {
await this.jobRepository.cancelPendingJobs({
transactionId: job.transactionId,
_environmentId: job._environmentId,
Expand All @@ -208,7 +284,7 @@ export class RunJob {
});
}

if (shouldHaltOnStepFailure(job) || this.shouldBackoff(caughtError)) {
if (shouldHaltOnStepFailure(job) || this.shouldBackoff(error)) {
shouldQueueNextJob = false;
}
throw caughtError;
Expand Down Expand Up @@ -316,7 +392,7 @@ export class RunJob {
subscriberId: nextJob._subscriberId,
});
}
} catch (error: any) {
} catch (error: unknown) {
if (!nextJob) {
// Fallback: update workflow run status if nextJob is unexpectedly missing
// (should not occur due to prior nextJob check in loop)
Expand All @@ -336,10 +412,10 @@ export class RunJob {
organizationId: nextJob._organizationId,
userId: nextJob._userId,
}),
error
error as Error
);

if (shouldHaltOnStepFailure(nextJob) && !this.shouldBackoff(error)) {
if (shouldHaltOnStepFailure(nextJob) && !this.shouldBackoff(error as Error)) {
// Update workflow run status based on step runs when halting on step failure
await this.workflowRunService.updateDeliveryLifecycle({
notificationId: nextJob._notificationId,
Expand All @@ -356,7 +432,7 @@ export class RunJob {
});
}

if (shouldHaltOnStepFailure(nextJob) || this.shouldBackoff(error)) {
if (shouldHaltOnStepFailure(nextJob) || this.shouldBackoff(error as Error)) {
return;
}

Expand All @@ -369,7 +445,7 @@ export class RunJob {
}
}

private assignLogger(job) {
private assignLogger(job: JobEntity) {
try {
if (this.logger) {
this.logger.assign({
Expand Down Expand Up @@ -442,4 +518,20 @@ export class RunJob {
public shouldBackoff(error: Error): boolean {
return error?.message?.includes(EXCEPTION_MESSAGE_ON_WEBHOOK_FILTER);
}

private shouldSkipScheduleCheck(job: JobEntity, notification: NotificationEntity): boolean {
// always deliver in-app messages or critical messages
// let trigger,digest and delay finish their execution
if (
job.type === StepTypeEnum.TRIGGER ||
job.type === StepTypeEnum.IN_APP ||
job.type === StepTypeEnum.DELAY ||
job.type === StepTypeEnum.DIGEST ||
notification.critical
) {
return true;
}

return false;
}
}
Loading
Loading