SQL server: consistent snapshot and lsn selection #32979

Open. Wants to merge 10 commits into base: main.
4 changes: 3 additions & 1 deletion src/sql-server-util/examples/cdc.rs
@@ -70,7 +70,9 @@ async fn main() -> Result<(), anyhow::Error> {
tracing::info!("connection 2 successful!");

// Get an initial snapshot of the table.
let (lsn, stats, snapshot) = cdc_handle.snapshot(None).await?;
let (lsn, stats, snapshot) = cdc_handle
.snapshot(None, 1, &mz_repr::GlobalId::User(1))
.await?;
tracing::info!("snapshot stats: {stats:?}");
{
let mut snapshot = std::pin::pin!(snapshot);
197 changes: 179 additions & 18 deletions src/sql-server-util/src/cdc.rs
@@ -17,11 +17,46 @@
//! 2. [`CdcStream::into_stream`] returns a [`futures::Stream`] of [`CdcEvent`]s
//! optionally from the [`Lsn`] returned in step 1.
//!
//! Internally we get a snapshot by setting our transaction isolation level to
//! [`TransactionIsolationLevel::Snapshot`], getting the current maximum LSN with
//! [`crate::inspect::get_max_lsn`] and then running a `SELECT *`. We've observed that by
//! using [`TransactionIsolationLevel::Snapshot`] the LSN remains stable for the entire
//! transaction.
//! The snapshot process is responsible for identifying an [`Lsn`] that corresponds to
//! a point-in-time view of the data for the table(s) being copied. Similarly to
//! MySQL, Microsoft SQL Server, as far as we know, does not provide an API to
//! achieve this.
//!
//! SQL Server `SNAPSHOT` isolation guarantees that a reader will only
//! see writes committed before the transaction began. More specifically, this
//! snapshot is implemented using versions that are visible based on the
//! transaction sequence number (`XSN`). The `XSN` is set at the first
//! read or write, not at `BEGIN TRANSACTION`; see [here](https://learn.microsoft.com/en-us/sql/relational-databases/sql-server-transaction-locking-and-row-versioning-guide?view=sql-server-ver17).
//! This provides a suitable starting point for capturing the table data.
//! To force an `XSN` to be assigned, experiments have shown that a table must
//! be read. We choose a well-known table that we should already have access to,
//! [cdc.change_tables](https://learn.microsoft.com/en-us/sql/relational-databases/system-tables/cdc-change-tables-transact-sql?view=sql-server-ver17),
//! and read a single value from it.
//!
//! Due to the asynchronous nature of CDC, we can assume that the [`Lsn`]
//! returned from any CDC tables or CDC functions will always be stale,
//! in relation to the source table that CDC is tracking. The system table
//! [sys.dm_tran_database_transactions](https://learn.microsoft.com/en-us/sql/relational-databases/system-dynamic-management-views/sys-dm-tran-database-transactions-transact-sql?view=sql-server-ver17)
//! will contain an [`Lsn`] for any transaction that performs a write operation.
//! Creating a savepoint using [SAVE TRANSACTION](https://learn.microsoft.com/en-us/sql/t-sql/language-elements/save-transaction-transact-sql?view=sql-server-ver17)
//! is sufficient to generate an [`Lsn`] in this case.
//!
//! To ensure that the point-in-time view is established atomically with
//! collection of the [`Lsn`], we lock the tables to prevent writes from being
//! interleaved between the two commands (the read that establishes the `XSN`
//! and the creation of the savepoint).
//!
//! SQL Server supports table locks, but those will only be released
//! once the outermost transaction completes. For this reason, this module
//! uses two connections for the snapshot process. The first connection is used
//! to initiate a transaction and lock the upstream tables under
//! [`TransactionIsolationLevel::ReadCommitted`] isolation. While the first
//! connection maintains the locks, the second connection starts a
//! transaction with [`TransactionIsolationLevel::Snapshot`] isolation and
//! creates a savepoint. Once the savepoint is created, SQL Server has assigned
//! an [`Lsn`] and the first connection rolls back the transaction.
//! The [`Lsn`] and snapshot are captured by the second connection within the
//! existing transaction.
//!
//! After completing the snapshot we use [`crate::inspect::get_changes_asc`] which will return
//! all changes between a `[lower, upper)` bound of [`Lsn`]s.
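Taken together, the two-connection sequence looks roughly like the following condensed sketch. It reuses the helper methods that appear in this diff (new_connection, lock_table_shared, create_savepoint, get_lsn); the schema/table names and the wrapper function are illustrative, and error handling plus per-table iteration are simplified relative to the real CdcStream::snapshot:

// A condensed sketch of the snapshot protocol described above. The schema and
// table names are hypothetical; the real implementation iterates over every
// captured table and interleaves logging.
async fn consistent_lsn_sketch(client: &mut Client) -> Result<Lsn, SqlServerError> {
    // Connection 1: fence out writers by taking shared table locks under
    // READ COMMITTED isolation.
    let mut fencing_client = client.new_connection().await?;
    let mut fence_txn = fencing_client.transaction().await?;
    fence_txn.lock_table_shared("dbo", "my_table").await?;

    // Connection 2: start a SNAPSHOT transaction. The XSN is only assigned at
    // the first read, so touch a well-known CDC table to pin the view.
    client
        .set_transaction_isolation(TransactionIsolationLevel::Snapshot)
        .await?;
    let mut txn = client.transaction().await?;
    txn.simple_query("SELECT TOP 1 object_id FROM cdc.change_tables")
        .await?;

    // Creating a savepoint forces a transaction-log write, which assigns an LSN.
    txn.create_savepoint("_mz_snap_").await?;

    // Writers may now proceed: anything committed from here on has a greater
    // LSN and will be picked up by the CDC stream rather than the snapshot.
    fence_txn.rollback().await?;

    // Read the captured LSN, run the `SELECT *` snapshot within `txn`, and
    // roll back once done (neither transaction ever writes).
    let lsn = txn.get_lsn().await?;
    txn.rollback().await?;
    Ok(lsn)
}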
@@ -34,8 +69,10 @@ use std::time::Duration;
use derivative::Derivative;
use futures::{Stream, StreamExt};
use mz_ore::retry::RetryResult;
use mz_repr::GlobalId;
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use tiberius::numeric::Numeric;

use crate::{Client, SqlServerError, TransactionIsolationLevel};

@@ -56,7 +93,7 @@ pub struct CdcStream<'a> {
///
/// Note: When CDC is first enabled in an instance of SQL Server it can take a moment
/// for it to "completely" startup. Before starting a `TRANSACTION` for our snapshot
/// we'll wait this duration for SQL Server to report an LSN and thus indicate CDC is
/// we'll wait this duration for SQL Server to report an [`Lsn`] and thus indicate CDC is
/// ready to go.
max_lsn_wait: Duration,
}
@@ -95,7 +132,7 @@ impl<'a> CdcStream<'a> {
self
}

/// The max duration we'll wait for SQL Server to return an LSN before taking a
/// The max duration we'll wait for SQL Server to return an [`Lsn`] before taking a
/// snapshot.
///
/// When CDC is first enabled in SQL Server it can take a moment before it is fully
@@ -114,6 +151,8 @@ impl<'a> CdcStream<'a> {
pub async fn snapshot<'b>(
&'b mut self,
instances: Option<BTreeSet<Arc<str>>>,
worker_id: usize,
source_id: &'b GlobalId,
Review comment (Contributor): Rust forces you to put a lifetime due to async and this parameter being a reference. You can avoid that with even more syntax, but I suggest you just change this to GlobalId since it's a small Copy type.

) -> Result<
(
Lsn,
Expand All @@ -122,6 +161,8 @@ impl<'a> CdcStream<'a> {
),
SqlServerError,
> {
static SAVEPOINT_NAME: &str = "_mz_snap_";

// Determine what table we need to snapshot.
let instances = self
.capture_instances
@@ -134,20 +175,72 @@
.map(|i| i.as_ref());
let tables =
crate::inspect::get_tables_for_capture_instance(self.client, instances).await?;
tracing::info!(?tables, "got table for capture instance");
tracing::info!(%source_id, ?tables, "timely-{worker_id} got table for capture instance");

// Before starting a transaction where the LSN will not advance, ensure
// the upstream DB is ready for CDC.
self.wait_for_ready().await?;

// Intentionally logging this at info for debugging. This section won't get entered
// often, but if there are problems here, it will be much easier to troubleshoot
// knowing where a stall or hang might be happening.
tracing::info!(%source_id, "timely-{worker_id} upstream is ready");

// The client that will be used for fencing does not need any special isolation level
// as it will just be locking the table(s).
let mut fencing_client = self.client.new_connection().await?;
let mut fence_txn = fencing_client.transaction().await?;

// TODO improve table locking: https://github.com/MaterializeInc/database-issues/issues/9512
for (_capture_instance, schema, table) in &tables {
tracing::trace!(%source_id, %schema, %table, "timely-{worker_id} locking table");
fence_txn.lock_table_shared(&*schema, &*table).await?;
}

// So we know that we locked the tables and roughly how long that took based on the time diff
// from the last message.
tracing::info!(%source_id, "timely-{worker_id} locked tables");

self.client
.set_transaction_isolation(TransactionIsolationLevel::Snapshot)
.await?;
let txn = self.client.transaction().await?;
let mut txn = self.client.transaction().await?;

// Get the current LSN of the database.
let lsn = crate::inspect::get_max_lsn(txn.client).await?;
tracing::info!(?tables, ?lsn, "starting snapshot");
// Creating a savepoint forces a write to the transaction log, which will
// assign an LSN, but it does not force a transaction sequence number to be
// assigned as far as I can tell. I have not observed any entries added to
// `sys.dm_tran_active_snapshot_database_transactions` when creating a savepoint
// or when reading system views to retrieve the LSN.
//
// We choose cdc.change_tables because it is a system table that will exist
// when CDC is enabled, it has a well-known schema, and as a CDC client,
// we should be able to read from it already.
let res = txn
.simple_query("SELECT TOP 1 object_id FROM cdc.change_tables")
.await?;
if res.len() != 1 {
Err(SqlServerError::InvariantViolated(
"No objects found in cdc.change_tables".into(),
))?
}

// Because the tables are exclusively locked, any write operation has either
Review comment (Contributor): Suggested change:
- // Because the tables are exclusively locked, any write operation has either
+ // Because the tables are locked, any write operation has either
// completed, or is blocked. The LSN and XSN acquired now will represent a
// consistent point-in-time view, such that any committed write will be
Review comment (Contributor): Suggested change:
- // consistent point-in-time view, such that any comitted write will be
+ // consistent point-in-time view, such that any committed write will be

// visible to this snapshot and the LSN of such a write will be less than
// or equal to the LSN captured here. Creating the savepoint sets the LSN;
// we can read it after rolling back the locks.
txn.create_savepoint(SAVEPOINT_NAME).await?;
tracing::info!(%source_id, %SAVEPOINT_NAME, "timely-{worker_id} created savepoint");

// Once the XSN is established and the LSN captured, the tables no longer
// need to be locked. Any writes that happen to the upstream tables
// will have an LSN higher than our captured LSN, and will be read from CDC.
fence_txn.rollback().await?;
Review comment (Contributor): I think we should commit this transaction instead of rolling it back. What I'm thinking is that when we wrap up the fencing transaction we actually want to ensure that the server didn't decide to cancel it or otherwise mess with it while we weren't looking, since in that case the lock we think we have might not be there.

I don't know if SQL Server does this too, but for example pg cancels a transaction if it can't serialize it and you only learn it at commit time.

Reply (Contributor Author): I'm not opposed to changing it. My reasoning for rollback is that this transaction (and the snapshot transaction) should never make any changes to the database. It prevents us accidentally committing things due to some innocuous change in the future. That's entirely based on my own battle scars 😁

A rollback without an active transaction does fail in SQL Server:

1> rollback;
2> go
Msg 3903, Level 16, State 1, Server 41d33a81ef67, Line 1
The ROLLBACK TRANSACTION request has no corresponding BEGIN TRANSACTION.

In the case of PG, when a transaction is canceled, are you saying that only the commit returns the error? I would expect that commit or rollback would fail because there is no transaction to operate on at that point (or it's in an error state).

Reply (Contributor): I tried to construct a case using KILL but I can't; rollback hangs if I kill the session under its nose. I don't feel strongly about this, it should be fine to use rollback here if any error gets surfaced instead of being swept under the carpet.
let lsn = txn.get_lsn().await?;

tracing::info!(%source_id, ?lsn, "timely-{worker_id} starting snapshot");

// Get the size of each table we're about to snapshot.
//
@@ -158,29 +251,29 @@
tracing::trace!(%capture_instance, %schema, %table, "snapshot stats start");
let size = crate::inspect::snapshot_size(txn.client, &*schema, &*table).await?;
snapshot_stats.insert(Arc::clone(capture_instance), size);
tracing::trace!(%capture_instance, %schema, %table, "snapshot stats end");
tracing::trace!(%source_id, %capture_instance, %schema, %table, "timely-{worker_id} snapshot stats end");
}

// Run a `SELECT` query to snapshot the entire table.
let stream = async_stream::stream! {
// TODO(sql_server3): A stream of streams would be better here than
// returning the name with each result, but the lifetimes are tricky.
for (capture_instance, schema_name, table_name) in tables {
tracing::trace!(%capture_instance, %schema_name, %table_name, "snapshot start");
tracing::trace!(%source_id, %capture_instance, %schema_name, %table_name, "timely-{worker_id} snapshot start");

let snapshot = crate::inspect::snapshot(txn.client, &*schema_name, &*table_name);
let mut snapshot = std::pin::pin!(snapshot);
while let Some(result) = snapshot.next().await {
yield (Arc::clone(&capture_instance), result);
}

tracing::trace!(%capture_instance, %schema_name, %table_name, "snapshot end");
tracing::trace!(%source_id, %capture_instance, %schema_name, %table_name, "timely-{worker_id} snapshot end");
Review comment (Contributor): Suggested change:
- tracing::trace!(%source_id, %capture_instance, %schema_name, %table_name, "timely-{worker_id} tmsnapshot end");
+ tracing::trace!(%source_id, %capture_instance, %schema_name, %table_name, "timely-{worker_id} snapshot end");
}

// Slightly awkward, but if the commit fails we need to conform to
// Slightly awkward, but if the rollback fails we need to conform to
// the type of the stream.
if let Err(e) = txn.commit().await {
yield ("commit".into(), Err(e));
if let Err(e) = txn.rollback().await {
Review comment (Contributor): Didn't follow this change. Is there something we want to undo here?

Reply (Contributor Author): Same as the above statement with commit vs. rollback. We do not intend to make any changes to the DB, so we roll back the transaction that read all the data.
yield ("rollback".into(), Err(e));
}
};

@@ -545,6 +638,40 @@ impl TryFrom<&[u8]> for Lsn {
}
}

impl TryFrom<Numeric> for Lsn {
type Error = String;

fn try_from(value: Numeric) -> Result<Self, Self::Error> {
if value.dec_part() != 0 {
return Err(format!(
"LSN expect Numeric(25,0), but found decimal portion {}",
value.dec_part()
));
}
let mut decimal_lsn = value.int_part();
// An LSN is composed of 4 bytes : 4 bytes : 2 bytes (vlf_id : block_id : record_id),
// and Microsoft provides the method to decode the numeric form here:
// https://github.com/microsoft/sql-server-samples/blob/master/samples/features/ssms-templates/Sql/Change%20Data%20Capture/Enumeration/Create%20Function%20fn_convertnumericlsntobinary.sql
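//
// For example, the decimal LSN 45_0000008784_00001 decodes to
// vlf_id = 45, block_id = 8784, record_id = 1 (matching the first case in
// `test_numeric_lsn_ordering` below).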

let vlf_id = u32::try_from(decimal_lsn / 10_i128.pow(15))
.map_err(|e| format!("Failed to decode vlf_id for lsn {decimal_lsn}: {e:?}"))?;
decimal_lsn -= i128::from(vlf_id) * 10_i128.pow(15);

let block_id = u32::try_from(decimal_lsn / 10_i128.pow(5))
.map_err(|e| format!("Failed to decode block_id for lsn {decimal_lsn}: {e:?}"))?;
decimal_lsn -= i128::from(block_id) * 10_i128.pow(5);

let record_id = u16::try_from(decimal_lsn)
.map_err(|e| format!("Failed to decode record_id for lsn {decimal_lsn}: {e:?}"))?;

Ok(Lsn {
vlf_id,
block_id,
record_id,
})
}
}

impl columnation::Columnation for Lsn {
type InnerRegion = columnation::CopyRegion<Lsn>;
}
@@ -695,6 +822,7 @@ impl Operation {
mod tests {
use super::Lsn;
use proptest::prelude::*;
use tiberius::numeric::Numeric;

#[mz_ore::test]
fn smoketest_lsn_ordering() {
@@ -776,4 +904,37 @@ mod tests {
test_case(random_bytes, num_increment)
})
}

#[mz_ore::test]
fn test_numeric_lsn_ordering() {
let a = Lsn::try_from(Numeric::new_with_scale(45_0000008784_00001_i128, 0)).unwrap();
let b = Lsn::try_from(Numeric::new_with_scale(45_0000008784_00002_i128, 0)).unwrap();
let c = Lsn::try_from(Numeric::new_with_scale(45_0000008785_00002_i128, 0)).unwrap();
let d = Lsn::try_from(Numeric::new_with_scale(49_0000008784_00002_i128, 0)).unwrap();
assert!(a < b);
assert!(b < c);
assert!(c < d);
assert!(a < d);

assert_eq!(a, a);
assert_eq!(b, b);
assert_eq!(c, c);
assert_eq!(d, d);
}

#[mz_ore::test]
fn test_numeric_lsn_invalid() {
let with_decimal = Numeric::new_with_scale(1, 20);
assert!(Lsn::try_from(with_decimal).is_err());

for v in [
4294967296_0000000000_00000_i128, // vlf_id is too large
1_4294967296_00000_i128, // block_id is too large
1_0000000001_65536_i128, // record_id is too large
-49_0000008784_00002_i128, // negative is invalid
] {
let invalid_lsn = Numeric::new_with_scale(v, 0);
assert!(Lsn::try_from(invalid_lsn).is_err());
}
}
}
23 changes: 23 additions & 0 deletions src/sql-server-util/src/inspect.rs
@@ -16,6 +16,7 @@ use smallvec::SmallVec;
use std::collections::BTreeMap;
use std::fmt;
use std::sync::Arc;
use tiberius::numeric::Numeric;

use crate::cdc::{Lsn, RowFilterOption};
use crate::desc::{SqlServerColumnRaw, SqlServerTableRaw};
@@ -59,6 +60,28 @@ pub async fn increment_lsn(client: &mut Client, lsn: Lsn) -> Result<Lsn, SqlServ
parse_lsn(&result[..1])
}

/// Parse an [`Lsn`] in Decimal(25,0) format from the first column of the provided [`tiberius::Row`].
///
/// Returns an error if the provided slice doesn't have exactly one row.
pub(crate) fn parse_numeric_lsn(row: &[tiberius::Row]) -> Result<Lsn, SqlServerError> {
match row {
[r] => {
let numeric_lsn = r
.try_get::<Numeric, _>(0)?
.ok_or_else(|| SqlServerError::NullLsn)?;
let lsn = Lsn::try_from(numeric_lsn).map_err(|msg| SqlServerError::InvalidData {
column_name: "lsn".to_string(),
error: msg,
})?;
Ok(lsn)
}
other => Err(SqlServerError::InvalidData {
column_name: "lsn".to_string(),
error: format!("expected 1 row, got {other:?}"),
}),
}
}
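
For illustration, a minimal usage sketch, assuming a caller holding a Client whose simple_query helper returns the result rows; the system view and query shape below are assumptions for illustration (the cdc module retrieves the savepoint LSN through its own transaction helper):

// Hypothetical caller: read the LSN recorded for the current transaction as
// NUMERIC(25,0) and parse it. The query text is illustrative only.
let rows = client
    .simple_query(
        "SELECT database_transaction_begin_lsn \
         FROM sys.dm_tran_database_transactions \
         WHERE transaction_id = CURRENT_TRANSACTION_ID()",
    )
    .await?;
let lsn = parse_numeric_lsn(&rows)?;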

/// Parse an [`Lsn`] from the first column of the provided [`tiberius::Row`].
///
/// Returns an error if the provided slice doesn't have exactly one row.