SQL server: consistent snapshot and lsn selection #32979

Open · wants to merge 12 commits into base: main
191 changes: 175 additions & 16 deletions src/sql-server-util/src/cdc.rs
@@ -17,11 +17,49 @@
//! 2. [`CdcStream::into_stream`] returns a [`futures::Stream`] of [`CdcEvent`]s
//! optionally from the [`Lsn`] returned in step 1.
//!
//! Internally we get a snapshot by setting our transaction isolation level to
//! [`TransactionIsolationLevel::Snapshot`], getting the current maximum LSN with
//! [`crate::inspect::get_max_lsn`] and then running a `SELECT *`. We've observed that by
//! using [`TransactionIsolationLevel::Snapshot`] the LSN remains stable for the entire
//! transaction.
//! The snapshot process is responsible for identifying a [`Lsn`] that provides
//! a point-in-time view of the data for the table(s) being copied. As with
//! MySQL, Microsoft SQL Server, as far as we know, does not provide an API to
//! achieve this directly.
//!
//! SQL Server `SNAPSHOT` isolation provides guarantees that a reader will only
//! see writes committed before the transaction began. More specifically, this
//! snapshot is implemented using row versions that are visible based on the
//! transaction sequence number (`XSN`). The `XSN` is set at the first
//! read or write, not at `BEGIN TRANSACTION`; see [here](https://learn.microsoft.com/en-us/sql/relational-databases/sql-server-transaction-locking-and-row-versioning-guide?view=sql-server-ver17).
//! This provides us a suitable starting point for capturing the table data.
//! To force a `XSN` to be assigned, experiments have shown that a table must
//! be read. We choose a well-known table that we should already have access to,
//! [cdc.change_tables](https://learn.microsoft.com/en-us/sql/relational-databases/system-tables/cdc-change-tables-transact-sql?view=sql-server-ver17),
//! and read a single value from it.
//!
//! Due to the asynchronous nature of CDC, we can assume that the [`Lsn`]
//! returned from any CDC tables or CDC functions will always be stale
//! in relation to the source table that CDC is tracking. The system table
//! [sys.dm_tran_database_transactions](https://learn.microsoft.com/en-us/sql/relational-databases/system-dynamic-management-views/sys-dm-tran-database-transactions-transact-sql?view=sql-server-ver17)
//! will contain a [`Lsn`] for any transaction that performs a write operation.
//! Creating a savepoint using [SAVE TRANSACTION](https://learn.microsoft.com/en-us/sql/t-sql/language-elements/save-transaction-transact-sql?view=sql-server-ver17)
//! is sufficient to generate a [`Lsn`] in this case.
//!
//! Unfortunately, it isn't sufficient to just fix the [`Lsn`] and `XSN`. There exists
//! the possibility that a write transaction may have inserted a row into one
//! of the tables in the snapshot, but that write has not committed. In this case,
//! the `INSERT` has already been written to the transaction log at a [`Lsn`]
//! less than the one captured, but the snapshot will *not* observe that `INSERT`
//! because the transaction has not committed, and may not commit until after
//! the snapshot is complete. In order to force a clear delineation of updates,
//! the upstream tables in the snapshot must be locked. This lock only needs
//! to exist long enough to establish the [`Lsn`] and `XSN`.
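A toy model of the hazard described above (all names here are illustrative, not part of this PR): the snapshot path sees only writes committed before the `XSN` was assigned, the replication path sees only writes above the captured `Lsn` boundary, and an in-flight write logged below that boundary falls through both paths unless writers are blocked while the boundary is chosen.

```rust
#[derive(Clone, Copy)]
struct Write {
    lsn: u64,
    committed_before_xsn: bool,
}

// The snapshot observes exactly the writes committed before the XSN.
fn snapshot_sees(w: Write) -> bool {
    w.committed_before_xsn
}

// CDC replay covers writes strictly above the captured LSN boundary.
fn replay_sees(w: Write, captured_lsn: u64) -> bool {
    w.lsn > captured_lsn
}

fn main() {
    let captured_lsn = 100;

    // A write that committed before the boundary is seen by the snapshot.
    let a = Write { lsn: 90, committed_before_xsn: true };
    assert!(snapshot_sees(a) || replay_sees(a, captured_lsn));

    // A write logged below the boundary but still uncommitted at snapshot
    // time is invisible to both paths -- this is the lost write the table
    // lock prevents, by blocking writers until the boundary is fixed.
    let b = Write { lsn: 95, committed_before_xsn: false };
    assert!(!snapshot_sees(b) && !replay_sees(b, captured_lsn));
}
```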
Contributor:

Something doesn't check out for me in this paragraph.

There exists the possibility that a write transaction may have inserted a row into one of the tables in the snapshot, but that write has not committed. In this case, the INSERT has already been written to the transaction log at a [Lsn] less than the one captured, but the snapshot will not observe that INSERT because the transaction has not committed, and may not commit until after the snapshot is complete.

This sounds totally fine? If that transaction commits after the snapshot is complete then in the CDC stream that write will be associated with an LSN that is higher that then snapshot LSN we captured and so it will be correctly applied on top of the snapshot, which as you say will not observe the write.

I think the danger, and reason why we need locks, is that we have no way establishing a transaction MVCC snapshot and an LSN boundary atomically. With postgres we get this API by creating a temp replication slot as the first statement of the transaction with USE_SNAPSHOT, which links the two together. In SQL Server we are forced to perform two independent steps, and the bad scenario is if a write transaction commits between the MVCC point and the LSN boundary point. If that happens we get the table snapshot at the MVCC point, assume it happened at the LSN boundary point, and we completely miss the in-between write during the replication phase.

Contributor (Author):

Sorry, that was me not proofreading. I unfortunately wrote it in parts. I trimmed it down. Thanks for catching that Petros!

//!
//! SQL server supports exclusive table locks, but those will only be released
//! once the outermost transaction completes. For this reason, this module
//! uses two connections for the snapshot process. The first connection is used
//! to initiate a transaction and lock the upstream tables. While the first
//! connection maintains the locks, the second connection starts a transaction
//! with [`TransactionIsolationLevel::Snapshot`] isolation and creates a
//! savepoint. Once the savepoint is created and the [`Lsn`] is captured, the first
//! connection rolls back the transaction. The snapshot is created by the second
Contributor:

Suggested change
//! connection rolls back the transaction. The snapshot is created by the second
//! connection commits the transaction. The snapshot is created by the second

(depends on the discussion in the other comment)

//! connection within the existing transaction.
//!
//! After completing the snapshot we use [`crate::inspect::get_changes_asc`] which will return
//! all changes between a `[lower, upper)` bound of [`Lsn`]s.
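As a sketch of that `[lower, upper)` convention (the struct here is a simplified stand-in for the `Lsn` type defined later in this file, and `in_window` is illustrative, not part of this PR):

```rust
// Simplified stand-in for Lsn; the derived ordering follows the field
// order (vlf_id, block_id, record_id), matching the note on LsnStructured
// that the field order is load-bearing for PartialOrd/Ord.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Lsn {
    vlf_id: u32,
    block_id: u32,
    record_id: u16,
}

// Half-open interval: a change is replayed iff lower <= lsn < upper, so the
// change at the snapshot point itself is delivered exactly once.
fn in_window(lsn: Lsn, lower: Lsn, upper: Lsn) -> bool {
    lower <= lsn && lsn < upper
}

fn main() {
    let lower = Lsn { vlf_id: 45, block_id: 8784, record_id: 1 };
    let upper = Lsn { vlf_id: 45, block_id: 8785, record_id: 0 };
    // Lower bound is inclusive, upper bound is exclusive.
    assert!(in_window(lower, lower, upper));
    assert!(!in_window(upper, lower, upper));
    assert!(in_window(Lsn { vlf_id: 45, block_id: 8784, record_id: 2 }, lower, upper));
}
```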
@@ -36,6 +74,7 @@ use futures::{Stream, StreamExt};
use mz_ore::retry::RetryResult;
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use tiberius::numeric::Numeric;

use crate::{Client, SqlServerError, TransactionIsolationLevel};

@@ -56,7 +95,7 @@ pub struct CdcStream<'a> {
///
/// Note: When CDC is first enabled in an instance of SQL Server it can take a moment
/// for it to "completely" startup. Before starting a `TRANSACTION` for our snapshot
/// we'll wait this duration for SQL Server to report an LSN and thus indicate CDC is
/// we'll wait this duration for SQL Server to report a [`Lsn`] and thus indicate CDC is
/// ready to go.
max_lsn_wait: Duration,
}
@@ -95,7 +134,7 @@ impl<'a> CdcStream<'a> {
self
}

/// The max duration we'll wait for SQL Server to return an LSN before taking a
/// The max duration we'll wait for SQL Server to return a [`Lsn`] before taking a
/// snapshot.
///
/// When CDC is first enabled in SQL Server it can take a moment before it is fully
@@ -122,6 +161,8 @@ impl<'a> CdcStream<'a> {
),
SqlServerError,
> {
static SAVEPOINT_NAME: &str = "_mz_snap_";

// Determine what table we need to snapshot.
let instances = self
.capture_instances
@@ -140,13 +181,63 @@
// the upstream DB is ready for CDC.
self.wait_for_ready().await?;

// Intentionally logging this at info for debugging. This section won't get entered
// often, but if there are problems here, it will be much easier to troubleshoot
// knowing where a stall/hang might be happening.
tracing::info!("Upstream is ready");
Contributor:

nit: might want to add some additional context here and also are we sure we want this at info level?

Contributor (Author):

added a comment

Contributor:

ah I see! would we ever see these concurrently on the same node? would it be clear which database is ready?

Contributor (Author):

🤦 - just realized i didn't include ?tables

Contributor:

+1 on adding context. The standard format we use is timely-{worker_id} and also include the source id this message is for


// The client that will be used for fencing does not need any special isolation level
// as it will just be locking the table(s).
let mut fencing_client = self.client.new_connection().await?;
let mut fence_txn = fencing_client.transaction().await?;

// TODO (maz): we should consider a timeout or a lock + snapshot per-table instead of collectively
ptravers (Contributor), Jul 23, 2025:

what telemetry do we need to be able to make a decision about the necessity of locking per table?

I would guess we need to know whether the table would have been locked for less time that way?

ptravers (Contributor), Jul 23, 2025:

follow up: maybe we can batch with a configurable number of tables per batch? we might want to track with a histogram/metric roughly how long we are locking the tables.

seems like this only becomes a problem if there's a very large number of tables or a very slow lock acquisition.

Contributor:

or a lock + snapshot per-table instead of collectively

I think we know this is better, right? I would just create an issue for it so that we do it. (or, if it's not a big refactor do it now)

Contributor (Author):

If possible, sure. Created a GA task! this PR has plenty of action already 😁

Contributor:

Sounds good!

for (_capture_instance, schema, table) in &tables {
tracing::trace!(%schema, %table, "locking table");
fence_txn.lock_table_exclusive(&*schema, &*table).await?;
Contributor:

Do we need an exclusive lock in the end? We don't care if other people concurrently read the table with us, we only want to block writes.

Contributor (Author):

I think it's the lesser of 2 evils, can add a comment. There is an UPDLOCK, but it only applies to rows/pages. My understanding is we'd have to read the entire table.
If it's combined with TABLOCK, it ends up promoted to an exclusive table lock anyway (tested to make sure this was the case).

Contributor:

I was thinking of TABLOCK with HOLDLOCK which according to the docs here will get a shared lock until the end of the transaction. From a quick test I did this allows other transactions to read the table but makes all writes block, which is what we need. The statement I used was select * from t1 WITH (TABLOCK, HOLDLOCK)

Contributor (Author):

My concern with SELECT * FROM t1 WITH (TABLOCK, HOLDLOCK) is that it will read the entire table in the fencing path. We are currently using SELECT * FROM t1 WITH (TABLOCKX) WHERE 1=0 which reads no rows.

If you're not opposed, I'd like to tackle the lock improvement after this PR. Relaxing the locking will improve the user experience, but ultimately does not affect correctness.

A quick test of using WHERE 1=0 with HOLDLOCK did not prevent any writes, so we know the query must return some data. For example, SELECT COUNT(*) FROM (SELECT TOP 1 * FROM t1 WITH (TABLOCK, HOLDLOCK)) AS t should read a minimal amount of data, but I would feel better with some testing with large tables to validate that is the case.

Contributor:

Ah sorry, the * in there wasn't load bearing, it was just me typing in the terminal.

A quick test of using WHERE 1=0 with HOLDLOCK did not prevent any writes

Weird, this works fine for me (writes are prevented). This is what I'm doing:

# terminal A
begin transaction
select * from t1 WITH (TABLOCK, HOLDLOCK) where 1=0
go  <-- returns zero rows

# terminal B
select * from t1
go  <-- works
insert into t1 values (42)
go  <-- blocks

so we know the query must return some data.

If that's the case (can't repro this myself) we might have a problem if the table in question is empty during the initial snapshot. Can you share the commands you ran in your quick test to at least record it here for when we relax the lock?

If you're not opposed, I'd like to tackle the lock improvement after this PR.

Not opposed, let's make an issue in the epic to not forget

Contributor (Author):

ok, I think I know why. The issue appears to be that if the session that establishes the locks is using SNAPSHOT isolation, it doesn't behave the same way!

So it looks like we can use (TABLOCK, HOLDLOCK) and probably wouldn't be a bad idea to explicitly set the transaction level to READ COMMITTED.

I inadvertently was running the commands in the wrong sessions:
session 1 (snapshot isolation)

1> begin tran;
2> go
1> select * from t1 with (TABLOCK, HOLDLOCK) where 1=0;
2> go

(0 rows affected)

session 2 (read committed isolation, and no explicit transaction)

1> insert into t1 values (4,4);
2> go
(1 row affected)

both sessions show the same in the select *, and commit returns no error in session 2

1> select * from t1;
2> go
id  1
val 1

id  2
val 2

id  3
val 3

id  4
val 4


(4 rows affected)

Contributor:

Oh my! Why does it have to be so subtle!? 💀 So locks are ignored in snapshot isolation. Makes total sense..

}

// So we know that we locked the tables and roughly how long that took based on the time diff
// from the last message.
tracing::info!(?tables, "Locked tables");
Contributor:

nit: same comment as above.

Contributor (Author):

ditto!

Contributor:

same as above, let's add context about the timely worker and source id here


self.client
.set_transaction_isolation(TransactionIsolationLevel::Snapshot)
.await?;
let txn = self.client.transaction().await?;
let mut txn = self.client.transaction().await?;

// Creating a savepoint forces a write to the transaction log, which will
// assign a LSN, but it does not force a transaction sequence number to be
// assigned as far as I can tell. I have not observed any entries added to
// `sys.dm_tran_active_snapshot_database_transactions` when creating a savepoint
// or when reading system views to retrieve the LSN.
//
// We choose cdc.change_tables because it is a system table that will exist
// when CDC is enabled, it has a well known schema, and as a CDC client,
// we should be able to read from it already.
let res = txn
.simple_query("SELECT TOP 1 object_id FROM cdc.change_tables")
.await?;
if res.len() != 1 {
Err(SqlServerError::InvariantViolated(
"No objects found in cdc.change_tables".into(),
))?
}

// Because the tables are exclusively locked, any write operation has either
// completed, or is blocked. The LSN and XSN acquired now will represent a
// consistent point-in-time view, such that any committed write will be
// visible to this snapshot and the LSN of such a write will be less than
// or equal to the LSN captured here.
txn.create_savepoint(SAVEPOINT_NAME).await?;
tracing::info!(%SAVEPOINT_NAME, "Created savepoint");
let lsn = txn.get_lsn().await?;
petrosagg (Contributor), Jul 24, 2025:

We can run this statement after we have committed the fencing transaction to reduce the amount of time we hold the locks for. The LSN is already established and it's fine to read it outside the critical section

Contributor (Author):

indeed!


// Once the XSN is established and the LSN captured, the tables no longer
// need to be locked. Any writes that happen to the upstream tables
// will have a LSN higher than our captured LSN, and will be read from CDC.
fence_txn.rollback().await?;
Contributor:

I think we should commit this transaction instead of rolling it back. What I'm thinking is that when we wrap up the fencing transaction we actually want to ensure that the server didn't decide to cancel it or otherwise mess with it while we weren't looking, since in that case the lock we think we have might not be there.

I don't know if SQL does this too but for example pg cancels a transaction if it can't serialize it and you only learn it at commit time.

Contributor (Author):

I'm not opposed to changing it. My reasoning for rollback is that this transaction (and the snapshot transaction) should never make any changes to the database. It prevents us accidentally committing things due to some innocuous change in the future. That's entirely based on my own battle scars 😁

A rollback without an active transaction does fail in SQL server

1> rollback;
2> go
Msg 3903, Level 16, State 1, Server 41d33a81ef67, Line 1
The ROLLBACK TRANSACTION request has no corresponding BEGIN TRANSACTION.

In the case of PG, when a transaction is canceled, are you saying that only the commit returns the error? I would expect that commit or rollback would fail because there is no transaction to operate on at that point (or it's in an error state).

Contributor:

I tried to construct a case using KILL but I can't, rollback hangs if I kill the session under its nose. I don't feel strongly about this, it should be fine to use rollback here if any error gets surfaced instead of being swept under the carpet.


// Get the current LSN of the database.
let lsn = crate::inspect::get_max_lsn(txn.client).await?;
tracing::info!(?tables, ?lsn, "starting snapshot");

// Get the size of each table we're about to snapshot.
@@ -177,10 +268,10 @@ impl<'a> CdcStream<'a> {
tracing::trace!(%capture_instance, %schema_name, %table_name, "snapshot end");
}

// Slightly awkward, but if the commit fails we need to conform to
// Slightly awkward, but if the rollback fails we need to conform to
// type of the stream.
if let Err(e) = txn.commit().await {
yield ("commit".into(), Err(e));
if let Err(e) = txn.rollback().await {
Contributor:

Didn't follow this change. Is there something we want to undo here?

Contributor (Author):

same as the above statement with commit vs. rollback. We do not intend to make any changes to the DB, so we rollback the transaction that read all the data.

yield ("rollback".into(), Err(e));
}
};

@@ -355,7 +446,7 @@ impl<'a> CdcStream<'a> {
}
}

// Ensure all of the capture instances are reporting an LSN.
// Ensure all of the capture instances are reporting a LSN.
Contributor:

I think "an LSN" is correct because the rule depends on of the pronunciation "el-es-en". Here is a similar example where it's "an LED" instead of "a LED" https://en.wikipedia.org/wiki/Light-emitting_diode

We also write "a UV lamp" even though it's "an ultraviolet lamp" (example courtesy of ChatGPT)

Contributor (Author):

fair.. though I feel this is the same type of nonsense that leads to people saying ATM machine! English is hard.

for instance in self.capture_instances.keys() {
let (_client, min_result) = mz_ore::retry::Retry::default()
.max_duration(self.max_lsn_wait)
@@ -471,7 +562,7 @@ pub struct Lsn {
impl Lsn {
const SIZE: usize = 10;

/// Interpret the provided bytes as an [`Lsn`].
/// Interpret the provided bytes as a [`Lsn`].
pub fn try_from_bytes(bytes: &[u8]) -> Result<Self, String> {
if bytes.len() != Self::SIZE {
return Err(format!("incorrect length, expected 10 got {}", bytes.len()));
@@ -545,6 +636,40 @@ impl TryFrom<&[u8]> for Lsn {
}
}
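The binary form can be sketched standalone. The 4-byte : 4-byte : 2-byte split comes from the comment in `TryFrom<Numeric>`; the big-endian interpretation in this sketch is an assumption for illustration, and `decode_lsn` is not the real `try_from_bytes`:

```rust
// Decode a 10-byte LSN as 4-byte vlf_id : 4-byte block_id : 2-byte record_id,
// assuming each component is big-endian (the hex form SQL Server displays,
// e.g. 00000045:00002250:0001, reads in that order).
fn decode_lsn(bytes: &[u8]) -> Result<(u32, u32, u16), String> {
    if bytes.len() != 10 {
        return Err(format!("incorrect length, expected 10 got {}", bytes.len()));
    }
    let vlf_id = u32::from_be_bytes(bytes[0..4].try_into().unwrap());
    let block_id = u32::from_be_bytes(bytes[4..8].try_into().unwrap());
    let record_id = u16::from_be_bytes(bytes[8..10].try_into().unwrap());
    Ok((vlf_id, block_id, record_id))
}

fn main() {
    let raw = [0, 0, 0, 0x45, 0, 0, 0x22, 0x50, 0, 1];
    assert_eq!(decode_lsn(&raw), Ok((0x45, 0x2250, 1)));
    // Anything other than exactly 10 bytes is rejected.
    assert!(decode_lsn(&[0u8; 9]).is_err());
}
```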

impl TryFrom<Numeric> for Lsn {
type Error = String;

fn try_from(value: Numeric) -> Result<Self, Self::Error> {
if value.dec_part() != 0 {
return Err(format!(
"LSN expects Numeric(25,0), but found decimal portion {}",
value.dec_part()
));
}
let mut decimal_lsn = value.int_part();
// LSN is composed of 4 bytes : 4 bytes : 2 bytes,
// and Microsoft provides a reference function to decode it here:
// https://github.com/microsoft/sql-server-samples/blob/master/samples/features/ssms-templates/Sql/Change%20Data%20Capture/Enumeration/Create%20Function%20fn_convertnumericlsntobinary.sql

let vlf_id = u32::try_from(decimal_lsn / 10_i128.pow(15))
.map_err(|e| format!("Failed to decode vlf_id for lsn {decimal_lsn}: {e:?}"))?;
decimal_lsn -= i128::try_from(vlf_id).unwrap() * 10_i128.pow(15);
Contributor:

A direct From implementation exists to go from u32 (and others) to i128 https://doc.rust-lang.org/std/primitive.i128.html#impl-From%3Cu32%3E-for-i128

Suggested change
decimal_lsn -= i128::try_from(vlf_id).unwrap() * 10_i128.pow(15);
decimal_lsn -= i128::from(vlf_id) * 10_i128.pow(15);


let block_id = u32::try_from(decimal_lsn / 10_i128.pow(5))
.map_err(|e| format!("Failed to decode block_id for lsn {decimal_lsn}: {e:?}"))?;
decimal_lsn -= i128::try_from(block_id).unwrap() * 10_i128.pow(5);

let record_id = u16::try_from(decimal_lsn)
.map_err(|e| format!("Failed to decode record_id for lsn {decimal_lsn}: {e:?}"))?;

Ok(Lsn {
vlf_id,
block_id,
record_id,
})
}
}
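The place-value decomposition in `TryFrom<Numeric>` can be exercised in isolation. This sketch mirrors the same arithmetic on a bare `i128`, without the `Numeric` wrapper or the `Lsn` type (names here are illustrative):

```rust
// Split a Decimal(25,0) LSN into (vlf_id, block_id, record_id): the low
// 5 decimal digits are the record id, the next 10 the block id, and the
// remainder the VLF id, per the Microsoft reference function linked above.
fn split_decimal_lsn(mut lsn: i128) -> Result<(u32, u32, u16), String> {
    let vlf_id = u32::try_from(lsn / 10_i128.pow(15))
        .map_err(|e| format!("vlf_id out of range: {e}"))?;
    lsn -= i128::from(vlf_id) * 10_i128.pow(15);
    let block_id = u32::try_from(lsn / 10_i128.pow(5))
        .map_err(|e| format!("block_id out of range: {e}"))?;
    lsn -= i128::from(block_id) * 10_i128.pow(5);
    let record_id = u16::try_from(lsn)
        .map_err(|e| format!("record_id out of range: {e}"))?;
    Ok((vlf_id, block_id, record_id))
}

fn main() {
    // 45_0000008784_00001 splits into vlf 45, block 8784, record 1.
    assert_eq!(split_decimal_lsn(45_0000008784_00001_i128), Ok((45, 8784, 1)));
    // A record field of 65536 overflows u16 and is rejected.
    assert!(split_decimal_lsn(1_0000000001_65536_i128).is_err());
    // Negative values are rejected (record_id conversion fails).
    assert!(split_decimal_lsn(-1_i128).is_err());
}
```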

impl columnation::Columnation for Lsn {
type InnerRegion = columnation::CopyRegion<Lsn>;
}
@@ -589,7 +714,7 @@ impl timely::order::PartialOrder for Lsn {
}
impl timely::order::TotalOrder for Lsn {}

/// Structured format of an [`Lsn`].
/// Structured format of a [`Lsn`].
///
/// Note: The derived impl of [`PartialOrd`] and [`Ord`] relies on the field
/// ordering so do not change it.
@@ -695,6 +820,7 @@ impl Operation {
mod tests {
use super::Lsn;
use proptest::prelude::*;
use tiberius::numeric::Numeric;

#[mz_ore::test]
fn smoketest_lsn_ordering() {
@@ -776,4 +902,37 @@
test_case(random_bytes, num_increment)
})
}

#[mz_ore::test]
fn test_numeric_lsn_ordering() {
let a = Lsn::try_from(Numeric::new_with_scale(45_0000008784_00001_i128, 0)).unwrap();
let b = Lsn::try_from(Numeric::new_with_scale(45_0000008784_00002_i128, 0)).unwrap();
let c = Lsn::try_from(Numeric::new_with_scale(45_0000008785_00002_i128, 0)).unwrap();
let d = Lsn::try_from(Numeric::new_with_scale(49_0000008784_00002_i128, 0)).unwrap();
assert!(a < b);
assert!(b < c);
assert!(c < d);
assert!(a < d);

assert_eq!(a, a);
assert_eq!(b, b);
assert_eq!(c, c);
assert_eq!(d, d);
}

#[mz_ore::test]
fn test_numeric_lsn_invalid() {
let with_decimal = Numeric::new_with_scale(1, 20);
assert!(Lsn::try_from(with_decimal).is_err());

for v in [
4294967296_0000000000_00000_i128, // vlf_id is too large
1_4294967296_00000_i128, // block_id is too large
1_0000000001_65536_i128, // record_id is too large
-49_0000008784_00002_i128, // negative is invalid
] {
let invalid_lsn = Numeric::new_with_scale(v, 0);
assert!(Lsn::try_from(invalid_lsn).is_err());
}
}
}
27 changes: 25 additions & 2 deletions src/sql-server-util/src/inspect.rs
@@ -16,6 +16,7 @@ use smallvec::SmallVec;
use std::collections::BTreeMap;
use std::fmt;
use std::sync::Arc;
use tiberius::numeric::Numeric;

use crate::cdc::{Lsn, RowFilterOption};
use crate::desc::{SqlServerColumnRaw, SqlServerTableRaw};
@@ -59,7 +60,29 @@ pub async fn increment_lsn(client: &mut Client, lsn: Lsn) -> Result<Lsn, SqlServ
parse_lsn(&result[..1])
}

/// Parse an [`Lsn`] from the first column of the provided [`tiberius::Row`].
/// Parse a [`Lsn`] in Decimal(25,0) format from the first column of the provided [`tiberius::Row`].
///
/// Returns an error if the provided slice doesn't have exactly one row.
pub(crate) fn parse_numeric_lsn(row: &[tiberius::Row]) -> Result<Lsn, SqlServerError> {
match row {
[r] => {
let numeric_lsn = r
.try_get::<Numeric, _>(0)?
.ok_or_else(|| SqlServerError::NullLsn)?;
let lsn = Lsn::try_from(numeric_lsn).map_err(|msg| SqlServerError::InvalidData {
column_name: "lsn".to_string(),
error: msg,
})?;
Ok(lsn)
}
other => Err(SqlServerError::InvalidData {
column_name: "lsn".to_string(),
error: format!("expected 1 column, got {other:?}"),
}),
}
}

/// Parse a [`Lsn`] from the first column of the provided [`tiberius::Row`].
///
/// Returns an error if the provided slice doesn't have exactly one row.
fn parse_lsn(result: &[tiberius::Row]) -> Result<Lsn, SqlServerError> {
Expand Down Expand Up @@ -146,7 +169,7 @@ SELECT @mz_cleanup_status_bit;
let max_deletes = i64::cast_from(max_deletes);

// First we need to get a valid LSN as our low watermark. If we try to cleanup
// a change table with an LSN that doesn't exist in the `cdc.lsn_time_mapping`
// a change table with a LSN that doesn't exist in the `cdc.lsn_time_mapping`
// table we'll get an error code `22964`.
let result = client
.query(GET_LSN_QUERY, &[&low_water_mark.as_bytes().as_slice()])