Skip to content

Commit 7e525e1

Browse files
committed
Specify queue name to make sure we only receive tasks destined for this did finder
1 parent 18ffd07 commit 7e525e1

File tree

2 files changed

+16
-4
lines changed

2 files changed

+16
-4
lines changed

src/servicex_did_finder_lib/did_finder_app.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ def do_lookup(self, did: str, dataset_id: int, endpoint: str, user_did_finder: U
101101

102102
acc.send_on(did_info.file_count)
103103
except Exception:
104+
# noinspection PyTypeChecker
104105
self.logger.error(
105106
f"Error processing DID {did}",
106107
extra={"dataset_id": dataset_id},
@@ -144,7 +145,9 @@ def __init__(self, did_finder_name: str,
144145

145146
initialize_root_logger(self.name)
146147

147-
self.app = Celery(f"did_finder_{self.name}", broker_url=self.parsed_args['rabbit_uri'])
148+
self.app = Celery(f"did_finder_{self.name}",
149+
broker_url=self.parsed_args['rabbit_uri'],
150+
broker_connection_retry_on_startup=True)
148151

149152
# Cache the args in the App so they are accessible to the tasks
150153
self.app.did_finder_args = self.parsed_args
@@ -170,7 +173,11 @@ def wrapper(*args, **kwargs):
170173
return decorator
171174

172175
def start(self):
173-
self.app.worker_main(argv=['worker', '--loglevel=INFO'])
176+
self.app.worker_main(argv=['worker',
177+
'--loglevel=INFO',
178+
'-Q', f'did_finder_{self.name}',
179+
'-n', f'{self.name}@%h'
180+
])
174181

175182
@classmethod
176183
def add_did_finder_cnd_arguments(cls, parser: argparse.ArgumentParser):

tests/servicex_did_finder_lib_tests/test_did_finder_app.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,5 +119,10 @@ def test_did_finder_app(mocker, monkeypatch):
119119
celery.return_value = mock_celery_app
120120
app = DIDFinderApp(did_finder_name="pytest", parsed_args=None)
121121
app.start()
122-
celery.assert_called_with("did_finder_pytest", broker_url="my-rabbit")
123-
mock_celery_app.worker_main.assert_called_with(argv=['worker', '--loglevel=INFO'])
122+
celery.assert_called_with("did_finder_pytest",
123+
broker_connection_retry_on_startup=True,
124+
broker_url="my-rabbit")
125+
mock_celery_app.worker_main.assert_called_with(argv=['worker',
126+
'--loglevel=INFO',
127+
'-Q', 'did_finder_pytest',
128+
'-n', 'pytest@%h'])

0 commit comments

Comments
 (0)