Skip to content

Commit e85b589

Browse files
Ilija VukoticBenGalewsky
authored andcommitted
fixes
1 parent c2cad3e commit e85b589

File tree

2 files changed

+34
-3
lines changed

2 files changed

+34
-3
lines changed

src/servicex_did_finder_lib/communication.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,8 @@ def send_bulk(self, file_list: List[Dict[str, Any]]):
5858
'does a bulk put of files'
5959
for ifl in file_list:
6060
self._summary.add_file(ifl)
61-
if self._summary.file_count == 1:
62-
self._servicex.post_transform_start()
63-
self._servicex.put_file_add(file_list)
61+
self._servicex.put_file_add_bulk(file_list)
62+
self._servicex.post_transform_start()
6463

6564

6665
async def run_file_fetch_loop(did: str, servicex: ServiceXAdapter, info: Dict[str, Any],

src/servicex_did_finder_lib/servicex_adaptor.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@
3434
MAX_RETRIES = 3
3535

3636

37+
def chunks(lst, n):
38+
"""Yield successive n-sized chunks from lst."""
39+
for i in range(0, len(lst), n):
40+
yield lst[i:i + n]
41+
42+
3743
class ServiceXAdapter:
3844
def __init__(self, endpoint, file_prefix=None):
3945
self.endpoint = endpoint
@@ -88,6 +94,32 @@ def put_file_add(self, file_info):
8894
self.logger.error(f'After {attempts} tries, failed to send ServiceX App a put_file '
8995
f'message: {str(file_info)} - Ignoring error.')
9096

97+
def put_file_add_bulk(self, file_list):
98+
for chunk in chunks(file_list, 20):
99+
success = False
100+
attempts = 0
101+
mesg = []
102+
for fi in chunk:
103+
mesg.append({
104+
"timestamp": datetime.now().isoformat(),
105+
"paths": [self._prefix_file(fp) for fp in fi['paths']],
106+
'adler32': fi['adler32'],
107+
'file_size': fi['file_size'],
108+
'file_events': fi['file_events']
109+
})
110+
while not success and attempts < MAX_RETRIES:
111+
try:
112+
requests.put(self.endpoint + "/files", json=mesg)
113+
self.logger.info(f"Metric: {json.dumps(mesg)}")
114+
success = True
115+
except requests.exceptions.ConnectionError:
116+
self.logger.exception(f'Connection error to ServiceX App. Will retry '
117+
f'(try {attempts} out of {MAX_RETRIES}')
118+
attempts += 1
119+
if not success:
120+
self.logger.error(f'After {attempts} tries, failed to send ServiceX App a put_file_bulk '
121+
f'message: {mesg} - Ignoring error.')
122+
91123
def post_transform_start(self):
92124
success = False
93125
attempts = 0

0 commit comments

Comments
 (0)