Skip to content

add proper s3 object sourcing #1092

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ def put(self, request_id):
info = request.get_json()
logger = current_app.logger
log_extra = {
- 'requestId': request_id,
- 'file-id': info['file-id']
+ 'requestId': request_id,
+ 'file-id': info['file-id'],
+ 's3-object-name': info['s3-object-name'],
}

logger.info("FileComplete", extra={**log_extra, 'metric': info})
Expand Down
3 changes: 0 additions & 3 deletions servicex_app/servicex_app/resources/transformation/results.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
- import logging
import datetime

from flask_restful import reqparse
Expand All @@ -7,8 +6,6 @@
from servicex_app.models import TransformationResult
from servicex_app.resources.servicex_resource import ServiceXResource

- logger = logging.getLogger(__name__)
-

class TransformationResults(ServiceXResource):
@auth_required
Expand Down
11 changes: 7 additions & 4 deletions transformer_sidecar/src/transformer_sidecar/servicex_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,13 @@ def put_file_complete(self, rec: FileCompleteRecord):
fkwargs={"json": rec.to_json(), "timeout": (0.5, None)},
tries=MAX_RETRIES,
delay=RETRY_DELAY)
- self.logger.info("Put file complete.", extra={'requestId': rec.request_id,
- "file-id": rec.file_id,
- "place": PLACE,
- "file_path": rec.file_path})
+ self.logger.info("Put file complete.", extra={
+ 'requestId': rec.request_id,
+ "file-id": rec.file_id,
+ "place": PLACE,
+ "file_path": rec.file_path,
+ "s3-object-name": rec.s3_object_name,
+ })
except requests.exceptions.ConnectionError:
self.logger.exception("Connection Error in put_file_complete",
extra={'requestId': rec.request_id,
Expand Down
20 changes: 11 additions & 9 deletions transformer_sidecar/src/transformer_sidecar/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def transform_file(
transformer_stats = TransformerStats()
try:
# Loop through the replicas
- for path, _file_path in zip(paths, _file_paths):
+ for _file_path in _file_paths:
logger.info(
"trying to transform file",
extra={
Expand Down Expand Up @@ -215,19 +215,19 @@ def transform_file(
rec = FileCompleteRecord(
request_id=request_id,
file_path=_file_path,
- s3_object_name=path,
+ s3_object_name="none",
file_id=file_id,
status="success",
total_time=time.time() - total_time,
total_events=transformer_stats.total_events,
total_bytes=os.path.getsize(output_path),
)

if object_store:
- upload_file(Path(transform_request["safeOutputFileName"]),
- servicex,
- rec
- )
+ upload_file(
+ Path(transform_request["safeOutputFileName"]),
+ servicex,
+ rec
+ )
else:
servicex.put_file_complete(rec)

Expand Down Expand Up @@ -257,8 +257,8 @@ def transform_file(
rec = FileCompleteRecord(
request_id=request_id,
file_path=_file_paths[0],
- s3_object_name=paths[0],
file_id=file_id,
+ s3_object_name="none",
status="failure",
total_time=time.time() - total_time,
total_events=0,
Expand Down Expand Up @@ -294,7 +294,7 @@ def transform_file(
rec = FileCompleteRecord(
request_id=request_id,
file_path=_file_paths[0],
- s3_object_name=paths[0],
+ s3_object_name="none",
file_id=file_id,
status="failure",
total_time=time.time() - total_time,
Expand Down Expand Up @@ -354,6 +354,8 @@ def upload_file(source_path: Path,
file_to_upload = source_path
object_name = source_path.name

+ rec.s3_object_name = object_name
+
logger.info("Uploading file to object store.",
extra={'requestId': request_id,
"file-id": rec.file_id,
Expand Down