diff --git a/packages/common/core-interfaces/src/handles.ts b/packages/common/core-interfaces/src/handles.ts index b3bf3361a9af..998bd53e6190 100644 --- a/packages/common/core-interfaces/src/handles.ts +++ b/packages/common/core-interfaces/src/handles.ts @@ -101,6 +101,22 @@ export interface IFluidHandleInternal< bind(handle: IFluidHandleInternal): void; } +/** + * @privateRemarks + * To be merged onto IFluidHandleInternal in accordance with breaking change policy + * @internal + */ +export interface IFluidHandleInternalPayloadPending< + // REVIEW: Constrain `T` to something? How do we support dds and datastores safely? + out T = unknown, // FluidObject & IFluidLoadable, +> extends IFluidHandleInternal { + /** + * Whether the handle has a pending payload, meaning that it may exist before its payload is retrievable. + * For instance, the BlobManager can generate handles before completing the blob upload/attach. + */ + readonly payloadPending: boolean; +} + /** * Symbol which must only be used on an {@link (IFluidHandle:interface)}, and is used to identify such objects. * diff --git a/packages/common/core-interfaces/src/index.ts b/packages/common/core-interfaces/src/index.ts index 06796dab9223..0d6e8f70f55c 100644 --- a/packages/common/core-interfaces/src/index.ts +++ b/packages/common/core-interfaces/src/index.ts @@ -31,6 +31,7 @@ export type { IProvideFluidHandleContext, IProvideFluidHandle, IFluidHandleInternal, + IFluidHandleInternalPayloadPending, IFluidHandleErased, } from "./handles.js"; export { IFluidHandleContext, IFluidHandle, fluidHandleSymbol } from "./handles.js"; diff --git a/packages/dds/shared-object-base/src/serializer.ts b/packages/dds/shared-object-base/src/serializer.ts index ac527f5bcc5b..077d7cb6602f 100644 --- a/packages/dds/shared-object-base/src/serializer.ts +++ b/packages/dds/shared-object-base/src/serializer.ts @@ -153,7 +153,11 @@ export class FluidSerializer implements IFluidSerializer { ? value.url : generateHandleContextPath(value.url, this.context); - return new RemoteFluidObjectHandle(absolutePath, this.root); + return new RemoteFluidObjectHandle( + absolutePath, + this.root, + value.payloadPending === true, + ); } else { return value; } diff --git a/packages/dds/shared-object-base/src/test/serializer.spec.ts b/packages/dds/shared-object-base/src/test/serializer.spec.ts index 9b9021f93c5b..56df90a4516b 100644 --- a/packages/dds/shared-object-base/src/test/serializer.spec.ts +++ b/packages/dds/shared-object-base/src/test/serializer.spec.ts @@ -41,7 +41,11 @@ describe("FluidSerializer", () => { describe("vanilla JSON", () => { const context = new MockHandleContext(); const serializer = new FluidSerializer(context); - const handle = new RemoteFluidObjectHandle("/root", context); + const handle = new RemoteFluidObjectHandle( + "/root", + context, + false, // payloadPending + ); // Start with the various JSON-serializable types. A mix of "truthy" and "falsy" values // are of particular interest. @@ -167,7 +171,11 @@ describe("FluidSerializer", () => { describe("JSON w/embedded handles", () => { const context = new MockHandleContext(); const serializer = new FluidSerializer(context); - const handle = new RemoteFluidObjectHandle("/root", context); + const handle = new RemoteFluidObjectHandle( + "/root", + context, + false, // payloadPending + ); const serializedHandle = { type: "__fluid_handle__", url: "/root", @@ -329,8 +337,16 @@ describe("FluidSerializer", () => { describe("Utils", () => { const serializer = new FluidSerializer(new MockHandleContext()); it("makeSerializable is idempotent", () => { - const bind = new RemoteFluidObjectHandle("/", new MockHandleContext()); - const handle = new RemoteFluidObjectHandle("/okay", new MockHandleContext()); + const bind = new RemoteFluidObjectHandle( + "/", + new MockHandleContext(), + false, // payloadPending + ); + const handle = new RemoteFluidObjectHandle( + "/okay", + new MockHandleContext(), + false, // payloadPending + ); const input = { x: handle, y: 123 }; const serializedOnce = makeHandlesSerializable(input, serializer, bind) as { x: { type: "__fluid_handle__" }; diff --git a/packages/runtime/container-runtime/api-report/container-runtime.legacy.alpha.api.md b/packages/runtime/container-runtime/api-report/container-runtime.legacy.alpha.api.md index 35421ff4b8f9..2b8b7125f480 100644 --- a/packages/runtime/container-runtime/api-report/container-runtime.legacy.alpha.api.md +++ b/packages/runtime/container-runtime/api-report/container-runtime.legacy.alpha.api.md @@ -36,6 +36,7 @@ export enum ContainerMessageType { export interface ContainerRuntimeOptions { readonly chunkSizeInBytes: number; readonly compressionOptions: ICompressionRuntimeOptions; + readonly createBlobPayloadPending: boolean; // @deprecated readonly enableGroupedBatching: boolean; readonly enableRuntimeIdCompressor: IdCompressorMode; diff --git a/packages/runtime/container-runtime/src/blobManager/blobManager.ts b/packages/runtime/container-runtime/src/blobManager/blobManager.ts index da92ac7d9a44..a741ebc616c5 100644 --- a/packages/runtime/container-runtime/src/blobManager/blobManager.ts +++ b/packages/runtime/container-runtime/src/blobManager/blobManager.ts @@ -18,6 +18,7 @@ import type { IEventProvider, IFluidHandleContext, IFluidHandleInternal, + IFluidHandleInternalPayloadPending, } from "@fluidframework/core-interfaces/internal"; import { assert, Deferred } from "@fluidframework/core-utils/internal"; import { @@ -62,7 +63,10 @@ import { * DataObject.request() recognizes requests in the form of `/blobs/` * and loads blob. */ -export class BlobHandle extends FluidHandleBase { +export class BlobHandle + extends FluidHandleBase + implements IFluidHandleInternalPayloadPending +{ private attached: boolean = false; public get isAttached(): boolean { @@ -75,6 +79,7 @@ export class BlobHandle extends FluidHandleBase { public readonly path: string, public readonly routeContext: IFluidHandleContext, public get: () => Promise, + public readonly payloadPending: boolean, private readonly onAttachGraph?: () => void, ) { super(); @@ -133,7 +138,8 @@ export interface IBlobManagerEvents extends IEvent { } interface IBlobManagerInternalEvents extends IEvent { - (event: "blobAttached", listener: (pending: PendingBlob) => void); + (event: "handleAttached", listener: (pending: PendingBlob) => void); + (event: "processedBlobAttach", listener: (localId: string, storageId: string) => void); } const stashedPendingBlobOverrides: Pick< @@ -195,7 +201,7 @@ export class BlobManager { new Map(); public readonly stashedBlobsUploadP: Promise<(void | ICreateBlobResponse)[]>; - constructor(props: { + public constructor(props: { readonly routeContext: IFluidHandleContext; blobManagerLoadInfo: IBlobManagerLoadInfo; @@ -220,6 +226,7 @@ export class BlobManager { readonly runtime: IBlobManagerRuntime; stashedBlobs: IPendingBlobs | undefined; readonly localBlobIdGenerator?: (() => string) | undefined; + readonly createBlobPayloadPending: boolean; }) { const { routeContext, @@ -351,7 +358,11 @@ export class BlobManager { return [...this.pendingBlobs.values()].some((e) => e.stashedUpload === true); } - public async getBlob(blobId: string): Promise { + public hasBlob(blobId: string): boolean { + return this.redirectTable.get(blobId) !== undefined; + } + + public async getBlob(blobId: string, payloadPending: boolean): Promise { // Verify that the blob is not deleted, i.e., it has not been garbage collected. If it is, this will throw // an error, failing the call. this.verifyBlobNotDeleted(blobId); @@ -374,8 +385,24 @@ export class BlobManager { storageId = blobId; } else { const attachedStorageId = this.redirectTable.get(blobId); - assert(!!attachedStorageId, 0x11f /* "requesting unknown blobs" */); - storageId = attachedStorageId; + if (!payloadPending) { + assert(!!attachedStorageId, 0x11f /* "requesting unknown blobs" */); + } + // If we didn't find it in the redirectTable, assume the attach op is coming eventually and wait. + // We do this even if the local client doesn't have the blob payloadPending flag enabled, in case a + // remote client does have it enabled. This wait may be infinite if the uploading client failed + // the upload and doesn't exist anymore. + storageId = + attachedStorageId ?? + (await new Promise((resolve) => { + const onProcessBlobAttach = (localId: string, _storageId: string): void => { + if (localId === blobId) { + this.internalEvents.off("processedBlobAttach", onProcessBlobAttach); + resolve(_storageId); + } + }; + this.internalEvents.on("processedBlobAttach", onProcessBlobAttach); + })); } return PerformanceEvent.timedExecAsync( @@ -406,14 +433,15 @@ export class BlobManager { ? () => { pending.attached = true; // Notify listeners (e.g. serialization process) that blob has been attached - this.internalEvents.emit("blobAttached", pending); + this.internalEvents.emit("handleAttached", pending); this.deletePendingBlobMaybe(localId); } : undefined; return new BlobHandle( getGCNodePathFromBlobId(localId), this.routeContext, - async () => this.getBlob(localId), + async () => this.getBlob(localId, false), + false, // payloadPending callback, ); } @@ -681,6 +709,7 @@ export class BlobManager { this.deletePendingBlobMaybe(localId); } } + this.internalEvents.emit("processedBlobAttach", localId, blobId); } public summarize(telemetryContext?: ITelemetryContext): ISummaryTreeWithStats { @@ -874,14 +903,14 @@ export class BlobManager { ); const onBlobAttached = (attachedEntry: PendingBlob): void => { if (attachedEntry === entry) { - this.internalEvents.off("blobAttached", onBlobAttached); + this.internalEvents.off("handleAttached", onBlobAttached); resolve(); } }; if (entry.attached) { resolve(); } else { - this.internalEvents.on("blobAttached", onBlobAttached); + this.internalEvents.on("handleAttached", onBlobAttached); } }), ); diff --git a/packages/runtime/container-runtime/src/compatUtils.ts b/packages/runtime/container-runtime/src/compatUtils.ts index bdaff123b8a1..d8bd0abbc016 100644 --- a/packages/runtime/container-runtime/src/compatUtils.ts +++ b/packages/runtime/container-runtime/src/compatUtils.ts @@ -147,6 +147,12 @@ const runtimeOptionsAffectingDocSchemaConfigMap = { // Although sweep is supported in 2.x, it is disabled by default until compatibilityVersion>=3.0.0 to be extra safe. "3.0.0": { enableGCSweep: true }, } as const, + createBlobPayloadPending: { + // This feature is new and disabled by default. In the future we will enable it by default, but we have not + // closed on the version where that will happen yet. Probably a .10 release since blob functionality is not + // exposed on the public API surface. + "1.0.0": false, + } as const, } as const satisfies ConfigMap; /** diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index e76532d85243..05a137b03195 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -101,6 +101,7 @@ import { import { GCDataBuilder, RequestParser, + RuntimeHeaders, TelemetryContext, addBlobToSummary, addSummarizeResultToSummary, @@ -406,6 +407,12 @@ export interface ContainerRuntimeOptions { * are engaged as they become available, without giving legacy clients any chance to fail predictably. */ readonly explicitSchemaControl: boolean; + + /** + * Create blob handles with pending payloads when calling createBlob (default is false). + * When enabled, createBlob will return a handle before the blob upload completes. + */ + readonly createBlobPayloadPending: boolean; } /** @@ -829,6 +836,7 @@ export class ContainerRuntime compressionOptions = enableGroupedBatching === false ? disabledCompressionConfig : defaultConfigs.compressionOptions, + createBlobPayloadPending = defaultConfigs.createBlobPayloadPending, }: IContainerRuntimeOptionsInternal = runtimeOptions; // The logic for enableRuntimeIdCompressor is a bit different. Since `undefined` represents a logical state (off) @@ -1012,6 +1020,7 @@ export class ContainerRuntime compressionLz4, idCompressorMode, opGroupingEnabled: enableGroupedBatching, + createBlobPayloadPending, disallowedVersions: [], }, (schema) => { @@ -1037,6 +1046,7 @@ export class ContainerRuntime enableRuntimeIdCompressor, enableGroupedBatching, explicitSchemaControl, + createBlobPayloadPending, }; const runtime = new containerRuntimeCtor( @@ -1771,6 +1781,7 @@ export class ContainerRuntime isBlobDeleted: (blobPath: string) => this.garbageCollector.isNodeDeleted(blobPath), runtime: this, stashedBlobs: pendingRuntimeState?.pendingAttachmentBlobs, + createBlobPayloadPending: this.sessionSchema.createBlobPayloadPending === true, }); this.deltaScheduler = new DeltaScheduler( @@ -2319,7 +2330,16 @@ export class ContainerRuntime } if (id === blobManagerBasePath && requestParser.isLeaf(2)) { - const blob = await this.blobManager.getBlob(requestParser.pathParts[1]); + const localId = requestParser.pathParts[1]; + const payloadPending = requestParser.headers?.[RuntimeHeaders.payloadPending] === true; + if ( + !this.blobManager.hasBlob(localId) && + requestParser.headers?.[RuntimeHeaders.wait] === false + ) { + return create404Response(request); + } + + const blob = await this.blobManager.getBlob(localId, payloadPending); return { status: 200, mimeType: "fluid/object", diff --git a/packages/runtime/container-runtime/src/summary/documentSchema.ts b/packages/runtime/container-runtime/src/summary/documentSchema.ts index 472f4b7d61c7..82e1d07cf091 100644 --- a/packages/runtime/container-runtime/src/summary/documentSchema.ts +++ b/packages/runtime/container-runtime/src/summary/documentSchema.ts @@ -96,6 +96,7 @@ export interface IDocumentSchemaFeatures { compressionLz4: boolean; idCompressorMode: IdCompressorMode; opGroupingEnabled: boolean; + createBlobPayloadPending: boolean; /** * List of disallowed versions of the runtime. @@ -227,6 +228,7 @@ const documentSchemaSupportedConfigs = { idCompressorMode: new IdCompressorProperty(["delayed", "on"]), opGroupingEnabled: new TrueOrUndefined(), compressionLz4: new TrueOrUndefined(), + createBlobPayloadPending: new TrueOrUndefined(), disallowedVersions: new CheckVersions(), }; @@ -482,6 +484,7 @@ export class DocumentsSchemaController { compressionLz4: boolToProp(features.compressionLz4), idCompressorMode: features.idCompressorMode, opGroupingEnabled: boolToProp(features.opGroupingEnabled), + createBlobPayloadPending: boolToProp(features.createBlobPayloadPending), disallowedVersions: arrayToProp(features.disallowedVersions), }, }; diff --git a/packages/runtime/container-runtime/src/test/blobManager.spec.ts b/packages/runtime/container-runtime/src/test/blobManager.spec.ts index fbc2fd73b9a6..22a74d7a2422 100644 --- a/packages/runtime/container-runtime/src/test/blobManager.spec.ts +++ b/packages/runtime/container-runtime/src/test/blobManager.spec.ts @@ -27,6 +27,7 @@ import { Deferred } from "@fluidframework/core-utils/internal"; import { IClientDetails, SummaryType } from "@fluidframework/driver-definitions"; import { IDocumentStorageService } from "@fluidframework/driver-definitions/internal"; import type { ISequencedMessageEnvelope } from "@fluidframework/runtime-definitions/internal"; +import { isFluidHandleInternalPayloadPending } from "@fluidframework/runtime-utils/internal"; import { LoggingError, MockLogger, @@ -104,6 +105,7 @@ export class MockRuntime isBlobDeleted: (blobPath: string) => this.isBlobDeleted(blobPath), runtime: this, stashedBlobs: stashed[1] as IPendingBlobs | undefined, + createBlobPayloadPending: false, }); } @@ -160,7 +162,10 @@ export class MockRuntime ): Promise { const pathParts = blobHandle.absolutePath.split("/"); const blobId = pathParts[2]; - return this.blobManager.getBlob(blobId); + const payloadPending = isFluidHandleInternalPayloadPending(blobHandle) + ? blobHandle.payloadPending + : false; + return this.blobManager.getBlob(blobId, payloadPending); } public async getPendingLocalState(): Promise<(unknown[] | IPendingBlobs | undefined)[]> { @@ -743,7 +748,7 @@ describe("BlobManager", () => { }); await assert.rejects( - async () => runtime.blobManager.getBlob(someId), + async () => runtime.blobManager.getBlob(someId, false), (e: Error) => e.message === "BOOM!", "Expected getBlob to throw with test error message", ); @@ -760,6 +765,29 @@ describe("BlobManager", () => { ); }); + it("waits for blobs from handles with pending payloads without error", async () => { + await runtime.attach(); + + // Part of remoteUpload, but stop short of processing the message + const response = await runtime.storage.createBlob(IsoBuffer.from("blob", "utf8")); + const op = { metadata: { localId: uuid(), blobId: response.id } }; + + await assert.rejects( + runtime.blobManager.getBlob(op.metadata.localId, false), + "Rejects when attempting to get non-existent, shared-payload blobs", + ); + + // Try to get the blob that we haven't processed the attach op for yet. + // This simulates having found this ID in a handle with a pending payload that the remote client would have sent + const blobP = runtime.blobManager.getBlob(op.metadata.localId, true); + + // Process the op as if it were arriving from the remote client, which should cause the blobP promise to resolve + runtime.blobManager.processBlobAttachMessage(op as ISequencedMessageEnvelope, false); + + // Await the promise to confirm it settles and does not reject + await blobP; + }); + describe("Abort Signal", () => { it("abort before upload", async () => { await runtime.attach(); diff --git a/packages/runtime/container-runtime/src/test/blobManager.stashed.spec.ts b/packages/runtime/container-runtime/src/test/blobManager.stashed.spec.ts index 57bd52e367c1..2fa3b1b9d8be 100644 --- a/packages/runtime/container-runtime/src/test/blobManager.stashed.spec.ts +++ b/packages/runtime/container-runtime/src/test/blobManager.stashed.spec.ts @@ -52,6 +52,7 @@ function createBlobManager(overrides?: Partial { enableRuntimeIdCompressor: undefined, enableGroupedBatching: true, // Redundant, but makes the JSON.stringify yield the same result as the logs explicitSchemaControl: false, + createBlobPayloadPending: false, // Redundant, but makes the JSON.stringify yield the same result as the logs }; const mergedRuntimeOptions = { ...defaultRuntimeOptions, ...runtimeOptions }; @@ -3652,6 +3653,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: undefined, enableGroupedBatching: true, explicitSchemaControl: false, + createBlobPayloadPending: false, }; logger.assertMatchAny([ @@ -3689,6 +3691,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: undefined, enableGroupedBatching: false, explicitSchemaControl: false, + createBlobPayloadPending: false, }; logger.assertMatchAny([ @@ -3726,6 +3729,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: undefined, enableGroupedBatching: true, explicitSchemaControl: false, + createBlobPayloadPending: false, }; logger.assertMatchAny([ @@ -3763,6 +3767,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: undefined, enableGroupedBatching: true, explicitSchemaControl: true, + createBlobPayloadPending: false, }; logger.assertMatchAny([ @@ -3800,6 +3805,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: undefined, enableGroupedBatching: true, explicitSchemaControl: true, + createBlobPayloadPending: false, }; logger.assertMatchAny([ @@ -3827,6 +3833,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: "on", enableGroupedBatching: false, // By turning off batching, we will also disable compression automatically explicitSchemaControl: true, + createBlobPayloadPending: false, }, provideEntryPoint: mockProvideEntryPoint, }); @@ -3845,6 +3852,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: "on", enableGroupedBatching: false, explicitSchemaControl: true, + createBlobPayloadPending: false, }; logger.assertMatchAny([ @@ -3875,6 +3883,7 @@ describe("Runtime", () => { enableGroupedBatching: undefined, compressionOptions: undefined, explicitSchemaControl: undefined, + createBlobPayloadPending: undefined, }, }, ]) @@ -3902,6 +3911,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: undefined, // idCompressor is undefined, since that represents a logical state (off) enableGroupedBatching: true, explicitSchemaControl: false, + createBlobPayloadPending: false, }; logger.assertMatchAny([ @@ -3938,6 +3948,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: undefined, enableGroupedBatching: true, explicitSchemaControl: true, + createBlobPayloadPending: false, }; logger.assertMatchAny([ diff --git a/packages/runtime/container-runtime/src/test/documentSchema.spec.ts b/packages/runtime/container-runtime/src/test/documentSchema.spec.ts index 8f8c43677bf1..5454ff59df41 100644 --- a/packages/runtime/container-runtime/src/test/documentSchema.spec.ts +++ b/packages/runtime/container-runtime/src/test/documentSchema.spec.ts @@ -29,6 +29,7 @@ describe("Runtime", () => { compressionLz4: true, idCompressorMode: "delayed", // opGroupingEnabled: undefined, + // createBlobPayloadPending: true, }, }; @@ -37,6 +38,7 @@ describe("Runtime", () => { compressionLz4: true, opGroupingEnabled: false, idCompressorMode: "delayed", + createBlobPayloadPending: false, disallowedVersions: [], }; @@ -301,6 +303,7 @@ describe("Runtime", () => { compressionLz4: boolToProp(featuresModified.compressionLz4), idCompressorMode: featuresModified.idCompressorMode, opGroupingEnabled: boolToProp(featuresModified.opGroupingEnabled), + createBlobPayloadPending: boolToProp(featuresModified.createBlobPayloadPending), disallowedVersions: arrayToProp(featuresModified.disallowedVersions), }, }; diff --git a/packages/runtime/runtime-utils/src/handles.ts b/packages/runtime/runtime-utils/src/handles.ts index 82b1abaa4f14..3c5eeb7d1e06 100644 --- a/packages/runtime/runtime-utils/src/handles.ts +++ b/packages/runtime/runtime-utils/src/handles.ts @@ -5,7 +5,10 @@ import type { IFluidHandleErased } from "@fluidframework/core-interfaces"; import { IFluidHandle, fluidHandleSymbol } from "@fluidframework/core-interfaces"; -import type { IFluidHandleInternal } from "@fluidframework/core-interfaces/internal"; +import type { + IFluidHandleInternal, + IFluidHandleInternalPayloadPending, +} from "@fluidframework/core-interfaces/internal"; /** * JSON serialized form of an IFluidHandle @@ -17,6 +20,17 @@ export interface ISerializedHandle { // URL to the object. Relative URLs are relative to the handle context passed to the stringify. url: string; + + /** + * The handle may have a pending payload, as determined by and resolvable by the subsystem that + * the handle relates to. For instance, the BlobManager uses this to distinguish blob handles + * which may not yet have an attached blob yet. + * + * @remarks + * Will only exist if the handle was created with a pending payload, will be omitted entirely from + * the serialized format if the handle was created with an already-shared payload. + */ + readonly payloadPending?: true; } /** @@ -26,6 +40,14 @@ export interface ISerializedHandle { export const isSerializedHandle = (value: any): value is ISerializedHandle => value?.type === "__fluid_handle__"; +/** + * @internal + */ +export const isFluidHandleInternalPayloadPending = ( + fluidHandleInternal: IFluidHandleInternal, +): fluidHandleInternal is IFluidHandleInternalPayloadPending => + "payloadPending" in fluidHandleInternal && fluidHandleInternal.payloadPending === true; + /** * Encodes the given IFluidHandle into a JSON-serializable form, * @param handle - The IFluidHandle to serialize. @@ -34,10 +56,16 @@ export const isSerializedHandle = (value: any): value is ISerializedHandle => * @internal */ export function encodeHandleForSerialization(handle: IFluidHandleInternal): ISerializedHandle { - return { - type: "__fluid_handle__", - url: handle.absolutePath, - }; + return isFluidHandleInternalPayloadPending(handle) + ? { + type: "__fluid_handle__", + url: handle.absolutePath, + payloadPending: true, + } + : { + type: "__fluid_handle__", + url: handle.absolutePath, + }; } /** diff --git a/packages/runtime/runtime-utils/src/index.ts b/packages/runtime/runtime-utils/src/index.ts index d93bc1be05b1..62424a16139b 100644 --- a/packages/runtime/runtime-utils/src/index.ts +++ b/packages/runtime/runtime-utils/src/index.ts @@ -15,6 +15,7 @@ export { ISerializedHandle, isSerializedHandle, isFluidHandle, + isFluidHandleInternalPayloadPending, toFluidHandleErased, toFluidHandleInternal, FluidHandleBase, diff --git a/packages/runtime/runtime-utils/src/remoteFluidObjectHandle.ts b/packages/runtime/runtime-utils/src/remoteFluidObjectHandle.ts index 13e72aedaa8c..a662a1b1ee16 100644 --- a/packages/runtime/runtime-utils/src/remoteFluidObjectHandle.ts +++ b/packages/runtime/runtime-utils/src/remoteFluidObjectHandle.ts @@ -31,10 +31,12 @@ export class RemoteFluidObjectHandle extends FluidHandleBase { * Creates a new RemoteFluidObjectHandle when parsing an IFluidHandle. * @param absolutePath - The absolute path to the handle from the container runtime. * @param routeContext - The root IFluidHandleContext that has a route to this handle. + * @param payloadPending - Whether the handle may have a pending payload that is not yet available. */ constructor( public readonly absolutePath: string, public readonly routeContext: IFluidHandleContext, + public readonly payloadPending: boolean, ) { super(); assert( @@ -48,7 +50,10 @@ export class RemoteFluidObjectHandle extends FluidHandleBase { // Add `viaHandle` header to distinguish from requests from non-handle paths. const request: IRequest = { url: this.absolutePath, - headers: { [RuntimeHeaders.viaHandle]: true }, + headers: { + [RuntimeHeaders.viaHandle]: true, + [RuntimeHeaders.payloadPending]: this.payloadPending, + }, }; this.objectP = this.routeContext.resolveHandle(request).then((response) => { if (response.mimeType === "fluid/object") { diff --git a/packages/runtime/runtime-utils/src/utils.ts b/packages/runtime/runtime-utils/src/utils.ts index 15b2304b4ce7..6ffed8e077bf 100644 --- a/packages/runtime/runtime-utils/src/utils.ts +++ b/packages/runtime/runtime-utils/src/utils.ts @@ -22,6 +22,10 @@ export enum RuntimeHeaders { * True if the request is coming from an IFluidHandle. */ viaHandle = "viaHandle", + /** + * True if the request is coming from a handle with a pending payload. + */ + payloadPending = "payloadPending", } /** diff --git a/packages/test/test-service-load/src/optionsMatrix.ts b/packages/test/test-service-load/src/optionsMatrix.ts index 415ed706a7bf..bbd38d51b119 100644 --- a/packages/test/test-service-load/src/optionsMatrix.ts +++ b/packages/test/test-service-load/src/optionsMatrix.ts @@ -116,6 +116,7 @@ export function generateRuntimeOptions( chunkSizeInBytes: [204800], enableRuntimeIdCompressor: ["on", undefined, "delayed"], enableGroupedBatching: [true, false], + createBlobPayloadPending: [true, false], explicitSchemaControl: [true, false], };