SQL server: consistent snapshot and lsn selection #32979
@@ -17,11 +17,49 @@
//! 2. [`CdcStream::into_stream`] returns a [`futures::Stream`] of [`CdcEvent`]s
//!    optionally from the [`Lsn`] returned in step 1.
//!
//! Internally we get a snapshot by setting our transaction isolation level to
//! [`TransactionIsolationLevel::Snapshot`], getting the current maximum LSN with
//! [`crate::inspect::get_max_lsn`] and then running a `SELECT *`. We've observed that by
//! using [`TransactionIsolationLevel::Snapshot`] the LSN remains stable for the entire
//! transaction.
//! The snapshot process is responsible for identifying a [`Lsn`] that provides
//! a point-in-time view of the data for the table(s) being copied. Similarly to
//! MySQL, Microsoft SQL Server, as far as we know, does not provide an API to
//! achieve this.
//!
//! SQL Server `SNAPSHOT` isolation guarantees that a reader will only
//! see writes committed before the transaction began. More specifically, this
//! snapshot is implemented using row versions that are visible based on the
//! transaction sequence number (`XSN`). The `XSN` is set at the first
//! read or write, not at `BEGIN TRANSACTION`, see [here](https://learn.microsoft.com/en-us/sql/relational-databases/sql-server-transaction-locking-and-row-versioning-guide?view=sql-server-ver17).
//! This provides us a suitable starting point for capturing the table data.
//! To force a `XSN` to be assigned, experiments have shown that a table must
//! be read. We choose a well-known table that we should already have access to,
//! [cdc.change_tables](https://learn.microsoft.com/en-us/sql/relational-databases/system-tables/cdc-change-tables-transact-sql?view=sql-server-ver17),
//! and read a single value from it.
//!
//! Due to the asynchronous nature of CDC, we can assume that the [`Lsn`]
//! returned from any CDC tables or CDC functions will always be stale
//! in relation to the source table that CDC is tracking. The system table
//! [sys.dm_tran_database_transactions](https://learn.microsoft.com/en-us/sql/relational-databases/system-dynamic-management-views/sys-dm-tran-database-transactions-transact-sql?view=sql-server-ver17)
//! will contain a [`Lsn`] for any transaction that performs a write operation.
//! Creating a savepoint using [SAVE TRANSACTION](https://learn.microsoft.com/en-us/sql/t-sql/language-elements/save-transaction-transact-sql?view=sql-server-ver17)
//! is sufficient to generate a [`Lsn`] in this case.
//!
//! Unfortunately, it isn't sufficient to just fix the [`Lsn`] and `XSN`. There exists
//! the possibility that a write transaction may have inserted a row into one
//! of the tables in the snapshot, but that write has not committed. In this case,
//! the `INSERT` has already been written to the transaction log at a [`Lsn`]
//! less than the one captured, but the snapshot will *not* observe that `INSERT`
//! because the transaction has not committed, and may not commit until after
//! the snapshot is complete. In order to force a clear delineation of updates,
//! the upstream tables in the snapshot must be locked. This lock only needs
//! to exist long enough to establish the [`Lsn`] and `XSN`.
Something doesn't check out for me in this paragraph. This sounds totally fine? If that transaction commits after the snapshot is complete then in the CDC stream that write will be associated with an LSN that is higher than the snapshot LSN we captured, and so it will be correctly applied on top of the snapshot, which as you say will not observe the write. I think the danger, and the reason why we need locks, is that we have no way of establishing a transaction MVCC snapshot and an LSN boundary atomically. With postgres we get this API by creating a temp replication slot as the first statement of the transaction with

Sorry, that was me not proofreading. I unfortunately wrote it in parts. I trimmed it down. Thanks for catching that Petros!
//!
//! SQL Server supports exclusive table locks, but those will only be released
//! once the outermost transaction completes. For this reason, this module
//! uses two connections for the snapshot process. The first connection is used
//! to initiate a transaction and lock the upstream tables. While the first
//! connection maintains the locks, the second connection starts a transaction
//! with [`TransactionIsolationLevel::Snapshot`] isolation and creates a
//! savepoint. Once the savepoint is created and the [`Lsn`] is captured, the first
//! connection rolls back the transaction. The snapshot is created by the second

Suggested change (depends on the discussion in the other comment)

//! connection within the existing transaction.
//!
//! After completing the snapshot we use [`crate::inspect::get_changes_asc`] which will return
//! all changes between a `[lower, upper)` bound of [`Lsn`]s.
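As a rough illustration of that two-connection dance, the steps map onto plain T-SQL roughly as below. This is a hedged sketch rather than the crate's exact statements: the table name `dbo.my_table`, the `TABLOCKX, HOLDLOCK` hint, and reading `database_transaction_begin_lsn` are assumptions on my part (the crate's `lock_table_exclusive` and `get_lsn` helpers may issue something slightly different).

```sql
-- Connection 1 (fence): block writers with an exclusive table lock.
BEGIN TRANSACTION;
SELECT TOP 0 * FROM dbo.my_table WITH (TABLOCKX, HOLDLOCK);
-- ...keep this transaction open while connection 2 pins its XSN and LSN...

-- Connection 2 (snapshot): pin the XSN and capture an LSN behind the fence.
SET TRANSACTION ISOLATION LEVEL SNAPSHOT;
BEGIN TRANSACTION;
SELECT TOP 1 object_id FROM cdc.change_tables;   -- first read assigns the XSN
SAVE TRANSACTION _mz_snap_;                      -- log write assigns an LSN
SELECT database_transaction_begin_lsn            -- assumed column; get_lsn may differ
FROM sys.dm_tran_database_transactions
WHERE transaction_id = CURRENT_TRANSACTION_ID();

-- Connection 1: the fence is no longer needed once the XSN and LSN are fixed.
ROLLBACK TRANSACTION;

-- Connection 2: run the SELECT * snapshots, then roll back this transaction too.
```

Note that connection 2 is not blocked by connection 1's exclusive lock: under `SNAPSHOT` isolation it reads row versions rather than taking shared locks, which is exactly why the fence only stops writers.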
@@ -36,6 +74,7 @@ use futures::{Stream, StreamExt};
use mz_ore::retry::RetryResult;
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use tiberius::numeric::Numeric;

use crate::{Client, SqlServerError, TransactionIsolationLevel};
@@ -56,7 +95,7 @@ pub struct CdcStream<'a> {
    ///
    /// Note: When CDC is first enabled in an instance of SQL Server it can take a moment
    /// for it to "completely" startup. Before starting a `TRANSACTION` for our snapshot
    /// we'll wait this duration for SQL Server to report an LSN and thus indicate CDC is
    /// we'll wait this duration for SQL Server to report a [`Lsn`] and thus indicate CDC is
    /// ready to go.
    max_lsn_wait: Duration,
}
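For reference, the readiness check described here amounts to polling SQL Server's CDC max-LSN function until it stops returning `NULL`. A minimal T-SQL sketch of the same idea (the actual retry loop in this crate lives in Rust and is bounded by `max_lsn_wait`):

```sql
-- sys.fn_cdc_get_max_lsn() returns NULL until the CDC capture job has
-- processed at least one transaction, i.e. until CDC is actually ready.
WHILE sys.fn_cdc_get_max_lsn() IS NULL
BEGIN
    WAITFOR DELAY '00:00:01';
END;
SELECT sys.fn_cdc_get_max_lsn() AS max_lsn;
```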
@@ -95,7 +134,7 @@ impl<'a> CdcStream<'a> {
        self
    }

    /// The max duration we'll wait for SQL Server to return an LSN before taking a
    /// The max duration we'll wait for SQL Server to return a [`Lsn`] before taking a
    /// snapshot.
    ///
    /// When CDC is first enabled in SQL Server it can take a moment before it is fully
@@ -122,6 +161,8 @@ impl<'a> CdcStream<'a> {
        ),
        SqlServerError,
    > {
        static SAVEPOINT_NAME: &str = "_mz_snap_";

        // Determine what table we need to snapshot.
        let instances = self
            .capture_instances
@@ -140,13 +181,57 @@ impl<'a> CdcStream<'a> {
        // the upstream DB is ready for CDC.
        self.wait_for_ready().await?;

        tracing::info!("Upstream is ready");

nit: might want to add some additional context here and also are we sure we want this at info level?

added a comment

ah I see! would we ever see these concurrently on the same node? would it be clear which database is ready?

🤦 - just realized i didn't include

+1 on adding context. The standard format we use is

        // The client that will be used for fencing does not need any special isolation level
        // as it will just be locking the table(s).
        let mut fencing_client = self.client.new_connection().await?;
        let mut fence_txn = fencing_client.transaction().await?;

        // TODO (maz): we should consider a timeout or a lock + snapshot per-table instead of collectively

what telemetry do we need to be able to make a decision about the necessity of locking per table? I would guess we need to know whether the table would have been locked for less time that way?

follow up: maybe we can batch with a configurable number of tables per batch? we might want to track with a histogram/metric roughly how long we are locking the tables. seems like this only becomes a problem if there's a very large number of tables or a very slow lock acquisition.

I think we know this is better, right? I would just create an issue for it so that we do it. (or, if it's not a big refactor, do it now)

If possible, sure. Created a GA task! this PR has plenty of action already 😁

Sounds good!

        for (_capture_instance, schema, table) in &tables {
            tracing::trace!(%schema, %table, "locking table");
            fence_txn.lock_table_exclusive(&*schema, &*table).await?;
Do we need an exclusive lock in the end? We don't care if other people concurrently read the table with us, we only want to block writes.

I think it's the lesser of 2 evils, can add a comment. There is an UPDLOCK, but it only applies to rows/pages. My understanding is we'd have to read the entire table.

I was thinking of

My concern with

If you're not opposed, I'd like to tackle the lock improvement after this PR. Relaxing the locking will improve the user experience, but ultimately does not affect correctness. A quick test of using

Ah sorry, the

Weird, this works fine for me (writes are prevented). This is what I'm doing:

If that's the case (can't repro this myself) we might have a problem if the table in question is empty during the initial snapshot. Can you share the commands you ran in your quick test to at least record it here for when we relax the lock?

Not opposed, let's make an issue in the epic to not forget

ok, I think I know why. The issue appears to be if the session that establishes the locks is using

So it looks like we can use

I inadvertently was running the commands in the wrong sessions:

session 2 (read committed isolation, and no explicit transaction)

both sessions show the same in the

Oh my! Why does it have to be so subtle!? 💀 So locks are ignored in snapshot isolation. Makes total sense..
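To make the subtlety concrete, here is a hypothetical two-session experiment (the actual commands from this thread were trimmed from the page, so the table `dbo.t1` and the statements below are my own reconstruction): an exclusive table lock blocks writers, while a `SNAPSHOT`-isolation reader proceeds anyway because it reads row versions instead of taking shared locks.

```sql
-- Session A: take and hold an exclusive lock on a (hypothetical) table.
BEGIN TRANSACTION;
SELECT TOP 0 * FROM dbo.t1 WITH (TABLOCKX, HOLDLOCK);

-- Session B (default READ COMMITTED): this write blocks until session A ends.
INSERT INTO dbo.t1 (id) VALUES (42);

-- Session C (SNAPSHOT isolation, requires ALLOW_SNAPSHOT_ISOLATION ON):
-- this read returns immediately with the last committed row versions.
SET TRANSACTION ISOLATION LEVEL SNAPSHOT;
BEGIN TRANSACTION;
SELECT COUNT(*) FROM dbo.t1;
ROLLBACK TRANSACTION;
```

That asymmetry is what the fencing connection relies on: it only needs to stop concurrent writers; snapshot readers, including our own snapshot transaction, are unaffected.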
        }
        tracing::info!(?tables, "Locked tables");

nit: same comment as above.

ditto!

same as above, let's add context about the timely worker and source id here

        self.client
            .set_transaction_isolation(TransactionIsolationLevel::Snapshot)
            .await?;
        let txn = self.client.transaction().await?;
        let mut txn = self.client.transaction().await?;
        // Creating a savepoint forces a write to the transaction log, which will
        // assign a LSN, but it does not force a transaction sequence number to be
        // assigned as far as I can tell. I have not observed any entries added to
        // `sys.dm_tran_active_snapshot_database_transactions` when creating a savepoint
        // or when reading system views to retrieve the LSN.
        //
        // We choose cdc.change_tables because it is a system table that will exist
        // when CDC is enabled, it has a well known schema, and as a CDC client,
        // we should be able to read from it already.
        let res = txn
            .simple_query("SELECT TOP 1 object_id FROM cdc.change_tables")
            .await?;
        if res.len() != 1 {
            Err(SqlServerError::InvariantViolated(
                "No objects found in cdc.change_tables".into(),
            ))?
        }

        // Because the tables are exclusively locked, any write operation has either
        // completed, or is blocked. The LSN and XSN acquired now will represent a
        // consistent point-in-time view, such that any committed write will be
        // visible to this snapshot and the LSN of such a write will be less than
        // or equal to the LSN captured here.
        txn.create_savepoint(SAVEPOINT_NAME).await?;
        tracing::info!(%SAVEPOINT_NAME, "Created savepoint");
        let lsn = txn.get_lsn().await?;

We can run this statement after we have committed the fencing transaction to reduce the amount of time we hold the locks for. The LSN is already established and it's fine to read it outside the critical section

indeed!

        // Once the XSN is established and the LSN captured, the tables no longer
        // need to be locked. Any writes that happen to the upstream tables
        // will have a LSN higher than our captured LSN, and will be read from CDC.
        fence_txn.rollback().await?;

I think we should commit this transaction instead of rolling it back. What I'm thinking is that when we wrap up the fencing transaction we actually want to ensure that the server didn't decide to cancel it or otherwise mess with it while we weren't looking, since in that case the lock we think we have might not be there. I don't know if SQL does this too but for example pg cancels a transaction if it can't serialize it and you only learn it at commit time.

I'm not opposed to changing it. My reasoning for rollback is that this transaction (and the snapshot transaction) should never make any changes to the database. It prevents us accidentally committing things due to some innocuous change in the future. That's entirely based on my own battle scars 😁 A rollback without an active transaction does fail in SQL server

In the case of PG, when a transaction is canceled, are you saying that only the

I tried to construct a case using

        // Get the current LSN of the database.
        let lsn = crate::inspect::get_max_lsn(txn.client).await?;
        tracing::info!(?tables, ?lsn, "starting snapshot");

        // Get the size of each table we're about to snapshot.
@@ -177,10 +262,10 @@ impl<'a> CdcStream<'a> {
                tracing::trace!(%capture_instance, %schema_name, %table_name, "snapshot end");
            }

            // Slightly awkward, but if the commit fails we need to conform to
            // Slightly awkward, but if the rollback fails we need to conform to
            // type of the stream.
            if let Err(e) = txn.commit().await {
                yield ("commit".into(), Err(e));
            if let Err(e) = txn.rollback().await {

Didn't follow this change. Is there something we want to undo here?

same as the above statement with commit vs. rollback. We do not intend to make any changes to the DB, so we roll back the transaction that read all the data.

                yield ("rollback".into(), Err(e));
            }
        };
@@ -355,7 +440,7 @@ impl<'a> CdcStream<'a> {
        }
    }

        // Ensure all of the capture instances are reporting an LSN.
        // Ensure all of the capture instances are reporting a LSN.

I think "an LSN" is correct because the rule depends on the pronunciation "el-es-en". Here is a similar example where it's "an LED" instead of "a LED": https://en.wikipedia.org/wiki/Light-emitting_diode We also write "a UV lamp" even though it's "an ultraviolet lamp" (example courtesy of ChatGPT)

fair.. though I feel this is the same type of nonsense that leads to people saying ATM machine! English is hard.

        for instance in self.capture_instances.keys() {
            let (_client, min_result) = mz_ore::retry::Retry::default()
                .max_duration(self.max_lsn_wait)
@@ -471,7 +556,7 @@ pub struct Lsn {
impl Lsn {
    const SIZE: usize = 10;

    /// Interpret the provided bytes as an [`Lsn`].
    /// Interpret the provided bytes as a [`Lsn`].
    pub fn try_from_bytes(bytes: &[u8]) -> Result<Self, String> {
        if bytes.len() != Self::SIZE {
            return Err(format!("incorrect length, expected 10 got {}", bytes.len()));
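For orientation, the 10-byte wire format splits into a 4-byte VLF id, a 4-byte block id, and a 2-byte record id, which the `Lsn` struct's `vlf_id`/`block_id`/`record_id` fields mirror. A small T-SQL illustration with a made-up value, assuming the big-endian byte order that layout implies:

```sql
-- 0x0000002D000022500001 is an illustrative binary(10) LSN:
--   bytes 0..4 -> vlf_id, bytes 4..8 -> block_id, bytes 8..10 -> record_id.
SELECT
    CAST(0x0000002D AS INT) AS vlf_id,     -- 45
    CAST(0x00002250 AS INT) AS block_id,   -- 8784
    CAST(0x0001     AS INT) AS record_id;  -- 1
```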
@@ -545,6 +630,40 @@ impl TryFrom<&[u8]> for Lsn {
    }
}

impl TryFrom<Numeric> for Lsn {
    type Error = String;

    fn try_from(value: Numeric) -> Result<Self, Self::Error> {
        if value.dec_part() != 0 {
            return Err(format!(
                "LSN expect Numeric(25,0), but found decimal portion {}",
                value.dec_part()
            ));
        }
        let mut decimal_lsn = value.int_part();
        // LSN is composed of 4 bytes : 4 bytes : 2 bytes
        // and MS provided the method to decode that here
        // https://github.com/microsoft/sql-server-samples/blob/master/samples/features/ssms-templates/Sql/Change%20Data%20Capture/Enumeration/Create%20Function%20fn_convertnumericlsntobinary.sql

        let vlf_id = u32::try_from(decimal_lsn / 10_i128.pow(15))
            .map_err(|e| format!("Failed to decode vlf_id for lsn {decimal_lsn}: {e:?}"))?;
        decimal_lsn -= i128::try_from(vlf_id).unwrap() * 10_i128.pow(15);

A direct

Suggested change

        let block_id = u32::try_from(decimal_lsn / 10_i128.pow(5))
            .map_err(|e| format!("Failed to decode block_id for lsn {decimal_lsn}: {e:?}"))?;
        decimal_lsn -= i128::try_from(block_id).unwrap() * 10_i128.pow(5);

        let record_id = u16::try_from(decimal_lsn)
            .map_err(|e| format!("Failed to decode record_id for lsn {decimal_lsn}: {e:?}"))?;

        Ok(Lsn {
            vlf_id,
            block_id,
            record_id,
        })
    }
}

impl columnation::Columnation for Lsn {
    type InnerRegion = columnation::CopyRegion<Lsn>;
}
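As a worked example of the decoding above: the decimal LSN 45_0000008784_00001 used in the tests below splits into vlf_id = 45, block_id = 8784, record_id = 1. The same arithmetic sketched in T-SQL, loosely following Microsoft's fn_convertnumericlsntobinary sample (the variable names here are mine):

```sql
-- Split a NUMERIC(25,0) LSN into its 4-byte : 4-byte : 2-byte components.
DECLARE @lsn       NUMERIC(25,0) = 45000000878400001;  -- i.e. 45:8784:1
DECLARE @vlf_id    BIGINT        = CONVERT(BIGINT, FLOOR(@lsn / 1000000000000000));
DECLARE @rest      NUMERIC(25,0) = @lsn - @vlf_id * 1000000000000000;
DECLARE @block_id  BIGINT        = CONVERT(BIGINT, FLOOR(@rest / 100000));
DECLARE @record_id BIGINT        = CONVERT(BIGINT, @rest - @block_id * 100000);
SELECT @vlf_id AS vlf_id, @block_id AS block_id, @record_id AS record_id;
-- vlf_id = 45, block_id = 8784, record_id = 1
```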
@@ -589,7 +708,7 @@ impl timely::order::PartialOrder for Lsn {
    }
}
impl timely::order::TotalOrder for Lsn {}

/// Structured format of an [`Lsn`].
/// Structured format of a [`Lsn`].
///
/// Note: The derived impl of [`PartialOrd`] and [`Ord`] relies on the field
/// ordering so do not change it.
@@ -695,6 +814,7 @@ impl Operation {
mod tests {
    use super::Lsn;
    use proptest::prelude::*;
    use tiberius::numeric::Numeric;

    #[mz_ore::test]
    fn smoketest_lsn_ordering() {
@@ -776,4 +896,37 @@ mod tests {
            test_case(random_bytes, num_increment)
        })
    }

    #[mz_ore::test]
    fn test_numeric_lsn_ordering() {
        let a = Lsn::try_from(Numeric::new_with_scale(45_0000008784_00001_i128, 0)).unwrap();
        let b = Lsn::try_from(Numeric::new_with_scale(45_0000008784_00002_i128, 0)).unwrap();
        let c = Lsn::try_from(Numeric::new_with_scale(45_0000008785_00002_i128, 0)).unwrap();
        let d = Lsn::try_from(Numeric::new_with_scale(49_0000008784_00002_i128, 0)).unwrap();
        assert!(a < b);
        assert!(b < c);
        assert!(c < d);
        assert!(a < d);

        assert_eq!(a, a);
        assert_eq!(b, b);
        assert_eq!(c, c);
        assert_eq!(d, d);
    }

    #[mz_ore::test]
    fn test_numeric_lsn_invalid() {
        let with_decimal = Numeric::new_with_scale(1, 20);
        assert!(Lsn::try_from(with_decimal).is_err());

        for v in [
            4294967296_0000000000_00000_i128, // vlf_id is too large
            1_4294967296_00000_i128,          // block_id is too large
            1_0000000001_65536_i128,          // record_id is too large
            -49_0000008784_00002_i128,        // negative is invalid
        ] {
            let invalid_lsn = Numeric::new_with_scale(v, 0);
            assert!(Lsn::try_from(invalid_lsn).is_err());
        }
    }
} |