Skip to content

Commit 8800f0b

Browse files
authored
DID Parameter Parsing (#17)
* Implement parameter parsing as in this [ServiceX story](ssl-hep/ServiceX#395) Individual commits: * Parse a `files` count thing * Get rid of post-3.6 dependencies * Search for the parsed DID, not the original parameterized one! * Fix up flake8 errors * Test for parsed parameter removal * Update docs and comments * Convert so everything is multi-path * Make sure multi-path works ok * Fix flake8 * Obey all vs avail get modes * Get logic of using get and files correct
1 parent a8cbb9e commit 8800f0b

File tree

5 files changed

+399
-10
lines changed

5 files changed

+399
-10
lines changed

README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,3 +128,13 @@ In the end, all DID finders for ServiceX will run under Kubernetes. ServiceX com
128128
Note the parameter `request-id`: this marks the log messages with the request id that triggered this DID request. This will enable the system to track all log messages across all containers connected with this particular request id - making debugging a lot easier.
129129

130130
The `start_did_finder` will configure the python root logger properly to dump messages with a request ID in them.
131+
132+
## URI Format
133+
134+
All incoming DIDs are expected to be URIs without the scheme. Two query parameters are currently parsed by the library itself. The rest are let through and routed to the callback:
135+
136+
* `files` - Number of files to report back to ServiceX. All files from the dataset are found, and then sorted in order. The first n files are then
137+
sent back. Default is all files.
138+
* `get` - If the value is `all` (the default) then all files in the dataset must be returned. If the value is `available`, then only files that are accessible need be returned.
139+
140+
As an example, if the following URI is given to ServiceX, "rucio://dataset_name?files=20&get=available", then the first 20 available files of the dataset will be processed by the rest of ServiceX.

src/servicex_did_finder_lib/communication.py

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@
33
import json
44
import logging
55
import time
6-
from typing import Any, AsyncGenerator, Callable, Dict, Optional
6+
from typing import Any, AsyncGenerator, Callable, Dict, List, Optional
77
import sys
88

99
import pika
1010
from make_it_sync import make_sync
1111

1212
from servicex_did_finder_lib.did_summary import DIDSummary
1313
from servicex_did_finder_lib.did_logging import initialize_root_logger
14+
from servicex_did_finder_lib.util_uri import parse_did_uri
1415
from .servicex_adaptor import ServiceXAdapter
1516

1617
# The type for the callback method to handle DID's, supplied by the user.
@@ -26,17 +27,52 @@
2627
__logging.addHandler(logging.NullHandler())
2728

2829

30+
class _accumulator:
    """Forward files to ServiceX right away, or cache them for one sorted send.

    When ``hold_till_end`` is true, every file handed to `add` is kept in an
    in-memory cache until `send_on` is called. Otherwise each file is
    recorded in the summary and pushed to ServiceX as soon as it arrives.
    """

    def __init__(self, sx: ServiceXAdapter, sum: DIDSummary, hold_till_end: bool):
        """Remember the adaptor, the running summary and the operating mode.

        Args:
            sx: Adaptor used to report files back to ServiceX.
            sum: Summary object updated for every file that is sent on.
            hold_till_end: True to cache files until `send_on` is called.
        """
        self._servicex = sx
        self._summary = sum
        self._hold_till_end = hold_till_end
        self._file_cache: List[Dict[str, Any]] = []

    def add(self, file_info: Dict[str, Any]):
        """Cache the file, or record it and send it to ServiceX immediately."""
        if self._hold_till_end:
            self._file_cache.append(file_info)
            return

        self._summary.add_file(file_info)
        # The very first file that is actually sent marks the transform start.
        if self._summary.file_count == 1:
            self._servicex.post_transform_start()
        self._servicex.put_file_add(file_info)

    def send_on(self, count):
        """Send the first ``count`` cached files, ordered by their ``paths``.

        Does nothing when the accumulator is not (or is no longer) caching.
        """
        if not self._hold_till_end:
            return

        # Leave caching mode first so that `add` forwards instead of re-caching.
        self._hold_till_end = False
        ordered = sorted(self._file_cache, key=lambda entry: entry['paths'])
        for cached in ordered[:count]:
            self.add(cached)
55+
56+
2957
async def run_file_fetch_loop(did: str, servicex: ServiceXAdapter, info: Dict[str, Any],
3058
user_callback: UserDIDHandler):
31-
summary = DIDSummary(did)
3259
start_time = datetime.now()
33-
async for file_info in user_callback(did, info):
3460

35-
# Track the file, inject back into the system
36-
summary.add_file(file_info)
37-
if summary.file_count == 1:
38-
servicex.post_transform_start()
39-
servicex.put_file_add(file_info)
61+
summary = DIDSummary(did)
62+
did_info = parse_did_uri(did)
63+
hold_till_end = did_info.file_count != -1
64+
acc = _accumulator(servicex, summary, hold_till_end)
65+
66+
try:
67+
async for file_info in user_callback(did_info.did, info):
68+
acc.add(file_info)
69+
70+
except Exception:
71+
if did_info.get_mode == 'all':
72+
raise
73+
74+
# If we've been holding onto any files, we need to send them now.
75+
acc.send_on(did_info.file_count)
4076

4177
# Simple error checking and reporting
4278
if summary.file_count == 0:
@@ -54,7 +90,7 @@ async def run_file_fetch_loop(did: str, servicex: ServiceXAdapter, info: Dict[st
5490
}
5591
)
5692

57-
servicex.post_status_update(f'Completed load of file in {elapsed_time} seconds')
93+
servicex.post_status_update(f'Completed load of files in {elapsed_time} seconds')
5894

5995

6096
def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, properties, body,
@@ -109,7 +145,7 @@ def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, propertie
109145
def init_rabbit_mq(user_callback: UserDIDHandler,
110146
rabbitmq_url: str, queue_name: str, retries: int,
111147
retry_interval: float,
112-
file_prefix: str = None):
148+
file_prefix: str = None): # type: ignore
113149
rabbitmq = None
114150
retry_count = 0
115151

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
from typing import Dict, List
2+
import urllib
3+
4+
5+
class ParsedDIDInfo:
    """Plain holder for the pieces extracted from an incoming DID URI."""

    # DID string to hand on to the DID finder code
    did: str

    # How files should be fetched (default 'all')
    get_mode: str

    # How many files to fetch (default -1)
    file_count: int

    def __init__(self, did: str, get_mode: str, file_count: int):
        """Store the three parsed components unchanged."""
        self.did, self.get_mode, self.file_count = did, get_mode, file_count
19+
20+
21+
def parse_did_uri(uri: str) -> ParsedDIDInfo:
    '''Parse the uri that is given to us from ServiceX, pulling out
    the components we care about, and keeping the DID that needs to
    be passed down.

    Everything in front of the first `?` is kept verbatim as the DID, so
    identifiers such as `scope:name` are not mistaken for a URI scheme and
    truncated. A trailing `#fragment` is discarded.

    URI arguments parsed:

    * `files` - Number of files to fetch (default is all, reported as -1)
    * `get` - Mode to get the files (default is 'all'). Only "available" is also supported.

    All other query arguments are appended back onto the returned DID.

    Args:
        uri (str): DID from ServiceX

    Returns:
        ParsedDIDInfo: The URI parsed into parts

    Raises:
        ValueError: If `get` is neither "all" nor "available", or if `files`
                    is not an integer.
    '''
    # Import the submodule explicitly: a bare `import urllib` does not
    # guarantee that `urllib.parse` has been loaded.
    from urllib.parse import parse_qs

    # Split by hand instead of using `urlparse(...).path`: urlparse treats a
    # leading `word:` as a scheme (and `//host` as a netloc) and would drop
    # that part of the DID.
    without_fragment = uri.split('#', 1)[0]
    did_name, _, query = without_fragment.partition('?')

    params = parse_qs(query)
    # NOTE(review): for repeated arguments the last `get` but the first
    # `files` wins - confirm this asymmetry is intended.
    get_string = params['get'][-1] if 'get' in params else 'all'
    file_count = int(params['files'][0]) if 'files' in params else -1

    if get_string not in ['all', 'available']:
        raise ValueError('Bad value for "get" string in DID - must be "all" or "available", not '
                         f'"{get_string}"')

    # The library consumes these two arguments; they must not reach the callback.
    for k in ['get', 'files']:
        params.pop(k, None)

    new_query = "&".join(f'{k}={v}' for k, values in params.items() for v in values)
    if new_query:
        new_query = "?" + new_query

    return ParsedDIDInfo(did_name + new_query, get_string, file_count)

tests/servicex_did_finder_lib/test_communication.py

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,39 @@ async def my_callback(did_name: str, info: Dict[str, Any]):
116116
# Make sure callback was called
117117
assert seen_name == 'hi-there'
118118

119+
# Make sure the file was sent along, along with the completion
120+
SXAdaptor.put_file_add.assert_called_once_with({
121+
'paths': ['fork/it/over'],
122+
'adler32': 'no clue',
123+
'file_size': 22323,
124+
'file_events': 0
125+
})
126+
SXAdaptor.put_fileset_complete.assert_called_once()
127+
128+
129+
def test_one_file_call_with_param(rabbitmq, SXAdaptor):
    """A one-file request whose DID carries a `files` parameter works end to end."""

    received_dids = []

    async def my_callback(did_name: str, info: Dict[str, Any]):
        received_dids.append(did_name)
        yield {
            'paths': ["fork/it/over"],
            'adler32': 'no clue',
            'file_size': 22323,
            'file_events': 0,
        }

    init_rabbit_mq(
        my_callback,
        'http://myrabbit.com',
        'test_queue_name',
        retries=12,
        retry_interval=10,
    )

    rabbitmq.send_did_request('hithere?files=10')

    # The callback must have run, and must have seen the DID without `files`
    assert received_dids and received_dids[-1] == 'hithere'

    # Exactly one file and one completion message were reported
    SXAdaptor.put_file_add.assert_called_once()
    SXAdaptor.put_fileset_complete.assert_called_once()
@@ -142,6 +175,81 @@ async def my_callback(did_name: str, info: Dict[str, Any]) \
142175
SXAdaptor.post_status_update.assert_any_call(ANY, severity='fatal')
143176

144177

178+
def test_failed_file_after_good(rabbitmq, SXAdaptor):
    """A callback that raises after yielding one file is fatal when `get` is not given."""

    async def my_callback(did_name: str, info: Dict[str, Any]) \
            -> AsyncGenerator[Dict[str, Any], None]:
        good_file = {
            'paths': ["fork/it/over"],
            'adler32': 'no clue',
            'file_size': 22323,
            'file_events': 0,
        }
        yield good_file
        raise Exception('that did not work')

    init_rabbit_mq(
        my_callback,
        'http://myrabbit.com',
        'test_queue_name',
        retries=12,
        retry_interval=10,
    )
    rabbitmq.send_did_request('hi-there')

    # The good file was reported, but the fileset was never completed and a
    # fatal status was posted instead.
    SXAdaptor.put_file_add.assert_called_once()
    SXAdaptor.put_fileset_complete.assert_not_called()
    SXAdaptor.post_status_update.assert_any_call(ANY, severity='fatal')
199+
200+
201+
def test_failed_file_after_good_with_avail(rabbitmq, SXAdaptor):
    """With `get=available`, a callback error after one good file still completes."""

    async def my_callback(did_name: str, info: Dict[str, Any]) \
            -> AsyncGenerator[Dict[str, Any], None]:
        good_file = {
            'paths': ["fork/it/over"],
            'adler32': 'no clue',
            'file_size': 22323,
            'file_events': 0,
        }
        yield good_file
        raise Exception('that did not work')

    init_rabbit_mq(
        my_callback,
        'http://myrabbit.com',
        'test_queue_name',
        retries=12,
        retry_interval=10,
    )
    rabbitmq.send_did_request('hi-there?get=available')

    # The one good file is reported and the request finishes normally
    SXAdaptor.put_file_add.assert_called_once()
    SXAdaptor.put_fileset_complete.assert_called_once()
    SXAdaptor.post_status_update.assert_any_call("Completed load of files in 0 seconds")
222+
223+
224+
def test_failed_file_after_good_with_avail_limited_number(rabbitmq, SXAdaptor):
    """With `get=available&files=1`, only one of two good files is sent despite the error."""

    async def my_callback(did_name: str, info: Dict[str, Any]) \
            -> AsyncGenerator[Dict[str, Any], None]:
        first_file = {
            'paths': ["fork/it/over1"],
            'adler32': 'no clue',
            'file_size': 22323,
            'file_events': 0,
        }
        yield first_file
        second_file = {
            'paths': ["fork/it/over2"],
            'adler32': 'no clue',
            'file_size': 22323,
            'file_events': 0,
        }
        yield second_file
        raise Exception('that did not work')

    init_rabbit_mq(
        my_callback,
        'http://myrabbit.com',
        'test_queue_name',
        retries=12,
        retry_interval=10,
    )
    rabbitmq.send_did_request('hi-there?get=available&files=1')

    # Only a single file is reported and the request finishes normally
    SXAdaptor.put_file_add.assert_called_once()
    SXAdaptor.put_fileset_complete.assert_called_once()
    SXAdaptor.post_status_update.assert_any_call("Completed load of files in 0 seconds")
251+
252+
145253
def test_no_files_returned(rabbitmq, SXAdaptor):
146254
'Test a callback that fails before any files are sent'
147255

@@ -258,6 +366,103 @@ async def my_user_callback(did, info):
258366
assert SXAdaptor.post_status_update.called_once()
259367

260368

369+
@pytest.mark.asyncio
async def test_run_file_fetch_one(SXAdaptor, mocker):
    'With `files=1`, only the first file in sorted path order is sent on'
    async def my_user_callback(did, info):
        return_values = [
            {
                'paths': ['/tmp/foo'],
                'adler32': '13e4f',
                'file_size': 1024,
                'file_events': 128
            },
            {
                'paths': ['/tmp/bar'],
                'adler32': 'f33d',
                'file_size': 2046,
                'file_events': 64
            }
        ]
        for v in return_values:
            yield v

    await run_file_fetch_loop("123-456?files=1", SXAdaptor, {}, my_user_callback)
    SXAdaptor.post_transform_start.assert_called_once()

    # '/tmp/bar' sorts ahead of '/tmp/foo', so it is the single file kept
    assert SXAdaptor.put_file_add.call_count == 1
    assert SXAdaptor.put_file_add.call_args_list[0][0][0]['paths'] == ['/tmp/bar']

    # `assert_called_once` must be *called*; the bare attribute checks nothing
    SXAdaptor.put_fileset_complete.assert_called_once()
    assert SXAdaptor.put_fileset_complete.call_args[0][0]['files'] == 1
    # `called_once()` is not a mock assertion (always truthy, and rejected by
    # newer Python versions); use the real assertion method instead
    SXAdaptor.post_status_update.assert_called_once()
398+
399+
400+
@pytest.mark.asyncio
async def test_run_file_fetch_one_reverse(SXAdaptor, mocker):
    'The files should be sorted, so the same file is sent whatever the yield order'
    async def my_user_callback(did, info):
        return_values = [
            {
                'paths': ['/tmp/bar'],
                'adler32': 'f33d',
                'file_size': 2046,
                'file_events': 64
            },
            {
                'paths': ['/tmp/foo'],
                'adler32': '13e4f',
                'file_size': 1024,
                'file_events': 128
            },
        ]
        for v in return_values:
            yield v

    await run_file_fetch_loop("123-456?files=1", SXAdaptor, {}, my_user_callback)
    SXAdaptor.post_transform_start.assert_called_once()

    assert SXAdaptor.put_file_add.call_count == 1
    assert SXAdaptor.put_file_add.call_args_list[0][0][0]['paths'][0] == '/tmp/bar'

    # `assert_called_once` must be *called*; the bare attribute checks nothing
    SXAdaptor.put_fileset_complete.assert_called_once()
    assert SXAdaptor.put_fileset_complete.call_args[0][0]['files'] == 1
    # `called_once()` is not a mock assertion (always truthy, and rejected by
    # newer Python versions); use the real assertion method instead
    SXAdaptor.post_status_update.assert_called_once()
430+
431+
432+
@pytest.mark.asyncio
async def test_run_file_fetch_one_multi(SXAdaptor, mocker):
    'With `files=1`, a file with several replica paths is sorted and sent with all its paths'
    async def my_user_callback(did, info):
        return_values = [
            {
                'paths': ['/tmp/foo', 'others:/tmp/foo'],
                'adler32': '13e4f',
                'file_size': 1024,
                'file_events': 128
            },
            {
                'paths': ['/tmp/bar', 'others:/tmp/bar'],
                'adler32': 'f33d',
                'file_size': 2046,
                'file_events': 64
            }
        ]
        for v in return_values:
            yield v

    await run_file_fetch_loop("123-456?files=1", SXAdaptor, {}, my_user_callback)
    SXAdaptor.post_transform_start.assert_called_once()

    assert SXAdaptor.put_file_add.call_count == 1
    assert SXAdaptor.put_file_add.call_args_list[0][0][0]['paths'] == [
        '/tmp/bar',
        'others:/tmp/bar'
    ]

    # `assert_called_once` must be *called*; the bare attribute checks nothing
    SXAdaptor.put_fileset_complete.assert_called_once()
    assert SXAdaptor.put_fileset_complete.call_args[0][0]['files'] == 1
    # `called_once()` is not a mock assertion (always truthy, and rejected by
    # newer Python versions); use the real assertion method instead
    SXAdaptor.post_status_update.assert_called_once()
464+
465+
261466
@pytest.mark.asyncio
262467
async def test_run_file_fetch_loop_bad_did(SXAdaptor, mocker):
263468
async def my_user_callback(did, info):

0 commit comments

Comments
 (0)