Skip to content

Commit 15deac1

Browse files
committed
Make DIDFinderApp a subclass of Celery App
1 parent 7e525e1 commit 15deac1

File tree

3 files changed

+51
-109
lines changed

3 files changed

+51
-109
lines changed

README.md

Lines changed: 30 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,32 @@ by adding the following to your `pyproject.tom` file:
2828
servicex-did-finder-lib = "^3.0"
2929
```
3030

31-
Create a script that will run your DID. It needs to contain your generator function that adheres
32-
to the UserDIDHandler signature:
31+
Create a celery app that will run your DID finder. This app will be responsible for starting the
32+
Celery worker and registering your DID finder function as a task. Here is an example of how to do
33+
this. Celery prefers that the app is in a file called `celery.py` in a module in your project. Here
34+
is an example of how to do this:
35+
36+
## celery.py:
3337
```python
34-
UserDIDHandler = Callable[
35-
[str, Dict[str, Any], Dict[str, Any]],
36-
Generator[Dict[str, Any], None, None]
37-
]
38+
39+
from servicex_did_finder_lib import DIDFinderApp
40+
rucio_adaptor = RucioAdaptor()
41+
app = DIDFinderApp('rucio', did_finder_args={"rucio_adapter": rucio_adaptor})
3842
```
3943

44+
Attach the DID finder to the app by using the `did_lookup_task` decorator. This decorator will
45+
register the function as a Celery task. Here is an example of how to do this:
46+
47+
```python
48+
@app.did_lookup_task(name="did_finder_rucio.lookup_dataset")
49+
def lookup_dataset(self, did: str, dataset_id: int, endpoint: str) -> None:
50+
self.do_lookup(did=did, dataset_id=dataset_id,
51+
endpoint=endpoint, user_did_finder=find_files)
52+
```
53+
54+
You will need to implement the `find_files` function. This function is a generator that yields
55+
file information dictionaries.
56+
4057
The arguments to the method are straight forward:
4158

4259
* `did_name`: the name of the DID that you should look up. It has the schema stripped off (e.g. if the user sent ServiceX `rucio://dataset_name_in_rucio`, then `did_name` will be `dataset_name_in_rucio`)
@@ -74,45 +91,22 @@ def find_files(did_name: str,
7491
}
7592
```
7693

77-
There is a small amount of additional boilerplate code that is required to create a DID Finder. This
78-
is the code that will create the Celery app and register your function as a task. Here is an
79-
example (which assumes that `find_files` is your DID handler):
80-
```python
81-
from servicex_did_finder_lib import DIDFinderApp
82-
83-
app = DIDFinderApp('cernopendata')
84-
85-
@app.did_lookup_task(name="did_finder_cern_opendata.lookup_dataset")
86-
def lookup_dataset(self, did: str, dataset_id: int, endpoint: str) -> None:
87-
self.do_lookup(did=did, dataset_id=dataset_id,
88-
endpoint=endpoint, user_did_finder=find_files)
89-
90-
app.start()
91-
```
9294

9395
## Extra Command Line Arguments
9496
Sometimes you need to pass additional information to your DID Finder from the command line. You do
95-
this by creating your own `ArgParser` and then calling the `add_did_finder_cnd_arguments` method
96-
which inserts the arguments that the library needs to pass to the finder. Here is an example:
97-
97+
this by creating your own `ArgParser`
9898
```python
9999
import argparse
100-
from servicex_did_finder_lib import DIDFinderApp
101-
102-
parser = argparse.ArgumentParser()
103-
parser.add_argument('--foo', dest='foo', action='store',
104-
default='',
105-
help='Prefix to add to Xrootd URLs')
100+
# Parse command-line arguments
101+
parser = argparse.ArgumentParser(description='DIDFinderApp')
102+
parser.add_argument('--custom-arg', help='Custom argument for DIDFinderApp')
103+
args, unknown = parser.parse_known_args()
106104

107-
DIDFinderApp.add_did_finder_cnd_arguments(parser)
105+
# Create the app instance
106+
app = DIDFinderApp('myApp', did_finder_args={"custom-arg": args.custom_arg})
108107

109108
```
110109

111-
You then just pass the dictionary of parsed args to your app constructor:
112-
```python
113-
app = DIDFinderApp('cernopendata', parsed_args=parser.parse_args())
114-
```
115-
116110
These parsed args will be passed to your `find_files` function as a dictionary in
117111
the `did_finder_args` parameter.
118112

src/servicex_did_finder_lib/did_finder_app.py

Lines changed: 12 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
2626
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
2727
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28-
import argparse
2928
import logging
3029
from datetime import datetime
3130
from typing import Any, Generator, Callable, Dict, Optional
@@ -120,37 +119,31 @@ def do_lookup(self, did: str, dataset_id: int, endpoint: str, user_did_finder: U
120119
)
121120

122121

123-
class DIDFinderApp:
122+
class DIDFinderApp(Celery):
124123
"""
125124
The main application for a DID finder. This will setup the Celery application
126125
and start the worker to process the DID requests.
127126
"""
128127
def __init__(self, did_finder_name: str,
129-
parsed_args: Optional[argparse.Namespace] = None):
128+
did_finder_args: Optional[Dict[str, Any]] = None,
129+
*args, **kwargs):
130130
"""
131131
Initialize the DID finder application
132132
Args:
133-
did_finder_name: The name of the DID finder
134-
parsed_args: The parsed command line arguments. Leave as None to use the default parser
133+
did_finder_name: The name of the DID finder.
134+
did_finder_args: The parsed command line arguments and other objects you want
135+
to make available to the tasks
135136
"""
136137

137138
self.name = did_finder_name
138-
self.parsed_args = vars(parsed_args) if parsed_args else None
139-
140-
# Setup command line parsing
141-
if self.parsed_args is None:
142-
parser = argparse.ArgumentParser()
143-
self.add_did_finder_cnd_arguments(parser)
144-
self.parsed_args = vars(parser.parse_args())
145-
146139
initialize_root_logger(self.name)
147140

148-
self.app = Celery(f"did_finder_{self.name}",
149-
broker_url=self.parsed_args['rabbit_uri'],
150-
broker_connection_retry_on_startup=True)
141+
super().__init__(f"did_finder_{self.name}", *args,
142+
broker_connection_retry_on_startup=True,
143+
**kwargs)
151144

152-
# Cache the args in the App so they are accessible to the tasks
153-
self.app.did_finder_args = self.parsed_args
145+
# Cache the args in the App, so they are accessible to the tasks
146+
self.did_finder_args = did_finder_args
154147

155148
def did_lookup_task(self, name):
156149
"""
@@ -166,41 +159,8 @@ def lookup_dataset(self, did: str, dataset_id: int, endpoint: str) -> None:
166159
name: The name of the task
167160
"""
168161
def decorator(func):
169-
@self.app.task(base=DIDFinderTask, bind=True, name=name)
162+
@self.task(base=DIDFinderTask, bind=True, name=name)
170163
def wrapper(*args, **kwargs):
171164
return func(*args, **kwargs)
172165
return wrapper
173166
return decorator
174-
175-
def start(self):
176-
self.app.worker_main(argv=['worker',
177-
'--loglevel=INFO',
178-
'-Q', f'did_finder_{self.name}',
179-
'-n', f'{self.name}@%h'
180-
])
181-
182-
@classmethod
183-
def add_did_finder_cnd_arguments(cls, parser: argparse.ArgumentParser):
184-
"""add_did_finder_cnd_arguments Add required arguments to a parser
185-
186-
If you need to parse command line arguments for some special configuration, create your
187-
own argument parser, and call this function to make sure the arguments needed
188-
for running the back-end communication are filled in properly.
189-
190-
Then pass the results of the parsing to the DID Finder App's constructor method.
191-
192-
Args:
193-
parser (argparse.ArgumentParser): The argument parser. Arguments needed for the
194-
did finder/servicex communication will be added.
195-
"""
196-
parser.add_argument(
197-
"--rabbit-uri", dest="rabbit_uri", action="store", required=True
198-
)
199-
parser.add_argument(
200-
"--prefix",
201-
dest="prefix",
202-
action="store",
203-
required=False,
204-
default="",
205-
help="Prefix to add to use a caching proxy for URIs",
206-
)

tests/servicex_did_finder_lib_tests/test_did_finder_app.py

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
2626
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
2727
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28-
import sys
2928
from unittest.mock import patch
3029
import pytest
3130
from celery import Celery
@@ -104,25 +103,14 @@ def test_did_finder_task_exception(mocker, servicex, single_file_info):
104103
)
105104

106105

107-
def test_did_finder_app(mocker, monkeypatch):
108-
# Temporarily replace sys.argv with mock_args
109-
monkeypatch.setattr(sys, 'argv', [
110-
"did_finder.py",
111-
"--rabbit-uri", "my-rabbit"
112-
])
106+
def test_celery_app():
107+
app = DIDFinderApp('foo')
108+
assert isinstance(app, Celery)
109+
assert app.name == 'foo'
113110

114-
mock_celery_app = mocker.MagicMock(Celery)
111+
@app.did_lookup_task(name="did_finder_rucio.lookup_dataset")
112+
def lookup_dataset(self, did: str, dataset_id: int, endpoint: str) -> None:
113+
self.do_lookup(did=did, dataset_id=dataset_id,
114+
endpoint=endpoint, user_did_finder=lambda x, y, z: None)
115115

116-
with patch(
117-
"servicex_did_finder_lib.did_finder_app.Celery", autospec=True
118-
) as celery:
119-
celery.return_value = mock_celery_app
120-
app = DIDFinderApp(did_finder_name="pytest", parsed_args=None)
121-
app.start()
122-
celery.assert_called_with("did_finder_pytest",
123-
broker_connection_retry_on_startup=True,
124-
broker_url="my-rabbit")
125-
mock_celery_app.worker_main.assert_called_with(argv=['worker',
126-
'--loglevel=INFO',
127-
'-Q', 'did_finder_pytest',
128-
'-n', 'pytest@%h'])
116+
assert lookup_dataset.__name__ == 'wrapper'

0 commit comments

Comments
 (0)