Skip to content
Open
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export class AddSubscribersUseCase {

if (subscribersAvailableToAdd.length > 0) {
const topicSubscribers = this.mapSubscribersToTopic(topic, subscribersAvailableToAdd);
await this.topicSubscribersRepository.addSubscribers(topicSubscribers);
await this.topicSubscribersRepository.createSubscriptions(topicSubscribers);
}

return {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger';
import { IsOptional, IsString } from 'class-validator';
import { Type } from 'class-transformer';
import { IsArray, IsBoolean, IsObject, IsOptional, IsString, ValidateNested } from 'class-validator';

export class TopicDto {
@ApiProperty({
Expand Down Expand Up @@ -89,6 +90,22 @@ export class SubscriberDto {
updatedAt?: string;
}

export class SubscriptionWorkflowDto {
@ApiProperty({
description: 'The workflow identifier',
example: 'workflow-1',
})
@IsString()
id: string;

@ApiProperty({
description: 'Whether the workflow is enabled for this subscription',
example: true,
})
@IsBoolean()
enabled: boolean;
}

export class SubscriptionDto {
@ApiProperty({
description: 'The unique identifier of the subscription',
Expand All @@ -110,6 +127,26 @@ export class SubscriptionDto {
})
subscriber: SubscriberDto | null;

@ApiPropertyOptional({
description:
'JSONLogic filter conditions for conditional subscription. Only notifications matching these conditions will be delivered.',
type: 'object',
additionalProperties: true,
})
@IsObject()
@IsOptional()
conditions?: Record<string, unknown>;

@ApiPropertyOptional({
description: 'The workflows associated with the subscription',
type: [SubscriptionWorkflowDto],
})
@IsArray()
@ValidateNested({ each: true })
@Type(() => SubscriptionWorkflowDto)
@IsOptional()
workflows?: SubscriptionWorkflowDto[];

@ApiProperty({
description: 'The creation date of the subscription',
example: '2025-04-24T05:40:21Z',
Expand Down
58 changes: 56 additions & 2 deletions apps/api/src/app/topics-v2/dtos/create-topic-subscriptions.dto.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
import { ApiProperty } from '@nestjs/swagger';
import { ArrayMaxSize, ArrayMinSize, IsArray, IsDefined } from 'class-validator';
import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger';
import { Type } from 'class-transformer';
import { ArrayMaxSize, ArrayMinSize, IsArray, IsDefined, IsObject, IsOptional, ValidateNested } from 'class-validator';

export class SubscriptionWorkflowsDto {
@ApiProperty({
description: 'List of workflow identifiers',
example: ['workflow-id-1', 'workflow-id-2'],
type: [String],
})
@IsArray()
@IsDefined()
ids: string[];
}

export class CreateTopicSubscriptionsRequestDto {
@ApiProperty({
Expand All @@ -12,4 +24,46 @@ export class CreateTopicSubscriptionsRequestDto {
@ArrayMaxSize(100, { message: 'Cannot subscribe more than 100 subscribers at once' })
@ArrayMinSize(1, { message: 'At least one subscriber identifier is required' })
subscriberIds: string[];

@ApiPropertyOptional({
description:
'JSONLogic filter conditions for conditional subscription. Supports complex logical operations with AND, OR, and comparison operators. See https://jsonlogic.com/ for full typing reference.',
type: 'object',
example: {
and: [
{
'==': [
{
var: 'payload.status',
},
'Completed',
],
},
{
'>': [
{
var: 'payload.price',
},
100,
],
},
],
},
additionalProperties: true,
})
@IsObject()
@IsOptional()
conditions?: Record<string, unknown>;

@ApiPropertyOptional({
description: 'List of workflow IDs to associate with the subscription',
type: () => SubscriptionWorkflowsDto,
example: {
ids: ['workflow-id-1', 'workflow-id-2'],
},
})
@IsOptional()
@ValidateNested()
@Type(() => SubscriptionWorkflowsDto)
workflows?: SubscriptionWorkflowsDto;
}
64 changes: 64 additions & 0 deletions apps/api/src/app/topics-v2/e2e/create-topic-subscriptions.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,4 +226,68 @@ describe('Create topic subscriptions - /v2/topics/:topicKey/subscriptions (POST)
expect(subscribers.length).to.equal(1);
expect(subscribers[0]?._subscriberId).to.equal(subscriber1._id);
});

it('should create multiple subscriptions for the same subscriber with different conditions', async () => {
const topicKey = `topic-key-conditions-${Date.now()}`;
const conditionsA = { status: 'active', priority: 'high' };
const responseA = await novuClient.topics.subscriptions.create(
{
subscriberIds: [subscriber1.subscriberId],
conditions: conditionsA,
workflows: { ids: ['workflow-1', 'workflow-2'] },
} as any,
topicKey
);

expect(responseA.result.data[0].workflows?.length).to.equal(2);
expect(responseA.result.data[0].workflows?.[0]?.id).to.be.oneOf(['workflow-1', 'workflow-2']);
expect(responseA.result.data[0].workflows?.[0]?.enabled).to.equal(true);
expect(responseA.result.data[0].workflows?.[1]?.id).to.be.oneOf(['workflow-1', 'workflow-2']);
expect(responseA.result.data[0].workflows?.[1]?.enabled).to.equal(true);

const conditionsB = { status: 'pending', priority: 'low' };
const responseB = await novuClient.topics.subscriptions.create(
{
subscriberIds: [subscriber1.subscriberId],
conditions: conditionsB,
workflows: { ids: ['workflow-3', 'workflow-4'] },
} as any,
topicKey
);

expect(responseB.result.data[0].workflows?.length).to.equal(2);
expect(responseB.result.data[0].workflows?.[0]?.id).to.be.oneOf(['workflow-3', 'workflow-4']);
expect(responseB.result.data[0].workflows?.[0]?.enabled).to.equal(true);
expect(responseB.result.data[0].workflows?.[1]?.id).to.be.oneOf(['workflow-3', 'workflow-4']);
expect(responseB.result.data[0].workflows?.[1]?.enabled).to.equal(true);

const subscriptions = await topicSubscribersRepository.find({
_environmentId: session.environment._id,
_organizationId: session.organization._id,
topicKey,
externalSubscriberId: subscriber1.subscriberId,
});

expect(subscriptions.length).to.equal(2);

const hashes = subscriptions.map((s) => s.conditionHash);
expect(new Set(hashes).size).to.equal(2);

await novuClient.topics.subscriptions.create(
{
subscriberIds: [subscriber1.subscriberId],
conditions: conditionsA,
workflows: { ids: ['workflow-1', 'workflow-2'] },
} as any,
topicKey
);

const subscriptionsAfterDuplicate = await topicSubscribersRepository.find({
_environmentId: session.environment._id,
_organizationId: session.organization._id,
topicKey,
externalSubscriberId: subscriber1.subscriberId,
});
expect(subscriptionsAfterDuplicate.length).to.equal(2);
});
});
2 changes: 2 additions & 0 deletions apps/api/src/app/topics-v2/topics.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ export class TopicsController {
userId: user._id,
topicKey,
subscriberIds: body.subscriberIds,
conditions: body.conditions as any,
workflows: body.workflows,
})
);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { IsArray, IsDefined, IsString } from 'class-validator';
import { IsArray, IsDefined, IsObject, IsOptional, IsString } from 'class-validator';
import { EnvironmentWithUserCommand } from '../../../shared/commands/project.command';

export class CreateTopicSubscriptionsCommand extends EnvironmentWithUserCommand {
Expand All @@ -9,4 +9,12 @@ export class CreateTopicSubscriptionsCommand extends EnvironmentWithUserCommand
@IsArray()
@IsDefined()
subscriberIds: string[];

@IsObject()
@IsOptional()
conditions?: Record<string, unknown>;

@IsOptional()
@IsObject()
workflows?: { ids: string[] };
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Injectable } from '@nestjs/common';
import { InstrumentUsecase } from '@novu/application-generic';
import { generateConditionHash, InstrumentUsecase } from '@novu/application-generic';
import {
BulkAddTopicSubscribersResult,
CreateTopicSubscribersEntity,
SubscriberEntity,
SubscriberRepository,
Expand All @@ -9,6 +10,7 @@ import {
TopicSubscribersEntity,
TopicSubscribersRepository,
} from '@novu/dal';
import { SubscriptionWorkflowsDto } from '../../dtos/create-topic-subscriptions.dto';
import {
CreateTopicSubscriptionsResponseDto,
SubscriptionDto,
Expand All @@ -28,7 +30,6 @@ export class CreateTopicSubscriptionsUsecase {

@InstrumentUsecase()
async execute(command: CreateTopicSubscriptionsCommand): Promise<CreateTopicSubscriptionsResponseDto> {
// Use upsert topic usecase to create the topic if it doesn't exist
await this.upsertTopicUseCase.execute({
environmentId: command.environmentId,
organizationId: command.organizationId,
Expand Down Expand Up @@ -79,25 +80,62 @@ export class CreateTopicSubscriptionsUsecase {
};
}

// Check for existing subscriptions to make the operation idempotent
const existingSubscriptions = await this.topicSubscribersRepository.find({
const subscriptionsWorkflows = command.workflows?.ids
? command.workflows.ids.map((id) => ({ _id: id, enabled: true }))
: undefined;
const conditionHash = generateConditionHash({
conditions: command.conditions || null,
workflows: subscriptionsWorkflows || null,
});

const existingSubscriptionsQuery: {
_environmentId: string;
_organizationId: string;
_topicId: string;
_subscriberId: { $in: string[] };
conditionHash?: string | { $exists: boolean };
$or?: Array<{ conditionHash: { $exists: boolean } } | { conditionHash: null }>;
} = {
_environmentId: command.environmentId,
_organizationId: command.organizationId,
_topicId: topic._id,
_subscriberId: { $in: foundSubscribers.map((sub) => sub._id) },
});
};

if (conditionHash !== undefined) {
existingSubscriptionsQuery.conditionHash = conditionHash;
} else {
existingSubscriptionsQuery.conditionHash = { $exists: false };
}

const existingSubscriptions = await this.topicSubscribersRepository.find(existingSubscriptionsQuery as any);

// Create topic subscriptions for subscribers that don't already have a subscription
const existingSubscriberIds = existingSubscriptions.map((sub) => sub._subscriberId.toString());
const subscribersToCreate = foundSubscribers.filter((sub) => !existingSubscriberIds.includes(sub._id.toString()));

let newSubscriptions: TopicSubscribersEntity[] = [];
if (subscribersToCreate.length > 0) {
const topicSubscribersToCreate = this.mapSubscribersToTopic(topic, subscribersToCreate);
newSubscriptions = await this.topicSubscribersRepository.addSubscribers(topicSubscribersToCreate);
const subscriptionsToCreate = this.buildSubscriptionEntity(
topic,
subscribersToCreate,
command.conditions,
conditionHash,
subscriptionsWorkflows
);
const bulkResult: BulkAddTopicSubscribersResult =
await this.topicSubscribersRepository.createSubscriptions(subscriptionsToCreate);

newSubscriptions = [...bulkResult.created, ...bulkResult.updated];

for (const failure of bulkResult.failed) {
errors.push({
subscriberId: failure.subscriberId,
code: 'SUBSCRIPTION_CREATION_FAILED',
message: failure.message,
});
}
}

// Combine existing and new subscriptions for the response
const allSubscriptions = [...existingSubscriptions, ...newSubscriptions];
// Map subscriptions to response format
for (const subscription of allSubscriptions) {
Expand All @@ -122,6 +160,11 @@ export class CreateTopicSubscriptionsUsecase {
updatedAt: subscriber.updatedAt,
}
: null,
conditions: subscription.conditions,
workflows: subscription.workflows?.map((workflow) => ({
id: workflow._id,
enabled: workflow.enabled,
})),
createdAt: subscription.createdAt ?? '',
updatedAt: subscription.updatedAt ?? '',
});
Expand All @@ -138,14 +181,23 @@ export class CreateTopicSubscriptionsUsecase {
};
}

private mapSubscribersToTopic(topic: TopicEntity, subscribers: SubscriberEntity[]): CreateTopicSubscribersEntity[] {
private buildSubscriptionEntity(
topic: TopicEntity,
subscribers: SubscriberEntity[],
conditions?: Record<string, unknown>,
conditionHash?: string,
subscriptionsWorkflows?: { _id: string; enabled: boolean }[]
): CreateTopicSubscribersEntity[] {
return subscribers.map((subscriber) => ({
_environmentId: subscriber._environmentId,
_organizationId: subscriber._organizationId,
_subscriberId: subscriber._id,
_topicId: topic._id,
topicKey: topic.key,
externalSubscriberId: subscriber.subscriberId,
conditions,
conditionHash,
workflows: subscriptionsWorkflows,
}));
}
}
1 change: 1 addition & 0 deletions libs/application-generic/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
"handlebars": "^4.7.7",
"i18next": "^23.7.6",
"ioredis": "^5.2.4",
"json-logic-js": "^2.0.5",
"jsonwebtoken": "9.0.0",
"lodash": "^4.17.15",
"mixpanel": "^0.17.0",
Expand Down
Loading
Loading