Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
2 changes: 2 additions & 0 deletions apps/api/src/app/inbox/inbox.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { OutboundWebhooksModule } from '../outbound-webhooks/outbound-webhooks.m
import { PreferencesModule } from '../preferences';
import { SharedModule } from '../shared/shared.module';
import { SubscribersV1Module } from '../subscribers/subscribersV1.module';
import { TopicsV2Module } from '../topics-v2/topics-v2.module';
import { InboxController } from './inbox.controller';
import { USE_CASES } from './usecases';

Expand All @@ -19,6 +20,7 @@ import { USE_CASES } from './usecases';
PreferencesModule,
OrganizationModule,
OutboundWebhooksModule.forRoot(),
TopicsV2Module,
],
providers: [...USE_CASES, CommunityOrganizationRepository, ContextRepository],
exports: [...USE_CASES],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export class AddSubscribersUseCase {

if (subscribersAvailableToAdd.length > 0) {
const topicSubscribers = this.mapSubscribersToTopic(topic, subscribersAvailableToAdd);
await this.topicSubscribersRepository.addSubscribers(topicSubscribers);
await this.topicSubscribersRepository.createSubscriptions(topicSubscribers);
}

return {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger';
import { IsOptional, IsString } from 'class-validator';
import { IsObject, IsOptional, IsString } from 'class-validator';

export class TopicDto {
@ApiProperty({
Expand Down Expand Up @@ -110,6 +110,16 @@ export class SubscriptionDto {
})
subscriber: SubscriberDto | null;

@ApiPropertyOptional({
description:
'JSONLogic filter conditions for conditional subscription. Only notifications matching these conditions will be delivered.',
type: 'object',
additionalProperties: true,
})
@IsObject()
@IsOptional()
conditions?: Record<string, unknown>;

@ApiProperty({
description: 'The creation date of the subscription',
example: '2025-04-24T05:40:21Z',
Expand Down
34 changes: 32 additions & 2 deletions apps/api/src/app/topics-v2/dtos/create-topic-subscriptions.dto.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ApiProperty } from '@nestjs/swagger';
import { ArrayMaxSize, ArrayMinSize, IsArray, IsDefined } from 'class-validator';
import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger';
import { ArrayMaxSize, ArrayMinSize, IsArray, IsDefined, IsObject, IsOptional } from 'class-validator';

export class CreateTopicSubscriptionsRequestDto {
@ApiProperty({
Expand All @@ -12,4 +12,34 @@ export class CreateTopicSubscriptionsRequestDto {
@ArrayMaxSize(100, { message: 'Cannot subscribe more than 100 subscribers at once' })
@ArrayMinSize(1, { message: 'At least one subscriber identifier is required' })
subscriberIds: string[];

@ApiPropertyOptional({
description:
'JSONLogic filter conditions for conditional subscription. Supports complex logical operations with AND, OR, and comparison operators. See https://jsonlogic.com/ for full typing reference.',
type: 'object',
example: {
and: [
{
'==': [
{
var: 'payload.status',
},
'Completed',
],
},
{
'>': [
{
var: 'payload.price',
},
100,
],
},
],
},
additionalProperties: true,
})
@IsObject()
@IsOptional()
conditions?: Record<string, unknown>;
}
1 change: 1 addition & 0 deletions apps/api/src/app/topics-v2/topics.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ export class TopicsController {
userId: user._id,
topicKey,
subscriberIds: body.subscriberIds,
conditions: body.conditions as any,
})
);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { IsArray, IsDefined, IsString } from 'class-validator';
import { IsArray, IsDefined, IsObject, IsOptional, IsString } from 'class-validator';
import { EnvironmentWithUserCommand } from '../../../shared/commands/project.command';

export class CreateTopicSubscriptionsCommand extends EnvironmentWithUserCommand {
Expand All @@ -9,4 +9,8 @@ export class CreateTopicSubscriptionsCommand extends EnvironmentWithUserCommand
@IsArray()
@IsDefined()
subscriberIds: string[];

@IsObject()
@IsOptional()
conditions?: Record<string, unknown>;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Injectable } from '@nestjs/common';
import { InstrumentUsecase } from '@novu/application-generic';
import { generateConditionHash, InstrumentUsecase } from '@novu/application-generic';
import {
BulkAddTopicSubscribersResult,
CreateTopicSubscribersEntity,
SubscriberEntity,
SubscriberRepository,
Expand Down Expand Up @@ -28,7 +29,6 @@ export class CreateTopicSubscriptionsUsecase {

@InstrumentUsecase()
async execute(command: CreateTopicSubscriptionsCommand): Promise<CreateTopicSubscriptionsResponseDto> {
// Use upsert topic usecase to create the topic if it doesn't exist
await this.upsertTopicUseCase.execute({
environmentId: command.environmentId,
organizationId: command.organizationId,
Expand Down Expand Up @@ -79,25 +79,55 @@ export class CreateTopicSubscriptionsUsecase {
};
}

// Check for existing subscriptions to make the operation idempotent
const existingSubscriptions = await this.topicSubscribersRepository.find({
const conditionHash = generateConditionHash(command.conditions);

const existingSubscriptionsQuery: {
_environmentId: string;
_organizationId: string;
_topicId: string;
_subscriberId: { $in: string[] };
conditionHash?: string;
$or?: Array<{ conditionHash: { $exists: boolean } } | { conditionHash: null }>;
} = {
_environmentId: command.environmentId,
_organizationId: command.organizationId,
_topicId: topic._id,
_subscriberId: { $in: foundSubscribers.map((sub) => sub._id) },
});
};

if (conditionHash !== undefined) {
existingSubscriptionsQuery.conditionHash = conditionHash;
} else {
existingSubscriptionsQuery.$or = [{ conditionHash: { $exists: false } }, { conditionHash: null }];
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validate if we ca use rooted $or

}

const existingSubscriptions = await this.topicSubscribersRepository.find(existingSubscriptionsQuery as any);

// Create topic subscriptions for subscribers that don't already have a subscription
const existingSubscriberIds = existingSubscriptions.map((sub) => sub._subscriberId.toString());
const subscribersToCreate = foundSubscribers.filter((sub) => !existingSubscriberIds.includes(sub._id.toString()));

let newSubscriptions: TopicSubscribersEntity[] = [];
if (subscribersToCreate.length > 0) {
const topicSubscribersToCreate = this.mapSubscribersToTopic(topic, subscribersToCreate);
newSubscriptions = await this.topicSubscribersRepository.addSubscribers(topicSubscribersToCreate);
const topicSubscribersToCreate = this.mapSubscribersToTopic(
topic,
subscribersToCreate,
command.conditions,
conditionHash
);
const bulkResult: BulkAddTopicSubscribersResult =
await this.topicSubscribersRepository.createSubscriptions(topicSubscribersToCreate);

newSubscriptions = [...bulkResult.created, ...bulkResult.updated];

for (const failure of bulkResult.failed) {
errors.push({
subscriberId: failure.subscriberId,
code: 'SUBSCRIPTION_CREATION_FAILED',
message: failure.message,
});
}
}

// Combine existing and new subscriptions for the response
const allSubscriptions = [...existingSubscriptions, ...newSubscriptions];
// Map subscriptions to response format
for (const subscription of allSubscriptions) {
Expand All @@ -122,6 +152,7 @@ export class CreateTopicSubscriptionsUsecase {
updatedAt: subscriber.updatedAt,
}
: null,
conditions: subscription.conditions,
createdAt: subscription.createdAt ?? '',
updatedAt: subscription.updatedAt ?? '',
});
Expand All @@ -138,14 +169,21 @@ export class CreateTopicSubscriptionsUsecase {
};
}

private mapSubscribersToTopic(topic: TopicEntity, subscribers: SubscriberEntity[]): CreateTopicSubscribersEntity[] {
private mapSubscribersToTopic(
topic: TopicEntity,
subscribers: SubscriberEntity[],
conditions?: Record<string, unknown>,
conditionHash?: string
): CreateTopicSubscribersEntity[] {
return subscribers.map((subscriber) => ({
_environmentId: subscriber._environmentId,
_organizationId: subscriber._organizationId,
_subscriberId: subscriber._id,
_topicId: topic._id,
topicKey: topic.key,
externalSubscriberId: subscriber.subscriberId,
conditions,
conditionHash,
}));
}
}
1 change: 1 addition & 0 deletions libs/application-generic/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
"handlebars": "^4.7.7",
"i18next": "^23.7.6",
"ioredis": "^5.2.4",
"json-logic-js": "^2.0.5",
"jsonwebtoken": "9.0.0",
"lodash": "^4.17.15",
"mixpanel": "^0.17.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
TriggerRecipientSubscriber,
TriggerRecipientsTypeEnum,
} from '@novu/shared';

import jsonLogic, { type AdditionalOperation, type RulesLogic } from 'json-logic-js';
import { PinoLogger } from 'nestjs-pino';
import { InstrumentUsecase } from '../../instrumentation';
import { CacheService, FeatureFlagsService } from '../../services';
Expand All @@ -21,8 +21,6 @@ import { TriggerMulticastCommand } from './trigger-multicast.command';
const QUEUE_CHUNK_SIZE = Number(process.env.MULTICAST_QUEUE_CHUNK_SIZE) || 100;
const SUBSCRIBER_TOPIC_DISTINCT_BATCH_SIZE = Number(process.env.SUBSCRIBER_TOPIC_DISTINCT_BATCH_SIZE) || 100;

const isNotTopic = (recipient: TriggerRecipient): recipient is TriggerRecipientSubscriber => !isTopic(recipient);

const isTopic = (recipient: TriggerRecipient): recipient is ITopic =>
(recipient as ITopic).type && (recipient as ITopic).type === TriggerRecipientsTypeEnum.TOPIC;

Expand Down Expand Up @@ -66,8 +64,10 @@ export class TriggerMulticast extends TriggerBase {
const allTopicExcludedSubscribers = Array.from(
new Set([...Array.from(topicExclusions.values()).flatMap((set) => Array.from(set))])
);
let subscribersList: { subscriberId: string; topics: Pick<TopicEntity, '_id' | 'key'>[] }[] = [];
const getTopicDistinctSubscribersGenerator = this.topicSubscribersRepository.getTopicDistinctSubscribers({
let totalSubscriptionsEvaluated = 0;
let totalSubscriptionsFiltered = 0;

const getTopicSubscriptionsGenerator = this.topicSubscribersRepository.getTopicSubscriptionsWithConditions({
query: {
_organizationId: organizationId,
_environmentId: environmentId,
Expand All @@ -77,26 +77,50 @@ export class TriggerMulticast extends TriggerBase {
batchSize: SUBSCRIBER_TOPIC_DISTINCT_BATCH_SIZE,
});

for await (const externalSubscriberIdGroup of getTopicDistinctSubscribersGenerator) {
const externalSubscriberId = externalSubscriberIdGroup._id;
const subscribersMap = new Map<string, { subscriberId: string; topics: Pick<TopicEntity, '_id' | 'key'>[] }>();

if (actor && actor.subscriberId === externalSubscriberId) {
continue;
}
for await (const subscriptionsBatch of getTopicSubscriptionsGenerator) {
totalSubscriptionsEvaluated += subscriptionsBatch.length;

subscribersList.push({
subscriberId: externalSubscriberId,
topics: topics?.map((topic) => ({ _id: topic._id, key: topic.key })),
const passingSubscriptions = subscriptionsBatch.filter((subscription) => {
return this.evaluateConditions(
subscription.conditions as Record<string, unknown>,
{ payload: command.payload } as Record<string, unknown>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here we will need to add workflow entity variable as well

);
});

if (subscribersList.length === SUBSCRIBER_TOPIC_DISTINCT_BATCH_SIZE) {
await this.sendToProcessSubscriberService(command, subscribersList, SubscriberSourceEnum.TOPIC);
totalProcessed += subscribersList.length;
totalSubscriptionsFiltered += subscriptionsBatch.length - passingSubscriptions.length;

for (const subscription of passingSubscriptions) {
const externalSubscriberId = subscription.externalSubscriberId;

if (actor && actor.subscriberId === externalSubscriberId) {
continue;
}

if (!subscribersMap.has(externalSubscriberId)) {
subscribersMap.set(externalSubscriberId, {
subscriberId: externalSubscriberId,
topics: topics?.map((topic) => ({ _id: topic._id, key: topic.key })),
});
}
}

if (subscribersMap.size >= SUBSCRIBER_TOPIC_DISTINCT_BATCH_SIZE) {
const batchToProcess = Array.from(subscribersMap.values());
await this.sendToProcessSubscriberService(command, batchToProcess, SubscriberSourceEnum.TOPIC);
totalProcessed += batchToProcess.length;

subscribersList = [];
subscribersMap.clear();
}
}

if (subscribersMap.size > 0) {
const finalBatch = Array.from(subscribersMap.values());
await this.sendToProcessSubscriberService(command, finalBatch, SubscriberSourceEnum.TOPIC);
totalProcessed += finalBatch.length;
}

await this.createMulticastTrace(
command,
'request_subscriber_processing_completed',
Expand All @@ -109,13 +133,10 @@ export class TriggerMulticast extends TriggerBase {
singleSubscribers: subscribersToProcess.length,
topicSubscribers: totalProcessed - subscribersToProcess.length,
topicsUsed: topics.length,
subscriptionsEvaluated: totalSubscriptionsEvaluated,
subscriptionsFiltered: totalSubscriptionsFiltered,
}
);

if (subscribersList.length > 0) {
await this.sendToProcessSubscriberService(command, subscribersList, SubscriberSourceEnum.TOPIC);
totalProcessed += subscribersList.length;
}
} catch (e) {
const error = e as Error;
await this.createMulticastTrace(
Expand Down Expand Up @@ -146,6 +167,23 @@ export class TriggerMulticast extends TriggerBase {
}
}

private evaluateConditions(
conditions: Record<string, unknown> | undefined,
payload: Record<string, unknown>
): boolean {
if (!conditions) {
return true;
}

try {
const result = jsonLogic.apply(conditions as RulesLogic<AdditionalOperation>, payload);

return typeof result === 'boolean' ? result : false;
} catch {
return false;
}
}

private async createMulticastTrace(
command: TriggerMulticastCommand,
eventType: EventType,
Expand Down
11 changes: 11 additions & 0 deletions libs/application-generic/src/utils/condition-hash.util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { createHash } from 'crypto';

export function generateConditionHash(condition?: Record<string, unknown>): string | undefined {
if (!condition) {
return undefined;
}

const normalizedJson = JSON.stringify(condition, Object.keys(condition).sort());

return createHash('sha256').update(normalizedJson).digest('hex');
}
1 change: 1 addition & 0 deletions libs/application-generic/src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export * from './base62';
export * from './bridge';
export * from './buildBridgeEndpointUrl';
export * from './condition-hash.util';
export * from './deepmerge';
export * from './digest';
export * from './email-normalization';
Expand Down
2 changes: 2 additions & 0 deletions libs/dal/src/repositories/topic/topic-subscribers.entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ export class TopicSubscribersEntity {
topicKey: TopicKey;
// TODO: Rename to subscriberId, to align with workflowId and stepId that are also externally provided identifiers by Novu users
externalSubscriberId: ExternalSubscriberId;
conditions?: Record<string, unknown>;
conditionHash?: string;

createdAt?: string;
updatedAt?: string;
Expand Down
Loading
Loading