Skip to content

feat(client-presence): Create Acknowledgment message interface and ackRequested flow #24470

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Jun 2, 2025
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions packages/framework/presence/src/datastorePresenceManagerFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* Hacky support for internal datastore based usages.
*/

import { createEmitter } from "@fluid-internal/client-utils";
import { createEmitter, type ILayerCompatDetails } from "@fluid-internal/client-utils";
import type {
ExtensionHostEvents,
RawInboundExtensionMessage,
Expand All @@ -21,6 +21,7 @@ import { BasicDataStoreFactory, LoadableFluidObject } from "./datastoreSupport.j
import type { PresenceWithNotifications as Presence } from "./presence.js";
import { createPresenceManager } from "./presenceManager.js";
import type {
OutboundAcknowledgementMessage,
OutboundClientJoinMessage,
OutboundDatastoreUpdateMessage,
SignalMessages,
Expand Down Expand Up @@ -60,8 +61,14 @@ class PresenceManagerDataObject extends LoadableFluidObject {
events,
getQuorum: runtime.getQuorum.bind(runtime),
getAudience: runtime.getAudience.bind(runtime),
submitSignal: (message: OutboundClientJoinMessage | OutboundDatastoreUpdateMessage) =>
runtime.submitSignal(message.type, message.content, message.targetClientId),
submitSignal: (
message:
| OutboundAcknowledgementMessage
| OutboundClientJoinMessage
| OutboundDatastoreUpdateMessage,
) => runtime.submitSignal(message.type, message.content, message.targetClientId),
supportedFeatures:
(runtime.ILayerCompatDetails as ILayerCompatDetails)?.supportedFeatures ?? new Set(),
});
this.runtime.on("signal", (message: IInboundSignalMessage, local: boolean) => {
assertSignalMessageIsValid(message);
Expand Down
13 changes: 8 additions & 5 deletions packages/framework/presence/src/experimentalAccess.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@ class ContainerPresenceManager
private readonly manager: PresenceExtensionInterface;

public constructor(host: ExtensionHost) {
this.interface = this.manager = createPresenceManager({
...host,
submitSignal: (message) => {
host.submitAddressedSignal([], message);
this.interface = this.manager = createPresenceManager(
{
...host,
submitSignal: (message) => {
host.submitAddressedSignal([], message);
},
},
});
host.supportedFeatures.has("submit_signals_v2"),
);
}

public onNewUse(): void {
Expand Down
6 changes: 5 additions & 1 deletion packages/framework/presence/src/internalTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type { ExtensionHost as ContainerExtensionHost } from "@fluidframework/co
import type { InternalTypes } from "./exposedInternalTypes.js";
import type { AttendeeId, Attendee } from "./presence.js";
import type {
OutboundAcknowledgementMessage,
OutboundClientJoinMessage,
OutboundDatastoreUpdateMessage,
SignalMessages,
Expand Down Expand Up @@ -55,7 +56,10 @@ export type IEphemeralRuntime = Omit<ExtensionHost, "logger" | "submitAddressedS
* @param targetClientId - When specified, the signal is only sent to the provided client id.
*/
submitSignal: (
message: OutboundClientJoinMessage | OutboundDatastoreUpdateMessage,
message:
| OutboundAcknowledgementMessage
| OutboundClientJoinMessage
| OutboundDatastoreUpdateMessage,
) => void;
};

Expand Down
23 changes: 21 additions & 2 deletions packages/framework/presence/src/presenceDatastoreManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ import type {
SignalMessages,
SystemDatastore,
} from "./protocol.js";
import { datastoreUpdateMessageType, joinMessageType } from "./protocol.js";
import {
acknowledgementMessageType,
datastoreUpdateMessageType,
joinMessageType,
} from "./protocol.js";
import type { SystemWorkspaceDatastore } from "./systemWorkspace.js";
import { TimerManager } from "./timerManager.js";
import type {
Expand Down Expand Up @@ -65,7 +69,11 @@ const internalWorkspaceTypes: Readonly<Record<string, "States" | "Notifications"
n: "Notifications",
} as const;

const knownMessageTypes = new Set([joinMessageType, datastoreUpdateMessageType]);
const knownMessageTypes = new Set([
joinMessageType,
datastoreUpdateMessageType,
acknowledgementMessageType,
]);
function isPresenceMessage(
message: InboundExtensionMessage<SignalMessages>,
): message is InboundDatastoreUpdateMessage | InboundClientJoinMessage {
Expand Down Expand Up @@ -148,6 +156,7 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
private readonly presence: Presence,
systemWorkspaceDatastore: SystemWorkspaceDatastore,
systemWorkspace: AnyWorkspaceEntry<StatesWorkspaceSchema>,
private readonly targetedSignalSupport: boolean,
) {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
this.datastore = { "system:presence": systemWorkspaceDatastore } as PresenceDatastore;
Expand Down Expand Up @@ -347,6 +356,7 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
assert(optional, "Unrecognized message type in critical message");
return;
}

if (local) {
const deliveryDelta = received - message.content.sendTimestamp;
// Limit returnedMessages count to 256 such that newest message
Expand Down Expand Up @@ -377,6 +387,15 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
if (message.content.isComplete) {
this.refreshBroadcastRequested = false;
}
// If the message requests an acknowledgement, we will send a targeted acknowledgement message back to just the requestor.
if (message.content.acknowledgementId !== undefined) {
assert(this.targetedSignalSupport, "Targeted signals not supported");
this.runtime.submitSignal({
type: acknowledgementMessageType,
content: { id: message.content.acknowledgementId },
targetClientId: message.clientId,
});
}
}

// Handle activation of unregistered workspaces before processing updates.
Expand Down
12 changes: 10 additions & 2 deletions packages/framework/presence/src/presenceManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ class PresenceManager implements Presence, PresenceExtensionInterface {

private readonly mc: MonitoringContext | undefined = undefined;

public constructor(runtime: IEphemeralRuntime, attendeeId: AttendeeId) {
public constructor(
runtime: IEphemeralRuntime,
attendeeId: AttendeeId,
targetedSignalSupport: boolean,
) {
const logger = runtime.logger;
if (logger) {
this.mc = createChildMonitoringContext({ logger, namespace: "Presence" });
Expand All @@ -89,6 +93,7 @@ class PresenceManager implements Presence, PresenceExtensionInterface {
this.events,
this.mc?.logger,
this,
targetedSignalSupport,
);
this.attendees = this.systemWorkspace;

Expand Down Expand Up @@ -160,6 +165,7 @@ function setupSubComponents(
IEmitter<PresenceEvents & AttendeesEvents>,
logger: ITelemetryLoggerExt | undefined,
presence: Presence,
targetedSignalSupport: boolean,
): [PresenceDatastoreManager, SystemWorkspace] {
const systemWorkspaceDatastore: SystemWorkspaceDatastore = {
clientToSessionId: {},
Expand All @@ -179,6 +185,7 @@ function setupSubComponents(
presence,
systemWorkspaceDatastore,
systemWorkspaceConfig.statesEntry,
targetedSignalSupport,
);
return [datastoreManager, systemWorkspaceConfig.workspace];
}
Expand All @@ -190,7 +197,8 @@ function setupSubComponents(
*/
export function createPresenceManager(
runtime: IEphemeralRuntime,
targetedSignalSupport: boolean = false,
attendeeId: AttendeeId = createSessionId() as AttendeeId,
): Presence & PresenceExtensionInterface {
return new PresenceManager(runtime, attendeeId);
return new PresenceManager(runtime, attendeeId, targetedSignalSupport);
}
23 changes: 22 additions & 1 deletion packages/framework/presence/src/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ interface DatastoreUpdateMessage {
content: {
sendTimestamp: number;
avgLatency: number;
acknowledgementId?: number;
isComplete?: true;
data: DatastoreMessageContent;
};
Expand Down Expand Up @@ -72,6 +73,23 @@ interface ClientJoinMessage {
};
}

/**
* @internal
*/
export const acknowledgementMessageType = "Pres:Ack";

interface AcknowledgementMessage {
type: typeof acknowledgementMessageType;
content: {
id: number;
};
}

/**
* @internal
*/
export type OutboundAcknowledgementMessage = OutboundExtensionMessage<AcknowledgementMessage>;

/**
* @internal
*/
Expand All @@ -85,4 +103,7 @@ export type InboundClientJoinMessage = VerifiedInboundExtensionMessage<ClientJoi
/**
* @internal
*/
export type SignalMessages = ClientJoinMessage | DatastoreUpdateMessage;
export type SignalMessages =
| AcknowledgementMessage
| ClientJoinMessage
| DatastoreUpdateMessage;
3 changes: 3 additions & 0 deletions packages/framework/presence/src/test/mockEphemeralRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ export class MockEphemeralRuntime implements IEphemeralRuntime {
connected: [],
disconnected: [],
};

private isSupportedEvent(event: string): event is keyof typeof this.listeners {
return event in this.listeners;
}
Expand Down Expand Up @@ -185,5 +186,7 @@ export class MockEphemeralRuntime implements IEphemeralRuntime {
assert.deepStrictEqual(args, expected, "Unexpected signal");
};

public supportedFeatures: ReadonlySet<string> = new Set(["submit_signals_v2"]);

// #endregion
}
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,39 @@ describe("Presence", () => {
// Verify
assert.strictEqual(listener.callCount, 1);
});

it("with acknowledgementId sends targeted acknowledgment messsage back to requestor", () => {
// We expect to send a targeted acknowledgment back to the requestor
runtime.signalsExpected.push([
{
type: "Pres:Ack",
content: { id: 1 },
targetClientId: "client4",
},
]);

// Act - send generic datastore update with acknowledgement id specified
presence.processSignal(
[],
{
type: "Pres:DatastoreUpdate",
content: {
sendTimestamp: clock.now - 10,
avgLatency: 20,
data: {
"system:presence": systemWorkspaceUpdate,
"s:name:testStateWorkspace": statesWorkspaceUpdate,
},
acknowledgementId: 1,
},
clientId: "client4",
},
false,
);

// Verify
assertFinalExpectations(runtime, logger);
});
});
});
});
6 changes: 5 additions & 1 deletion packages/framework/presence/src/test/testUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,11 @@ export function prepareConnectedPresence(
delete expectedClientJoin.clientId;
runtime.signalsExpected.push([expectedClientJoin]);

const presence = createPresenceManager(runtime, attendeeId as AttendeeId);
const presence = createPresenceManager(
runtime,
true /* targetedSignalSupport */,
attendeeId as AttendeeId,
);

// Validate expectations post initialization to make sure logger
// and runtime are left in a clean expectation state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import type { IQuorumClients } from "@fluidframework/driver-definitions/internal
* @internal
*/
export type ClientConnectionId = string;

/**
* Common structure between incoming and outgoing extension signals.
*
Expand Down Expand Up @@ -205,6 +204,8 @@ export interface ExtensionHost<TRuntimeProperties extends ExtensionRuntimeProper

readonly logger: ITelemetryBaseLogger;

readonly supportedFeatures: ReadonlySet<string>;

/**
* Submits a signal to be sent to other clients.
* @param addressChain - Custom address sequence for the signal.
Expand Down
1 change: 1 addition & 0 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4986,6 +4986,7 @@ export class ContainerRuntime
},
getQuorum: this.getQuorum.bind(this),
getAudience: this.getAudience.bind(this),
supportedFeatures: this.ILayerCompatDetails.supportedFeatures,
} satisfies ExtensionHost<TRuntimeProperties>;
entry = new factory(runtime, ...useContext);
this.extensions.set(id, entry);
Expand Down
Loading