Commit e502e99

Cleanup
1 parent aef6f3e commit e502e99

6 files changed: +156 additions, -140 deletions


src/sql-server-util/src/cdc.rs

Lines changed: 65 additions & 36 deletions
@@ -17,11 +17,46 @@
 //! 2. [`CdcStream::into_stream`] returns a [`futures::Stream`] of [`CdcEvent`]s
 //!    optionally from the [`Lsn`] returned in step 1.
 //!
-//! Internally we get a snapshot by setting our transaction isolation level to
-//! [`TransactionIsolationLevel::Snapshot`], getting the current maximum LSN with
-//! [`crate::inspect::get_max_lsn`] and then running a `SELECT *`. We've observed that by
-//! using [`TransactionIsolationLevel::Snapshot`] the LSN remains stable for the entire
-//! transaction.
+//! The snapshot process is responsible for identifying a [`Lsn`] that provides
+//! a point-in-time view of the data for the table(s) being copied. Similarly to
+//! MySQL, Microsoft SQL Server, as far as we know, does not provide an API to
+//! achieve this.
+//!
+//! SQL Server `SNAPSHOT` isolation guarantees that a reader will only see
+//! writes committed before the transaction began. More specifically, this
+//! snapshot is implemented using versions that are visible based on the
+//! transaction sequence number (`XSN`). The `XSN` is set at the first
+//! read or write, not at `BEGIN TRANSACTION`, see [here](https://learn.microsoft.com/en-us/sql/relational-databases/sql-server-transaction-locking-and-row-versioning-guide?view=sql-server-ver17).
+//! This provides us a suitable starting point for capturing the table data.
+//!
+//! Due to the asynchronous nature of CDC, we can assume that the [`Lsn`]
+//! returned from any CDC tables or CDC functions will always be stale
+//! in relation to the source table that CDC is tracking. The system table
+//! [sys.dm_tran_database_transactions](https://learn.microsoft.com/en-us/sql/relational-databases/system-dynamic-management-views/sys-dm-tran-database-transactions-transact-sql?view=sql-server-ver17)
+//! will contain a [`Lsn`] for any transaction that performs a write operation.
+//! Creating a savepoint using [SAVE TRANSACTION](https://learn.microsoft.com/en-us/sql/t-sql/language-elements/save-transaction-transact-sql?view=sql-server-ver17)
+//! is sufficient to generate a [`Lsn`] in this case, and will additionally
+//! establish the `XSN` used to determine data visibility.
+//!
+//! Unfortunately, it isn't sufficient to just create a savepoint. There exists
+//! the possibility that a write transaction may have inserted a row into one
+//! of the tables in the snapshot, but that write has not committed. In this case,
+//! the `INSERT` has already been written to the transaction log at a [`Lsn`]
+//! less than the one captured, but the snapshot will *not* see that `INSERT`
+//! because the transaction has not committed, and may not commit until after
+//! the snapshot is complete. In order to force a clear delineation of updates,
+//! the upstream tables in the snapshot must be locked. This lock only needs
+//! to exist long enough to establish the [`Lsn`] and `XSN`.
+//!
+//! SQL Server supports exclusive table locks, but those will only be released
+//! once the outermost transaction completes. For this reason, this module
+//! uses two connections for the snapshot process. The first connection is used
+//! to initiate a transaction and lock the upstream tables. While the first
+//! connection maintains the locks, the second connection starts a transaction
+//! with [`TransactionIsolationLevel::Snapshot`] isolation and creates a
+//! savepoint. Once the savepoint is created and the [`Lsn`] is captured, the first
+//! connection rolls back its transaction. The snapshot is created by the second
+//! connection within the existing transaction.
 //!
 //! After completing the snapshot we use [`crate::inspect::get_changes_asc`] which will return
 //! all changes between a `[lower, upper)` bound of [`Lsn`]s.
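For orientation, the two-connection handshake described above runs roughly as in the sketch below. It is condensed from the snapshot setup later in this commit and uses the helpers the commit introduces (`new_connection`, `lock_table_exclusive`, `create_savepoint`, `get_lsn`); the function name, the savepoint name, and the omission of error handling and of the per-table `SELECT` copy are illustrative, not part of the crate.

```rust
// Sketch only: condensed from this commit's snapshot setup. Assumes the crate's
// `Client`, `Lsn`, `SqlServerError`, and `TransactionIsolationLevel` are in scope.
async fn establish_snapshot_point(
    client: &mut Client,
    tables: &[(String, String)], // (schema, table)
) -> Result<Lsn, SqlServerError> {
    // Connection 1: exclusively lock every table in the snapshot so no write can
    // land while we pick the snapshot point. Locks live only as long as `fence_txn`.
    let mut fencing_client = client.new_connection().await?;
    let mut fence_txn = fencing_client.transaction().await?;
    for (schema, table) in tables {
        fence_txn.lock_table_exclusive(schema, table).await?;
    }

    // Connection 2: a SNAPSHOT-isolation transaction. The savepoint is its first
    // write, which fixes both the XSN and a LSN for the transaction.
    client
        .set_transaction_isolation(TransactionIsolationLevel::Snapshot)
        .await?;
    let mut txn = client.transaction().await?;
    txn.create_savepoint("mz_snapshot_example").await?;
    let lsn = txn.get_lsn().await?;

    // The locks are no longer needed: later writes get a higher LSN and arrive via
    // CDC, while `txn` (kept open by the real code to copy the tables) continues to
    // read the point-in-time view.
    fence_txn.rollback().await?;
    Ok(lsn)
}
```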
@@ -57,7 +92,7 @@ pub struct CdcStream<'a> {
     ///
     /// Note: When CDC is first enabled in an instance of SQL Server it can take a moment
     /// for it to "completely" startup. Before starting a `TRANSACTION` for our snapshot
-    /// we'll wait this duration for SQL Server to report an LSN and thus indicate CDC is
+    /// we'll wait this duration for SQL Server to report a [`Lsn`] and thus indicate CDC is
     /// ready to go.
     max_lsn_wait: Duration,
 }
@@ -96,7 +131,7 @@ impl<'a> CdcStream<'a> {
         self
     }

-    /// The max duration we'll wait for SQL Server to return an LSN before taking a
+    /// The max duration we'll wait for SQL Server to return a [`Lsn`] before taking a
     /// snapshot.
     ///
     /// When CDC is first enabled in SQL Server it can take a moment before it is fully
@@ -143,45 +178,39 @@ impl<'a> CdcStream<'a> {
         // the upstream DB is ready for CDC.
         self.wait_for_ready().await?;

-        let mut fencing_client = self.client.new_connection().await?;
+        tracing::info!("Upstream is ready");
+
         // The client that will be used for fencing does not need any special isolation level
-        // as it will be just be locking the tables
+        // as it will just be locking the table(s).
+        let mut fencing_client = self.client.new_connection().await?;
         let mut fence_txn = fencing_client.transaction().await?;
-        // lock all the tables we are planning to snapshot so that we can ensure that
-        // writes that might be in progress are properly ordered before or after this snapshot
-        // in addition to the LSN being properly ordered.
-        // TODO (maz): we should considering a timeout here because we may lock some tables,
-        // and the next table may be locked for some extended period, resulting in a traffic
-        // jam.
+
+        // TODO (maz): we should consider a timeout or a lock + snapshot per-table instead of collectively
         for (_capture_instance, schema, table) in &tables {
             tracing::trace!(%schema, %table, "locking table");
-            crate::inspect::lock_table(&mut fence_txn, &*schema, &*table).await?;
+            fence_txn.lock_table_exclusive(&*schema, &*table).await?;
         }
+        tracing::info!(?tables, "Locked tables");

         self.client
            .set_transaction_isolation(TransactionIsolationLevel::Snapshot)
            .await?;
         let mut txn = self.client.transaction().await?;
-        // The result here is not important, what we are doing is establishing a transaction sequence number (XSN)
-        // while using SNAPSHOT isolation that will be concurrent with a quiesced set of tables that we
-        // wish to snapshot. Regardless what you might read in *many* articles on Microsoft's site, the XSN is not
-        // set at BEGIN TRANSACTION, but at the first read/write.
-        // The choice of table is driven by a few factors:
-        // - it's a system table, we know it must exist and it will have a well defined schema
-        // - MZ is a CDC client, so should be be able to read from it
-        let res = txn
-            .simple_query("SELECT TOP 1 object_id FROM cdc.change_tables;")
-            .await?;
-        // TODO (maz): nicer error if, somehow, there are no more change tables
-        assert_eq!(res.len(), 1);

-        // Creating a savepoint forces a write to the transaction log, which will assign an LSN to this transaction
-        // which is concurrent with the tables that are currently in a consistent state.
+        // Creating a savepoint forces a write to the transaction log, which will
+        // assign a LSN and a transaction sequence number (XSN).
+        // Because the tables are exclusively locked, any write operation has either
+        // completed, or is blocked. The LSN and XSN acquired now represent a
+        // consistent point-in-time view, such that any committed write will be
+        // visible to this snapshot and the LSN of such a write will be less than
+        // or equal to the LSN captured here.
         txn.create_savepoint(SAVEPOINT_NAME).await?;
+        tracing::info!(%SAVEPOINT_NAME, "Created savepoint");
+        let lsn = txn.get_lsn().await?;

-        let lsn = crate::inspect::get_lsn(&mut txn).await?;
-
-        // once we have the snapshot, we can rollback the fencing transaction to allow access to the tables.
+        // Once the XSN is established and the LSN captured, the tables no longer
+        // need to be locked. Any writes that happen to the upstream tables
+        // will have a LSN higher than our captured LSN, and will be read from CDC.
         fence_txn.rollback().await?;

         tracing::info!(?tables, ?lsn, "starting snapshot");
@@ -392,7 +421,7 @@ impl<'a> CdcStream<'a> {
             }
         }

-        // Ensure all of the capture instances are reporting an LSN.
+        // Ensure all of the capture instances are reporting a LSN.
         for instance in self.capture_instances.keys() {
             let (_client, min_result) = mz_ore::retry::Retry::default()
                 .max_duration(self.max_lsn_wait)
@@ -508,7 +537,7 @@ pub struct Lsn {
 impl Lsn {
     const SIZE: usize = 10;

-    /// Interpret the provided bytes as an [`Lsn`].
+    /// Interpret the provided bytes as a [`Lsn`].
     pub fn try_from_bytes(bytes: &[u8]) -> Result<Self, String> {
         if bytes.len() != Self::SIZE {
             return Err(format!("incorrect length, expected 10 got {}", bytes.len()));
@@ -660,7 +689,7 @@ impl timely::order::PartialOrder for Lsn {
 }
 impl timely::order::TotalOrder for Lsn {}

-/// Structured format of an [`Lsn`].
+/// Structured format of a [`Lsn`].
 ///
 /// Note: The derived impl of [`PartialOrd`] and [`Ord`] relies on the field
 /// ordering so do not change it.
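Not defined anywhere in this diff, but useful context for `Lsn::SIZE == 10` and `try_from_bytes`: a SQL Server `binary(10)` LSN is conventionally described as three big-endian fields (a 4-byte virtual log file sequence number, a 4-byte log block offset, and a 2-byte record number). The sketch below decodes that assumed layout; the crate's actual field names and types may differ.

```rust
// Assumed layout of a SQL Server binary(10) LSN: 4-byte VLF sequence number,
// 4-byte log block offset, 2-byte record number, all big-endian so that
// byte-wise ordering matches LSN ordering. Illustrative, not the crate's code.
fn decode_lsn_bytes(bytes: &[u8; 10]) -> (u32, u32, u16) {
    let vlf = u32::from_be_bytes(bytes[0..4].try_into().unwrap());
    let block = u32::from_be_bytes(bytes[4..8].try_into().unwrap());
    let record = u16::from_be_bytes(bytes[8..10].try_into().unwrap());
    (vlf, block, record)
}
```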

src/sql-server-util/src/inspect.rs

Lines changed: 5 additions & 50 deletions
@@ -20,7 +20,7 @@ use tiberius::numeric::Numeric;

 use crate::cdc::{Lsn, RowFilterOption};
 use crate::desc::{SqlServerColumnRaw, SqlServerTableRaw};
-use crate::{Client, SqlServerError, Transaction};
+use crate::{Client, SqlServerError};

 /// Returns the minimum log sequence number for the specified `capture_instance`.
 ///
@@ -60,55 +60,10 @@ pub async fn increment_lsn(client: &mut Client, lsn: Lsn) -> Result<Lsn, SqlServ
     parse_lsn(&result[..1])
 }

-/// Retrieves the current LSN of the database by calling SAVE TRANSACTION to create a savepoint with the provided name,
-/// which forces an LSN to be written to the transaction log.
-///
-/// The savepoint name must follow rules for SQL Server identifiers
-/// - starts with letter or underscore
-/// - only contains letters, digits, and underscores
-/// - no resserved words
-/// - 32 char max
-pub async fn create_savepoint(
-    txn: &mut Transaction<'_>,
-    savepoint_name: &str,
-) -> Result<(), SqlServerError> {
-    // TODO (maz): make sure savepoint name is safe
-    let _result = txn
-        .client
-        .simple_query(format!("SAVE TRANSACTION [{savepoint_name}]"))
-        .await?;
-    Ok(())
-}
-
-pub async fn get_lsn(txn: &mut Transaction<'_>) -> Result<Lsn, SqlServerError> {
-    static CURRENT_LSN_QUERY: &str = "
-        SELECT dt.database_transaction_begin_lsn
-        FROM sys.dm_tran_database_transactions AS dt
-        JOIN sys.dm_tran_session_transactions AS st
-            ON dt.transaction_id = st.transaction_id
-        WHERE st.session_id = @@SPID
-    ";
-    let result = txn.client.simple_query(CURRENT_LSN_QUERY).await?;
-    parse_numeric_lsn(&result)
-}
-
-pub async fn lock_table(
-    txn: &mut Transaction<'_>,
-    schema: &str,
-    table: &str,
-) -> Result<(), SqlServerError> {
-    // This query probably seems odd, but there is no LOCK command in MS SQL. Locks are specified
-    // in SELECT using the WITH keyword. This query does not need to return any rows to lock the table,
-    // hence the 1=0, which is something short that always evaluates to false in this universe;
-    let query = format!("SELECT * FROM {schema}.{table} WITH (TABLOCKX) WHERE 1=0;");
-    let _result = txn.client.query(query, &[]).await?;
-    Ok(())
-}
-
-/// Parse an [`Lsn`] in Decimal(25,0) format of the provided [`tiberius::Row`].
+/// Parse a [`Lsn`] in Decimal(25,0) format of the provided [`tiberius::Row`].
 ///
 /// Returns an error if the provided slice doesn't have exactly one row.
-fn parse_numeric_lsn(row: &[tiberius::Row]) -> Result<Lsn, SqlServerError> {
+pub(crate) fn parse_numeric_lsn(row: &[tiberius::Row]) -> Result<Lsn, SqlServerError> {
     match row {
         [r] => {
             let numeric_lsn = r
@@ -127,7 +82,7 @@ fn parse_numeric_lsn(row: &[tiberius::Row]) -> Result<Lsn, SqlServerError> {
     }
 }

-/// Parse an [`Lsn`] from the first column of the provided [`tiberius::Row`].
+/// Parse a [`Lsn`] from the first column of the provided [`tiberius::Row`].
 ///
 /// Returns an error if the provided slice doesn't have exactly one row.
 fn parse_lsn(result: &[tiberius::Row]) -> Result<Lsn, SqlServerError> {
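The `numeric(25,0)` value handled by `parse_numeric_lsn` comes from `sys.dm_tran_database_transactions`, not the `binary(10)` form. A common way to decompose it is shown below as a hedged sketch: the 10/10/5-digit split is an assumption about SQL Server's encoding, not something this diff states, and the helper name is illustrative.

```rust
// Sketch: split a numeric(25,0) LSN into (vlf_sequence, log_block, record).
// Assumes the decimal is the three LSN fields zero-padded to 10, 10, and 5
// digits respectively; this is not taken from the crate's implementation.
fn split_numeric_lsn(numeric: u128) -> (u32, u32, u16) {
    let record = (numeric % 100_000) as u16;
    let block = ((numeric / 100_000) % 10_000_000_000) as u32;
    let vlf = (numeric / 1_000_000_000_000_000) as u32;
    (vlf, block, record)
}
```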
@@ -214,7 +169,7 @@ SELECT @mz_cleanup_status_bit;
     let max_deletes = i64::cast_from(max_deletes);

     // First we need to get a valid LSN as our low watermark. If we try to cleanup
-    // a change table with an LSN that doesn't exist in the `cdc.lsn_time_mapping`
+    // a change table with a LSN that doesn't exist in the `cdc.lsn_time_mapping`
     // table we'll get an error code `22964`.
     let result = client
         .query(GET_LSN_QUERY, &[&low_water_mark.as_bytes().as_slice()])

src/sql-server-util/src/lib.rs

Lines changed: 58 additions & 19 deletions
@@ -34,6 +34,7 @@ pub mod inspect;
 pub use config::Config;
 pub use desc::{ProtoSqlServerColumnDesc, ProtoSqlServerTableDesc};

+use crate::cdc::Lsn;
 use crate::config::TunnelConfig;
 use crate::desc::SqlServerColumnDecodeType;

@@ -105,6 +106,8 @@ impl Client {
         Ok(client)
     }

+    /// Create a new `Client` instance with the same configuration that created
+    /// this one.
     pub async fn new_connection(&self) -> Result<Self, SqlServerError> {
         Self::connect(self.config.clone()).await
     }
@@ -342,7 +345,7 @@ pub type RowStream<'a> =
 pub struct Transaction<'a> {
     client: &'a mut Client,
     closed: bool,
-    nested_xact_names: Vec<String>,
+    savepoints: Vec<String>,
 }

 impl<'a> Transaction<'a> {
@@ -359,34 +362,68 @@ impl<'a> Transaction<'a> {
         Ok(Transaction {
             client,
             closed: false,
-            nested_xact_names: Default::default(),
+            savepoints: Default::default(),
         })
     }
 }

-    /// Creates a savepoint with a transaction that can be committed or rolled back
-    /// without affecting the outer transaction.
+    /// Creates a savepoint via `SAVE TRANSACTION` with the provided name.
+    /// Creating a savepoint forces a write to the transaction log, which will associate an
+    /// [`Lsn`] with the current transaction.
+    ///
+    /// The savepoint name must follow the rules for SQL Server identifiers:
+    /// - starts with letter or underscore
+    /// - only contains letters, digits, and underscores
+    /// - no reserved words
+    /// - 32 char max
     pub async fn create_savepoint(&mut self, savepoint_name: &str) -> Result<(), SqlServerError> {
+        // Limit the name checks to prevent sending a potentially dangerous string to the SQL Server.
+        // We prefer that the server do the majority of the validation.
+        if savepoint_name.is_empty()
+            || !savepoint_name
+                .chars()
+                .all(|c| c.is_alphanumeric() || c == '_')
+        {
+            Err(SqlServerError::ProgrammingError(format!(
+                "Invalid savepoint name: '{savepoint_name}'"
+            )))?;
+        }
+
         let stmt = format!("SAVE TRANSACTION [{savepoint_name}]");
         let _result = self.client.simple_query(stmt).await?;
-        self.nested_xact_names.push(savepoint_name.to_string());
+        self.savepoints.push(savepoint_name.to_string());
         Ok(())
     }

-    pub async fn rollback_savepoint(&mut self, savepoint_name: &str) -> Result<(), SqlServerError> {
-        let last_xact_name = self.nested_xact_names.pop();
-        if last_xact_name
-            .as_ref()
-            .is_none_or(|last_xact_name| *last_xact_name != savepoint_name)
-        {
-            panic!(
-                "Attempt to rollback savepoint {savepoint_name} doesn't match last savepoint {:?}",
-                last_xact_name
-            );
-        }
-        let stmt = format!("ROLLBACK TRANSACTION [{savepoint_name}]");
-        let _result = self.client.simple_query(stmt).await?;
-        self.nested_xact_names.push(savepoint_name.to_string());
+    /// Retrieve the [`Lsn`] associated with the current session.
+    ///
+    /// MS SQL Server will not assign a [`Lsn`] until a write is performed (e.g. via `SAVE TRANSACTION`).
+    pub async fn get_lsn(&mut self) -> Result<Lsn, SqlServerError> {
+        static CURRENT_LSN_QUERY: &str = "
+            SELECT dt.database_transaction_begin_lsn
+            FROM sys.dm_tran_database_transactions AS dt
+            JOIN sys.dm_tran_session_transactions AS st
+                ON dt.transaction_id = st.transaction_id
+            WHERE st.session_id = @@SPID
+        ";
+        let result = self.client.simple_query(CURRENT_LSN_QUERY).await?;
+        crate::inspect::parse_numeric_lsn(&result)
+    }
+
+    /// Exclusively lock the provided table using `TABLOCKX`.
+    ///
+    /// The lock is obtained using a `SELECT` statement that will not read the table. The lock is released
+    /// after transaction commit or rollback.
+    pub async fn lock_table_exclusive(
+        &mut self,
+        schema: &str,
+        table: &str,
+    ) -> Result<(), SqlServerError> {
+        // This query probably seems odd, but there is no LOCK command in MS SQL. Locks are specified
+        // in SELECT using the WITH keyword. This query does not need to return any rows to lock the table,
+        // hence the 1=0, which is something short that always evaluates to false in this universe.
+        let query = format!("SELECT * FROM [{schema}].[{table}] WITH (TABLOCKX) WHERE 1=0;");
+        let _result = self.client.simple_query(query).await?;
         Ok(())
     }

@@ -434,6 +471,7 @@ impl<'a> Transaction<'a> {
         // N.B. Mark closed _before_ running the query. This prevents us from
         // double closing the transaction if this query itself fails.
         self.closed = true;
+        self.savepoints.clear();
         self.client.simple_query(ROLLBACK_QUERY).await?;
         Ok(())
     }
@@ -444,6 +482,7 @@ impl<'a> Transaction<'a> {
         // N.B. Mark closed _before_ running the query. This prevents us from
         // double closing the transaction if this query itself fails.
         self.closed = true;
+        self.savepoints.clear();
         self.client.simple_query(COMMIT_QUERY).await?;
         Ok(())
     }
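A side note on `create_savepoint`'s name check above: the in-crate validation is deliberately loose (non-empty, alphanumeric or underscore) and leaves the rest to the server. A stricter client-side version of the rules documented in the doc comment might look like the sketch below; it is illustrative only and omits the reserved-word check.

```rust
// Illustrative sketch of the documented savepoint-name rules (stricter than the
// crate's check, which defers most validation to SQL Server). Reserved words are
// not checked here.
fn is_plausible_savepoint_name(name: &str) -> bool {
    let starts_ok = name
        .chars()
        .next()
        .is_some_and(|c| c.is_ascii_alphabetic() || c == '_');
    starts_ok
        && name.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
        && name.len() <= 32
}

fn main() {
    assert!(is_plausible_savepoint_name("mz_snapshot_2024"));
    assert!(!is_plausible_savepoint_name("1starts_with_digit"));
    assert!(!is_plausible_savepoint_name("has-hyphen"));
}
```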

src/storage-types/src/sources/sql_server.rs

Lines changed: 1 addition & 1 deletion
@@ -37,7 +37,7 @@ include!(concat!(
 pub const SNAPSHOT_MAX_LSN_WAIT: Config<Duration> = Config::new(
     "sql_server_snapshot_max_lsn_wait",
     Duration::from_secs(30),
-    "Maximum amount of time we'll wait for SQL Server to report an LSN (in other words for \
+    "Maximum amount of time we'll wait for SQL Server to report a LSN (in other words for \
     CDC to be fully enabled) before taking an initial snapshot.",
 );
