Skip to content

Commit 1dcf503

Browse files
MattShirley authored and ponyisi committed
remove s3 bucket polling (#1049)
* add backend route plus model, DB migration changes, tests, add capability
1 parent 99a3cd5 commit 1dcf503

File tree

12 files changed

+286
-27
lines changed

12 files changed

+286
-27
lines changed
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
"""
2+
Revision ID: v1_7_0
3+
Revises: v1_5_7a
4+
Create Date: 2025-05-13
5+
6+
"""
7+
from alembic import op
8+
import sqlalchemy as sa
9+
10+
revision = 'v1_7_0'
11+
down_revision = 'v1_5_7a'
12+
branch_labels = None
13+
depends_on = None
14+
15+
16+
def upgrade():
    """Add ``created_at`` and ``s3_object_name`` to ``transform_result`` plus lookup indexes.

    ``created_at`` is introduced in three steps so that tables which already
    contain rows can be migrated: the column is added as nullable, every
    existing row is back-filled with the current timestamp, and only then is
    the NOT NULL constraint applied.
    """
    # Step 1: nullable column, so pre-existing rows do not violate a constraint.
    op.add_column(
        'transform_result',
        sa.Column('created_at', sa.DateTime(), nullable=True),
    )
    # Step 2: back-fill existing rows.
    op.execute('UPDATE transform_result SET created_at = CURRENT_TIMESTAMP')
    # Step 3: tighten to NOT NULL. ``existing_type`` is passed explicitly
    # because Alembic requires it to render a nullable change on some
    # backends (MySQL, SQL Server); it is harmless elsewhere.
    op.alter_column(
        'transform_result',
        'created_at',
        existing_type=sa.DateTime(),
        nullable=False,
    )
    op.create_index(
        'ix_transform_result_created_at',
        'transform_result',
        ['created_at'],
    )

    op.add_column(
        'transform_result',
        sa.Column('s3_object_name', sa.String(length=512), nullable=True),
    )

    op.create_index(
        'ix_transform_result_request_id',
        'transform_result',
        ['request_id'],
    )
25+
26+
27+
def downgrade():
    """Remove the columns and indexes that ``upgrade`` adds to ``transform_result``."""
    table = 'transform_result'

    # The created_at index is dropped before the column it covers.
    op.drop_index('ix_transform_result_created_at', table_name=table)
    op.drop_column(table, 'created_at')

    op.drop_column(table, 's3_object_name')

    op.drop_index('ix_transform_result_request_id', table_name=table)
34+

servicex_app/servicex_app/models.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,8 @@ class TransformationResult(db.Model):
356356
total_events = db.Column(db.BigInteger, nullable=True)
357357
total_bytes = db.Column(db.BigInteger, nullable=True)
358358
avg_rate = db.Column(db.Float, nullable=True)
359+
created_at = db.Column(DateTime, default=func.now())
360+
s3_object_name = db.Column(db.String(512), unique=False, nullable=True)
359361

360362
__table_args__ = (
361363
db.UniqueConstraint('file_id', 'request_id', name='uix_file_request'),
@@ -372,11 +374,13 @@ def to_json(cls, x):
372374
'request-id': x.request_id,
373375
'file-id': x.id,
374376
'file-path': x.file_path,
377+
's3-object-name': x.s3_object_name,
375378
'transform_status': x.transform_status,
376379
'transform_time': x.transform_time,
377380
'total-events': x.total_events,
378381
'total-bytes': x.total_bytes,
379-
'avg-rate': x.avg_rate
382+
'avg-rate': x.avg_rate,
383+
'created_at': x.created_at.isoformat() if x.created_at else None,
380384
}
381385

382386
def save_to_db(self):
@@ -470,7 +474,7 @@ def to_json(self):
470474
'adler32': self.adler32,
471475
'file_size': self.file_size,
472476
'file_events': self.file_events,
473-
'paths': self.paths
477+
'paths': self.paths,
474478
}
475479

476480
def save_to_db(self):

servicex_app/servicex_app/resources/info.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,7 @@ def get(self):
3434
return {
3535
"app-version": self._get_app_version(),
3636
"code-gen-image": current_app.config['CODE_GEN_IMAGES'],
37-
"capabilities": []
37+
"capabilities": [
38+
"poll_local_transformation_results"
39+
]
3840
}

servicex_app/servicex_app/resources/internal/transformer_file_complete.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,8 @@ def save_transform_result(request_id: str, info: dict[str, str], session: Sessio
177177
transform_time=info['total-time'],
178178
total_bytes=info['total-bytes'],
179179
total_events=info['total-events'],
180-
avg_rate=info['avg-rate']
180+
avg_rate=info['avg-rate'],
181+
s3_object_name=info['s3-object-name'],
181182
)
182183
session.add(rec)
183184

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import logging
2+
import datetime
3+
4+
from flask_restful import reqparse
5+
6+
from servicex_app.decorators import auth_required
7+
from servicex_app.models import TransformationResult
8+
from servicex_app.resources.servicex_resource import ServiceXResource
9+
10+
logger = logging.getLogger(__name__)
11+
12+
13+
class TransformationResults(ServiceXResource):
    """REST resource that lists the stored results of one transformation request."""

    @auth_required
    def get(self, request_id):
        """Return the ``TransformationResult`` rows recorded for ``request_id``.

        The optional ``later_than`` query-string argument, parsed with
        ``datetime.datetime.fromisoformat``, restricts the output to rows
        whose ``created_at`` is strictly greater than the given value.

        :param request_id: Identifier of the transformation request.
        :return: ``{"results": [...]}`` on success, or a ``(body, 400)`` tuple
            when ``request_id`` is empty or ``later_than`` cannot be parsed.
        """
        if not request_id:
            return {"message": "Missing required transformation request_id"}, 400

        query_parser = reqparse.RequestParser()
        query_parser.add_argument('later_than', type=str, required=False, location='args')
        parsed = query_parser.parse_args()

        matching = TransformationResult.query.filter_by(request_id=request_id)

        cutoff_text = parsed.get('later_than')
        if cutoff_text:
            try:
                cutoff = datetime.datetime.fromisoformat(cutoff_text)
            except (AttributeError, ValueError):
                return {
                    "message": f"later_than value {cutoff_text} is not an ISO 8601 compliant datetime"
                }, 400
            matching = matching.filter(TransformationResult.created_at > cutoff)

        # to_json is invoked through each record (not through the class name)
        # exactly as before, with the record itself as the argument.
        results = []
        for record in matching:
            results.append(record.to_json(record))

        return {"results": results}

servicex_app/servicex_app/routes.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ def add_routes(api, transformer_manager, rabbit_mq_adaptor,
5151
from servicex_app.resources.transformation.get_all import AllTransformationRequests
5252
from servicex_app.resources.transformation.get_one import TransformationRequest
5353
from servicex_app.resources.transformation.deployment import DeploymentStatus
54+
from servicex_app.resources.transformation.results import TransformationResults
5455

5556
from servicex_app.resources.users.all_users import AllUsers
5657
from servicex_app.resources.users.token_refresh import TokenRefresh
@@ -139,6 +140,7 @@ def add_routes(api, transformer_manager, rabbit_mq_adaptor,
139140
api.add_resource(AllTransformationRequests, prefix)
140141
prefix += "/<string:request_id>"
141142
api.add_resource(TransformationRequest, prefix)
143+
api.add_resource(TransformationResults, prefix + '/results')
142144

143145
DeleteTransform.make_api(object_store)
144146
api.add_resource(DeleteTransform, prefix)

servicex_app/servicex_app_test/resources/internal/test_transform_file_complete.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ def file_complete_response(self):
7878
'total-time': 100,
7979
'total-events': 10000,
8080
'total-bytes': 325683,
81-
'avg-rate': 30.2
81+
'avg-rate': 30.2,
82+
's3-object-name': 'file://s3-object-name',
8283
}
8384

8485
def test_put_transform_file_complete_files_remaining(self,

servicex_app/servicex_app_test/resources/test_servicex_info.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,7 @@ def test_get_info(self, client, mock_app_version):
4141
'python': 'sslhep/servicex_code_gen_python:develop',
4242
'uproot': 'sslhep/servicex_code_gen_func_adl_uproot:develop'
4343
},
44-
'capabilities': []
44+
'capabilities': [
45+
'poll_local_transformation_results'
46+
]
4547
} # noqa: E501
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
from pytest import fixture
2+
from datetime import datetime, timedelta
3+
4+
from servicex_app.code_gen_adapter import CodeGenAdapter
5+
from servicex_app_test.resource_test_base import ResourceTestBase
6+
from servicex_app.models import TransformationResult
7+
8+
9+
class TestTransformationResults(ResourceTestBase):
    """Tests for ``GET /servicex/transformation/<request_id>/results``."""

    @staticmethod
    def _generate_transformation_request(**kwargs):
        """Return a sample transformation request dict; ``kwargs`` override the defaults."""
        request = {
            "did": "123-45-678",
            "selection": "test-string",
            "result-destination": "object-store",
            "result-format": "root-file",
            "workers": 10,
            "codegen": "atlasxaod",
        }
        request.update(kwargs)
        return request

    @fixture
    def mock_codegen(self, mocker):
        """Provide a ``CodeGenAdapter`` mock with a canned code-generation result."""
        mock_code_gen = mocker.MagicMock(CodeGenAdapter)
        mock_code_gen.generate_code_for_selection.return_value = (
            "my-cm",
            "ssl-hep/func_adl:latest",
            "bash",
            "echo",
        )
        return mock_code_gen

    @fixture
    def mock_transformation_result(self, mocker):
        """Patch ``TransformationResult`` as it is imported by the results resource module."""
        return mocker.patch(
            "servicex_app.resources.transformation.results.TransformationResult"
        )

    @staticmethod
    def sample_results():
        """Return a list of sample transformation results"""
        results = []
        base_time = datetime.utcnow()

        for i in range(3):
            result = TransformationResult(
                id=i + 1,
                file_id=i + 1,
                file_path=f"/output/result_{i}.root",
                request_id="test-request-id",
                transform_status="complete",
                transform_time=120 + (i * 10),
                total_events=1000 * (i + 1),
                total_bytes=2048 * (i + 1),
                avg_rate=10.5 + i,
                s3_object_name=f"bucket/result_{i}.root",
                created_at=base_time - timedelta(minutes=i * 10),
            )
            results.append(result)

        return results

    def test_get_results_nonexistent_request_id(
        self, mock_rabbit_adaptor, mock_codegen, mock_celery_app
    ):
        """Test getting results for non-existent request_id."""
        client = self._test_client(
            rabbit_adaptor=mock_rabbit_adaptor,
            code_gen_service=mock_codegen,
            celery_app=mock_celery_app,
        )

        response = client.get("/servicex/transformation/non-existent-id/results")

        assert response.status_code == 200
        data = response.json

        assert "results" in data
        assert isinstance(data["results"], list)
        assert len(data["results"]) == 0

    def test_get_results_missing_request_id(
        self, mock_rabbit_adaptor, mock_codegen, mock_celery_app
    ):
        """Test getting results with missing request_id parameter."""
        client = self._test_client(
            rabbit_adaptor=mock_rabbit_adaptor,
            code_gen_service=mock_codegen,
            celery_app=mock_celery_app,
        )

        response = client.get("/servicex/transformation/results")

        assert response.status_code == 404

    def test_get_results_with_samples(
        self,
        mock_rabbit_adaptor,
        mock_codegen,
        mock_celery_app,
        mock_transformation_result,
    ):
        """Test getting results with mock sample results."""
        client = self._test_client(
            rabbit_adaptor=mock_rabbit_adaptor,
            code_gen_service=mock_codegen,
            celery_app=mock_celery_app,
        )

        # The resource iterates the object returned by query.filter_by(...),
        # so that is where the sample rows are injected.
        mock_query = mock_transformation_result.query
        mock_filtered = mock_query.filter_by.return_value
        sample_results = self.sample_results()
        mock_filtered.__iter__ = lambda mock_self: iter(sample_results)

        response = client.get("/servicex/transformation/test-request-id/results")

        assert response.status_code == 200
        data = response.json

        assert "results" in data
        assert isinstance(data["results"], list)
        assert (
            len(data["results"]) == 3
        )  # We expect 3 results from our sample_results function

        result = data["results"][0]
        assert "request-id" in result
        assert "file-id" in result
        assert "file-path" in result
        assert "transform_status" in result
        assert "transform_time" in result
        assert "total-events" in result
        assert "total-bytes" in result
        assert "avg-rate" in result

        # Fields that clients polling this endpoint rely on: the object name
        # of the produced file and the creation timestamp of the result row.
        assert "s3-object-name" in result
        assert "created_at" in result
        assert result["request-id"] == "test-request-id"
        assert result["s3-object-name"] == "bucket/result_0.root"

        mock_query.filter_by.assert_called_with(request_id="test-request-id")

    def test_get_results_with_invalid_later_than_format(
        self, mock_rabbit_adaptor, mock_codegen, mock_celery_app
    ):
        """Test later_than parameter with invalid datetime format."""
        client = self._test_client(
            rabbit_adaptor=mock_rabbit_adaptor,
            code_gen_service=mock_codegen,
            celery_app=mock_celery_app,
        )

        response = client.get(
            "/servicex/transformation/any-request-id/results",
            query_string={"later_than": "invalid-datetime-format"},
        )

        assert response.status_code == 400
        data = response.json

        assert "message" in data
        assert (
            "later_than value invalid-datetime-format is not an ISO 8601 compliant datetime"
            in data["message"]
        )

transformer_sidecar/src/transformer_sidecar/servicex_adapter.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,16 @@
4747

4848
class FileCompleteRecord:
4949
def __init__(self, request_id: str, file_path: str, file_id: int, status: str,
50-
total_time: float, total_events: int, total_bytes: int):
50+
total_time: float, total_events: int, total_bytes: int, s3_object_name: str):
5151
assert request_id, "request_id is required"
5252
assert file_path, "file_path is required"
53+
assert s3_object_name, "s3_object_name is required"
5354
assert file_id, "file_id is required"
5455
assert status, "status is required"
5556

5657
self.request_id = request_id
5758
self.file_path = file_path
59+
self.s3_object_name = s3_object_name
5860
self.file_id = file_id
5961
self.status = status
6062
self.total_time = total_time
@@ -66,6 +68,7 @@ def to_json(self) -> dict[str, Any]:
6668
return {
6769
"requestId": self.request_id,
6870
"file-path": self.file_path,
71+
"s3-object-name": self.s3_object_name,
6972
"file-id": self.file_id,
7073
"status": self.status,
7174
"total-time": self.total_time,

0 commit comments

Comments
 (0)