diff --git a/packages/cli/src/ceramic-daemon.ts b/packages/cli/src/ceramic-daemon.ts index 85a4b304de..9052087220 100644 --- a/packages/cli/src/ceramic-daemon.ts +++ b/packages/cli/src/ceramic-daemon.ts @@ -301,7 +301,7 @@ export class CeramicDaemon { opts.stateStore?.s3Endpoint ) - await ceramic.repository.injectKeyValueStore(s3Store) + await ceramic.repository.setKeyValueStore(s3Store) } const did = new DID({ resolver: makeResolvers(ceramic, ceramicConfig, opts) }) ceramic.did = did diff --git a/packages/core/src/ceramic.ts b/packages/core/src/ceramic.ts index 6d2501ea4d..d25c578b6b 100644 --- a/packages/core/src/ceramic.ts +++ b/packages/core/src/ceramic.ts @@ -55,6 +55,7 @@ import { AnchorResumingService } from './state-management/anchor-resuming-servic import { SyncApi } from './sync/sync-api.js' import { ProvidersCache } from './providers-cache.js' import crypto from 'crypto' +import { StateManager } from './state-management/state-manager.js' const DEFAULT_CACHE_LIMIT = 500 // number of streams stored in the cache const DEFAULT_QPS_LIMIT = 10 // Max number of pubsub query messages that can be published per second without rate limiting @@ -275,33 +276,39 @@ export class Ceramic implements CeramicApi { this._streamHandlers ) const pinStore = modules.pinStoreFactory.createPinStore() - const localIndex = new LocalIndexApi( - params.indexingConfig, - this.repository, - this._logger, - params.networkOptions.name - ) - this.repository.setDeps({ - dispatcher: this.dispatcher, - pinStore: pinStore, - keyValueStore: this._levelStore, - anchorRequestStore: new AnchorRequestStore(), - context: this.context, - handlers: this._streamHandlers, - anchorService: modules.anchorService, - conflictResolution: conflictResolution, - indexing: localIndex, - }) this.syncApi = new SyncApi( { db: params.indexingConfig.db, on: params.sync, }, this.dispatcher, - this.repository.stateManager.handleUpdate.bind(this.repository.stateManager), - this.repository.index, + //this.repository.index, this._logger ) + const localIndex = new LocalIndexApi( + params.indexingConfig, + this.repository, + this._logger, + params.networkOptions.name + ) + const stateManager = new StateManager( + this.dispatcher, + pinStore, + new AnchorRequestStore(), + this.repository.executionQ, + modules.anchorService, + conflictResolution, + this._logger, + (streamId) => this.repository.fromMemoryOrStore(streamId), + (streamId, opts) => this.repository.load(streamId, opts), + this.repository.indexStreamIfNeeded.bind(this), + localIndex + ) + this.syncApi.bind(stateManager.handleUpdate.bind(stateManager)) + this.repository.setContext(this.context) + this.repository.setStateManager(stateManager) + this.repository.setHandlers(this._streamHandlers) + this.repository.setKeyValueStore(this._levelStore) this.admin = new LocalAdminApi(localIndex, this.syncApi, this.nodeStatus.bind(this)) } @@ -603,7 +610,7 @@ export class Ceramic implements CeramicApi { if (this.syncApi.enabled) { const provider = await this.providersCache.getProvider(chainId) - await this.syncApi.init(provider) + await this.syncApi.init(provider, this.index) } await this._startupChecks() diff --git a/packages/core/src/state-management/repository.ts b/packages/core/src/state-management/repository.ts index 04f0158fd4..48c3f4b639 100644 --- a/packages/core/src/state-management/repository.ts +++ b/packages/core/src/state-management/repository.ts @@ -30,18 +30,6 @@ import { LocalIndexApi } from '../indexing/local-index-api.js' import { IKVStore } from '../store/ikv-store.js' import { AnchorRequestStore } from '../store/anchor-request-store.js' -export type RepositoryDependencies = { - dispatcher: Dispatcher - pinStore: PinStore - keyValueStore: IKVStore - anchorRequestStore: AnchorRequestStore - context: Context - handlers: HandlersMap - anchorService: AnchorService - conflictResolution: ConflictResolution - indexing: LocalIndexApi -} - const DEFAULT_LOAD_OPTS = { sync: SyncOptions.PREFER_CACHE, syncTimeoutSeconds: 3 } /** @@ -70,15 +58,25 @@ export class Repository { readonly inmemory: StateCache /** - * Various dependencies. + * KeyValueStore */ - #deps: RepositoryDependencies + keyValueStore: IKVStore /** * Instance of StateManager for performing operations on stream state. */ stateManager: StateManager + /** + * Handlers to use + */ + handlers: HandlersMap + + /** + * Context to use + */ + context: Context + /** * @param cacheLimit - Maximum number of streams to store in memory cache. * @param logger - Where we put diagnostics messages. @@ -100,49 +98,35 @@ export class Repository { this.updates$ = this.updates$.bind(this) } - async injectKeyValueStore(stateStore: IKVStore): Promise { - this.setDeps({ - ...this.#deps, - keyValueStore: stateStore, - }) - await this.init() + setKeyValueStore(keyValueStore: IKVStore) { + this.keyValueStore = keyValueStore } async init(): Promise { - await this.pinStore.open(this.#deps.keyValueStore) - await this.anchorRequestStore.open(this.#deps.keyValueStore) + await this.stateManager.open(this.keyValueStore) + await this.anchorRequestStore.open(this.keyValueStore) await this.index.init() } - get pinStore(): PinStore { - return this.#deps.pinStore - } - get anchorRequestStore(): AnchorRequestStore { - return this.#deps.anchorRequestStore + return this.stateManager.anchorRequestStore } get index(): LocalIndexApi { - return this.#deps.indexing + return this.stateManager._index } // Ideally this would be provided in the constructor, but circular dependencies in our initialization process make this necessary for now - setDeps(deps: RepositoryDependencies): void { - this.#deps = deps - this.stateManager = new StateManager( - deps.dispatcher, - deps.pinStore, - deps.anchorRequestStore, - this.executionQ, - deps.anchorService, - deps.conflictResolution, - this.logger, - (streamId) => this.fromMemoryOrStore(streamId), - (streamId, opts) => this.load(streamId, opts), - // TODO (NET-1687): remove as part of refactor to push indexing into state-manager.ts - this.indexStreamIfNeeded.bind(this), - deps.indexing - ) + setStateManager(stateManager: StateManager): void { + this.stateManager = stateManager + } + + setHandlers(handlers: HandlersMap): void { + this.handlers = handlers + } + + setContext(context: Context): void { + this.context = context } private fromMemory(streamId: StreamID): RunningState | undefined { @@ -150,7 +134,7 @@ export class Repository { } private async fromStreamStateStore(streamId: StreamID): Promise { - const streamState = await this.#deps.pinStore.stateStore.load(streamId) + const streamState = await this.stateManager.loadPinnedStream(streamId) if (streamState) { const runningState = new RunningState(streamState, true) this.add(runningState) @@ -165,9 +149,9 @@ export class Repository { } private async fromNetwork(streamId: StreamID): Promise { - const handler = this.#deps.handlers.get(streamId.typeName) + const handler = this.handlers.get(streamId.typeName) const genesisCid = streamId.cid - const commitData = await Utils.getCommitData(this.#deps.dispatcher, genesisCid, streamId) + const commitData = await Utils.getCommitData(this.stateManager.dispatcher, genesisCid, streamId) if (commitData == null) { throw new Error(`No genesis commit found with CID ${genesisCid.toString()}`) } @@ -175,7 +159,7 @@ export class Repository { // (or learning that the genesis commit *is* the current tip), when we will have timestamp // information for when the genesis commit was anchored. commitData.disableTimecheck = true - const state = await handler.applyCommit(commitData, this.#deps.context) + const state = await handler.applyCommit(commitData, this.context) const state$ = new RunningState(state, false) this.add(state$) this.logger.verbose(`Genesis commit for stream ${streamId.toString()} successfully loaded`) @@ -357,7 +341,7 @@ export class Repository { if (fromMemory) { return fromMemory.state } else { - return this.#deps.pinStore.stateStore.load(streamId) + return this.stateManager.loadPinnedStream(streamId) } } @@ -369,7 +353,7 @@ export class Repository { } pin(state$: RunningState, force?: boolean): Promise { - return this.#deps.pinStore.add(state$, force) + return this.stateManager.add(state$, force) } async unpin(state$: RunningState, opts?: PublishOpts): Promise { @@ -385,7 +369,7 @@ export class Repository { this.stateManager.publishTip(state$) } - return this.#deps.pinStore.rm(state$) + return this.stateManager.rm(state$) } /** @@ -393,7 +377,7 @@ export class Repository { * If `streamId` is passed, indicate if it is pinned. */ async listPinned(streamId?: StreamID): Promise { - return this.#deps.pinStore.ls(streamId) + return this.stateManager.ls(streamId) } /** @@ -401,7 +385,7 @@ export class Repository { */ async randomPinnedStreamState(): Promise { // First get a random streamID from the state store. - const res = await this.#deps.pinStore.stateStore.listStoredStreamIDs(null, 1) + const res = await this.stateManager.listStoredStreamIDs(null, 1) if (res.length == 0) { return null } @@ -414,7 +398,7 @@ export class Repository { } const [streamID] = res - return this.#deps.pinStore.stateStore.load(StreamID.fromString(streamID)) + return this.stateManager.loadPinnedStream(StreamID.fromString(streamID)) } /** @@ -484,7 +468,7 @@ export class Repository { this.inmemory.delete(id) stream.complete() }) - await this.#deps.pinStore.close() + await this.stateManager.close() await this.index.close() } } diff --git a/packages/core/src/state-management/state-manager.ts b/packages/core/src/state-management/state-manager.ts index 412f967edf..12953f9100 100644 --- a/packages/core/src/state-management/state-manager.ts +++ b/packages/core/src/state-management/state-manager.ts @@ -15,6 +15,7 @@ import { DiagnosticsLogger, StreamUtils, GenesisCommit, + StreamState, } from '@ceramicnetwork/common' import { RunningState } from './running-state.js' import type { CID } from 'multiformats/cid' @@ -24,6 +25,7 @@ import { SnapshotState } from './snapshot-state.js' import { CommitID, StreamID } from '@ceramicnetwork/streamid' import { LocalIndexApi } from '../indexing/local-index-api.js' import { AnchorRequestStore } from '../store/anchor-request-store.js' +import { IKVStore } from '../store/ikv-store.js' const APPLY_ANCHOR_COMMIT_ATTEMPTS = 3 @@ -48,9 +50,9 @@ export class StateManager { * @param indexStreamIfNeeded - `Repository#indexStreamIfNeeded` */ constructor( - private readonly dispatcher: Dispatcher, + readonly dispatcher: Dispatcher, private readonly pinStore: PinStore, - private readonly anchorRequestStore: AnchorRequestStore, + readonly anchorRequestStore: AnchorRequestStore, private readonly executionQ: ExecutionQueue, public anchorService: AnchorService, public conflictResolution: ConflictResolution, @@ -61,9 +63,72 @@ export class StateManager { opts?: LoadOpts | CreateOpts ) => Promise, private readonly indexStreamIfNeeded, - private readonly _index: LocalIndexApi | undefined + readonly _index: LocalIndexApi | undefined ) {} + /** + * Open the state manager + */ + async open(store: IKVStore): Promise { + return await this.pinStore.open(store) + } + /** + * Load a pinned stream + */ + async loadPinnedStream(streamId: StreamID): Promise { + return await this.pinStore.stateStore.load(streamId) + } + + /** + * List a pinned stream + * @param streamId + */ + async ls(streamId?: StreamID): Promise { + return await this.pinStore.ls(streamId) + } + + /** + * List a set of pinned streams + */ + async listStoredStreamIDs(streamId?: StreamID | null, limit?: number): Promise { + return await this.pinStore.stateStore.listStoredStreamIDs(streamId, limit) + } + + /** + * Takes a StreamState and finds all the IPFS CIDs that are in any way needed to load data + * from the stream, pins them against the configured pinning backend, writes the + * StreamState itself into the state store, and updates the RunningState's pinned commits which + * prevents the StreamState's commits from being stored again. + * @param runningState - object holding the current StreamState for the stream being pinned + * If the stream was previously pinned, then this will also contain a set of CIDs + * (in string representation) of the commits that were pinned previously. This means + * we only need to pin CIDs corresponding to the commits contained in the log of the given + * StreamState that aren't contained within `pinnedCommits` + * @param force - optional boolean that if set to true forces all commits in the stream to pinned, + * regardless of whether they have been previously pinned + */ + async add(runningState: RunningState, force?: boolean): Promise { + await this.pinStore.add(runningState, force) + } + + /** + * Effectively opposite of 'add' - this finds all the IPFS CIDs that are required to load the + * given stream and unpins them from IPFS, and them removes the stream state from the Ceramic + * state store. There is one notable difference of behavior however, which is that 'rm()' + * intentionally leaves the CIDs that make up the anchor proof and anchor merkle tree pinned. + * This is to avoid accidentally unpinning data that is needed by other streams, in the case where + * there are multiple pinned streams that contain anchor commits from the same anchor batch + * and therefore share the same anchor proof and merkle tree. + * @param runningState + */ + async rm(runningState: RunningState): Promise { + return await this.pinStore.rm(runningState) + } + + async close(): Promise { + return await this.pinStore.close() + } + /** * Returns whether the given StreamID corresponds to a pinned stream that has been synced at least * once during the lifetime of this process. As long as it's been synced once, it's guaranteed to diff --git a/packages/core/src/sync/sync-api.ts b/packages/core/src/sync/sync-api.ts index 6a746dd0de..4439c279c5 100644 --- a/packages/core/src/sync/sync-api.ts +++ b/packages/core/src/sync/sync-api.ts @@ -58,12 +58,11 @@ export class SyncApi implements ISyncApi { private provider: Provider private chainId: SupportedNetwork private initialIndexingBlock: number + private handleCommit: HandleCommit constructor( private readonly syncConfig: SyncConfig, private readonly ipfsService: IpfsService, - private readonly handleCommit: HandleCommit, - private readonly localIndex: LocalIndexApi, private readonly diagnosticsLogger: DiagnosticsLogger ) { if (!this.syncConfig.on) return @@ -72,7 +71,11 @@ export class SyncApi implements ISyncApi { this.jobQueue = new JobQueue(this.syncConfig.db, this.diagnosticsLogger) } - async init(provider: Provider): Promise { + bind(handleCommit: HandleCommit) { + this.handleCommit = handleCommit + } + + async init(provider: Provider, localIndex: LocalIndexApi): Promise { if (!this.syncConfig.on) return this.provider = provider @@ -83,7 +86,7 @@ export class SyncApi implements ISyncApi { const [latestBlock, { processedBlockNumber }] = await Promise.all([ this.provider.getBlock(-BLOCK_CONFIRMATIONS), this._initStateTable(), - this._initModelsToSync(), + this._initModelsToSync(localIndex), this._initJobQueue(), ]) @@ -132,8 +135,8 @@ export class SyncApi implements ISyncApi { /** * Load models to sync from the DB. */ - async _initModelsToSync(): Promise { - const streamsIds = await this.localIndex.indexedModels() + async _initModelsToSync(localIndex: LocalIndexApi): Promise { + const streamsIds = await localIndex.indexedModels() for (const id of streamsIds) { this.modelsToSync.add(id.toString()) }