From 494a60042e29efb60645dd969ea5d39eeccac8da Mon Sep 17 00:00:00 2001 From: Wraith2 Date: Tue, 20 May 2025 20:34:49 +0100 Subject: [PATCH 1/3] refine handling of moving between replay and continue states --- .../Data/SqlClient/SqlCachedBuffer.cs | 2 +- .../Data/SqlClient/TdsParserStateObject.cs | 86 +++++++++++++------ .../SQL/DataReaderTest/DataReaderTest.cs | 56 ++++++++++++ 3 files changed, 115 insertions(+), 29 deletions(-) diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/SqlCachedBuffer.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/SqlCachedBuffer.cs index d7fdf3fa48..42703d29e9 100644 --- a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/SqlCachedBuffer.cs +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/SqlCachedBuffer.cs @@ -78,7 +78,7 @@ internal static TdsOperationStatus TryCreate(SqlMetaDataPriv metadata, TdsParser byte[] byteArr = new byte[cb]; // pass false for the writeDataSizeToSnapshot parameter because we want to only take data // from the current packet and not try to do a continue-capable multi packet read - result = stateObj.TryReadPlpBytes(ref byteArr, 0, cb, out cb, writeDataSizeToSnapshot: false, compatibilityMode: false); + result = stateObj.TryReadPlpBytes(ref byteArr, 0, cb, out cb, isAvailable, writeDataSizeToSnapshot: false, compatibilityMode: false); if (result != TdsOperationStatus.Done) { if (result == TdsOperationStatus.NeedMoreData && isAvailable && cb == byteArr.Length) diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/TdsParserStateObject.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/TdsParserStateObject.cs index 8cbc275043..e8057c3e58 100644 --- a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/TdsParserStateObject.cs +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/TdsParserStateObject.cs @@ -2037,14 +2037,15 @@ internal int ReadPlpBytesChunk(byte[] buff, int offset, int len) internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len, out int totalBytesRead) { + bool canContinue = false; bool isStarting = false; bool isContinuing = false; bool compatibilityMode = LocalAppContextSwitches.UseCompatibilityAsyncBehaviour; if (!compatibilityMode) { - (_, isStarting, isContinuing) = GetSnapshotStatuses(); + (canContinue, isStarting, isContinuing) = GetSnapshotStatuses(); } - return TryReadPlpBytes(ref buff, offset, len, out totalBytesRead, isStarting || isContinuing, compatibilityMode); + return TryReadPlpBytes(ref buff, offset, len, out totalBytesRead, canContinue, canContinue, compatibilityMode); } // Reads the requested number of bytes from a plp data stream, or the entire data if // requested length is -1 or larger than the actual length of data. First call to this method @@ -2052,7 +2053,7 @@ internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len // Returns the actual bytes read. // NOTE: This method must be retriable WITHOUT replaying a snapshot // Every time you call this method increment the offset and decrease len by the value of totalBytesRead - internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len, out int totalBytesRead, bool writeDataSizeToSnapshot, bool compatibilityMode) + internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len, out int totalBytesRead, bool canContinue, bool writeDataSizeToSnapshot, bool compatibilityMode) { totalBytesRead = 0; @@ -2077,9 +2078,16 @@ internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len // If total length is known up front, allocate the whole buffer in one shot instead of realloc'ing and copying over each time if (buff == null && _longlen != TdsEnums.SQL_PLP_UNKNOWNLEN) { - if (writeDataSizeToSnapshot) + if (compatibilityMode && _snapshot != null && _snapshotStatus != SnapshotStatus.NotActive) + { + // legacy replay path perf optimization + // if there is a snapshot which contains a stored plp buffer take it + // and try to use it if it is the right length + buff = TryTakeSnapshotStorage() as byte[]; + } + else if (writeDataSizeToSnapshot && canContinue && _snapshot != null) { - // if there is a snapshot and it contains a stored plp buffer take it + // if there is a snapshot which it contains a stored plp buffer take it // and try to use it if it is the right length buff = TryTakeSnapshotStorage() as byte[]; if (buff != null) @@ -2088,13 +2096,7 @@ internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len totalBytesRead = offset; } } - else if (compatibilityMode && _snapshot != null && _snapshotStatus != SnapshotStatus.NotActive) - { - // legacy replay path perf optimization - // if there is a snapshot and it contains a stored plp buffer take it - // and try to use it if it is the right length - buff = TryTakeSnapshotStorage() as byte[]; - } + if ((ulong)(buff?.Length ?? 0) != _longlen) { @@ -2146,23 +2148,29 @@ internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len _longlenleft -= (ulong)bytesRead; if (result != TdsOperationStatus.Done) { - if (writeDataSizeToSnapshot) + if (compatibilityMode && _snapshot != null) { + // legacy replay path perf optimization // a partial read has happened so store the target buffer in the snapshot // so it can be re-used when another packet arrives and we read again SetSnapshotStorage(buff); - SetSnapshotDataSize(bytesRead); - } - else if (compatibilityMode && _snapshot != null) + else if (canContinue) { - // legacy replay path perf optimization // a partial read has happened so store the target buffer in the snapshot // so it can be re-used when another packet arrives and we read again SetSnapshotStorage(buff); + if (writeDataSizeToSnapshot) + { + SetSnapshotDataSize(bytesRead); + } } return result; } + if (writeDataSizeToSnapshot && canContinue) + { + SetSnapshotDataSize(bytesRead); + } if (_longlenleft == 0) { @@ -2170,20 +2178,20 @@ internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len result = TryReadPlpLength(false, out _); if (result != TdsOperationStatus.Done) { - if (writeDataSizeToSnapshot) - { - if (result == TdsOperationStatus.NeedMoreData) - { - SetSnapshotStorage(buff); - SetSnapshotDataSize(bytesRead); - } - } - else if (compatibilityMode && _snapshot != null) + if (compatibilityMode && _snapshot != null) { // a partial read has happened so store the target buffer in the snapshot // so it can be re-used when another packet arrives and we read again SetSnapshotStorage(buff); } + else if (canContinue && result == TdsOperationStatus.NeedMoreData) + { + SetSnapshotStorage(buff); + if (writeDataSizeToSnapshot) + { + SetSnapshotDataSize(bytesRead); + } + } return result; } } @@ -3455,9 +3463,9 @@ internal bool IsSnapshotContinuing() _snapshotStatus == TdsParserStateObject.SnapshotStatus.ContinueRunning; } - internal (bool IsAvailable, bool IsStarting, bool IsContinuing) GetSnapshotStatuses() + internal (bool CanContinue, bool IsStarting, bool IsContinuing) GetSnapshotStatuses() { - bool isAvailable = _snapshot != null && _snapshot.ContinueEnabled; + bool isAvailable = _snapshot != null && _snapshot.ContinueEnabled && _snapshotStatus != SnapshotStatus.NotActive; bool isStarting = false; bool isContinuing = false; if (isAvailable) @@ -4020,6 +4028,28 @@ internal void CheckStack(string trace) Debug.Assert(_stateObj._permitReplayStackTraceToDiffer || prev.Stack == trace, "The stack trace on subsequent replays should be the same"); } } + + public int CurrentPacketIndex + { + get + { + int value = -1; + if (_current != null) + { + PacketData current = _firstPacket; + while (current != null) + { + value += 1; + if (current == _current) + { + break; + } + current = current.NextPacket; + } + } + return value; + } + } #endif public bool ContinueEnabled => !LocalAppContextSwitches.UseCompatibilityAsyncBehaviour; diff --git a/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/DataReaderTest/DataReaderTest.cs b/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/DataReaderTest/DataReaderTest.cs index e64d0fc362..c20a30c214 100644 --- a/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/DataReaderTest/DataReaderTest.cs +++ b/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/DataReaderTest/DataReaderTest.cs @@ -6,6 +6,8 @@ using System.Collections.Generic; using System.Data; using System.Data.SqlTypes; +using System.Diagnostics; +using System.Linq; using System.Reflection; using System.Text; using System.Threading; @@ -603,6 +605,60 @@ integrated into a comprehensive development } } + [ConditionalFact(typeof(DataTestUtility), nameof(DataTestUtility.AreConnStringsSetup))] + public static async Task CanReadBinaryData() + { + const int Size = 20_000; + + byte[] data = Enumerable.Range(0, Size) + .Select(i => (byte)(i % 256)) + .ToArray(); + string tableName = DataTestUtility.GenerateObjectName(); + + using (var connection = new SqlConnection(DataTestUtility.TCPConnectionString)) + { + await connection.OpenAsync(); + + try + { + using (var createCommand = connection.CreateCommand()) + { + createCommand.CommandText = $@" +DROP TABLE IF EXISTS [{tableName}] +CREATE TABLE [{tableName}] (Id INT IDENTITY(1,1) PRIMARY KEY, Data VARBINARY(MAX)); +INSERT INTO [{tableName}] (Data) VALUES (@data);"; + createCommand.Parameters.Add(new SqlParameter("@data", SqlDbType.VarBinary, Size) { Value = data }); + await createCommand.ExecuteNonQueryAsync(); + } + + using (var command = connection.CreateCommand()) + { + + command.CommandText = $"SELECT Data FROM [{tableName}]"; + command.Parameters.Clear(); + var result = (byte[])await command.ExecuteScalarAsync(); + + Debug.Assert(data.SequenceEqual(result)); + } + + } + finally + { + try + { + using (var dropCommand = connection.CreateCommand()) + { + dropCommand.CommandText = $"DROP TABLE IF EXISTS [{tableName}]"; + dropCommand.ExecuteNonQuery(); + } + } + catch + { + } + } + } + } + // Synapse: Cannot find data type 'rowversion'. [ConditionalFact(typeof(DataTestUtility), nameof(DataTestUtility.AreConnStringsSetup), nameof(DataTestUtility.IsNotAzureSynapse))] public static void CheckLegacyNullRowVersionIsEmptyArray() From 24f6d46fbfdf3b1bad83fddf25b079d002cdecda Mon Sep 17 00:00:00 2001 From: Wraith2 Date: Wed, 21 May 2025 21:51:49 +0100 Subject: [PATCH 2/3] address review feedback --- .../src/Microsoft/Data/SqlClient/TdsParser.cs | 10 +++--- .../src/Microsoft/Data/SqlClient/TdsParser.cs | 10 +++--- .../Data/SqlClient/SqlCachedBuffer.cs | 8 ++--- .../Data/SqlClient/TdsParserStateObject.cs | 34 ++++--------------- 4 files changed, 20 insertions(+), 42 deletions(-) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParser.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParser.cs index d66984d8b7..c5d32a9a41 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParser.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParser.cs @@ -6500,8 +6500,8 @@ private TdsOperationStatus TryReadByteArrayWithContinue(TdsParserStateObject sta bytes = null; int offset = 0; byte[] temp = null; - (bool isAvailable, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses(); - if (isAvailable) + (bool canContinue, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses(); + if (canContinue) { if (isContinuing || isStarting) { @@ -12983,9 +12983,9 @@ internal TdsOperationStatus TryReadPlpUnicodeCharsWithContinue(TdsParserStateObj char[] temp = null; bool buffIsRented = false; int startOffset = 0; - (bool isAvailable, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses(); + (bool canContinue, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses(); - if (isAvailable) + if (canContinue) { if (isContinuing || isStarting) { @@ -13004,7 +13004,7 @@ internal TdsOperationStatus TryReadPlpUnicodeCharsWithContinue(TdsParserStateObj length >> 1, stateObj, out length, - supportRentedBuff: !isAvailable, // do not use the arraypool if we are going to keep the buffer in the snapshot + supportRentedBuff: !canContinue, // do not use the arraypool if we are going to keep the buffer in the snapshot rentedBuff: ref buffIsRented, startOffset, isStarting || isContinuing diff --git a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/TdsParser.cs b/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/TdsParser.cs index 130094dc53..3e9266f897 100644 --- a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/TdsParser.cs +++ b/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/TdsParser.cs @@ -6696,8 +6696,8 @@ private TdsOperationStatus TryReadByteArrayWithContinue(TdsParserStateObject sta bytes = null; int offset = 0; byte[] temp = null; - (bool isAvailable, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses(); - if (isAvailable) + (bool canContinue, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses(); + if (canContinue) { if (isContinuing || isStarting) { @@ -13176,9 +13176,9 @@ internal TdsOperationStatus TryReadPlpUnicodeCharsWithContinue(TdsParserStateObj char[] temp = null; bool buffIsRented = false; int startOffset = 0; - (bool isAvailable, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses(); + (bool canContinue, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses(); - if (isAvailable) + if (canContinue) { if (isContinuing || isStarting) { @@ -13197,7 +13197,7 @@ internal TdsOperationStatus TryReadPlpUnicodeCharsWithContinue(TdsParserStateObj length >> 1, stateObj, out length, - supportRentedBuff: !isAvailable, // do not use the arraypool if we are going to keep the buffer in the snapshot + supportRentedBuff: !canContinue, // do not use the arraypool if we are going to keep the buffer in the snapshot rentedBuff: ref buffIsRented, startOffset, isStarting || isContinuing diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/SqlCachedBuffer.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/SqlCachedBuffer.cs index 42703d29e9..82ef37fb6b 100644 --- a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/SqlCachedBuffer.cs +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/SqlCachedBuffer.cs @@ -39,10 +39,10 @@ internal static TdsOperationStatus TryCreate(SqlMetaDataPriv metadata, TdsParser { buffer = null; - (bool isAvailable, bool isStarting, _) = stateObj.GetSnapshotStatuses(); + (bool canContinue, bool isStarting, _) = stateObj.GetSnapshotStatuses(); List cachedBytes = null; - if (isAvailable) + if (canContinue) { cachedBytes = stateObj.TryTakeSnapshotStorage() as List; if (isStarting) @@ -78,10 +78,10 @@ internal static TdsOperationStatus TryCreate(SqlMetaDataPriv metadata, TdsParser byte[] byteArr = new byte[cb]; // pass false for the writeDataSizeToSnapshot parameter because we want to only take data // from the current packet and not try to do a continue-capable multi packet read - result = stateObj.TryReadPlpBytes(ref byteArr, 0, cb, out cb, isAvailable, writeDataSizeToSnapshot: false, compatibilityMode: false); + result = stateObj.TryReadPlpBytes(ref byteArr, 0, cb, out cb, canContinue, writeDataSizeToSnapshot: false, compatibilityMode: false); if (result != TdsOperationStatus.Done) { - if (result == TdsOperationStatus.NeedMoreData && isAvailable && cb == byteArr.Length) + if (result == TdsOperationStatus.NeedMoreData && canContinue && cb == byteArr.Length) { // succeeded in getting the data but failed to find the next plp length returnAfterAdd = true; diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/TdsParserStateObject.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/TdsParserStateObject.cs index e8057c3e58..ea23fb6937 100644 --- a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/TdsParserStateObject.cs +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/TdsParserStateObject.cs @@ -1871,7 +1871,7 @@ internal TdsOperationStatus TryReadStringWithEncoding(int length, System.Text.En } byte[] buf = null; int offset = 0; - (bool isAvailable, bool isStarting, bool isContinuing) = GetSnapshotStatuses(); + (bool canContinue, bool isStarting, bool isContinuing) = GetSnapshotStatuses(); if (isPlp) { @@ -1889,7 +1889,7 @@ internal TdsOperationStatus TryReadStringWithEncoding(int length, System.Text.En if (((_inBytesUsed + length) > _inBytesRead) || (_inBytesPacket < length)) { int startOffset = 0; - if (isAvailable) + if (canContinue) { if (isContinuing || isStarting) { @@ -1907,7 +1907,7 @@ internal TdsOperationStatus TryReadStringWithEncoding(int length, System.Text.En buf = new byte[length]; } - TdsOperationStatus result = TryReadByteArray(buf, length, out _, startOffset, isAvailable); + TdsOperationStatus result = TryReadByteArray(buf, length, out _, startOffset, canContinue); if (result != TdsOperationStatus.Done) { @@ -3465,15 +3465,15 @@ internal bool IsSnapshotContinuing() internal (bool CanContinue, bool IsStarting, bool IsContinuing) GetSnapshotStatuses() { - bool isAvailable = _snapshot != null && _snapshot.ContinueEnabled && _snapshotStatus != SnapshotStatus.NotActive; + bool canContinue = _snapshot != null && _snapshot.ContinueEnabled && _snapshotStatus != SnapshotStatus.NotActive; bool isStarting = false; bool isContinuing = false; - if (isAvailable) + if (canContinue) { isStarting = _snapshotStatus == SnapshotStatus.ReplayStarting; isContinuing = _snapshotStatus == SnapshotStatus.ContinueRunning; } - return (isAvailable, isStarting, isContinuing); + return (canContinue, isStarting, isContinuing); } internal int GetSnapshotStorageLength() @@ -4028,28 +4028,6 @@ internal void CheckStack(string trace) Debug.Assert(_stateObj._permitReplayStackTraceToDiffer || prev.Stack == trace, "The stack trace on subsequent replays should be the same"); } } - - public int CurrentPacketIndex - { - get - { - int value = -1; - if (_current != null) - { - PacketData current = _firstPacket; - while (current != null) - { - value += 1; - if (current == _current) - { - break; - } - current = current.NextPacket; - } - } - return value; - } - } #endif public bool ContinueEnabled => !LocalAppContextSwitches.UseCompatibilityAsyncBehaviour; From 648569129588fe011f96345bf9dbac34d6aa8355 Mon Sep 17 00:00:00 2001 From: Wraith2 Date: Thu, 22 May 2025 18:51:35 +0100 Subject: [PATCH 3/3] additional review feedback changes --- .../tests/ManualTests/SQL/DataReaderTest/DataReaderTest.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/DataReaderTest/DataReaderTest.cs b/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/DataReaderTest/DataReaderTest.cs index c20a30c214..9c4baf8050 100644 --- a/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/DataReaderTest/DataReaderTest.cs +++ b/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/DataReaderTest/DataReaderTest.cs @@ -638,7 +638,7 @@ DROP TABLE IF EXISTS [{tableName}] command.Parameters.Clear(); var result = (byte[])await command.ExecuteScalarAsync(); - Debug.Assert(data.SequenceEqual(result)); + Assert.Equal(data, result); } }