Skip to content

Commit 2eb3ca2

Browse files
authored
Properly implement delayed sending of batch file updates (#24)
* Getting the test in place and black reformatting
* Black reformatting
* Bulk sending now by default, and tests work
* Update poetry installer
* Try limiting testing
* Fix extra imports

Fixes #23 and #22
1 parent e4bf4bf commit 2eb3ca2

File tree

3 files changed

+501
-320
lines changed

3 files changed

+501
-320
lines changed

.github/workflows/ci.yaml

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,31 +12,31 @@ jobs:
1212
test:
1313
strategy:
1414
matrix:
15-
python-version: [3.6, 3.7, 3.8, 3.9]
15+
python-version: [3.7, 3.8, 3.9]
1616
runs-on: ubuntu-latest
1717

1818
steps:
19-
- uses: actions/checkout@master
20-
- name: Set up Python ${{ matrix.python-version }}
21-
uses: actions/setup-python@v2
22-
with:
23-
python-version: ${{ matrix.python-version }}
24-
- name: Install poetry
25-
uses: Gr1N/setup-poetry@v4
26-
- name: Install dependencies
27-
run: |
28-
poetry install
29-
pip list
30-
- name: Lint with Flake8
31-
run: |
32-
poetry run flake8
33-
- name: Test with pytest
34-
run: |
35-
poetry run coverage run -m --source=src pytest tests
36-
poetry run coverage xml
37-
- name: Report coverage using codecov
38-
if: github.event_name == 'push' && matrix.python-version == 3.8
39-
uses: codecov/codecov-action@v1
40-
with:
41-
file: ./coverage.xml # optional
42-
flags: unittests # optional
19+
- uses: actions/checkout@master
20+
- name: Set up Python ${{ matrix.python-version }}
21+
uses: actions/setup-python@v2
22+
with:
23+
python-version: ${{ matrix.python-version }}
24+
- name: Install poetry
25+
uses: Gr1N/setup-poetry@v7
26+
- name: Install dependencies
27+
run: |
28+
poetry install
29+
pip list
30+
- name: Lint with Flake8
31+
run: |
32+
poetry run flake8
33+
- name: Test with pytest
34+
run: |
35+
poetry run coverage run -m --source=src pytest tests
36+
poetry run coverage xml
37+
- name: Report coverage using codecov
38+
if: github.event_name == 'push' && matrix.python-version == 3.8
39+
uses: codecov/codecov-action@v1
40+
with:
41+
file: ./coverage.xml # optional
42+
flags: unittests # optional

src/servicex_did_finder_lib/communication.py

Lines changed: 99 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,15 @@
2020
# Given name, build the RabbitMQ queue name by appending this.
2121
# This is baked into how ServiceX works - do not change unless it
2222
# is also changed in the ServiceX_App
23-
QUEUE_NAME_POSTFIX = '_did_requests'
23+
QUEUE_NAME_POSTFIX = "_did_requests"
2424

2525
# Easy to use local logger
2626
__logging = logging.getLogger(__name__)
2727
__logging.addHandler(logging.NullHandler())
2828

2929

3030
class _accumulator:
31-
'Track or cache files depending on the mode we are operating in'
31+
"Track or cache files depending on the mode we are operating in"
3232

3333
def __init__(self, sx: ServiceXAdapter, sum: DIDSummary, hold_till_end: bool):
3434
self._servicex = sx
@@ -37,7 +37,7 @@ def __init__(self, sx: ServiceXAdapter, sum: DIDSummary, hold_till_end: bool):
3737
self._file_cache: List[Dict[str, Any]] = []
3838

3939
def add(self, file_info: Dict[str, Any]):
40-
'Track and inject the file back into the system'
40+
"Track and inject the file back into the system"
4141
if self._hold_till_end:
4242
self._file_cache.append(file_info)
4343
else:
@@ -47,23 +47,31 @@ def add(self, file_info: Dict[str, Any]):
4747
self._servicex.put_file_add(file_info)
4848

4949
def send_on(self, count):
50-
'Send the accumulated files'
50+
"Send the accumulated files"
5151
if self._hold_till_end:
5252
self._hold_till_end = False
53-
files = sorted(self._file_cache, key=lambda x: x['paths'])
54-
for file_info in files[0:count]:
55-
self.add(file_info)
53+
files = sorted(self._file_cache, key=lambda x: x["paths"])
54+
self.send_bulk(files[:count])
5655

5756
def send_bulk(self, file_list: List[Dict[str, Any]]):
58-
'does a bulk put of files'
59-
for ifl in file_list:
60-
self._summary.add_file(ifl)
61-
self._servicex.put_file_add_bulk(file_list)
62-
self._servicex.post_transform_start()
57+
"does a bulk put of files"
58+
if self._hold_till_end:
59+
for f in file_list:
60+
self.add(f)
61+
else:
62+
if self._summary.file_count == 0:
63+
self._servicex.post_transform_start()
64+
for ifl in file_list:
65+
self._summary.add_file(ifl)
66+
self._servicex.put_file_add_bulk(file_list)
6367

6468

65-
async def run_file_fetch_loop(did: str, servicex: ServiceXAdapter, info: Dict[str, Any],
66-
user_callback: UserDIDHandler):
69+
async def run_file_fetch_loop(
70+
did: str,
71+
servicex: ServiceXAdapter,
72+
info: Dict[str, Any],
73+
user_callback: UserDIDHandler,
74+
):
6775
start_time = datetime.now()
6876

6977
summary = DIDSummary(did)
@@ -79,18 +87,19 @@ async def run_file_fetch_loop(did: str, servicex: ServiceXAdapter, info: Dict[st
7987
acc.send_bulk(file_info)
8088

8189
except Exception:
82-
if did_info.get_mode == 'all':
90+
if did_info.get_mode == "all":
8391
raise
8492

8593
# If we've been holding onto any files, we need to send them now.
8694
acc.send_on(did_info.file_count)
8795

8896
# Simple error checking and reporting
8997
if summary.file_count == 0:
90-
servicex.post_status_update(f'DID Finder found zero files for dataset {did}',
91-
severity='fatal')
98+
servicex.post_status_update(
99+
f"DID Finder found zero files for dataset {did}", severity="fatal"
100+
)
92101

93-
elapsed_time = int((datetime.now()-start_time).total_seconds())
102+
elapsed_time = int((datetime.now() - start_time).total_seconds())
94103
servicex.put_fileset_complete(
95104
{
96105
"files": summary.file_count,
@@ -101,12 +110,13 @@ async def run_file_fetch_loop(did: str, servicex: ServiceXAdapter, info: Dict[st
101110
}
102111
)
103112

104-
servicex.post_status_update(f'Completed load of files in {elapsed_time} seconds')
113+
servicex.post_status_update(f"Completed load of files in {elapsed_time} seconds")
105114

106115

107-
def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, properties, body,
108-
file_prefix=None):
109-
'''rabbit_mq_callback Respond to RabbitMQ Message
116+
def rabbit_mq_callback(
117+
user_callback: UserDIDHandler, channel, method, properties, body, file_prefix=None
118+
):
119+
"""rabbit_mq_callback Respond to RabbitMQ Message
110120
111121
When a request to resolve a DID comes into the DID finder, we
112122
respond with this callback. This callback will remain active
@@ -119,19 +129,21 @@ def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, propertie
119129
properties ([type]): Properties of the message
120130
body ([type]): The body (json for us) of the message
121131
file_prefix([str]): Prefix to put in front of file paths to enable use of Cache service
122-
'''
132+
"""
123133
request_id = None # set this in case we get an exception while loading request
124134
try:
125135
# Unpack the message. Really bad if we fail up here!
126136
did_request = json.loads(body)
127-
did = did_request['did']
128-
request_id = did_request['request_id']
129-
__logging.info(f'Received DID request {did_request}', extra={'requestId': request_id})
130-
servicex = ServiceXAdapter(did_request['service-endpoint'], file_prefix)
137+
did = did_request["did"]
138+
request_id = did_request["request_id"]
139+
__logging.info(
140+
f"Received DID request {did_request}", extra={"requestId": request_id}
141+
)
142+
servicex = ServiceXAdapter(did_request["service-endpoint"], file_prefix)
131143
servicex.post_status_update("DID Request received")
132144

133145
info = {
134-
'request-id': request_id,
146+
"request-id": request_id,
135147
}
136148

137149
# Process the request and resolve the DID
@@ -140,23 +152,30 @@ def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, propertie
140152

141153
except Exception as e:
142154
_, exec_value, _ = sys.exc_info()
143-
__logging.exception('DID Request Failed', extra={'requestId': request_id})
144-
servicex.post_status_update(f'DID Request Failed for id {request_id}: '
145-
f'{str(e)} - {exec_value}',
146-
severity='fatal')
155+
__logging.exception("DID Request Failed", extra={"requestId": request_id})
156+
servicex.post_status_update(
157+
f"DID Request Failed for id {request_id}: " f"{str(e)} - {exec_value}",
158+
severity="fatal",
159+
)
147160
raise
148161

149162
except Exception as e:
150-
__logging.exception(f'DID request failed {str(e)}', extra={'requestId': request_id})
163+
__logging.exception(
164+
f"DID request failed {str(e)}", extra={"requestId": request_id}
165+
)
151166

152167
finally:
153168
channel.basic_ack(delivery_tag=method.delivery_tag)
154169

155170

156-
def init_rabbit_mq(user_callback: UserDIDHandler,
157-
rabbitmq_url: str, queue_name: str, retries: int,
158-
retry_interval: float,
159-
file_prefix: str = None): # type: ignore
171+
def init_rabbit_mq(
172+
user_callback: UserDIDHandler,
173+
rabbitmq_url: str,
174+
queue_name: str,
175+
retries: int,
176+
retry_interval: float,
177+
file_prefix: str = None,
178+
): # type: ignore
160179
rabbitmq = None
161180
retry_count = 0
162181

@@ -168,28 +187,35 @@ def init_rabbit_mq(user_callback: UserDIDHandler,
168187

169188
__logging.info("Connected to RabbitMQ. Ready to start consuming requests")
170189

171-
_channel.basic_consume(queue=queue_name,
172-
auto_ack=False,
173-
on_message_callback=lambda c, m, p, b:
174-
rabbit_mq_callback(user_callback, c, m, p, b, file_prefix))
190+
_channel.basic_consume(
191+
queue=queue_name,
192+
auto_ack=False,
193+
on_message_callback=lambda c, m, p, b: rabbit_mq_callback(
194+
user_callback, c, m, p, b, file_prefix
195+
),
196+
)
175197
_channel.start_consuming()
176198

177199
except pika.exceptions.AMQPConnectionError: # type: ignore
178200
rabbitmq = None
179201
retry_count += 1
180202
if retry_count <= retries:
181-
__logging.exception(f'Failed to connect to RabbitMQ at {rabbitmq_url} '
182-
f'(try #{retry_count}). Waiting {retry_interval} seconds '
183-
'before trying again')
203+
__logging.exception(
204+
f"Failed to connect to RabbitMQ at {rabbitmq_url} "
205+
f"(try #{retry_count}). Waiting {retry_interval} seconds "
206+
"before trying again"
207+
)
184208
time.sleep(retry_interval)
185209
else:
186-
__logging.exception(f'Failed to connect to RabbitMQ. Giving Up after {retry_count}'
187-
' tries')
210+
__logging.exception(
211+
f"Failed to connect to RabbitMQ. Giving Up after {retry_count}"
212+
" tries"
213+
)
188214
raise
189215

190216

191217
def add_did_finder_cnd_arguments(parser: argparse.ArgumentParser):
192-
'''add_did_finder_cnd_arguments Add required arguments to a parser
218+
"""add_did_finder_cnd_arguments Add required arguments to a parser
193219
194220
If you need to parse command line arguments for some special configuration, create your
195221
own argument parser, and call this function to make sure the arguments needed
@@ -200,17 +226,26 @@ def add_did_finder_cnd_arguments(parser: argparse.ArgumentParser):
200226
Args:
201227
parser (argparse.ArgumentParser): The argument parser. Arguments needed for the
202228
did finder/servicex communication will be added.
203-
'''
204-
parser.add_argument('--rabbit-uri', dest="rabbit_uri", action='store', required=True)
205-
parser.add_argument('--prefix', dest="prefix", action='store', required=False,
206-
default="",
207-
help='Prefix to add to use a caching proxy for URIs')
229+
"""
230+
parser.add_argument(
231+
"--rabbit-uri", dest="rabbit_uri", action="store", required=True
232+
)
233+
parser.add_argument(
234+
"--prefix",
235+
dest="prefix",
236+
action="store",
237+
required=False,
238+
default="",
239+
help="Prefix to add to use a caching proxy for URIs",
240+
)
208241

209242

210-
def start_did_finder(did_finder_name: str,
211-
callback: UserDIDHandler,
212-
parsed_args: Optional[argparse.Namespace] = None):
213-
'''start_did_finder Start the DID finder
243+
def start_did_finder(
244+
did_finder_name: str,
245+
callback: UserDIDHandler,
246+
parsed_args: Optional[argparse.Namespace] = None,
247+
):
248+
"""start_did_finder Start the DID finder
214249
215250
Top level method that starts the DID finder, hooking it up to rabbitmq queues, etc.,
216251
and sets up the callback to be called each time ServiceX wants to render a DID into
@@ -231,7 +266,7 @@ def start_did_finder(did_finder_name: str,
231266
Defaults to None (automatically
232267
parses)
233268
command line arguments, create
234-
'''
269+
"""
235270
# Setup command line parsing
236271
if parsed_args is None:
237272
parser = argparse.ArgumentParser()
@@ -242,8 +277,10 @@ def start_did_finder(did_finder_name: str,
242277
initialize_root_logger(did_finder_name)
243278

244279
# Start up rabbit mq and also callbacks
245-
init_rabbit_mq(callback,
246-
parsed_args.rabbit_uri,
247-
f'{did_finder_name}{QUEUE_NAME_POSTFIX}',
248-
retries=12,
249-
retry_interval=10)
280+
init_rabbit_mq(
281+
callback,
282+
parsed_args.rabbit_uri,
283+
f"{did_finder_name}{QUEUE_NAME_POSTFIX}",
284+
retries=12,
285+
retry_interval=10,
286+
)

0 commit comments

Comments
 (0)