Skip to content

Commit a3aa200

Browse files
authored
fivetran-destination: Flush after each record is written (#26326)
This PR changes the Fivetran Destination to flush the CSV writer after each record is written. Previously we wouldn't flush the writer until it was dropped and for sufficiently large CSV files this would cause a hang. I was able to reproduce the hang by hooking up the Fivetran Destination to testdrive and using the `file-append` command to generate a CSV file with 1,000,000 records. On `main` this will hang but with this PR we successfully copy the file. ### Motivation Fix a hang in the Fivetran Destination ### Checklist - [ ] This PR has adequate test coverage / QA involvement has been duly considered. - [ ] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design. <!-- Reference the design in the description. --> - [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label. - [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](https://github.com/MaterializeInc/cloud/pull/5021)). <!-- Ask in #team-cloud on Slack if you need help preparing the cloud PR. --> - [x] This PR includes the following [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note): - N/a
1 parent a8502ec commit a3aa200

File tree

1 file changed

+9
-1
lines changed
  • src/fivetran-destination/src/destination

1 file changed

+9
-1
lines changed

src/fivetran-destination/src/destination/dml.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -653,6 +653,8 @@ async fn copy_files(
653653

654654
// Stream the files into the COPY FROM sink.
655655
for path in files {
656+
tracing::info!(?path, "starting copy");
657+
656658
// Open the CSV file, returning an AsyncReader.
657659
let file = load_file(file_config, path)
658660
.await
@@ -666,8 +668,14 @@ async fn copy_files(
666668
let mut record_stream = adapter.into_stream();
667669
while let Some(maybe_record) = record_stream.next().await {
668670
let record = maybe_record?;
669-
csv_sink.write_byte_record(&record).await?;
671+
csv_sink
672+
.write_byte_record(&record)
673+
.await
674+
.context("writing record")?;
675+
csv_sink.flush().await.context("flushing record")?;
670676
}
677+
678+
tracing::info!(?path, "finished copy");
671679
}
672680
}
673681

0 commit comments

Comments
 (0)