Skip to content

remove s3 bucket polling #1049

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 23 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions servicex_app/migrations/versions/1.6.2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""
Revision ID: v1_6_2
Revises: v1_5_7a
Create Date: 2025-05-13

"""
from alembic import op
import sqlalchemy as sa

revision = 'v1_6_2'
down_revision = 'v1_5_7a'
branch_labels = None
depends_on = None


def upgrade():
op.add_column('transform_result', sa.Column('created_at', sa.DateTime(), nullable=True))
op.execute('UPDATE transform_result SET created_at = CURRENT_TIMESTAMP')
op.alter_column('transform_result', 'created_at', nullable=False)

op.create_index('ix_transform_result_created_at', 'transform_result', ['created_at'])


def downgrade():
op.drop_index('ix_transform_result_created_at', table_name='transform_result')

op.drop_column('transform_result', 'created_at')
6 changes: 4 additions & 2 deletions servicex_app/servicex_app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ class TransformationResult(db.Model):
total_events = db.Column(db.BigInteger, nullable=True)
total_bytes = db.Column(db.BigInteger, nullable=True)
avg_rate = db.Column(db.Float, nullable=True)
created_at = db.Column(DateTime, default=func.now())

__table_args__ = (
db.UniqueConstraint('file_id', 'request_id', name='uix_file_request'),
Expand All @@ -376,7 +377,8 @@ def to_json(cls, x):
'transform_time': x.transform_time,
'total-events': x.total_events,
'total-bytes': x.total_bytes,
'avg-rate': x.avg_rate
'avg-rate': x.avg_rate,
'created_at': x.created_at.isoformat() if x.created_at else None,
}

def save_to_db(self):
Expand Down Expand Up @@ -470,7 +472,7 @@ def to_json(self):
'adler32': self.adler32,
'file_size': self.file_size,
'file_events': self.file_events,
'paths': self.paths
'paths': self.paths,
}

def save_to_db(self):
Expand Down
43 changes: 43 additions & 0 deletions servicex_app/servicex_app/resources/transformation/results.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import logging
import datetime

from flask_restful import reqparse

from servicex_app.decorators import auth_required
from servicex_app.models import TransformationResult
from servicex_app.resources.servicex_resource import ServiceXResource

logger = logging.getLogger(__name__)


class TransformationResults(ServiceXResource):
@auth_required
def get(self, request_id):
if not request_id:
return {"message": "Missing required transformation request_id"}, 400

parser = reqparse.RequestParser()
parser.add_argument(
'begin_at',
type=str,
required=False,
location='args'
)

args = parser.parse_args()

transform_result_query = TransformationResult.query.filter_by(request_id=request_id)

begin_at_str = args.get('begin_at')
if begin_at_str:
try:
begin_at = datetime.datetime.fromisoformat(begin_at_str)
except AttributeError:
return {"message": f"begin_at value {begin_at_str} is not an ISO 8601 compliant datetime"}, 400
transform_result_query = transform_result_query.filter(TransformationResult.created_at > begin_at)

results = [transformation_result.to_json(transformation_result) for transformation_result in transform_result_query]

return {
"results": results
}
2 changes: 2 additions & 0 deletions servicex_app/servicex_app/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def add_routes(api, transformer_manager, rabbit_mq_adaptor,
from servicex_app.resources.transformation.get_all import AllTransformationRequests
from servicex_app.resources.transformation.get_one import TransformationRequest
from servicex_app.resources.transformation.deployment import DeploymentStatus
from servicex_app.resources.transformation.results import TransformationResults

from servicex_app.resources.users.all_users import AllUsers
from servicex_app.resources.users.token_refresh import TokenRefresh
Expand Down Expand Up @@ -139,6 +140,7 @@ def add_routes(api, transformer_manager, rabbit_mq_adaptor,
api.add_resource(AllTransformationRequests, prefix)
prefix += "/<string:request_id>"
api.add_resource(TransformationRequest, prefix)
api.add_resource(TransformationResults, prefix + '/results')

DeleteTransform.make_api(object_store)
api.add_resource(DeleteTransform, prefix)
Expand Down