Skip to content

.Net MEVD: Unify collection deletion and creation #10958

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,15 @@ public async Task CreateCollectionInvokesValidMethodsAsync(bool indexExists, int
.Setup(l => l.Current)
.Returns(indexes);

var mockCursor = new Mock<IAsyncCursor<string>>();
mockCursor
.Setup(l => l.MoveNextAsync(It.IsAny<CancellationToken>()))
.ReturnsAsync(true);

mockCursor
.Setup(l => l.Current)
.Returns([]);

var mockMongoIndexManager = new Mock<IMongoIndexManager<BsonDocument>>();

mockMongoIndexManager
Expand All @@ -131,6 +140,10 @@ public async Task CreateCollectionInvokesValidMethodsAsync(bool indexExists, int
.Setup(l => l.Indexes)
.Returns(mockMongoIndexManager.Object);

this._mockMongoDatabase
.Setup(l => l.ListCollectionNamesAsync(It.IsAny<ListCollectionNamesOptions>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(mockCursor.Object);

var sut = new AzureCosmosDBMongoDBVectorStoreRecordCollection<AzureCosmosDBMongoDBHotelModel>(this._mockMongoDatabase.Object, CollectionName);

// Act
Expand All @@ -140,7 +153,11 @@ public async Task CreateCollectionInvokesValidMethodsAsync(bool indexExists, int
this._mockMongoDatabase.Verify(l => l.CreateCollectionAsync(
CollectionName,
It.IsAny<CreateCollectionOptions>(),
It.IsAny<CancellationToken>()), Times.Once());
It.IsAny<CancellationToken>()), Times.Once);

this._mockMongoDatabase.Verify(l => l.ListCollectionNamesAsync(
It.IsAny<ListCollectionNamesOptions>(),
It.IsAny<CancellationToken>()), Times.Once);

this._mockMongoDatabase.Verify(l => l.RunCommandAsync<BsonDocument>(
It.Is<BsonDocumentCommand<BsonDocument>>(command =>
Expand All @@ -151,9 +168,8 @@ public async Task CreateCollectionInvokesValidMethodsAsync(bool indexExists, int
It.IsAny<CancellationToken>()), Times.Exactly(actualIndexCreations));
}

[Theory]
[MemberData(nameof(CreateCollectionIfNotExistsData))]
public async Task CreateCollectionIfNotExistsInvokesValidMethodsAsync(List<string> collections, int actualCollectionCreations)
[Fact]
public async Task CreateCollectionIfNotExistsInvokesValidMethodsAsync()
{
// Arrange
const string CollectionName = "collection";
Expand All @@ -165,7 +181,7 @@ public async Task CreateCollectionIfNotExistsInvokesValidMethodsAsync(List<strin

mockCursor
.Setup(l => l.Current)
.Returns(collections);
.Returns([]);

this._mockMongoDatabase
.Setup(l => l.ListCollectionNamesAsync(It.IsAny<ListCollectionNamesOptions>(), It.IsAny<CancellationToken>()))
Expand Down Expand Up @@ -202,7 +218,11 @@ public async Task CreateCollectionIfNotExistsInvokesValidMethodsAsync(List<strin
this._mockMongoDatabase.Verify(l => l.CreateCollectionAsync(
CollectionName,
It.IsAny<CreateCollectionOptions>(),
It.IsAny<CancellationToken>()), Times.Exactly(actualCollectionCreations));
It.IsAny<CancellationToken>()), Times.Exactly(1));

this._mockMongoDatabase.Verify(l => l.ListCollectionNamesAsync(
It.IsAny<ListCollectionNamesOptions>(),
It.IsAny<CancellationToken>()), Times.Never);
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,19 @@ public virtual async Task CreateCollectionIfNotExistsAsync(CancellationToken can
/// <inheritdoc />
public virtual Task DeleteCollectionAsync(CancellationToken cancellationToken = default)
{
return this.RunOperationAsync(
return this.RunOperationAsync<Response>(
"DeleteIndex",
() => this._searchIndexClient.DeleteIndexAsync(this._collectionName, cancellationToken));
async () =>
{
try
{
return await this._searchIndexClient.DeleteIndexAsync(this._collectionName, cancellationToken).ConfigureAwait(false);
}
catch (RequestFailedException ex) when (ex.Status == 404)
{
return null!;
}
});
}

/// <inheritdoc />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,29 @@ public virtual Task<bool> CollectionExistsAsync(CancellationToken cancellationTo
/// <inheritdoc />
public virtual async Task CreateCollectionAsync(CancellationToken cancellationToken = default)
{
await this.RunOperationAsync("CreateCollection",
() => this._mongoDatabase.CreateCollectionAsync(this.CollectionName, cancellationToken: cancellationToken)).ConfigureAwait(false);
// The IMongoDatabase.CreateCollectionAsync "Creates a new collection if not already available".
// To make sure that all the connectors are consistent, we throw when the collection exists.
if (await this.CollectionExistsAsync(cancellationToken).ConfigureAwait(false))
{
throw new VectorStoreOperationException("Collection already exists.")
{
VectorStoreType = DatabaseName,
CollectionName = this.CollectionName,
OperationName = "CreateCollection"
};
}

await this.RunOperationAsync("CreateIndexes",
() => this.CreateIndexesAsync(this.CollectionName, cancellationToken: cancellationToken)).ConfigureAwait(false);
await this.CreateCollectionIfNotExistsAsync(cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc />
public virtual async Task CreateCollectionIfNotExistsAsync(CancellationToken cancellationToken = default)
{
if (!await this.CollectionExistsAsync(cancellationToken).ConfigureAwait(false))
{
await this.CreateCollectionAsync(cancellationToken).ConfigureAwait(false);
}
await this.RunOperationAsync("CreateCollection",
() => this._mongoDatabase.CreateCollectionAsync(this.CollectionName, cancellationToken: cancellationToken)).ConfigureAwait(false);

await this.RunOperationAsync("CreateIndexes",
() => this.CreateIndexesAsync(this.CollectionName, cancellationToken: cancellationToken)).ConfigureAwait(false);
}

/// <inheritdoc />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,19 @@ public Task<bool> CollectionExistsAsync(CancellationToken cancellationToken = de
/// <inheritdoc />
public Task CreateCollectionAsync(CancellationToken cancellationToken = default)
{
if (!this._internalCollections.ContainsKey(this._collectionName))
if (!this._internalCollections.ContainsKey(this._collectionName)
&& this._internalCollections.TryAdd(this._collectionName, new ConcurrentDictionary<object, object>())
&& this._internalCollectionTypes.TryAdd(this._collectionName, typeof(TRecord)))
{
this._internalCollections.TryAdd(this._collectionName, new ConcurrentDictionary<object, object>());
this._internalCollectionTypes.TryAdd(this._collectionName, typeof(TRecord));
return Task.CompletedTask;
}

return Task.CompletedTask;
return Task.FromException(new VectorStoreOperationException("Collection already exists.")
{
VectorStoreType = "InMemory",
CollectionName = this.CollectionName,
OperationName = "CreateCollection"
});
}

/// <inheritdoc />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,26 @@ public virtual Task<bool> CollectionExistsAsync(CancellationToken cancellationTo
/// <inheritdoc />
public virtual async Task CreateCollectionAsync(CancellationToken cancellationToken = default)
{
// The IMongoDatabase.CreateCollectionAsync "Creates a new collection if not already available".
// To make sure that all the connectors are consistent, we throw when the collection exists.
if (await this.CollectionExistsAsync(cancellationToken).ConfigureAwait(false))
{
throw new VectorStoreOperationException("Collection already exists.")
{
VectorStoreType = DatabaseName,
CollectionName = this.CollectionName,
OperationName = "CreateCollection"
};
}

await this.CreateCollectionIfNotExistsAsync(cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc />
public virtual async Task CreateCollectionIfNotExistsAsync(CancellationToken cancellationToken = default)
{
// The IMongoDatabase.CreateCollectionAsync "Creates a new collection if not already available".
// So for CreateCollectionIfNotExistsAsync, we don't perform an additional check.
await this.RunOperationAsync("CreateCollection",
() => this._mongoDatabase.CreateCollectionAsync(this.CollectionName, cancellationToken: cancellationToken)).ConfigureAwait(false);

Expand All @@ -115,15 +135,6 @@ await this.RunOperationWithRetryAsync(
cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc />
public virtual async Task CreateCollectionIfNotExistsAsync(CancellationToken cancellationToken = default)
{
if (!await this.CollectionExistsAsync(cancellationToken).ConfigureAwait(false))
{
await this.CreateCollectionAsync(cancellationToken).ConfigureAwait(false);
}
}

/// <inheritdoc />
public virtual async Task DeleteAsync(string key, CancellationToken cancellationToken = default)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,26 @@ public virtual async Task CreateCollectionIfNotExistsAsync(CancellationToken can

/// <inheritdoc />
public virtual Task DeleteCollectionAsync(CancellationToken cancellationToken = default)
{
return this.RunOperationAsync(
"DeleteCollection",
() => this._qdrantClient.DeleteCollectionAsync(this._collectionName, null, cancellationToken));
}
=> this.RunOperationAsync("DeleteCollection",
async () =>
{
try
{
await this._qdrantClient.DeleteCollectionAsync(this._collectionName, null, cancellationToken).ConfigureAwait(false);
}
catch (QdrantException)
{
// There is no reliable way to check if the operation failed because the
// collection does not exist based on the exception itself.
// So we just check here if it exists, and if not, ignore the exception.
if (!await this.CollectionExistsAsync(cancellationToken).ConfigureAwait(false))
{
return;
}

throw;
}
});

/// <inheritdoc />
public virtual async Task<TRecord?> GetAsync(ulong key, GetRecordOptions? options = null, CancellationToken cancellationToken = default)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,25 @@ public virtual async Task CreateCollectionIfNotExistsAsync(CancellationToken can
}

/// <inheritdoc />
public virtual Task DeleteCollectionAsync(CancellationToken cancellationToken = default)
public virtual async Task DeleteCollectionAsync(CancellationToken cancellationToken = default)
{
return this.RunOperationAsync("FT.DROPINDEX", () => this._database.FT().DropIndexAsync(this._collectionName));
try
{
await this.RunOperationAsync("FT.DROPINDEX",
() => this._database.FT().DropIndexAsync(this._collectionName)).ConfigureAwait(false);
}
catch (VectorStoreOperationException ex) when (ex.InnerException is RedisServerException)
{
// The RedisServerException does not expose any reliable way of checking if the index does not exist.
// It just sets the message to "Unknown index name".
// We catch the exception and ignore it, but only after checking that the index does not exist.
if (!await this.CollectionExistsAsync(cancellationToken).ConfigureAwait(false))
{
return;
}

throw;
}
}

/// <inheritdoc />
Expand Down Expand Up @@ -425,7 +441,7 @@ private async Task<T> RunOperationAsync<T>(string operationName, Func<Task<T>> o
{
return await operation.Invoke().ConfigureAwait(false);
}
catch (RedisConnectionException ex)
catch (RedisException ex)
{
throw new VectorStoreOperationException("Call to vector store failed.", ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,25 @@ public virtual async Task CreateCollectionIfNotExistsAsync(CancellationToken can
}

/// <inheritdoc />
public virtual Task DeleteCollectionAsync(CancellationToken cancellationToken = default)
public virtual async Task DeleteCollectionAsync(CancellationToken cancellationToken = default)
{
return this.RunOperationAsync("FT.DROPINDEX", () => this._database.FT().DropIndexAsync(this._collectionName));
try
{
await this.RunOperationAsync("FT.DROPINDEX",
() => this._database.FT().DropIndexAsync(this._collectionName)).ConfigureAwait(false);
}
catch (VectorStoreOperationException ex) when (ex.InnerException is RedisServerException)
{
// The RedisServerException does not expose any reliable way of checking if the index does not exist.
// It just sets the message to "Unknown index name".
// We catch the exception and ignore it, but only after checking that the index does not exist.
if (!await this.CollectionExistsAsync(cancellationToken).ConfigureAwait(false))
{
return;
}

throw;
}
}

/// <inheritdoc />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public DbCommand BuildCreateVirtualTableCommand(

public DbCommand BuildDropTableCommand(string tableName)
{
string query = $"DROP TABLE [{tableName}];";
string query = $"DROP TABLE IF EXISTS [{tableName}];";

var command = this._connection.CreateCommand();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public WeaviateVectorStore(HttpClient httpClient, WeaviateVectorStoreOptions? op
}

/// <inheritdoc />
/// <remarks>The collection name must start with a capital letter and contain only ASCII letters and digits.</remarks>
public virtual IVectorStoreRecordCollection<TKey, TRecord> GetCollection<TKey, TRecord>(string name, VectorStoreRecordDefinition? vectorStoreRecordDefinition = null)
where TKey : notnull
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,15 @@ public class WeaviateVectorStoreRecordCollection<TRecord> : IVectorStoreRecordCo
/// </param>
/// <param name="collectionName">The name of the collection that this <see cref="WeaviateVectorStoreRecordCollection{TRecord}"/> will access.</param>
/// <param name="options">Optional configuration options for this class.</param>
/// <remarks>The collection name must start with a capital letter and contain only ASCII letters and digits.</remarks>
public WeaviateVectorStoreRecordCollection(
HttpClient httpClient,
string collectionName,
WeaviateVectorStoreRecordCollectionOptions<TRecord>? options = default)
{
// Verify.
Verify.NotNull(httpClient);
Verify.NotNullOrWhiteSpace(collectionName);
VerifyCollectionName(collectionName);

var endpoint = (options?.Endpoint ?? httpClient.BaseAddress) ?? throw new ArgumentException($"Weaviate endpoint should be provided via HttpClient.BaseAddress property or {nameof(WeaviateVectorStoreRecordCollectionOptions<TRecord>)} options parameter.");

Expand Down Expand Up @@ -535,5 +536,24 @@ private static void VerifyVectorParam<TVector>(TVector vector)
}
}

private static void VerifyCollectionName(string collectionName)
{
Verify.NotNullOrWhiteSpace(collectionName);

// Based on https://weaviate.io/developers/weaviate/starter-guides/managing-collections#collection--property-names
char first = collectionName[0];
if (!(first is >= 'A' and <= 'Z'))
{
throw new ArgumentException("Collection name must start with an uppercase ASCII letter.", nameof(collectionName));
}

foreach (char character in collectionName)
{
if (!((character is >= 'a' and <= 'z') || (character is >= 'A' and <= 'Z') || (character is >= '0' and <= '9')))
{
throw new ArgumentException("Collection name must contain only ASCII letters and digits.", nameof(collectionName));
}
}
}
#endregion
}
Loading
Loading