Skip to content

Add source table support for SQL Server #32987

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
2 changes: 1 addition & 1 deletion misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ def get_default_system_parameters(
"storage_server_maintenance_interval",
"storage_sink_progress_search",
"storage_sink_ensure_topic_config",
"sql_server_snapshot_max_lsn_wait",
"sql_server_max_lsn_wait",
"sql_server_snapshot_progress_report_interval",
"sql_server_cdc_poll_interval",
"sql_server_cdc_cleanup_change_table",
Expand Down
2 changes: 1 addition & 1 deletion misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1266,7 +1266,7 @@ def __init__(
"storage_sink_progress_search",
"storage_sink_ensure_topic_config",
"ore_overflowing_behavior",
"sql_server_snapshot_max_lsn_wait",
"sql_server_max_lsn_wait",
"sql_server_snapshot_progress_report_interval",
"sql_server_cdc_poll_interval",
"sql_server_cdc_cleanup_change_table",
Expand Down
33 changes: 2 additions & 31 deletions src/sql-server-util/src/cdc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use std::time::Duration;

use derivative::Derivative;
use futures::{Stream, StreamExt};
use mz_ore::retry::RetryResult;
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -347,41 +346,13 @@ impl<'a> CdcStream<'a> {
/// values. It should be called before taking the initial [`CdcStream::snapshot`]
/// to ensure the system is ready to proceed with CDC.
async fn wait_for_ready(&mut self) -> Result<(), SqlServerError> {
fn _map_result<T>(result: Result<T, SqlServerError>) -> RetryResult<T, SqlServerError> {
match result {
Ok(val) => RetryResult::Ok(val),
Err(err @ SqlServerError::NullLsn) => RetryResult::RetryableErr(err),
Err(other) => RetryResult::FatalErr(other),
}
}

// Ensure all of the capture instances are reporting an LSN.
for instance in self.capture_instances.keys() {
let (_client, min_result) = mz_ore::retry::Retry::default()
.max_duration(self.max_lsn_wait)
.retry_async_with_state(&mut self.client, |_, client| async {
let result = crate::inspect::get_min_lsn(*client, &*instance).await;
(client, _map_result(result))
})
.await;
if let Err(e) = min_result {
tracing::warn!(%instance, "did not report a minimum LSN in time");
return Err(e);
}
crate::inspect::get_min_lsn_retry(self.client, instance, self.max_lsn_wait).await?;
}

// Ensure the database is reporting a max LSN.
let (_client, lsn_result) = mz_ore::retry::Retry::default()
.max_duration(self.max_lsn_wait)
.retry_async_with_state(&mut self.client, |_, client| async {
let result = crate::inspect::get_max_lsn(*client).await;
(client, _map_result(result))
})
.await;
if let Err(e) = lsn_result {
tracing::warn!("database did not report a maximum LSN in time");
return Err(e);
};
crate::inspect::get_max_lsn_retry(self.client, self.max_lsn_wait).await?;

Ok(())
}
Expand Down
55 changes: 55 additions & 0 deletions src/sql-server-util/src/inspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
use futures::Stream;
use itertools::Itertools;
use mz_ore::cast::CastFrom;
use mz_ore::retry::RetryResult;
use smallvec::SmallVec;
use std::collections::BTreeMap;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;

use crate::cdc::{Lsn, RowFilterOption};
use crate::desc::{SqlServerColumnRaw, SqlServerTableRaw};
Expand All @@ -34,6 +36,28 @@ pub async fn get_min_lsn(
mz_ore::soft_assert_eq_or_log!(result.len(), 1);
parse_lsn(&result[..1])
}
/// Returns the minimum log sequence number for the specified `capture_instance`, retrying
/// while the log sequence number is not yet available.
///
/// Each attempt calls [`get_min_lsn`]. An attempt that fails with
/// [`SqlServerError::NullLsn`] is classified as retryable by `map_null_lsn_to_retry`;
/// any other error is classified as fatal. `max_retry_duration` is handed to the retry
/// policy as its maximum duration.
///
/// # Errors
///
/// Returns the error from the last attempt if no LSN was obtained. A warning naming
/// the capture instance is logged before the error is returned.
///
/// See: <https://learn.microsoft.com/en-us/sql/relational-databases/system-functions/sys-fn-cdc-get-min-lsn-transact-sql?view=sql-server-ver16>
pub async fn get_min_lsn_retry(
    client: &mut Client,
    capture_instance: &str,
    max_retry_duration: Duration,
) -> Result<Lsn, SqlServerError> {
    let (_client, lsn_result) = mz_ore::retry::Retry::default()
        .max_duration(max_retry_duration)
        .retry_async_with_state(client, |_, client| async {
            let result = crate::inspect::get_min_lsn(client, capture_instance).await;
            (client, map_null_lsn_to_retry(result))
        })
        .await;

    if lsn_result.is_err() {
        // Callers may poll several capture instances in a row; record which one
        // failed so the warning can be attributed.
        tracing::warn!(%capture_instance, "database did not report a minimum LSN in time");
    }
    lsn_result
}

/// Returns the maximum log sequence number for the entire database.
///
Expand All @@ -46,6 +70,37 @@ pub async fn get_max_lsn(client: &mut Client) -> Result<Lsn, SqlServerError> {
parse_lsn(&result[..1])
}

/// Fetches the database-wide maximum log sequence number, re-issuing the query
/// while the LSN is still unavailable.
///
/// Every attempt goes through [`get_max_lsn`], and its outcome is classified by
/// `map_null_lsn_to_retry`. `max_retry_duration` is passed to the retry policy as
/// its maximum duration. If no LSN is obtained, a warning is logged and the error
/// is returned to the caller.
///
/// See: <https://learn.microsoft.com/en-us/sql/relational-databases/system-functions/sys-fn-cdc-get-max-lsn-transact-sql?view=sql-server-ver16>
pub async fn get_max_lsn_retry(
    client: &mut Client,
    max_retry_duration: Duration,
) -> Result<Lsn, SqlServerError> {
    let policy = mz_ore::retry::Retry::default().max_duration(max_retry_duration);
    let (_conn, outcome) = policy
        .retry_async_with_state(client, |_, conn| async {
            let attempt = crate::inspect::get_max_lsn(conn).await;
            (conn, map_null_lsn_to_retry(attempt))
        })
        .await;

    match outcome {
        Ok(lsn) => Ok(lsn),
        Err(err) => {
            tracing::warn!("database did not report a maximum LSN in time");
            Err(err)
        }
    }
}

/// Translates the outcome of an LSN query into a [`RetryResult`].
///
/// A success is passed through, [`SqlServerError::NullLsn`] becomes a retryable
/// error, and every other error becomes fatal.
fn map_null_lsn_to_retry<T>(result: Result<T, SqlServerError>) -> RetryResult<T, SqlServerError> {
    let err = match result {
        Ok(val) => return RetryResult::Ok(val),
        Err(err) => err,
    };

    if matches!(err, SqlServerError::NullLsn) {
        RetryResult::RetryableErr(err)
    } else {
        RetryResult::FatalErr(err)
    }
}

/// Increments the log sequence number.
///
/// See: <https://learn.microsoft.com/en-us/sql/relational-databases/system-functions/sys-fn-cdc-increment-lsn-transact-sql?view=sql-server-ver16>
Expand Down
4 changes: 4 additions & 0 deletions src/sql/src/plan/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1629,9 +1629,11 @@ pub fn plan_create_subsource(
SourceExportStatementDetails::SqlServer {
table,
capture_instance,
initial_lsn,
} => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
capture_instance,
table,
initial_lsn,
text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
exclude_columns: exclude_columns
.into_iter()
Expand Down Expand Up @@ -1781,9 +1783,11 @@ pub fn plan_create_table_from_source(
SourceExportStatementDetails::SqlServer {
table,
capture_instance,
initial_lsn,
} => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
table,
capture_instance,
initial_lsn,
text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
exclude_columns: exclude_columns
.into_iter()
Expand Down
Loading