Skip to content

Commit dca0ef9

Browse files
authored
Merge branch 'big-db-change-2' (#28)
1 parent de0e8cd commit dca0ef9

File tree

9 files changed

+56
-162
lines changed

9 files changed

+56
-162
lines changed

.github/workflows/ci.yaml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,20 @@ jobs:
1212
test:
1313
strategy:
1414
matrix:
15-
python-version: [3.7, 3.8, 3.9]
15+
python-version: [3.8, 3.9]
1616
runs-on: ubuntu-latest
1717

1818
steps:
19-
- uses: actions/checkout@master
19+
- uses: actions/checkout@v4.0.0
2020
- name: Set up Python ${{ matrix.python-version }}
21-
uses: actions/setup-python@v2
21+
uses: actions/setup-python@v4.7.0
2222
with:
2323
python-version: ${{ matrix.python-version }}
2424
- name: Install poetry
25-
uses: Gr1N/setup-poetry@v7
25+
uses: Gr1N/setup-poetry@v8
2626
- name: Install dependencies
2727
run: |
28-
poetry install
28+
poetry install --with dev
2929
pip list
3030
- name: Lint with Flake8
3131
run: |
@@ -36,7 +36,7 @@ jobs:
3636
poetry run coverage xml
3737
- name: Report coverage using codecov
3838
if: github.event_name == 'push' && matrix.python-version == 3.8
39-
uses: codecov/codecov-action@v1
39+
uses: codecov/codecov-action@v3.1.4
4040
with:
4141
file: ./coverage.xml # optional
4242
flags: unittests # optional

.github/workflows/pypi.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,20 @@ jobs:
99
runs-on: ubuntu-latest
1010

1111
steps:
12-
- uses: actions/checkout@v2
12+
- uses: actions/checkout@v4.0.0
1313
- name: Set up Python
14-
uses: actions/setup-python@v2
14+
uses: actions/setup-python@v4.7.0
1515
with:
1616
python-version: 3.8
1717
- name: Get the version
1818
id: get_version
1919
run: echo ::set-output name=VERSION::${GITHUB_REF#refs/tags/}
2020
- name: Install poetry
21-
uses: Gr1N/setup-poetry@v7
21+
uses: Gr1N/setup-poetry@v8
2222
- name: Build
2323
run: |
2424
poetry version ${{ steps.get_version.outputs.VERSION }}
2525
- name: Build and publish to pypi
2626
uses: JRubics/poetry-publish@v1.6
2727
with:
28-
pypi_token: ${{ secrets.pypi_password_sx_did_lib }}
28+
pypi_token: ${{ secrets.pypi_password_sx_did_lib }}

README.md

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ Create an async callback method that `yield`s file info dictionaries. For exampl
2828
The arguments to the method are straightforward:
2929

3030
* `did_name`: the name of the DID that you should look up. It has the schema stripped off (e.g. if the user sent ServiceX `rucio://dataset_name_in_rucio`, then `did_name` will be `dataset_name_in_rucio`)
31-
* `info` contains a dict of various info about the request that asked for this DID:
32-
* `request-id` The request id that has this DID associated. For logging.
31+
* `info` contains a dict of various info about the request that asked for this DID.
3332

3433
Yield the results as you find them - ServiceX will actually start processing the files before your DID lookup is finished if you do this. The fields you need to pass back to the library are as follows:
3534

@@ -114,7 +113,7 @@ In the end, all DID finders for ServiceX will run under Kubernetes. ServiceX com
114113
__log = logger.getLogger(__name__)
115114
async def my_callback(did_name: str, info: Dict[str, Any]):
116115
__log.info(f'Looking up dataset {did_name}.',
117-
extra={'requestId': info['request-id']})
116+
extra={'something': info['something']})
118117

119118
for i in range(0, 10):
120119
yield {
@@ -125,9 +124,7 @@ In the end, all DID finders for ServiceX will run under Kubernetes. ServiceX com
125124
}
126125
```
127126

128-
Note the parameter `request-id`: this marks the log messages with the request id that triggered this DID request. This will enable the system to track all log messages across all containers connected with this particular request id - making debugging a lot easier.
129-
130-
The `start_did_finder` will configure the python root logger properly to dump messages with a request ID in them.
127+
The `start_did_finder` will configure the python root logger properly.
131128

132129
## URI Format
133130

pyproject.toml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@ description = "ServiceX DID Library Routines"
55
authors = ["Gordon Watts <gwatts@uw.edu>"]
66

77
[tool.poetry.dependencies]
8-
python = "^3.6"
8+
python = "^3.8"
99
pika = "1.1.0"
1010
make-it-sync = "^1.0.0"
1111
requests = "^2.25.0"
1212

13-
[tool.poetry.dev-dependencies]
14-
pytest = "^5.2"
13+
[tool.poetry.group.dev]
14+
optional = true
15+
16+
[tool.poetry.group.dev.dependencies]
17+
pytest = "^7.4"
1518
flake8 = "^3.9.1"
1619
pytest-mock = "^3.5.1"
1720
coverage = "^5.5"

src/servicex_did_finder_lib/communication.py

Lines changed: 14 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
__logging = logging.getLogger(__name__)
2727
__logging.addHandler(logging.NullHandler())
2828

29+
# TODO: get rid of different modes. It should always be batch.
30+
2931

3032
class _accumulator:
3133
"Track or cache files depending on the mode we are operating in"
@@ -42,8 +44,6 @@ def add(self, file_info: Dict[str, Any]):
4244
self._file_cache.append(file_info)
4345
else:
4446
self._summary.add_file(file_info)
45-
if self._summary.file_count == 1:
46-
self._servicex.post_transform_start()
4747
self._servicex.put_file_add(file_info)
4848

4949
def send_on(self, count):
@@ -59,8 +59,6 @@ def send_bulk(self, file_list: List[Dict[str, Any]]):
5959
for f in file_list:
6060
self.add(f)
6161
else:
62-
if self._summary.file_count == 0:
63-
self._servicex.post_transform_start()
6462
for ifl in file_list:
6563
self._summary.add_file(ifl)
6664
self._servicex.put_file_add_bulk(file_list)
@@ -93,12 +91,6 @@ async def run_file_fetch_loop(
9391
# If we've been holding onto any files, we need to send them now.
9492
acc.send_on(did_info.file_count)
9593

96-
# Simple error checking and reporting
97-
if summary.file_count == 0:
98-
servicex.post_status_update(
99-
f"DID Finder found zero files for dataset {did}", severity="fatal"
100-
)
101-
10294
elapsed_time = int((datetime.now() - start_time).total_seconds())
10395
servicex.put_fileset_complete(
10496
{
@@ -110,11 +102,9 @@ async def run_file_fetch_loop(
110102
}
111103
)
112104

113-
servicex.post_status_update(f"Completed load of files in {elapsed_time} seconds")
114-
115105

116106
def rabbit_mq_callback(
117-
user_callback: UserDIDHandler, channel, method, properties, body, file_prefix=None
107+
user_callback: UserDIDHandler, channel, method, properties, body
118108
):
119109
"""rabbit_mq_callback Respond to RabbitMQ Message
120110
@@ -128,22 +118,23 @@ def rabbit_mq_callback(
128118
method ([type]): Delivery method
129119
properties ([type]): Properties of the message
130120
body ([type]): The body (json for us) of the message
131-
file_prefix([str]): Prefix to put in front of file paths to enable use of Cache service
132121
"""
133-
request_id = None # set this in case we get an exception while loading request
122+
dataset_id = None # set this in case we get an exception while loading request
134123
try:
135124
# Unpack the message. Really bad if we fail up here!
136125
did_request = json.loads(body)
137126
did = did_request["did"]
138-
request_id = did_request["request_id"]
127+
dataset_id = did_request["dataset_id"]
128+
endpoint = did_request["endpoint"]
139129
__logging.info(
140-
f"Received DID request {did_request}", extra={"requestId": request_id}
130+
f"Received DID request {did_request}",
131+
extra={"dataset_id": dataset_id}
141132
)
142-
servicex = ServiceXAdapter(did_request["service-endpoint"], file_prefix)
143-
servicex.post_status_update("DID Request received")
133+
servicex = ServiceXAdapter(dataset_id=dataset_id,
134+
endpoint=endpoint)
144135

145136
info = {
146-
"request-id": request_id,
137+
"dataset-id": dataset_id,
147138
}
148139

149140
# Process the request and resolve the DID
@@ -152,16 +143,12 @@ def rabbit_mq_callback(
152143

153144
except Exception as e:
154145
_, exec_value, _ = sys.exc_info()
155-
__logging.exception("DID Request Failed", extra={"requestId": request_id})
156-
servicex.post_status_update(
157-
f"DID Request Failed for id {request_id}: " f"{str(e)} - {exec_value}",
158-
severity="fatal",
159-
)
146+
__logging.exception(f"DID Request Failed {str(e)}", extra={"dataset_id": dataset_id})
160147
raise
161148

162149
except Exception as e:
163150
__logging.exception(
164-
f"DID request failed {str(e)}", extra={"requestId": request_id}
151+
f"DID request failed {str(e)}", extra={"dataset_id": dataset_id}
165152
)
166153

167154
finally:
@@ -174,7 +161,6 @@ def init_rabbit_mq(
174161
queue_name: str,
175162
retries: int,
176163
retry_interval: float,
177-
file_prefix: str = None,
178164
): # type: ignore
179165
rabbitmq = None
180166
retry_count = 0
@@ -191,7 +177,7 @@ def init_rabbit_mq(
191177
queue=queue_name,
192178
auto_ack=False,
193179
on_message_callback=lambda c, m, p, b: rabbit_mq_callback(
194-
user_callback, c, m, p, b, file_prefix
180+
user_callback, c, m, p, b
195181
),
196182
)
197183
_channel.start_consuming()

src/servicex_did_finder_lib/did_logging.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44

55
class DIDFormatter(logging.Formatter):
66
"""
7-
Need a customer formatter to allow for logging with request ids that vary.
8-
Normally log messages are "level instance component request_id msg" and
9-
request_id gets set by initialize_logging but we need a handler that'll let
10-
us pass in the request id and have that embedded in the log message
7+
Need a custom formatter to allow for logging with dataset ids that vary.
8+
Normally log messages are "level instance component dataset_id msg" and
9+
dataset_id gets set by initialize_logging but we need a handler that'll let
10+
us pass in the dataset id and have that embedded in the log message
1111
"""
1212

1313
def format(self, record: logging.LogRecord) -> str:
@@ -18,10 +18,10 @@ def format(self, record: logging.LogRecord) -> str:
1818
:return: formatted log message
1919
"""
2020

21-
if hasattr(record, "requestId"):
21+
if hasattr(record, "datasetId"):
2222
return super().format(record)
2323
else:
24-
setattr(record, "requestId", None)
24+
setattr(record, "datasetId", None)
2525
return super().format(record)
2626

2727

@@ -36,7 +36,7 @@ def initialize_root_logger(did_scheme: str):
3636
instance = os.environ.get('INSTANCE_NAME', 'Unknown')
3737
formatter = DIDFormatter('%(levelname)s ' +
3838
f"{instance} {did_scheme}_did_finder " +
39-
'%(requestId)s %(message)s')
39+
'%(datasetId)s %(message)s')
4040
handler = logging.StreamHandler()
4141
handler.setFormatter(formatter)
4242
handler.setLevel(logging.INFO)

src/servicex_did_finder_lib/servicex_adaptor.py

Lines changed: 7 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -35,40 +35,17 @@
3535

3636

3737
class ServiceXAdapter:
38-
def __init__(self, endpoint, file_prefix=None):
38+
def __init__(self, endpoint, dataset_id):
3939
self.endpoint = endpoint
40-
self.file_prefix = file_prefix
40+
self.dataset_id = dataset_id
4141

4242
self.logger = logging.getLogger(__name__)
4343
self.logger.addHandler(logging.NullHandler())
4444

45-
def post_status_update(self, status_msg, severity="info"):
46-
success = False
47-
attempts = 0
48-
while not success and attempts < MAX_RETRIES:
49-
try:
50-
requests.post(self.endpoint + "/status", data={
51-
"timestamp": datetime.now().isoformat(),
52-
"source": "DID Finder",
53-
"severity": severity,
54-
"info": status_msg
55-
})
56-
success = True
57-
except requests.exceptions.ConnectionError:
58-
self.logger.exception(f'Connection error to ServiceX App. Will retry '
59-
f'(try {attempts} out of {MAX_RETRIES}')
60-
attempts += 1
61-
if not success:
62-
self.logger.error(f'After {attempts} tries, failed to send ServiceX App a status '
63-
f'message: {str(status_msg)} - Ignoring error.')
64-
65-
def _prefix_file(self, file_path):
66-
return file_path if not self.file_prefix else self.file_prefix+file_path
67-
6845
def _create_json(self, file_info):
6946
return {
7047
"timestamp": datetime.now().isoformat(),
71-
"paths": [self._prefix_file(fp) for fp in file_info['paths']],
48+
"paths": file_info['paths'],
7249
'adler32': file_info['adler32'],
7350
'file_size': file_info['file_size'],
7451
'file_events': file_info['file_events']
@@ -80,8 +57,8 @@ def put_file_add(self, file_info):
8057
while not success and attempts < MAX_RETRIES:
8158
try:
8259
mesg = self._create_json(file_info)
83-
requests.put(self.endpoint + "/files", json=mesg)
84-
self.logger.info(f"Metric: {json.dumps(mesg)}")
60+
requests.put(f"{self.endpoint}{self.dataset_id}/files", json=mesg)
61+
self.logger.info("adding file:", extra=file_info)
8562
success = True
8663
except requests.exceptions.ConnectionError:
8764
self.logger.exception(f'Connection error to ServiceX App. Will retry '
@@ -103,7 +80,7 @@ def put_file_add_bulk(self, file_list, chunk_length=30):
10380
mesg.append(self._create_json(fi))
10481
while not success and attempts < MAX_RETRIES:
10582
try:
106-
requests.put(self.endpoint + "/files", json=mesg)
83+
requests.put(f"{self.endpoint}{self.dataset_id}/files", json=mesg)
10784
self.logger.info(f"Metric: {json.dumps(mesg)}")
10885
success = True
10986
except requests.exceptions.ConnectionError:
@@ -114,27 +91,12 @@ def put_file_add_bulk(self, file_list, chunk_length=30):
11491
self.logger.error(f'After {attempts} tries, failed to send ServiceX App '
11592
f'a put_file_bulk message: {mesg} - Ignoring error.')
11693

117-
def post_transform_start(self):
118-
success = False
119-
attempts = 0
120-
while not success and attempts < MAX_RETRIES:
121-
try:
122-
requests.post(self.endpoint + "/start")
123-
success = True
124-
except requests.exceptions.ConnectionError:
125-
self.logger.exception(f'Connection error to ServiceX App. Will retry '
126-
f'(try {attempts} out of {MAX_RETRIES}')
127-
attempts += 1
128-
if not success:
129-
self.logger.error(f'After {attempts} tries, failed to send ServiceX App a '
130-
f'transform start message - Ignoring error.')
131-
13294
def put_fileset_complete(self, summary):
13395
success = False
13496
attempts = 0
13597
while not success and attempts < MAX_RETRIES:
13698
try:
137-
requests.put(self.endpoint + "/complete", json=summary)
99+
requests.put(f"{self.endpoint}{self.dataset_id}/complete", json=summary)
138100
success = True
139101
except requests.exceptions.ConnectionError:
140102
self.logger.exception(f'Connection error to ServiceX App. Will retry '

0 commit comments

Comments
 (0)