Skip to content

Commit feda828

Browse files
authored
dlp: add constituentMap (#1699)
1 parent 34e9f99 commit feda828

File tree

7 files changed

+500
-1
lines changed

7 files changed

+500
-1
lines changed

sdk/src/accounts/types.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
UserAccount,
77
UserStatsAccount,
88
InsuranceFundStake,
9+
ConstituentAccount,
910
} from '../types';
1011
import StrictEventEmitter from 'strict-event-emitter-types';
1112
import { EventEmitter } from 'events';
@@ -243,3 +244,22 @@ export interface HighLeverageModeConfigAccountEvents {
243244
update: void;
244245
error: (e: Error) => void;
245246
}
247+
248+
export interface ConstituentAccountSubscriber {
249+
eventEmitter: StrictEventEmitter<EventEmitter, ConstituentAccountEvents>;
250+
isSubscribed: boolean;
251+
252+
subscribe(constituentAccount?: ConstituentAccount): Promise<boolean>;
253+
sync(): Promise<void>;
254+
unsubscribe(): Promise<void>;
255+
}
256+
257+
export interface ConstituentAccountEvents {
258+
onAccountUpdate: (
259+
account: ConstituentAccount,
260+
pubkey: PublicKey,
261+
slot: number
262+
) => void;
263+
update: void;
264+
error: (e: Error) => void;
265+
}
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
import {
2+
Commitment,
3+
MemcmpFilter,
4+
PublicKey,
5+
RpcResponseAndContext,
6+
} from '@solana/web3.js';
7+
import { ConstituentAccountSubscriber, DataAndSlot } from '../accounts/types';
8+
import { ConstituentAccount } from '../types';
9+
import { PollingConstituentAccountSubscriber } from './pollingConstituentAccountSubscriber';
10+
import { WebSocketConstituentAccountSubscriber } from './webSocketConstituentAccountSubscriber';
11+
import { DriftClient } from '../driftClient';
12+
import { ProgramAccount } from '@coral-xyz/anchor';
13+
import { getConstituentFilter } from '../memcmp';
14+
import { ZSTDDecoder } from 'zstddec';
15+
16+
const MAX_CONSTITUENT_SIZE_BYTES = 272; // TODO: update this when account is finalized
17+
18+
export type ConstituentMapConfig = {
19+
driftClient: DriftClient;
20+
subscriptionConfig:
21+
| {
22+
type: 'polling';
23+
frequency: number;
24+
commitment?: Commitment;
25+
}
26+
| {
27+
type: 'websocket';
28+
resubTimeoutMs?: number;
29+
logResubMessages?: boolean;
30+
commitment?: Commitment;
31+
};
32+
// potentially use these to filter Constituent accounts
33+
additionalFilters?: MemcmpFilter[];
34+
};
35+
36+
export interface ConstituentMapInterface {
37+
subscribe(): Promise<void>;
38+
unsubscribe(): Promise<void>;
39+
has(key: string): boolean;
40+
get(key: string): ConstituentAccount | undefined;
41+
getWithSlot(key: string): DataAndSlot<ConstituentAccount> | undefined;
42+
mustGet(key: string): Promise<ConstituentAccount>;
43+
mustGetWithSlot(key: string): Promise<DataAndSlot<ConstituentAccount>>;
44+
}
45+
46+
export class ConstituentMap implements ConstituentMapInterface {
47+
private driftClient: DriftClient;
48+
private constituentMap = new Map<string, DataAndSlot<ConstituentAccount>>();
49+
private constituentAccountSubscriber: ConstituentAccountSubscriber;
50+
private additionalFilters?: MemcmpFilter[];
51+
private commitment?: Commitment;
52+
53+
constructor(config: ConstituentMapConfig) {
54+
this.driftClient = config.driftClient;
55+
this.additionalFilters = config.additionalFilters;
56+
this.commitment = config.subscriptionConfig.commitment;
57+
58+
if (config.subscriptionConfig.type === 'polling') {
59+
this.constituentAccountSubscriber =
60+
new PollingConstituentAccountSubscriber(
61+
this,
62+
this.driftClient.program,
63+
config.subscriptionConfig.frequency,
64+
config.subscriptionConfig.commitment,
65+
config.additionalFilters
66+
);
67+
} else if (config.subscriptionConfig.type === 'websocket') {
68+
this.constituentAccountSubscriber =
69+
new WebSocketConstituentAccountSubscriber(
70+
this,
71+
this.driftClient.program,
72+
config.subscriptionConfig.resubTimeoutMs,
73+
config.subscriptionConfig.commitment,
74+
config.additionalFilters
75+
);
76+
}
77+
78+
// Listen for account updates from the subscriber
79+
this.constituentAccountSubscriber.eventEmitter.on(
80+
'onAccountUpdate',
81+
(account: ConstituentAccount, pubkey: PublicKey, slot: number) => {
82+
this.updateConstituentAccount(pubkey.toString(), account, slot);
83+
}
84+
);
85+
}
86+
87+
private getFilters(): MemcmpFilter[] {
88+
const filters = [getConstituentFilter()];
89+
if (this.additionalFilters) {
90+
filters.push(...this.additionalFilters);
91+
}
92+
return filters;
93+
}
94+
95+
private decode(name: string, buffer: Buffer): ConstituentAccount {
96+
return this.driftClient.program.account.constituent.coder.accounts.decodeUnchecked(
97+
name,
98+
buffer
99+
);
100+
}
101+
102+
public async sync(): Promise<void> {
103+
try {
104+
const rpcRequestArgs = [
105+
this.driftClient.program.programId.toBase58(),
106+
{
107+
commitment: this.commitment,
108+
filters: this.getFilters(),
109+
encoding: 'base64+zstd',
110+
withContext: true,
111+
},
112+
];
113+
114+
// @ts-ignore
115+
const rpcJSONResponse: any = await this.connection._rpcRequest(
116+
'getProgramAccounts',
117+
rpcRequestArgs
118+
);
119+
const rpcResponseAndContext: RpcResponseAndContext<
120+
Array<{ pubkey: PublicKey; account: { data: [string, string] } }>
121+
> = rpcJSONResponse.result;
122+
const slot = rpcResponseAndContext.context.slot;
123+
124+
const promises = rpcResponseAndContext.value.map(
125+
async (programAccount) => {
126+
const compressedUserData = Buffer.from(
127+
programAccount.account.data[0],
128+
'base64'
129+
);
130+
const decoder = new ZSTDDecoder();
131+
await decoder.init();
132+
const buffer = Buffer.from(
133+
decoder.decode(compressedUserData, MAX_CONSTITUENT_SIZE_BYTES)
134+
);
135+
const key = programAccount.pubkey.toString();
136+
const currAccountWithSlot = this.getWithSlot(key);
137+
138+
if (currAccountWithSlot) {
139+
if (slot >= currAccountWithSlot.slot) {
140+
const constituentAcc = this.decode('Constituent', buffer);
141+
this.updateConstituentAccount(key, constituentAcc, slot);
142+
}
143+
} else {
144+
const constituentAcc = this.decode('Constituent', buffer);
145+
this.updateConstituentAccount(key, constituentAcc, slot);
146+
}
147+
}
148+
);
149+
await Promise.all(promises);
150+
} catch (error) {
151+
console.log(`ConstituentMap.sync() error: ${error.message}`);
152+
}
153+
}
154+
155+
public async subscribe(): Promise<void> {
156+
await this.constituentAccountSubscriber.subscribe();
157+
}
158+
159+
public async unsubscribe(): Promise<void> {
160+
await this.constituentAccountSubscriber.unsubscribe();
161+
this.constituentMap.clear();
162+
}
163+
164+
public has(key: string): boolean {
165+
return this.constituentMap.has(key);
166+
}
167+
168+
public get(key: string): ConstituentAccount | undefined {
169+
return this.constituentMap.get(key)?.data;
170+
}
171+
172+
public getWithSlot(key: string): DataAndSlot<ConstituentAccount> | undefined {
173+
return this.constituentMap.get(key);
174+
}
175+
176+
public async mustGet(key: string): Promise<ConstituentAccount> {
177+
if (!this.has(key)) {
178+
await this.sync();
179+
}
180+
const result = this.constituentMap.get(key);
181+
if (!result) {
182+
throw new Error(`ConstituentAccount not found for key: ${key}`);
183+
}
184+
return result.data;
185+
}
186+
187+
public async mustGetWithSlot(
188+
key: string
189+
): Promise<DataAndSlot<ConstituentAccount>> {
190+
if (!this.has(key)) {
191+
await this.sync();
192+
}
193+
const result = this.constituentMap.get(key);
194+
if (!result) {
195+
throw new Error(`ConstituentAccount not found for key: ${key}`);
196+
}
197+
return result;
198+
}
199+
200+
public size(): number {
201+
return this.constituentMap.size;
202+
}
203+
204+
public *values(): IterableIterator<ConstituentAccount> {
205+
for (const dataAndSlot of this.constituentMap.values()) {
206+
yield dataAndSlot.data;
207+
}
208+
}
209+
210+
public valuesWithSlot(): IterableIterator<DataAndSlot<ConstituentAccount>> {
211+
return this.constituentMap.values();
212+
}
213+
214+
public *entries(): IterableIterator<[string, ConstituentAccount]> {
215+
for (const [key, dataAndSlot] of this.constituentMap.entries()) {
216+
yield [key, dataAndSlot.data];
217+
}
218+
}
219+
220+
public entriesWithSlot(): IterableIterator<
221+
[string, DataAndSlot<ConstituentAccount>]
222+
> {
223+
return this.constituentMap.entries();
224+
}
225+
226+
public updateConstituentAccount(
227+
key: string,
228+
constituentAccount: ConstituentAccount,
229+
slot: number
230+
): void {
231+
const existingData = this.getWithSlot(key);
232+
if (existingData) {
233+
if (slot >= existingData.slot) {
234+
this.constituentMap.set(key, {
235+
data: constituentAccount,
236+
slot,
237+
});
238+
}
239+
} else {
240+
this.constituentMap.set(key, {
241+
data: constituentAccount,
242+
slot,
243+
});
244+
}
245+
}
246+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
import {
2+
DataAndSlot,
3+
NotSubscribedError,
4+
ConstituentAccountEvents,
5+
ConstituentAccountSubscriber,
6+
} from '../accounts/types';
7+
import { Program } from '@coral-xyz/anchor';
8+
import StrictEventEmitter from 'strict-event-emitter-types';
9+
import { EventEmitter } from 'events';
10+
import { PublicKey, Commitment, MemcmpFilter } from '@solana/web3.js';
11+
import { ConstituentAccount } from '../types';
12+
import { ConstituentMap } from './constituentMap';
13+
14+
export class PollingConstituentAccountSubscriber
15+
implements ConstituentAccountSubscriber
16+
{
17+
isSubscribed: boolean;
18+
program: Program;
19+
frequency: number;
20+
commitment?: Commitment;
21+
additionalFilters?: MemcmpFilter[];
22+
eventEmitter: StrictEventEmitter<EventEmitter, ConstituentAccountEvents>;
23+
24+
intervalId?: NodeJS.Timeout;
25+
constituentMap: ConstituentMap;
26+
27+
public constructor(
28+
constituentMap: ConstituentMap,
29+
program: Program,
30+
frequency: number,
31+
commitment?: Commitment,
32+
additionalFilters?: MemcmpFilter[]
33+
) {
34+
this.constituentMap = constituentMap;
35+
this.isSubscribed = false;
36+
this.program = program;
37+
this.frequency = frequency;
38+
this.commitment = commitment;
39+
this.additionalFilters = additionalFilters;
40+
this.eventEmitter = new EventEmitter();
41+
}
42+
43+
async subscribe(): Promise<boolean> {
44+
if (this.isSubscribed || this.frequency <= 0) {
45+
return true;
46+
}
47+
48+
const executeSync = async () => {
49+
await this.sync();
50+
this.intervalId = setTimeout(executeSync, this.frequency);
51+
};
52+
53+
// Initial sync
54+
await this.sync();
55+
56+
// Start polling
57+
this.intervalId = setTimeout(executeSync, this.frequency);
58+
59+
this.isSubscribed = true;
60+
return true;
61+
}
62+
63+
async sync(): Promise<void> {
64+
try {
65+
await this.constituentMap.sync();
66+
this.eventEmitter.emit('update');
67+
} catch (error) {
68+
console.log(
69+
`PollingConstituentAccountSubscriber.sync() error: ${error.message}`
70+
);
71+
this.eventEmitter.emit('error', error);
72+
}
73+
}
74+
75+
async unsubscribe(): Promise<void> {
76+
if (!this.isSubscribed) {
77+
return;
78+
}
79+
80+
if (this.intervalId) {
81+
clearTimeout(this.intervalId);
82+
this.intervalId = undefined;
83+
}
84+
85+
this.isSubscribed = false;
86+
}
87+
88+
assertIsSubscribed(): void {
89+
if (!this.isSubscribed) {
90+
throw new NotSubscribedError(
91+
'You must call `subscribe` before using this function'
92+
);
93+
}
94+
}
95+
96+
didSubscriptionSucceed(): boolean {
97+
return this.isSubscribed;
98+
}
99+
}

0 commit comments

Comments
 (0)