Skip to content

Error when use @RabbitRPC in one controller and listten exchange and queue that same for all actions with diffirent routing keys #751

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
sur-ser opened this issue Jun 28, 2024 · 18 comments
Labels

Comments

@sur-ser
Copy link

sur-ser commented Jun 28, 2024

when i use RabbitRPC in one controller with multiple actions like this

@RabbitRPC({
    exchange: AuthRegister.exchange,
    routingKey: AuthRegister.topic,
    queue: AuthRegister.queue,
    errorBehavior: MessageHandlerErrorBehavior.NACK,
    errorHandler: rpcErrorHandler,
  })
  async register(@Body() dto: AuthRegister.Request) : Promise<AuthRegister.Response> {
    const newUser = await this.authService.register(dto);
    return { email: newUser.email };
  }


  @RabbitRPC({
    exchange: AuthJWTLogin.exchange,
    routingKey: AuthJWTLogin.topic,
    queue: AuthJWTLogin.queue,
    errorBehavior: MessageHandlerErrorBehavior.NACK,
    errorHandler: rpcErrorHandler,
  })
  async login(@Body() dto: AuthJWTLogin.Request): Promise<AuthJWTLogin.Response> {
    const { id } = await this.authService.validateUser(dto.email, dto.password);
    return this.authService.login(id);
  }

and create request like this

return await this.amqpConnection.request<AuthRegister.Response>({
       exchange: AuthRegister.exchange,
       routingKey: AuthRegister.topic,
       payload: dto,
       timeout: 10000
     })
  one time it works fine and second time i have got error like this

[Nest] 119672 - 06/27/2024, 7:57:17 PM ERROR [AmqpConnection] Received message with invalid routing key: sso.auth.register.command

but if i keep one action it works fine or when i change que name and keep one routing key in queue it works fine

my exchange is topic
and in controller exchange and queue is same for all actions only routing key is diffirent

@sur-ser sur-ser changed the title Erro when publish in same exchange same queue with diffirent routing key using RPC Error when use @RabbitRPC in one controller and listten exchange and queue that same for all actions with diffirent routing keys Jun 28, 2024
@slyk
Copy link

slyk commented Jul 20, 2024

Same for me.
v 4.1 works fine.
v 5 gives errors. I found that the matchesRoutingKey() function is called ant tries to match the message routing key always with different handlers. So it is not taking messages from queue that match given routing keys, it just takes all messages from queue and tries to match them with a random routingKey from a file.

In my project I have three @RabbitRPC() listeners in one file and I sometimes get listener working ok and some times errors with "invalid routing key".

Here is my code and errors as example:

@RabbitRPC({
    exchange: 'amq.topic',      //using default exchange for topics
    routingKey: 'user.upsert',  //when message has this routing key (topic)
    queue: 'user',           //message will be sent to this queue and proceed by this service
  })
@RabbitRPC({
    exchange: 'amq.topic',          //using default exchange for topics
    routingKey: 'user.find.byphone',//when client wants to find user by phone
    queue: 'user',                  //message will be sent to this queue and proceed by this service
    allowNonJsonMessages: true,     //we allow non-json messages (like strings, numbers, etc)
  })
@RabbitRPC({
    exchange: 'amq.topic',          //using default exchange for topics
    routingKey: 'user.get.byuuid',  //when client wants to get user by uuid
    queue: 'user',                  //message will be sent to this queue and proceed by this service
  })

and here is log when I send the same message to queue:
image

"ret: ...uuid... " - is the right processing of the listener. As you can see (I've modified matcher a little to see what its trying to match) there always different matcher for same routingKey. Is it by design? :)

P.S. Also while I was looking for problem I saw project with same problem (https://github.com/pavlokobyliatskyi/demo-chat/), he just tell people to use old version :) but maybe some fix can be introduced to new versions? Interesting that not many people have this.

@slyk
Copy link

slyk commented Jul 20, 2024

The error message is in "connections.ts", looking at the v4 code in that place (line 535) instead of error there were just nack() so it was just grabbing message, check for routing key and putting it back for each of @RabbitRPC() call while it looking for perfect match?

if (rpcOptions.routingKey !== msg.fields.routingKey) {
            channel.nack(msg, false, true);
            return;
          }

and in v5 there are more complex check because now routingKey could be an array, but nack() function called with requeue option "false" so we get error and lost message.

if (
            !matchesRoutingKey(msg.fields.routingKey, rpcOptions.routingKey)
          ) {
            channel.nack(msg, false, false);
            this.logger.error(
              'Received message with invalid routing key: ' +
                msg.fields.routingKey
            );
            return;
          }

https://github.com/golevelup/nestjs/blob/master/packages/rabbitmq/src/amqp/connection.ts

I'm new to rabbit. What is the right way to handle messages from queue that has specific routingKey?

Quick fix would be set requeue back to 'true' in nack(), but then we would have many errors in log... and maybe there should be some other logic?

@slyk
Copy link

slyk commented Jul 21, 2024

Seems like good practice is to make different queues for this messages, so I changed my code and got more queues, it works and for my small first app just to became familiar with rabbitMQ its ok :)

@RabbitRPC({
    exchange: 'amq.direct',      // DIRECT here instead of topic
    routingKey: 'user.upsert',  
    queue: 'user.upsert',         // name of the qeue that is same as reoutingKey
  })

@sur-ser
Copy link
Author

sur-ser commented Aug 14, 2024

While creating separate queues for every aspect of the project might seem like a quick fix, it defeats the purpose of having routing keys in the first place. The whole idea behind using routing keys is to prevent the chaos that would arise from having too many queues, especially in large projects with 50-70 microservices. If every interaction requires a separate queue, the system becomes overly complex and hard to manage. In many cases, it’s much more efficient and organized to use different routing keys to read from the same queue, which allows for better scalability and maintainability.

@WonderPanda
Copy link
Collaborator

While creating separate queues for every aspect of the project might seem like a quick fix, it defeats the purpose of having routing keys in the first place. The whole idea behind using routing keys is to prevent the chaos that would arise from having too many queues, especially in large projects with 50-70 microservices. If every interaction requires a separate queue, the system becomes overly complex and hard to manage. In many cases, it’s much more efficient and organized to use different routing keys to read from the same queue, which allows for better scalability and maintainability.

You're right, this is definitely sensible behavior. We're definitely open to contributions to improve the handling of routing keys

@slyk
Copy link

slyk commented Aug 14, 2024

While creating separate queues for every aspect of the project might seem like a quick fix, it defeats the purpose of having routing keys in the first place. The whole idea behind using routing keys is to prevent the chaos that would arise from having too many queues, especially in large projects with 50-70 microservices. If every interaction requires a separate queue, the system becomes overly complex and hard to manage. In many cases, it’s much more efficient and organized to use different routing keys to read from the same queue, which allows for better scalability and maintainability.

Don't see API in rabbit MQ that could allow us to get/subscribe only to some routing keys in queue.

So first consumer will receive the first message, if its not match needed routing key we should nack it with requeue.

Each message could go to each consumer while searching for match. So for many routing keys and consumers we will create many useless data flow...

Also here is quote from docs:
When there is only one consumer on a queue, make sure you do not create infinite message delivery loops by rejecting and requeueing a message from the same consumer over and over again.

So if needed consumer service is down now other consumers will infinitely get the message, nack it back and so on.

So one queue for one consumer seems like bt design feature of rabbitMQ (or amqp).

The one idea that comes to me to handle our request is to handle routing of the messge on nestjs side according to routing key. Some wrapper function that knows all functions in current app that are waiting for the message with specified routing key and will send message to that function only.

@sur-ser
Copy link
Author

sur-ser commented Aug 27, 2024

thank you for sharing your thoughts, but unfortunately, you're mistaken in your understanding of how RabbitMQ works. The main idea behind using routing keys in RabbitMQ is precisely that different messages going into the same queue can be processed by different consumers depending on the matching routing keys.

In RabbitMQ, consumers can listen to the same queue but only receive messages that match their specific routing key. This allows one queue to be efficiently used by multiple consumers, each processing only the messages relevant to them, which is the core concept of routing keys in the AMQP protocol.

I opened this issue not because I want new functionality, but because there’s a bug in the current version of the nestjs-rabbitmq package that prevents routing keys from working correctly. In version 5, changes were made to the message handling logic, causing messages to be incorrectly processed and leading to errors. In version 4, everything worked as expected.

What you’re suggesting—creating separate queues for each routing key—might be a workaround, but it goes against the principle of using routing keys and leads to unnecessary system complexity. My goal is to address the issue in the current implementation, not to introduce new functionality.

@underfisk
Copy link
Contributor

@sur-ser If you have the time/willing to help us by contributing with a proper fix for this matter we would appreciate 🙏

@upundefined
Copy link

Any news?

After this commit it stopped working
#712

@slyk
Copy link

slyk commented Oct 2, 2024

Any news?

After this commit it stopped working #712

Seems like not everybody need it.
Maybe another issue with feature request tag will help to understand how many people really need this and someone could decide to implement it to public (for bounty?)

For my small project I've decided to split to bigger number of queues.

As far as I read amqp 0-9-1 specs there is only one mention of "message selectors" that could help us to not create endless loop of nack(), but no info how this should be implemented and seems like rabbitMQ does not have this in API. And we should use its API in current lib...

So if this will be the real problem in my project, I guess we should implement logic on nestjs side but rabbitMQ still wil give us ALL messages from queue in FIFO order, we just search for functions by routing_key and if nothing found we should nack()

In RabbitMQ, consumers can listen to the same queue but only receive messages that match their specific routing key.

This would be good, but I can't find that type of functionality described nor in amqp protocol specs nor in rabbitMQ. All filtering with rouiting_key end up in exchange when we bind it to queue(s). The exchange is smart part that understant logic... the queue can only FIFO to consumer.

@upundefined
Copy link

I found a library that uses a single queue, it is specified when initializing the module with the queueName parameter
https://github.com/AlariCode/nestjs-rmq
https://github.com/AlariCode/nestjs-rmq/blob/3ef5ecc62b1e6e19254a4ae76288a35da96c463c/lib/rmq.service.ts#L227

@PierreKiwi
Copy link

Hello!
I have started to use this library today and I am facing this issue too.

I have manually modified to the previous behavior (pre #713), and now it does work as expected.

@SinPP
Copy link

SinPP commented Jan 2, 2025

Having the same issues as mentioned above

Copy link

This issue is stale because it has been open for 30 days with no activity.

@github-actions github-actions bot added the stale label Feb 11, 2025
@sur-ser
Copy link
Author

sur-ser commented Feb 20, 2025

The core issue appears to be in how RabbitMQ message routing is handled in version 5+ of the nestjs-rabbitmq package, specifically when multiple RPC handlers are configured to consume from the same queue but with different routing keys.

Here's what's happening:

In v4, messages that didn't match a handler's routing key were nacked with requeue=true, allowing other handlers to attempt processing
In v5, messages are nacked with requeue=false and an error is logged when routing keys don't match, effectively dropping messages
Here's a proposed solution that maintains the intended behavior while fixing the issue:


public async setupRpcChannel<T, U>(
  handler: RpcSubscriberHandler<T, U>,
  rpcOptions: MessageHandlerOptions,
  channel: ConfirmChannel,
): Promise<ConsumerTag> {
  const queue = await this.setupQueue(rpcOptions, channel);

  const { consumerTag }: { consumerTag: ConsumerTag } = await channel.consume(
    queue,
    this.wrapConsumer(async (msg) => {
      try {
        if (msg == null) {
          throw new Error('Received null message');
        }

        // Check if routing key matches
        if (!matchesRoutingKey(msg.fields.routingKey, rpcOptions.routingKey)) {
          // Instead of nacking with requeue=false, requeue the message
          // This allows other handlers to process it
          channel.nack(msg, false, true);
          return;
        }

        const result = this.deserializeMessage<T>(msg, rpcOptions);
        const response = await handler(result.message, msg, result.headers);

        if (response instanceof Nack) {
          channel.nack(msg, false, response.requeue);
          return;
        }

        const { replyTo, correlationId, expiration, headers } = msg.properties;
        if (replyTo) {
          await this.publish('', replyTo, response, {
            correlationId,
            expiration,
            headers,
            persistent: rpcOptions.usePersistentReplyTo ?? false,
          });
        }
        channel.ack(msg);
      } catch (e) {
        if (msg == null) {
          return;
        } else {
          const errorHandler =
            rpcOptions.errorHandler ||
            this.config.defaultRpcErrorHandler ||
            getHandlerForLegacyBehavior(
              rpcOptions.errorBehavior ||
                this.config.defaultSubscribeErrorBehavior,
            );

          await errorHandler(channel, msg, e);
        }
      }
    }),
    rpcOptions?.queueOptions?.consumerOptions,
  );

  this.registerConsumerForQueue({
    type: 'rpc',
    consumerTag,
    handler,
    msgOptions: rpcOptions,
    channel,
  });

  return consumerTag;
}
  1. Changed channel.nack(msg, false, false) to channel.nack(msg, false, true) when routing keys don't match
  2. Removed the error logging for non-matching routing keys since this is expected behavior
  3. Maintained the ability for multiple handlers to consume from the same queue with different routing keys

The fix must restore the functionality that worked in v4 while maintaining the improvements made in v5 regarding routing key pattern matching.

I think this will fix it

@underfisk
Copy link
Contributor

@sur-ser Are you down to contribute with a PR? I think you're proposal does make sense, if we can have some test coverage along side that would be awesome

@echonabin
Copy link

Any updates on thi ? This issue still persists.

@echonabin
Copy link

@sur-ser Didi you get work-around for this ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

8 participants