Skip to content

Commit 8659380

Browse files
vyntheralvadim.kar
andauthored
Added publishing_job_callable, default_publishing_job_queue and custom_queue_name (#37)
* Bump version to 1.6.4 and add publishing job configuration options t а f а f f * Updated doc * fix * fix custom_queue_name issue * use kwargs in publish and message * f --------- Co-authored-by: vadim.kar <vadim.kar@cadolabs.io>
1 parent 216e2da commit 8659380

File tree

9 files changed

+138
-31
lines changed

9 files changed

+138
-31
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
# Changelog
22
All notable changes to this project will be documented in this file.
33

4+
## [1.7.0] - 2025-08-19
5+
### Added
6+
- Ability to specify a custom job class for publishing via `publishing_job_class_callable` config.
7+
- Ability to specify a default queue for publishing jobs via `default_publishing_job_queue` config.
8+
- Ability to specify a custom queue per publish call via `custom_queue_name` argument.
9+
410
## [1.5.0] - 2025-05-19
511
### Added
612
- Added ability to split log message into parts

Gemfile.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ GIT
88
PATH
99
remote: .
1010
specs:
11-
rabbit_messaging (1.6.3)
11+
rabbit_messaging (1.7.0)
1212
bunny (~> 2.0)
1313
kicks
1414

README.md

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,14 @@ require "rabbit_messaging"
9797
}
9898
```
9999

100+
- `publishing_job_class_callable` (`Proc`)
101+
102+
Custom job class (e.g. ActiveJob or Sidekiq::Job) to work with published messages.
103+
104+
- `default_publishing_job_queue` (`String` or `Symbol`)
105+
106+
The name of the queue that will be used by default for publishing jobs. `default` by default.
107+
100108
- `before_receiving_hooks, after_receiving_hooks` (`Array of Procs`)
101109

102110
Before and after hooks with message processing in the middle. Where `before_receiving_hooks` and `after_receiving_hooks` are empty arrays by default.
@@ -139,7 +147,7 @@ require "rabbit_messaging"
139147
- `connection_reset_max_retries` (`Integer`)
140148
141149
Maximum number of reconnection attempts after a connection loss. Default: 10.
142-
150+
143151
```ruby
144152
config.connection_reset_max_retries = 20
145153
```
@@ -165,13 +173,16 @@ require "rabbit_messaging"
165173
166174
```ruby
167175
Rabbit.publish(
168-
routing_key: :support,
169-
event: :ping,
170-
data: { foo: :bar }, # default is {}
171-
exchange_name: 'fanout', # default is fine too
172-
confirm_select: true, # setting this to false grants you great speed up and absolutelly no guarantees
173-
headers: { "foo" => "bar" }, # custom arguments for routing, default is {}
174-
message_id: "asdadsadsad", # A unique identifier such as a UUID that your application can use to identify the message.
176+
{
177+
routing_key: :support,
178+
event: :ping,
179+
data: { foo: :bar }, # default is {}
180+
exchange_name: 'fanout', # default is fine too
181+
confirm_select: true, # setting this to false grants you great speed up and absolutelly no guarantees
182+
headers: { "foo" => "bar" }, # custom arguments for routing, default is {}
183+
message_id: "asdadsadsad", # A unique identifier such as a UUID that your application can use to identify the message.
184+
},
185+
custom_queue_name: :my_custom_queue, # The name of the queue for publishing jobs. Overrides the default queue.
175186
)
176187
```
177188

lib/rabbit.rb

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ class Config
1818
:hooks,
1919
:queue_name_conversion,
2020
:receiving_job_class_callable,
21+
:publishing_job_class_callable,
22+
:default_publishing_job_queue,
2123
:handler_resolver_callable,
2224
:exception_notifier,
2325
:before_receiving_hooks,
@@ -41,6 +43,8 @@ def initialize( # rubocop:disable Metrics/MethodLength
4143
environment: :production,
4244
queue_name_conversion: nil,
4345
receiving_job_class_callable: nil,
46+
publishing_job_class_callable: nil,
47+
default_publishing_job_queue: :default,
4448
handler_resolver_callable: nil,
4549
exception_notifier: nil,
4650
before_receiving_hooks: [],
@@ -63,6 +67,8 @@ def initialize( # rubocop:disable Metrics/MethodLength
6367
self.environment = environment.to_sym
6468
self.queue_name_conversion = queue_name_conversion
6569
self.receiving_job_class_callable = receiving_job_class_callable
70+
self.publishing_job_class_callable = publishing_job_class_callable
71+
self.default_publishing_job_queue = default_publishing_job_queue
6672
self.handler_resolver_callable = handler_resolver_callable
6773
self.exception_notifier = exception_notifier
6874
self.before_receiving_hooks = before_receiving_hooks
@@ -163,13 +169,35 @@ def configure
163169
config.validate!
164170
end
165171

166-
def publish(message_options)
167-
message = Publishing::Message.new(message_options)
172+
def publish(
173+
routing_key: nil,
174+
event: nil,
175+
data: {},
176+
exchange_name: [],
177+
confirm_select: true,
178+
realtime: false,
179+
headers: {},
180+
message_id: nil,
181+
custom_queue_name: nil
182+
)
183+
message = Publishing::Message.new(
184+
routing_key: routing_key,
185+
event: event,
186+
data: data,
187+
exchange_name: exchange_name,
188+
confirm_select: confirm_select,
189+
realtime: realtime,
190+
headers: headers,
191+
message_id: message_id,
192+
)
193+
job_class = config.publishing_job_class_callable
194+
publish_job_callable = job_class.is_a?(Proc) ? job_class.call : (job_class || Publishing::Job)
195+
queue_name = custom_queue_name || default_queue_name
168196

169197
if message.realtime?
170198
Publishing.publish(message)
171199
else
172-
Publishing::Job.set(queue: default_queue_name).perform_later(message.to_hash)
200+
publish_job_callable.set(queue: queue_name).perform_later(message.to_hash)
173201
end
174202
end
175203

@@ -179,6 +207,6 @@ def queue_name(queue, ignore_conversion: false)
179207
end
180208

181209
def default_queue_name(ignore_conversion: false)
182-
queue_name(:default, ignore_conversion: ignore_conversion)
210+
queue_name(config.default_publishing_job_queue, ignore_conversion: ignore_conversion)
183211
end
184212
end

lib/rabbit/publishing/job.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
module Rabbit::Publishing
66
class Job < ActiveJob::Base
77
def perform(message)
8-
Rabbit::Publishing.publish(Message.new(message))
8+
Rabbit::Publishing.publish(Message.new(**message))
99
end
1010
end
1111
end

lib/rabbit/publishing/message.rb

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,24 @@ class Message
99
alias_method :confirm_select?, :confirm_select
1010
alias_method :realtime?, :realtime
1111

12-
def initialize(attributes = {})
13-
self.routing_key = attributes[:routing_key]
14-
self.event = attributes[:event]&.to_s
15-
self.data = attributes.fetch(:data, {})
16-
self.exchange_name = Array(attributes.fetch(:exchange_name, []))
17-
self.confirm_select = attributes.fetch(:confirm_select, true)
18-
self.realtime = attributes.fetch(:realtime, false)
19-
self.headers = attributes.fetch(:headers, {})
20-
self.message_id = attributes[:message_id]
12+
def initialize(
13+
routing_key: nil,
14+
event: nil,
15+
data: {},
16+
exchange_name: [],
17+
confirm_select: true,
18+
realtime: false,
19+
headers: {},
20+
message_id: nil
21+
)
22+
self.routing_key = routing_key
23+
self.event = event&.to_s
24+
self.data = data
25+
self.exchange_name = Array(exchange_name)
26+
self.confirm_select = confirm_select
27+
self.realtime = realtime
28+
self.headers = headers
29+
self.message_id = message_id
2130
end
2231

2332
def to_hash

lib/rabbit/version.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# frozen_string_literal: true
22

33
module Rabbit
4-
VERSION = "1.6.3"
4+
VERSION = "1.7.0"
55
end

spec/units/rabbit/publishing/message_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
describe Rabbit::Publishing::Message do
44
describe "basic_publish_args" do
5-
subject(:message) { described_class.new(attributes) }
5+
subject(:message) { described_class.new(**attributes) }
66

77
context "rounting key not specified" do
88
let(:attributes) { Hash[event: :ping] }

spec/units/rabbit_spec.rb

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
message_id: "uuid",
1313
}
1414
end
15+
let(:additional_params) { {} }
1516

1617
before do
1718
Rabbit.config.queue_name_conversion = -> (queue) { "#{queue}_prepared" }
@@ -52,8 +53,8 @@
5253

5354
it "publishes" do
5455
if expect_to_use_job
55-
set_params = { queue: "default_prepared" }
56-
expect(Rabbit::Publishing::Job).to receive(:set).with(set_params).and_call_original
56+
set_params = { queue: expected_queue }
57+
expect(job_class).to receive(:set).with(set_params).and_call_original
5758
perform_params = {
5859
routing_key: "some_queue",
5960
event: "some_event",
@@ -68,7 +69,7 @@
6869
.to receive(:perform_later).with(perform_params).and_call_original
6970

7071
else
71-
expect(Rabbit::Publishing::Job).not_to receive(:perform_later)
72+
expect(job_class).not_to receive(:perform_later)
7273
end
7374

7475
expect(publish_logger).to receive(:debug).with(<<~MSG.strip)
@@ -79,7 +80,7 @@
7980
test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar"} / some_event / \
8081
confirm: ...world"}
8182
MSG
82-
described_class.publish(message_options)
83+
described_class.publish(**message_options, **additional_params)
8384
end
8485

8586
after do
@@ -96,6 +97,7 @@
9697
let(:publish_logger) { double("publish_logger") }
9798
let(:bunny) { double("bunny") }
9899
let(:channel) { double("channel") }
100+
let(:job_class) { Rabbit::Publishing::Job }
99101

100102
before do
101103
allow(Bunny).to receive_message_chain(:new, :start).and_return(bunny)
@@ -132,27 +134,78 @@
132134
confirm: {"hello":"world"}
133135
MSG
134136

135-
expect { described_class.publish(message_options) }.not_to raise_error
137+
expect { described_class.publish(**message_options) }.not_to raise_error
136138
end
137139

138140
it "raises the last exception after max retries" do
139141
allow(channel).to receive(:basic_publish).and_raise(Bunny::ConnectionClosedError.new(""))
140142

141-
expect { described_class.publish(message_options) }
143+
expect { described_class.publish(**message_options) }
142144
.to raise_error(Bunny::ConnectionClosedError)
143145
end
144146
end
145147

146148
context "realtime" do
147149
let(:realtime) { true }
148150
let(:expect_to_use_job) { false }
151+
let(:expected_queue) { "default_prepared" }
152+
let(:job_class) { Rabbit::Publishing::Job }
149153

150154
include_examples "publishes"
151155
end
152156

153157
context "not realtime" do
154158
let(:realtime) { false }
155159
let(:expect_to_use_job) { true }
160+
let(:expected_queue) { "default_prepared" }
161+
let(:job_class) { Rabbit::Publishing::Job }
162+
163+
include_examples "publishes"
164+
end
165+
166+
context "with custom job class" do
167+
let(:realtime) { false }
168+
let(:expect_to_use_job) { true }
169+
let(:expected_queue) { "default_prepared" }
170+
let(:job_class) { Class.new(Rabbit::Publishing::Job) }
171+
172+
before do
173+
stub_const("CustomJobClass", job_class)
174+
allow(Rabbit.config).to receive(:publishing_job_class_callable).and_return(job_class)
175+
end
176+
177+
include_examples "publishes"
178+
end
179+
180+
context "with custom default_publishing_job_queue" do
181+
let(:realtime) { false }
182+
let(:expect_to_use_job) { true }
183+
let(:job_class) { Rabbit::Publishing::Job }
184+
let(:default_publishing_job_queue) { :custom_queue }
185+
let(:expected_queue) { "passed_to_method_queue" }
186+
let(:additional_params) { { custom_queue_name: "passed_to_method_queue" } }
187+
188+
before do
189+
allow(Rabbit.config).to(
190+
receive(:default_publishing_job_queue).and_return(default_publishing_job_queue),
191+
)
192+
end
193+
194+
include_examples "publishes"
195+
end
196+
197+
context "with custom queue name" do
198+
let(:realtime) { false }
199+
let(:expect_to_use_job) { true }
200+
let(:job_class) { Rabbit::Publishing::Job }
201+
let(:default_publishing_job_queue) { :custom_queue }
202+
let(:expected_queue) { "custom_queue_prepared" }
203+
204+
before do
205+
allow(Rabbit.config).to(
206+
receive(:default_publishing_job_queue).and_return(default_publishing_job_queue),
207+
)
208+
end
156209

157210
include_examples "publishes"
158211
end

0 commit comments

Comments
 (0)