Skip to content

chore(core): Reduce circular dependencies #2726

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/cli/src/ceramic-daemon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ export class CeramicDaemon {
opts.stateStore?.s3Endpoint
)

await ceramic.repository.injectKeyValueStore(s3Store)
await ceramic.repository.setKeyValueStore(s3Store)
}
const did = new DID({ resolver: makeResolvers(ceramic, ceramicConfig, opts) })
ceramic.did = did
Expand Down
47 changes: 27 additions & 20 deletions packages/core/src/ceramic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import { AnchorResumingService } from './state-management/anchor-resuming-servic
import { SyncApi } from './sync/sync-api.js'
import { ProvidersCache } from './providers-cache.js'
import crypto from 'crypto'
import { StateManager } from './state-management/state-manager.js'

const DEFAULT_CACHE_LIMIT = 500 // number of streams stored in the cache
const DEFAULT_QPS_LIMIT = 10 // Max number of pubsub query messages that can be published per second without rate limiting
Expand Down Expand Up @@ -275,33 +276,39 @@ export class Ceramic implements CeramicApi {
this._streamHandlers
)
const pinStore = modules.pinStoreFactory.createPinStore()
const localIndex = new LocalIndexApi(
params.indexingConfig,
this.repository,
this._logger,
params.networkOptions.name
)
this.repository.setDeps({
dispatcher: this.dispatcher,
pinStore: pinStore,
keyValueStore: this._levelStore,
anchorRequestStore: new AnchorRequestStore(),
context: this.context,
handlers: this._streamHandlers,
anchorService: modules.anchorService,
conflictResolution: conflictResolution,
indexing: localIndex,
})
this.syncApi = new SyncApi(
{
db: params.indexingConfig.db,
on: params.sync,
},
this.dispatcher,
this.repository.stateManager.handleUpdate.bind(this.repository.stateManager),
this.repository.index,
//this.repository.index,
this._logger
)
const localIndex = new LocalIndexApi(
params.indexingConfig,
this.repository,
this._logger,
params.networkOptions.name
)
const stateManager = new StateManager(
this.dispatcher,
pinStore,
new AnchorRequestStore(),
this.repository.executionQ,
modules.anchorService,
conflictResolution,
this._logger,
(streamId) => this.repository.fromMemoryOrStore(streamId),
(streamId, opts) => this.repository.load(streamId, opts),
this.repository.indexStreamIfNeeded.bind(this),
localIndex
)
this.syncApi.bind(stateManager.handleUpdate.bind(stateManager))
this.repository.setContext(this.context)
this.repository.setStateManager(stateManager)
this.repository.setHandlers(this._streamHandlers)
this.repository.setKeyValueStore(this._levelStore)
this.admin = new LocalAdminApi(localIndex, this.syncApi, this.nodeStatus.bind(this))
}

Expand Down Expand Up @@ -603,7 +610,7 @@ export class Ceramic implements CeramicApi {

if (this.syncApi.enabled) {
const provider = await this.providersCache.getProvider(chainId)
await this.syncApi.init(provider)
await this.syncApi.init(provider, this.index)
}

await this._startupChecks()
Expand Down
94 changes: 39 additions & 55 deletions packages/core/src/state-management/repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,6 @@ import { LocalIndexApi } from '../indexing/local-index-api.js'
import { IKVStore } from '../store/ikv-store.js'
import { AnchorRequestStore } from '../store/anchor-request-store.js'

export type RepositoryDependencies = {
dispatcher: Dispatcher
pinStore: PinStore
keyValueStore: IKVStore
anchorRequestStore: AnchorRequestStore
context: Context
handlers: HandlersMap
anchorService: AnchorService
conflictResolution: ConflictResolution
indexing: LocalIndexApi
}

const DEFAULT_LOAD_OPTS = { sync: SyncOptions.PREFER_CACHE, syncTimeoutSeconds: 3 }

/**
Expand Down Expand Up @@ -70,15 +58,25 @@ export class Repository {
readonly inmemory: StateCache<RunningState>

/**
* Various dependencies.
* KeyValueStore
*/
#deps: RepositoryDependencies
keyValueStore: IKVStore

/**
* Instance of StateManager for performing operations on stream state.
*/
stateManager: StateManager

/**
* Handlers to use
*/
handlers: HandlersMap

/**
* Context to use
*/
context: Context

/**
* @param cacheLimit - Maximum number of streams to store in memory cache.
* @param logger - Where we put diagnostics messages.
Expand All @@ -100,57 +98,43 @@ export class Repository {
this.updates$ = this.updates$.bind(this)
}

async injectKeyValueStore(stateStore: IKVStore): Promise<void> {
this.setDeps({
...this.#deps,
keyValueStore: stateStore,
})
await this.init()
setKeyValueStore(keyValueStore: IKVStore) {
this.keyValueStore = keyValueStore
}

async init(): Promise<void> {
await this.pinStore.open(this.#deps.keyValueStore)
await this.anchorRequestStore.open(this.#deps.keyValueStore)
await this.stateManager.open(this.keyValueStore)
await this.anchorRequestStore.open(this.keyValueStore)
await this.index.init()
}

get pinStore(): PinStore {
return this.#deps.pinStore
}

get anchorRequestStore(): AnchorRequestStore {
return this.#deps.anchorRequestStore
return this.stateManager.anchorRequestStore
}

get index(): LocalIndexApi {
return this.#deps.indexing
return this.stateManager._index
}

// Ideally this would be provided in the constructor, but circular dependencies in our initialization process make this necessary for now
setDeps(deps: RepositoryDependencies): void {
this.#deps = deps
this.stateManager = new StateManager(
deps.dispatcher,
deps.pinStore,
deps.anchorRequestStore,
this.executionQ,
deps.anchorService,
deps.conflictResolution,
this.logger,
(streamId) => this.fromMemoryOrStore(streamId),
(streamId, opts) => this.load(streamId, opts),
// TODO (NET-1687): remove as part of refactor to push indexing into state-manager.ts
this.indexStreamIfNeeded.bind(this),
deps.indexing
)
setStateManager(stateManager: StateManager): void {
this.stateManager = stateManager
}

setHandlers(handlers: HandlersMap): void {
this.handlers = handlers
}

setContext(context: Context): void {
this.context = context
}

private fromMemory(streamId: StreamID): RunningState | undefined {
return this.inmemory.get(streamId.toString())
}

private async fromStreamStateStore(streamId: StreamID): Promise<RunningState | undefined> {
const streamState = await this.#deps.pinStore.stateStore.load(streamId)
const streamState = await this.stateManager.loadPinnedStream(streamId)
if (streamState) {
const runningState = new RunningState(streamState, true)
this.add(runningState)
Expand All @@ -165,17 +149,17 @@ export class Repository {
}

private async fromNetwork(streamId: StreamID): Promise<RunningState> {
const handler = this.#deps.handlers.get(streamId.typeName)
const handler = this.handlers.get(streamId.typeName)
const genesisCid = streamId.cid
const commitData = await Utils.getCommitData(this.#deps.dispatcher, genesisCid, streamId)
const commitData = await Utils.getCommitData(this.stateManager.dispatcher, genesisCid, streamId)
if (commitData == null) {
throw new Error(`No genesis commit found with CID ${genesisCid.toString()}`)
}
// Do not check for possible key revocation here, as we will do so later after loading the tip
// (or learning that the genesis commit *is* the current tip), when we will have timestamp
// information for when the genesis commit was anchored.
commitData.disableTimecheck = true
const state = await handler.applyCommit(commitData, this.#deps.context)
const state = await handler.applyCommit(commitData, this.context)
const state$ = new RunningState(state, false)
this.add(state$)
this.logger.verbose(`Genesis commit for stream ${streamId.toString()} successfully loaded`)
Expand Down Expand Up @@ -357,7 +341,7 @@ export class Repository {
if (fromMemory) {
return fromMemory.state
} else {
return this.#deps.pinStore.stateStore.load(streamId)
return this.stateManager.loadPinnedStream(streamId)
}
}

Expand All @@ -369,7 +353,7 @@ export class Repository {
}

pin(state$: RunningState, force?: boolean): Promise<void> {
return this.#deps.pinStore.add(state$, force)
return this.stateManager.add(state$, force)
}

async unpin(state$: RunningState, opts?: PublishOpts): Promise<void> {
Expand All @@ -385,23 +369,23 @@ export class Repository {
this.stateManager.publishTip(state$)
}

return this.#deps.pinStore.rm(state$)
return this.stateManager.rm(state$)
}

/**
* List pinned streams as array of StreamID strings.
* If `streamId` is passed, indicate if it is pinned.
*/
async listPinned(streamId?: StreamID): Promise<string[]> {
return this.#deps.pinStore.ls(streamId)
return this.stateManager.ls(streamId)
}

/**
* Returns the StreamState of a random pinned stream from the state store
*/
async randomPinnedStreamState(): Promise<StreamState | null> {
// First get a random streamID from the state store.
const res = await this.#deps.pinStore.stateStore.listStoredStreamIDs(null, 1)
const res = await this.stateManager.listStoredStreamIDs(null, 1)
if (res.length == 0) {
return null
}
Expand All @@ -414,7 +398,7 @@ export class Repository {
}

const [streamID] = res
return this.#deps.pinStore.stateStore.load(StreamID.fromString(streamID))
return this.stateManager.loadPinnedStream(StreamID.fromString(streamID))
}

/**
Expand Down Expand Up @@ -484,7 +468,7 @@ export class Repository {
this.inmemory.delete(id)
stream.complete()
})
await this.#deps.pinStore.close()
await this.stateManager.close()
await this.index.close()
}
}
71 changes: 68 additions & 3 deletions packages/core/src/state-management/state-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
DiagnosticsLogger,
StreamUtils,
GenesisCommit,
StreamState,
} from '@ceramicnetwork/common'
import { RunningState } from './running-state.js'
import type { CID } from 'multiformats/cid'
Expand All @@ -24,6 +25,7 @@ import { SnapshotState } from './snapshot-state.js'
import { CommitID, StreamID } from '@ceramicnetwork/streamid'
import { LocalIndexApi } from '../indexing/local-index-api.js'
import { AnchorRequestStore } from '../store/anchor-request-store.js'
import { IKVStore } from '../store/ikv-store.js'

const APPLY_ANCHOR_COMMIT_ATTEMPTS = 3

Expand All @@ -48,9 +50,9 @@ export class StateManager {
* @param indexStreamIfNeeded - `Repository#indexStreamIfNeeded`
*/
constructor(
private readonly dispatcher: Dispatcher,
readonly dispatcher: Dispatcher,
private readonly pinStore: PinStore,
private readonly anchorRequestStore: AnchorRequestStore,
readonly anchorRequestStore: AnchorRequestStore,
private readonly executionQ: ExecutionQueue,
public anchorService: AnchorService,
public conflictResolution: ConflictResolution,
Expand All @@ -61,9 +63,72 @@ export class StateManager {
opts?: LoadOpts | CreateOpts
) => Promise<RunningState>,
private readonly indexStreamIfNeeded,
private readonly _index: LocalIndexApi | undefined
readonly _index: LocalIndexApi | undefined
) {}

/**
* Open the state manager
*/
async open(store: IKVStore): Promise<void> {
return await this.pinStore.open(store)
}
/**
* Load a pinned stream
*/
async loadPinnedStream(streamId: StreamID): Promise<StreamState> {
return await this.pinStore.stateStore.load(streamId)
}

/**
* List a pinned stream
* @param streamId
*/
async ls(streamId?: StreamID): Promise<string[]> {
return await this.pinStore.ls(streamId)
}

/**
* List a set of pinned streams
*/
async listStoredStreamIDs(streamId?: StreamID | null, limit?: number): Promise<string[]> {
return await this.pinStore.stateStore.listStoredStreamIDs(streamId, limit)
}

/**
* Takes a StreamState and finds all the IPFS CIDs that are in any way needed to load data
* from the stream, pins them against the configured pinning backend, writes the
* StreamState itself into the state store, and updates the RunningState's pinned commits which
* prevents the StreamState's commits from being stored again.
* @param runningState - object holding the current StreamState for the stream being pinned
* If the stream was previously pinned, then this will also contain a set of CIDs
* (in string representation) of the commits that were pinned previously. This means
* we only need to pin CIDs corresponding to the commits contained in the log of the given
* StreamState that aren't contained within `pinnedCommits`
* @param force - optional boolean that if set to true forces all commits in the stream to pinned,
* regardless of whether they have been previously pinned
*/
async add(runningState: RunningState, force?: boolean): Promise<void> {
await this.pinStore.add(runningState, force)
}

/**
* Effectively opposite of 'add' - this finds all the IPFS CIDs that are required to load the
* given stream and unpins them from IPFS, and them removes the stream state from the Ceramic
* state store. There is one notable difference of behavior however, which is that 'rm()'
* intentionally leaves the CIDs that make up the anchor proof and anchor merkle tree pinned.
* This is to avoid accidentally unpinning data that is needed by other streams, in the case where
* there are multiple pinned streams that contain anchor commits from the same anchor batch
* and therefore share the same anchor proof and merkle tree.
* @param runningState
*/
async rm(runningState: RunningState): Promise<void> {
return await this.pinStore.rm(runningState)
}

async close(): Promise<void> {
return await this.pinStore.close()
}

/**
* Returns whether the given StreamID corresponds to a pinned stream that has been synced at least
* once during the lifetime of this process. As long as it's been synced once, it's guaranteed to
Expand Down
Loading