Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions lib/puma/plugin/rabbit_carrots.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

def start(launcher)
@log_writer = launcher.log_writer
@puma_pid = $$
@puma_pid = $PROCESS_ID

@core_service = RabbitCarrots::Core.new(logger: log_writer)

Expand Down Expand Up @@ -43,6 +43,7 @@ def stop_rabbit_carrots
Process.kill('TERM', rabbit_carrots_pid)
Process.wait(rabbit_carrots_pid)
rescue Errno::ECHILD, Errno::ESRCH
log 'Rabbit Carrots already stopped'
end

def monitor_puma
Expand All @@ -57,7 +58,7 @@ def monitor(process_dead, message)
loop do
if send(process_dead)
log message
Process.kill('TERM', $$)
Process.kill('TERM', $PROCESS_ID)
break
end
sleep 2
Expand Down
4 changes: 2 additions & 2 deletions lib/rabbit_carrots/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ class Configuration
:rabbitmq_exchange_name,
:automatically_recover,
:network_recovery_interval,
:recovery_attempts,
:orm
:recovery_attempts

def orm
@orm ||= :activerecord
end
Expand Down
18 changes: 9 additions & 9 deletions lib/rabbit_carrots/core.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def start(kill_to_restart_on_standard_error: false)
def request_shutdown
# Workaround to a known issue with Signal Traps and logs
Thread.start do
logger.log 'Shutting down Rabbit Carrots service...'
logger.error 'Shutting down Rabbit Carrots service...'
end
@shutdown_requested = true
@threads.each(&:kill)
Expand All @@ -71,7 +71,7 @@ def request_shutdown
def stop
# Workaround to a known issue with Signal Traps and logs
Thread.start do
logger.log 'Stoppig the Rabbit Carrots service...'
logger.error 'Stoppig the Rabbit Carrots service...'
end
@running = false
end
Expand All @@ -80,32 +80,32 @@ def run_task(queue_name:, handler_class:, routing_keys:, queue_arguments: {}, ki
RabbitCarrots::Connection.instance.channel.with do |channel|
exchange = channel.topic(RabbitCarrots.configuration.rabbitmq_exchange_name, durable: true)

logger.log "Listening on QUEUE: #{queue_name} for ROUTING KEYS: #{routing_keys}"
logger.info "Listening on QUEUE: #{queue_name} for ROUTING KEYS: #{routing_keys}"
queue = channel.queue(queue_name, durable: true, arguments: queue_arguments)

routing_keys.map(&:strip).each { |k| queue.bind(exchange, routing_key: k) }

queue.subscribe(block: false, manual_ack: true, prefetch: 10) do |delivery_info, properties, payload|
break if @shutdown_requested

logger.log "Received from queue: #{queue_name}, Routing Keys: #{routing_keys}"
logger.info "Received from queue: #{queue_name}, Routing Keys: #{routing_keys}"
handler_class.handle!(channel, delivery_info, properties, payload)
channel.ack(delivery_info.delivery_tag, false)
rescue RabbitCarrots::EventHandlers::Errors::NackMessage, JSON::ParserError => _e
logger.log "Nacked message: #{payload}"
logger.warn "Nacked message: #{payload}"
channel.nack(delivery_info.delivery_tag, false, false)
rescue RabbitCarrots::EventHandlers::Errors::NackAndRequeueMessage => _e
logger.log "Nacked and Requeued message: #{payload}"
logger.warn "Nacked and Requeued message: #{payload}"
channel.nack(delivery_info.delivery_tag, false, true)
rescue self.class.database_agnostic_not_null_violation, self.class.database_agnostic_record_invalid => e
logger.log "Null constraint or Invalid violation: #{payload}. Error: #{e.message}"
logger.warn "Null constraint or Invalid violation: #{payload}. Error: #{e.message}"
channel.ack(delivery_info.delivery_tag, false)
rescue self.class.database_agnostic_connection_not_established => e
logger.log "Error connection not established to the database: #{payload}. Error: #{e.message}"
logger.warn "Error connection not established to the database: #{payload}. Error: #{e.message}"
sleep 3
channel.nack(delivery_info.delivery_tag, false, true)
rescue StandardError => e
logger.log "Error handling message: #{payload}. Error: #{e.message}"
logger.error "Error handling message: #{payload}. Error: #{e.message}"
sleep 3
channel.nack(delivery_info.delivery_tag, false, true)
Process.kill('SIGTERM', Process.pid) if kill_to_restart_on_standard_error
Expand Down