Skip to content

Refine handling of moving between replay and continue states #3337

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6500,8 +6500,8 @@ private TdsOperationStatus TryReadByteArrayWithContinue(TdsParserStateObject sta
bytes = null;
int offset = 0;
byte[] temp = null;
(bool isAvailable, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses();
if (isAvailable)
(bool canContinue, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses();
if (canContinue)
{
if (isContinuing || isStarting)
{
Expand Down Expand Up @@ -12983,9 +12983,9 @@ internal TdsOperationStatus TryReadPlpUnicodeCharsWithContinue(TdsParserStateObj
char[] temp = null;
bool buffIsRented = false;
int startOffset = 0;
(bool isAvailable, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses();
(bool canContinue, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses();

if (isAvailable)
if (canContinue)
{
if (isContinuing || isStarting)
{
Expand All @@ -13004,7 +13004,7 @@ internal TdsOperationStatus TryReadPlpUnicodeCharsWithContinue(TdsParserStateObj
length >> 1,
stateObj,
out length,
supportRentedBuff: !isAvailable, // do not use the arraypool if we are going to keep the buffer in the snapshot
supportRentedBuff: !canContinue, // do not use the arraypool if we are going to keep the buffer in the snapshot
rentedBuff: ref buffIsRented,
startOffset,
isStarting || isContinuing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6696,8 +6696,8 @@ private TdsOperationStatus TryReadByteArrayWithContinue(TdsParserStateObject sta
bytes = null;
int offset = 0;
byte[] temp = null;
(bool isAvailable, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses();
if (isAvailable)
(bool canContinue, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses();
if (canContinue)
{
if (isContinuing || isStarting)
{
Expand Down Expand Up @@ -13176,9 +13176,9 @@ internal TdsOperationStatus TryReadPlpUnicodeCharsWithContinue(TdsParserStateObj
char[] temp = null;
bool buffIsRented = false;
int startOffset = 0;
(bool isAvailable, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses();
(bool canContinue, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses();

if (isAvailable)
if (canContinue)
{
if (isContinuing || isStarting)
{
Expand All @@ -13197,7 +13197,7 @@ internal TdsOperationStatus TryReadPlpUnicodeCharsWithContinue(TdsParserStateObj
length >> 1,
stateObj,
out length,
supportRentedBuff: !isAvailable, // do not use the arraypool if we are going to keep the buffer in the snapshot
supportRentedBuff: !canContinue, // do not use the arraypool if we are going to keep the buffer in the snapshot
rentedBuff: ref buffIsRented,
startOffset,
isStarting || isContinuing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ internal static TdsOperationStatus TryCreate(SqlMetaDataPriv metadata, TdsParser
{
buffer = null;

(bool isAvailable, bool isStarting, _) = stateObj.GetSnapshotStatuses();
(bool canContinue, bool isStarting, _) = stateObj.GetSnapshotStatuses();

List<byte[]> cachedBytes = null;
if (isAvailable)
if (canContinue)
{
cachedBytes = stateObj.TryTakeSnapshotStorage() as List<byte[]>;
if (isStarting)
Expand Down Expand Up @@ -78,10 +78,10 @@ internal static TdsOperationStatus TryCreate(SqlMetaDataPriv metadata, TdsParser
byte[] byteArr = new byte[cb];
// pass false for the writeDataSizeToSnapshot parameter because we want to only take data
// from the current packet and not try to do a continue-capable multi packet read
result = stateObj.TryReadPlpBytes(ref byteArr, 0, cb, out cb, writeDataSizeToSnapshot: false, compatibilityMode: false);
result = stateObj.TryReadPlpBytes(ref byteArr, 0, cb, out cb, canContinue, writeDataSizeToSnapshot: false, compatibilityMode: false);
if (result != TdsOperationStatus.Done)
{
if (result == TdsOperationStatus.NeedMoreData && isAvailable && cb == byteArr.Length)
if (result == TdsOperationStatus.NeedMoreData && canContinue && cb == byteArr.Length)
{
// succeeded in getting the data but failed to find the next plp length
returnAfterAdd = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1871,7 +1871,7 @@ internal TdsOperationStatus TryReadStringWithEncoding(int length, System.Text.En
}
byte[] buf = null;
int offset = 0;
(bool isAvailable, bool isStarting, bool isContinuing) = GetSnapshotStatuses();
(bool canContinue, bool isStarting, bool isContinuing) = GetSnapshotStatuses();

if (isPlp)
{
Expand All @@ -1889,7 +1889,7 @@ internal TdsOperationStatus TryReadStringWithEncoding(int length, System.Text.En
if (((_inBytesUsed + length) > _inBytesRead) || (_inBytesPacket < length))
{
int startOffset = 0;
if (isAvailable)
if (canContinue)
{
if (isContinuing || isStarting)
{
Expand All @@ -1907,7 +1907,7 @@ internal TdsOperationStatus TryReadStringWithEncoding(int length, System.Text.En
buf = new byte[length];
}

TdsOperationStatus result = TryReadByteArray(buf, length, out _, startOffset, isAvailable);
TdsOperationStatus result = TryReadByteArray(buf, length, out _, startOffset, canContinue);

if (result != TdsOperationStatus.Done)
{
Expand Down Expand Up @@ -2037,22 +2037,23 @@ internal int ReadPlpBytesChunk(byte[] buff, int offset, int len)

internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len, out int totalBytesRead)
{
bool canContinue = false;
bool isStarting = false;
bool isContinuing = false;
bool compatibilityMode = LocalAppContextSwitches.UseCompatibilityAsyncBehaviour;
if (!compatibilityMode)
{
(_, isStarting, isContinuing) = GetSnapshotStatuses();
(canContinue, isStarting, isContinuing) = GetSnapshotStatuses();
}
return TryReadPlpBytes(ref buff, offset, len, out totalBytesRead, isStarting || isContinuing, compatibilityMode);
return TryReadPlpBytes(ref buff, offset, len, out totalBytesRead, canContinue, canContinue, compatibilityMode);
}
// Reads the requested number of bytes from a plp data stream, or the entire data if
// requested length is -1 or larger than the actual length of data. First call to this method
// should be preceeded by a call to ReadPlpLength or ReadDataLength.
// Returns the actual bytes read.
// NOTE: This method must be retriable WITHOUT replaying a snapshot
// Every time you call this method increment the offset and decrease len by the value of totalBytesRead
internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len, out int totalBytesRead, bool writeDataSizeToSnapshot, bool compatibilityMode)
internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len, out int totalBytesRead, bool canContinue, bool writeDataSizeToSnapshot, bool compatibilityMode)
{
totalBytesRead = 0;

Expand All @@ -2077,9 +2078,16 @@ internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len
// If total length is known up front, allocate the whole buffer in one shot instead of realloc'ing and copying over each time
if (buff == null && _longlen != TdsEnums.SQL_PLP_UNKNOWNLEN)
{
if (writeDataSizeToSnapshot)
if (compatibilityMode && _snapshot != null && _snapshotStatus != SnapshotStatus.NotActive)
{
// legacy replay path perf optimization
// if there is a snapshot which contains a stored plp buffer take it
// and try to use it if it is the right length
buff = TryTakeSnapshotStorage() as byte[];
}
else if (writeDataSizeToSnapshot && canContinue && _snapshot != null)
{
// if there is a snapshot and it contains a stored plp buffer take it
// if there is a snapshot which it contains a stored plp buffer take it
// and try to use it if it is the right length
buff = TryTakeSnapshotStorage() as byte[];
if (buff != null)
Expand All @@ -2088,13 +2096,7 @@ internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len
totalBytesRead = offset;
}
}
else if (compatibilityMode && _snapshot != null && _snapshotStatus != SnapshotStatus.NotActive)
{
// legacy replay path perf optimization
// if there is a snapshot and it contains a stored plp buffer take it
// and try to use it if it is the right length
buff = TryTakeSnapshotStorage() as byte[];
}


if ((ulong)(buff?.Length ?? 0) != _longlen)
{
Expand Down Expand Up @@ -2146,44 +2148,50 @@ internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len
_longlenleft -= (ulong)bytesRead;
if (result != TdsOperationStatus.Done)
{
if (writeDataSizeToSnapshot)
if (compatibilityMode && _snapshot != null)
{
// legacy replay path perf optimization
// a partial read has happened so store the target buffer in the snapshot
// so it can be re-used when another packet arrives and we read again
SetSnapshotStorage(buff);
SetSnapshotDataSize(bytesRead);

}
else if (compatibilityMode && _snapshot != null)
else if (canContinue)
{
// legacy replay path perf optimization
// a partial read has happened so store the target buffer in the snapshot
// so it can be re-used when another packet arrives and we read again
SetSnapshotStorage(buff);
if (writeDataSizeToSnapshot)
{
SetSnapshotDataSize(bytesRead);
}
}
return result;
}
if (writeDataSizeToSnapshot && canContinue)
{
SetSnapshotDataSize(bytesRead);
}

if (_longlenleft == 0)
{
// Read the next chunk or cleanup state if hit the end
result = TryReadPlpLength(false, out _);
if (result != TdsOperationStatus.Done)
{
if (writeDataSizeToSnapshot)
{
if (result == TdsOperationStatus.NeedMoreData)
{
SetSnapshotStorage(buff);
SetSnapshotDataSize(bytesRead);
}
}
else if (compatibilityMode && _snapshot != null)
if (compatibilityMode && _snapshot != null)
{
// a partial read has happened so store the target buffer in the snapshot
// so it can be re-used when another packet arrives and we read again
SetSnapshotStorage(buff);
}
else if (canContinue && result == TdsOperationStatus.NeedMoreData)
{
SetSnapshotStorage(buff);
if (writeDataSizeToSnapshot)
{
SetSnapshotDataSize(bytesRead);
}
}
return result;
}
}
Expand Down Expand Up @@ -3455,17 +3463,17 @@ internal bool IsSnapshotContinuing()
_snapshotStatus == TdsParserStateObject.SnapshotStatus.ContinueRunning;
}

internal (bool IsAvailable, bool IsStarting, bool IsContinuing) GetSnapshotStatuses()
internal (bool CanContinue, bool IsStarting, bool IsContinuing) GetSnapshotStatuses()
{
bool isAvailable = _snapshot != null && _snapshot.ContinueEnabled;
bool canContinue = _snapshot != null && _snapshot.ContinueEnabled && _snapshotStatus != SnapshotStatus.NotActive;
bool isStarting = false;
bool isContinuing = false;
if (isAvailable)
if (canContinue)
{
isStarting = _snapshotStatus == SnapshotStatus.ReplayStarting;
isContinuing = _snapshotStatus == SnapshotStatus.ContinueRunning;
}
return (isAvailable, isStarting, isContinuing);
return (canContinue, isStarting, isContinuing);
}

internal int GetSnapshotStorageLength<T>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
using System.Collections.Generic;
using System.Data;
using System.Data.SqlTypes;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading;
Expand Down Expand Up @@ -603,6 +605,60 @@ integrated into a comprehensive development
}
}

[ConditionalFact(typeof(DataTestUtility), nameof(DataTestUtility.AreConnStringsSetup))]
public static async Task CanReadBinaryData()
{
const int Size = 20_000;

byte[] data = Enumerable.Range(0, Size)
.Select(i => (byte)(i % 256))
.ToArray();
string tableName = DataTestUtility.GenerateObjectName();

using (var connection = new SqlConnection(DataTestUtility.TCPConnectionString))
{
await connection.OpenAsync();

try
{
using (var createCommand = connection.CreateCommand())
{
createCommand.CommandText = $@"
DROP TABLE IF EXISTS [{tableName}]
CREATE TABLE [{tableName}] (Id INT IDENTITY(1,1) PRIMARY KEY, Data VARBINARY(MAX));
INSERT INTO [{tableName}] (Data) VALUES (@data);";
createCommand.Parameters.Add(new SqlParameter("@data", SqlDbType.VarBinary, Size) { Value = data });
await createCommand.ExecuteNonQueryAsync();
}

using (var command = connection.CreateCommand())
{

command.CommandText = $"SELECT Data FROM [{tableName}]";
command.Parameters.Clear();
var result = (byte[])await command.ExecuteScalarAsync();

Debug.Assert(data.SequenceEqual(result));
}

}
finally
{
try
{
using (var dropCommand = connection.CreateCommand())
{
dropCommand.CommandText = $"DROP TABLE IF EXISTS [{tableName}]";
dropCommand.ExecuteNonQuery();
}
}
catch
{
}
}
}
}

// Synapse: Cannot find data type 'rowversion'.
[ConditionalFact(typeof(DataTestUtility), nameof(DataTestUtility.AreConnStringsSetup), nameof(DataTestUtility.IsNotAzureSynapse))]
public static void CheckLegacyNullRowVersionIsEmptyArray()
Expand Down
Loading