SQL server: consistent snapshot and lsn selection #32979
@@ -17,11 +17,46 @@
 //! 2. [`CdcStream::into_stream`] returns a [`futures::Stream`] of [`CdcEvent`]s
 //! optionally from the [`Lsn`] returned in step 1.
 //!
-//! Internally we get a snapshot by setting our transaction isolation level to
-//! [`TransactionIsolationLevel::Snapshot`], getting the current maximum LSN with
-//! [`crate::inspect::get_max_lsn`] and then running a `SELECT *`. We've observed that by
-//! using [`TransactionIsolationLevel::Snapshot`] the LSN remains stable for the entire
-//! transaction.
+//! The snapshot process is responsible for identifying an [`Lsn`] that corresponds to
+//! a point-in-time view of the data for the table(s) being copied. Similarly to
+//! MySQL, Microsoft SQL Server does not, as far as we know, provide an API to
+//! achieve this.
+//!
+//! SQL Server `SNAPSHOT` isolation guarantees that a reader will only see
+//! writes committed before the transaction began. More specifically, this
+//! snapshot is implemented using row versions that are visible based on the
+//! transaction sequence number (`XSN`). The `XSN` is set at the first
+//! read or write, not at `BEGIN TRANSACTION`; see [the row versioning guide](https://learn.microsoft.com/en-us/sql/relational-databases/sql-server-transaction-locking-and-row-versioning-guide?view=sql-server-ver17).
+//! This provides a suitable starting point for capturing the table data.
+//! Experiments have shown that a table must be read to force an `XSN` to be
+//! assigned. We choose a well-known table that we should already have access to,
+//! [cdc.change_tables](https://learn.microsoft.com/en-us/sql/relational-databases/system-tables/cdc-change-tables-transact-sql?view=sql-server-ver17),
+//! and read a single value from it.
+//!
+//! Due to the asynchronous nature of CDC, we can assume that the [`Lsn`]
+//! returned from any CDC tables or CDC functions will always be stale
+//! relative to the source table that CDC is tracking. The system view
+//! [sys.dm_tran_database_transactions](https://learn.microsoft.com/en-us/sql/relational-databases/system-dynamic-management-views/sys-dm-tran-database-transactions-transact-sql?view=sql-server-ver17)
+//! will contain an [`Lsn`] for any transaction that performs a write operation.
+//! Creating a savepoint using [SAVE TRANSACTION](https://learn.microsoft.com/en-us/sql/t-sql/language-elements/save-transaction-transact-sql?view=sql-server-ver17)
+//! is sufficient to generate an [`Lsn`] in this case.
+//!
+//! To ensure that the point-in-time view is established atomically with the
+//! collection of the [`Lsn`], we lock the tables to prevent writes from being
+//! interleaved between the two commands (the read that establishes the `XSN`
+//! and the creation of the savepoint).
+//!
+//! SQL Server supports table locks, but those are only released once the
+//! outermost transaction completes. For this reason, this module uses two
+//! connections for the snapshot process. The first connection initiates a
+//! transaction and locks the upstream tables under
+//! [`TransactionIsolationLevel::ReadCommitted`] isolation. While the first
+//! connection holds the locks, the second connection starts a transaction
+//! with [`TransactionIsolationLevel::Snapshot`] isolation and creates a
+//! savepoint. Once the savepoint is created, SQL Server has assigned an
+//! [`Lsn`] and the first connection rolls back its transaction. The [`Lsn`]
+//! and snapshot are then captured by the second connection within its
+//! existing transaction.
 //!
 //! After completing the snapshot we use [`crate::inspect::get_changes_asc`] which will return
 //! all changes between a `[lower, upper)` bound of [`Lsn`]s.
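For orientation, here is a condensed sketch of the two-connection sequence described above, written against the `Client`/transaction helpers that appear later in this diff (`new_connection`, `lock_table_shared`, `create_savepoint`, `get_lsn`). It is illustrative only: the function name, parameter shapes, and error handling are assumptions, not the actual implementation.

```rust
use crate::{Client, SqlServerError, TransactionIsolationLevel};

/// Sketch: pin an XSN and capture a consistent Lsn using two connections.
async fn consistent_lsn(
    client: &mut Client,
    tables: &[(String, String)], // (schema, table) pairs; hypothetical shape
) -> Result<Lsn, SqlServerError> {
    // Connection 1: shared table locks under READ COMMITTED block writers
    // while the snapshot point is established.
    let mut fencing_client = client.new_connection().await?;
    let mut fence_txn = fencing_client.transaction().await?;
    for (schema, table) in tables {
        fence_txn.lock_table_shared(schema, table).await?;
    }

    // Connection 2: SNAPSHOT isolation. The first read pins the XSN...
    client
        .set_transaction_isolation(TransactionIsolationLevel::Snapshot)
        .await?;
    let mut txn = client.transaction().await?;
    txn.simple_query("SELECT TOP 1 object_id FROM cdc.change_tables")
        .await?;

    // ...and the savepoint forces a transaction-log write, assigning the Lsn.
    txn.create_savepoint("_mz_snap_").await?;

    // Writers may proceed; anything new commits at a higher Lsn and is
    // picked up later via CDC replication.
    fence_txn.rollback().await?;

    // Read back the Lsn; the table data is read in this same transaction.
    txn.get_lsn().await
}
```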
@@ -34,8 +69,10 @@ use std::time::Duration;
 use derivative::Derivative;
 use futures::{Stream, StreamExt};
 use mz_ore::retry::RetryResult;
+use mz_repr::GlobalId;
 use proptest_derive::Arbitrary;
 use serde::{Deserialize, Serialize};
+use tiberius::numeric::Numeric;

 use crate::{Client, SqlServerError, TransactionIsolationLevel};
@@ -56,7 +93,7 @@ pub struct CdcStream<'a> {
     ///
     /// Note: When CDC is first enabled in an instance of SQL Server it can take a moment
     /// for it to "completely" startup. Before starting a `TRANSACTION` for our snapshot
-    /// we'll wait this duration for SQL Server to report an LSN and thus indicate CDC is
+    /// we'll wait this duration for SQL Server to report an [`Lsn`] and thus indicate CDC is
     /// ready to go.
     max_lsn_wait: Duration,
 }
@@ -95,7 +132,7 @@ impl<'a> CdcStream<'a> {
         self
     }

-    /// The max duration we'll wait for SQL Server to return an LSN before taking a
+    /// The max duration we'll wait for SQL Server to return an [`Lsn`] before taking a
     /// snapshot.
     ///
     /// When CDC is first enabled in SQL Server it can take a moment before it is fully
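Hypothetically, configuring this wait could look like the following. Both the `cdc(...)` constructor and the `max_lsn_wait(...)` setter are assumed names (the field exists above, but the constructor and setter are not shown in this diff).

```rust
use std::time::Duration;

// Assumed builder-style configuration of the CDC stream; names are
// hypothetical and mirror the `max_lsn_wait` field documented above.
let cdc_stream = client
    .cdc(capture_instances)
    .max_lsn_wait(Duration::from_secs(30));
```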
@@ -114,6 +151,8 @@ impl<'a> CdcStream<'a> {
     pub async fn snapshot<'b>(
         &'b mut self,
         instances: Option<BTreeSet<Arc<str>>>,
+        worker_id: usize,
+        source_id: &'b GlobalId,
     ) -> Result<
         (
             Lsn,
@@ -122,6 +161,8 @@ impl<'a> CdcStream<'a> {
         ),
         SqlServerError,
     > {
+        static SAVEPOINT_NAME: &str = "_mz_snap_";
+
         // Determine what table we need to snapshot.
         let instances = self
             .capture_instances
@@ -134,20 +175,72 @@ impl<'a> CdcStream<'a> {
             .map(|i| i.as_ref());
         let tables =
             crate::inspect::get_tables_for_capture_instance(self.client, instances).await?;
-        tracing::info!(?tables, "got table for capture instance");
+        tracing::info!(%source_id, ?tables, "timely-{worker_id} got table for capture instance");

         // Before starting a transaction where the LSN will not advance, ensure
         // the upstream DB is ready for CDC.
         self.wait_for_ready().await?;

+        // Intentionally logging this at info for debugging. This section won't get entered
+        // often, but if there are problems here, it will be much easier to troubleshoot
+        // knowing where a stall/hang might be happening.
+        tracing::info!(%source_id, "timely-{worker_id} upstream is ready");
+
+        // The client that will be used for fencing does not need any special isolation level
+        // as it will just be locking the table(s).
+        let mut fencing_client = self.client.new_connection().await?;
+        let mut fence_txn = fencing_client.transaction().await?;
+
+        // TODO improve table locking: https://github.com/MaterializeInc/database-issues/issues/9512
+        for (_capture_instance, schema, table) in &tables {
+            tracing::trace!(%source_id, %schema, %table, "timely-{worker_id} locking table");
+            fence_txn.lock_table_shared(&*schema, &*table).await?;
+        }
+
+        // So we know that we locked the tables, and roughly how long that took based
+        // on the time diff from the last message.
+        tracing::info!(%source_id, "timely-{worker_id} locked tables");
+
         self.client
             .set_transaction_isolation(TransactionIsolationLevel::Snapshot)
             .await?;
-        let txn = self.client.transaction().await?;
+        let mut txn = self.client.transaction().await?;

-        // Get the current LSN of the database.
-        let lsn = crate::inspect::get_max_lsn(txn.client).await?;
-        tracing::info!(?tables, ?lsn, "starting snapshot");
+        // Creating a savepoint forces a write to the transaction log, which will
+        // assign an LSN, but it does not appear to force a transaction sequence
+        // number to be assigned. We have not observed any entries added to
+        // `sys.dm_tran_active_snapshot_database_transactions` when creating a savepoint
+        // or when reading system views to retrieve the LSN.
+        //
+        // We choose cdc.change_tables because it is a system table that will exist
+        // when CDC is enabled, it has a well known schema, and as a CDC client,
+        // we should be able to read from it already.
+        let res = txn
+            .simple_query("SELECT TOP 1 object_id FROM cdc.change_tables")
+            .await?;
+        if res.len() != 1 {
+            Err(SqlServerError::InvariantViolated(
+                "No objects found in cdc.change_tables".into(),
+            ))?
+        }
+
+        // Because the tables are locked, any write operation has either
+        // completed, or is blocked. The LSN and XSN acquired now will represent a
+        // consistent point-in-time view, such that any committed write will be
+        // visible to this snapshot and the LSN of such a write will be less than
+        // or equal to the LSN captured here. Creating the savepoint sets the LSN,
+        // which we can read back after releasing the locks.
+        txn.create_savepoint(SAVEPOINT_NAME).await?;
+        tracing::info!(%source_id, %SAVEPOINT_NAME, "timely-{worker_id} created savepoint");
+
+        // Once the XSN is established and the LSN captured, the tables no longer
+        // need to be locked. Any writes that happen to the upstream tables
+        // will have an LSN higher than our captured LSN, and will be read from CDC.
+        fence_txn.rollback().await?;
Review comment: I think we should commit this transaction instead of rolling it back. When we wrap up the fencing transaction we actually want to ensure that the server didn't decide to cancel it or otherwise mess with it while we weren't looking, since in that case the lock we think we have might not be there. I don't know if SQL Server does this too, but Postgres, for example, cancels a transaction if it can't serialize it, and you only learn that at commit time.

Reply: I'm not opposed to changing it. My reasoning for rollback is that this transaction (and the snapshot transaction) should never make any changes to the database, and rolling back prevents us accidentally committing things due to some innocuous change in the future. That's entirely based on my own battle scars 😁 A rollback without an active transaction does fail in SQL Server. In the case of PG, when a transaction is canceled, are you saying that only the …

Reply: I tried to construct a case using …
+
+        let lsn = txn.get_lsn().await?;
+
+        tracing::info!(%source_id, ?lsn, "timely-{worker_id} starting snapshot");
+
         // Get the size of each table we're about to snapshot.
         //
@@ -158,29 +251,29 @@ impl<'a> CdcStream<'a> {
             tracing::trace!(%capture_instance, %schema, %table, "snapshot stats start");
             let size = crate::inspect::snapshot_size(txn.client, &*schema, &*table).await?;
             snapshot_stats.insert(Arc::clone(capture_instance), size);
-            tracing::trace!(%capture_instance, %schema, %table, "snapshot stats end");
+            tracing::trace!(%source_id, %capture_instance, %schema, %table, "timely-{worker_id} snapshot stats end");
         }

         // Run a `SELECT` query to snapshot the entire table.
         let stream = async_stream::stream! {
             // TODO(sql_server3): A stream of streams would be better here than
             // returning the name with each result, but the lifetimes are tricky.
             for (capture_instance, schema_name, table_name) in tables {
-                tracing::trace!(%capture_instance, %schema_name, %table_name, "snapshot start");
+                tracing::trace!(%source_id, %capture_instance, %schema_name, %table_name, "timely-{worker_id} snapshot start");

                 let snapshot = crate::inspect::snapshot(txn.client, &*schema_name, &*table_name);
                 let mut snapshot = std::pin::pin!(snapshot);
                 while let Some(result) = snapshot.next().await {
                     yield (Arc::clone(&capture_instance), result);
                 }

-                tracing::trace!(%capture_instance, %schema_name, %table_name, "snapshot end");
+                tracing::trace!(%source_id, %capture_instance, %schema_name, %table_name, "timely-{worker_id} snapshot end");
             }

-            // Slightly awkward, but if the commit fails we need to conform to
-            // type of the stream.
-            if let Err(e) = txn.commit().await {
-                yield ("commit".into(), Err(e));
+            // Slightly awkward, but if the rollback fails we need to conform to
+            // the type of the stream.
+            if let Err(e) = txn.rollback().await {
Review comment: Didn't follow this change. Is there something we want to undo here?

Reply: Same as the above thread on commit vs. rollback: we do not intend to make any changes to the DB, so we roll back the transaction that read all the data.
+                yield ("rollback".into(), Err(e));
             }
         };
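For context, a caller might consume the result of `snapshot` roughly as follows. This is a hedged sketch: it assumes the middle element of the returned tuple is the per-instance stats map, that the snippet runs inside an async fn returning `Result<_, SqlServerError>`, and that each stream item pairs a capture instance with one row result, as the yields above suggest.

```rust
use futures::StreamExt;

// Hypothetical caller: take the snapshot, then drain the row stream.
let (lsn, _stats, snapshot) = cdc_stream.snapshot(None, worker_id, &source_id).await?;
let mut snapshot = std::pin::pin!(snapshot);
while let Some((capture_instance, row)) = snapshot.next().await {
    match row {
        Ok(_row) => { /* decode / stage the row for `capture_instance` */ }
        Err(e) => tracing::warn!(%capture_instance, "snapshot error: {e}"),
    }
}
// `lsn` is the lower bound for subsequent `get_changes_asc` calls.
```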
@@ -545,6 +638,40 @@ impl TryFrom<&[u8]> for Lsn {
     }
 }

+impl TryFrom<Numeric> for Lsn {
+    type Error = String;
+
+    fn try_from(value: Numeric) -> Result<Self, Self::Error> {
+        if value.dec_part() != 0 {
+            return Err(format!(
+                "LSN expects Numeric(25,0), but found decimal portion {}",
+                value.dec_part()
+            ));
+        }
+        let mut decimal_lsn = value.int_part();
+        // An LSN is composed of 4 bytes : 4 bytes : 2 bytes, and Microsoft
+        // provides the method to decode the numeric representation here:
+        // https://github.com/microsoft/sql-server-samples/blob/master/samples/features/ssms-templates/Sql/Change%20Data%20Capture/Enumeration/Create%20Function%20fn_convertnumericlsntobinary.sql
+
+        let vlf_id = u32::try_from(decimal_lsn / 10_i128.pow(15))
+            .map_err(|e| format!("Failed to decode vlf_id for lsn {decimal_lsn}: {e:?}"))?;
+        decimal_lsn -= i128::from(vlf_id) * 10_i128.pow(15);
+
+        let block_id = u32::try_from(decimal_lsn / 10_i128.pow(5))
+            .map_err(|e| format!("Failed to decode block_id for lsn {decimal_lsn}: {e:?}"))?;
+        decimal_lsn -= i128::from(block_id) * 10_i128.pow(5);
+
+        let record_id = u16::try_from(decimal_lsn)
+            .map_err(|e| format!("Failed to decode record_id for lsn {decimal_lsn}: {e:?}"))?;
+
+        Ok(Lsn {
+            vlf_id,
+            block_id,
+            record_id,
+        })
+    }
+}
|
||||||
impl columnation::Columnation for Lsn { | ||||||
type InnerRegion = columnation::CopyRegion<Lsn>; | ||||||
} | ||||||
|
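To make the digit-group arithmetic concrete, here is a small, self-contained worked example (the LSN value is borrowed from the tests below):

```rust
fn main() {
    // The 25-digit decimal LSN packs vlf_id:block_id:record_id into digit
    // groups of 10, 10, and 5 digits. Decoding 45_0000008784_00001:
    let decimal_lsn: i128 = 45_0000008784_00001_i128;
    let vlf_id = u32::try_from(decimal_lsn / 10_i128.pow(15)).unwrap();
    let block_id = u32::try_from((decimal_lsn % 10_i128.pow(15)) / 10_i128.pow(5)).unwrap();
    let record_id = u16::try_from(decimal_lsn % 10_i128.pow(5)).unwrap();
    assert_eq!((vlf_id, block_id, record_id), (45, 8784, 1));
}
```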
@@ -695,6 +822,7 @@ impl Operation {
 mod tests {
     use super::Lsn;
     use proptest::prelude::*;
+    use tiberius::numeric::Numeric;

     #[mz_ore::test]
     fn smoketest_lsn_ordering() {
@@ -776,4 +904,37 @@ mod tests {
             test_case(random_bytes, num_increment)
         })
     }
+
+    #[mz_ore::test]
+    fn test_numeric_lsn_ordering() {
+        let a = Lsn::try_from(Numeric::new_with_scale(45_0000008784_00001_i128, 0)).unwrap();
+        let b = Lsn::try_from(Numeric::new_with_scale(45_0000008784_00002_i128, 0)).unwrap();
+        let c = Lsn::try_from(Numeric::new_with_scale(45_0000008785_00002_i128, 0)).unwrap();
+        let d = Lsn::try_from(Numeric::new_with_scale(49_0000008784_00002_i128, 0)).unwrap();
+        assert!(a < b);
+        assert!(b < c);
+        assert!(c < d);
+        assert!(a < d);
+
+        assert_eq!(a, a);
+        assert_eq!(b, b);
+        assert_eq!(c, c);
+        assert_eq!(d, d);
+    }
+
+    #[mz_ore::test]
+    fn test_numeric_lsn_invalid() {
+        let with_decimal = Numeric::new_with_scale(1, 20);
+        assert!(Lsn::try_from(with_decimal).is_err());
+
+        for v in [
+            4294967296_0000000000_00000_i128, // vlf_id is too large
+            1_4294967296_00000_i128,          // block_id is too large
+            1_0000000001_65536_i128,          // record_id is too large
+            -49_0000008784_00002_i128,        // negative is invalid
+        ] {
+            let invalid_lsn = Numeric::new_with_scale(v, 0);
+            assert!(Lsn::try_from(invalid_lsn).is_err());
+        }
+    }
 }
Review comment: Rust forces you to put a lifetime here due to async and this parameter being a reference. You can avoid that with even more syntax, but I suggest you just change this to `GlobalId`, since it's a small `Copy` type.
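A minimal illustration of the suggestion, with a hypothetical type and method body: because `GlobalId` is `Copy`, taking it by value drops the borrow and the lifetime coupling on the async fn.

```rust
use mz_repr::GlobalId;

struct Snapshotter;

impl Snapshotter {
    // By-value parameter: no `&'b GlobalId`, so no extra lifetime is needed,
    // and the caller keeps its own copy of the id.
    async fn snapshot(&mut self, source_id: GlobalId) {
        // `GlobalId` implements `Display` (the diff logs it with `%source_id`).
        println!("starting snapshot for {source_id}");
    }
}
```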