Skip to content

Commit b724501

Browse files
committed
add s3 object name
1 parent 6bbcce7 commit b724501

File tree

6 files changed

+30
-13
lines changed

6 files changed

+30
-13
lines changed
Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
"""
2-
Revision ID: v1_6_2
2+
Revision ID: v1_7_0
33
Revises: v1_5_7a
44
Create Date: 2025-05-13
55
66
"""
77
from alembic import op
88
import sqlalchemy as sa
99

10-
revision = 'v1_6_2'
10+
revision = 'v1_7_0'
1111
down_revision = 'v1_5_7a'
1212
branch_labels = None
1313
depends_on = None
@@ -17,11 +17,20 @@ def upgrade():
1717
op.add_column('transform_result', sa.Column('created_at', sa.DateTime(), nullable=True))
1818
op.execute('UPDATE transform_result SET created_at = CURRENT_TIMESTAMP')
1919
op.alter_column('transform_result', 'created_at', nullable=False)
20-
2120
op.create_index('ix_transform_result_created_at', 'transform_result', ['created_at'])
2221

22+
op.add_column('transform_result', sa.Column('s3_object_name', sa.String(length=512), nullable=True))
23+
op.create_index('ix_transform_result_s3_object_name', 'transform_result', ['s3_object_name'])
24+
25+
op.create_index('ix_transform_result_request_id', 'transform_result', ['request_id'])
26+
2327

2428
def downgrade():
2529
op.drop_index('ix_transform_result_created_at', table_name='transform_result')
26-
2730
op.drop_column('transform_result', 'created_at')
31+
32+
op.drop_index('ix_transform_result_s3_object_name', table_name='transform_result')
33+
op.drop_column('transform_result', 's3_object_name')
34+
35+
op.drop_index('ix_transform_result_request_id', table_name='transform_result')
36+

servicex_app/servicex_app/models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,7 @@ class TransformationResult(db.Model):
357357
total_bytes = db.Column(db.BigInteger, nullable=True)
358358
avg_rate = db.Column(db.Float, nullable=True)
359359
created_at = db.Column(DateTime, default=func.now())
360+
s3_object_name = db.Column(db.String(512), unique=False, nullable=True)
360361

361362
__table_args__ = (
362363
db.UniqueConstraint('file_id', 'request_id', name='uix_file_request'),

servicex_app/servicex_app/resources/internal/transformer_file_complete.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,8 @@ def save_transform_result(request_id: str, info: dict[str, str], session: Sessio
177177
transform_time=info['total-time'],
178178
total_bytes=info['total-bytes'],
179179
total_events=info['total-events'],
180-
avg_rate=info['avg-rate']
180+
avg_rate=info['avg-rate'],
181+
s3_object_name=info['s3-object-name'],
181182
)
182183
session.add(rec)
183184

servicex_app/servicex_app/resources/transformation/results.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ def get(self, request_id):
1818

1919
parser = reqparse.RequestParser()
2020
parser.add_argument(
21-
'begin_at',
21+
'later_than',
2222
type=str,
2323
required=False,
2424
location='args'
@@ -28,13 +28,13 @@ def get(self, request_id):
2828

2929
transform_result_query = TransformationResult.query.filter_by(request_id=request_id)
3030

31-
begin_at_str = args.get('begin_at')
32-
if begin_at_str:
31+
later_than_str = args.get('later_than')
32+
if later_than_str:
3333
try:
34-
begin_at = datetime.datetime.fromisoformat(begin_at_str)
34+
later_than = datetime.datetime.fromisoformat(later_than_str)
3535
except AttributeError:
36-
return {"message": f"begin_at value {begin_at_str} is not an ISO 8601 compliant datetime"}, 400
37-
transform_result_query = transform_result_query.filter(TransformationResult.created_at > begin_at)
36+
return {"message": f"later_than value {later_than_str} is not an ISO 8601 compliant datetime"}, 400
37+
transform_result_query = transform_result_query.filter(TransformationResult.created_at > later_than)
3838

3939
results = [transformation_result.to_json(transformation_result) for transformation_result in transform_result_query]
4040

transformer_sidecar/src/transformer_sidecar/servicex_adapter.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,16 @@
4747

4848
class FileCompleteRecord:
4949
def __init__(self, request_id: str, file_path: str, file_id: int, status: str,
50-
total_time: float, total_events: int, total_bytes: int):
50+
total_time: float, total_events: int, total_bytes: int, s3_object_name: str):
5151
assert request_id, "request_id is required"
5252
assert file_path, "file_path is required"
53+
assert s3_object_name, "s3_object_name is required"
5354
assert file_id, "file_id is required"
5455
assert status, "status is required"
5556

5657
self.request_id = request_id
5758
self.file_path = file_path
59+
self.s3_object_name = s3_object_name
5860
self.file_id = file_id
5961
self.status = status
6062
self.total_time = total_time
@@ -66,6 +68,7 @@ def to_json(self) -> dict[str, Any]:
6668
return {
6769
"requestId": self.request_id,
6870
"file-path": self.file_path,
71+
"s3-object-name": self.s3_object_name,
6972
"file-id": self.file_id,
7073
"status": self.status,
7174
"total-time": self.total_time,

transformer_sidecar/src/transformer_sidecar/transformer.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ def transform_file(
163163
transformer_stats = TransformerStats()
164164
try:
165165
# Loop through the replicas
166-
for _file_path in _file_paths:
166+
for path, _file_path in zip(paths, _file_paths):
167167
logger.info(
168168
"trying to transform file",
169169
extra={
@@ -215,6 +215,7 @@ def transform_file(
215215
rec = FileCompleteRecord(
216216
request_id=request_id,
217217
file_path=_file_path,
218+
s3_object_name=path,
218219
file_id=file_id,
219220
status="success",
220221
total_time=time.time() - total_time,
@@ -256,6 +257,7 @@ def transform_file(
256257
rec = FileCompleteRecord(
257258
request_id=request_id,
258259
file_path=_file_paths[0],
260+
s3_object_name=paths[0],
259261
file_id=file_id,
260262
status="failure",
261263
total_time=time.time() - total_time,
@@ -292,6 +294,7 @@ def transform_file(
292294
rec = FileCompleteRecord(
293295
request_id=request_id,
294296
file_path=_file_paths[0],
297+
s3_object_name=paths[0],
295298
file_id=file_id,
296299
status="failure",
297300
total_time=time.time() - total_time,

0 commit comments

Comments
 (0)