Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
rabbit_carrots (1.0.2)
rabbit_carrots (1.0.3)
bunny (>= 2.22)
connection_pool (~> 2.4)

Expand All @@ -18,12 +18,12 @@ GEM
minitest (>= 5.1)
mutex_m
tzinfo (~> 2.0)
amq-protocol (2.3.2)
amq-protocol (2.3.4)
ast (2.4.2)
base64 (0.2.0)
bigdecimal (3.1.4)
bunny (2.22.0)
amq-protocol (~> 2.3, >= 2.3.1)
bunny (2.24.0)
amq-protocol (~> 2.3)
sorted_set (~> 1, >= 1.0.2)
concurrent-ruby (1.2.2)
connection_pool (2.4.1)
Expand Down Expand Up @@ -80,7 +80,7 @@ GEM
rubocop-ast (>= 1.30.0, < 2.0)
ruby-progressbar (1.13.0)
ruby2_keywords (0.0.5)
set (1.1.0)
set (1.1.2)
sorted_set (1.0.3)
rbtree
set (~> 1.0)
Expand Down
5 changes: 3 additions & 2 deletions lib/puma/plugin/rabbit_carrots.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

def start(launcher)
@log_writer = launcher.log_writer
@puma_pid = $$
@puma_pid = $PROCESS_ID

@core_service = RabbitCarrots::Core.new(logger: log_writer)

Expand Down Expand Up @@ -43,6 +43,7 @@ def stop_rabbit_carrots
Process.kill('TERM', rabbit_carrots_pid)
Process.wait(rabbit_carrots_pid)
rescue Errno::ECHILD, Errno::ESRCH
log 'Rabbit Carrots already stopped'
end

def monitor_puma
Expand All @@ -57,7 +58,7 @@ def monitor(process_dead, message)
loop do
if send(process_dead)
log message
Process.kill('TERM', $$)
Process.kill('TERM', $PROCESS_ID)
break
end
sleep 2
Expand Down
4 changes: 2 additions & 2 deletions lib/rabbit_carrots/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ class Configuration
:rabbitmq_exchange_name,
:automatically_recover,
:network_recovery_interval,
:recovery_attempts,
:orm
:recovery_attempts

def orm
@orm ||= :activerecord
end
Expand Down
39 changes: 29 additions & 10 deletions lib/rabbit_carrots/core.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class << self
end

def initialize(logger: nil)
@logger = logger || Logger.new(Rails.env.production? ? '/proc/self/fd/1' : $stdout)
@logger = create_logger_adapter(logger || Logger.new(Rails.env.production? ? '/proc/self/fd/1' : $stdout))
@threads = []
@running = true
@shutdown_requested = false
Expand Down Expand Up @@ -61,7 +61,7 @@ def start(kill_to_restart_on_standard_error: false)
def request_shutdown
# Workaround to a known issue with Signal Traps and logs
Thread.start do
logger.log 'Shutting down Rabbit Carrots service...'
logger.error 'Shutting down Rabbit Carrots service...'
end
@shutdown_requested = true
@threads.each(&:kill)
Expand All @@ -71,7 +71,7 @@ def request_shutdown
def stop
# Workaround to a known issue with Signal Traps and logs
Thread.start do
logger.log 'Stoppig the Rabbit Carrots service...'
logger.error 'Stoppig the Rabbit Carrots service...'
end
@running = false
end
Expand All @@ -80,32 +80,32 @@ def run_task(queue_name:, handler_class:, routing_keys:, queue_arguments: {}, ki
RabbitCarrots::Connection.instance.channel.with do |channel|
exchange = channel.topic(RabbitCarrots.configuration.rabbitmq_exchange_name, durable: true)

logger.log "Listening on QUEUE: #{queue_name} for ROUTING KEYS: #{routing_keys}"
logger.info "Listening on QUEUE: #{queue_name} for ROUTING KEYS: #{routing_keys}"
queue = channel.queue(queue_name, durable: true, arguments: queue_arguments)

routing_keys.map(&:strip).each { |k| queue.bind(exchange, routing_key: k) }

queue.subscribe(block: false, manual_ack: true, prefetch: 10) do |delivery_info, properties, payload|
break if @shutdown_requested

logger.log "Received from queue: #{queue_name}, Routing Keys: #{routing_keys}"
logger.info "Received from queue: #{queue_name}, Routing Keys: #{routing_keys}"
handler_class.handle!(channel, delivery_info, properties, payload)
channel.ack(delivery_info.delivery_tag, false)
rescue RabbitCarrots::EventHandlers::Errors::NackMessage, JSON::ParserError => _e
logger.log "Nacked message: #{payload}"
logger.warn "Nacked message: #{payload}"
channel.nack(delivery_info.delivery_tag, false, false)
rescue RabbitCarrots::EventHandlers::Errors::NackAndRequeueMessage => _e
logger.log "Nacked and Requeued message: #{payload}"
logger.warn "Nacked and Requeued message: #{payload}"
channel.nack(delivery_info.delivery_tag, false, true)
rescue self.class.database_agnostic_not_null_violation, self.class.database_agnostic_record_invalid => e
logger.log "Null constraint or Invalid violation: #{payload}. Error: #{e.message}"
logger.warn "Null constraint or Invalid violation: #{payload}. Error: #{e.message}"
channel.ack(delivery_info.delivery_tag, false)
rescue self.class.database_agnostic_connection_not_established => e
logger.log "Error connection not established to the database: #{payload}. Error: #{e.message}"
logger.warn "Error connection not established to the database: #{payload}. Error: #{e.message}"
sleep 3
channel.nack(delivery_info.delivery_tag, false, true)
rescue StandardError => e
logger.log "Error handling message: #{payload}. Error: #{e.message}"
logger.error "Error handling message: #{payload}. Error: #{e.message}"
sleep 3
channel.nack(delivery_info.delivery_tag, false, true)
Process.kill('SIGTERM', Process.pid) if kill_to_restart_on_standard_error
Expand All @@ -115,5 +115,24 @@ def run_task(queue_name:, handler_class:, routing_keys:, queue_arguments: {}, ki
logger.error "Bunny session error: #{e.message}"
request_shutdown
end

private

def create_logger_adapter(logger)
return logger if logger.respond_to?(:info) && logger.respond_to?(:error) && logger.respond_to?(:warn)

adapter = Object.new
def adapter.info(msg)
@logger.write("[INFO] #{msg}\n")
end
def adapter.error(msg)
@logger.write("[ERROR] #{msg}\n")
end
def adapter.warn(msg)
@logger.write("[WARN] #{msg}\n")
end
adapter.instance_variable_set(:@logger, logger)
adapter
end
end
end
2 changes: 1 addition & 1 deletion lib/rabbit_carrots/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module RabbitCarrots
VERSION = '1.0.2'
VERSION = '1.0.3'
end
Loading