Skip to content

remove s3 bucket polling #1049

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 31 commits into from
Jun 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
11c652b
add backend route plus model, migration changes
MattShirley Apr 29, 2025
c812133
correct fail state
MattShirley Apr 29, 2025
6d13aa1
remove unused imports
MattShirley Apr 29, 2025
28d906c
handle null times
MattShirley Apr 29, 2025
7a4907e
Update servicex_app/servicex_app/resources/internal/get_dataset_files…
MattShirley Apr 29, 2025
e6619d2
remove blank lines for flake8
MattShirley Apr 29, 2025
1054a97
remove updated_at column
MattShirley Apr 29, 2025
04d10ac
change endpoint to check transformation_results instead of files
MattShirley May 6, 2025
0506106
flake8 compliance
MattShirley May 6, 2025
a1bff03
apply PR feedback
MattShirley May 13, 2025
9b68d50
flake8
MattShirley May 13, 2025
bf20f28
flake8
MattShirley May 13, 2025
6bbcce7
rename cutoff to begin_at
MattShirley May 15, 2025
b724501
add s3 object name
MattShirley May 27, 2025
2ee24f1
add s3-object-name to TransformationResult.to_json
MattShirley May 27, 2025
456c581
test what is in info
MattShirley May 27, 2025
c0f717c
fix breaking tests
MattShirley May 27, 2025
4ff137b
flake8 compliance
MattShirley May 27, 2025
b593c70
test info
MattShirley May 27, 2025
a0969df
fix breaking tests
MattShirley May 27, 2025
008d8c5
add first test
MattShirley May 28, 2025
e0936a2
flake8 compliance
MattShirley May 28, 2025
6eaa001
flake8 compliance
MattShirley May 28, 2025
827dc41
add additional testing
MattShirley Jun 4, 2025
b813196
flake8 compliance
MattShirley Jun 4, 2025
f8281ca
remove unnecessary test assert
MattShirley Jun 4, 2025
c631e9c
Merge branch 'develop' of https://github.com/ssl-hep/ServiceX into 10…
MattShirley Jun 9, 2025
f3743e8
add capability
MattShirley Jun 11, 2025
9325771
Merge branch 'develop' of https://github.com/ssl-hep/ServiceX into 10…
MattShirley Jun 11, 2025
76da340
uncomment capability
MattShirley Jun 11, 2025
66dbc55
fix test
MattShirley Jun 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions servicex_app/migrations/versions/1.7.0.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""
Revision ID: v1_7_0
Revises: v1_5_7a
Create Date: 2025-05-13

"""
from alembic import op
import sqlalchemy as sa

revision = 'v1_7_0'
down_revision = 'v1_5_7a'
branch_labels = None
depends_on = None


def upgrade():
    """
    Apply revision v1_7_0 to the ``transform_result`` table.

    Adds a non-nullable, indexed ``created_at`` column, a nullable
    ``s3_object_name`` column, and an index on ``request_id``.
    """
    # Add created_at as nullable first so that rows which already exist do not
    # violate the NOT NULL constraint before they have been backfilled.
    op.add_column('transform_result', sa.Column('created_at', sa.DateTime(), nullable=True))
    op.execute('UPDATE transform_result SET created_at = CURRENT_TIMESTAMP')
    # existing_type lets Alembic render a complete column specification on
    # backends (e.g. MySQL) that cannot change nullability on its own.
    op.alter_column(
        'transform_result',
        'created_at',
        existing_type=sa.DateTime(),
        nullable=False,
    )
    op.create_index('ix_transform_result_created_at', 'transform_result', ['created_at'])

    op.add_column(
        'transform_result',
        sa.Column('s3_object_name', sa.String(length=512), nullable=True),
    )

    op.create_index('ix_transform_result_request_id', 'transform_result', ['request_id'])


def downgrade():
    """
    Revert revision v1_7_0 on the ``transform_result`` table.

    Drops the ``created_at`` index and column, the ``s3_object_name`` column,
    and the ``request_id`` index.
    """
    table = 'transform_result'

    # The index on created_at is removed before the column it covers.
    op.drop_index('ix_transform_result_created_at', table_name=table)
    op.drop_column(table, 'created_at')

    op.drop_column(table, 's3_object_name')

    op.drop_index('ix_transform_result_request_id', table_name=table)

8 changes: 6 additions & 2 deletions servicex_app/servicex_app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,8 @@ class TransformationResult(db.Model):
total_events = db.Column(db.BigInteger, nullable=True)
total_bytes = db.Column(db.BigInteger, nullable=True)
avg_rate = db.Column(db.Float, nullable=True)
created_at = db.Column(DateTime, default=func.now())
s3_object_name = db.Column(db.String(512), unique=False, nullable=True)

__table_args__ = (
db.UniqueConstraint('file_id', 'request_id', name='uix_file_request'),
Expand All @@ -372,11 +374,13 @@ def to_json(cls, x):
'request-id': x.request_id,
'file-id': x.id,
'file-path': x.file_path,
's3-object-name': x.s3_object_name,
'transform_status': x.transform_status,
'transform_time': x.transform_time,
'total-events': x.total_events,
'total-bytes': x.total_bytes,
'avg-rate': x.avg_rate
'avg-rate': x.avg_rate,
'created_at': x.created_at.isoformat() if x.created_at else None,
}

def save_to_db(self):
Expand Down Expand Up @@ -470,7 +474,7 @@ def to_json(self):
'adler32': self.adler32,
'file_size': self.file_size,
'file_events': self.file_events,
'paths': self.paths
'paths': self.paths,
}

def save_to_db(self):
Expand Down
4 changes: 3 additions & 1 deletion servicex_app/servicex_app/resources/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,7 @@ def get(self):
return {
"app-version": self._get_app_version(),
"code-gen-image": current_app.config['CODE_GEN_IMAGES'],
"capabilities": []
"capabilities": [
"poll_local_transformation_results"
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ def save_transform_result(request_id: str, info: dict[str, str], session: Sessio
transform_time=info['total-time'],
total_bytes=info['total-bytes'],
total_events=info['total-events'],
avg_rate=info['avg-rate']
avg_rate=info['avg-rate'],
s3_object_name=info['s3-object-name'],
)
session.add(rec)

Expand Down
43 changes: 43 additions & 0 deletions servicex_app/servicex_app/resources/transformation/results.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import logging
import datetime

from flask_restful import reqparse

from servicex_app.decorators import auth_required
from servicex_app.models import TransformationResult
from servicex_app.resources.servicex_resource import ServiceXResource

logger = logging.getLogger(__name__)


class TransformationResults(ServiceXResource):
    """REST resource listing the TransformationResult rows of one request."""

    @auth_required
    def get(self, request_id):
        """
        Return the results recorded for ``request_id`` as ``{"results": [...]}``.

        The optional ``later_than`` query-string argument is parsed with
        ``datetime.datetime.fromisoformat``; when present, only rows whose
        ``created_at`` is strictly greater than it are returned.

        :param request_id: identifier of the transformation request (from the URL).
        :return: a dict with a ``results`` list, or a ``(dict, 400)`` tuple when
            ``request_id`` is empty or ``later_than`` cannot be parsed.
        """
        if not request_id:
            return {"message": "Missing required transformation request_id"}, 400

        arg_parser = reqparse.RequestParser()
        arg_parser.add_argument('later_than', type=str, required=False, location='args')
        parsed_args = arg_parser.parse_args()

        query = TransformationResult.query.filter_by(request_id=request_id)

        later_than_str = parsed_args.get('later_than')
        if later_than_str:
            try:
                later_than = datetime.datetime.fromisoformat(later_than_str)
            except (AttributeError, ValueError):
                message = f"later_than value {later_than_str} is not an ISO 8601 compliant datetime"
                return {"message": message}, 400
            # Strictly greater-than: a row stamped exactly at later_than is excluded.
            query = query.filter(TransformationResult.created_at > later_than)

        # Serialise through each row's own to_json rather than the module-level
        # TransformationResult name.
        return {"results": [row.to_json(row) for row in query]}
2 changes: 2 additions & 0 deletions servicex_app/servicex_app/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def add_routes(api, transformer_manager, rabbit_mq_adaptor,
from servicex_app.resources.transformation.get_all import AllTransformationRequests
from servicex_app.resources.transformation.get_one import TransformationRequest
from servicex_app.resources.transformation.deployment import DeploymentStatus
from servicex_app.resources.transformation.results import TransformationResults

from servicex_app.resources.users.all_users import AllUsers
from servicex_app.resources.users.token_refresh import TokenRefresh
Expand Down Expand Up @@ -139,6 +140,7 @@ def add_routes(api, transformer_manager, rabbit_mq_adaptor,
api.add_resource(AllTransformationRequests, prefix)
prefix += "/<string:request_id>"
api.add_resource(TransformationRequest, prefix)
api.add_resource(TransformationResults, prefix + '/results')

DeleteTransform.make_api(object_store)
api.add_resource(DeleteTransform, prefix)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ def file_complete_response(self):
'total-time': 100,
'total-events': 10000,
'total-bytes': 325683,
'avg-rate': 30.2
'avg-rate': 30.2,
's3-object-name': 'file://s3-object-name',
}

def test_put_transform_file_complete_files_remaining(self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,7 @@ def test_get_info(self, client, mock_app_version):
'python': 'sslhep/servicex_code_gen_python:develop',
'uproot': 'sslhep/servicex_code_gen_func_adl_uproot:develop'
},
'capabilities': []
'capabilities': [
'poll_local_transformation_results'
]
} # noqa: E501
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
from pytest import fixture
from datetime import datetime, timedelta

from servicex_app.code_gen_adapter import CodeGenAdapter
from servicex_app_test.resource_test_base import ResourceTestBase
from servicex_app.models import TransformationResult


class TestTransformationResults(ResourceTestBase):
    """Tests for the GET /servicex/transformation/<request_id>/results endpoint."""

    # Fixed timestamp so the serialised created_at values are reproducible.
    SAMPLE_BASE_TIME = datetime(2025, 1, 1, 12, 0, 0)

    @staticmethod
    def _generate_transformation_request(**kwargs):
        """Return a sample transformation request dict, overridden by ``kwargs``."""
        request = {
            "did": "123-45-678",
            "selection": "test-string",
            "result-destination": "object-store",
            "result-format": "root-file",
            "workers": 10,
            "codegen": "atlasxaod",
        }
        request.update(kwargs)
        return request

    @fixture
    def mock_codegen(self, mocker):
        """Provide a CodeGenAdapter mock whose code generation returns fixed values."""
        mock_code_gen = mocker.MagicMock(CodeGenAdapter)
        mock_code_gen.generate_code_for_selection.return_value = (
            "my-cm",
            "ssl-hep/func_adl:latest",
            "bash",
            "echo",
        )
        return mock_code_gen

    @fixture
    def mock_transformation_result(self, mocker):
        """Patch the TransformationResult name used inside the results resource."""
        return mocker.patch(
            "servicex_app.resources.transformation.results.TransformationResult"
        )

    @staticmethod
    def sample_results():
        """Return a list of three sample transformation results.

        Result ``i`` has id ``i + 1`` and a ``created_at`` that is ``i * 10``
        minutes before ``SAMPLE_BASE_TIME``.
        """
        base_time = TestTransformationResults.SAMPLE_BASE_TIME
        return [
            TransformationResult(
                id=i + 1,
                file_id=i + 1,
                file_path=f"/output/result_{i}.root",
                request_id="test-request-id",
                transform_status="complete",
                transform_time=120 + (i * 10),
                total_events=1000 * (i + 1),
                total_bytes=2048 * (i + 1),
                avg_rate=10.5 + i,
                s3_object_name=f"bucket/result_{i}.root",
                created_at=base_time - timedelta(minutes=i * 10),
            )
            for i in range(3)
        ]

    def test_get_results_nonexistent_request_id(
        self, mock_rabbit_adaptor, mock_codegen, mock_celery_app
    ):
        """Test getting results for non-existent request_id."""
        client = self._test_client(
            rabbit_adaptor=mock_rabbit_adaptor,
            code_gen_service=mock_codegen,
            celery_app=mock_celery_app,
        )

        response = client.get("/servicex/transformation/non-existent-id/results")

        assert response.status_code == 200
        data = response.json

        assert "results" in data
        assert isinstance(data["results"], list)
        assert len(data["results"]) == 0

    def test_get_results_missing_request_id(
        self, mock_rabbit_adaptor, mock_codegen, mock_celery_app
    ):
        """Test getting results with missing request_id parameter."""
        client = self._test_client(
            rabbit_adaptor=mock_rabbit_adaptor,
            code_gen_service=mock_codegen,
            celery_app=mock_celery_app,
        )

        response = client.get("/servicex/transformation/results")

        assert response.status_code == 404

    def test_get_results_with_samples(
        self,
        mock_rabbit_adaptor,
        mock_codegen,
        mock_celery_app,
        mock_transformation_result,
    ):
        """Test getting results with mock sample results."""
        client = self._test_client(
            rabbit_adaptor=mock_rabbit_adaptor,
            code_gen_service=mock_codegen,
            celery_app=mock_celery_app,
        )

        mock_query = mock_transformation_result.query
        mock_filtered = mock_query.filter_by.return_value
        sample_results = self.sample_results()
        mock_filtered.__iter__ = lambda mock_self: iter(sample_results)

        response = client.get("/servicex/transformation/test-request-id/results")

        assert response.status_code == 200
        data = response.json

        assert "results" in data
        assert isinstance(data["results"], list)
        assert (
            len(data["results"]) == 3
        )  # We expect 3 results from our sample_results function

        result = data["results"][0]
        expected_keys = (
            "request-id",
            "file-id",
            "file-path",
            "s3-object-name",
            "transform_status",
            "transform_time",
            "total-events",
            "total-bytes",
            "avg-rate",
            "created_at",
        )
        for key in expected_keys:
            assert key in result

        # The values must round-trip through to_json, including the fields
        # needed for polling (s3-object-name and created_at).
        assert result["request-id"] == "test-request-id"
        assert result["file-id"] == 1
        assert result["file-path"] == "/output/result_0.root"
        assert result["s3-object-name"] == "bucket/result_0.root"
        assert result["created_at"] == self.SAMPLE_BASE_TIME.isoformat()

        # Rows are returned in the order the query yields them.
        assert [entry["file-id"] for entry in data["results"]] == [1, 2, 3]

        mock_query.filter_by.assert_called_with(request_id="test-request-id")

    def test_get_results_with_invalid_later_than_format(
        self, mock_rabbit_adaptor, mock_codegen, mock_celery_app
    ):
        """Test later_than parameter with invalid datetime format."""
        client = self._test_client(
            rabbit_adaptor=mock_rabbit_adaptor,
            code_gen_service=mock_codegen,
            celery_app=mock_celery_app,
        )

        response = client.get(
            "/servicex/transformation/any-request-id/results",
            query_string={"later_than": "invalid-datetime-format"},
        )

        assert response.status_code == 400
        data = response.json

        assert "message" in data
        assert (
            "later_than value invalid-datetime-format is not an ISO 8601 compliant datetime"
            in data["message"]
        )
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,16 @@

class FileCompleteRecord:
def __init__(self, request_id: str, file_path: str, file_id: int, status: str,
total_time: float, total_events: int, total_bytes: int):
total_time: float, total_events: int, total_bytes: int, s3_object_name: str):
assert request_id, "request_id is required"
assert file_path, "file_path is required"
assert s3_object_name, "s3_object_name is required"
assert file_id, "file_id is required"
assert status, "status is required"

self.request_id = request_id
self.file_path = file_path
self.s3_object_name = s3_object_name
self.file_id = file_id
self.status = status
self.total_time = total_time
Expand All @@ -66,6 +68,7 @@ def to_json(self) -> dict[str, Any]:
return {
"requestId": self.request_id,
"file-path": self.file_path,
"s3-object-name": self.s3_object_name,
"file-id": self.file_id,
"status": self.status,
"total-time": self.total_time,
Expand Down
5 changes: 4 additions & 1 deletion transformer_sidecar/src/transformer_sidecar/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def transform_file(
transformer_stats = TransformerStats()
try:
# Loop through the replicas
for _file_path in _file_paths:
for path, _file_path in zip(paths, _file_paths):
logger.info(
"trying to transform file",
extra={
Expand Down Expand Up @@ -215,6 +215,7 @@ def transform_file(
rec = FileCompleteRecord(
request_id=request_id,
file_path=_file_path,
s3_object_name=path,
file_id=file_id,
status="success",
total_time=time.time() - total_time,
Expand Down Expand Up @@ -256,6 +257,7 @@ def transform_file(
rec = FileCompleteRecord(
request_id=request_id,
file_path=_file_paths[0],
s3_object_name=paths[0],
file_id=file_id,
status="failure",
total_time=time.time() - total_time,
Expand Down Expand Up @@ -292,6 +294,7 @@ def transform_file(
rec = FileCompleteRecord(
request_id=request_id,
file_path=_file_paths[0],
s3_object_name=paths[0],
file_id=file_id,
status="failure",
total_time=time.time() - total_time,
Expand Down
Loading