From a967ec1cc09406abf298a3a1c65c3c30772ceea1 Mon Sep 17 00:00:00 2001 From: Matthew McAllister Date: Wed, 21 Aug 2024 01:58:33 -0400 Subject: [PATCH 1/3] POC for coinbase-prime response delay on balance2 endpoint --- .../coinbase-prime/src/endpoint/balance2.ts | 67 ++++ .../coinbase-prime/src/endpoint/index.ts | 1 + packages/sources/coinbase-prime/src/index.ts | 4 +- .../coinbase-prime/src/transport/balance2.ts | 286 ++++++++++++++++++ 4 files changed, 356 insertions(+), 2 deletions(-) create mode 100644 packages/sources/coinbase-prime/src/endpoint/balance2.ts create mode 100644 packages/sources/coinbase-prime/src/transport/balance2.ts diff --git a/packages/sources/coinbase-prime/src/endpoint/balance2.ts b/packages/sources/coinbase-prime/src/endpoint/balance2.ts new file mode 100644 index 0000000000..fa7413ac3f --- /dev/null +++ b/packages/sources/coinbase-prime/src/endpoint/balance2.ts @@ -0,0 +1,67 @@ +import { AdapterEndpoint } from '@chainlink/external-adapter-framework/adapter' +import { InputParameters } from '@chainlink/external-adapter-framework/validation' +import { AdapterError } from '@chainlink/external-adapter-framework/validation/error' +import { SingleNumberResultResponse } from '@chainlink/external-adapter-framework/util' +import { config } from '../config' +import { balanceTransport } from '../transport/balance2' +import { getApiKeys } from '../transport/utils' + +export const inputParameters = new InputParameters( + { + portfolio: { + required: true, + type: 'string', + description: 'The portfolio ID to query the balance of', + }, + symbol: { + required: true, + type: 'string', + description: 'The symbol to return the balance for', + }, + type: { + type: 'string', + description: 'The balance type to return', + default: 'total', + options: ['total', 'vault', 'trading'], + }, + apiKey: { + type: 'string', + description: + 'Alternative api keys to use for this request, {$apiKey}_ACCESS_KEY {$apiKey}_PASSPHRASE {$apiKey}_SIGNING_KEY required in environment variables', + default: '', + }, + acceptDelay: { + type: 'boolean', + description: 'Delay ', + default: false, + }, + }, + [ + { + portfolio: 'abcd1234-123a-1234-ab12-12a34bcd56e7', + symbol: 'BTC', + type: 'total', + apiKey: '', + acceptDelay: false, + }, + ], +) + +export type BaseEndpointTypes = { + Parameters: typeof inputParameters.definition + Response: SingleNumberResultResponse + Settings: typeof config.settings +} + +export const endpoint = new AdapterEndpoint({ + name: 'balance2', + // transport: httpTransport, + transport: balanceTransport, + inputParameters, + customInputValidation: (request, settings): AdapterError | undefined => { + if (request.requestContext.data.apiKey) { + getApiKeys(request.requestContext.data.apiKey, settings) + } + return + }, +}) diff --git a/packages/sources/coinbase-prime/src/endpoint/index.ts b/packages/sources/coinbase-prime/src/endpoint/index.ts index 681acabb92..18b6846719 100644 --- a/packages/sources/coinbase-prime/src/endpoint/index.ts +++ b/packages/sources/coinbase-prime/src/endpoint/index.ts @@ -1,2 +1,3 @@ export { endpoint as balance } from './balance' +export { endpoint as balance2 } from './balance2' export { endpoint as wallet } from './wallet' diff --git a/packages/sources/coinbase-prime/src/index.ts b/packages/sources/coinbase-prime/src/index.ts index 403f9854dc..abc79e506e 100644 --- a/packages/sources/coinbase-prime/src/index.ts +++ b/packages/sources/coinbase-prime/src/index.ts @@ -1,13 +1,13 @@ import { expose, ServerInstance } from '@chainlink/external-adapter-framework' import { config } from './config' -import { balance, wallet } from './endpoint' +import { balance, wallet, balance2 } from './endpoint' import { PoRAdapter } from '@chainlink/external-adapter-framework/adapter/por' export const adapter = new PoRAdapter({ defaultEndpoint: balance.name, name: 'COINBASE_PRIME', config, - endpoints: [balance, wallet], + endpoints: [balance, wallet, balance2], rateLimiting: { tiers: { default: { diff --git a/packages/sources/coinbase-prime/src/transport/balance2.ts b/packages/sources/coinbase-prime/src/transport/balance2.ts new file mode 100644 index 0000000000..4b70aba9ae --- /dev/null +++ b/packages/sources/coinbase-prime/src/transport/balance2.ts @@ -0,0 +1,286 @@ +import { TransportDependencies } from '@chainlink/external-adapter-framework/transports' +import { BaseEndpointTypes, inputParameters } from '../endpoint/balance2' +import { sign, getApiKeys } from './utils' +import { + calculateCacheKey, + calculateHttpRequestKey, +} from '@chainlink/external-adapter-framework/cache' +import { EndpointContext } from '@chainlink/external-adapter-framework/adapter' +import { sleep, AdapterResponse, makeLogger } from '@chainlink/external-adapter-framework/util' +import { Requester } from '@chainlink/external-adapter-framework/util/requester' +import { SubscriptionTransport } from '@chainlink/external-adapter-framework/transports/abstract/subscription' + +const logger = makeLogger('Balance2Transport') + +export interface ResponseSchema { + balances: { + symbol: string + amount: string + holds: string + bonded_amount: string + reserved_amount: string + unbonding_amount: string + unvested_amount: string + pending_rewards_amount: string + past_rewards_amount: string + bondable_amount: string + withdrawable_amount: string + fiat_amount: string + }[] + type: string + trading_balances: { + total: string // Returns total in fiat amount + holds: string + } + vault_balances: { + total: string // Returns total in fiat amount + holds: string + } +} + +export type BalanceTransportTypes = BaseEndpointTypes & { + Provider: { + RequestBody: never + ResponseBody: ResponseSchema + } +} + +type RequestParams = typeof inputParameters.validated + +// revisit if we have >100 separate portfolios using this EA +// const myCache = CacheFactory.buildCache({cacheType: 'local', maxSizeForLocalCache: 100}) +type BlipCacheValue = { + result: number + timestamp: number +} +const blipCache = new Map() +const BLIP_DURATION_MS = 120000 + +// export class WalletTransport extends SubscriptionTransport { +export class BalanceTransport extends SubscriptionTransport { + settings!: BalanceTransportTypes['Settings'] + requester!: Requester + endpointName!: string + + async initialize( + dependencies: TransportDependencies, + adapterSettings: BalanceTransportTypes['Settings'], + endpointName: string, + transportName: string, + ): Promise { + await super.initialize(dependencies, adapterSettings, endpointName, transportName) + this.settings = adapterSettings + this.requester = dependencies.requester + this.endpointName = endpointName + } + + async backgroundHandler( + context: EndpointContext, + entries: RequestParams[], + ) { + await Promise.all(entries.map(async (param) => this.handleRequest(param))) + await sleep(context.adapterSettings.BACKGROUND_EXECUTE_MS) + } + + async handleRequest(param: RequestParams) { + let response: AdapterResponse + try { + response = await this._handleRequest(param) + } catch (e) { + const errorMessage = e instanceof Error ? e.message : 'Unknown error occurred' + logger.error(e, errorMessage) + response = { + statusCode: 502, + errorMessage, + timestamps: { + providerDataRequestedUnixMs: 0, + providerDataReceivedUnixMs: 0, + providerIndicatedTimeUnixMs: undefined, + }, + } + } + await this.responseCache.write(this.name, [{ params: param, response }]) + } + + async _handleRequest( + param: RequestParams, + ): Promise> { + const { portfolio, symbol, type, apiKey, acceptDelay } = param + const providerDataRequestedUnixMs = Date.now() + + const response = await this.sendBalanceRequest(portfolio, symbol, type, apiKey) + if (!response) { + return { + errorMessage: `The data provider did not return data for Portfolio: ${param.portfolio}, Balance Type: ${param.type}, Symbol: ${param.symbol}`, + statusCode: 502, + timestamps: { + providerDataRequestedUnixMs, + providerDataReceivedUnixMs: Date.now(), + providerIndicatedTimeUnixMs: undefined, + }, + } + } + + if (!response.balances) { + return { + errorMessage: `The data provider response does not contain a balances list for Portfolio: ${param.portfolio}, Balance Type: ${param.type}, Symbol: ${param.symbol}`, + statusCode: 502, + timestamps: { + providerDataRequestedUnixMs, + providerDataReceivedUnixMs: Date.now(), + providerIndicatedTimeUnixMs: undefined, + }, + } + } + + // The adapter only supports querying one asset at a time so the balances list should only contain 1 element + if (response.balances.length !== 1) { + return { + errorMessage: `The data provider response does not contain exactly one element in the balances list for Portfolio: ${param.portfolio}, Balance Type: ${param.type}, Symbol: ${param.symbol}`, + statusCode: 502, + timestamps: { + providerDataRequestedUnixMs, + providerDataReceivedUnixMs: Date.now(), + providerIndicatedTimeUnixMs: undefined, + }, + } + } + + const result = Number(response.balances[0].amount) + if (isNaN(result)) { + return { + errorMessage: `The data provider returned non-numeric balance: ${response.balances[0].amount}`, + statusCode: 502, + timestamps: { + providerDataRequestedUnixMs, + providerDataReceivedUnixMs: Date.now(), + providerIndicatedTimeUnixMs: undefined, + }, + } + } + + const generateResponseBody = (r: number = result) => { + return { + result: r, + data: { + result: r, + }, + statusCode: 200, + timestamps: { + providerDataRequestedUnixMs, + providerDataReceivedUnixMs: Date.now(), + providerIndicatedTimeUnixMs: undefined, + }, + } + } + + // standard REST API case + if (!acceptDelay) { + return generateResponseBody() + } + + const cacheKey = calculateCacheKey({ + transportName: this.name, + data: param, + adapterName: this.responseCache.adapterName, + endpointName: this.responseCache.endpointName, + adapterSettings: this.responseCache.adapterSettings, + }) + + // if `result` doesn't match cached response, we want to delay returning the new value + // by 2 minutes, ie: we don't want to update the response cache right away. + // we'll do this by caching this value in a separate map for 2 minutes + // TODO make 2 minutes configurable + const responseCacheData = await this.responseCache.cache.get(cacheKey) + if (!responseCacheData || !responseCacheData.result) { + console.log('no responseCacheData found') + return generateResponseBody() + } else if (responseCacheData.result === result) { + console.log('responseCacheData and latest result are the same') + return generateResponseBody() + } + + const blipCacheData = blipCache.get(cacheKey) + console.log('responseCacheData = ') + console.log(responseCacheData) + console.log('blipCacheData = ') + console.log(blipCacheData) + + // result is found in blipCache, want to update TTL of the cached value + if (result === blipCacheData?.result) { + console.log( + `blipCache timestamp = ${blipCacheData?.timestamp}, blipmin = ${ + Date.now() - BLIP_DURATION_MS + }, isless = ${blipCacheData?.timestamp < Date.now() - BLIP_DURATION_MS}`, + ) + if (blipCacheData?.timestamp > Date.now() - BLIP_DURATION_MS) { + console.log(`rewriting responseCache ${cacheKey}`) + // await this.responseCache.writeTTL(this.name, [param], this.settings.CACHE_MAX_AGE) + // return generateResponseBody(a.result) + } else { + // blipCacheValue has been cached long enough and seems like a good value + console.log(`removing local ${cacheKey}`) + blipCache.delete(cacheKey) + return generateResponseBody() + } + } else { + // blipCacheValue not the same as result, overwrite + console.log(`overwriting local ${cacheKey}`) + blipCache.set(cacheKey, { result, timestamp: providerDataRequestedUnixMs }) + } + + return generateResponseBody(responseCacheData.result) + } + + async sendBalanceRequest( + portfolio: string, + symbol: string, + type: string, + apiKey: string, + ): Promise { + const [signingKey, accessKey, passPhrase] = getApiKeys(apiKey, this.settings) + const timestamp = Math.floor(Date.now() / 1000) + const method = 'GET' + const path = `/v1/portfolios/${portfolio}/balances` + const message = `${timestamp}${method}${path}` + const signature = sign(message, signingKey) + + const requestConfig = { + baseURL: this.settings.API_ENDPOINT, + url: path, + headers: { + 'X-CB-ACCESS-KEY': accessKey, + 'X-CB-ACCESS-PASSPHRASE': passPhrase, + 'X-CB-ACCESS-SIGNATURE': signature, + 'X-CB-ACCESS-TIMESTAMP': timestamp, + 'Content-Type': 'application/json', + }, + params: { + symbols: symbol.toUpperCase(), + balance_type: `${type.toUpperCase()}_BALANCES`, + }, + } + + const res = await this.requester.request( + calculateHttpRequestKey({ + context: { + adapterSettings: this.settings, + inputParameters, + endpointName: this.endpointName, + }, + data: requestConfig.params, + transportName: this.name, + }), + requestConfig, + ) + + console.log(res.response.data) + return res.response.data + } + + getSubscriptionTtlFromConfig(adapterSettings: BaseEndpointTypes['Settings']): number { + return adapterSettings.WARMUP_SUBSCRIPTION_TTL + } +} + +export const balanceTransport = new BalanceTransport() From c618984523b4e2471689fb6c57b2d77639fabfe7 Mon Sep 17 00:00:00 2001 From: Karen Stepanyan Date: Thu, 22 Aug 2024 16:42:31 +0400 Subject: [PATCH 2/3] cleanup --- .../coinbase-prime/src/config/index.ts | 5 + .../coinbase-prime/src/transport/balance2.ts | 108 ++++++------------ .../coinbase-prime/src/transport/utils.ts | 16 +++ 3 files changed, 57 insertions(+), 72 deletions(-) diff --git a/packages/sources/coinbase-prime/src/config/index.ts b/packages/sources/coinbase-prime/src/config/index.ts index cd529b8397..9f1d0b2789 100644 --- a/packages/sources/coinbase-prime/src/config/index.ts +++ b/packages/sources/coinbase-prime/src/config/index.ts @@ -24,6 +24,11 @@ export const config = new AdapterConfig({ required: true, sensitive: true, }, + DELAYED_RESPONSE_MS: { + description: 'The amount of time to delay the new response in milliseconds', + type: 'number', + default: 120_000, + }, BACKGROUND_EXECUTE_MS: { description: 'The amount of time the background execute should sleep before performing the next request', diff --git a/packages/sources/coinbase-prime/src/transport/balance2.ts b/packages/sources/coinbase-prime/src/transport/balance2.ts index 4b70aba9ae..7fc119b708 100644 --- a/packages/sources/coinbase-prime/src/transport/balance2.ts +++ b/packages/sources/coinbase-prime/src/transport/balance2.ts @@ -1,6 +1,6 @@ import { TransportDependencies } from '@chainlink/external-adapter-framework/transports' import { BaseEndpointTypes, inputParameters } from '../endpoint/balance2' -import { sign, getApiKeys } from './utils' +import { sign, getApiKeys, errorResponse } from './utils' import { calculateCacheKey, calculateHttpRequestKey, @@ -48,15 +48,12 @@ export type BalanceTransportTypes = BaseEndpointTypes & { type RequestParams = typeof inputParameters.validated // revisit if we have >100 separate portfolios using this EA -// const myCache = CacheFactory.buildCache({cacheType: 'local', maxSizeForLocalCache: 100}) type BlipCacheValue = { result: number timestamp: number } const blipCache = new Map() -const BLIP_DURATION_MS = 120000 -// export class WalletTransport extends SubscriptionTransport { export class BalanceTransport extends SubscriptionTransport { settings!: BalanceTransportTypes['Settings'] requester!: Requester @@ -110,53 +107,33 @@ export class BalanceTransport extends SubscriptionTransport { @@ -174,7 +151,7 @@ export class BalanceTransport extends SubscriptionTransport { + return { + errorMessage: message, + statusCode, + timestamps: { + providerDataRequestedUnixMs, + providerDataReceivedUnixMs: Date.now(), + providerIndicatedTimeUnixMs: undefined, + }, + } +} From 027d2bcd9dab5877aa8654fe4181ff5b9fdf1635 Mon Sep 17 00:00:00 2001 From: Matthew McAllister Date: Fri, 23 Aug 2024 20:45:00 -0400 Subject: [PATCH 3/3] updating comments & additional blipCache.delete to avoid edge case --- .../coinbase-prime/src/config/index.ts | 2 +- .../coinbase-prime/src/endpoint/balance2.ts | 1 - .../coinbase-prime/src/transport/balance2.ts | 30 +++++++++++-------- 3 files changed, 19 insertions(+), 14 deletions(-) diff --git a/packages/sources/coinbase-prime/src/config/index.ts b/packages/sources/coinbase-prime/src/config/index.ts index 9f1d0b2789..057e1b99b6 100644 --- a/packages/sources/coinbase-prime/src/config/index.ts +++ b/packages/sources/coinbase-prime/src/config/index.ts @@ -27,7 +27,7 @@ export const config = new AdapterConfig({ DELAYED_RESPONSE_MS: { description: 'The amount of time to delay the new response in milliseconds', type: 'number', - default: 120_000, + default: 180_000, }, BACKGROUND_EXECUTE_MS: { description: diff --git a/packages/sources/coinbase-prime/src/endpoint/balance2.ts b/packages/sources/coinbase-prime/src/endpoint/balance2.ts index fa7413ac3f..6eb754e568 100644 --- a/packages/sources/coinbase-prime/src/endpoint/balance2.ts +++ b/packages/sources/coinbase-prime/src/endpoint/balance2.ts @@ -55,7 +55,6 @@ export type BaseEndpointTypes = { export const endpoint = new AdapterEndpoint({ name: 'balance2', - // transport: httpTransport, transport: balanceTransport, inputParameters, customInputValidation: (request, settings): AdapterError | undefined => { diff --git a/packages/sources/coinbase-prime/src/transport/balance2.ts b/packages/sources/coinbase-prime/src/transport/balance2.ts index 7fc119b708..5595b2feaa 100644 --- a/packages/sources/coinbase-prime/src/transport/balance2.ts +++ b/packages/sources/coinbase-prime/src/transport/balance2.ts @@ -164,29 +164,35 @@ export class BalanceTransport extends SubscriptionTransport