Skip to content

Commit f25b740

Browse files
Ilija VukoticBenGalewsky
authored andcommitted
init chnages
1 parent 13932b7 commit f25b740

File tree

2 files changed

+83
-3
lines changed

2 files changed

+83
-3
lines changed

src/servicex_did_finder_lib/communication.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
class _accumulator:
3131
'Track or cache files depending on the mode we are operating in'
32+
3233
def __init__(self, sx: ServiceXAdapter, sum: DIDSummary, hold_till_end: bool):
3334
self._servicex = sx
3435
self._summary = sum
@@ -53,6 +54,14 @@ def send_on(self, count):
5354
for file_info in files[0:count]:
5455
self.add(file_info)
5556

57+
def send_bulk(self, file_list: List[Dict[str, Any]]):
58+
'does a bulk put of files'
59+
for ifl in file_list:
60+
self._summary.add_file(ifl)
61+
self._servicex.put_file_add(file_list)
62+
if self._summary.file_count == 1:
63+
self._servicex.post_transform_start()
64+
5665

5766
async def run_file_fetch_loop(did: str, servicex: ServiceXAdapter, info: Dict[str, Any],
5867
user_callback: UserDIDHandler):
@@ -65,7 +74,10 @@ async def run_file_fetch_loop(did: str, servicex: ServiceXAdapter, info: Dict[st
6574

6675
try:
6776
async for file_info in user_callback(did_info.did, info):
68-
acc.add(file_info)
77+
if type(file_info) is dict:
78+
acc.add(file_info)
79+
else:
80+
acc.send_bulk(file_info)
6981

7082
except Exception:
7183
if did_info.get_mode == 'all':

tests/servicex_did_finder_lib/test_communication.py

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ async def my_callback(did_name: str, info: Dict[str, Any]):
122122
'adler32': 'no clue',
123123
'file_size': 22323,
124124
'file_events': 0
125-
})
125+
})
126126
SXAdaptor.put_fileset_complete.assert_called_once()
127127

128128

@@ -154,6 +154,39 @@ async def my_callback(did_name: str, info: Dict[str, Any]):
154154
SXAdaptor.put_fileset_complete.assert_called_once()
155155

156156

157+
def test_bulk_file_call_with_param(rabbitmq, SXAdaptor):
158+
'Test a working, simple, two file call with parameter'
159+
160+
seen_name = None
161+
162+
async def my_callback(did_name: str, info: Dict[str, Any]):
163+
nonlocal seen_name
164+
seen_name = did_name
165+
yield [{
166+
'paths': ["fork/it/over"],
167+
'adler32': 'no clue',
168+
'file_size': 22323,
169+
'file_events': 0,
170+
}, {
171+
'paths': ["fork/it/over"],
172+
'adler32': 'no clue',
173+
'file_size': 22323,
174+
'file_events': 0,
175+
}]
176+
177+
init_rabbit_mq(my_callback, 'http://myrabbit.com', 'test_queue_name', retries=12,
178+
retry_interval=10)
179+
180+
rabbitmq.send_did_request('hithere?files=10')
181+
182+
# Make sure callback was called
183+
assert seen_name == 'hithere'
184+
185+
# Make sure the file was sent along, along with the completion
186+
SXAdaptor.put_file_add.assert_called_once()
187+
SXAdaptor.put_fileset_complete.assert_called_once()
188+
189+
157190
def test_with_scope(rabbitmq, SXAdaptor):
158191
seen_name: Optional[str] = None
159192

@@ -389,6 +422,41 @@ async def my_user_callback(did, info):
389422
assert SXAdaptor.post_status_update.called_once()
390423

391424

425+
@pytest.mark.asyncio
426+
async def test_run_file_bulk_fetch_loop(SXAdaptor, mocker):
427+
async def my_user_callback(did, info):
428+
return_values = [
429+
{
430+
'paths': ['/tmp/foo'],
431+
'adler32': '13e4f',
432+
'file_size': 1024,
433+
'file_events': 128
434+
},
435+
{
436+
'paths': ['/tmp/bar'],
437+
'adler32': 'f33d',
438+
'file_size': 2046,
439+
'file_events': 64
440+
}
441+
]
442+
yield return_values
443+
444+
await run_file_fetch_loop("123-456", SXAdaptor, {}, my_user_callback)
445+
SXAdaptor.post_transform_start.assert_called_once()
446+
447+
assert SXAdaptor.put_file_add.call_count == 1
448+
assert SXAdaptor.put_file_add.call_args_list[0][0][0]['paths'][0] == '/tmp/foo'
449+
assert SXAdaptor.put_file_add.call_args_list[1][0][0]['paths'][0] == '/tmp/bar'
450+
451+
SXAdaptor.put_fileset_complete.assert_called_once
452+
assert SXAdaptor.put_fileset_complete.call_args[0][0]['files'] == 2
453+
assert SXAdaptor.put_fileset_complete.call_args[0][0]['files-skipped'] == 0
454+
assert SXAdaptor.put_fileset_complete.call_args[0][0]['total-events'] == 192
455+
assert SXAdaptor.put_fileset_complete.call_args[0][0]['total-bytes'] == 3070
456+
457+
assert SXAdaptor.post_status_update.called_once()
458+
459+
392460
@pytest.mark.asyncio
393461
async def test_run_file_fetch_one(SXAdaptor, mocker):
394462
async def my_user_callback(did, info):
@@ -479,7 +547,7 @@ async def my_user_callback(did, info):
479547
assert SXAdaptor.put_file_add.call_args_list[0][0][0]['paths'] == [
480548
'/tmp/bar',
481549
'others:/tmp/bar'
482-
]
550+
]
483551

484552
SXAdaptor.put_fileset_complete.assert_called_once
485553
assert SXAdaptor.put_fileset_complete.call_args[0][0]['files'] == 1

0 commit comments

Comments
 (0)