diff --git a/.changeset/frowny-paths-hear.md b/.changeset/frowny-paths-hear.md new file mode 100644 index 000000000000..6d5a5f8f1f94 --- /dev/null +++ b/.changeset/frowny-paths-hear.md @@ -0,0 +1,11 @@ +--- +"@fluidframework/datastore": minor +"@fluidframework/test-runtime-utils": minor +"__section": legacy +--- + +Deprecate submitMessage on FluidDataStoreRuntime and MockFluidDataStoreRuntime + +Implementing `FluidDataStoreRuntime.submitMessage` is not required per `IFluidDataStoreChannel` and is now deprecated on `FluidDataStoreRuntime` and corresponding `MockFluidDataStoreRuntime`. + +See [issue #24406](https://github.com/microsoft/FluidFramework/issues/24406) for details and alternatives. diff --git a/packages/common/core-interfaces/src/messages.ts b/packages/common/core-interfaces/src/messages.ts index 92d7d2ad9360..25bb38b735ec 100644 --- a/packages/common/core-interfaces/src/messages.ts +++ b/packages/common/core-interfaces/src/messages.ts @@ -29,7 +29,6 @@ export interface ISignalEnvelope { */ contents: { type: string; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - content: any; + content: unknown; }; } diff --git a/packages/framework/attributor/src/runtimeAttributorDataStoreChannel.ts b/packages/framework/attributor/src/runtimeAttributorDataStoreChannel.ts index 594b10e6cf55..076f1edec81d 100644 --- a/packages/framework/attributor/src/runtimeAttributorDataStoreChannel.ts +++ b/packages/framework/attributor/src/runtimeAttributorDataStoreChannel.ts @@ -186,7 +186,7 @@ export class RuntimeAttributorDataStoreChannel /** * {@inheritdoc IFluidDataStoreChannel.reSubmit} */ - public reSubmit(type: string, content: unknown, localOpMetadata: unknown): void { + public reSubmit(): void { // Should not resubmit anything from the attributor as the attributor does not send ops yet. throw new Error("Should not resubmit anything from the attributor"); } diff --git a/packages/runtime/container-runtime/src/channelCollection.ts b/packages/runtime/container-runtime/src/channelCollection.ts index 01255683c30c..ef883430a756 100644 --- a/packages/runtime/container-runtime/src/channelCollection.ts +++ b/packages/runtime/container-runtime/src/channelCollection.ts @@ -11,7 +11,10 @@ import { IResponse, ITelemetryBaseLogger, } from "@fluidframework/core-interfaces"; -import type { IFluidHandleInternal } from "@fluidframework/core-interfaces/internal"; +import type { + IFluidHandleInternal, + ISignalEnvelope, +} from "@fluidframework/core-interfaces/internal"; import { assert, Lazy, LazyPromise } from "@fluidframework/core-utils/internal"; import { FluidObjectHandle } from "@fluidframework/datastore/internal"; import type { ISnapshot } from "@fluidframework/driver-definitions/internal"; @@ -24,6 +27,12 @@ import { getSnapshotTree, isInstanceOfISnapshot, } from "@fluidframework/driver-utils/internal"; +import type { + FluidDataStoreMessage, + IRuntimeMessagesContent, + InboundAttachMessage, + IRuntimeMessageCollection, +} from "@fluidframework/runtime-definitions/internal"; import { ISummaryTreeWithStats, ITelemetryContext, @@ -43,9 +52,6 @@ import { channelsTreeName, IInboundSignalMessage, gcDataBlobKey, - type IRuntimeMessagesContent, - type InboundAttachMessage, - type IRuntimeMessageCollection, } from "@fluidframework/runtime-definitions/internal"; import { GCDataBuilder, @@ -98,6 +104,11 @@ import { import { DataStoreContexts } from "./dataStoreContexts.js"; import { FluidDataStoreRegistry } from "./dataStoreRegistry.js"; import { GCNodeType, IGCNodeUpdatedProps, urlToGCNodePath } from "./gc/index.js"; +import type { + ContainerRuntimeAliasMessage, + ContainerRuntimeDataStoreOpMessage, + OutboundContainerRuntimeAttachMessage, +} from "./messageTypes.js"; import { ContainerMessageType, LocalContainerRuntimeMessage } from "./messageTypes.js"; import { StorageServiceWithAttachBlobs } from "./storageServiceWithAttachBlobs.js"; import { @@ -115,15 +126,47 @@ export const AllowTombstoneRequestHeaderKey = "allowTombstone"; // Belongs in th type PendingAliasResolve = (success: boolean) => void; -interface FluidDataStoreMessage { - content: unknown; - type: string; +/** + * Envelope for signals not intended for the container. + * + * @privateRemarks + * `clientBroadcastSignalSequenceNumber` might be added to the envelope by the container runtime. + * But it should not be provided to start with. + * + * @internal + */ +export type AddressedSignalEnvelope = IEnvelope; + +/** + * Kin of {@link @fluidframework/runtime-definitions#IFluidParentContext} with alternate + * submitMessage and submitSignal methods. + * + * @internal + * @privateRemarks Exposed per ChannelCollection testing and API extractor request + */ +export interface IFluidRootParentContext + extends Omit { + readonly submitMessage: ( + containerRuntimeMessage: + | ContainerRuntimeDataStoreOpMessage + | OutboundContainerRuntimeAttachMessage + | ContainerRuntimeAliasMessage, + localOpMetadata: unknown, + ) => void; + readonly submitSignal: (envelope: AddressedSignalEnvelope, targetClientId?: string) => void; } +type SubmitKeys = "submitMessage" | "submitSignal"; + /** - * Creates a shallow wrapper of {@link IFluidParentContext}. The wrapper can then have its methods overwritten as needed + * Creates a shallow wrapper of {@link IFluidParentContext} or + * {@link IFluidRootParentContext} with `submitMessage` and `submitSignal` + * methods replaced with the provided overrides. */ -export function wrapContext(context: IFluidParentContext): IFluidParentContext { +export function formParentContext( + context: Omit, + overrides: Pick, +): Omit & Pick { return { get IFluidDataStoreRegistry() { return context.IFluidDataStoreRegistry; @@ -162,12 +205,8 @@ export function wrapContext(context: IFluidParentContext): IFluidParentContext { getAudience: (...args) => { return context.getAudience(...args); }, - submitMessage: (...args) => { - return context.submitMessage(...args); - }, - submitSignal: (...args) => { - return context.submitSignal(...args); - }, + submitMessage: overrides.submitMessage.bind(overrides), + submitSignal: overrides.submitSignal, makeLocallyVisible: (...args) => { return context.makeLocallyVisible(...args); }, @@ -194,38 +233,32 @@ export function wrapContext(context: IFluidParentContext): IFluidParentContext { * The wrapper will have the submit methods overwritten with the appropriate id as the destination address. * * @param id - the id of the channel - * @param parentContext - the {@link IFluidParentContext} to wrap + * @param parentContext - the {@link IFluidRootParentContext} to wrap * @returns A wrapped {@link IFluidParentContext} */ function wrapContextForInnerChannel( id: string, - parentContext: IFluidParentContext, + parentContext: IFluidRootParentContext, ): IFluidParentContext { - const context = wrapContext(parentContext); - - context.submitMessage = (type: string, content: unknown, localOpMetadata: unknown) => { - const fluidDataStoreContent: FluidDataStoreMessage = { - content, - type, - }; - const envelope: IEnvelope = { - address: id, - contents: fluidDataStoreContent, - }; - parentContext.submitMessage( - ContainerMessageType.FluidDataStoreOp, - envelope, - localOpMetadata, - ); - }; - - context.submitSignal = (type: string, contents: unknown, targetClientId?: string) => { - const envelope: IEnvelope = { - address: id, - contents, - }; - parentContext.submitSignal(type, envelope, targetClientId); - }; + const context = formParentContext(parentContext, { + submitMessage: (type: string, content: unknown, localOpMetadata: unknown) => { + const fluidDataStoreContent: FluidDataStoreMessage = { + content, + type, + }; + const envelope = { + address: id, + contents: fluidDataStoreContent, + }; + parentContext.submitMessage( + { type: ContainerMessageType.FluidDataStoreOp, contents: envelope }, + localOpMetadata, + ); + }, + submitSignal: (type: string, content: unknown, targetClientId?: string) => { + parentContext.submitSignal({ address: id, contents: { type, content } }, targetClientId); + }, + }); return context; } @@ -242,7 +275,9 @@ export function getLocalDataStoreType(localDataStore: LocalFluidDataStoreContext * but eventually could be hosted on any channel once we formalize the channel api boundary. * @internal */ -export class ChannelCollection implements IFluidDataStoreChannel, IDisposable { +export class ChannelCollection + implements Omit, IDisposable +{ // Stores tracked by the Domain private readonly pendingAttach = new Map(); // 0.24 back-compat attachingBeforeSummary @@ -253,8 +288,6 @@ export class ChannelCollection implements IFluidDataStoreChannel, IDisposable { // eslint-disable-next-line unicorn/consistent-function-scoping -- Property is defined once; no need to extract inner lambda private readonly disposeOnce = new Lazy(() => this.contexts.dispose()); - public readonly entryPoint: IFluidHandleInternal; - public readonly containerLoadStats: { // number of dataStores during loadContainer readonly containerLoadDataStoreCount: number; @@ -272,20 +305,14 @@ export class ChannelCollection implements IFluidDataStoreChannel, IDisposable { constructor( protected readonly baseSnapshot: ISnapshotTree | ISnapshot | undefined, - public readonly parentContext: IFluidParentContext, + public readonly parentContext: IFluidRootParentContext, baseLogger: ITelemetryBaseLogger, private readonly gcNodeUpdated: (props: IGCNodeUpdatedProps) => void, private readonly isDataStoreDeleted: (nodePath: string) => boolean, private readonly aliasMap: Map, - provideEntryPoint: (runtime: ChannelCollection) => Promise, ) { this.mc = createChildMonitoringContext({ logger: baseLogger }); this.contexts = new DataStoreContexts(baseLogger); - this.entryPoint = new FluidObjectHandle( - new LazyPromise(async () => provideEntryPoint(this)), - "", - this.parentContext.IFluidHandleContext, - ); this.aliasedDataStores = new Set(aliasMap.values()); // Extract stores stored inside the snapshot @@ -585,7 +612,10 @@ export class ChannelCollection implements IFluidDataStoreChannel, IDisposable { protected submitAttachChannelOp(localContext: LocalFluidDataStoreContext): void { const message = this.generateAttachMessage(localContext); this.pendingAttach.set(localContext.id, message); - this.parentContext.submitMessage(ContainerMessageType.Attach, message, undefined); + this.parentContext.submitMessage( + { type: ContainerMessageType.Attach, contents: message }, + undefined, + ); this.attachOpFiredForDataStore.add(localContext.id); } @@ -690,24 +720,32 @@ export class ChannelCollection implements IFluidDataStoreChannel, IDisposable { } public readonly dispose = (): void => this.disposeOnce.value; - public reSubmit(type: string, content: unknown, localOpMetadata: unknown): void { - switch (type) { + public readonly reSubmitContainerMessage = ( + message: + | ContainerRuntimeDataStoreOpMessage + | OutboundContainerRuntimeAttachMessage + | ContainerRuntimeAliasMessage, + localOpMetadata: unknown, + ): void => { + switch (message.type) { case ContainerMessageType.Attach: case ContainerMessageType.Alias: { - this.parentContext.submitMessage(type, content, localOpMetadata); + this.parentContext.submitMessage(message, localOpMetadata); return; } case ContainerMessageType.FluidDataStoreOp: { - return this.reSubmitChannelOp(type, content, localOpMetadata); + return this.resubmitDataStoreOp(message.contents, localOpMetadata); } default: { assert(false, 0x907 /* unknown op type */); } } - } + }; - protected reSubmitChannelOp(type: string, content: unknown, localOpMetadata: unknown): void { - const envelope = content as IEnvelope; + protected readonly resubmitDataStoreOp = ( + envelope: IEnvelope, + localOpMetadata: unknown, + ): void => { const context = this.contexts.get(envelope.address); // If the data store has been deleted, log an error and throw an error. If there are local changes for a // deleted data store, it can otherwise lead to inconsistent state when compared to other clients. @@ -720,13 +758,13 @@ export class ChannelCollection implements IFluidDataStoreChannel, IDisposable { }); } assert(!!context, 0x160 /* "There should be a store context for the op" */); - const innerContents = envelope.contents as FluidDataStoreMessage; - context.reSubmit(innerContents.type, innerContents.content, localOpMetadata); - } + context.reSubmit(envelope.contents, localOpMetadata); + }; - public rollback(type: string, content: unknown, localOpMetadata: unknown): void { - assert(type === ContainerMessageType.FluidDataStoreOp, 0x8e8 /* type */); - const envelope = content as IEnvelope; + public readonly rollbackDataStoreOp = ( + envelope: IEnvelope, + localOpMetadata: unknown, + ): void => { const context = this.contexts.get(envelope.address); // If the data store has been deleted, log an error and throw an error. If there are local changes for a // deleted data store, it can otherwise lead to inconsistent state when compared to other clients. @@ -739,9 +777,8 @@ export class ChannelCollection implements IFluidDataStoreChannel, IDisposable { }); } assert(!!context, 0x2e8 /* "There should be a store context for the op" */); - const innerContents = envelope.contents as FluidDataStoreMessage; - context.rollback(innerContents.type, innerContents.content, localOpMetadata); - } + context.rollback(envelope.contents, localOpMetadata); + }; public async applyStashedOp(content: unknown): Promise { const opContents = content as LocalContainerRuntimeMessage; @@ -875,7 +912,7 @@ export class ChannelCollection implements IFluidDataStoreChannel, IDisposable { * like merge tree or shared tree can process ops more efficiently when they are bunched together. */ for (const { contents, ...restOfMessagesContent } of messagesContent) { - const contentsEnvelope = contents as IEnvelope; + const contentsEnvelope = contents as IEnvelope; const address = contentsEnvelope.address; const context = this.contexts.get(address); @@ -902,8 +939,7 @@ export class ChannelCollection implements IFluidDataStoreChannel, IDisposable { ); } - const { type: contextType, content: contextContents } = - contentsEnvelope.contents as FluidDataStoreMessage; + const { type: contextType, content: contextContents } = contentsEnvelope.contents; // If the address or type of the message changes while processing the message, send the current bunch. if ( currentMessageState?.address !== address || @@ -1592,11 +1628,100 @@ export function detectOutboundReferences( } } +// ===================================================================== +// The code below here is for experimentation (and one test) only. + +/** + * @privateRemarks This class is only used for experimentation/testing. + * @internal + */ +export class ComposableChannelCollection + extends ChannelCollection + implements IFluidDataStoreChannel +{ + public readonly entryPoint: IFluidHandleInternal; + + public constructor( + baseSnapshot: ISnapshotTree | ISnapshot | undefined, + parentContext: IFluidParentContext, + baseLogger: ITelemetryBaseLogger, + gcNodeUpdated: (props: IGCNodeUpdatedProps) => void, + isDataStoreDeleted: (nodePath: string) => boolean, + aliasMap: Map, + provideEntryPoint: (runtime: ComposableChannelCollection) => Promise, + ) { + super( + baseSnapshot, + /* [root] parentContext */ + formParentContext(parentContext, { + submitMessage: ( + containerRuntimeMessage: + | ContainerRuntimeDataStoreOpMessage + | OutboundContainerRuntimeAttachMessage + | ContainerRuntimeAliasMessage, + localOpMetadata: unknown, + ): void => { + // Note that here our message format is reconfigured. + // While `ContainerRuntime*Message`s use `contents` + // as `FluidDataStoreMessage`s the content is + // stored in `content`. + parentContext.submitMessage( + containerRuntimeMessage.type, + containerRuntimeMessage.contents, + localOpMetadata, + ); + }, + submitSignal: (envelope: AddressedSignalEnvelope, targetClientId?: string): void => { + parentContext.submitSignal( + envelope.contents.type, + { + address: envelope.address, + contents: envelope.contents.content, + } satisfies IEnvelope, + targetClientId, + ); + }, + }), + baseLogger, + gcNodeUpdated, + isDataStoreDeleted, + aliasMap, + ); + this.entryPoint = new FluidObjectHandle( + new LazyPromise(async () => provideEntryPoint(this)), + "", + this.parentContext.IFluidHandleContext, + ); + } + + public reSubmit(type: string, content: unknown, localOpMetadata: unknown): void { + // If the cast is incorrect and type is not one of the three supported, + // reSubmitContainerMessage will assert. + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions -- Need to force conversion + const message = { + type, + contents: content, + } as + | ContainerRuntimeDataStoreOpMessage + | OutboundContainerRuntimeAttachMessage + | ContainerRuntimeAliasMessage; + this.reSubmitContainerMessage(message, localOpMetadata); + } + + public rollback(type: string, content: unknown, localOpMetadata: unknown): void { + assert(type === ContainerMessageType.FluidDataStoreOp, 0x8e8 /* type */); + this.rollbackDataStoreOp(content as IEnvelope, localOpMetadata); + } +} + /** + * @privateRemarks This factory is only used for experimentation/testing. + * * @internal */ -export class ChannelCollectionFactory - implements IFluidDataStoreFactory +export class ChannelCollectionFactory< + T extends ComposableChannelCollection = ComposableChannelCollection, +> implements IFluidDataStoreFactory { public readonly type = "ChannelCollectionChannel"; @@ -1608,7 +1733,9 @@ export class ChannelCollectionFactory Promise, - private readonly ctor: (...args: ConstructorParameters) => T, + private readonly ctor: ( + ...args: ConstructorParameters + ) => T, ) { this.IFluidDataStoreRegistry = new FluidDataStoreRegistry(registryEntries); } @@ -1623,10 +1750,10 @@ export class ChannelCollectionFactory { const runtime = this.ctor( context.baseSnapshot, - context, // parentContext + /* parentContext */ context, context.baseLogger, - () => {}, // gcNodeUpdated - (_nodePath: string) => false, // isDataStoreDeleted + /* gcNodeUpdated */ () => {}, + /* isDataStoreDeleted */ (_nodePath: string) => false, new Map(), // aliasMap this.provideEntryPoint, ); diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index 412895d71695..f3b7452d9882 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -90,7 +90,6 @@ import type { IInboundSignalMessage, IRuntimeMessagesContent, ISummarizerNodeWithGC, - IFluidParentContext, } from "@fluidframework/runtime-definitions/internal"; import { FlushMode, @@ -147,10 +146,11 @@ import { loadBlobManagerLoadInfo, type IBlobManagerLoadInfo, } from "./blobManager/index.js"; +import type { AddressedSignalEnvelope, IFluidRootParentContext } from "./channelCollection.js"; import { ChannelCollection, getSummaryForDatastores, - wrapContext, + formParentContext, } from "./channelCollection.js"; import { defaultCompatibilityMode, @@ -181,11 +181,14 @@ import { import { InboundBatchAggregator } from "./inboundBatchAggregator.js"; import { ContainerMessageType, + type ContainerRuntimeAliasMessage, + type ContainerRuntimeDataStoreOpMessage, type ContainerRuntimeDocumentSchemaMessage, ContainerRuntimeGCMessage, type ContainerRuntimeIdAllocationMessage, type InboundSequencedContainerRuntimeMessage, type LocalContainerRuntimeMessage, + type OutboundContainerRuntimeAttachMessage, type UnknownContainerRuntimeMessage, } from "./messageTypes.js"; import { ISavedOpMetadata } from "./metadata.js"; @@ -708,11 +711,11 @@ export class ContainerRuntime extends TypedEventEmitter implements IContainerRuntime, + Omit, IRuntime, IGarbageCollectionRuntime, ISummarizerRuntime, ISummarizerInternalsProvider, - IFluidParentContext, IProvideFluidHandleContext, IProvideLayerCompatDetails { @@ -1096,7 +1099,9 @@ export class ContainerRuntime referenceSequenceNumber?: number, ) => number; /** - * Do not call directly - use submitAddressesSignal + * Do not call without first calling + * signalTelemetryManager.applyTrackingToBroadcastSignalEnvelope + * when targetClientId is undefined. */ private readonly submitSignalFn: (content: ISignalEnvelope, targetClientId?: string) => void; public readonly disposeFn: (error?: ICriticalContainerError) => void; @@ -1692,26 +1697,29 @@ export class ContainerRuntime async () => this.garbageCollector.getBaseGCDetails(), ); - const parentContext = wrapContext(this); + const parentContext = formParentContext(this, { + submitMessage: this.submitMessage.bind(this), + + // Due to a mismatch between different layers in terms of + // what is the interface of passing signals, we need the + // downstream stores to wrap the signal. + submitSignal: (envelope: AddressedSignalEnvelope, targetClientId?: string): void => { + // verifyNotClosed is called in FluidDataStoreContext, which is *the* expected caller. + assert( + !envelope.address.startsWith("/"), + "Addresses beginning with '/' are reserved for container use", + ); + if (targetClientId === undefined) { + this.signalTelemetryManager.applyTrackingToBroadcastSignalEnvelope(envelope); + } + this.submitSignalFn(envelope, targetClientId); + }, + }); if (snapshotWithContents !== undefined) { this.isSnapshotInstanceOfISnapshot = true; } - // Due to a mismatch between different layers in terms of - // what is the interface of passing signals, we need the - // downstream stores to wrap the signal. - parentContext.submitSignal = (type: string, content: unknown, targetClientId?: string) => { - // Future: Can the `content` argument type be IEnvelope? - // verifyNotClosed is called in FluidDataStoreContext, which is *the* expected caller. - const envelope1 = content as IEnvelope; - const envelope2 = createNewSignalEnvelope(envelope1.address, type, envelope1.contents); - if (targetClientId === undefined) { - this.signalTelemetryManager.applyTrackingToBroadcastSignalEnvelope(envelope2); - } - this.submitSignalFn(envelope2, targetClientId); - }; - let snapshot: ISnapshot | ISnapshotTree | undefined = getSummaryForDatastores( baseSnapshot, metadata, @@ -1734,7 +1742,6 @@ export class ContainerRuntime }), (path: string) => this.garbageCollector.isNodeDeleted(path), new Map(dataStoreAliasMap), - async (runtime: ChannelCollection) => provideEntryPoint, ); this.blobManager = new BlobManager({ @@ -4213,17 +4220,13 @@ export class ContainerRuntime } public submitMessage( - type: - | ContainerMessageType.FluidDataStoreOp - | ContainerMessageType.Alias - | ContainerMessageType.Attach, - // TODO: better typing - // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types, @typescript-eslint/no-explicit-any - contents: any, + containerRuntimeMessage: + | ContainerRuntimeDataStoreOpMessage + | OutboundContainerRuntimeAttachMessage + | ContainerRuntimeAliasMessage, localOpMetadata: unknown = undefined, ): void { - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - this.submit({ type, contents }, localOpMetadata); + this.submit(containerRuntimeMessage, localOpMetadata); } public async uploadBlob( @@ -4461,7 +4464,7 @@ export class ContainerRuntime case ContainerMessageType.Alias: { // For Operations, call resubmitDataStoreOp which will find the right store // and trigger resubmission on it. - this.channelCollection.reSubmit(message.type, message.contents, localOpMetadata); + this.channelCollection.reSubmitContainerMessage(message, localOpMetadata); break; } case ContainerMessageType.IdAllocation: { @@ -4506,7 +4509,7 @@ export class ContainerRuntime case ContainerMessageType.FluidDataStoreOp: { // For operations, call rollbackDataStoreOp which will find the right store // and trigger rollback on it. - this.channelCollection.rollback(type, contents, localOpMetadata); + this.channelCollection.rollbackDataStoreOp(contents, localOpMetadata); break; } default: { @@ -4755,7 +4758,7 @@ export class ContainerRuntime } } -export function createNewSignalEnvelope( +function createNewSignalEnvelope( address: string | undefined, type: string, content: unknown, diff --git a/packages/runtime/container-runtime/src/dataStore.ts b/packages/runtime/container-runtime/src/dataStore.ts index a02c45cae9ee..5c1ca321fcb7 100644 --- a/packages/runtime/container-runtime/src/dataStore.ts +++ b/packages/runtime/container-runtime/src/dataStore.ts @@ -24,6 +24,7 @@ import { ContainerMessageType } from "./messageTypes.js"; /** * Interface for an op to be used for assigning an * alias to a datastore + * @internal */ export interface IDataStoreAliasMessage { /** @@ -140,7 +141,10 @@ class DataStore implements IDataStore { } const aliased = await this.ackBasedPromise((resolve) => { - this.parentContext.submitMessage(ContainerMessageType.Alias, message, resolve); + this.parentContext.submitMessage( + { type: ContainerMessageType.Alias, contents: message }, + resolve, + ); }) .catch((error) => { this.logger.sendErrorEvent( diff --git a/packages/runtime/container-runtime/src/dataStoreContext.ts b/packages/runtime/container-runtime/src/dataStoreContext.ts index 0d92b7a379ed..8e7d296ed6a4 100644 --- a/packages/runtime/container-runtime/src/dataStoreContext.ts +++ b/packages/runtime/container-runtime/src/dataStoreContext.ts @@ -33,7 +33,8 @@ import { readAndParse, } from "@fluidframework/driver-utils/internal"; import type { IIdCompressor } from "@fluidframework/id-compressor"; -import { +import type { + FluidDataStoreMessage, ISummaryTreeWithStats, ITelemetryContext, IGarbageCollectionData, @@ -53,12 +54,12 @@ import { ISummarizeResult, ISummarizerNodeWithGC, SummarizeInternalFn, - channelsTreeName, IInboundSignalMessage, - type IPendingMessagesState, - type IRuntimeMessageCollection, - type IFluidDataStoreFactory, + IPendingMessagesState, + IRuntimeMessageCollection, + IFluidDataStoreFactory, } from "@fluidframework/runtime-definitions/internal"; +import { channelsTreeName } from "@fluidframework/runtime-definitions/internal"; import { addBlobToSummary, isSnapshotFetchRequiredForLoadingGroupId, @@ -956,19 +957,19 @@ export abstract class FluidDataStoreContext return {}; } - public reSubmit(type: string, contents: unknown, localOpMetadata: unknown): void { + public reSubmit(message: FluidDataStoreMessage, localOpMetadata: unknown): void { assert(!!this.channel, 0x14b /* "Channel must exist when resubmitting ops" */); - this.channel.reSubmit(type, contents, localOpMetadata); + this.channel.reSubmit(message.type, message.content, localOpMetadata); } - public rollback(type: string, contents: unknown, localOpMetadata: unknown): void { + public rollback(message: FluidDataStoreMessage, localOpMetadata: unknown): void { if (!this.channel) { throw new Error("Channel must exist when rolling back ops"); } if (!this.channel.rollback) { throw new Error("Channel doesn't support rollback"); } - this.channel.rollback(type, contents, localOpMetadata); + this.channel.rollback(message.type, message.content, localOpMetadata); } public async applyStashedOp(contents: unknown): Promise { diff --git a/packages/runtime/container-runtime/src/index.ts b/packages/runtime/container-runtime/src/index.ts index a7a5a7e2bf4b..56586dc5aa18 100644 --- a/packages/runtime/container-runtime/src/index.ts +++ b/packages/runtime/container-runtime/src/index.ts @@ -20,12 +20,19 @@ export { export { CompressionAlgorithms, disabledCompressionConfig } from "./compressionDefinitions.js"; export { ContainerMessageType, + ContainerRuntimeAliasMessage, + ContainerRuntimeDataStoreOpMessage, + InternalUtilityTypes, + OutboundContainerRuntimeAttachMessage, UnknownContainerRuntimeMessage, } from "./messageTypes.js"; export { IBlobManagerLoadInfo } from "./blobManager/index.js"; +export type { IDataStoreAliasMessage } from "./dataStore.js"; export { FluidDataStoreRegistry } from "./dataStoreRegistry.js"; export { detectOutboundReferences, + type AddressedSignalEnvelope, + type IFluidRootParentContext, ChannelCollectionFactory, AllowTombstoneRequestHeaderKey, } from "./channelCollection.js"; @@ -107,7 +114,7 @@ export { DefaultSummaryConfiguration, } from "./summary/index.js"; export { IChunkedOp, unpackRuntimeMessage } from "./opLifecycle/index.js"; -export { ChannelCollection } from "./channelCollection.js"; +export { ChannelCollection, ComposableChannelCollection } from "./channelCollection.js"; export { IFluidDataStoreContextInternal, ISnapshotDetails, diff --git a/packages/runtime/container-runtime/src/messageTypes.ts b/packages/runtime/container-runtime/src/messageTypes.ts index 755f00a5d494..53fd30c5174e 100644 --- a/packages/runtime/container-runtime/src/messageTypes.ts +++ b/packages/runtime/container-runtime/src/messageTypes.ts @@ -5,7 +5,8 @@ import { ISequencedDocumentMessage } from "@fluidframework/driver-definitions/internal"; import type { IdCreationRange } from "@fluidframework/id-compressor/internal"; -import { +import type { + FluidDataStoreMessage, IAttachMessage, IEnvelope, InboundAttachMessage, @@ -59,63 +60,88 @@ export enum ContainerMessageType { } /** - * The unpacked runtime message / details to be handled or dispatched by the ContainerRuntime. - * Message type are differentiated via a `type` string and contain different contents depending on their type. - * - * IMPORTANT: when creating one to be serialized, set the properties in the order they appear here. - * This way stringified values can be compared. + * @internal */ -interface TypedContainerRuntimeMessage { - /** - * Type of the op, within the ContainerRuntime's domain - */ - type: TType; +// eslint-disable-next-line @typescript-eslint/no-namespace +export namespace InternalUtilityTypes { /** - * Domain-specific contents, interpreted according to the type + * The unpacked runtime message / details to be handled or dispatched by the ContainerRuntime. + * Message type are differentiated via a `type` string and contain different contents depending on their type. + * + * IMPORTANT: when creating one to be serialized, set the properties in the order they appear here. + * This way stringified values can be compared. */ - contents: TContents; + export interface TypedContainerRuntimeMessage< + TType extends ContainerMessageType, + TContents, + > { + /** + * Type of the op, within the ContainerRuntime's domain + */ + type: TType; + /** + * Domain-specific contents, interpreted according to the type + */ + contents: TContents; + } } -export type ContainerRuntimeDataStoreOpMessage = TypedContainerRuntimeMessage< - ContainerMessageType.FluidDataStoreOp, - IEnvelope ->; -export type InboundContainerRuntimeAttachMessage = TypedContainerRuntimeMessage< - ContainerMessageType.Attach, - InboundAttachMessage ->; -export type OutboundContainerRuntimeAttachMessage = TypedContainerRuntimeMessage< - ContainerMessageType.Attach, - IAttachMessage ->; -export type ContainerRuntimeChunkedOpMessage = TypedContainerRuntimeMessage< - ContainerMessageType.ChunkedOp, - IChunkedOp ->; -export type ContainerRuntimeBlobAttachMessage = TypedContainerRuntimeMessage< - ContainerMessageType.BlobAttach, - undefined ->; -export type ContainerRuntimeRejoinMessage = TypedContainerRuntimeMessage< +/** + * @internal + */ +export type ContainerRuntimeDataStoreOpMessage = + InternalUtilityTypes.TypedContainerRuntimeMessage< + ContainerMessageType.FluidDataStoreOp, + IEnvelope + >; +export type InboundContainerRuntimeAttachMessage = + InternalUtilityTypes.TypedContainerRuntimeMessage< + ContainerMessageType.Attach, + InboundAttachMessage + >; +/** + * @internal + */ +export type OutboundContainerRuntimeAttachMessage = + InternalUtilityTypes.TypedContainerRuntimeMessage< + ContainerMessageType.Attach, + IAttachMessage + >; +export type ContainerRuntimeChunkedOpMessage = + InternalUtilityTypes.TypedContainerRuntimeMessage< + ContainerMessageType.ChunkedOp, + IChunkedOp + >; +export type ContainerRuntimeBlobAttachMessage = + InternalUtilityTypes.TypedContainerRuntimeMessage< + ContainerMessageType.BlobAttach, + undefined + >; +export type ContainerRuntimeRejoinMessage = InternalUtilityTypes.TypedContainerRuntimeMessage< ContainerMessageType.Rejoin, undefined >; -export type ContainerRuntimeAliasMessage = TypedContainerRuntimeMessage< +/** + * @internal + */ +export type ContainerRuntimeAliasMessage = InternalUtilityTypes.TypedContainerRuntimeMessage< ContainerMessageType.Alias, IDataStoreAliasMessage >; -export type ContainerRuntimeIdAllocationMessage = TypedContainerRuntimeMessage< - ContainerMessageType.IdAllocation, - IdCreationRange ->; -export type ContainerRuntimeGCMessage = TypedContainerRuntimeMessage< +export type ContainerRuntimeIdAllocationMessage = + InternalUtilityTypes.TypedContainerRuntimeMessage< + ContainerMessageType.IdAllocation, + IdCreationRange + >; +export type ContainerRuntimeGCMessage = InternalUtilityTypes.TypedContainerRuntimeMessage< ContainerMessageType.GC, GarbageCollectionMessage >; -export type ContainerRuntimeDocumentSchemaMessage = TypedContainerRuntimeMessage< - ContainerMessageType.DocumentSchemaChange, - IDocumentSchemaChangeMessage ->; +export type ContainerRuntimeDocumentSchemaMessage = + InternalUtilityTypes.TypedContainerRuntimeMessage< + ContainerMessageType.DocumentSchemaChange, + IDocumentSchemaChangeMessage + >; /** * Represents an unrecognized TypedContainerRuntimeMessage, e.g. a message from a future version of the container runtime. @@ -136,7 +162,7 @@ export interface UnknownContainerRuntimeMessage { } /** - * A {@link TypedContainerRuntimeMessage} that is received from the server and will be processed by the container runtime. + * A {@link InternalUtilityTypes.TypedContainerRuntimeMessage} that is received from the server and will be processed by the container runtime. */ export type InboundContainerRuntimeMessage = | ContainerRuntimeDataStoreOpMessage @@ -152,7 +178,7 @@ export type InboundContainerRuntimeMessage = | UnknownContainerRuntimeMessage; /** - * A {@link TypedContainerRuntimeMessage} that has been generated by the container runtime, eventually to be sent to the ordering service. + * A {@link InternalUtilityTypes.TypedContainerRuntimeMessage} that has been generated by the container runtime, eventually to be sent to the ordering service. * These are messages generated by the local runtime, before the outbox's op virtualization step. */ export type LocalContainerRuntimeMessage = diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index 5440c7626125..5ce340644fa2 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -41,20 +41,22 @@ import { type IDocumentAttributes, SummaryType, } from "@fluidframework/driver-definitions/internal"; -import { +import type { + FluidDataStoreMessage, ISummaryTreeWithStats, FluidDataStoreRegistryEntry, - FlushMode, - FlushModeExperimental, IFluidDataStoreContext, IFluidDataStoreFactory, IFluidDataStoreRegistry, NamedFluidDataStoreRegistryEntries, - type IRuntimeMessageCollection, - type ISequencedMessageEnvelope, - type IEnvelope, - type ITelemetryContext, - type ISummarizeInternalResult, + IRuntimeMessageCollection, + ISequencedMessageEnvelope, + ITelemetryContext, + ISummarizeInternalResult, +} from "@fluidframework/runtime-definitions/internal"; +import { + FlushMode, + FlushModeExperimental, } from "@fluidframework/runtime-definitions/internal"; import { IFluidErrorBase, @@ -102,17 +104,24 @@ import { type IRefreshSummaryAckOptions, } from "../summary/index.js"; +const testDataStoreMessage = { + type: "op", + content: { address: "test-address", contents: "test-contents" }, +} satisfies FluidDataStoreMessage; + function submitDataStoreOp( runtime: Pick, id: string, - contents: unknown, + contents: FluidDataStoreMessage, localOpMetadata?: unknown, -) { +): void { runtime.submitMessage( - ContainerMessageType.FluidDataStoreOp, { - address: id, - contents, + type: ContainerMessageType.FluidDataStoreOp, + contents: { + address: id, + contents, + }, }, localOpMetadata, ); @@ -122,7 +131,7 @@ const changeConnectionState = ( runtime: Omit, connected: boolean, clientId: string, -) => { +): void => { const audience = runtime.getAudience() as MockAudience; audience.setCurrentClientId(clientId); @@ -155,19 +164,12 @@ function isSignalEnvelope(obj: unknown): obj is ISignalEnvelope { } function defineResubmitAndSetConnectionState(containerRuntime: ContainerRuntime): void { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access -- Modifying private property + // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access -- Modifying private property (containerRuntime as any).channelCollection = { setConnectionState: (_connected: boolean, _clientId?: string) => {}, // Pass data store op right back to ContainerRuntime - reSubmit: (type: string, envelope: IEnvelope, localOpMetadata: unknown) => { - submitDataStoreOp( - containerRuntime, - envelope.address, - envelope.contents, - localOpMetadata, - ); - }, - } as ChannelCollection; + reSubmitContainerMessage: containerRuntime.submitMessage.bind(containerRuntime), + } satisfies Partial; } describe("Runtime", () => { @@ -357,7 +359,7 @@ describe("Runtime", () => { changeConnectionState(containerRuntime, false, mockClientId); // Not connected, so nothing is submitted on flush - just queued in PendingStateManager - submitDataStoreOp(containerRuntime, "1", "test", { emptyBatch: true }); + submitDataStoreOp(containerRuntime, "1", testDataStoreMessage, { emptyBatch: true }); // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call (containerRuntime as any).flush(); changeConnectionState(containerRuntime, true, mockClientId); @@ -402,11 +404,11 @@ describe("Runtime", () => { changeConnectionState(containerRuntime, false, mockClientId); // Not connected, so nothing is submitted on flush - just queued in PendingStateManager - submitDataStoreOp(containerRuntime, "1", "test"); + submitDataStoreOp(containerRuntime, "1", testDataStoreMessage); // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call (containerRuntime as any).flush(); - submitDataStoreOp(containerRuntime, "2", "test"); + submitDataStoreOp(containerRuntime, "2", testDataStoreMessage); changeConnectionState(containerRuntime, true, mockClientId); // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call (containerRuntime as any).flush(); @@ -417,7 +419,7 @@ describe("Runtime", () => { // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access assert.strictEqual(submittedOps[1].contents.address, "2"); - function batchIdMatchesUnsentFormat(batchId?: string) { + function batchIdMatchesUnsentFormat(batchId?: string): boolean { return ( batchId !== undefined && batchId.length === "00000000-0000-0000-0000-000000000000_[-1]".length && @@ -483,7 +485,7 @@ describe("Runtime", () => { }); // Submit the first message - submitDataStoreOp(containerRuntime, "1", "testMessage1"); + submitDataStoreOp(containerRuntime, "1", testDataStoreMessage); assert.strictEqual(submittedOps.length, 0, "No ops submitted yet"); // Bump lastSequenceNumber and trigger the "op" event artificially to simulate processing a non-runtime op @@ -507,7 +509,10 @@ describe("Runtime", () => { // Submit the second message // When [skipSafetyFlushDuringProcessStack: TRUE], this will trigger a flush via Outbox.maybeFlushPartialBatch - submitDataStoreOp(containerRuntime, "2", "testMessage2"); + submitDataStoreOp(containerRuntime, "2", { + type: "op", + content: { address: "test-address", contents: "test-contents2" }, + }); assert.equal( submittedOps.length, 1, @@ -699,9 +704,9 @@ describe("Runtime", () => { it("Batching property set properly", () => { containerRuntime.orderSequentially(() => { - submitDataStoreOp(containerRuntime, "1", "test"); - submitDataStoreOp(containerRuntime, "2", "test"); - submitDataStoreOp(containerRuntime, "3", "test"); + submitDataStoreOp(containerRuntime, "1", testDataStoreMessage); + submitDataStoreOp(containerRuntime, "2", testDataStoreMessage); + submitDataStoreOp(containerRuntime, "3", testDataStoreMessage); }); // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call (containerRuntime as any).flush(); @@ -732,17 +737,17 @@ describe("Runtime", () => { changeConnectionState(containerRuntime, false, fakeClientId); containerRuntime.orderSequentially(() => { - submitDataStoreOp(containerRuntime, "1", "test"); - submitDataStoreOp(containerRuntime, "2", "test"); - submitDataStoreOp(containerRuntime, "3", "test"); + submitDataStoreOp(containerRuntime, "1", testDataStoreMessage); + submitDataStoreOp(containerRuntime, "2", testDataStoreMessage); + submitDataStoreOp(containerRuntime, "3", testDataStoreMessage); }); // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call (containerRuntime as any).flush(); containerRuntime.orderSequentially(() => { - submitDataStoreOp(containerRuntime, "4", "test"); - submitDataStoreOp(containerRuntime, "5", "test"); - submitDataStoreOp(containerRuntime, "6", "test"); + submitDataStoreOp(containerRuntime, "4", testDataStoreMessage); + submitDataStoreOp(containerRuntime, "5", testDataStoreMessage); + submitDataStoreOp(containerRuntime, "6", testDataStoreMessage); }); // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call (containerRuntime as any).flush(); @@ -1291,14 +1296,7 @@ describe("Runtime", () => { patched.channelCollection = { setConnectionState: (_connected: boolean, _clientId?: string) => {}, // Pass data store op right back to ContainerRuntime - reSubmit: (type: string, envelope: IEnvelope, localOpMetadata: unknown) => { - submitDataStoreOp( - containerRuntime, - envelope.address, - envelope.contents, - localOpMetadata, - ); - }, + reSubmitContainerMessage: containerRuntime.submitMessage.bind(containerRuntime), } satisfies Partial; return patched; @@ -1309,14 +1307,14 @@ describe("Runtime", () => { changeConnectionState(patchedContainerRuntime, false, mockClientId); - submitDataStoreOp(patchedContainerRuntime, "1", "test"); - submitDataStoreOp(patchedContainerRuntime, "2", "test"); + submitDataStoreOp(patchedContainerRuntime, "1", testDataStoreMessage); + submitDataStoreOp(patchedContainerRuntime, "2", testDataStoreMessage); patchedContainerRuntime.submit({ // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-assignment type: "FUTURE_TYPE" as any, contents: "3", }); - submitDataStoreOp(patchedContainerRuntime, "4", "test"); + submitDataStoreOp(patchedContainerRuntime, "4", testDataStoreMessage); assert.strictEqual( submittedOps.length, @@ -1516,7 +1514,10 @@ describe("Runtime", () => { it("modifying op content after submit does not reflect in PendingStateManager", () => { const content = { prop1: 1 }; - submitDataStoreOp(containerRuntime, "1", content); + submitDataStoreOp(containerRuntime, "1", { + type: "op", + content: { address: "test", contents: content }, + }); // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call (containerRuntime as any).flush(); @@ -1530,7 +1531,8 @@ describe("Runtime", () => { // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access JSON.parse(state?.pendingStates?.[0].content).contents.contents, { - prop1: 1, + type: "op", + content: { address: "test", contents: { prop1: 1 } }, }, "content of pending local message has changed", ); @@ -1791,7 +1793,10 @@ describe("Runtime", () => { it("summary fails before generate if there are pending ops", async () => { // Submit an op and yield for it to be flushed from outbox to pending state manager. - submitDataStoreOp(containerRuntime, "fakeId", "fakeContents"); + submitDataStoreOp(containerRuntime, "fakeId", { + type: "op", + content: { address: "fakeAddress", contents: "fakeContents" }, + }); await yieldEventLoop(); const summarizeResultP = containerRuntime.submitSummary({ @@ -1824,7 +1829,10 @@ describe("Runtime", () => { const boundFn = fn.bind(containerRuntime); return async (...args: unknown[]) => { // Submit an op and yield for it to be flushed from outbox to pending state manager. - submitDataStoreOp(containerRuntime, "fakeId", "fakeContents"); + submitDataStoreOp(containerRuntime, "fakeId", { + type: "op", + content: { address: "fakeAddress", contents: "fakeContents" }, + }); await yieldEventLoop(); return boundFn(...args); @@ -3088,7 +3096,7 @@ describe("Runtime", () => { // Submit op so message is queued in PendingStateManager // This is needed to increase reconnect count - submitDataStoreOp(containerRuntime, "1", "test"); + submitDataStoreOp(containerRuntime, "1", testDataStoreMessage); // Disconnect + Reconnect changeConnectionState(containerRuntime, false, mockClientId); @@ -3140,7 +3148,7 @@ describe("Runtime", () => { // so that message is queued in PendingStateManager and reconnect count is increased. defineResubmitAndSetConnectionState(containerRuntime); // Send and process an initial signal to prime the system. - submitDataStoreOp(containerRuntime, "1", "test"); + submitDataStoreOp(containerRuntime, "1", testDataStoreMessage); sendSignals(1); // 1st signal (#1) processSubmittedSignals(1); diff --git a/packages/runtime/container-runtime/src/test/dataStoreContext.spec.ts b/packages/runtime/container-runtime/src/test/dataStoreContext.spec.ts index 91e03300db7f..c3a156acbf59 100644 --- a/packages/runtime/container-runtime/src/test/dataStoreContext.spec.ts +++ b/packages/runtime/container-runtime/src/test/dataStoreContext.spec.ts @@ -15,6 +15,7 @@ import { } from "@fluidframework/core-interfaces"; import { IFluidHandleContext } from "@fluidframework/core-interfaces/internal"; import { LazyPromise } from "@fluidframework/core-utils/internal"; +import type { LocalFluidDataStoreRuntimeMessage } from "@fluidframework/datastore/internal"; import { DataStoreMessageType, FluidObjectHandle } from "@fluidframework/datastore/internal"; import { ISummaryBlob, SummaryType } from "@fluidframework/driver-definitions"; import { @@ -22,10 +23,9 @@ import { IBlob, ISnapshotTree, } from "@fluidframework/driver-definitions/internal"; -import { +import type { IGarbageCollectionData, CreateChildSummarizerNodeFn, - CreateSummarizerNodeSource, IFluidDataStoreChannel, IFluidDataStoreContext, IFluidDataStoreFactory, @@ -33,8 +33,11 @@ import { IFluidParentContext, IGarbageCollectionDetailsBase, SummarizeInternalFn, + IContainerRuntimeBase, +} from "@fluidframework/runtime-definitions/internal"; +import { + CreateSummarizerNodeSource, channelsTreeName, - type IContainerRuntimeBase, } from "@fluidframework/runtime-definitions/internal"; import { GCDataBuilder, @@ -376,11 +379,14 @@ describe("Data Store Context Tests", () => { }); await localDataStoreContext.realize(); - localDataStoreContext.submitMessage( - DataStoreMessageType.ChannelOp, - "summarizer message", - {}, - ); + const message = { + type: DataStoreMessageType.ChannelOp, + content: { + address: "address", + contents: "summarizer message", + }, + } satisfies LocalFluidDataStoreRuntimeMessage; + localDataStoreContext.submitMessage(message.type, message.content, {}); const expectedEvents = [ { @@ -417,11 +423,14 @@ describe("Data Store Context Tests", () => { let eventCount = 0; for (let i = 0; i < 15; i++) { - localDataStoreContext.submitMessage( - DataStoreMessageType.ChannelOp, - `summarizer message ${i}`, - {}, - ); + const message = { + type: DataStoreMessageType.ChannelOp, + content: { + address: "address", + contents: `summarizer message ${i}`, + }, + } satisfies LocalFluidDataStoreRuntimeMessage; + localDataStoreContext.submitMessage(message.type, message.content, {}); } for (const event of mockLogger.events) { if ( diff --git a/packages/runtime/container-runtime/src/test/opLifecycle/opDecompressor.spec.ts b/packages/runtime/container-runtime/src/test/opLifecycle/opDecompressor.spec.ts index 561b0487235c..596cfe2dd36c 100644 --- a/packages/runtime/container-runtime/src/test/opLifecycle/opDecompressor.spec.ts +++ b/packages/runtime/container-runtime/src/test/opLifecycle/opDecompressor.spec.ts @@ -7,7 +7,10 @@ import { strict as assert } from "node:assert"; import { IsoBuffer } from "@fluid-internal/client-utils"; import { ISequencedDocumentMessage } from "@fluidframework/driver-definitions/internal"; -import type { IEnvelope } from "@fluidframework/runtime-definitions/internal"; +import type { + FluidDataStoreMessage, + IEnvelope, +} from "@fluidframework/runtime-definitions/internal"; import { MockLogger } from "@fluidframework/telemetry-utils/internal"; import { compress } from "lz4js"; @@ -28,7 +31,7 @@ function generateCompressedBatchMessage(length: number): ISequencedDocumentMessa // Actual Op and contents aren't important. Values are not realistic. batch.push({ type: ContainerMessageType.FluidDataStoreOp, - contents: `value${i}` as unknown as IEnvelope, + contents: `value${i}` as unknown as IEnvelope, }); } diff --git a/packages/runtime/container-runtime/src/test/opLifecycle/opSerialization.spec.ts b/packages/runtime/container-runtime/src/test/opLifecycle/opSerialization.spec.ts index 3d6ae8d50f1d..2cb66b9cc10a 100644 --- a/packages/runtime/container-runtime/src/test/opLifecycle/opSerialization.spec.ts +++ b/packages/runtime/container-runtime/src/test/opLifecycle/opSerialization.spec.ts @@ -64,7 +64,10 @@ describe("opSerialization", () => { const op: LocalContainerRuntimeMessage = { type: ContainerMessageType.FluidDataStoreOp, - contents: { address: "123", contents: { hereIsAHandle: mockHandle } }, + contents: { + address: "123", + contents: { type: "op", content: { address: "test", contents: mockHandle } }, + }, }; const serialized = serializeOp(op); @@ -86,7 +89,11 @@ describe("opSerialization", () => { contents: { address: "123", contents: { - alreadyEncodedHandle: encodeHandleForSerialization(new MockHandle({})), + type: "op", + content: { + address: "test", + contents: encodeHandleForSerialization(new MockHandle({})), + }, }, }, }; diff --git a/packages/runtime/datastore/api-report/datastore.legacy.alpha.api.md b/packages/runtime/datastore/api-report/datastore.legacy.alpha.api.md index 0ccee30275df..ef96d997f8d6 100644 --- a/packages/runtime/datastore/api-report/datastore.legacy.alpha.api.md +++ b/packages/runtime/datastore/api-report/datastore.legacy.alpha.api.md @@ -79,7 +79,7 @@ export class FluidDataStoreRuntime extends TypedEventEmitter; reSubmit(type: DataStoreMessageType, content: any, localOpMetadata: unknown): void; - rollback?(type: DataStoreMessageType, content: any, localOpMetadata: unknown): void; + rollback(type: DataStoreMessageType, content: any, localOpMetadata: unknown): void; // (undocumented) get rootRoutingContext(): this; // (undocumented) @@ -88,7 +88,7 @@ export class FluidDataStoreRuntime extends TypedEventEmitter; diff --git a/packages/runtime/datastore/package.json b/packages/runtime/datastore/package.json index 94fc59092489..487d78fd629c 100644 --- a/packages/runtime/datastore/package.json +++ b/packages/runtime/datastore/package.json @@ -158,7 +158,11 @@ "typescript": "~5.4.5" }, "typeValidation": { - "broken": {}, + "broken": { + "Class_FluidDataStoreRuntime": { + "forwardCompat": false + } + }, "entrypoint": "legacy" } } diff --git a/packages/runtime/datastore/src/dataStoreRuntime.ts b/packages/runtime/datastore/src/dataStoreRuntime.ts index 290c5dabc384..ab478dc34fd8 100644 --- a/packages/runtime/datastore/src/dataStoreRuntime.ts +++ b/packages/runtime/datastore/src/dataStoreRuntime.ts @@ -112,6 +112,16 @@ export enum DataStoreMessageType { ChannelOp = "op", } +/** + * Outgoing {@link FluidDataStoreRuntime} message structures. + * @internal + * + * @privateRemarks The types here are required to satisfy {@link @fluidframework/runtime-definitions#FluidDataStoreMessage} interface. + */ +export type LocalFluidDataStoreRuntimeMessage = + | { type: DataStoreMessageType.ChannelOp; content: IEnvelope } + | { type: DataStoreMessageType.Attach; content: IAttachMessage }; + /** * @legacy * @alpha @@ -1047,8 +1057,12 @@ export class FluidDataStoreRuntime } } + /** + * @deprecated No implementation required and will be removed in 2.50. + */ public submitMessage(type: DataStoreMessageType, content: any, localOpMetadata: unknown) { - this.submit(type, content, localOpMetadata); + this.verifyNotClosed(); + this.submit({ type, content }, localOpMetadata); } /** @@ -1110,7 +1124,7 @@ export class FluidDataStoreRuntime type: channel.attributes.type, }; this.pendingAttach.add(channel.id); - this.submit(DataStoreMessageType.Attach, message); + this.submit({ type: DataStoreMessageType.Attach, content: message }); const context = this.contexts.get(channel.id) as LocalChannelContextBase; context.makeVisible(); @@ -1118,16 +1132,15 @@ export class FluidDataStoreRuntime private submitChannelOp(address: string, contents: any, localOpMetadata: unknown) { const envelope: IEnvelope = { address, contents }; - this.submit(DataStoreMessageType.ChannelOp, envelope, localOpMetadata); + this.verifyNotClosed(); + this.submit({ type: DataStoreMessageType.ChannelOp, content: envelope }, localOpMetadata); } private submit( - type: DataStoreMessageType, - content: any, + message: LocalFluidDataStoreRuntimeMessage, localOpMetadata: unknown = undefined, ): void { - this.verifyNotClosed(); - this.dataStoreContext.submitMessage(type, content, localOpMetadata); + this.dataStoreContext.submitMessage(message.type, message.content, localOpMetadata); } /** @@ -1136,6 +1149,11 @@ export class FluidDataStoreRuntime * This typically happens when we reconnect and there are unacked messages. * @param content - The content of the original message. * @param localOpMetadata - The local metadata associated with the original message. + * + * @privateRemarks + * `type` parameter's type of `DataStoreMessageType` is a covariance exception + * over `string` that `IFluidDataStoreChannel` specifies. (`unreachableCase` + * might be reachable over time without better typing in this area.) */ public reSubmit(type: DataStoreMessageType, content: any, localOpMetadata: unknown) { this.verifyNotClosed(); @@ -1151,7 +1169,7 @@ export class FluidDataStoreRuntime } case DataStoreMessageType.Attach: // For Attach messages, just submit them again. - this.submit(type, content, localOpMetadata); + this.submit({ type, content }, localOpMetadata); break; default: unreachableCase(type); @@ -1162,8 +1180,12 @@ export class FluidDataStoreRuntime * Revert a local op. * @param content - The content of the original message. * @param localOpMetadata - The local metadata associated with the original message. + * + * @privateRemarks + * `type` parameter's type of `DataStoreMessageType` is a covariance exception + * over `string` that `IFluidDataStoreChannel` specifies. */ - public rollback?(type: DataStoreMessageType, content: any, localOpMetadata: unknown) { + public rollback(type: DataStoreMessageType, content: any, localOpMetadata: unknown) { this.verifyNotClosed(); switch (type) { diff --git a/packages/runtime/datastore/src/index.ts b/packages/runtime/datastore/src/index.ts index efafb3267415..1cf3506b5000 100644 --- a/packages/runtime/datastore/src/index.ts +++ b/packages/runtime/datastore/src/index.ts @@ -8,6 +8,7 @@ export { DataStoreMessageType, FluidDataStoreRuntime, ISharedObjectRegistry, + type LocalFluidDataStoreRuntimeMessage, mixinRequestHandler, mixinSummaryHandler, } from "./dataStoreRuntime.js"; diff --git a/packages/runtime/datastore/src/test/types/validateDatastorePrevious.generated.ts b/packages/runtime/datastore/src/test/types/validateDatastorePrevious.generated.ts index bab8d7182452..5fe3440cb5cc 100644 --- a/packages/runtime/datastore/src/test/types/validateDatastorePrevious.generated.ts +++ b/packages/runtime/datastore/src/test/types/validateDatastorePrevious.generated.ts @@ -22,6 +22,7 @@ declare type MakeUnusedImportErrorsGoAway = TypeOnly | MinimalType | Fu * typeValidation.broken: * "Class_FluidDataStoreRuntime": {"forwardCompat": false} */ +// @ts-expect-error compatibility expected to be broken declare type old_as_current_for_Class_FluidDataStoreRuntime = requireAssignableTo, TypeOnly> /* diff --git a/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md b/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md index d497b1a40dff..be8eb164b332 100644 --- a/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md +++ b/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md @@ -108,9 +108,9 @@ export interface IDataStore { } // @alpha -export interface IEnvelope { +export interface IEnvelope { address: string; - contents: any; + contents: TContents; } // @alpha @@ -133,11 +133,11 @@ export interface IFluidDataStoreChannel extends IDisposable { processSignal(message: IInboundSignalMessage, local: boolean): void; // (undocumented) request(request: IRequest): Promise; - reSubmit(type: string, content: any, localOpMetadata: unknown): any; + reSubmit(type: string, content: any, localOpMetadata: unknown): void; rollback?(type: string, content: any, localOpMetadata: unknown): void; // (undocumented) setAttachState(attachState: AttachState.Attaching | AttachState.Attached): void; - setConnectionState(connected: boolean, clientId?: string): any; + setConnectionState(connected: boolean, clientId?: string): void; summarize(fullTree?: boolean, trackState?: boolean, telemetryContext?: ITelemetryContext): Promise; updateUsedRoutes(usedRoutes: string[]): void; } diff --git a/packages/runtime/runtime-definitions/src/dataStoreContext.ts b/packages/runtime/runtime-definitions/src/dataStoreContext.ts index 5530e9d02f94..11546c0f835b 100644 --- a/packages/runtime/runtime-definitions/src/dataStoreContext.ts +++ b/packages/runtime/runtime-definitions/src/dataStoreContext.ts @@ -368,7 +368,7 @@ export interface IFluidDataStoreChannel extends IDisposable { * @param clientId - ID of the client. It's old ID when in disconnected state and * it's new client ID when we are connecting or connected. */ - setConnectionState(connected: boolean, clientId?: string); + setConnectionState(connected: boolean, clientId?: string): void; /** * Ask the DDS to resubmit a message. This could be because we reconnected and this message was not acked. @@ -377,7 +377,7 @@ export interface IFluidDataStoreChannel extends IDisposable { * @param localOpMetadata - The local metadata associated with the original message. */ // eslint-disable-next-line @typescript-eslint/no-explicit-any -- TODO (#28746): breaking change - reSubmit(type: string, content: any, localOpMetadata: unknown); + reSubmit(type: string, content: any, localOpMetadata: unknown): void; // eslint-disable-next-line @typescript-eslint/no-explicit-any -- TODO (#28746): breaking change applyStashedOp(content: any): Promise; diff --git a/packages/runtime/runtime-definitions/src/index.ts b/packages/runtime/runtime-definitions/src/index.ts index 14178798eaa6..988865e34759 100644 --- a/packages/runtime/runtime-definitions/src/index.ts +++ b/packages/runtime/runtime-definitions/src/index.ts @@ -45,6 +45,7 @@ export { gcTreeKey, } from "./garbageCollectionDefinitions.js"; export type { + FluidDataStoreMessage, IAttachMessage, IEnvelope, IInboundSignalMessage, diff --git a/packages/runtime/runtime-definitions/src/protocol.ts b/packages/runtime/runtime-definitions/src/protocol.ts index 762aaf02c2a5..84553ec05d7f 100644 --- a/packages/runtime/runtime-definitions/src/protocol.ts +++ b/packages/runtime/runtime-definitions/src/protocol.ts @@ -14,7 +14,8 @@ import type { * @legacy * @alpha */ -export interface IEnvelope { +// eslint-disable-next-line @typescript-eslint/no-explicit-any -- TODO (#28746): breaking change +export interface IEnvelope { /** * The target for the envelope */ @@ -23,8 +24,7 @@ export interface IEnvelope { /** * The contents of the envelope */ - // eslint-disable-next-line @typescript-eslint/no-explicit-any -- TODO (#28746): breaking change - contents: any; + contents: TContents; } /** @@ -126,3 +126,24 @@ export interface IRuntimeMessageCollection { */ readonly messagesContent: readonly IRuntimeMessagesContent[]; } + +/** + * Outgoing {@link IFluidDataStoreChannel} message structures. + * @internal + * + * @privateRemarks + * Future use opportunity: + * - Change {@link IFluidDataStoreChannel} and {@link IFluidParentContext}, + * to have a generic specifying `T extends FluidDataStoreMessage` and uses + * `T["type"]` and `T["content"]` to qualify message related methods, + * preferably where `submitMessage`, `reSubmit`, and `rollback` have + * overloads to ensure callers pair values correctly. + * - A further improvement would be to reshape `submitMessage`, `reSubmit`, + * and `rollback` to accept `T` as `message` parameter instead of `type` + * and `content` parameters that are hard to convince TypeScript must be + * paired in implementations. + */ +export interface FluidDataStoreMessage { + type: string; + content: unknown; +} diff --git a/packages/runtime/test-runtime-utils/api-report/test-runtime-utils.legacy.alpha.api.md b/packages/runtime/test-runtime-utils/api-report/test-runtime-utils.legacy.alpha.api.md index d2e38472f9b5..e699c307dde6 100644 --- a/packages/runtime/test-runtime-utils/api-report/test-runtime-utils.legacy.alpha.api.md +++ b/packages/runtime/test-runtime-utils/api-report/test-runtime-utils.legacy.alpha.api.md @@ -360,9 +360,9 @@ export class MockFluidDataStoreContext implements IFluidDataStoreContext { // (undocumented) storage: IDocumentStorageService; // (undocumented) - submitMessage(type: string, content: any, localOpMetadata: unknown): void; + submitMessage(): void; // (undocumented) - submitSignal(type: string, content: any): void; + submitSignal(): void; // (undocumented) uploadBlob(blob: ArrayBufferLike): Promise>; } @@ -485,10 +485,10 @@ export class MockFluidDataStoreRuntime extends EventEmitter implements IFluidDat setAttachState(attachState: AttachState.Attaching | AttachState.Attached): void; // (undocumented) setConnectionState(connected: boolean, clientId?: string): void; - // (undocumented) + // @deprecated (undocumented) submitMessage(type: MessageType, content: any): null; // (undocumented) - submitSignal(type: string, content: any): null; + submitSignal: IFluidDataStoreRuntime["submitSignal"]; // (undocumented) summarize(fullTree?: boolean, trackState?: boolean): Promise; // (undocumented) diff --git a/packages/runtime/test-runtime-utils/src/mocks.ts b/packages/runtime/test-runtime-utils/src/mocks.ts index 15636ce00a6a..3f9f95bd3589 100644 --- a/packages/runtime/test-runtime-utils/src/mocks.ts +++ b/packages/runtime/test-runtime-utils/src/mocks.ts @@ -990,6 +990,9 @@ export class MockFluidDataStoreRuntime return null; } + /** + * @deprecated No implementation required and will be removed in 2.50. + */ public submitMessage(type: MessageType, content: any) { return null; } @@ -1010,9 +1013,7 @@ export class MockFluidDataStoreRuntime return this.containerRuntime.dirty(); } - public submitSignal(type: string, content: any) { - return null; - } + public submitSignal: IFluidDataStoreRuntime["submitSignal"] = () => null; public processMessages(messageCollection: IRuntimeMessageCollection) { if (this.disposed) { diff --git a/packages/runtime/test-runtime-utils/src/mocksDataStoreContext.ts b/packages/runtime/test-runtime-utils/src/mocksDataStoreContext.ts index 33126e144086..898b1bb5f540 100644 --- a/packages/runtime/test-runtime-utils/src/mocksDataStoreContext.ts +++ b/packages/runtime/test-runtime-utils/src/mocksDataStoreContext.ts @@ -109,11 +109,11 @@ export class MockFluidDataStoreContext implements IFluidDataStoreContext { return undefined as any as IAudience; } - public submitMessage(type: string, content: any, localOpMetadata: unknown): void { + public submitMessage(): void { throw new Error("Method not implemented."); } - public submitSignal(type: string, content: any): void { + public submitSignal(): void { throw new Error("Method not implemented."); } diff --git a/packages/test/local-server-tests/src/test/opsOnReconnect.spec.ts b/packages/test/local-server-tests/src/test/opsOnReconnect.spec.ts index 441a846d0cbd..8eb58fdb2b74 100644 --- a/packages/test/local-server-tests/src/test/opsOnReconnect.spec.ts +++ b/packages/test/local-server-tests/src/test/opsOnReconnect.spec.ts @@ -24,7 +24,11 @@ import { LocalResolver, } from "@fluidframework/local-driver/internal"; import { SharedDirectory, type ISharedMap, SharedMap } from "@fluidframework/map/internal"; -import { FlushMode, IEnvelope } from "@fluidframework/runtime-definitions/internal"; +import { + type FluidDataStoreMessage, + FlushMode, + IEnvelope, +} from "@fluidframework/runtime-definitions/internal"; import { SharedString } from "@fluidframework/sequence/internal"; import { ILocalDeltaConnectionServer, @@ -135,19 +139,20 @@ describe("Ops on Reconnect", () => { "op", (message: ISequencedDocumentMessage) => { if (message.type === ContainerMessageType.FluidDataStoreOp) { - const envelope = message.contents as IEnvelope; - const address = envelope.contents.content.address; - const content = envelope.contents.content.contents; + const envelope = message.contents as IEnvelope; + // explicitly using `any` even if IEnvelope default is later changed to unknown. + // eslint-disable-next-line @typescript-eslint/no-unnecessary-type-arguments + const { address, contents } = envelope.contents.content as IEnvelope; const batch = (message.metadata as { batch?: unknown } | undefined)?.batch; let value1: string | number; let value2: string; // Add special handling for SharedString. SharedMap and SharedDirectory content structure is same. if (address === stringId) { - value1 = content.pos1; - value2 = content.seg; + value1 = contents.pos1; + value2 = contents.seg; } else { - value1 = content.key; - value2 = content.value.value; + value1 = contents.key; + value2 = contents.value.value; } receivedValues.push([value1, value2, batch]); } diff --git a/packages/test/test-end-to-end-tests/src/test/dataStoresNested.spec.ts b/packages/test/test-end-to-end-tests/src/test/dataStoresNested.spec.ts index b4fcb8e66937..8dfb25a66d76 100644 --- a/packages/test/test-end-to-end-tests/src/test/dataStoresNested.spec.ts +++ b/packages/test/test-end-to-end-tests/src/test/dataStoresNested.spec.ts @@ -8,7 +8,7 @@ import { describeCompat } from "@fluid-private/test-version-utils"; import { IContainer, IHostLoader } from "@fluidframework/container-definitions/internal"; import { Loader } from "@fluidframework/container-loader/internal"; import { - ChannelCollection, + ComposableChannelCollection, ChannelCollectionFactory, ISummarizer, SummaryCollection, @@ -66,8 +66,8 @@ describeCompat("Nested DataStores", "NoCompat", (getTestObjectProvider, apis) => const dataStoreFactory = new ChannelCollectionFactory( [[testObjectFactory.type, Promise.resolve(testObjectFactory)]], async (runtime: IFluidDataStoreChannel) => runtime, - (...args: ConstructorParameters) => - new ChannelCollection(...args), + (...args: ConstructorParameters) => + new ComposableChannelCollection(...args), ); const runtimeFactory = new ContainerRuntimeFactoryWithDefaultDataStore({