Skip to content

Add read support for handles with pending payloads #24320

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 24, 2025
16 changes: 16 additions & 0 deletions packages/common/core-interfaces/src/handles.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,22 @@ export interface IFluidHandleInternal<
bind(handle: IFluidHandleInternal): void;
}

/**
* @privateRemarks
* To be merged onto IFluidHandleInternal in accordance with breaking change policy
* @internal
*/
export interface IFluidHandleInternalPayloadPending<
// REVIEW: Constrain `T` to something? How do we support dds and datastores safely?
out T = unknown, // FluidObject & IFluidLoadable,
> extends IFluidHandleInternal<T> {
/**
* Whether the handle has a pending payload, meaning that it may exist before its payload is retrievable.
* For instance, the BlobManager can generate handles before completing the blob upload/attach.
*/
readonly payloadPending: boolean;
}

/**
* Symbol which must only be used on an {@link (IFluidHandle:interface)}, and is used to identify such objects.
*
Expand Down
1 change: 1 addition & 0 deletions packages/common/core-interfaces/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export type {
IProvideFluidHandleContext,
IProvideFluidHandle,
IFluidHandleInternal,
IFluidHandleInternalPayloadPending,
IFluidHandleErased,
} from "./handles.js";
export { IFluidHandleContext, IFluidHandle, fluidHandleSymbol } from "./handles.js";
Expand Down
6 changes: 5 additions & 1 deletion packages/dds/shared-object-base/src/serializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,11 @@ export class FluidSerializer implements IFluidSerializer {
? value.url
: generateHandleContextPath(value.url, this.context);

return new RemoteFluidObjectHandle(absolutePath, this.root);
return new RemoteFluidObjectHandle(
absolutePath,
this.root,
value.payloadPending === true,
);
} else {
return value;
}
Expand Down
24 changes: 20 additions & 4 deletions packages/dds/shared-object-base/src/test/serializer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ describe("FluidSerializer", () => {
describe("vanilla JSON", () => {
const context = new MockHandleContext();
const serializer = new FluidSerializer(context);
const handle = new RemoteFluidObjectHandle("/root", context);
const handle = new RemoteFluidObjectHandle(
"/root",
context,
false, // payloadPending
);

// Start with the various JSON-serializable types. A mix of "truthy" and "falsy" values
// are of particular interest.
Expand Down Expand Up @@ -167,7 +171,11 @@ describe("FluidSerializer", () => {
describe("JSON w/embedded handles", () => {
const context = new MockHandleContext();
const serializer = new FluidSerializer(context);
const handle = new RemoteFluidObjectHandle("/root", context);
const handle = new RemoteFluidObjectHandle(
"/root",
context,
false, // payloadPending
);
const serializedHandle = {
type: "__fluid_handle__",
url: "/root",
Expand Down Expand Up @@ -329,8 +337,16 @@ describe("FluidSerializer", () => {
describe("Utils", () => {
const serializer = new FluidSerializer(new MockHandleContext());
it("makeSerializable is idempotent", () => {
const bind = new RemoteFluidObjectHandle("/", new MockHandleContext());
const handle = new RemoteFluidObjectHandle("/okay", new MockHandleContext());
const bind = new RemoteFluidObjectHandle(
"/",
new MockHandleContext(),
false, // payloadPending
);
const handle = new RemoteFluidObjectHandle(
"/okay",
new MockHandleContext(),
false, // payloadPending
);
const input = { x: handle, y: 123 };
const serializedOnce = makeHandlesSerializable(input, serializer, bind) as {
x: { type: "__fluid_handle__" };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export enum ContainerMessageType {
export interface ContainerRuntimeOptions {
readonly chunkSizeInBytes: number;
readonly compressionOptions: ICompressionRuntimeOptions;
readonly createBlobPayloadPending: boolean;
// @deprecated
readonly enableGroupedBatching: boolean;
readonly enableRuntimeIdCompressor: IdCompressorMode;
Expand Down
49 changes: 39 additions & 10 deletions packages/runtime/container-runtime/src/blobManager/blobManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import type {
IEventProvider,
IFluidHandleContext,
IFluidHandleInternal,
IFluidHandleInternalPayloadPending,
} from "@fluidframework/core-interfaces/internal";
import { assert, Deferred } from "@fluidframework/core-utils/internal";
import {
Expand Down Expand Up @@ -62,7 +63,10 @@ import {
* DataObject.request() recognizes requests in the form of `/blobs/<id>`
* and loads blob.
*/
export class BlobHandle extends FluidHandleBase<ArrayBufferLike> {
export class BlobHandle
extends FluidHandleBase<ArrayBufferLike>
implements IFluidHandleInternalPayloadPending<ArrayBufferLike>
{
private attached: boolean = false;

public get isAttached(): boolean {
Expand All @@ -75,6 +79,7 @@ export class BlobHandle extends FluidHandleBase<ArrayBufferLike> {
public readonly path: string,
public readonly routeContext: IFluidHandleContext,
public get: () => Promise<ArrayBufferLike>,
public readonly payloadPending: boolean,
private readonly onAttachGraph?: () => void,
) {
super();
Expand Down Expand Up @@ -133,7 +138,8 @@ export interface IBlobManagerEvents extends IEvent {
}

interface IBlobManagerInternalEvents extends IEvent {
(event: "blobAttached", listener: (pending: PendingBlob) => void);
(event: "handleAttached", listener: (pending: PendingBlob) => void);
(event: "processedBlobAttach", listener: (localId: string, storageId: string) => void);
}

const stashedPendingBlobOverrides: Pick<
Expand Down Expand Up @@ -195,7 +201,7 @@ export class BlobManager {
new Map();
public readonly stashedBlobsUploadP: Promise<(void | ICreateBlobResponse)[]>;

constructor(props: {
public constructor(props: {
readonly routeContext: IFluidHandleContext;

blobManagerLoadInfo: IBlobManagerLoadInfo;
Expand All @@ -220,6 +226,7 @@ export class BlobManager {
readonly runtime: IBlobManagerRuntime;
stashedBlobs: IPendingBlobs | undefined;
readonly localBlobIdGenerator?: (() => string) | undefined;
readonly createBlobPayloadPending: boolean;
}) {
const {
routeContext,
Expand Down Expand Up @@ -351,7 +358,11 @@ export class BlobManager {
return [...this.pendingBlobs.values()].some((e) => e.stashedUpload === true);
}

public async getBlob(blobId: string): Promise<ArrayBufferLike> {
public hasBlob(blobId: string): boolean {
return this.redirectTable.get(blobId) !== undefined;
}

public async getBlob(blobId: string, payloadPending: boolean): Promise<ArrayBufferLike> {
// Verify that the blob is not deleted, i.e., it has not been garbage collected. If it is, this will throw
// an error, failing the call.
this.verifyBlobNotDeleted(blobId);
Expand All @@ -374,8 +385,24 @@ export class BlobManager {
storageId = blobId;
} else {
const attachedStorageId = this.redirectTable.get(blobId);
assert(!!attachedStorageId, 0x11f /* "requesting unknown blobs" */);
storageId = attachedStorageId;
if (!payloadPending) {
assert(!!attachedStorageId, 0x11f /* "requesting unknown blobs" */);
}
// If we didn't find it in the redirectTable, assume the attach op is coming eventually and wait.
// We do this even if the local client doesn't have the blob payloadPending flag enabled, in case a
// remote client does have it enabled. This wait may be infinite if the uploading client failed
// the upload and doesn't exist anymore.
storageId =
attachedStorageId ??
(await new Promise<string>((resolve) => {
const onProcessBlobAttach = (localId: string, _storageId: string): void => {
if (localId === blobId) {
this.internalEvents.off("processedBlobAttach", onProcessBlobAttach);
resolve(_storageId);
}
};
this.internalEvents.on("processedBlobAttach", onProcessBlobAttach);
}));
}

return PerformanceEvent.timedExecAsync(
Expand Down Expand Up @@ -406,14 +433,15 @@ export class BlobManager {
? () => {
pending.attached = true;
// Notify listeners (e.g. serialization process) that blob has been attached
this.internalEvents.emit("blobAttached", pending);
this.internalEvents.emit("handleAttached", pending);
this.deletePendingBlobMaybe(localId);
}
: undefined;
return new BlobHandle(
getGCNodePathFromBlobId(localId),
this.routeContext,
async () => this.getBlob(localId),
async () => this.getBlob(localId, false),
false, // payloadPending
callback,
);
}
Expand Down Expand Up @@ -681,6 +709,7 @@ export class BlobManager {
this.deletePendingBlobMaybe(localId);
}
}
this.internalEvents.emit("processedBlobAttach", localId, blobId);
}

public summarize(telemetryContext?: ITelemetryContext): ISummaryTreeWithStats {
Expand Down Expand Up @@ -874,14 +903,14 @@ export class BlobManager {
);
const onBlobAttached = (attachedEntry: PendingBlob): void => {
if (attachedEntry === entry) {
this.internalEvents.off("blobAttached", onBlobAttached);
this.internalEvents.off("handleAttached", onBlobAttached);
resolve();
}
};
if (entry.attached) {
resolve();
} else {
this.internalEvents.on("blobAttached", onBlobAttached);
this.internalEvents.on("handleAttached", onBlobAttached);
}
}),
);
Expand Down
6 changes: 6 additions & 0 deletions packages/runtime/container-runtime/src/compatUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ const runtimeOptionsAffectingDocSchemaConfigMap = {
// Although sweep is supported in 2.x, it is disabled by default until compatibilityVersion>=3.0.0 to be extra safe.
"3.0.0": { enableGCSweep: true },
} as const,
createBlobPayloadPending: {
// This feature is new and disabled by default. In the future we will enable it by default, but we have not
// closed on the version where that will happen yet. Probably a .10 release since blob functionality is not
// exposed on the public API surface.
"1.0.0": false,
} as const,
} as const satisfies ConfigMap<RuntimeOptionsAffectingDocSchema>;

/**
Expand Down
22 changes: 21 additions & 1 deletion packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ import {
import {
GCDataBuilder,
RequestParser,
RuntimeHeaders,
TelemetryContext,
addBlobToSummary,
addSummarizeResultToSummary,
Expand Down Expand Up @@ -406,6 +407,12 @@ export interface ContainerRuntimeOptions {
* are engaged as they become available, without giving legacy clients any chance to fail predictably.
*/
readonly explicitSchemaControl: boolean;

/**
* Create blob handles with pending payloads when calling createBlob (default is false).
* When enabled, createBlob will return a handle before the blob upload completes.
*/
readonly createBlobPayloadPending: boolean;
}

/**
Expand Down Expand Up @@ -829,6 +836,7 @@ export class ContainerRuntime
compressionOptions = enableGroupedBatching === false
? disabledCompressionConfig
: defaultConfigs.compressionOptions,
createBlobPayloadPending = defaultConfigs.createBlobPayloadPending,
}: IContainerRuntimeOptionsInternal = runtimeOptions;

// The logic for enableRuntimeIdCompressor is a bit different. Since `undefined` represents a logical state (off)
Expand Down Expand Up @@ -1012,6 +1020,7 @@ export class ContainerRuntime
compressionLz4,
idCompressorMode,
opGroupingEnabled: enableGroupedBatching,
createBlobPayloadPending,
disallowedVersions: [],
},
(schema) => {
Expand All @@ -1037,6 +1046,7 @@ export class ContainerRuntime
enableRuntimeIdCompressor,
enableGroupedBatching,
explicitSchemaControl,
createBlobPayloadPending,
};

const runtime = new containerRuntimeCtor(
Expand Down Expand Up @@ -1771,6 +1781,7 @@ export class ContainerRuntime
isBlobDeleted: (blobPath: string) => this.garbageCollector.isNodeDeleted(blobPath),
runtime: this,
stashedBlobs: pendingRuntimeState?.pendingAttachmentBlobs,
createBlobPayloadPending: this.sessionSchema.createBlobPayloadPending === true,
});

this.deltaScheduler = new DeltaScheduler(
Expand Down Expand Up @@ -2319,7 +2330,16 @@ export class ContainerRuntime
}

if (id === blobManagerBasePath && requestParser.isLeaf(2)) {
const blob = await this.blobManager.getBlob(requestParser.pathParts[1]);
const localId = requestParser.pathParts[1];
const payloadPending = requestParser.headers?.[RuntimeHeaders.payloadPending] === true;
if (
!this.blobManager.hasBlob(localId) &&
requestParser.headers?.[RuntimeHeaders.wait] === false
) {
return create404Response(request);
}

const blob = await this.blobManager.getBlob(localId, payloadPending);
return {
status: 200,
mimeType: "fluid/object",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ export interface IDocumentSchemaFeatures {
compressionLz4: boolean;
idCompressorMode: IdCompressorMode;
opGroupingEnabled: boolean;
createBlobPayloadPending: boolean;

/**
* List of disallowed versions of the runtime.
Expand Down Expand Up @@ -227,6 +228,7 @@ const documentSchemaSupportedConfigs = {
idCompressorMode: new IdCompressorProperty(["delayed", "on"]),
opGroupingEnabled: new TrueOrUndefined(),
compressionLz4: new TrueOrUndefined(),
createBlobPayloadPending: new TrueOrUndefined(),
disallowedVersions: new CheckVersions(),
};

Expand Down Expand Up @@ -482,6 +484,7 @@ export class DocumentsSchemaController {
compressionLz4: boolToProp(features.compressionLz4),
idCompressorMode: features.idCompressorMode,
opGroupingEnabled: boolToProp(features.opGroupingEnabled),
createBlobPayloadPending: boolToProp(features.createBlobPayloadPending),
disallowedVersions: arrayToProp(features.disallowedVersions),
},
};
Expand Down
Loading
Loading