Skip to content

Add read support for handles with pending payloads #24320

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 24, 2025
16 changes: 16 additions & 0 deletions packages/common/core-interfaces/src/handles.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,22 @@ export interface IFluidHandleInternal<
bind(handle: IFluidHandleInternal): void;
}

/**
* @privateRemarks
* To be merged onto IFluidHandleInternal in accordance with breaking change policy
* @internal
*/
export interface IFluidHandleInternalPlaceholder<
// REVIEW: Constrain `T` to something? How do we support dds and datastores safely?
out T = unknown, // FluidObject & IFluidLoadable,
> extends IFluidHandleInternal<T> {
/**
* Whether the handle is a placeholder, meaning that it may exist before its payload is retrievable.
* For instance, the BlobManager can generate handles before completing the blob upload/attach.
*/
readonly placeholder: boolean;
}

/**
* Symbol which must only be used on an {@link (IFluidHandle:interface)}, and is used to identify such objects.
*
Expand Down
1 change: 1 addition & 0 deletions packages/common/core-interfaces/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export type {
IProvideFluidHandleContext,
IProvideFluidHandle,
IFluidHandleInternal,
IFluidHandleInternalPlaceholder,
IFluidHandleErased,
} from "./handles.js";
export { IFluidHandleContext, IFluidHandle, fluidHandleSymbol } from "./handles.js";
Expand Down
2 changes: 1 addition & 1 deletion packages/dds/shared-object-base/src/serializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ export class FluidSerializer implements IFluidSerializer {
? value.url
: generateHandleContextPath(value.url, this.context);

return new RemoteFluidObjectHandle(absolutePath, this.root);
return new RemoteFluidObjectHandle(absolutePath, this.root, value.placeholder === true);
} else {
return value;
}
Expand Down
24 changes: 20 additions & 4 deletions packages/dds/shared-object-base/src/test/serializer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ describe("FluidSerializer", () => {
describe("vanilla JSON", () => {
const context = new MockHandleContext();
const serializer = new FluidSerializer(context);
const handle = new RemoteFluidObjectHandle("/root", context);
const handle = new RemoteFluidObjectHandle(
"/root",
context,
false, // placeholder
);

// Start with the various JSON-serializable types. A mix of "truthy" and "falsy" values
// are of particular interest.
Expand Down Expand Up @@ -167,7 +171,11 @@ describe("FluidSerializer", () => {
describe("JSON w/embedded handles", () => {
const context = new MockHandleContext();
const serializer = new FluidSerializer(context);
const handle = new RemoteFluidObjectHandle("/root", context);
const handle = new RemoteFluidObjectHandle(
"/root",
context,
false, // placeholder
);
const serializedHandle = {
type: "__fluid_handle__",
url: "/root",
Expand Down Expand Up @@ -329,8 +337,16 @@ describe("FluidSerializer", () => {
describe("Utils", () => {
const serializer = new FluidSerializer(new MockHandleContext());
it("makeSerializable is idempotent", () => {
const bind = new RemoteFluidObjectHandle("/", new MockHandleContext());
const handle = new RemoteFluidObjectHandle("/okay", new MockHandleContext());
const bind = new RemoteFluidObjectHandle(
"/",
new MockHandleContext(),
false, // placeholder
);
const handle = new RemoteFluidObjectHandle(
"/okay",
new MockHandleContext(),
false, // placeholder
);
const input = { x: handle, y: 123 };
const serializedOnce = makeHandlesSerializable(input, serializer, bind) as {
x: { type: "__fluid_handle__" };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ export interface ICompressionRuntimeOptions {
export interface IContainerRuntimeOptions {
readonly chunkSizeInBytes?: number;
readonly compressionOptions?: ICompressionRuntimeOptions;
readonly createBlobPlaceholders?: boolean;
// @deprecated
readonly enableGroupedBatching?: boolean;
readonly enableRuntimeIdCompressor?: IdCompressorMode;
Expand Down
49 changes: 39 additions & 10 deletions packages/runtime/container-runtime/src/blobManager/blobManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import type {
IEventProvider,
IFluidHandleContext,
IFluidHandleInternal,
IFluidHandleInternalPlaceholder,
} from "@fluidframework/core-interfaces/internal";
import { assert, Deferred } from "@fluidframework/core-utils/internal";
import {
Expand Down Expand Up @@ -62,7 +63,10 @@ import {
* DataObject.request() recognizes requests in the form of `/blobs/<id>`
* and loads blob.
*/
export class BlobHandle extends FluidHandleBase<ArrayBufferLike> {
export class BlobHandle
extends FluidHandleBase<ArrayBufferLike>
implements IFluidHandleInternalPlaceholder<ArrayBufferLike>
{
private attached: boolean = false;

public get isAttached(): boolean {
Expand All @@ -75,6 +79,7 @@ export class BlobHandle extends FluidHandleBase<ArrayBufferLike> {
public readonly path: string,
public readonly routeContext: IFluidHandleContext,
public get: () => Promise<ArrayBufferLike>,
public readonly placeholder: boolean,
private readonly onAttachGraph?: () => void,
) {
super();
Expand Down Expand Up @@ -133,7 +138,8 @@ export interface IBlobManagerEvents extends IEvent {
}

interface IBlobManagerInternalEvents extends IEvent {
(event: "blobAttached", listener: (pending: PendingBlob) => void);
(event: "handleAttached", listener: (pending: PendingBlob) => void);
(event: "processedBlobAttach", listener: (localId: string, storageId: string) => void);
}

const stashedPendingBlobOverrides: Pick<
Expand Down Expand Up @@ -195,7 +201,7 @@ export class BlobManager {
new Map();
public readonly stashedBlobsUploadP: Promise<(void | ICreateBlobResponse)[]>;

constructor(props: {
public constructor(props: {
readonly routeContext: IFluidHandleContext;

blobManagerLoadInfo: IBlobManagerLoadInfo;
Expand All @@ -220,6 +226,7 @@ export class BlobManager {
readonly runtime: IBlobManagerRuntime;
stashedBlobs: IPendingBlobs | undefined;
readonly localBlobIdGenerator?: (() => string) | undefined;
readonly createBlobPlaceholders: boolean;
}) {
const {
routeContext,
Expand Down Expand Up @@ -351,7 +358,11 @@ export class BlobManager {
return [...this.pendingBlobs.values()].some((e) => e.stashedUpload === true);
}

public async getBlob(blobId: string): Promise<ArrayBufferLike> {
public hasBlob(blobId: string): boolean {
return this.redirectTable.get(blobId) !== undefined;
}

public async getBlob(blobId: string, placeholder: boolean): Promise<ArrayBufferLike> {
// Verify that the blob is not deleted, i.e., it has not been garbage collected. If it is, this will throw
// an error, failing the call.
this.verifyBlobNotDeleted(blobId);
Expand All @@ -374,8 +385,24 @@ export class BlobManager {
storageId = blobId;
} else {
const attachedStorageId = this.redirectTable.get(blobId);
assert(!!attachedStorageId, 0x11f /* "requesting unknown blobs" */);
storageId = attachedStorageId;
if (!placeholder) {
assert(!!attachedStorageId, 0x11f /* "requesting unknown blobs" */);
}
// If we didn't find it in the redirectTable, assume the attach op is coming eventually and wait.
// We do this even if the local client doesn't have the blob placeholder flag enabled, in case a
// remote client does have it enabled. This wait may be infinite if the uploading client failed
// the upload and doesn't exist anymore.
storageId =
attachedStorageId ??
(await new Promise<string>((resolve) => {
const onProcessBlobAttach = (localId: string, _storageId: string): void => {
if (localId === blobId) {
this.internalEvents.off("processedBlobAttach", onProcessBlobAttach);
resolve(_storageId);
}
};
this.internalEvents.on("processedBlobAttach", onProcessBlobAttach);
}));
}

return PerformanceEvent.timedExecAsync(
Expand Down Expand Up @@ -406,14 +433,15 @@ export class BlobManager {
? () => {
pending.attached = true;
// Notify listeners (e.g. serialization process) that blob has been attached
this.internalEvents.emit("blobAttached", pending);
this.internalEvents.emit("handleAttached", pending);
this.deletePendingBlobMaybe(localId);
}
: undefined;
return new BlobHandle(
getGCNodePathFromBlobId(localId),
this.routeContext,
async () => this.getBlob(localId),
async () => this.getBlob(localId, false),
false, // placeholder
callback,
);
}
Expand Down Expand Up @@ -677,6 +705,7 @@ export class BlobManager {
this.deletePendingBlobMaybe(localId);
}
}
this.internalEvents.emit("processedBlobAttach", localId, blobId);
}

public summarize(telemetryContext?: ITelemetryContext): ISummaryTreeWithStats {
Expand Down Expand Up @@ -870,14 +899,14 @@ export class BlobManager {
);
const onBlobAttached = (attachedEntry: PendingBlob): void => {
if (attachedEntry === entry) {
this.internalEvents.off("blobAttached", onBlobAttached);
this.internalEvents.off("handleAttached", onBlobAttached);
resolve();
}
};
if (entry.attached) {
resolve();
} else {
this.internalEvents.on("blobAttached", onBlobAttached);
this.internalEvents.on("handleAttached", onBlobAttached);
}
}),
);
Expand Down
22 changes: 21 additions & 1 deletion packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ import {
import {
GCDataBuilder,
RequestParser,
RuntimeHeaders,
TelemetryContext,
addBlobToSummary,
addSummarizeResultToSummary,
Expand Down Expand Up @@ -397,6 +398,12 @@ export interface IContainerRuntimeOptions {
* are engaged as they become available, without giving legacy clients any chance to fail predictably.
*/
readonly explicitSchemaControl?: boolean;

/**
* Create blob placeholders when calling createBlob (default is false).
* When enabled, createBlob will return a handle before the blob upload completes.
*/
readonly createBlobPlaceholders?: boolean;
}

/**
Expand Down Expand Up @@ -790,6 +797,7 @@ export class ContainerRuntime
chunkSizeInBytes = defaultChunkSizeInBytes,
enableGroupedBatching = true,
explicitSchemaControl = false,
createBlobPlaceholders = false,
}: IContainerRuntimeOptionsInternal = runtimeOptions;

const registry = new FluidDataStoreRegistry(registryEntries);
Expand Down Expand Up @@ -966,6 +974,7 @@ export class ContainerRuntime
compressionLz4,
idCompressorMode,
opGroupingEnabled: enableGroupedBatching,
createBlobPlaceholders,
disallowedVersions: [],
},
(schema) => {
Expand All @@ -992,6 +1001,7 @@ export class ContainerRuntime
enableRuntimeIdCompressor: enableRuntimeIdCompressor as "on" | "delayed",
enableGroupedBatching,
explicitSchemaControl,
createBlobPlaceholders,
};

const runtime = new containerRuntimeCtor(
Expand Down Expand Up @@ -1739,6 +1749,7 @@ export class ContainerRuntime
isBlobDeleted: (blobPath: string) => this.garbageCollector.isNodeDeleted(blobPath),
runtime: this,
stashedBlobs: pendingRuntimeState?.pendingAttachmentBlobs,
createBlobPlaceholders: this.sessionSchema.createBlobPlaceholders === true,
});

this.deltaScheduler = new DeltaScheduler(
Expand Down Expand Up @@ -2257,7 +2268,16 @@ export class ContainerRuntime
}

if (id === blobManagerBasePath && requestParser.isLeaf(2)) {
const blob = await this.blobManager.getBlob(requestParser.pathParts[1]);
const localId = requestParser.pathParts[1];
const placeholder = requestParser.headers?.[RuntimeHeaders.placeholder] === true;
if (
!this.blobManager.hasBlob(localId) &&
requestParser.headers?.[RuntimeHeaders.wait] === false
) {
return create404Response(request);
}

const blob = await this.blobManager.getBlob(localId, placeholder);
return {
status: 200,
mimeType: "fluid/object",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ export interface IDocumentSchemaFeatures {
compressionLz4: boolean;
idCompressorMode: IdCompressorMode;
opGroupingEnabled: boolean;
createBlobPlaceholders: boolean;

/**
* List of disallowed versions of the runtime.
Expand Down Expand Up @@ -227,6 +228,7 @@ const documentSchemaSupportedConfigs = {
idCompressorMode: new IdCompressorProperty(["delayed", "on"]),
opGroupingEnabled: new TrueOrUndefined(),
compressionLz4: new TrueOrUndefined(),
createBlobPlaceholders: new TrueOrUndefined(),
disallowedVersions: new CheckVersions(),
};

Expand Down Expand Up @@ -482,6 +484,7 @@ export class DocumentsSchemaController {
compressionLz4: boolToProp(features.compressionLz4),
idCompressorMode: features.idCompressorMode,
opGroupingEnabled: boolToProp(features.opGroupingEnabled),
createBlobPlaceholders: boolToProp(features.createBlobPlaceholders),
disallowedVersions: arrayToProp(features.disallowedVersions),
},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { Deferred } from "@fluidframework/core-utils/internal";
import { IClientDetails, SummaryType } from "@fluidframework/driver-definitions";
import { IDocumentStorageService } from "@fluidframework/driver-definitions/internal";
import type { ISequencedMessageEnvelope } from "@fluidframework/runtime-definitions/internal";
import { isFluidHandleInternalPlaceholder } from "@fluidframework/runtime-utils/internal";
import {
LoggingError,
MockLogger,
Expand Down Expand Up @@ -104,6 +105,7 @@ export class MockRuntime
isBlobDeleted: (blobPath: string) => this.isBlobDeleted(blobPath),
runtime: this,
stashedBlobs: stashed[1] as IPendingBlobs | undefined,
createBlobPlaceholders: false,
});
}

Expand Down Expand Up @@ -160,7 +162,10 @@ export class MockRuntime
): Promise<ArrayBufferLike> {
const pathParts = blobHandle.absolutePath.split("/");
const blobId = pathParts[2];
return this.blobManager.getBlob(blobId);
const placeholder = isFluidHandleInternalPlaceholder(blobHandle)
? blobHandle.placeholder
: false;
return this.blobManager.getBlob(blobId, placeholder);
}

public async getPendingLocalState(): Promise<(unknown[] | IPendingBlobs | undefined)[]> {
Expand Down Expand Up @@ -743,7 +748,7 @@ describe("BlobManager", () => {
});

await assert.rejects(
async () => runtime.blobManager.getBlob(someId),
async () => runtime.blobManager.getBlob(someId, false),
(e: Error) => e.message === "BOOM!",
"Expected getBlob to throw with test error message",
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ function createBlobManager(overrides?: Partial<ConstructorParameters<typeof Blob
blobManagerLoadInfo: {},
stashedBlobs: undefined,
localBlobIdGenerator: undefined,
createBlobPlaceholders: false,

// overrides
...overrides,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1504,6 +1504,7 @@ describe("Runtime", () => {
enableRuntimeIdCompressor: undefined,
enableGroupedBatching: true, // Redundant, but makes the JSON.stringify yield the same result as the logs
explicitSchemaControl: false,
createBlobPlaceholders: false, // Redundant, but makes the JSON.stringify yield the same result as the logs
};
const mergedRuntimeOptions = { ...defaultRuntimeOptions, ...runtimeOptions };

Expand Down
Loading
Loading