Skip to content

Commit d10439d

Browse files
committed
Fix maintaining pipeline when using AMQP
If RabbitMQ droped the connection, pika can emit the StreamLostError which can be gracefully handled by reconnection attempt. In addition, consuming on BlockingConnection without the timeout can block internal maintanence operations, like sending heartbeats [1]. [1] https://pika.readthedocs.io/en/1.2.0/modules/adapters/blocking.html#pika.adapters.blocking_connection.BlockingChannel.consume
1 parent d5e3e41 commit d10439d

File tree

2 files changed

+15
-6
lines changed

2 files changed

+15
-6
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
### Configuration
1313

1414
### Core
15+
- Fix maintaining pipeline connection when using AMQP (PR# by Kamil Mankowski).
1516

1617
### Development
1718

intelmq/lib/pipeline.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,7 @@ def __init__(self, logger, pipeline_args: dict = None, load_balance=False, is_mu
511511
if pika is None:
512512
raise ValueError("To use AMQP you must install the 'pika' library.")
513513
self.properties = pika.BasicProperties(delivery_mode=2) # message persistence
514+
self.heartbeat = 10
514515

515516
def load_configurations(self, queues_type):
516517
self.host = self.pipeline_args.get(f"{queues_type}_pipeline_host", "10.0.0.1")
@@ -533,9 +534,9 @@ def load_configurations(self, queues_type):
533534
self.kwargs['ssl_options'] = pika.SSLOptions(context=ssl.create_default_context(ssl.Purpose.SERVER_AUTH))
534535
pika_version = tuple(int(x) for x in pika.__version__.split('.'))
535536
if pika_version < (0, 11):
536-
self.kwargs['heartbeat_interval'] = 10
537+
self.kwargs['heartbeat_interval'] = self.heartbeat
537538
else:
538-
self.kwargs['heartbeat'] = 10
539+
self.kwargs['heartbeat'] = self.heartbeat
539540
if pika_version < (1, ):
540541
# https://groups.google.com/forum/#!topic/pika-python/gz7lZtPRq4Q
541542
self.publish_raises_nack = False
@@ -607,7 +608,10 @@ def _send(self, destination_queue, message, reconnect=True):
607608
mandatory=True,
608609
)
609610
except Exception as exc: # UnroutableError, NackError in 1.0.0
610-
if reconnect and isinstance(exc, pika.exceptions.ConnectionClosed):
611+
if reconnect and (
612+
isinstance(exc, pika.exceptions.ConnectionClosed) or
613+
isinstance(exc, pika.exceptions.StreamLostError)
614+
):
611615
self.logger.debug('Error sending the message. '
612616
'Will re-connect and re-send.',
613617
exc_info=True)
@@ -645,9 +649,13 @@ def _receive(self) -> bytes:
645649
if self.source_queue is None:
646650
raise exceptions.ConfigurationError('pipeline', 'No source queue given.')
647651
try:
648-
method, header, body = next(self.channel.consume(self.source_queue))
649-
if method:
650-
self.delivery_tag = method.delivery_tag
652+
method, body = None, None
653+
while not (method or body):
654+
method, _, body = next(
655+
self.channel.consume(self.source_queue, inactivity_timeout=self.heartbeat / 2)
656+
)
657+
if method:
658+
self.delivery_tag = method.delivery_tag
651659
except Exception as exc:
652660
raise exceptions.PipelineError(exc)
653661
else:

0 commit comments

Comments
 (0)