diff --git a/packages/common/core-interfaces/api-report/core-interfaces.legacy.alpha.api.md b/packages/common/core-interfaces/api-report/core-interfaces.legacy.alpha.api.md index 4c0da5c39083..03a8ee95e5d3 100644 --- a/packages/common/core-interfaces/api-report/core-interfaces.legacy.alpha.api.md +++ b/packages/common/core-interfaces/api-report/core-interfaces.legacy.alpha.api.md @@ -266,6 +266,11 @@ export interface IFluidHandleContext extends IProvideFluidHandleContext { export interface IFluidHandleErased extends ErasedType { } +// @alpha @legacy +export interface IFluidHandleEvents { + payloadShared: () => void; +} + // @alpha @legacy export interface IFluidHandleInternal extends IFluidHandle, IProvideFluidHandle { readonly absolutePath: string; @@ -273,6 +278,12 @@ export interface IFluidHandleInternal extends IFluidHandle, bind(handle: IFluidHandleInternal): void; } +// @alpha @legacy +export interface IFluidHandlePayloadPending extends IFluidHandle { + readonly events: Listenable; + readonly payloadState: PayloadState; +} + // @public (undocumented) export const IFluidLoadable: keyof IProvideFluidLoadable; @@ -281,6 +292,17 @@ export interface IFluidLoadable extends IProvideFluidLoadable { readonly handle: IFluidHandle; } +// @alpha @legacy +export interface ILocalFluidHandle extends IFluidHandlePayloadPending { + readonly events: Listenable; + readonly payloadShareError: unknown; +} + +// @alpha @legacy +export interface ILocalFluidHandleEvents extends IFluidHandleEvents { + payloadShareFailed: (error: unknown) => void; +} + // @alpha @legacy export interface ILoggingError extends Error { getTelemetryProperties(): ITelemetryBaseProperties; @@ -387,6 +409,9 @@ export type LogLevel = (typeof LogLevel)[keyof typeof LogLevel]; // @public export type Off = () => void; +// @alpha @legacy +export type PayloadState = "pending" | "shared"; + // @public export type ReplaceIEventThisPlaceHolder = L extends any[] ? { [K in keyof L]: L[K] extends IEventThisPlaceHolder ? TThis : L[K]; diff --git a/packages/common/core-interfaces/src/handles.ts b/packages/common/core-interfaces/src/handles.ts index 998bd53e6190..df9453bbd58b 100644 --- a/packages/common/core-interfaces/src/handles.ts +++ b/packages/common/core-interfaces/src/handles.ts @@ -5,6 +5,7 @@ import type { ErasedType } from "./erasedType.js"; import type { IRequest, IResponse } from "./fluidRouter.js"; +import type { Listenable } from "./internal.js"; /** * @legacy @@ -117,6 +118,77 @@ export interface IFluidHandleInternalPayloadPending< readonly payloadPending: boolean; } +/** + * The state of the handle's payload. + * - "pending" - The payload is not shared to all collaborators + * - "shared" - The payload is available to both the local client and remote collaborators + * + * @remarks + * Clients will see a transition of "pending" to "shared" when the payload has been shared to all collaborators. + * @legacy + * @alpha + */ +export type PayloadState = "pending" | "shared"; + +/** + * Events which fire from an IFluidHandle. + * @legacy + * @alpha + */ +export interface IFluidHandleEvents { + /** + * Emitted when the payload becomes available to remote collaborators. + */ + payloadShared: () => void; +} + +/** + * Observable state on the handle regarding its payload sharing state. + * + * @privateRemarks + * Contents to be merged to IFluidHandle, and then this separate interface should be removed. 
+ * @legacy + * @alpha + */ +export interface IFluidHandlePayloadPending extends IFluidHandle { + /** + * The current state of the handle's payload. + */ + readonly payloadState: PayloadState; + /** + * Event emitter, with events that emit as the payload state transitions. + */ + readonly events: Listenable; +} + +/** + * Additional events which fire as a local handle's payload state transitions. + * @legacy + * @alpha + */ +export interface ILocalFluidHandleEvents extends IFluidHandleEvents { + /** + * Emitted for locally created handles when the payload fails sharing to remote collaborators. + */ + payloadShareFailed: (error: unknown) => void; +} + +/** + * Additional observable state on a local handle regarding its payload sharing state. + * @legacy + * @alpha + */ +export interface ILocalFluidHandle extends IFluidHandlePayloadPending { + /** + * The error encountered by the handle while sharing the payload, if one has occurred. Undefined if no error has occurred. + */ + readonly payloadShareError: unknown; + /** + * Event emitter, with events that emit as the payload state transitions. + */ + readonly events: Listenable; +} + /** * Symbol which must only be used on an {@link (IFluidHandle:interface)}, and is used to identify such objects. * diff --git a/packages/common/core-interfaces/src/index.ts b/packages/common/core-interfaces/src/index.ts index 0d6e8f70f55c..586790d45274 100644 --- a/packages/common/core-interfaces/src/index.ts +++ b/packages/common/core-interfaces/src/index.ts @@ -28,11 +28,16 @@ export { IFluidLoadable, IFluidRunnable } from "./fluidLoadable.js"; export type { IRequest, IRequestHeader, IResponse } from "./fluidRouter.js"; export type { - IProvideFluidHandleContext, - IProvideFluidHandle, + IFluidHandleErased, + IFluidHandleEvents, IFluidHandleInternal, IFluidHandleInternalPayloadPending, - IFluidHandleErased, + IFluidHandlePayloadPending, + ILocalFluidHandle, + ILocalFluidHandleEvents, + IProvideFluidHandle, + IProvideFluidHandleContext, + PayloadState, } from "./handles.js"; export { IFluidHandleContext, IFluidHandle, fluidHandleSymbol } from "./handles.js"; diff --git a/packages/runtime/container-runtime/src/blobManager/blobManager.ts b/packages/runtime/container-runtime/src/blobManager/blobManager.ts index a741ebc616c5..183189a8db27 100644 --- a/packages/runtime/container-runtime/src/blobManager/blobManager.ts +++ b/packages/runtime/container-runtime/src/blobManager/blobManager.ts @@ -3,22 +3,22 @@ * Licensed under the MIT License. 
*/ -import { - TypedEventEmitter, - bufferToString, - stringToBuffer, -} from "@fluid-internal/client-utils"; +import { bufferToString, createEmitter, stringToBuffer } from "@fluid-internal/client-utils"; import { AttachState } from "@fluidframework/container-definitions"; import { IContainerRuntime, IContainerRuntimeEvents, } from "@fluidframework/container-runtime-definitions/internal"; import type { - IEvent, + IEmitter, IEventProvider, IFluidHandleContext, IFluidHandleInternal, IFluidHandleInternalPayloadPending, + ILocalFluidHandle, + ILocalFluidHandleEvents, + Listenable, + PayloadState, } from "@fluidframework/core-interfaces/internal"; import { assert, Deferred } from "@fluidframework/core-utils/internal"; import { @@ -65,7 +65,9 @@ import { */ export class BlobHandle extends FluidHandleBase - implements IFluidHandleInternalPayloadPending + implements + ILocalFluidHandle, + IFluidHandleInternalPayloadPending { private attached: boolean = false; @@ -73,9 +75,30 @@ export class BlobHandle return this.routeContext.isAttached && this.attached; } + private _events: + | (Listenable & IEmitter) + | undefined; + public get events(): Listenable { + return (this._events ??= createEmitter()); + } + + private _state: PayloadState = "pending"; + public get payloadState(): PayloadState { + return this._state; + } + + /** + * The error property starts undefined, signalling that there has been no error yet. + * If an error occurs, the property will contain the error. + */ + private _payloadShareError: unknown; + public get payloadShareError(): unknown { + return this._payloadShareError; + } + public readonly absolutePath: string; - constructor( + public constructor( public readonly path: string, public readonly routeContext: IFluidHandleContext, public get: () => Promise, @@ -86,6 +109,16 @@ export class BlobHandle this.absolutePath = generateHandleContextPath(path, this.routeContext); } + public readonly notifyShared = (): void => { + this._state = "shared"; + this._events?.emit("payloadShared"); + }; + + public readonly notifyFailed = (error: unknown): void => { + this._payloadShareError = error; + this._events?.emit("payloadShareFailed", error); + }; + public attachGraph(): void { if (!this.attached) { this.attached = true; @@ -104,7 +137,7 @@ export type IBlobManagerRuntime = Pick< IContainerRuntime, "attachState" | "connected" | "baseLogger" | "clientDetails" | "disposed" > & - TypedEventEmitter; + IEventProvider; type ICreateBlobResponseWithTTL = ICreateBlobResponse & Partial>; @@ -133,13 +166,14 @@ export interface IPendingBlobs { }; } -export interface IBlobManagerEvents extends IEvent { - (event: "noPendingBlobs", listener: () => void); +export interface IBlobManagerEvents { + noPendingBlobs: () => void; } -interface IBlobManagerInternalEvents extends IEvent { - (event: "handleAttached", listener: (pending: PendingBlob) => void); - (event: "processedBlobAttach", listener: (localId: string, storageId: string) => void); +interface IBlobManagerInternalEvents { + uploadFailed: (localId: string, error: unknown) => void; + handleAttached: (pending: PendingBlob) => void; + processedBlobAttach: (localId: string, storageId: string) => void; } const stashedPendingBlobOverrides: Pick< @@ -157,11 +191,11 @@ export const blobManagerBasePath = "_blobs" as const; export class BlobManager { private readonly mc: MonitoringContext; - private readonly publicEvents = new TypedEventEmitter(); - public get events(): IEventProvider { + private readonly publicEvents = createEmitter(); + public get events(): 
Listenable { return this.publicEvents; } - private readonly internalEvents = new TypedEventEmitter(); + private readonly internalEvents = createEmitter(); /** * Map of local IDs to storage IDs. Contains identity entries (storageId → storageId) for storage IDs. All requested IDs should @@ -201,6 +235,8 @@ export class BlobManager { new Map(); public readonly stashedBlobsUploadP: Promise<(void | ICreateBlobResponse)[]>; + private readonly createBlobPayloadPending: boolean; + public constructor(props: { readonly routeContext: IFluidHandleContext; @@ -238,6 +274,7 @@ export class BlobManager { runtime, stashedBlobs, localBlobIdGenerator, + createBlobPayloadPending, } = props; this.routeContext = routeContext; this.storage = storage; @@ -245,6 +282,7 @@ export class BlobManager { this.isBlobDeleted = isBlobDeleted; this.runtime = runtime; this.localBlobIdGenerator = localBlobIdGenerator ?? uuid; + this.createBlobPayloadPending = createBlobPayloadPending; this.mc = createChildMonitoringContext({ logger: this.runtime.baseLogger, @@ -362,6 +400,12 @@ export class BlobManager { return this.redirectTable.get(blobId) !== undefined; } + /** + * Retrieve the blob with the given local blob id. + * @param blobId - The local blob id. Likely coming from a handle. + * @param payloadPending - Whether we suspect the payload may be pending and not available yet. + * @returns A promise which resolves to the blob contents + */ public async getBlob(blobId: string, payloadPending: boolean): Promise { // Verify that the blob is not deleted, i.e., it has not been garbage collected. If it is, this will throw // an error, failing the call. @@ -386,7 +430,9 @@ export class BlobManager { } else { const attachedStorageId = this.redirectTable.get(blobId); if (!payloadPending) { - assert(!!attachedStorageId, 0x11f /* "requesting unknown blobs" */); + // Only blob handles explicitly marked with pending payload are permitted to exist without + // yet knowing their storage id. Otherwise they must already be associated with a storage id. + assert(attachedStorageId !== undefined, 0x11f /* "requesting unknown blobs" */); } // If we didn't find it in the redirectTable, assume the attach op is coming eventually and wait. // We do this even if the local client doesn't have the blob payloadPending flag enabled, in case a @@ -428,11 +474,11 @@ export class BlobManager { 0x384 /* requesting handle for unknown blob */, ); const pending = this.pendingBlobs.get(localId); - // Create a callback function for once the blob has been attached + // Create a callback function for once the handle has been attached const callback = pending ? () => { pending.attached = true; - // Notify listeners (e.g. serialization process) that blob has been attached + // Notify listeners (e.g. serialization process) that handle has been attached this.internalEvents.emit("handleAttached", pending); this.deletePendingBlobMaybe(localId); } @@ -448,7 +494,7 @@ export class BlobManager { private async createBlobDetached( blob: ArrayBufferLike, - ): Promise> { + ): Promise> { // Blobs created while the container is detached are stored in IDetachedBlobStorage. // The 'IDocumentStorageService.createBlob()' call below will respond with a localId. 
const response = await this.storage.createBlob(blob); @@ -459,7 +505,7 @@ export class BlobManager { public async createBlob( blob: ArrayBufferLike, signal?: AbortSignal, - ): Promise> { + ): Promise> { if (this.runtime.attachState === AttachState.Detached) { return this.createBlobDetached(blob); } @@ -473,6 +519,15 @@ export class BlobManager { 0x385 /* For clarity and paranoid defense against adding future attachment states */, ); + return this.createBlobPayloadPending + ? this.createBlobWithPayloadPending(blob) + : this.createBlobLegacy(blob, signal); + } + + private async createBlobLegacy( + blob: ArrayBufferLike, + signal?: AbortSignal, + ): Promise> { if (signal?.aborted) { throw this.createAbortError(); } @@ -503,6 +558,48 @@ export class BlobManager { }); } + private createBlobWithPayloadPending( + blob: ArrayBufferLike, + ): IFluidHandleInternalPayloadPending { + const localId = this.localBlobIdGenerator(); + + const blobHandle = new BlobHandle( + getGCNodePathFromBlobId(localId), + this.routeContext, + async () => blob, + true, // payloadPending + () => { + const pendingEntry: PendingBlob = { + blob, + handleP: new Deferred(), + uploadP: this.uploadBlob(localId, blob), + attached: true, + acked: false, + opsent: false, + }; + this.pendingBlobs.set(localId, pendingEntry); + }, + ); + + const onProcessedBlobAttach = (_localId: string, _storageId: string): void => { + if (_localId === localId) { + this.internalEvents.off("processedBlobAttach", onProcessedBlobAttach); + blobHandle.notifyShared(); + } + }; + this.internalEvents.on("processedBlobAttach", onProcessedBlobAttach); + + const onUploadFailed = (_localId: string, error: unknown): void => { + if (_localId === localId) { + this.internalEvents.off("uploadFailed", onUploadFailed); + blobHandle.notifyFailed(error); + } + }; + this.internalEvents.on("uploadFailed", onUploadFailed); + + return blobHandle; + } + private async uploadBlob( localId: string, blob: ArrayBufferLike, @@ -546,6 +643,7 @@ export class BlobManager { // the promise but not throw any error outside. this.pendingBlobs.get(localId)?.handleP.reject(error); this.deletePendingBlob(localId); + this.internalEvents.emit("uploadFailed", localId, error); }, ); } @@ -619,7 +717,9 @@ export class BlobManager { // an existing blob, we don't have to wait for the op to be ack'd since this step has already // happened before and so, the server won't delete it. this.setRedirection(localId, response.id); - entry.handleP.resolve(this.getBlobHandle(localId)); + const blobHandle = this.getBlobHandle(localId); + blobHandle.notifyShared(); + entry.handleP.resolve(blobHandle); this.deletePendingBlobMaybe(localId); } else { // If there is already an op for this storage ID, append the local ID to the list. Once any op for @@ -682,8 +782,8 @@ export class BlobManager { // set identity (id -> id) entry this.setRedirection(blobId, blobId); + assert(localId !== undefined, 0x50e /* local ID not present in blob attach message */); if (local) { - assert(localId !== undefined, 0x50e /* local ID not present in blob attach message */); const waitingBlobs = this.opsInFlight.get(blobId); if (waitingBlobs !== undefined) { // For each op corresponding to this storage ID that we are waiting for, resolve the pending blob. 
@@ -697,7 +797,9 @@ export class BlobManager { ); this.setRedirection(pendingLocalId, blobId); entry.acked = true; - entry.handleP.resolve(this.getBlobHandle(pendingLocalId)); + const blobHandle = this.getBlobHandle(pendingLocalId); + blobHandle.notifyShared(); + entry.handleP.resolve(blobHandle); this.deletePendingBlobMaybe(pendingLocalId); } this.opsInFlight.delete(blobId); @@ -705,7 +807,9 @@ export class BlobManager { const localEntry = this.pendingBlobs.get(localId); if (localEntry) { localEntry.acked = true; - localEntry.handleP.resolve(this.getBlobHandle(localId)); + const blobHandle = this.getBlobHandle(localId); + blobHandle.notifyShared(); + localEntry.handleP.resolve(blobHandle); this.deletePendingBlobMaybe(localId); } } @@ -875,7 +979,7 @@ export class BlobManager { const localBlobs = new Set(); // This while is used to stash blobs created while attaching and getting blobs while (localBlobs.size < this.pendingBlobs.size) { - const attachBlobsP: Promise[] = []; + const attachHandlesP: Promise[] = []; for (const [localId, entry] of this.pendingBlobs) { if (!localBlobs.has(entry)) { localBlobs.add(entry); @@ -890,8 +994,8 @@ export class BlobManager { // original createBlob call) and let them attach the blob. This is a lie we told since the upload // hasn't finished yet, but it's fine since we will retry on rehydration. entry.handleP.resolve(this.getBlobHandle(localId)); - // Array of promises that will resolve when blobs get attached. - attachBlobsP.push( + // Array of promises that will resolve when handles get attached. + attachHandlesP.push( new Promise((resolve, reject) => { stopBlobAttachingSignal?.addEventListener( "abort", @@ -901,16 +1005,16 @@ export class BlobManager { }, { once: true }, ); - const onBlobAttached = (attachedEntry: PendingBlob): void => { + const onHandleAttached = (attachedEntry: PendingBlob): void => { if (attachedEntry === entry) { - this.internalEvents.off("handleAttached", onBlobAttached); + this.internalEvents.off("handleAttached", onHandleAttached); resolve(); } }; if (entry.attached) { resolve(); } else { - this.internalEvents.on("handleAttached", onBlobAttached); + this.internalEvents.on("handleAttached", onHandleAttached); } }), ); @@ -918,7 +1022,7 @@ export class BlobManager { } // Wait for all blobs to be attached. This is important, otherwise serialized container // could send the blobAttach op without any op that references the blob, making it useless. - await Promise.allSettled(attachBlobsP); + await Promise.allSettled(attachHandlesP); } for (const [localId, entry] of this.pendingBlobs) { diff --git a/packages/runtime/container-runtime/src/test/blobHandles.spec.ts b/packages/runtime/container-runtime/src/test/blobHandles.spec.ts index edec196b99a5..3c50025d3a65 100644 --- a/packages/runtime/container-runtime/src/test/blobHandles.spec.ts +++ b/packages/runtime/container-runtime/src/test/blobHandles.spec.ts @@ -51,6 +51,7 @@ function createBlobManager( localBlobIdGenerator: undefined, isBlobDeleted: () => false, blobRequested: () => {}, + createBlobPayloadPending: false, // overrides ...overrides, }), @@ -66,7 +67,7 @@ const blobAttachMessage = { timestamp: Date.now(), }; -describe("BlobManager ", () => { +describe("BlobHandles", () => { it("Create blob", async () => { // Deferred promise that will be resolve once we send a blob attach. It is used mainly // to simulate correct order or blob operations: create -> onUploadResolve -> process. 
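(Reviewer note, not part of the patch.) Below is a minimal sketch of how calling code might consume the new payload-pending handle surface introduced above — `payloadState`, `payloadShareError`, and the `payloadShared` / `payloadShareFailed` events — for a handle returned by `createBlob` when `createBlobPayloadPending` is enabled. It assumes `isLocalFluidHandle` acts as a type guard for `ILocalFluidHandle`, as its use in the blobManager.spec.ts changes below suggests; the `watchPayloadSharing` helper and its logging are illustrative only.

```ts
import type { IFluidHandle } from "@fluidframework/core-interfaces";
import { isLocalFluidHandle } from "@fluidframework/runtime-utils/internal";

// Illustrative only — not part of this change. Logs payload-sharing progress for a
// locally created handle (e.g. one returned by createBlob with payload pending enabled).
function watchPayloadSharing(handle: IFluidHandle): void {
	// Assumes isLocalFluidHandle narrows to ILocalFluidHandle; remote or legacy handles
	// that don't expose the local sharing state are ignored.
	if (isLocalFluidHandle(handle)) {
		// "pending" until the blob attach op round-trips, then "shared".
		console.log(`payload state: ${handle.payloadState}`);

		const onShared = (): void => {
			handle.events.off("payloadShared", onShared);
			console.log("payload is now available to remote collaborators");
		};
		handle.events.on("payloadShared", onShared);

		const onFailed = (error: unknown): void => {
			handle.events.off("payloadShareFailed", onFailed);
			// payloadShareError retains the same failure for anyone who checks later.
			console.error("payload sharing failed:", error, handle.payloadShareError);
		};
		handle.events.on("payloadShareFailed", onFailed);
	}
}
```

Late subscribers can still read `payloadState` and `payloadShareError` directly, since the handle keeps both after the corresponding events fire.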
diff --git a/packages/runtime/container-runtime/src/test/blobManager.spec.ts b/packages/runtime/container-runtime/src/test/blobManager.spec.ts index 22a74d7a2422..58ee19da4e13 100644 --- a/packages/runtime/container-runtime/src/test/blobManager.spec.ts +++ b/packages/runtime/container-runtime/src/test/blobManager.spec.ts @@ -13,12 +13,7 @@ import { } from "@fluid-internal/client-utils"; import { AttachState } from "@fluidframework/container-definitions"; import { IContainerRuntimeEvents } from "@fluidframework/container-runtime-definitions/internal"; -import { - ConfigTypes, - IConfigProviderBase, - IErrorBase, - IFluidHandle, -} from "@fluidframework/core-interfaces"; +import { ConfigTypes, IConfigProviderBase, IErrorBase } from "@fluidframework/core-interfaces"; import { type IFluidHandleContext, type IFluidHandleInternal, @@ -27,7 +22,11 @@ import { Deferred } from "@fluidframework/core-utils/internal"; import { IClientDetails, SummaryType } from "@fluidframework/driver-definitions"; import { IDocumentStorageService } from "@fluidframework/driver-definitions/internal"; import type { ISequencedMessageEnvelope } from "@fluidframework/runtime-definitions/internal"; -import { isFluidHandleInternalPayloadPending } from "@fluidframework/runtime-utils/internal"; +import { + isFluidHandleInternalPayloadPending, + isFluidHandlePayloadPending, + isLocalFluidHandle, +} from "@fluidframework/runtime-utils/internal"; import { LoggingError, MockLogger, @@ -87,6 +86,7 @@ export class MockRuntime public readonly clientDetails: IClientDetails = { capabilities: { interactive: true } }; constructor( public mc: MonitoringContext, + createBlobPayloadPending: boolean, blobManagerLoadInfo: IBlobManagerLoadInfo = {}, attached = false, stashed: unknown[] = [[], {}], @@ -105,7 +105,7 @@ export class MockRuntime isBlobDeleted: (blobPath: string) => this.isBlobDeleted(blobPath), runtime: this, stashedBlobs: stashed[1] as IPendingBlobs | undefined, - createBlobPayloadPending: false, + createBlobPayloadPending, }); } @@ -333,817 +333,940 @@ export const validateSummary = ( return { ids, redirectTable }; }; -describe("BlobManager", () => { - const handlePs: Promise>[] = []; - const mockLogger = new MockLogger(); - let runtime: MockRuntime; - let createBlob: (blob: ArrayBufferLike, signal?: AbortSignal) => Promise; - let waitForBlob: (blob: ArrayBufferLike) => Promise; - let mc: MonitoringContext; - let injectedSettings: Record = {}; - - beforeEach(() => { - const configProvider = (settings: Record): IConfigProviderBase => ({ - getRawConfig: (name: string): ConfigTypes => settings[name], - }); - mc = mixinMonitoringContext( - createChildLogger({ logger: mockLogger }), - configProvider(injectedSettings), - ); - runtime = new MockRuntime(mc); - handlePs.length = 0; - - // ensures this blob will be processed next time runtime.processBlobs() is called - waitForBlob = async (blob) => { - if (!runtime.unprocessedBlobs.has(blob)) { - await new Promise((resolve) => - runtime.on("blob", () => { - if (!runtime.unprocessedBlobs.has(blob)) { - resolve(); - } - }), - ); - } - }; - - // create blob and await the handle after the test - createBlob = async (blob: ArrayBufferLike, signal?: AbortSignal) => { - const handleP = runtime.createBlob(blob, signal); - handlePs.push(handleP); - await waitForBlob(blob); - }; - - const onNoPendingBlobs = () => { - // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access -- Accessing private property - assert((runtime.blobManager as 
any).pendingBlobs.size === 0); - }; - - runtime.blobManager.events.on("noPendingBlobs", () => onNoPendingBlobs()); - }); - - afterEach(async () => { - // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access -- Accessing private property - assert((runtime.blobManager as any).pendingBlobs.size === 0); - injectedSettings = {}; - mockLogger.clear(); - }); - - it("empty snapshot", () => { - const summaryData = validateSummary(runtime); - assert.strictEqual(summaryData.ids.length, 0); - assert.strictEqual(summaryData.redirectTable, undefined); - }); - - it("non empty snapshot", async () => { - await runtime.attach(); - await runtime.connect(); - - await createBlob(IsoBuffer.from("blob", "utf8")); - await runtime.processAll(); - - const summaryData = validateSummary(runtime); - assert.strictEqual(summaryData.ids.length, 1); - assert.strictEqual(summaryData.redirectTable?.length, 1); - }); - - it("hasPendingBlobs", async () => { - await runtime.attach(); - await runtime.connect(); - - assert.strictEqual(runtime.blobManager.hasPendingBlobs, false); - await createBlob(IsoBuffer.from("blob", "utf8")); - await createBlob(IsoBuffer.from("blob2", "utf8")); - assert.strictEqual(runtime.blobManager.hasPendingBlobs, true); - await runtime.processAll(); - assert.strictEqual(runtime.blobManager.hasPendingBlobs, false); - const summaryData = validateSummary(runtime); - assert.strictEqual(summaryData.ids.length, 2); - assert.strictEqual(summaryData.redirectTable?.length, 2); - }); +for (const createBlobPayloadPending of [false, true]) { + describe(`BlobManager (pending payloads): ${createBlobPayloadPending}`, () => { + const mockLogger = new MockLogger(); + let runtime: MockRuntime; + let createBlob: (blob: ArrayBufferLike, signal?: AbortSignal) => Promise; + let waitForBlob: (blob: ArrayBufferLike) => Promise; + let mc: MonitoringContext; + let injectedSettings: Record = {}; - it("NoPendingBlobs count", async () => { - await runtime.attach(); - await runtime.connect(); - let count = 0; - runtime.blobManager.events.on("noPendingBlobs", () => count++); - - await createBlob(IsoBuffer.from("blob", "utf8")); - await runtime.processAll(); - assert.strictEqual(count, 1); - await createBlob(IsoBuffer.from("blob2", "utf8")); - await createBlob(IsoBuffer.from("blob3", "utf8")); - await runtime.processAll(); - assert.strictEqual(count, 2); - const summaryData = validateSummary(runtime); - assert.strictEqual(summaryData.ids.length, 3); - assert.strictEqual(summaryData.redirectTable?.length, 3); - }); - - it("detached snapshot", async () => { - assert.strictEqual(runtime.blobManager.hasPendingBlobs, false); - await createBlob(IsoBuffer.from("blob", "utf8")); - await runtime.processAll(); - assert.strictEqual(runtime.blobManager.hasPendingBlobs, true); + beforeEach(() => { + const configProvider = (settings: Record): IConfigProviderBase => ({ + getRawConfig: (name: string): ConfigTypes => settings[name], + }); + mc = mixinMonitoringContext( + createChildLogger({ logger: mockLogger }), + configProvider(injectedSettings), + ); + runtime = new MockRuntime(mc, createBlobPayloadPending); + + // ensures this blob will be processed next time runtime.processBlobs() is called + waitForBlob = async (blob) => { + if (!runtime.unprocessedBlobs.has(blob)) { + await new Promise((resolve) => + runtime.on("blob", () => { + if (runtime.unprocessedBlobs.has(blob)) { + resolve(); + } + }), + ); + } + }; - const summaryData = validateSummary(runtime); - 
assert.strictEqual(summaryData.ids.length, 1); - assert.strictEqual(summaryData.redirectTable, undefined); - }); + // create blob and await the handle after the test + createBlob = async (blob: ArrayBufferLike, signal?: AbortSignal) => { + runtime + .createBlob(blob, signal) + .then((handle) => { + if (createBlobPayloadPending) { + handle.attachGraph(); + } + return handle; + }) + // Suppress errors here, we expect them to be detected elsewhere + .catch(() => {}); + await waitForBlob(blob); + }; - it("detached->attached snapshot", async () => { - await createBlob(IsoBuffer.from("blob", "utf8")); - await runtime.processAll(); - assert.strictEqual(runtime.blobManager.hasPendingBlobs, true); - await runtime.attach(); - assert.strictEqual(runtime.blobManager.hasPendingBlobs, false); - const summaryData = validateSummary(runtime); - assert.strictEqual(summaryData.ids.length, 1); - assert.strictEqual(summaryData.redirectTable?.length, 1); - }); + const onNoPendingBlobs = () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access -- Accessing private property + assert((runtime.blobManager as any).pendingBlobs.size === 0); + }; - it("uploads while disconnected", async () => { - await runtime.attach(); - const handleP = runtime.createBlob(IsoBuffer.from("blob", "utf8")); - await runtime.connect(); - await runtime.processAll(); - await assert.doesNotReject(handleP); + runtime.blobManager.events.on("noPendingBlobs", () => onNoPendingBlobs()); + }); - const summaryData = validateSummary(runtime); - assert.strictEqual(summaryData.ids.length, 1); - assert.strictEqual(summaryData.redirectTable?.length, 1); - }); + afterEach(async () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access -- Accessing private property + assert.strictEqual((runtime.blobManager as any).pendingBlobs.size, 0); + injectedSettings = {}; + mockLogger.clear(); + }); - it("reupload blob if expired", async () => { - await runtime.attach(); - await runtime.connect(); - runtime.attachedStorage.minTTL = 0.001; // force expired TTL being less than connection time (50ms) - await createBlob(IsoBuffer.from("blob", "utf8")); - await runtime.processBlobs(true); - runtime.disconnect(); - await new Promise((resolve) => setTimeout(resolve, 50)); - await runtime.connect(); - await runtime.processAll(); - }); + it("empty snapshot", () => { + const summaryData = validateSummary(runtime); + assert.strictEqual(summaryData.ids.length, 0); + assert.strictEqual(summaryData.redirectTable, undefined); + }); - it("completes after disconnection while upload pending", async () => { - await runtime.attach(); - await runtime.connect(); + it("non empty snapshot", async () => { + await runtime.attach(); + await runtime.connect(); - const handleP = runtime.createBlob(IsoBuffer.from("blob", "utf8")); - runtime.disconnect(); - await runtime.connect(10); // adding some delay to reconnection - await runtime.processAll(); - await assert.doesNotReject(handleP); + await createBlob(IsoBuffer.from("blob", "utf8")); + await runtime.processAll(); - const summaryData = validateSummary(runtime); - assert.strictEqual(summaryData.ids.length, 1); - assert.strictEqual(summaryData.redirectTable?.length, 1); - }); + const summaryData = validateSummary(runtime); + assert.strictEqual(summaryData.ids.length, 1); + assert.strictEqual(summaryData.redirectTable?.length, 1); + }); - it("upload fails gracefully", async () => { - await runtime.attach(); - await 
runtime.connect(); - - const handleP = runtime.createBlob(IsoBuffer.from("blob", "utf8")); - await runtime.processBlobs(false); - runtime.processOps(); - try { - await handleP; - assert.fail("should fail"); - } catch (error: unknown) { - assert.strictEqual((error as Error).message, "fake driver error"); - } - await assert.rejects(handleP); - const summaryData = validateSummary(runtime); - assert.strictEqual(summaryData.ids.length, 0); - assert.strictEqual(summaryData.redirectTable, undefined); - }); + it("hasPendingBlobs", async () => { + await runtime.attach(); + await runtime.connect(); - it.skip("upload fails and retries for retriable errors", async () => { - // Needs to use some sort of fake timer or write test in a different way as it is waiting - // for actual time which is causing timeouts. - await runtime.attach(); - await runtime.connect(); - const handleP = runtime.createBlob(IsoBuffer.from("blob", "utf8")); - await runtime.processBlobs(false, true, 0); - // wait till next retry - await new Promise((resolve) => setTimeout(resolve, 1)); - // try again successfully - await runtime.processBlobs(true); - runtime.processOps(); - await runtime.processHandles(); - assert(handleP); - const summaryData = validateSummary(runtime); - assert.strictEqual(summaryData.ids.length, 1); - assert.strictEqual(summaryData.redirectTable?.length, 1); - }); + assert.strictEqual(runtime.blobManager.hasPendingBlobs, false); + await createBlob(IsoBuffer.from("blob", "utf8")); + await createBlob(IsoBuffer.from("blob2", "utf8")); + assert.strictEqual(runtime.blobManager.hasPendingBlobs, true); + await runtime.processAll(); + assert.strictEqual(runtime.blobManager.hasPendingBlobs, false); + const summaryData = validateSummary(runtime); + assert.strictEqual(summaryData.ids.length, 2); + assert.strictEqual(summaryData.redirectTable?.length, 2); + }); - it("completes after disconnection while op in flight", async () => { - await runtime.attach(); - await runtime.connect(); + it("NoPendingBlobs count", async () => { + await runtime.attach(); + await runtime.connect(); + let count = 0; + runtime.blobManager.events.on("noPendingBlobs", () => count++); - await createBlob(IsoBuffer.from("blob", "utf8")); - await createBlob(IsoBuffer.from("blob", "utf8")); - await runtime.processBlobs(true); + await createBlob(IsoBuffer.from("blob", "utf8")); + await runtime.processAll(); + assert.strictEqual(count, 1); + await createBlob(IsoBuffer.from("blob2", "utf8")); + await createBlob(IsoBuffer.from("blob3", "utf8")); + await runtime.processAll(); + assert.strictEqual(count, 2); + const summaryData = validateSummary(runtime); + assert.strictEqual(summaryData.ids.length, 3); + assert.strictEqual(summaryData.redirectTable?.length, 3); + }); - runtime.disconnect(); - await runtime.connect(); - await runtime.processAll(); + it("detached snapshot", async () => { + assert.strictEqual(runtime.blobManager.hasPendingBlobs, false); + await createBlob(IsoBuffer.from("blob", "utf8")); + await runtime.processAll(); + assert.strictEqual(runtime.blobManager.hasPendingBlobs, true); - const summaryData = validateSummary(runtime); - assert.strictEqual(summaryData.ids.length, 1); - assert.strictEqual(summaryData.redirectTable?.length, 2); - }); + const summaryData = validateSummary(runtime); + assert.strictEqual(summaryData.ids.length, 1); + assert.strictEqual(summaryData.redirectTable, undefined); + }); - it("multiple disconnect/connects", async () => { - await runtime.attach(); - await runtime.connect(); - - const blob = IsoBuffer.from("blob", 
"utf8"); - const handleP = runtime.createBlob(blob); - runtime.disconnect(); - await runtime.connect(10); - - const blob2 = IsoBuffer.from("blob2", "utf8"); - const handleP2 = runtime.createBlob(blob2); - runtime.disconnect(); - - await runtime.connect(10); - await runtime.processAll(); - await assert.doesNotReject(handleP); - await assert.doesNotReject(handleP2); - const summaryData = validateSummary(runtime); - assert.strictEqual(summaryData.ids.length, 2); - assert.strictEqual(summaryData.redirectTable?.length, 2); - }); + it("detached->attached snapshot", async () => { + await createBlob(IsoBuffer.from("blob", "utf8")); + await runtime.processAll(); + assert.strictEqual(runtime.blobManager.hasPendingBlobs, true); + await runtime.attach(); + assert.strictEqual(runtime.blobManager.hasPendingBlobs, false); + const summaryData = validateSummary(runtime); + assert.strictEqual(summaryData.ids.length, 1); + assert.strictEqual(summaryData.redirectTable?.length, 1); + }); - it("handles deduped IDs", async () => { - await runtime.attach(); - await runtime.connect(); + it("uploads while disconnected", async () => { + await runtime.attach(); + const handleP = runtime.createBlob(IsoBuffer.from("blob", "utf8")); + await runtime.connect(); + await runtime.processAll(); + await assert.doesNotReject(handleP); - await createBlob(IsoBuffer.from("blob", "utf8")); - await createBlob(IsoBuffer.from("blob", "utf8")); - runtime.disconnect(); - await runtime.connect(); + const summaryData = validateSummary(runtime); + assert.strictEqual(summaryData.ids.length, 1); + assert.strictEqual(summaryData.redirectTable?.length, 1); + }); - await createBlob(IsoBuffer.from("blob", "utf8")); - await createBlob(IsoBuffer.from("blob", "utf8")); - await runtime.processBlobs(true); + it("reupload blob if expired", async () => { + await runtime.attach(); + await runtime.connect(); + runtime.attachedStorage.minTTL = 0.001; // force expired TTL being less than connection time (50ms) + await createBlob(IsoBuffer.from("blob", "utf8")); + await runtime.processBlobs(true); + runtime.disconnect(); + await new Promise((resolve) => setTimeout(resolve, 50)); + await runtime.connect(); + await runtime.processAll(); + }); - runtime.disconnect(); - await runtime.connect(); + it("completes after disconnection while upload pending", async () => { + await runtime.attach(); + await runtime.connect(); - await createBlob(IsoBuffer.from("blob", "utf8")); - await createBlob(IsoBuffer.from("blob", "utf8")); - await runtime.processAll(); + const handleP = runtime.createBlob(IsoBuffer.from("blob", "utf8")); + runtime.disconnect(); + await runtime.connect(10); // adding some delay to reconnection + await runtime.processAll(); + await assert.doesNotReject(handleP); - const summaryData = validateSummary(runtime); - assert.strictEqual(summaryData.ids.length, 1); - assert.strictEqual(summaryData.redirectTable?.length, 6); - }); + const summaryData = validateSummary(runtime); + assert.strictEqual(summaryData.ids.length, 1); + assert.strictEqual(summaryData.redirectTable?.length, 1); + }); - it("handles deduped IDs in detached", async () => { - runtime.detachedStorage = new DedupeStorage(); + it("upload fails gracefully", async () => { + await runtime.attach(); + await runtime.connect(); - await createBlob(IsoBuffer.from("blob", "utf8")); - await createBlob(IsoBuffer.from("blob", "utf8")); - await runtime.processAll(); + if (createBlobPayloadPending) { + const handle = await runtime.createBlob(IsoBuffer.from("blob", "utf8")); + 
assert.strict(isFluidHandlePayloadPending(handle)); + assert.strict(isLocalFluidHandle(handle)); + assert.strictEqual( + handle.payloadState, + "pending", + "Handle should be in pending state", + ); + assert.strictEqual( + handle.payloadShareError, + undefined, + "handle should not have an error yet", + ); + let failed = false; + const onPayloadShareFailed = (error: unknown): void => { + failed = true; + assert.strictEqual( + (error as Error).message, + "fake driver error", + "Did not receive the expected error", + ); + handle.events.off("payloadShareFailed", onPayloadShareFailed); + }; + handle.events.on("payloadShareFailed", onPayloadShareFailed); + await runtime.processHandles(); + await runtime.processBlobs(false); + runtime.processOps(); + assert.strict(failed, "should fail"); + assert.strictEqual( + handle.payloadState, + "pending", + "Handle should still be in pending state", + ); + assert.strictEqual( + (handle.payloadShareError as unknown as Error).message, + "fake driver error", + "Handle did not have the expected error", + ); + } else { + // If the blobs are created without pending payloads, we don't get to see the handle at + // all so we can't inspect its state. + const handleP = runtime.createBlob(IsoBuffer.from("blob", "utf8")); + await runtime.processBlobs(false); + runtime.processOps(); + try { + await handleP; + assert.fail("should fail"); + } catch (error: unknown) { + assert.strictEqual((error as Error).message, "fake driver error"); + } + await assert.rejects(handleP); + } + const summaryData = validateSummary(runtime); + assert.strictEqual(summaryData.ids.length, 0); + assert.strictEqual(summaryData.redirectTable, undefined); + }); - const summaryData = validateSummary(runtime); - assert.strictEqual(summaryData.ids.length, 1); - assert.strictEqual(summaryData.redirectTable, undefined); - }); + it("updates handle state after success", async () => { + await runtime.attach(); + await runtime.connect(); - it("handles deduped IDs in detached->attached", async () => { - runtime.detachedStorage = new DedupeStorage(); + if (createBlobPayloadPending) { + const handle = await runtime.createBlob(IsoBuffer.from("blob", "utf8")); + assert.strict(isFluidHandlePayloadPending(handle)); + assert.strictEqual( + handle.payloadState, + "pending", + "Handle should be in pending state", + ); + let shared = false; + const onPayloadShared = (): void => { + shared = true; + handle.events.off("payloadShared", onPayloadShared); + }; + handle.events.on("payloadShared", onPayloadShared); + await runtime.processHandles(); + await runtime.processBlobs(true); + runtime.processOps(); + assert.strict(shared, "should become shared"); + assert.strictEqual(handle.payloadState, "shared", "Handle should be in shared state"); + } else { + // Without placeholder blobs, we don't get to see the handle before it reaches "shared" state + // but we can still verify it's in the expected state when we get it. 
+ const handleP = runtime.createBlob(IsoBuffer.from("blob", "utf8")); + await runtime.processAll(); + const handle = await handleP; + assert.strict(isFluidHandlePayloadPending(handle)); + assert.strictEqual(handle.payloadState, "shared", "Handle should be in shared state"); + } + const summaryData = validateSummary(runtime); + assert.strictEqual(summaryData.ids.length, 1); + assert.strictEqual(summaryData.redirectTable?.length, 1); + }); - await createBlob(IsoBuffer.from("blob", "utf8")); - await createBlob(IsoBuffer.from("blob", "utf8")); - await runtime.processAll(); + it.skip("upload fails and retries for retriable errors", async () => { + // Needs to use some sort of fake timer or write test in a different way as it is waiting + // for actual time which is causing timeouts. + await runtime.attach(); + await runtime.connect(); + const handleP = runtime.createBlob(IsoBuffer.from("blob", "utf8")); + await runtime.processBlobs(false, true, 0); + // wait till next retry + await new Promise((resolve) => setTimeout(resolve, 1)); + // try again successfully + await runtime.processBlobs(true); + runtime.processOps(); + await runtime.processHandles(); + assert(handleP); + const summaryData = validateSummary(runtime); + assert.strictEqual(summaryData.ids.length, 1); + assert.strictEqual(summaryData.redirectTable?.length, 1); + }); - await runtime.attach(); - await runtime.connect(); - await createBlob(IsoBuffer.from("blob", "utf8")); - await createBlob(IsoBuffer.from("blob", "utf8")); + it("completes after disconnection while op in flight", async () => { + await runtime.attach(); + await runtime.connect(); - runtime.disconnect(); - await runtime.connect(); + await createBlob(IsoBuffer.from("blob", "utf8")); + await createBlob(IsoBuffer.from("blob", "utf8")); + await runtime.processBlobs(true); - await createBlob(IsoBuffer.from("blob", "utf8")); - await createBlob(IsoBuffer.from("blob", "utf8")); + runtime.disconnect(); + await runtime.connect(); + await runtime.processAll(); - await runtime.processAll(); + const summaryData = validateSummary(runtime); + assert.strictEqual(summaryData.ids.length, 1); + assert.strictEqual(summaryData.redirectTable?.length, 2); + }); - const summaryData = validateSummary(runtime); - assert.strictEqual(summaryData.ids.length, 1); - assert.strictEqual(summaryData.redirectTable?.length, 4); - }); + it("multiple disconnect/connects", async () => { + await runtime.attach(); + await runtime.connect(); - it("can load from summary", async () => { - await createBlob(IsoBuffer.from("blob", "utf8")); - await runtime.processAll(); + const blob = IsoBuffer.from("blob", "utf8"); + const handleP = runtime.createBlob(blob); + runtime.disconnect(); + await runtime.connect(10); - await runtime.attach(); - const handle = runtime.createBlob(IsoBuffer.from("blob", "utf8")); - await runtime.connect(); + const blob2 = IsoBuffer.from("blob2", "utf8"); + const handleP2 = runtime.createBlob(blob2); + runtime.disconnect(); - await runtime.processAll(); - await assert.doesNotReject(handle); + await runtime.connect(10); + await runtime.processAll(); + await assert.doesNotReject(handleP); + await assert.doesNotReject(handleP2); + const summaryData = validateSummary(runtime); + assert.strictEqual(summaryData.ids.length, 2); + assert.strictEqual(summaryData.redirectTable?.length, 2); + }); - await createBlob(IsoBuffer.from("blob", "utf8")); - await runtime.processAll(); + it("handles deduped IDs", async () => { + await runtime.attach(); + await runtime.connect(); - const summaryData = 
validateSummary(runtime); - assert.strictEqual(summaryData.ids.length, 1); - assert.strictEqual(summaryData.redirectTable?.length, 3); + await createBlob(IsoBuffer.from("blob", "utf8")); + await createBlob(IsoBuffer.from("blob", "utf8")); + runtime.disconnect(); + await runtime.connect(); - const runtime2 = new MockRuntime(mc, summaryData, true); - const summaryData2 = validateSummary(runtime2); - assert.strictEqual(summaryData2.ids.length, 1); - assert.strictEqual(summaryData2.redirectTable?.length, 3); - }); + await createBlob(IsoBuffer.from("blob", "utf8")); + await createBlob(IsoBuffer.from("blob", "utf8")); + await runtime.processBlobs(true); - it("handles duplicate remote upload", async () => { - await runtime.attach(); - await runtime.connect(); + runtime.disconnect(); + await runtime.connect(); - await createBlob(IsoBuffer.from("blob", "utf8")); - await runtime.remoteUpload(IsoBuffer.from("blob", "utf8")); - await runtime.processAll(); + await createBlob(IsoBuffer.from("blob", "utf8")); + await createBlob(IsoBuffer.from("blob", "utf8")); + await runtime.processAll(); - const summaryData = validateSummary(runtime); - assert.strictEqual(summaryData.ids.length, 1); - assert.strictEqual(summaryData.redirectTable?.length, 2); - }); + const summaryData = validateSummary(runtime); + assert.strictEqual(summaryData.ids.length, 1); + assert.strictEqual(summaryData.redirectTable?.length, 6); + }); - it("handles duplicate remote upload between upload and op", async () => { - await runtime.attach(); - await runtime.connect(); + it("handles deduped IDs in detached", async () => { + runtime.detachedStorage = new DedupeStorage(); - await createBlob(IsoBuffer.from("blob", "utf8")); - await runtime.processBlobs(true); - await runtime.remoteUpload(IsoBuffer.from("blob", "utf8")); - await runtime.processAll(); + await createBlob(IsoBuffer.from("blob", "utf8")); + await createBlob(IsoBuffer.from("blob", "utf8")); + await runtime.processAll(); - const summaryData = validateSummary(runtime); - assert.strictEqual(summaryData.ids.length, 1); - assert.strictEqual(summaryData.redirectTable?.length, 2); - }); + const summaryData = validateSummary(runtime); + assert.strictEqual(summaryData.ids.length, 1); + assert.strictEqual(summaryData.redirectTable, undefined); + }); - it("handles duplicate remote upload with local ID", async () => { - await runtime.attach(); + it("handles deduped IDs in detached->attached", async () => { + runtime.detachedStorage = new DedupeStorage(); - await createBlob(IsoBuffer.from("blob", "utf8")); - await runtime.connect(); - await runtime.processBlobs(true); - await runtime.remoteUpload(IsoBuffer.from("blob", "utf8")); - await runtime.processAll(); + await createBlob(IsoBuffer.from("blob", "utf8")); + await createBlob(IsoBuffer.from("blob", "utf8")); + await runtime.processAll(); - const summaryData = validateSummary(runtime); - assert.strictEqual(summaryData.ids.length, 1); - assert.strictEqual(summaryData.redirectTable?.length, 2); - }); + await runtime.attach(); + await runtime.connect(); + await createBlob(IsoBuffer.from("blob", "utf8")); + await createBlob(IsoBuffer.from("blob", "utf8")); - it("includes blob IDs in summary while attaching", async () => { - await createBlob(IsoBuffer.from("blob1", "utf8")); - await createBlob(IsoBuffer.from("blob2", "utf8")); - await createBlob(IsoBuffer.from("blob3", "utf8")); - await runtime.processAll(); - - // While attaching with blobs, Container takes a summary while still in "Detached" - // state. 
BlobManager should know to include the list of attached blob - // IDs since this summary will be used to create the document - const summaryData = await runtime.attach(); - assert.strictEqual(summaryData?.ids.length, 3); - assert.strictEqual(summaryData?.redirectTable?.length, 3); - }); + runtime.disconnect(); + await runtime.connect(); - it("all blobs attached", async () => { - await runtime.attach(); - await runtime.connect(); - assert.strictEqual(runtime.blobManager.allBlobsAttached, true); - await createBlob(IsoBuffer.from("blob1", "utf8")); - assert.strictEqual(runtime.blobManager.allBlobsAttached, false); - await runtime.processBlobs(true); - assert.strictEqual(runtime.blobManager.allBlobsAttached, false); - await runtime.processAll(); - assert.strictEqual(runtime.blobManager.allBlobsAttached, true); - await createBlob(IsoBuffer.from("blob1", "utf8")); - await createBlob(IsoBuffer.from("blob2", "utf8")); - await createBlob(IsoBuffer.from("blob3", "utf8")); - assert.strictEqual(runtime.blobManager.allBlobsAttached, false); - await runtime.processAll(); - assert.strictEqual(runtime.blobManager.allBlobsAttached, true); - }); + await createBlob(IsoBuffer.from("blob", "utf8")); + await createBlob(IsoBuffer.from("blob", "utf8")); - it("runtime disposed during readBlob - log no error", async () => { - const someId = "someId"; - // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call -- Accessing private property - (runtime.blobManager as any).setRedirection(someId, undefined); // To appease an assert + await runtime.processAll(); - // Mock storage.readBlob to dispose the runtime and throw an error - Sinon.stub(runtime.storage, "readBlob").callsFake(async (_id: string) => { - runtime.disposed = true; - throw new Error("BOOM!"); + const summaryData = validateSummary(runtime); + assert.strictEqual(summaryData.ids.length, 1); + assert.strictEqual(summaryData.redirectTable?.length, 4); }); - await assert.rejects( - async () => runtime.blobManager.getBlob(someId, false), - (e: Error) => e.message === "BOOM!", - "Expected getBlob to throw with test error message", - ); - assert(runtime.disposed, "Runtime should be disposed"); - mockLogger.assertMatchNone( - [{ category: "error" }], - "Should not have logged any errors", - undefined, - false /* clearEventsAfterCheck */, - ); - mockLogger.assertMatch( - [{ category: "generic", eventName: "BlobManager:AttachmentReadBlob_cancel" }], - "Expected the _cancel event to be logged with 'generic' category", - ); - }); - - it("waits for blobs from handles with pending payloads without error", async () => { - await runtime.attach(); + it("can load from summary", async () => { + await createBlob(IsoBuffer.from("blob", "utf8")); + await runtime.processAll(); - // Part of remoteUpload, but stop short of processing the message - const response = await runtime.storage.createBlob(IsoBuffer.from("blob", "utf8")); - const op = { metadata: { localId: uuid(), blobId: response.id } }; + await runtime.attach(); + const handle = runtime.createBlob(IsoBuffer.from("blob", "utf8")); + await runtime.connect(); - await assert.rejects( - runtime.blobManager.getBlob(op.metadata.localId, false), - "Rejects when attempting to get non-existent, shared-payload blobs", - ); + await runtime.processAll(); + await assert.doesNotReject(handle); - // Try to get the blob that we haven't processed the attach op for yet. 
- // This simulates having found this ID in a handle with a pending payload that the remote client would have sent - const blobP = runtime.blobManager.getBlob(op.metadata.localId, true); + await createBlob(IsoBuffer.from("blob", "utf8")); + await runtime.processAll(); - // Process the op as if it were arriving from the remote client, which should cause the blobP promise to resolve - runtime.blobManager.processBlobAttachMessage(op as ISequencedMessageEnvelope, false); + const summaryData = validateSummary(runtime); + assert.strictEqual(summaryData.ids.length, 1); + assert.strictEqual(summaryData.redirectTable?.length, 3); - // Await the promise to confirm it settles and does not reject - await blobP; - }); + const runtime2 = new MockRuntime(mc, createBlobPayloadPending, summaryData, true); + const summaryData2 = validateSummary(runtime2); + assert.strictEqual(summaryData2.ids.length, 1); + assert.strictEqual(summaryData2.redirectTable?.length, 3); + }); - describe("Abort Signal", () => { - it("abort before upload", async () => { + it("handles duplicate remote upload", async () => { await runtime.attach(); await runtime.connect(); - const ac = new AbortController(); - ac.abort("abort test"); - try { - const blob = IsoBuffer.from("blob", "utf8"); - await runtime.createBlob(blob, ac.signal); - assert.fail("Should not succeed"); - - // TODO: better typing - // eslint-disable-next-line @typescript-eslint/no-explicit-any - } catch (error: any) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - assert.strictEqual(error.status, undefined); - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - assert.strictEqual(error.uploadTime, undefined); - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - assert.strictEqual(error.acked, undefined); - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - assert.strictEqual(error.message, "uploadBlob aborted"); - } + + await createBlob(IsoBuffer.from("blob", "utf8")); + await runtime.remoteUpload(IsoBuffer.from("blob", "utf8")); + await runtime.processAll(); + const summaryData = validateSummary(runtime); - assert.strictEqual(summaryData.ids.length, 0); - assert.strictEqual(summaryData.redirectTable, undefined); + assert.strictEqual(summaryData.ids.length, 1); + assert.strictEqual(summaryData.redirectTable?.length, 2); }); - it("abort while upload", async () => { + it("handles duplicate remote upload between upload and op", async () => { await runtime.attach(); await runtime.connect(); - const ac = new AbortController(); - const blob = IsoBuffer.from("blob", "utf8"); - const handleP = runtime.createBlob(blob, ac.signal); - ac.abort("abort test"); - assert.strictEqual(runtime.unprocessedBlobs.size, 1); + + await createBlob(IsoBuffer.from("blob", "utf8")); await runtime.processBlobs(true); - try { - await handleP; - assert.fail("Should not succeed"); - // TODO: better typing - // eslint-disable-next-line @typescript-eslint/no-explicit-any - } catch (error: any) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - assert.strictEqual(error.uploadTime, undefined); - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - assert.strictEqual(error.acked, false); - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - assert.strictEqual(error.message, "uploadBlob aborted"); - } - assert(handleP); - await assert.rejects(handleP); - const summaryData = validateSummary(runtime); - assert.strictEqual(summaryData.ids.length, 0); - 
assert.strictEqual(summaryData.redirectTable, undefined); - }); + await runtime.remoteUpload(IsoBuffer.from("blob", "utf8")); + await runtime.processAll(); - it("abort while failed upload", async () => { - await runtime.attach(); - await runtime.connect(); - const ac = new AbortController(); - const blob = IsoBuffer.from("blob", "utf8"); - const handleP = runtime.createBlob(blob, ac.signal); - const handleP2 = runtime.createBlob(IsoBuffer.from("blob2", "utf8")); - ac.abort("abort test"); - assert.strictEqual(runtime.unprocessedBlobs.size, 2); - await runtime.processBlobs(false); - try { - await handleP; - assert.fail("Should not succeed"); - } catch (error: unknown) { - assert.strictEqual((error as Error).message, "uploadBlob aborted"); - } - try { - await handleP2; - assert.fail("Should not succeed"); - } catch (error: unknown) { - assert.strictEqual((error as Error).message, "fake driver error"); - } - await assert.rejects(handleP); - await assert.rejects(handleP2); const summaryData = validateSummary(runtime); - assert.strictEqual(summaryData.ids.length, 0); - assert.strictEqual(summaryData.redirectTable, undefined); + assert.strictEqual(summaryData.ids.length, 1); + assert.strictEqual(summaryData.redirectTable?.length, 2); }); - it("abort while disconnected", async () => { + it("handles duplicate remote upload with local ID", async () => { await runtime.attach(); + + await createBlob(IsoBuffer.from("blob", "utf8")); await runtime.connect(); - const ac = new AbortController(); - const blob = IsoBuffer.from("blob", "utf8"); - const handleP = runtime.createBlob(blob, ac.signal); - runtime.disconnect(); - ac.abort(); await runtime.processBlobs(true); - try { - await handleP; - assert.fail("Should not succeed"); - } catch (error: unknown) { - assert.strictEqual((error as Error).message, "uploadBlob aborted"); - } - await assert.rejects(handleP); - const summaryData = validateSummary(runtime); - assert.strictEqual(summaryData.ids.length, 0); - assert.strictEqual(summaryData.redirectTable, undefined); - }); + await runtime.remoteUpload(IsoBuffer.from("blob", "utf8")); + await runtime.processAll(); - it("abort after blob suceeds", async () => { - await runtime.attach(); - await runtime.connect(); - const ac = new AbortController(); - let handleP: Promise> | undefined; - try { - const blob = IsoBuffer.from("blob", "utf8"); - handleP = runtime.createBlob(blob, ac.signal); - await runtime.processAll(); - ac.abort(); - } catch { - assert.fail("abort after processing should not throw"); - } - assert(handleP); - await assert.doesNotReject(handleP); const summaryData = validateSummary(runtime); assert.strictEqual(summaryData.ids.length, 1); - assert.strictEqual(summaryData.redirectTable?.length, 1); + assert.strictEqual(summaryData.redirectTable?.length, 2); }); - it("abort while waiting for op", async () => { - await runtime.attach(); - await runtime.connect(); - const ac = new AbortController(); - const blob = IsoBuffer.from("blob", "utf8"); - const handleP = runtime.createBlob(blob, ac.signal); - const p1 = runtime.processBlobs(true); - const p2 = runtime.processHandles(); - // finish upload - await Promise.race([p1, p2]); - ac.abort(); - runtime.processOps(); - try { - // finish op - await handleP; - assert.fail("Should not succeed"); - - // TODO: better typing - // eslint-disable-next-line @typescript-eslint/no-explicit-any - } catch (error: any) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - assert.strictEqual(error.message, "uploadBlob aborted"); - // 
eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - assert.ok(error.uploadTime); - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - assert.strictEqual(error.acked, false); - } - await assert.rejects(handleP); - const summaryData = validateSummary(runtime); - assert.strictEqual(summaryData.ids.length, 0); - assert.strictEqual(summaryData.redirectTable, undefined); + it("includes blob IDs in summary while attaching", async () => { + await createBlob(IsoBuffer.from("blob1", "utf8")); + await createBlob(IsoBuffer.from("blob2", "utf8")); + await createBlob(IsoBuffer.from("blob3", "utf8")); + await runtime.processAll(); + + // While attaching with blobs, Container takes a summary while still in "Detached" + // state. BlobManager should know to include the list of attached blob + // IDs since this summary will be used to create the document + const summaryData = await runtime.attach(); + assert.strictEqual(summaryData?.ids.length, 3); + assert.strictEqual(summaryData?.redirectTable?.length, 3); }); - it("resubmit on aborted pending op", async () => { + it("all blobs attached", async () => { await runtime.attach(); await runtime.connect(); - const ac = new AbortController(); - let handleP: Promise> | undefined; - try { - handleP = runtime.createBlob(IsoBuffer.from("blob", "utf8"), ac.signal); - const p1 = runtime.processBlobs(true); - const p2 = runtime.processHandles(); - // finish upload - await Promise.race([p1, p2]); - runtime.disconnect(); - ac.abort(); - await handleP; - assert.fail("Should not succeed"); - // TODO: better typing - // eslint-disable-next-line @typescript-eslint/no-explicit-any - } catch (error: any) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - assert.strictEqual(error.message, "uploadBlob aborted"); - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - assert.ok(error.uploadTime); - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - assert.strictEqual(error.acked, false); - } - await runtime.connect(); - runtime.processOps(); + assert.strictEqual(runtime.blobManager.allBlobsAttached, true); + await createBlob(IsoBuffer.from("blob1", "utf8")); + // We immediately attach the handle in createBlob if pending payloads are enabled + assert.strictEqual(runtime.blobManager.allBlobsAttached, createBlobPayloadPending); + await runtime.processBlobs(true); + assert.strictEqual(runtime.blobManager.allBlobsAttached, createBlobPayloadPending); + await runtime.processAll(); + assert.strictEqual(runtime.blobManager.allBlobsAttached, true); + await createBlob(IsoBuffer.from("blob1", "utf8")); + await createBlob(IsoBuffer.from("blob2", "utf8")); + await createBlob(IsoBuffer.from("blob3", "utf8")); + assert.strictEqual(runtime.blobManager.allBlobsAttached, createBlobPayloadPending); + await runtime.processAll(); + assert.strictEqual(runtime.blobManager.allBlobsAttached, true); + }); - // TODO: `handleP` can be `undefined`; this should be made safer. 
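Aside: the two new tests above capture the consumer-visible effect of `createBlobPayloadPending` — the handle is attached as soon as it is created, before the upload and BlobAttach op complete. A minimal sketch of that flow from application code, assuming `uploadBlob` resolves promptly with a payload-pending handle when the flag is enabled (the runtime and map parameters are structural stand-ins, not part of this patch):

import type { IFluidHandle } from "@fluidframework/core-interfaces";

// Sketch only: store a freshly created blob handle without waiting for the payload to be shared.
async function storeAttachment(
	// Structural stand-ins for a data store runtime and a SharedMap-like DDS.
	runtime: { uploadBlob(blob: ArrayBufferLike): Promise<IFluidHandle<ArrayBufferLike>> },
	map: { set(key: string, value: unknown): void },
): Promise<void> {
	const handle = await runtime.uploadBlob(new TextEncoder().encode("large payload").buffer);
	// With createBlobPayloadPending enabled, the handle is assumed to be attached already here;
	// remote collaborators may still observe the payload as pending until it is shared.
	map.set("attachment", handle);
}
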
- await assert.rejects(handleP as Promise>); - const summaryData = validateSummary(runtime); - assert.strictEqual(summaryData.ids.length, 0); - assert.strictEqual(summaryData.redirectTable, undefined); + it("runtime disposed during readBlob - log no error", async () => { + const someId = "someId"; + // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call -- Accessing private property + (runtime.blobManager as any).setRedirection(someId, undefined); // To appease an assert + + // Mock storage.readBlob to dispose the runtime and throw an error + Sinon.stub(runtime.storage, "readBlob").callsFake(async (_id: string) => { + runtime.disposed = true; + throw new Error("BOOM!"); + }); + + await assert.rejects( + async () => runtime.blobManager.getBlob(someId, false), + (e: Error) => e.message === "BOOM!", + "Expected getBlob to throw with test error message", + ); + assert(runtime.disposed, "Runtime should be disposed"); + mockLogger.assertMatchNone( + [{ category: "error" }], + "Should not have logged any errors", + undefined, + false /* clearEventsAfterCheck */, + ); + mockLogger.assertMatch( + [{ category: "generic", eventName: "BlobManager:AttachmentReadBlob_cancel" }], + "Expected the _cancel event to be logged with 'generic' category", + ); }); - }); - describe("Garbage Collection", () => { - let redirectTable: Map; + it("waits for blobs from handles with pending payloads without error", async () => { + await runtime.attach(); - /** - * Creates a blob with the given content and returns its local and storage id. - */ - async function createBlobAndGetIds(content: string) { - // For a given blob's GC node id, returns the blob id. - const getBlobIdFromGCNodeId = (gcNodeId: string) => { - const pathParts = gcNodeId.split("/"); - assert( - pathParts.length === 3 && pathParts[1] === blobManagerBasePath, - "Invalid blob node path", - ); - return pathParts[2]; - }; + // Part of remoteUpload, but stop short of processing the message + const response = await runtime.storage.createBlob(IsoBuffer.from("blob", "utf8")); + const op = { metadata: { localId: uuid(), blobId: response.id } }; - // For a given blob's id, returns the GC node id. - const getGCNodeIdFromBlobId = (blobId: string) => { - return `/${blobManagerBasePath}/${blobId}`; - }; + await assert.rejects( + runtime.blobManager.getBlob(op.metadata.localId, false), + "Rejects when attempting to get non-existent, shared-payload blobs", + ); - const blobContents = IsoBuffer.from(content, "utf8"); - const handleP = runtime.createBlob(blobContents); - await runtime.processAll(); + // Try to get the blob that we haven't processed the attach op for yet. 
+ // This simulates having found this ID in a handle with a pending payload that the remote client would have sent + const blobP = runtime.blobManager.getBlob(op.metadata.localId, true); - const blobHandle = await handleP; - const localId = getBlobIdFromGCNodeId(blobHandle.absolutePath); - assert(redirectTable.has(localId), "blob not found in redirect table"); - const storageId = redirectTable.get(localId); - assert(storageId !== undefined, "storage id not found in redirect table"); - return { - localId, - localGCNodeId: getGCNodeIdFromBlobId(localId), - storageId, - storageGCNodeId: getGCNodeIdFromBlobId(storageId), - }; - } + // Process the op as if it were arriving from the remote client, which should cause the blobP promise to resolve + runtime.blobManager.processBlobAttachMessage(op as ISequencedMessageEnvelope, false); - beforeEach(() => { - // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-assignment -- Mutating private property - redirectTable = (runtime.blobManager as any).redirectTable; + // Await the promise to confirm it settles and does not reject + await blobP; }); - it("fetching deleted blob fails", async () => { - await runtime.attach(); - await runtime.connect(); - const blob1Contents = IsoBuffer.from("blob1", "utf8"); - const blob2Contents = IsoBuffer.from("blob2", "utf8"); - const handle1P = runtime.createBlob(blob1Contents); - const handle2P = runtime.createBlob(blob2Contents); - await runtime.processAll(); + describe("Abort Signal", () => { + it("abort before upload", async function () { + if (createBlobPayloadPending) { + // Blob creation with pending payload doesn't support abort + this.skip(); + } + await runtime.attach(); + await runtime.connect(); + const ac = new AbortController(); + ac.abort("abort test"); + try { + const blob = IsoBuffer.from("blob", "utf8"); + await runtime.createBlob(blob, ac.signal); + assert.fail("Should not succeed"); + + // TODO: better typing + // eslint-disable-next-line @typescript-eslint/no-explicit-any + } catch (error: any) { + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + assert.strictEqual(error.status, undefined); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + assert.strictEqual(error.uploadTime, undefined); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + assert.strictEqual(error.acked, undefined); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + assert.strictEqual(error.message, "uploadBlob aborted"); + } + const summaryData = validateSummary(runtime); + assert.strictEqual(summaryData.ids.length, 0); + assert.strictEqual(summaryData.redirectTable, undefined); + }); - const blob1Handle = await handle1P; - const blob2Handle = await handle2P; + it("abort while upload", async function () { + if (createBlobPayloadPending) { + // Blob creation with pending payload doesn't support abort + this.skip(); + } + await runtime.attach(); + await runtime.connect(); + const ac = new AbortController(); + const blob = IsoBuffer.from("blob", "utf8"); + const handleP = runtime.createBlob(blob, ac.signal); + ac.abort("abort test"); + assert.strictEqual(runtime.unprocessedBlobs.size, 1); + await runtime.processBlobs(true); + try { + await handleP; + assert.fail("Should not succeed"); + // TODO: better typing + // eslint-disable-next-line @typescript-eslint/no-explicit-any + } catch (error: any) { + // eslint-disable-next-line 
@typescript-eslint/no-unsafe-member-access + assert.strictEqual(error.uploadTime, undefined); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + assert.strictEqual(error.acked, false); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + assert.strictEqual(error.message, "uploadBlob aborted"); + } + assert(handleP); + await assert.rejects(handleP); + const summaryData = validateSummary(runtime); + assert.strictEqual(summaryData.ids.length, 0); + assert.strictEqual(summaryData.redirectTable, undefined); + }); - // Validate that the blobs can be retrieved. - assert.strictEqual(await runtime.getBlob(blob1Handle), blob1Contents); - assert.strictEqual(await runtime.getBlob(blob2Handle), blob2Contents); + it("abort while failed upload", async function () { + if (createBlobPayloadPending) { + // Blob creation with pending payload doesn't support abort + this.skip(); + } + await runtime.attach(); + await runtime.connect(); + const ac = new AbortController(); + const blob = IsoBuffer.from("blob", "utf8"); + const handleP = runtime.createBlob(blob, ac.signal); + const handleP2 = runtime.createBlob(IsoBuffer.from("blob2", "utf8")); + ac.abort("abort test"); + assert.strictEqual(runtime.unprocessedBlobs.size, 2); + await runtime.processBlobs(false); + try { + await handleP; + assert.fail("Should not succeed"); + } catch (error: unknown) { + assert.strictEqual((error as Error).message, "uploadBlob aborted"); + } + try { + await handleP2; + assert.fail("Should not succeed"); + } catch (error: unknown) { + assert.strictEqual((error as Error).message, "fake driver error"); + } + await assert.rejects(handleP); + await assert.rejects(handleP2); + const summaryData = validateSummary(runtime); + assert.strictEqual(summaryData.ids.length, 0); + assert.strictEqual(summaryData.redirectTable, undefined); + }); - // Delete blob1. Retrieving it should result in an error. - runtime.deleteBlob(blob1Handle); - await assert.rejects( - async () => runtime.getBlob(blob1Handle), - (error: IErrorBase & { code: number | undefined }) => { - const blob1Id = blob1Handle.absolutePath.split("/")[2]; - const correctErrorType = error.code === 404; - const correctErrorMessage = error.message === `Blob was deleted: ${blob1Id}`; - return correctErrorType && correctErrorMessage; - }, - "Deleted blob2 fetch should have failed", - ); + it("abort while disconnected", async function () { + if (createBlobPayloadPending) { + // Blob creation with pending payload doesn't support abort + this.skip(); + } + await runtime.attach(); + await runtime.connect(); + const ac = new AbortController(); + const blob = IsoBuffer.from("blob", "utf8"); + const handleP = runtime.createBlob(blob, ac.signal); + runtime.disconnect(); + ac.abort(); + await runtime.processBlobs(true); + try { + await handleP; + assert.fail("Should not succeed"); + } catch (error: unknown) { + assert.strictEqual((error as Error).message, "uploadBlob aborted"); + } + await assert.rejects(handleP); + const summaryData = validateSummary(runtime); + assert.strictEqual(summaryData.ids.length, 0); + assert.strictEqual(summaryData.redirectTable, undefined); + }); - // Delete blob2. Retrieving it should result in an error. 
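Aside: the assertions in this test pin the failure mode for fetching a deleted blob (a 404-style error whose message names the deleted blob id). A hedged sketch of tolerating that failure at a call site, assuming the same `code`/`message` shape reaches the caller of `get()`:

// Sketch only: treat a swept (deleted) blob as missing instead of propagating the error.
async function tryReadBlob(
	handle: { get(): Promise<ArrayBufferLike> },
): Promise<ArrayBufferLike | undefined> {
	try {
		return await handle.get();
	} catch (error: unknown) {
		const maybe = error as Partial<{ code: number; message: string }>;
		if (maybe.code === 404 && maybe.message?.startsWith("Blob was deleted")) {
			return undefined; // The blob was garbage collected.
		}
		throw error;
	}
}
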
- runtime.deleteBlob(blob2Handle); - await assert.rejects( - async () => runtime.getBlob(blob2Handle), - (error: IErrorBase & { code: number | undefined }) => { - const blob2Id = blob2Handle.absolutePath.split("/")[2]; - const correctErrorType = error.code === 404; - const correctErrorMessage = error.message === `Blob was deleted: ${blob2Id}`; - return correctErrorType && correctErrorMessage; - }, - "Deleted blob2 fetch should have failed", - ); + it("abort after blob succeeds", async function () { + if (createBlobPayloadPending) { + // Blob creation with pending payload doesn't support abort + this.skip(); + } + await runtime.attach(); + await runtime.connect(); + const ac = new AbortController(); + let handleP: Promise> | undefined; + try { + const blob = IsoBuffer.from("blob", "utf8"); + handleP = runtime.createBlob(blob, ac.signal); + await runtime.processAll(); + ac.abort(); + } catch { + assert.fail("abort after processing should not throw"); + } + assert(handleP); + await assert.doesNotReject(handleP); + const summaryData = validateSummary(runtime); + assert.strictEqual(summaryData.ids.length, 1); + assert.strictEqual(summaryData.redirectTable?.length, 1); + }); + + it("abort while waiting for op", async function () { + if (createBlobPayloadPending) { + // Blob creation with pending payload doesn't support abort + this.skip(); + } + await runtime.attach(); + await runtime.connect(); + const ac = new AbortController(); + const blob = IsoBuffer.from("blob", "utf8"); + const handleP = runtime.createBlob(blob, ac.signal); + const p1 = runtime.processBlobs(true); + const p2 = runtime.processHandles(); + // finish upload + await Promise.race([p1, p2]); + ac.abort(); + runtime.processOps(); + try { + // finish op + await handleP; + assert.fail("Should not succeed"); + + // TODO: better typing + // eslint-disable-next-line @typescript-eslint/no-explicit-any + } catch (error: any) { + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + assert.strictEqual(error.message, "uploadBlob aborted"); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + assert.ok(error.uploadTime); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + assert.strictEqual(error.acked, false); + } + await assert.rejects(handleP); + const summaryData = validateSummary(runtime); + assert.strictEqual(summaryData.ids.length, 0); + assert.strictEqual(summaryData.redirectTable, undefined); + }); + + it("resubmit on aborted pending op", async function () { + if (createBlobPayloadPending) { + // Blob creation with pending payload doesn't support abort + this.skip(); + } + await runtime.attach(); + await runtime.connect(); + const ac = new AbortController(); + let handleP: Promise> | undefined; + try { + handleP = runtime.createBlob(IsoBuffer.from("blob", "utf8"), ac.signal); + const p1 = runtime.processBlobs(true); + const p2 = runtime.processHandles(); + // finish upload + await Promise.race([p1, p2]); + runtime.disconnect(); + ac.abort(); + await handleP; + assert.fail("Should not succeed"); + // TODO: better typing + // eslint-disable-next-line @typescript-eslint/no-explicit-any + } catch (error: any) { + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + assert.strictEqual(error.message, "uploadBlob aborted"); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + assert.ok(error.uploadTime); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + assert.strictEqual(error.acked, false); + } + await 
runtime.connect(); + runtime.processOps(); + + // TODO: `handleP` can be `undefined`; this should be made safer. + await assert.rejects(handleP as Promise>); + const summaryData = validateSummary(runtime); + assert.strictEqual(summaryData.ids.length, 0); + assert.strictEqual(summaryData.redirectTable, undefined); + }); }); - // Support for this config has been removed. - const legacyKey_disableAttachmentBlobSweep = - "Fluid.GarbageCollection.DisableAttachmentBlobSweep"; - for (const disableAttachmentBlobsSweep of [true, undefined]) - it(`deletes unused blobs regardless of DisableAttachmentBlobsSweep setting [DisableAttachmentBlobsSweep=${disableAttachmentBlobsSweep}]`, async () => { - injectedSettings[legacyKey_disableAttachmentBlobSweep] = disableAttachmentBlobsSweep; + describe("Garbage Collection", () => { + let redirectTable: Map; + + /** + * Creates a blob with the given content and returns its local and storage id. + */ + async function createBlobAndGetIds(content: string) { + // For a given blob's GC node id, returns the blob id. + const getBlobIdFromGCNodeId = (gcNodeId: string) => { + const pathParts = gcNodeId.split("/"); + assert( + pathParts.length === 3 && pathParts[1] === blobManagerBasePath, + "Invalid blob node path", + ); + return pathParts[2]; + }; + + // For a given blob's id, returns the GC node id. + const getGCNodeIdFromBlobId = (blobId: string) => { + return `/${blobManagerBasePath}/${blobId}`; + }; + + const blobContents = IsoBuffer.from(content, "utf8"); + const handleP = runtime.createBlob(blobContents); + await runtime.processAll(); + + const blobHandle = await handleP; + const localId = getBlobIdFromGCNodeId(blobHandle.absolutePath); + assert(redirectTable.has(localId), "blob not found in redirect table"); + const storageId = redirectTable.get(localId); + assert(storageId !== undefined, "storage id not found in redirect table"); + return { + localId, + localGCNodeId: getGCNodeIdFromBlobId(localId), + storageId, + storageGCNodeId: getGCNodeIdFromBlobId(storageId), + }; + } + + beforeEach(() => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-assignment -- Mutating private property + redirectTable = (runtime.blobManager as any).redirectTable; + }); + it("fetching deleted blob fails", async () => { await runtime.attach(); await runtime.connect(); + const blob1Contents = IsoBuffer.from("blob1", "utf8"); + const blob2Contents = IsoBuffer.from("blob2", "utf8"); + const handle1P = runtime.createBlob(blob1Contents); + const handle2P = runtime.createBlob(blob2Contents); + await runtime.processAll(); + + const blob1Handle = await handle1P; + const blob2Handle = await handle2P; + + // Validate that the blobs can be retrieved. + assert.strictEqual(await runtime.getBlob(blob1Handle), blob1Contents); + assert.strictEqual(await runtime.getBlob(blob2Handle), blob2Contents); + + // Delete blob1. Retrieving it should result in an error. + runtime.deleteBlob(blob1Handle); + await assert.rejects( + async () => runtime.getBlob(blob1Handle), + (error: IErrorBase & { code: number | undefined }) => { + const blob1Id = blob1Handle.absolutePath.split("/")[2]; + const correctErrorType = error.code === 404; + const correctErrorMessage = error.message === `Blob was deleted: ${blob1Id}`; + return correctErrorType && correctErrorMessage; + }, + "Deleted blob2 fetch should have failed", + ); + + // Delete blob2. Retrieving it should result in an error. 
+ runtime.deleteBlob(blob2Handle);
+ await assert.rejects(
+ async () => runtime.getBlob(blob2Handle),
+ (error: IErrorBase & { code: number | undefined }) => {
+ const blob2Id = blob2Handle.absolutePath.split("/")[2];
+ const correctErrorType = error.code === 404;
+ const correctErrorMessage = error.message === `Blob was deleted: ${blob2Id}`;
+ return correctErrorType && correctErrorMessage;
+ },
+ "Deleted blob2 fetch should have failed",
+ );
+ });
+
+ // Support for this config has been removed.
+ const legacyKey_disableAttachmentBlobSweep =
+ "Fluid.GarbageCollection.DisableAttachmentBlobSweep";
+ for (const disableAttachmentBlobsSweep of [true, undefined])
+ it(`deletes unused blobs regardless of DisableAttachmentBlobsSweep setting [DisableAttachmentBlobsSweep=${disableAttachmentBlobsSweep}]`, async () => {
+ injectedSettings[legacyKey_disableAttachmentBlobSweep] = disableAttachmentBlobsSweep;
+
+ await runtime.attach();
+ await runtime.connect();
+
+ const blob1 = await createBlobAndGetIds("blob1");
+ const blob2 = await createBlobAndGetIds("blob2");
+
+ // Delete blob1's local id. The local id and the storage id should both be deleted from the redirect table
+ // since the blob only had one reference.
+ runtime.blobManager.deleteSweepReadyNodes([blob1.localGCNodeId]);
+ assert(!redirectTable.has(blob1.localId));
+ assert(!redirectTable.has(blob1.storageId));
+
+ // Delete blob2's local id. The local id and the storage id should both be deleted from the redirect table
+ // since the blob only had one reference.
+ runtime.blobManager.deleteSweepReadyNodes([blob2.localGCNodeId]);
+ assert(!redirectTable.has(blob2.localId));
+ assert(!redirectTable.has(blob2.storageId));
+ });
+ it("deletes unused de-duped blobs", async () => {
+ await runtime.attach();
+ await runtime.connect();
+
+ // Create 2 blobs with the same content. They should get de-duped.
const blob1 = await createBlobAndGetIds("blob1");
+ const blob1Duplicate = await createBlobAndGetIds("blob1");
+ assert(blob1.storageId === blob1Duplicate.storageId, "blob1 not de-duped");
+
+ // Create another 2 blobs with the same content. They should get de-duped.
const blob2 = await createBlobAndGetIds("blob2");
+ const blob2Duplicate = await createBlobAndGetIds("blob2");
+ assert(blob2.storageId === blob2Duplicate.storageId, "blob2 not de-duped");
- // Delete blob1's local id. The local id and the storage id should both be deleted from the redirect table
- // since the blob only had one reference.
+ // Delete blob1's local id. The local id should be deleted from the redirect table, but the storage id
+ // should not because the blob has another reference from the de-duped blob.
runtime.blobManager.deleteSweepReadyNodes([blob1.localGCNodeId]);
- assert(!redirectTable.has(blob1.localId));
- assert(!redirectTable.has(blob1.storageId));
+ assert(!redirectTable.has(blob1.localId), "blob1 localId should have been deleted");
+ assert(
+ redirectTable.has(blob1.storageId),
+ "blob1 storageId should not have been deleted",
+ );
+ // Delete blob1's de-duped local id. The local id and the storage id should both be deleted from the redirect table
+ // since all the references for the blob are now deleted.
+ runtime.blobManager.deleteSweepReadyNodes([blob1Duplicate.localGCNodeId]);
+ assert(
+ !redirectTable.has(blob1Duplicate.localId),
+ "blob1Duplicate localId should have been deleted",
+ );
+ assert(
+ !redirectTable.has(blob1.storageId),
+ "blob1 storageId should have been deleted",
+ );
- // Delete blob2's local id. The local id and the storage id should both be deleted from the redirect table
- // since the blob only had one reference.
+ // Delete blob2's local id. The local id should be deleted from the redirect table, but the storage id
+ // should not because the blob has another reference from the de-duped blob.
runtime.blobManager.deleteSweepReadyNodes([blob2.localGCNodeId]);
- assert(!redirectTable.has(blob2.localId));
- assert(!redirectTable.has(blob2.storageId));
+ assert(!redirectTable.has(blob2.localId), "blob2 localId should have been deleted");
+ assert(
+ redirectTable.has(blob2.storageId),
+ "blob2 storageId should not have been deleted",
+ );
+ // Delete blob2's de-duped local id. The local id and the storage id should both be deleted from the redirect table
+ // since all the references for the blob are now deleted.
+ runtime.blobManager.deleteSweepReadyNodes([blob2Duplicate.localGCNodeId]);
+ assert(
+ !redirectTable.has(blob2Duplicate.localId),
+ "blob2Duplicate localId should have been deleted",
+ );
+ assert(
+ !redirectTable.has(blob2.storageId),
+ "blob2 storageId should have been deleted",
+ );
});
-
- it("deletes unused de-duped blobs", async () => {
- await runtime.attach();
- await runtime.connect();
-
- // Create 2 blobs with the same content. They should get de-duped.
- const blob1 = await createBlobAndGetIds("blob1");
- const blob1Duplicate = await createBlobAndGetIds("blob1");
- assert(blob1.storageId === blob1Duplicate.storageId, "blob1 not de-duped");
-
- // Create another 2 blobs with the same content. They should get de-duped.
- const blob2 = await createBlobAndGetIds("blob2");
- const blob2Duplicate = await createBlobAndGetIds("blob2");
- assert(blob2.storageId === blob2Duplicate.storageId, "blob2 not de-duped");
-
- // Delete blob1's local id. The local id should both be deleted from the redirect table but the storage id
- // should not because the blob has another referenced from the de-duped blob.
- runtime.blobManager.deleteSweepReadyNodes([blob1.localGCNodeId]);
- assert(!redirectTable.has(blob1.localId), "blob1 localId should have been deleted");
- assert(
- redirectTable.has(blob1.storageId),
- "blob1 storageId should not have been deleted",
- );
- // Delete blob1's de-duped local id. The local id and the storage id should both be deleted from the redirect table
- // since all the references for the blob are now deleted.
- runtime.blobManager.deleteSweepReadyNodes([blob1Duplicate.localGCNodeId]);
- assert(
- !redirectTable.has(blob1Duplicate.localId),
- "blob1Duplicate localId should have been deleted",
- );
- assert(!redirectTable.has(blob1.storageId), "blob1 storageId should have been deleted");
-
- // Delete blob2's local id. The local id should both be deleted from the redirect table but the storage id
- // should not because the blob has another referenced from the de-duped blob.
- runtime.blobManager.deleteSweepReadyNodes([blob2.localGCNodeId]);
- assert(!redirectTable.has(blob2.localId), "blob2 localId should have been deleted");
- assert(
- redirectTable.has(blob2.storageId),
- "blob2 storageId should not have been deleted",
- );
- // Delete blob2's de-duped local id. The local id and the storage id should both be deleted from the redirect table
- // since all the references for the blob are now deleted.
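Aside: the de-dup expectations in this test reduce to reference counting over the redirect table — a storage id stays alive while any local id still points at it. A toy model of just that idea (illustrative only, not BlobManager's actual data structures):

// Toy reference-counting model of the sweep behavior exercised above.
const localToStorage = new Map<string, string>();
const isStorageIdReferenced = (storageId: string): boolean =>
	[...localToStorage.values()].includes(storageId);

localToStorage.set("blob1-localId", "storageId-A");
localToStorage.set("blob1Duplicate-localId", "storageId-A"); // de-duped: same storage id

// Sweeping one local id leaves the storage id alive...
localToStorage.delete("blob1-localId");
console.assert(isStorageIdReferenced("storageId-A"));

// ...sweeping the last local id releases it.
localToStorage.delete("blob1Duplicate-localId");
console.assert(!isStorageIdReferenced("storageId-A"));
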
- runtime.blobManager.deleteSweepReadyNodes([blob2Duplicate.localGCNodeId]); - assert( - !redirectTable.has(blob2Duplicate.localId), - "blob2Duplicate localId should have been deleted", - ); - assert(!redirectTable.has(blob2.storageId), "blob2 storageId should have been deleted"); }); }); -}); +} diff --git a/packages/runtime/container-runtime/src/test/blobManager.stashed.spec.ts b/packages/runtime/container-runtime/src/test/blobManager.stashed.spec.ts index 8c553f75136e..653adcd404a1 100644 --- a/packages/runtime/container-runtime/src/test/blobManager.stashed.spec.ts +++ b/packages/runtime/container-runtime/src/test/blobManager.stashed.spec.ts @@ -55,7 +55,7 @@ function createBlobManager(overrides?: Partial {}, blobRequested: () => {}, isBlobDeleted: () => false, - createBlobPlaceholders: false, + createBlobPayloadPending: false, // overrides ...overrides, diff --git a/packages/runtime/container-runtime/src/test/getPendingBlobs.spec.ts b/packages/runtime/container-runtime/src/test/getPendingBlobs.spec.ts index 357c0d995724..4c47e01e5456 100644 --- a/packages/runtime/container-runtime/src/test/getPendingBlobs.spec.ts +++ b/packages/runtime/container-runtime/src/test/getPendingBlobs.spec.ts @@ -22,7 +22,7 @@ describe("getPendingLocalState", () => { beforeEach(() => { mc = mixinMonitoringContext(createChildLogger(), undefined); - runtime = new MockRuntime(mc); + runtime = new MockRuntime(mc, false /* createBlobPayloadPending */); }); it("get blobs while uploading", async () => { @@ -43,7 +43,13 @@ describe("getPendingLocalState", () => { assert.strictEqual(summaryData.ids.length, 0); assert.strictEqual(summaryData.redirectTable, undefined); - const runtime2 = new MockRuntime(mc, summaryData, false, pendingState); + const runtime2 = new MockRuntime( + mc, + false, // createBlobPayloadPending + summaryData, + false, + pendingState, + ); await runtime2.attach(); await runtime2.connect(); await runtime2.processAll(); @@ -72,7 +78,13 @@ describe("getPendingLocalState", () => { assert.strictEqual(summaryData.ids.length, 0); assert.strictEqual(summaryData.redirectTable, undefined); - const runtime2 = new MockRuntime(mc, summaryData, false, pendingState); + const runtime2 = new MockRuntime( + mc, + false, // createBlobPayloadPending + summaryData, + false, + pendingState, + ); await runtime2.attach(); await runtime2.connect(); await runtime2.processAll(); @@ -102,7 +114,13 @@ describe("getPendingLocalState", () => { assert.strictEqual(summaryData.ids.length, 0); assert.strictEqual(summaryData.redirectTable, undefined); - const runtime2 = new MockRuntime(mc, summaryData, false, pendingState); + const runtime2 = new MockRuntime( + mc, + false, // createBlobPayloadPending + summaryData, + false, + pendingState, + ); await runtime2.attach(); await runtime2.connect(); await runtime2.processAll(); @@ -136,7 +154,13 @@ describe("getPendingLocalState", () => { assert.strictEqual(summaryData.ids.length, 0); assert.strictEqual(summaryData.redirectTable, undefined); - const runtime2 = new MockRuntime(mc, summaryData, false, pendingState); + const runtime2 = new MockRuntime( + mc, + false, // createBlobPayloadPending + summaryData, + false, + pendingState, + ); await runtime2.attach(); await runtime2.connect(); await runtime2.processAll(); @@ -164,7 +188,13 @@ describe("getPendingLocalState", () => { assert.strictEqual(summaryData.ids.length, 0); assert.strictEqual(summaryData.redirectTable, undefined); - const runtime2 = new MockRuntime(mc, summaryData, false, pendingState); + const runtime2 = new MockRuntime( 
+ mc, + false, // createBlobPayloadPending + summaryData, + false, + pendingState, + ); await runtime2.attach(); await runtime2.connect(0, true); await runtime2.processAll(); @@ -188,7 +218,13 @@ describe("getPendingLocalState", () => { assert.ok(pendingBlobs[Object.keys(pendingBlobs)[0]].storageId); const summaryData = validateSummary(runtime); - const runtime2 = new MockRuntime(mc, summaryData, false, pendingState); + const runtime2 = new MockRuntime( + mc, + false, // createBlobPayloadPending + summaryData, + false, + pendingState, + ); await runtime2.attach(); assert.strictEqual(runtime2.unprocessedBlobs.size, 0); await runtime2.connect(); @@ -215,7 +251,13 @@ describe("getPendingLocalState", () => { assert.ok(pendingBlobs[Object.keys(pendingBlobs)[0]].storageId); const summaryData = validateSummary(runtime); - const runtime2 = new MockRuntime(mc, summaryData, false, pendingState); + const runtime2 = new MockRuntime( + mc, + false, // createBlobPayloadPending + summaryData, + false, + pendingState, + ); await runtime2.attach(); await runtime2.connect(); await runtime2.processAll(); diff --git a/packages/runtime/datastore/api-report/datastore.legacy.alpha.api.md b/packages/runtime/datastore/api-report/datastore.legacy.alpha.api.md index 93d8e08b6dd2..3f2bc84f1c08 100644 --- a/packages/runtime/datastore/api-report/datastore.legacy.alpha.api.md +++ b/packages/runtime/datastore/api-report/datastore.legacy.alpha.api.md @@ -80,7 +80,7 @@ export class FluidDataStoreRuntime extends TypedEventEmitter; // (undocumented) resolveHandle(request: IRequest): Promise; - reSubmit(type: DataStoreMessageType, content: any, localOpMetadata: unknown): void; + reSubmit(type: DataStoreMessageType, content: any, localOpMetadata: unknown, squash: boolean): void; rollback?(type: DataStoreMessageType, content: any, localOpMetadata: unknown): void; // (undocumented) get rootRoutingContext(): this; diff --git a/packages/runtime/runtime-utils/api-report/runtime-utils.legacy.alpha.api.md b/packages/runtime/runtime-utils/api-report/runtime-utils.legacy.alpha.api.md index f61778a9b6dd..f16fcea183f2 100644 --- a/packages/runtime/runtime-utils/api-report/runtime-utils.legacy.alpha.api.md +++ b/packages/runtime/runtime-utils/api-report/runtime-utils.legacy.alpha.api.md @@ -33,6 +33,12 @@ export abstract class FluidHandleBase implements IFluidHandleInternal { // @public export function isFluidHandle(value: unknown): value is IFluidHandle; +// @alpha @legacy +export const isFluidHandlePayloadPending: (handle: IFluidHandle) => handle is IFluidHandlePayloadPending; + +// @alpha @legacy +export const isLocalFluidHandle: (handle: IFluidHandle) => handle is ILocalFluidHandle; + // @alpha @legacy export class RequestParser implements IRequest { protected constructor(request: Readonly); diff --git a/packages/runtime/runtime-utils/src/handles.ts b/packages/runtime/runtime-utils/src/handles.ts index 3c5eeb7d1e06..0ec0a092623b 100644 --- a/packages/runtime/runtime-utils/src/handles.ts +++ b/packages/runtime/runtime-utils/src/handles.ts @@ -8,6 +8,8 @@ import { IFluidHandle, fluidHandleSymbol } from "@fluidframework/core-interfaces import type { IFluidHandleInternal, IFluidHandleInternalPayloadPending, + IFluidHandlePayloadPending, + ILocalFluidHandle, } from "@fluidframework/core-interfaces/internal"; /** @@ -48,6 +50,29 @@ export const isFluidHandleInternalPayloadPending = ( ): fluidHandleInternal is IFluidHandleInternalPayloadPending => "payloadPending" in fluidHandleInternal && fluidHandleInternal.payloadPending === true; +/** + 
* Check if the handle is an IFluidHandlePayloadPending. + * @privateRemarks + * This should be true for locally-created BlobHandles currently. When IFluidHandlePayloadPending is merged + * to IFluidHandle, this type guard will no longer be necessary. + * @legacy + * @alpha + */ +export const isFluidHandlePayloadPending = ( + handle: IFluidHandle, +): handle is IFluidHandlePayloadPending => + "payloadState" in handle && + (handle.payloadState === "shared" || handle.payloadState === "pending"); + +/** + * Check if the handle is an ILocalFluidHandle. + * @legacy + * @alpha + */ +export const isLocalFluidHandle = ( + handle: IFluidHandle, +): handle is ILocalFluidHandle => + isFluidHandlePayloadPending(handle) && "payloadShareError" in handle; /** * Encodes the given IFluidHandle into a JSON-serializable form, * @param handle - The IFluidHandle to serialize. diff --git a/packages/runtime/runtime-utils/src/index.ts b/packages/runtime/runtime-utils/src/index.ts index 62424a16139b..4dd504761ac1 100644 --- a/packages/runtime/runtime-utils/src/index.ts +++ b/packages/runtime/runtime-utils/src/index.ts @@ -11,15 +11,17 @@ export { responseToException, } from "./dataStoreHelpers.js"; export { + compareFluidHandles, encodeHandleForSerialization, + FluidHandleBase, ISerializedHandle, - isSerializedHandle, isFluidHandle, isFluidHandleInternalPayloadPending, + isFluidHandlePayloadPending, + isLocalFluidHandle, + isSerializedHandle, toFluidHandleErased, toFluidHandleInternal, - FluidHandleBase, - compareFluidHandles, } from "./handles.js"; export { ObjectStoragePartition } from "./objectstoragepartition.js"; export { diff --git a/packages/test/test-end-to-end-tests/src/test/blobs.spec.ts b/packages/test/test-end-to-end-tests/src/test/blobs.spec.ts index f0c095b6ef0d..01da961261ec 100644 --- a/packages/test/test-end-to-end-tests/src/test/blobs.spec.ts +++ b/packages/test/test-end-to-end-tests/src/test/blobs.spec.ts @@ -19,7 +19,7 @@ import { DefaultSummaryConfiguration, type IContainerRuntimeOptionsInternal, } from "@fluidframework/container-runtime/internal"; -import { IErrorBase, IFluidHandle } from "@fluidframework/core-interfaces"; +import type { IErrorBase, IFluidHandle } from "@fluidframework/core-interfaces"; import { Deferred } from "@fluidframework/core-utils/internal"; import { IDocumentServiceFactory } from "@fluidframework/driver-definitions/internal"; import { ReferenceType } from "@fluidframework/merge-tree/internal"; @@ -43,7 +43,10 @@ import { getUrlFromDetachedBlobStorage, } from "./mockDetachedBlobStorage.js"; -function makeTestContainerConfig(registry: ChannelFactoryRegistry): ITestContainerConfig { +function makeTestContainerConfig( + registry: ChannelFactoryRegistry, + createBlobPayloadPending: true | undefined, +): ITestContainerConfig { return { runtimeOptions: { summaryOptions: { @@ -60,6 +63,8 @@ function makeTestContainerConfig(registry: ChannelFactoryRegistry): ITestContain }, }, }, + explicitSchemaControl: createBlobPayloadPending, + createBlobPayloadPending, }, registry, }; @@ -89,350 +94,372 @@ const ContainerStateEventsOrErrors: ExpectedEvents = { ], }; -describeCompat("blobs", "FullCompat", (getTestObjectProvider, apis) => { - const { SharedString } = apis.dds; - const testContainerConfig = makeTestContainerConfig([ - ["sharedString", SharedString.getFactory()], - ]); - - let provider: ITestObjectProvider; - beforeEach("getTestObjectProvider", async function () { - provider = getTestObjectProvider(); - // Currently, AFR does not support blob API. 
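Aside: a sketch of how a caller might combine the guards added in runtime-utils above with the payload-pending handle surface. The import path and the `events.on(...)` subscription shape are assumptions based on the surrounding declarations, not confirmed by this patch:

import type { IFluidHandle } from "@fluidframework/core-interfaces";
// Assumed export path for the new @alpha @legacy guards.
import {
	isFluidHandlePayloadPending,
	isLocalFluidHandle,
} from "@fluidframework/runtime-utils/internal";

// Sketch only: observe the payload-sharing state of a handle, if it exposes one.
function watchPayload(handle: IFluidHandle): void {
	if (!isFluidHandlePayloadPending(handle)) {
		return; // Plain handle with no observable payload state.
	}
	if (handle.payloadState === "pending") {
		// Assumed subscription shape: Listenable.on(eventName, listener).
		handle.events.on("payloadShared", () => {
			console.log("payload is now available to remote collaborators");
		});
	}
	if (isLocalFluidHandle(handle)) {
		handle.events.on("payloadShareFailed", (error) => {
			console.error("payload sharing failed", error, handle.payloadShareError);
		});
	}
}
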
- if (provider.driver.type === "routerlicious" && provider.driver.endpointName === "frs") { - this.skip(); - } - }); - - it("attach sends an op", async function () { - const container = await provider.makeTestContainer(testContainerConfig); - - const dataStore = await getContainerEntryPointBackCompat(container); - - const blobOpP = new Promise((resolve, reject) => - dataStore._context.containerRuntime.on("op", (op) => { - if (op.type === ContainerMessageType.BlobAttach) { - if ((op.metadata as { blobId?: unknown } | undefined)?.blobId) { - resolve(); - } else { - reject(new Error("no op metadata")); - } +for (const createBlobPayloadPending of [undefined, true] as const) { + describeCompat( + `blobs (createBlobPayloadPending: ${createBlobPayloadPending})`, + "FullCompat", + (getTestObjectProvider, apis) => { + const { SharedString } = apis.dds; + const testContainerConfig = makeTestContainerConfig( + [["sharedString", SharedString.getFactory()]], + createBlobPayloadPending, + ); + + let provider: ITestObjectProvider; + beforeEach("getTestObjectProvider", async function () { + provider = getTestObjectProvider(); + // Currently, AFR does not support blob API. + if ( + provider.driver.type === "routerlicious" && + provider.driver.endpointName === "frs" + ) { + this.skip(); } - }), - ); - - const blob = await dataStore._runtime.uploadBlob( - stringToBuffer("some random text", "utf-8"), - ); - - dataStore._root.set("my blob", blob); - - await blobOpP; - }); - - it("can get remote attached blob", async function () { - // TODO: Re-enable after cross version compat bugs are fixed - ADO:6286 - if (provider.type === "TestObjectProviderWithVersionedLoad") { - this.skip(); - } - const testString = "this is a test string"; - const testKey = "a blob"; - const container1 = await provider.makeTestContainer(testContainerConfig); - - const dataStore1 = await getContainerEntryPointBackCompat(container1); - - const blob = await dataStore1._runtime.uploadBlob(stringToBuffer(testString, "utf-8")); - dataStore1._root.set(testKey, blob); - - const container2 = await provider.loadTestContainer(testContainerConfig); - const dataStore2 = await getContainerEntryPointBackCompat(container2); - - await provider.ensureSynchronized(); - - const blobHandle = dataStore2._root.get>(testKey); - assert(blobHandle); - assert.strictEqual(bufferToString(await blobHandle.get(), "utf-8"), testString); - }); - - it("round trip blob handle on shared string property", async function () { - // TODO: Re-enable after cross version compat bugs are fixed - ADO:6286 - if (provider.type === "TestObjectProviderWithVersionedLoad") { - this.skip(); - } - const container1 = await provider.makeTestContainer(testContainerConfig); - const container2 = await provider.loadTestContainer(testContainerConfig); - const testString = "this is a test string"; - // setup - { - const dataStore = await getContainerEntryPointBackCompat(container2); - const sharedString = SharedString.create(dataStore._runtime, uuid()); - dataStore._root.set("sharedString", sharedString.handle); + }); - const blob = await dataStore._runtime.uploadBlob(stringToBuffer(testString, "utf-8")); + it("attach sends an op", async function () { + const container = await provider.makeTestContainer(testContainerConfig); - sharedString.insertMarker(0, ReferenceType.Simple, { blob }); + const dataStore = await getContainerEntryPointBackCompat(container); - // wait for summarize, then summary ack so the next container will load from snapshot - await new Promise((resolve, reject) => { - let 
summarized = false; - container1.on("op", (op) => { - switch (op.type) { - case "summaryAck": { - if (summarized) { + const blobOpP = new Promise((resolve, reject) => + dataStore._context.containerRuntime.on("op", (op) => { + if (op.type === ContainerMessageType.BlobAttach) { + if ((op.metadata as { blobId?: unknown } | undefined)?.blobId) { resolve(); + } else { + reject(new Error("no op metadata")); } - break; - } - case "summaryNack": { - reject(new Error("summaryNack")); - break; - } - case "summarize": { - summarized = true; - break; } - default: { - break; - } - } - }); + }), + ); + + const blob = await dataStore._runtime.uploadBlob( + stringToBuffer("some random text", "utf-8"), + ); + + dataStore._root.set("my blob", blob); + + await blobOpP; }); - } - - // validate on remote container, local container, and container loaded from summary - for (const container of [ - container1, - container2, - await provider.loadTestContainer(testContainerConfig), - ]) { - const dataStore2 = await getContainerEntryPointBackCompat(container); - await provider.ensureSynchronized(); - const handle = dataStore2._root.get>("sharedString"); - assert(handle); - const sharedString2 = await handle.get(); - - const props = sharedString2.getPropertiesAtPosition(0); - - assert.strictEqual(bufferToString(await props?.blob.get(), "utf-8"), testString); - } - }); - - it("correctly handles simultaneous identical blob upload on one container", async () => { - const container = await provider.makeTestContainer(testContainerConfig); - const dataStore = await getContainerEntryPointBackCompat(container); - const blob = stringToBuffer("some different yet still random text", "utf-8"); - - // upload the blob twice and make sure nothing bad happens. - await Promise.all([ - dataStore._runtime.uploadBlob(blob), - dataStore._runtime.uploadBlob(blob), - ]); - }); - - [false, true].forEach((enableGroupedBatching) => { - it(`attach sends ops with compression enabled and ${ - enableGroupedBatching ? "grouped" : "regular" - } batching`, async function () { - // Tracked by AB#4130, the test run on the tinylicous driver is disabled temporarily to ensure normal operation of the build-client package pipeline - if (provider.driver.type === "tinylicious" || provider.driver.type === "t9s") { - this.skip(); - } - // Skip this test for standard r11s as its flaky and non-reproducible - if (provider.driver.type === "r11s" && provider.driver.endpointName !== "frs") { - this.skip(); - } + it("can get remote attached blob", async function () { + // TODO: Re-enable after cross version compat bugs are fixed - ADO:6286 + if (provider.type === "TestObjectProviderWithVersionedLoad") { + this.skip(); + } + const testString = "this is a test string"; + const testKey = "a blob"; + const container1 = await provider.makeTestContainer(testContainerConfig); - const runtimeOptions: IContainerRuntimeOptionsInternal = { - ...testContainerConfig.runtimeOptions, - compressionOptions: { - minimumBatchSizeInBytes: enableGroupedBatching ? 
1 : Number.POSITIVE_INFINITY, - compressionAlgorithm: CompressionAlgorithms.lz4, - }, - enableGroupedBatching, - }; + const dataStore1 = await getContainerEntryPointBackCompat(container1); + + const blob = await dataStore1._runtime.uploadBlob(stringToBuffer(testString, "utf-8")); + dataStore1._root.set(testKey, blob); + + const container2 = await provider.loadTestContainer(testContainerConfig); + const dataStore2 = await getContainerEntryPointBackCompat(container2); + + await provider.ensureSynchronized(); - const container = await provider.makeTestContainer({ - ...testContainerConfig, - runtimeOptions, + const blobHandle = dataStore2._root.get>(testKey); + assert(blobHandle); + assert.strictEqual(bufferToString(await blobHandle.get(), "utf-8"), testString); }); - const dataStore = await getContainerEntryPointBackCompat(container); - const blobOpP = timeoutPromise((resolve, reject) => - dataStore._context.containerRuntime.on("op", (op) => { - if (op.type === ContainerMessageType.BlobAttach) { - if ((op.metadata as { blobId?: unknown } | undefined)?.blobId) { - resolve(); - } else { - reject(new Error("no op metadata")); - } - } - }), - ); + it("round trip blob handle on shared string property", async function () { + // TODO: Re-enable after cross version compat bugs are fixed - ADO:6286 + if (provider.type === "TestObjectProviderWithVersionedLoad") { + this.skip(); + } + const container1 = await provider.makeTestContainer(testContainerConfig); + const container2 = await provider.loadTestContainer(testContainerConfig); + const testString = "this is a test string"; + // setup + { + const dataStore = + await getContainerEntryPointBackCompat(container2); + const sharedString = SharedString.create(dataStore._runtime, uuid()); + dataStore._root.set("sharedString", sharedString.handle); + + const blob = await dataStore._runtime.uploadBlob( + stringToBuffer(testString, "utf-8"), + ); - for (let i = 0; i < 5; i++) { - const blob = await dataStore._runtime.uploadBlob( - stringToBuffer("some random text", "utf-8"), - ); + sharedString.insertMarker(0, ReferenceType.Simple, { blob }); + + // wait for summarize, then summary ack so the next container will load from snapshot + await new Promise((resolve, reject) => { + let summarized = false; + container1.on("op", (op) => { + switch (op.type) { + case "summaryAck": { + if (summarized) { + resolve(); + } + break; + } + case "summaryNack": { + reject(new Error("summaryNack")); + break; + } + case "summarize": { + summarized = true; + break; + } + default: { + break; + } + } + }); + }); + } - dataStore._root.set(`Blob #${i}`, blob); - } + // validate on remote container, local container, and container loaded from summary + for (const container of [ + container1, + container2, + await provider.loadTestContainer(testContainerConfig), + ]) { + const dataStore2 = + await getContainerEntryPointBackCompat(container); + await provider.ensureSynchronized(); + const handle = dataStore2._root.get>("sharedString"); + assert(handle); + const sharedString2 = await handle.get(); - await blobOpP; - }); - }); -}); - -// this functionality was added in 0.47 and can be added to the compat-enabled -// tests above when the LTS version is bumped > 0.47 -describeCompat("blobs", "NoCompat", (getTestObjectProvider, apis) => { - const { SharedString } = apis.dds; - const testContainerConfig = makeTestContainerConfig([ - ["sharedString", SharedString.getFactory()], - ]); - - let provider: ITestObjectProvider; - let testPersistedCache: TestPersistedCache; - 
beforeEach("getTestObjectProvider", async function () { - testPersistedCache = new TestPersistedCache(); - provider = getTestObjectProvider({ persistedCache: testPersistedCache }); - // Currently AFR does not support blob API. - if (provider.driver.type === "routerlicious" && provider.driver.endpointName === "frs") { - this.skip(); - } - }); - - // this test relies on an internal function that has been renamed (snapshot -> summarize) - it("loads from snapshot", async function () { - // GitHub Issue: #9534 - if (!driverSupportsBlobs(provider.driver)) { - this.skip(); - } - const container1 = await provider.makeTestContainer(testContainerConfig); - const dataStore = (await container1.getEntryPoint()) as ITestDataObject; - - const attachOpP = new Promise((resolve, reject) => - container1.on("op", (op) => { - if ( - typeof op.contents === "string" && - (JSON.parse(op.contents) as { type?: unknown })?.type === - ContainerMessageType.BlobAttach - ) { - if ((op.metadata as { blobId?: unknown } | undefined)?.blobId) { - resolve(); - } else { - reject(new Error("no op metadata")); - } + const props = sharedString2.getPropertiesAtPosition(0); + + assert.strictEqual(bufferToString(await props?.blob.get(), "utf-8"), testString); } - }), - ); - - const blob = await dataStore._runtime.uploadBlob( - stringToBuffer("some random text", "utf-8"), - ); - - // this will send the blob attach op on < 0.41 runtime (otherwise it's sent at time of upload) - dataStore._root.set("my blob", blob); - await attachOpP; - - const snapshot1 = (container1 as any).runtime.blobManager.summarize(); - - // wait for summarize, then summary ack so the next container will load from snapshot - await new Promise((resolve, reject) => { - let summarized = false; - container1.on("op", (op) => { - switch (op.type) { - case "summaryAck": { - if (summarized) { - resolve(); - } - break; - } - case "summaryNack": { - reject(new Error("summaryNack")); - break; + }); + + it("correctly handles simultaneous identical blob upload on one container", async () => { + const container = await provider.makeTestContainer(testContainerConfig); + const dataStore = await getContainerEntryPointBackCompat(container); + const blob = stringToBuffer("some different yet still random text", "utf-8"); + + // upload the blob twice and make sure nothing bad happens. + await Promise.all([ + dataStore._runtime.uploadBlob(blob), + dataStore._runtime.uploadBlob(blob), + ]); + }); + + [false, true].forEach((enableGroupedBatching) => { + it(`attach sends ops with compression enabled and ${ + enableGroupedBatching ? "grouped" : "regular" + } batching`, async function () { + // Tracked by AB#4130, the test run on the tinylicous driver is disabled temporarily to ensure normal operation of the build-client package pipeline + if (provider.driver.type === "tinylicious" || provider.driver.type === "t9s") { + this.skip(); } - case "summarize": { - summarized = true; - break; + + // Skip this test for standard r11s as its flaky and non-reproducible + if (provider.driver.type === "r11s" && provider.driver.endpointName !== "frs") { + this.skip(); } - default: { - break; + + const runtimeOptions: IContainerRuntimeOptionsInternal = { + ...testContainerConfig.runtimeOptions, + compressionOptions: { + minimumBatchSizeInBytes: enableGroupedBatching ? 
1 : Number.POSITIVE_INFINITY, + compressionAlgorithm: CompressionAlgorithms.lz4, + }, + enableGroupedBatching, + }; + + const container = await provider.makeTestContainer({ + ...testContainerConfig, + runtimeOptions, + }); + + const dataStore = await getContainerEntryPointBackCompat(container); + const blobOpP = timeoutPromise((resolve, reject) => + dataStore._context.containerRuntime.on("op", (op) => { + if (op.type === ContainerMessageType.BlobAttach) { + if ((op.metadata as { blobId?: unknown } | undefined)?.blobId) { + resolve(); + } else { + reject(new Error("no op metadata")); + } + } + }), + ); + + for (let i = 0; i < 5; i++) { + const blob = await dataStore._runtime.uploadBlob( + stringToBuffer("some random text", "utf-8"), + ); + + dataStore._root.set(`Blob #${i}`, blob); } + + await blobOpP; + }); + }); + }, + ); + + // this functionality was added in 0.47 and can be added to the compat-enabled + // tests above when the LTS version is bumped > 0.47 + describeCompat( + `blobs (createBlobPayloadPending: ${createBlobPayloadPending})`, + "NoCompat", + (getTestObjectProvider, apis) => { + const { SharedString } = apis.dds; + const testContainerConfig = makeTestContainerConfig( + [["sharedString", SharedString.getFactory()]], + createBlobPayloadPending, + ); + + let provider: ITestObjectProvider; + let testPersistedCache: TestPersistedCache; + beforeEach("getTestObjectProvider", async function () { + testPersistedCache = new TestPersistedCache(); + provider = getTestObjectProvider({ persistedCache: testPersistedCache }); + // Currently AFR does not support blob API. + if ( + provider.driver.type === "routerlicious" && + provider.driver.endpointName === "frs" + ) { + this.skip(); } }); - }); - - // Make sure the next container loads from the network so as to get latest snapshot. - testPersistedCache.clearCache(); - const container2 = await provider.loadTestContainer(testContainerConfig); - const snapshot2 = (container2 as any).runtime.blobManager.summarize(); - assert.strictEqual(snapshot2.stats.treeNodeCount, 1); - assert.strictEqual(snapshot1.summary.tree[0].id, snapshot2.summary.tree[0].id); - }); - - // regression test for https://github.com/microsoft/FluidFramework/issues/9702 - // this was fixed in 0.58.3000 - it("correctly handles simultaneous identical blob upload on separate containers", async () => { - const container1 = await provider.makeTestContainer(testContainerConfig); - const container2 = await provider.loadTestContainer(testContainerConfig); - const dataStore1 = (await container1.getEntryPoint()) as ITestDataObject; - const dataStore2 = (await container2.getEntryPoint()) as ITestDataObject; - const blob = stringToBuffer("some different yet still random text", "utf-8"); - await waitForContainerConnection(container1); - await waitForContainerConnection(container2); - // pause so the ops are in flight at the same time - await provider.opProcessingController.pauseProcessing(); - - // upload the blob twice and make sure nothing bad happens. 
- const uploadP = Promise.all([ - dataStore1._runtime.uploadBlob(blob), - dataStore2._runtime.uploadBlob(blob), - ]); - provider.opProcessingController.resumeProcessing(); - await uploadP; - }); - - it("reconnection does not block ops when having pending blobs", async () => { - const uploadBlobPromise = new Deferred(); - const container1 = await provider.makeTestContainer({ - ...testContainerConfig, - loaderProps: { - documentServiceFactory: wrapObjectAndOverride(provider.documentServiceFactory, { - createDocumentService: { - connectToStorage: { - createBlob: (dss) => async (blob) => { - // Wait for the uploadBlobPromise to be resolved - await uploadBlobPromise.promise; - return dss.createBlob(blob); + + // this test relies on an internal function that has been renamed (snapshot -> summarize) + it("loads from snapshot", async function () { + // GitHub Issue: #9534 + if (!driverSupportsBlobs(provider.driver)) { + this.skip(); + } + const container1 = await provider.makeTestContainer(testContainerConfig); + const dataStore = (await container1.getEntryPoint()) as ITestDataObject; + + const attachOpP = new Promise((resolve, reject) => + container1.on("op", (op) => { + if ( + typeof op.contents === "string" && + (JSON.parse(op.contents) as { type?: unknown })?.type === + ContainerMessageType.BlobAttach + ) { + if ((op.metadata as { blobId?: unknown } | undefined)?.blobId) { + resolve(); + } else { + reject(new Error("no op metadata")); + } + } + }), + ); + + const blob = await dataStore._runtime.uploadBlob( + stringToBuffer("some random text", "utf-8"), + ); + + // this will send the blob attach op on < 0.41 runtime (otherwise it's sent at time of upload) + dataStore._root.set("my blob", blob); + await attachOpP; + + const snapshot1 = (container1 as any).runtime.blobManager.summarize(); + + // wait for summarize, then summary ack so the next container will load from snapshot + await new Promise((resolve, reject) => { + let summarized = false; + container1.on("op", (op) => { + switch (op.type) { + case "summaryAck": { + if (summarized) { + resolve(); + } + break; + } + case "summaryNack": { + reject(new Error("summaryNack")); + break; + } + case "summarize": { + summarized = true; + break; + } + default: { + break; + } + } + }); + }); + + // Make sure the next container loads from the network so as to get latest snapshot. 
+ testPersistedCache.clearCache(); + const container2 = await provider.loadTestContainer(testContainerConfig); + const snapshot2 = (container2 as any).runtime.blobManager.summarize(); + assert.strictEqual(snapshot2.stats.treeNodeCount, 1); + assert.strictEqual(snapshot1.summary.tree[0].id, snapshot2.summary.tree[0].id); + }); + + // regression test for https://github.com/microsoft/FluidFramework/issues/9702 + // this was fixed in 0.58.3000 + it("correctly handles simultaneous identical blob upload on separate containers", async () => { + const container1 = await provider.makeTestContainer(testContainerConfig); + const container2 = await provider.loadTestContainer(testContainerConfig); + const dataStore1 = (await container1.getEntryPoint()) as ITestDataObject; + const dataStore2 = (await container2.getEntryPoint()) as ITestDataObject; + const blob = stringToBuffer("some different yet still random text", "utf-8"); + await waitForContainerConnection(container1); + await waitForContainerConnection(container2); + // pause so the ops are in flight at the same time + await provider.opProcessingController.pauseProcessing(); + + // upload the blob twice and make sure nothing bad happens. + const uploadP = Promise.all([ + dataStore1._runtime.uploadBlob(blob), + dataStore2._runtime.uploadBlob(blob), + ]); + provider.opProcessingController.resumeProcessing(); + await uploadP; + }); + + it("reconnection does not block ops when having pending blobs", async () => { + const uploadBlobPromise = new Deferred(); + const container1 = await provider.makeTestContainer({ + ...testContainerConfig, + loaderProps: { + documentServiceFactory: wrapObjectAndOverride(provider.documentServiceFactory, { + createDocumentService: { + connectToStorage: { + createBlob: (dss) => async (blob) => { + // Wait for the uploadBlobPromise to be resolved + await uploadBlobPromise.promise; + return dss.createBlob(blob); + }, + }, }, - }, + }), }, - }), - }, - }); - const dataStore1 = (await container1.getEntryPoint()) as ITestDataObject; + }); + const dataStore1 = (await container1.getEntryPoint()) as ITestDataObject; - const handleP = dataStore1._runtime.uploadBlob(stringToBuffer("test string", "utf8")); + const handleP = dataStore1._runtime.uploadBlob(stringToBuffer("test string", "utf8")); - container1.disconnect(); - container1.connect(); - await waitForContainerConnection(container1); - // sending some ops to confirm pending blob is not blocking other ops - dataStore1._root.set("key", "value"); - dataStore1._root.set("another key", "another value"); + container1.disconnect(); + container1.connect(); + await waitForContainerConnection(container1); + // sending some ops to confirm pending blob is not blocking other ops + dataStore1._root.set("key", "value"); + dataStore1._root.set("another key", "another value"); - const container2 = await provider.loadTestContainer(testContainerConfig); - const dataStore2 = (await container2.getEntryPoint()) as ITestDataObject; - await provider.ensureSynchronized(); + const container2 = await provider.loadTestContainer(testContainerConfig); + const dataStore2 = (await container2.getEntryPoint()) as ITestDataObject; + await provider.ensureSynchronized(); - assert.strictEqual(dataStore2._root.get("key"), "value"); - assert.strictEqual(dataStore2._root.get("another key"), "another value"); + assert.strictEqual(dataStore2._root.get("key"), "value"); + assert.strictEqual(dataStore2._root.get("another key"), "another value"); - uploadBlobPromise.resolve(); - await assert.doesNotReject(handleP); - }); 
-}); + uploadBlobPromise.resolve(); + await assert.doesNotReject(handleP); + }); + }, + ); +} function serializationTests({ testContainerConfig, diff --git a/packages/test/test-end-to-end-tests/src/test/blobsisAttached.spec.ts b/packages/test/test-end-to-end-tests/src/test/blobsisAttached.spec.ts index 6efae62cf000..bbd1b13b7fc7 100644 --- a/packages/test/test-end-to-end-tests/src/test/blobsisAttached.spec.ts +++ b/packages/test/test-end-to-end-tests/src/test/blobsisAttached.spec.ts @@ -16,7 +16,7 @@ import { // eslint-disable-next-line import/no-internal-modules import { type IPendingRuntimeState } from "@fluidframework/container-runtime/internal/test/containerRuntime"; import { IContainerRuntime } from "@fluidframework/container-runtime-definitions/internal"; -import { IFluidHandle } from "@fluidframework/core-interfaces"; +import type { IFluidHandle } from "@fluidframework/core-interfaces"; import type { ISharedMap, ISharedDirectory, @@ -36,297 +36,327 @@ import { driverSupportsBlobs } from "./mockDetachedBlobStorage.js"; const mapId = "map"; const directoryId = "directoryKey"; -describeCompat("blob handle isAttached", "NoCompat", (getTestObjectProvider, apis) => { - const { SharedMap, SharedDirectory } = apis.dds; - const registry: ChannelFactoryRegistry = [ - [mapId, SharedMap.getFactory()], - [directoryId, SharedDirectory.getFactory()], - ]; - - const testContainerConfig: ITestContainerConfig = { - fluidDataObjectType: DataObjectFactoryType.Test, - registry, - }; - - describe("from attached container", () => { - let provider: ITestObjectProvider; - let loader: IHostLoader; - let container: IContainer; - - const runtimeOf = (dataObject: ITestFluidObject): IContainerRuntime & IRuntime => - dataObject.context.containerRuntime as IContainerRuntime & IRuntime; - - beforeEach("createContainer", async () => { - provider = getTestObjectProvider(); - loader = provider.makeTestLoader(testContainerConfig); - container = await createAndAttachContainer( - provider.defaultCodeDetails, - loader, - provider.driver.createCreateNewRequest(provider.documentId), - ); - provider.updateDocumentId(container.resolvedUrl); - }); - - it("blob is aborted before uploading", async function () { - const testString = "this is a test string"; - const dataStore1 = (await container.getEntryPoint()) as ITestFluidObject; - const ac = new AbortController(); - ac.abort("abort test"); - let surprisingSuccess = false; - try { - await dataStore1.runtime.uploadBlob(stringToBuffer(testString, "utf-8"), ac.signal); - surprisingSuccess = true; - } catch (error: any) { - assert.strictEqual(error.status, undefined); - assert.strictEqual(error.uploadTime, undefined); - assert.strictEqual(error.acked, undefined); - } - if (surprisingSuccess) { - assert.fail("Should not succeed"); - } - - const pendingState = (await runtimeOf(dataStore1).getPendingLocalState()) as - | IPendingRuntimeState - | undefined; - assert.strictEqual(pendingState?.pendingAttachmentBlobs, undefined); - }); - - it("blob is aborted after upload succeds", async function () { - const testString = "this is a test string"; - const dataStore1 = (await container.getEntryPoint()) as ITestFluidObject; - const map = await dataStore1.getSharedObject(mapId); - const ac = new AbortController(); - let blob: IFluidHandle; - try { - blob = await dataStore1.runtime.uploadBlob( - stringToBuffer(testString, "utf-8"), - ac.signal, - ); - ac.abort(); - map.set("key", blob); - } catch (error: any) { - assert.fail("Should succeed"); - } - const pendingState = (await 
runtimeOf(dataStore1).getPendingLocalState({ - notifyImminentClosure: true, - })) as IPendingRuntimeState | undefined; - assert.strictEqual(pendingState?.pendingAttachmentBlobs, undefined); - }); - - it("blob is attached after usage in map", async function () { - const testString = "this is a test string"; - const testKey = "a blob"; - const dataStore1 = (await container.getEntryPoint()) as ITestFluidObject; - const map = await dataStore1.getSharedObject(mapId); - - const blob = await dataStore1.runtime.uploadBlob(stringToBuffer(testString, "utf-8")); - assert.strictEqual(blob.isAttached, false); - map.set(testKey, blob); - assert.strictEqual(blob.isAttached, true); - }); - - it("blob is attached after usage in directory", async function () { - const testString = "this is a test string"; - const testKey = "a blob"; - const dataStore1 = (await container.getEntryPoint()) as ITestFluidObject; - const directory = await dataStore1.getSharedObject(directoryId); - - const blob = await dataStore1.runtime.uploadBlob(stringToBuffer(testString, "utf-8")); - assert.strictEqual(blob.isAttached, false); - directory.set(testKey, blob); - assert.strictEqual(blob.isAttached, true); - }); - - it("removes pending blob when waiting for blob to be attached", async function () { - const testString = "this is a test string"; - const dataStore1 = (await container.getEntryPoint()) as ITestFluidObject; - const map = await dataStore1.getSharedObject(mapId); - const blob = await dataStore1.runtime.uploadBlob(stringToBuffer(testString, "utf-8")); - const pendingStateP: any = runtimeOf(dataStore1).getPendingLocalState({ - notifyImminentClosure: true, +for (const createBlobPayloadPending of [undefined, true] as const) { + describeCompat( + `blob handle isAttached (createBlobPayloadPending: ${createBlobPayloadPending})`, + "NoCompat", + (getTestObjectProvider, apis) => { + const { SharedMap, SharedDirectory } = apis.dds; + const registry: ChannelFactoryRegistry = [ + [mapId, SharedMap.getFactory()], + [directoryId, SharedDirectory.getFactory()], + ]; + + const testContainerConfig: ITestContainerConfig = { + fluidDataObjectType: DataObjectFactoryType.Test, + registry, + runtimeOptions: { + createBlobPayloadPending, + }, + }; + + describe("from attached container", () => { + let provider: ITestObjectProvider; + let loader: IHostLoader; + let container: IContainer; + + const runtimeOf = (dataObject: ITestFluidObject): IContainerRuntime & IRuntime => + dataObject.context.containerRuntime as IContainerRuntime & IRuntime; + + beforeEach("createContainer", async () => { + provider = getTestObjectProvider(); + loader = provider.makeTestLoader(testContainerConfig); + container = await createAndAttachContainer( + provider.defaultCodeDetails, + loader, + provider.driver.createCreateNewRequest(provider.documentId), + ); + provider.updateDocumentId(container.resolvedUrl); + }); + + it("blob is aborted before uploading", async function () { + if (createBlobPayloadPending) { + // Blob creation with pending payload doesn't support abort + this.skip(); + } + const testString = "this is a test string"; + const dataStore1 = (await container.getEntryPoint()) as ITestFluidObject; + const ac = new AbortController(); + ac.abort("abort test"); + let surprisingSuccess = false; + try { + await dataStore1.runtime.uploadBlob( + stringToBuffer(testString, "utf-8"), + ac.signal, + ); + surprisingSuccess = true; + } catch (error: any) { + assert.strictEqual(error.status, undefined); + assert.strictEqual(error.uploadTime, undefined); + 
assert.strictEqual(error.acked, undefined); + } + if (surprisingSuccess) { + assert.fail("Should not succeed"); + } + + const pendingState = (await runtimeOf(dataStore1).getPendingLocalState()) as + | IPendingRuntimeState + | undefined; + assert.strictEqual(pendingState?.pendingAttachmentBlobs, undefined); + }); + + it("blob is aborted after upload succeds", async function () { + if (createBlobPayloadPending) { + // Blob creation with pending payload doesn't support abort + this.skip(); + } + const testString = "this is a test string"; + const dataStore1 = (await container.getEntryPoint()) as ITestFluidObject; + const map = await dataStore1.getSharedObject(mapId); + const ac = new AbortController(); + let blob: IFluidHandle; + try { + blob = await dataStore1.runtime.uploadBlob( + stringToBuffer(testString, "utf-8"), + ac.signal, + ); + ac.abort(); + map.set("key", blob); + } catch (error: any) { + assert.fail("Should succeed"); + } + const pendingState = (await runtimeOf(dataStore1).getPendingLocalState({ + notifyImminentClosure: true, + })) as IPendingRuntimeState | undefined; + assert.strictEqual(pendingState?.pendingAttachmentBlobs, undefined); + }); + + it("blob is attached after usage in map", async function () { + const testString = "this is a test string"; + const testKey = "a blob"; + const dataStore1 = (await container.getEntryPoint()) as ITestFluidObject; + const map = await dataStore1.getSharedObject(mapId); + + const blob = await dataStore1.runtime.uploadBlob( + stringToBuffer(testString, "utf-8"), + ); + assert.strictEqual(blob.isAttached, false); + map.set(testKey, blob); + assert.strictEqual(blob.isAttached, true); + }); + + it("blob is attached after usage in directory", async function () { + const testString = "this is a test string"; + const testKey = "a blob"; + const dataStore1 = (await container.getEntryPoint()) as ITestFluidObject; + const directory = await dataStore1.getSharedObject(directoryId); + + const blob = await dataStore1.runtime.uploadBlob( + stringToBuffer(testString, "utf-8"), + ); + assert.strictEqual(blob.isAttached, false); + directory.set(testKey, blob); + assert.strictEqual(blob.isAttached, true); + }); + + it("removes pending blob when waiting for blob to be attached", async function () { + const testString = "this is a test string"; + const dataStore1 = (await container.getEntryPoint()) as ITestFluidObject; + const map = await dataStore1.getSharedObject(mapId); + const blob = await dataStore1.runtime.uploadBlob( + stringToBuffer(testString, "utf-8"), + ); + const pendingStateP: any = runtimeOf(dataStore1).getPendingLocalState({ + notifyImminentClosure: true, + }); + map.set("key", blob); + const pendingState = await pendingStateP; + assert.strictEqual(pendingState?.pendingAttachmentBlobs, undefined); + }); + + it("removes pending blob after attached and acked", async function () { + const testString = "this is a test string"; + const testKey = "a blob"; + const dataStore1 = (await container.getEntryPoint()) as ITestFluidObject; + + const map = await dataStore1.getSharedObject(mapId); + const blob = await dataStore1.runtime.uploadBlob( + stringToBuffer(testString, "utf-8"), + ); + map.set(testKey, blob); + const pendingState = (await runtimeOf(dataStore1).getPendingLocalState()) as + | IPendingRuntimeState + | undefined; + assert.strictEqual(pendingState?.pendingAttachmentBlobs, undefined); + }); + + it("removes multiple pending blobs after attached and acked", async function () { + const dataStore1 = (await container.getEntryPoint()) as 
ITestFluidObject; + const map = await dataStore1.getSharedObject(mapId); + const lots = 10; + for (let i = 0; i < lots; i++) { + const blob = await dataStore1.runtime.uploadBlob(stringToBuffer(`${i}`, "utf-8")); + map.set(`${i}`, blob); + } + const pendingState = (await runtimeOf(dataStore1).getPendingLocalState()) as + | IPendingRuntimeState + | undefined; + assert.strictEqual(pendingState?.pendingAttachmentBlobs, undefined); + }); }); - map.set("key", blob); - const pendingState = await pendingStateP; - assert.strictEqual(pendingState?.pendingAttachmentBlobs, undefined); - }); - - it("removes pending blob after attached and acked", async function () { - const testString = "this is a test string"; - const testKey = "a blob"; - const dataStore1 = (await container.getEntryPoint()) as ITestFluidObject; - - const map = await dataStore1.getSharedObject(mapId); - const blob = await dataStore1.runtime.uploadBlob(stringToBuffer(testString, "utf-8")); - map.set(testKey, blob); - const pendingState = (await runtimeOf(dataStore1).getPendingLocalState()) as - | IPendingRuntimeState - | undefined; - assert.strictEqual(pendingState?.pendingAttachmentBlobs, undefined); - }); - - it("removes multiple pending blobs after attached and acked", async function () { - const dataStore1 = (await container.getEntryPoint()) as ITestFluidObject; - const map = await dataStore1.getSharedObject(mapId); - const lots = 10; - for (let i = 0; i < lots; i++) { - const blob = await dataStore1.runtime.uploadBlob(stringToBuffer(`${i}`, "utf-8")); - map.set(`${i}`, blob); - } - const pendingState = (await runtimeOf(dataStore1).getPendingLocalState()) as - | IPendingRuntimeState - | undefined; - assert.strictEqual(pendingState?.pendingAttachmentBlobs, undefined); - }); - }); - - describe("from detached container", () => { - let provider: ITestObjectProvider; - let loader: IHostLoader; - let container: IContainer; - let detachedDataStore: ITestFluidObject; - let map: ISharedMap; - let directory: ISharedDirectory; - let text: string; - let blobHandle: IFluidHandle; - - beforeEach("createContainer", async function () { - provider = getTestObjectProvider(); - if (!driverSupportsBlobs(provider.driver)) { - this.skip(); - } - loader = provider.makeTestLoader({ - ...testContainerConfig, + + describe("from detached container", () => { + let provider: ITestObjectProvider; + let loader: IHostLoader; + let container: IContainer; + let detachedDataStore: ITestFluidObject; + let map: ISharedMap; + let directory: ISharedDirectory; + let text: string; + let blobHandle: IFluidHandle; + + beforeEach("createContainer", async function () { + provider = getTestObjectProvider(); + if (!driverSupportsBlobs(provider.driver)) { + this.skip(); + } + loader = provider.makeTestLoader({ + ...testContainerConfig, + }); + container = await loader.createDetachedContainer(provider.defaultCodeDetails); + provider.updateDocumentId(container.resolvedUrl); + detachedDataStore = (await container.getEntryPoint()) as ITestFluidObject; + map = SharedMap.create(detachedDataStore.runtime); + directory = SharedDirectory.create(detachedDataStore.runtime); + text = "this is some example text"; + blobHandle = await detachedDataStore.runtime.uploadBlob( + stringToBuffer(text, "utf-8"), + ); + }); + + const checkForDetachedHandles = (dds: ISharedMap | ISharedDirectory) => { + assert.strictEqual( + container.attachState, + AttachState.Detached, + "container should be detached", + ); + assert.strictEqual( + detachedDataStore.handle.isAttached, + false, + "data store handle 
should be detached", + ); + assert.strictEqual(dds.handle.isAttached, false, "dds handle should be detached"); + assert.strictEqual(blobHandle.isAttached, false, "blob handle should be detached"); + }; + + const checkForAttachedHandles = (dds: ISharedMap | ISharedDirectory) => { + assert.strictEqual( + container.attachState, + AttachState.Attached, + "container should be attached", + ); + assert.strictEqual( + detachedDataStore.handle.isAttached, + true, + "data store handle should be attached", + ); + assert.strictEqual(dds.handle.isAttached, true, "dds handle should be attached"); + assert.strictEqual(blobHandle.isAttached, true, "blob handle should be attached"); + }; + + it("all detached", async function () { + checkForDetachedHandles(map); + checkForDetachedHandles(directory); + }); + + it("after map is set in root directory", async function () { + detachedDataStore.root.set(mapId, map.handle); + checkForDetachedHandles(map); + }); + + it("after directory is set in root directory", async function () { + detachedDataStore.root.set(directoryId, directory.handle); + checkForDetachedHandles(directory); + }); + + it("after blob handle is set in map", async function () { + detachedDataStore.root.set("map", map.handle); + map.set("my blob", blobHandle); + checkForDetachedHandles(map); + }); + + it("after blob handle is set in directory", async function () { + detachedDataStore.root.set(directoryId, directory.handle); + directory.set("my blob", blobHandle); + checkForDetachedHandles(directory); + }); + + it("after container is attached with map", async function () { + detachedDataStore.root.set("map", map.handle); + map.set("my blob", blobHandle); + await container.attach(provider.driver.createCreateNewRequest(provider.documentId)); + checkForAttachedHandles(map); + }); + + it("after container is attached with directory", async function () { + detachedDataStore.root.set(directoryId, directory.handle); + directory.set("my blob", blobHandle); + await container.attach(provider.driver.createCreateNewRequest(provider.documentId)); + checkForAttachedHandles(directory); + }); + + it("after container is attached and dds is detached in map", async function () { + map.set("my blob", blobHandle); + await container.attach(provider.driver.createCreateNewRequest(provider.documentId)); + assert.strictEqual( + map.handle.isAttached, + false, + "map should be detached after container attaches", + ); + assert.strictEqual( + blobHandle.isAttached, + false, + "blob should be detached in a detached dds and attached container", + ); + detachedDataStore.root.set(mapId, map.handle); + assert.strictEqual( + map.handle.isAttached, + true, + "map should be attached after dds attaches", + ); + assert.strictEqual( + blobHandle.isAttached, + true, + "blob should be attached in an attached dds", + ); + }); + + it("after container is attached and dds is detached in directory", async function () { + directory.set("my blob", blobHandle); + await container.attach(provider.driver.createCreateNewRequest(provider.documentId)); + assert.strictEqual( + directory.handle.isAttached, + false, + "directory should be detached after container attaches", + ); + assert.strictEqual( + blobHandle.isAttached, + false, + "blob should be detached in a detached dds and attached container", + ); + detachedDataStore.root.set(directoryId, directory.handle); + assert.strictEqual( + directory.handle.isAttached, + true, + "directory should be attached after dds attaches", + ); + assert.strictEqual( + blobHandle.isAttached, + true, + "blob should be 
attached in an attached dds", + ); + }); }); - container = await loader.createDetachedContainer(provider.defaultCodeDetails); - provider.updateDocumentId(container.resolvedUrl); - detachedDataStore = (await container.getEntryPoint()) as ITestFluidObject; - map = SharedMap.create(detachedDataStore.runtime); - directory = SharedDirectory.create(detachedDataStore.runtime); - text = "this is some example text"; - blobHandle = await detachedDataStore.runtime.uploadBlob(stringToBuffer(text, "utf-8")); - }); - - const checkForDetachedHandles = (dds: ISharedMap | ISharedDirectory) => { - assert.strictEqual( - container.attachState, - AttachState.Detached, - "container should be detached", - ); - assert.strictEqual( - detachedDataStore.handle.isAttached, - false, - "data store handle should be detached", - ); - assert.strictEqual(dds.handle.isAttached, false, "dds handle should be detached"); - assert.strictEqual(blobHandle.isAttached, false, "blob handle should be detached"); - }; - - const checkForAttachedHandles = (dds: ISharedMap | ISharedDirectory) => { - assert.strictEqual( - container.attachState, - AttachState.Attached, - "container should be attached", - ); - assert.strictEqual( - detachedDataStore.handle.isAttached, - true, - "data store handle should be attached", - ); - assert.strictEqual(dds.handle.isAttached, true, "dds handle should be attached"); - assert.strictEqual(blobHandle.isAttached, true, "blob handle should be attached"); - }; - - it("all detached", async function () { - checkForDetachedHandles(map); - checkForDetachedHandles(directory); - }); - - it("after map is set in root directory", async function () { - detachedDataStore.root.set(mapId, map.handle); - checkForDetachedHandles(map); - }); - - it("after directory is set in root directory", async function () { - detachedDataStore.root.set(directoryId, directory.handle); - checkForDetachedHandles(directory); - }); - - it("after blob handle is set in map", async function () { - detachedDataStore.root.set("map", map.handle); - map.set("my blob", blobHandle); - checkForDetachedHandles(map); - }); - - it("after blob handle is set in directory", async function () { - detachedDataStore.root.set(directoryId, directory.handle); - directory.set("my blob", blobHandle); - checkForDetachedHandles(directory); - }); - - it("after container is attached with map", async function () { - detachedDataStore.root.set("map", map.handle); - map.set("my blob", blobHandle); - await container.attach(provider.driver.createCreateNewRequest(provider.documentId)); - checkForAttachedHandles(map); - }); - - it("after container is attached with directory", async function () { - detachedDataStore.root.set(directoryId, directory.handle); - directory.set("my blob", blobHandle); - await container.attach(provider.driver.createCreateNewRequest(provider.documentId)); - checkForAttachedHandles(directory); - }); - - it("after container is attached and dds is detached in map", async function () { - map.set("my blob", blobHandle); - await container.attach(provider.driver.createCreateNewRequest(provider.documentId)); - assert.strictEqual( - map.handle.isAttached, - false, - "map should be detached after container attaches", - ); - assert.strictEqual( - blobHandle.isAttached, - false, - "blob should be detached in a detached dds and attached container", - ); - detachedDataStore.root.set(mapId, map.handle); - assert.strictEqual( - map.handle.isAttached, - true, - "map should be attached after dds attaches", - ); - assert.strictEqual( - blobHandle.isAttached, - true, - 
"blob should be attached in an attached dds", - ); - }); - - it("after container is attached and dds is detached in directory", async function () { - directory.set("my blob", blobHandle); - await container.attach(provider.driver.createCreateNewRequest(provider.documentId)); - assert.strictEqual( - directory.handle.isAttached, - false, - "directory should be detached after container attaches", - ); - assert.strictEqual( - blobHandle.isAttached, - false, - "blob should be detached in a detached dds and attached container", - ); - detachedDataStore.root.set(directoryId, directory.handle); - assert.strictEqual( - directory.handle.isAttached, - true, - "directory should be attached after dds attaches", - ); - assert.strictEqual( - blobHandle.isAttached, - true, - "blob should be attached in an attached dds", - ); - }); - }); -}); + }, + ); +}