Skip to content

feat(client-presence): Create Acknowledgment message interface and ackRequested flow #24470

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Jun 2, 2025
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/framework/presence/src/internalTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ export type IEphemeralRuntime = Pick<
(IContainerRuntime & IRuntimeInternal) | IFluidDataStoreRuntime,
"clientId" | "connected" | "getAudience" | "getQuorum" | "off" | "on" | "submitSignal"
> &
Partial<Pick<IFluidDataStoreRuntime, "logger">>;
Partial<Pick<IFluidDataStoreRuntime, "logger">> & {
ILayerCompatDetails?: unknown;
};

/**
* @internal
Expand Down
47 changes: 47 additions & 0 deletions packages/framework/presence/src/presenceDatastoreManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* Licensed under the MIT License.
*/

import type { ILayerCompatDetails } from "@fluid-internal/client-utils";
import type { IEmitter } from "@fluidframework/core-interfaces/internal";
import { assert } from "@fluidframework/core-utils/internal";
import type { IInboundSignalMessage } from "@fluidframework/runtime-definitions/internal";
Expand Down Expand Up @@ -75,6 +76,7 @@ interface DatastoreUpdateMessage extends IInboundSignalMessage {
sendTimestamp: number;
avgLatency: number;
isComplete?: true;
acknowledgementId?: number;
data: DatastoreMessageContent;
};
}
Expand All @@ -90,11 +92,34 @@ interface ClientJoinMessage extends IInboundSignalMessage {
};
}

// Message type identifier for presence acknowledgment messages.
const acknowledgementMessageType = "Pres:Ack";

/**
* Message type used to acknowledge receipt of remote update messages.
*/
interface AcknowledgementMessage extends IInboundSignalMessage {
type: typeof acknowledgementMessageType;
content: {
id: number;
};
}

function isPresenceMessage(
message: IInboundSignalMessage,
): message is DatastoreUpdateMessage | ClientJoinMessage {
return message.type.startsWith("Pres:");
}

function isLayerCompatDetails(value: unknown): value is ILayerCompatDetails {
return (
typeof value === "object" &&
value !== null &&
"supportedFeatures" in value &&
value.supportedFeatures instanceof Set
);
}

/**
* @internal
*/
Expand Down Expand Up @@ -157,6 +182,9 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
private refreshBroadcastRequested = false;
private readonly timer = new TimerManager();
private readonly workspaces = new Map<string, AnyWorkspaceEntry<StatesWorkspaceSchema>>();
private readonly supportsTargetedSignals =
isLayerCompatDetails(this.runtime.ILayerCompatDetails) &&
this.runtime.ILayerCompatDetails.supportedFeatures.has("submit_signals_v2");

public constructor(
private readonly attendeeId: AttendeeId,
Expand Down Expand Up @@ -364,6 +392,7 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
if (!isPresenceMessage(message)) {
return;
}

if (local) {
const deliveryDelta = received - message.content.sendTimestamp;
// Limit returnedMessages count to 256 such that newest message
Expand Down Expand Up @@ -395,6 +424,24 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
if (message.content.isComplete) {
this.refreshBroadcastRequested = false;
}
// If the message requests an acknowledgement, we will send a targeted acknowledgement message back to just the requestor.
if (message.content.acknowledgementId !== undefined) {
assert(this.supportsTargetedSignals, "Targeted signals not supported");
this.runtime.submitSignal(
acknowledgementMessageType,
{
id: message.content.acknowledgementId,
} satisfies AcknowledgementMessage["content"],
message.clientId,
);
this.logger?.sendTelemetryEvent({
eventName: "AckSent",
details: {
requestor: message.clientId,
responder: this.runtime.clientId,
},
});
}
}

// Handle activation of unregistered workspaces before processing updates.
Expand Down
18 changes: 18 additions & 0 deletions packages/framework/presence/src/test/mockEphemeralRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import { strict as assert } from "node:assert";

import type { ILayerCompatDetails } from "@fluid-internal/client-utils";
import type { ITelemetryBaseLogger } from "@fluidframework/core-interfaces";
import type { IClient, ISequencedClient } from "@fluidframework/driver-definitions";
import { MockAudience, MockQuorumClients } from "@fluidframework/test-runtime-utils/internal";
Expand Down Expand Up @@ -78,6 +79,7 @@ export class MockEphemeralRuntime implements IEphemeralRuntime {
connected: [],
disconnected: [],
};
private readonly supportedFeatures = new Set(["submit_signals_v2"]);
private isSupportedEvent(event: string): event is keyof typeof this.listeners {
return event in this.listeners;
}
Expand Down Expand Up @@ -136,6 +138,14 @@ export class MockEphemeralRuntime implements IEphemeralRuntime {
);
}

public setTargetSignalSupport(supported: boolean): void {
if (supported) {
this.supportedFeatures.add("submit_signals_v2");
} else {
this.supportedFeatures.delete("submit_signals_v2");
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For future use? I don't see reference to this.?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I assumed in the future we'd want something like this to test behavior of both flows (broadcast join vs targeted join behavior).


public removeMember(clientId: ClientConnectionId): void {
const client = this.audience.getMember(clientId);
assert(client !== undefined, `Attempting to remove unknown connection: ${clientId}`);
Expand Down Expand Up @@ -187,5 +197,13 @@ export class MockEphemeralRuntime implements IEphemeralRuntime {
assert.deepStrictEqual(args, expected, "Unexpected signal");
};

public get ILayerCompatDetails(): ILayerCompatDetails {
return {
supportedFeatures: this.supportedFeatures,
generation: 1,
pkgVersion: "2.33.0",
};
}

// #endregion
}
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,47 @@ describe("Presence", () => {
// Verify
assert.strictEqual(listener.callCount, 1);
});

it("with acknowledgementId sends targeted acknowledgment messsage back to requestor", () => {
// Setup
logger.registerExpectedEvent({
eventName: "Presence:AckSent",
details: JSON.stringify({
requestor: "client4",
responder: "client2",
}),
});
// We expect to send a targeted acknowledgment back to the requestor
runtime.signalsExpected.push([
"Pres:Ack",
{
"id": 1,
},
"client4" /* targetClientId */,
]);

// Act - send generic datastore update with acknowledgement id specified
presence.processSignal(
"",
{
type: "Pres:DatastoreUpdate",
content: {
sendTimestamp: clock.now - 10,
avgLatency: 20,
data: {
"system:presence": systemWorkspaceUpdate,
"s:name:testStateWorkspace": statesWorkspaceUpdate,
},
acknowledgementId: 1,
},
clientId: "client4",
},
false,
);

// Verify
assertFinalExpectations(runtime, logger);
});
});
});
});
Loading