Skip to content

feat(client-presence): Create Acknowledgment message interface and ackRequested flow #24470

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Jun 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@ import type { SharedObjectKind } from "@fluidframework/shared-object-base";
import { BasicDataStoreFactory, LoadableFluidObject } from "./datastoreSupport.js";
import type { PresenceWithNotifications as Presence } from "./presence.js";
import { createPresenceManager } from "./presenceManager.js";
import type {
OutboundClientJoinMessage,
OutboundDatastoreUpdateMessage,
SignalMessages,
} from "./protocol.js";
import type { OutboundPresenceMessage, SignalMessages } from "./protocol.js";

/**
* This provides faux validation of the signal message.
Expand Down Expand Up @@ -60,8 +56,10 @@ class PresenceManagerDataObject extends LoadableFluidObject {
events,
getQuorum: runtime.getQuorum.bind(runtime),
getAudience: runtime.getAudience.bind(runtime),
submitSignal: (message: OutboundClientJoinMessage | OutboundDatastoreUpdateMessage) =>
submitSignal: (message: OutboundPresenceMessage) =>
runtime.submitSignal(message.type, message.content, message.targetClientId),
supportedFeatures:
new Set() /* We do not implement feature detection here since this is a deprecated path */,
});
this.runtime.on("signal", (message: IInboundSignalMessage, local: boolean) => {
assertSignalMessageIsValid(message);
Expand Down
6 changes: 5 additions & 1 deletion packages/framework/presence/src/internalTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type { ExtensionHost as ContainerExtensionHost } from "@fluidframework/co
import type { InternalTypes } from "./exposedInternalTypes.js";
import type { AttendeeId, Attendee } from "./presence.js";
import type {
OutboundAcknowledgementMessage,
OutboundClientJoinMessage,
OutboundDatastoreUpdateMessage,
SignalMessages,
Expand Down Expand Up @@ -54,7 +55,10 @@ export type IEphemeralRuntime = Omit<ExtensionHost, "logger" | "submitAddressedS
* @param targetClientId - When specified, the signal is only sent to the provided client id.
*/
submitSignal: (
message: OutboundClientJoinMessage | OutboundDatastoreUpdateMessage,
message:
| OutboundAcknowledgementMessage
| OutboundClientJoinMessage
| OutboundDatastoreUpdateMessage,
) => void;
};

Expand Down
27 changes: 25 additions & 2 deletions packages/framework/presence/src/presenceDatastoreManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ import type {
SignalMessages,
SystemDatastore,
} from "./protocol.js";
import { datastoreUpdateMessageType, joinMessageType } from "./protocol.js";
import {
acknowledgementMessageType,
datastoreUpdateMessageType,
joinMessageType,
} from "./protocol.js";
import type { SystemWorkspaceDatastore } from "./systemWorkspace.js";
import { TimerManager } from "./timerManager.js";
import type {
Expand Down Expand Up @@ -64,7 +68,11 @@ const internalWorkspaceTypes: Readonly<Record<string, "States" | "Notifications"
n: "Notifications",
} as const;

const knownMessageTypes = new Set([joinMessageType, datastoreUpdateMessageType]);
const knownMessageTypes = new Set([
joinMessageType,
datastoreUpdateMessageType,
acknowledgementMessageType,
]);
function isPresenceMessage(
message: InboundExtensionMessage<SignalMessages>,
): message is InboundDatastoreUpdateMessage | InboundClientJoinMessage {
Expand Down Expand Up @@ -137,6 +145,7 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
private refreshBroadcastRequested = false;
private readonly timer = new TimerManager();
private readonly workspaces = new Map<string, AnyWorkspaceEntry<StatesWorkspaceSchema>>();
private readonly targetedSignalSupport: boolean;

public constructor(
private readonly attendeeId: AttendeeId,
Expand All @@ -150,6 +159,7 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
this.datastore = { "system:presence": systemWorkspaceDatastore } as PresenceDatastore;
this.workspaces.set("system:presence", systemWorkspace);
this.targetedSignalSupport = this.runtime.supportedFeatures.has("submit_signals_v2");
}

public joinSession(clientId: ClientConnectionId): void {
Expand Down Expand Up @@ -344,6 +354,7 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
assert(optional, "Unrecognized message type in critical message");
return;
}

if (local) {
const deliveryDelta = received - message.content.sendTimestamp;
// Limit returnedMessages count to 256 such that newest message
Expand Down Expand Up @@ -374,6 +385,18 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
if (message.content.isComplete) {
this.refreshBroadcastRequested = false;
}
// If the message requests an acknowledgement, we will send a targeted acknowledgement message back to just the requestor.
if (message.content.acknowledgementId !== undefined) {
assert(
this.targetedSignalSupport,
"Acknowledgment message was requested while targeted signal capability is not supported",
);
this.runtime.submitSignal({
type: acknowledgementMessageType,
content: { id: message.content.acknowledgementId },
targetClientId: message.clientId,
});
}
}

// Handle activation of unregistered workspaces before processing updates.
Expand Down
33 changes: 32 additions & 1 deletion packages/framework/presence/src/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ interface DatastoreUpdateMessage {
content: {
sendTimestamp: number;
avgLatency: number;
acknowledgementId?: AcknowledgmentIdType;
isComplete?: true;
data: DatastoreMessageContent;
};
Expand Down Expand Up @@ -72,6 +73,25 @@ interface ClientJoinMessage {
};
}

/**
* Acknowledgement message type.
*/
export const acknowledgementMessageType = "Pres:Ack";

interface AcknowledgementMessage {
type: typeof acknowledgementMessageType;
content: {
id: AcknowledgmentIdType;
};
}

type AcknowledgmentIdType = string;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nits:

  • It is a type. You don't need to say Type in the name.
  • Move before it is used. (English reading order.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oof right just saw this, will have a follow up fixing this


/**
* Outbound acknowledgement message.
*/
export type OutboundAcknowledgementMessage = OutboundExtensionMessage<AcknowledgementMessage>;

/**
* Outbound client join message
*/
Expand All @@ -82,7 +102,18 @@ export type OutboundClientJoinMessage = OutboundExtensionMessage<ClientJoinMessa
*/
export type InboundClientJoinMessage = VerifiedInboundExtensionMessage<ClientJoinMessage>;

/**
* Outbound presence message.
*/
export type OutboundPresenceMessage =
| OutboundAcknowledgementMessage
| OutboundClientJoinMessage
| OutboundDatastoreUpdateMessage;

/**
* Messages structures that can be sent and received as understood in the presence protocol
*/
export type SignalMessages = ClientJoinMessage | DatastoreUpdateMessage;
export type SignalMessages =
| AcknowledgementMessage
| ClientJoinMessage
| DatastoreUpdateMessage;
3 changes: 3 additions & 0 deletions packages/framework/presence/src/test/mockEphemeralRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ export class MockEphemeralRuntime implements IEphemeralRuntime {
connected: [],
disconnected: [],
};

private isSupportedEvent(event: string): event is keyof typeof this.listeners {
return event in this.listeners;
}
Expand Down Expand Up @@ -185,5 +186,7 @@ export class MockEphemeralRuntime implements IEphemeralRuntime {
assert.deepStrictEqual(args, expected, "Unexpected signal");
};

public supportedFeatures: ReadonlySet<string> = new Set(["submit_signals_v2"]);

// #endregion
}
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,39 @@ describe("Presence", () => {
// Verify
assert.strictEqual(listener.callCount, 1);
});

it("with acknowledgementId sends targeted acknowledgment messsage back to requestor", () => {
// We expect to send a targeted acknowledgment back to the requestor
runtime.signalsExpected.push([
{
type: "Pres:Ack",
content: { id: "ackID" },
targetClientId: "client4",
},
]);

// Act - send generic datastore update with acknowledgement id specified
presence.processSignal(
[],
{
type: "Pres:DatastoreUpdate",
content: {
sendTimestamp: clock.now - 10,
avgLatency: 20,
data: {
"system:presence": systemWorkspaceUpdate,
"s:name:testStateWorkspace": statesWorkspaceUpdate,
},
acknowledgementId: "ackID",
},
clientId: "client4",
},
false,
);

// Verify
assertFinalExpectations(runtime, logger);
});
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
"typetests:prepare": "flub typetests --dir . --reset --previous --normalize"
},
"dependencies": {
"@fluid-internal/client-utils": "workspace:~",
"@fluidframework/container-definitions": "workspace:~",
"@fluidframework/core-interfaces": "workspace:~",
"@fluidframework/driver-definitions": "workspace:~",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* Licensed under the MIT License.
*/

import type { ILayerCompatDetails } from "@fluid-internal/client-utils";
import type { IAudience } from "@fluidframework/container-definitions/internal";
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- BrandedType is a class declaration only
import type {
Expand Down Expand Up @@ -224,6 +225,8 @@ export interface ExtensionHost<TRuntimeProperties extends ExtensionRuntimeProper

readonly logger: ITelemetryBaseLogger;

readonly supportedFeatures: ILayerCompatDetails["supportedFeatures"];

/**
* Submits a signal to be sent to other clients.
* @param addressChain - Custom address sequence for the signal.
Expand Down
1 change: 1 addition & 0 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5021,6 +5021,7 @@ export class ContainerRuntime
},
getQuorum: this.getQuorum.bind(this),
getAudience: this.getAudience.bind(this),
supportedFeatures: this.ILayerCompatDetails.supportedFeatures,
} satisfies ExtensionHost<TRuntimeProperties>;
entry = new factory(runtime, ...useContext);
this.extensions.set(id, entry);
Expand Down
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading