Skip to content

remove s3 bucket polling #1049

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 23 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions servicex_app/migrations/versions/1.6.2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from alembic import op
import sqlalchemy as sa

# Alembic revision identifiers: this migration upgrades the schema from
# 'v1_5_7a' to 'v1_6_2' (no branch labels or cross-branch dependencies).
revision = 'v1_6_2'
down_revision = 'v1_5_7a'
branch_labels = None
depends_on = None


def upgrade():
    """Add indexed ``created_at``/``updated_at`` timestamp columns to ``files``.

    The columns are added as nullable, existing rows are backfilled with the
    current database timestamp, and only then is NOT NULL enforced so the
    migration succeeds on already-populated tables.
    """
    timestamp_columns = ('created_at', 'updated_at')

    # Add as nullable first: ADD COLUMN ... NOT NULL would fail on
    # tables that already contain rows.
    for name in timestamp_columns:
        op.add_column('files', sa.Column(name, sa.DateTime(), nullable=True))

    # Backfill every pre-existing row before tightening the constraint.
    op.execute('UPDATE files SET created_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP')

    for name in timestamp_columns:
        op.alter_column('files', name, nullable=False)

    for name in timestamp_columns:
        op.create_index(f'ix_files_{name}', 'files', [name])


def downgrade():
    """Drop the timestamp indexes and columns added by this revision."""
    timestamp_columns = ('created_at', 'updated_at')

    # Indexes must go before the columns they cover.
    for name in timestamp_columns:
        op.drop_index(f'ix_files_{name}', table_name='files')

    for name in timestamp_columns:
        op.drop_column('files', name)
6 changes: 5 additions & 1 deletion servicex_app/servicex_app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,14 +463,18 @@ class DatasetFile(db.Model):
# Event count for this file; NULL when not known.
file_events = db.Column(db.BigInteger, nullable=True)
# File location(s) stored as free text — presumably one or more replica
# paths/URIs; TODO confirm the separator convention against writers.
paths = db.Column(db.Text(), unique=False, nullable=False)
# Owning dataset; mirrors Dataset.files via back_populates.
dataset = relationship("Dataset", back_populates="files")
# Row timestamps: created_at is set once on insert; updated_at defaults on
# insert and is refreshed on every UPDATE (both use the DB clock, func.now()).
created_at = db.Column(DateTime, default=func.now())
updated_at = db.Column(DateTime, default=func.now(), onupdate=func.now())

def to_json(self):
    """Serialize this file record into a JSON-compatible dict.

    Timestamps are rendered as ISO 8601 strings, or None when unset
    (e.g. rows predating the timestamp migration).
    """
    def _iso(stamp):
        # Render a datetime as ISO 8601, passing None through unchanged.
        return stamp.isoformat() if stamp else None

    payload = {
        'id': self.id,
        'adler32': self.adler32,
        'file_size': self.file_size,
        'file_events': self.file_events,
        'paths': self.paths,
    }
    payload['updated_at'] = _iso(self.updated_at)
    payload['created_at'] = _iso(self.created_at)
    return payload

def save_to_db(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import logging

from datetime import datetime
from flask_restful import reqparse

from servicex_app.decorators import auth_required
from servicex_app.models import DatasetFile
from servicex_app.resources.servicex_resource import ServiceXResource

logger = logging.getLogger(__name__)


class GetDatasetFilesSince(ServiceXResource):
    """Internal endpoint listing dataset files changed since a cutoff.

    GET /servicex/datasets/updated-since?cutoff=<ISO 8601 datetime>

    Without a ``cutoff`` argument every file record is returned. With one,
    only files whose ``updated_at`` is at or after the cutoff are returned.
    Returns 400 when the cutoff cannot be parsed as ISO 8601.
    """

    @auth_required
    def get(self):
        parser = reqparse.RequestParser()
        parser.add_argument('cutoff', type=str, location='args', required=False)
        args = parser.parse_args()
        files_query = DatasetFile.query

        if args.get('cutoff'):
            cutoff_str = args['cutoff']

            try:
                # datetime.fromisoformat() only accepts a trailing 'Z' from
                # Python 3.11 onward; normalize it for older interpreters.
                if cutoff_str.endswith('Z'):
                    cutoff_str = cutoff_str[:-1] + '+00:00'

                cutoff_datetime = datetime.fromisoformat(cutoff_str)

                # The route is "updated-since", so filter on updated_at —
                # which is also set on insert — rather than created_at;
                # filtering on created_at would miss rows modified after
                # creation.
                # NOTE(review): DatasetFile timestamps default to func.now()
                # and look naive; comparing against an aware cutoff may need
                # normalization — confirm the DB stores UTC.
                files_query = files_query.filter(DatasetFile.updated_at >= cutoff_datetime)

                logger.debug(f"Filtering files updated after {cutoff_datetime}")
            except ValueError as e:
                logger.error(f"Error parsing cutoff datetime: {e}")
                return {
                    'message': f"Invalid cutoff parameter: {cutoff_str}. Must be a valid ISO 8601 datetime (YYYY-MM-DDTHH:MM:SS+00:00)"
                }, 400

        files = [file.to_json() for file in files_query]
        return {
            "files": files
        }
2 changes: 2 additions & 0 deletions servicex_app/servicex_app/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def add_routes(api, transformer_manager, rabbit_mq_adaptor,
from servicex_app.resources.internal.fileset_complete import FilesetComplete
from servicex_app.resources.internal.transform_status import TransformationStatusInternal
from servicex_app.resources.internal.transformer_file_complete import TransformerFileComplete
from servicex_app.resources.internal.get_dataset_files_since import GetDatasetFilesSince

from servicex_app.resources.transformation.submit import SubmitTransformationRequest
from servicex_app.resources.transformation.status import TransformationStatus
Expand Down Expand Up @@ -133,6 +134,7 @@ def add_routes(api, transformer_manager, rabbit_mq_adaptor,
api.add_resource(AllDatasets, '/servicex/datasets')
api.add_resource(OneDataset, '/servicex/datasets/<int:dataset_id>')
api.add_resource(DeleteDataset, '/servicex/datasets/<int:dataset_id>')
api.add_resource(GetDatasetFilesSince, '/servicex/datasets/updated-since')

prefix = "/servicex/transformation"
api.add_resource(SubmitTransformationRequest, prefix)
Expand Down