Skip to content

Commit 494a600

Browse files
committed
refine handling of moving between replay and continue states
1 parent b8948f2 commit 494a600

File tree

3 files changed

+115
-29
lines changed

3 files changed

+115
-29
lines changed

src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/SqlCachedBuffer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ internal static TdsOperationStatus TryCreate(SqlMetaDataPriv metadata, TdsParser
7878
byte[] byteArr = new byte[cb];
7979
// pass false for the writeDataSizeToSnapshot parameter because we want to only take data
8080
// from the current packet and not try to do a continue-capable multi packet read
81-
result = stateObj.TryReadPlpBytes(ref byteArr, 0, cb, out cb, writeDataSizeToSnapshot: false, compatibilityMode: false);
81+
result = stateObj.TryReadPlpBytes(ref byteArr, 0, cb, out cb, isAvailable, writeDataSizeToSnapshot: false, compatibilityMode: false);
8282
if (result != TdsOperationStatus.Done)
8383
{
8484
if (result == TdsOperationStatus.NeedMoreData && isAvailable && cb == byteArr.Length)

src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/TdsParserStateObject.cs

Lines changed: 58 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2037,22 +2037,23 @@ internal int ReadPlpBytesChunk(byte[] buff, int offset, int len)
20372037

20382038
internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len, out int totalBytesRead)
20392039
{
2040+
bool canContinue = false;
20402041
bool isStarting = false;
20412042
bool isContinuing = false;
20422043
bool compatibilityMode = LocalAppContextSwitches.UseCompatibilityAsyncBehaviour;
20432044
if (!compatibilityMode)
20442045
{
2045-
(_, isStarting, isContinuing) = GetSnapshotStatuses();
2046+
(canContinue, isStarting, isContinuing) = GetSnapshotStatuses();
20462047
}
2047-
return TryReadPlpBytes(ref buff, offset, len, out totalBytesRead, isStarting || isContinuing, compatibilityMode);
2048+
return TryReadPlpBytes(ref buff, offset, len, out totalBytesRead, canContinue, canContinue, compatibilityMode);
20482049
}
20492050
// Reads the requested number of bytes from a plp data stream, or the entire data if
20502051
// requested length is -1 or larger than the actual length of data. First call to this method
20512052
// should be preceeded by a call to ReadPlpLength or ReadDataLength.
20522053
// Returns the actual bytes read.
20532054
// NOTE: This method must be retriable WITHOUT replaying a snapshot
20542055
// Every time you call this method increment the offset and decrease len by the value of totalBytesRead
2055-
internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len, out int totalBytesRead, bool writeDataSizeToSnapshot, bool compatibilityMode)
2056+
internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len, out int totalBytesRead, bool canContinue, bool writeDataSizeToSnapshot, bool compatibilityMode)
20562057
{
20572058
totalBytesRead = 0;
20582059

@@ -2077,9 +2078,16 @@ internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len
20772078
// If total length is known up front, allocate the whole buffer in one shot instead of realloc'ing and copying over each time
20782079
if (buff == null && _longlen != TdsEnums.SQL_PLP_UNKNOWNLEN)
20792080
{
2080-
if (writeDataSizeToSnapshot)
2081+
if (compatibilityMode && _snapshot != null && _snapshotStatus != SnapshotStatus.NotActive)
2082+
{
2083+
// legacy replay path perf optimization
2084+
// if there is a snapshot which contains a stored plp buffer take it
2085+
// and try to use it if it is the right length
2086+
buff = TryTakeSnapshotStorage() as byte[];
2087+
}
2088+
else if (writeDataSizeToSnapshot && canContinue && _snapshot != null)
20812089
{
2082-
// if there is a snapshot and it contains a stored plp buffer take it
2090+
// if there is a snapshot which it contains a stored plp buffer take it
20832091
// and try to use it if it is the right length
20842092
buff = TryTakeSnapshotStorage() as byte[];
20852093
if (buff != null)
@@ -2088,13 +2096,7 @@ internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len
20882096
totalBytesRead = offset;
20892097
}
20902098
}
2091-
else if (compatibilityMode && _snapshot != null && _snapshotStatus != SnapshotStatus.NotActive)
2092-
{
2093-
// legacy replay path perf optimization
2094-
// if there is a snapshot and it contains a stored plp buffer take it
2095-
// and try to use it if it is the right length
2096-
buff = TryTakeSnapshotStorage() as byte[];
2097-
}
2099+
20982100

20992101
if ((ulong)(buff?.Length ?? 0) != _longlen)
21002102
{
@@ -2146,44 +2148,50 @@ internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len
21462148
_longlenleft -= (ulong)bytesRead;
21472149
if (result != TdsOperationStatus.Done)
21482150
{
2149-
if (writeDataSizeToSnapshot)
2151+
if (compatibilityMode && _snapshot != null)
21502152
{
2153+
// legacy replay path perf optimization
21512154
// a partial read has happened so store the target buffer in the snapshot
21522155
// so it can be re-used when another packet arrives and we read again
21532156
SetSnapshotStorage(buff);
2154-
SetSnapshotDataSize(bytesRead);
2155-
21562157
}
2157-
else if (compatibilityMode && _snapshot != null)
2158+
else if (canContinue)
21582159
{
2159-
// legacy replay path perf optimization
21602160
// a partial read has happened so store the target buffer in the snapshot
21612161
// so it can be re-used when another packet arrives and we read again
21622162
SetSnapshotStorage(buff);
2163+
if (writeDataSizeToSnapshot)
2164+
{
2165+
SetSnapshotDataSize(bytesRead);
2166+
}
21632167
}
21642168
return result;
21652169
}
2170+
if (writeDataSizeToSnapshot && canContinue)
2171+
{
2172+
SetSnapshotDataSize(bytesRead);
2173+
}
21662174

21672175
if (_longlenleft == 0)
21682176
{
21692177
// Read the next chunk or cleanup state if hit the end
21702178
result = TryReadPlpLength(false, out _);
21712179
if (result != TdsOperationStatus.Done)
21722180
{
2173-
if (writeDataSizeToSnapshot)
2174-
{
2175-
if (result == TdsOperationStatus.NeedMoreData)
2176-
{
2177-
SetSnapshotStorage(buff);
2178-
SetSnapshotDataSize(bytesRead);
2179-
}
2180-
}
2181-
else if (compatibilityMode && _snapshot != null)
2181+
if (compatibilityMode && _snapshot != null)
21822182
{
21832183
// a partial read has happened so store the target buffer in the snapshot
21842184
// so it can be re-used when another packet arrives and we read again
21852185
SetSnapshotStorage(buff);
21862186
}
2187+
else if (canContinue && result == TdsOperationStatus.NeedMoreData)
2188+
{
2189+
SetSnapshotStorage(buff);
2190+
if (writeDataSizeToSnapshot)
2191+
{
2192+
SetSnapshotDataSize(bytesRead);
2193+
}
2194+
}
21872195
return result;
21882196
}
21892197
}
@@ -3455,9 +3463,9 @@ internal bool IsSnapshotContinuing()
34553463
_snapshotStatus == TdsParserStateObject.SnapshotStatus.ContinueRunning;
34563464
}
34573465

3458-
internal (bool IsAvailable, bool IsStarting, bool IsContinuing) GetSnapshotStatuses()
3466+
internal (bool CanContinue, bool IsStarting, bool IsContinuing) GetSnapshotStatuses()
34593467
{
3460-
bool isAvailable = _snapshot != null && _snapshot.ContinueEnabled;
3468+
bool isAvailable = _snapshot != null && _snapshot.ContinueEnabled && _snapshotStatus != SnapshotStatus.NotActive;
34613469
bool isStarting = false;
34623470
bool isContinuing = false;
34633471
if (isAvailable)
@@ -4020,6 +4028,28 @@ internal void CheckStack(string trace)
40204028
Debug.Assert(_stateObj._permitReplayStackTraceToDiffer || prev.Stack == trace, "The stack trace on subsequent replays should be the same");
40214029
}
40224030
}
4031+
4032+
public int CurrentPacketIndex
4033+
{
4034+
get
4035+
{
4036+
int value = -1;
4037+
if (_current != null)
4038+
{
4039+
PacketData current = _firstPacket;
4040+
while (current != null)
4041+
{
4042+
value += 1;
4043+
if (current == _current)
4044+
{
4045+
break;
4046+
}
4047+
current = current.NextPacket;
4048+
}
4049+
}
4050+
return value;
4051+
}
4052+
}
40234053
#endif
40244054
public bool ContinueEnabled => !LocalAppContextSwitches.UseCompatibilityAsyncBehaviour;
40254055

src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/DataReaderTest/DataReaderTest.cs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
using System.Collections.Generic;
77
using System.Data;
88
using System.Data.SqlTypes;
9+
using System.Diagnostics;
10+
using System.Linq;
911
using System.Reflection;
1012
using System.Text;
1113
using System.Threading;
@@ -603,6 +605,60 @@ integrated into a comprehensive development
603605
}
604606
}
605607

608+
[ConditionalFact(typeof(DataTestUtility), nameof(DataTestUtility.AreConnStringsSetup))]
609+
public static async Task CanReadBinaryData()
610+
{
611+
const int Size = 20_000;
612+
613+
byte[] data = Enumerable.Range(0, Size)
614+
.Select(i => (byte)(i % 256))
615+
.ToArray();
616+
string tableName = DataTestUtility.GenerateObjectName();
617+
618+
using (var connection = new SqlConnection(DataTestUtility.TCPConnectionString))
619+
{
620+
await connection.OpenAsync();
621+
622+
try
623+
{
624+
using (var createCommand = connection.CreateCommand())
625+
{
626+
createCommand.CommandText = $@"
627+
DROP TABLE IF EXISTS [{tableName}]
628+
CREATE TABLE [{tableName}] (Id INT IDENTITY(1,1) PRIMARY KEY, Data VARBINARY(MAX));
629+
INSERT INTO [{tableName}] (Data) VALUES (@data);";
630+
createCommand.Parameters.Add(new SqlParameter("@data", SqlDbType.VarBinary, Size) { Value = data });
631+
await createCommand.ExecuteNonQueryAsync();
632+
}
633+
634+
using (var command = connection.CreateCommand())
635+
{
636+
637+
command.CommandText = $"SELECT Data FROM [{tableName}]";
638+
command.Parameters.Clear();
639+
var result = (byte[])await command.ExecuteScalarAsync();
640+
641+
Debug.Assert(data.SequenceEqual(result));
642+
}
643+
644+
}
645+
finally
646+
{
647+
try
648+
{
649+
using (var dropCommand = connection.CreateCommand())
650+
{
651+
dropCommand.CommandText = $"DROP TABLE IF EXISTS [{tableName}]";
652+
dropCommand.ExecuteNonQuery();
653+
}
654+
}
655+
catch
656+
{
657+
}
658+
}
659+
}
660+
}
661+
606662
// Synapse: Cannot find data type 'rowversion'.
607663
[ConditionalFact(typeof(DataTestUtility), nameof(DataTestUtility.AreConnStringsSetup), nameof(DataTestUtility.IsNotAzureSynapse))]
608664
public static void CheckLegacyNullRowVersionIsEmptyArray()

0 commit comments

Comments
 (0)