Skip to content

Move cache clear after tx committed #2386

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/node-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Changed
- Update cache to always flush with a block height and clear after transaction commit (#2386)

## [10.2.0] - 2024-05-08
### Changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,18 +200,18 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB

if (this.nodeConfig.storeCacheAsync) {
// Flush all completed block data and don't wait
await this.storeCacheService.flushAndWaitForCapacity(false, false)?.catch((e) => {
await this.storeCacheService.flushAndWaitForCapacity(false)?.catch((e) => {
logger.error(e, 'Flushing cache failed');
process.exit(1);
});
} else {
// Flush all data from cache and wait
await this.storeCacheService.flushCache(false, true);
await this.storeCacheService.flushCache(false);
}

if (!this.projectService.hasDataSourcesAfterHeight(height)) {
logger.info(`All data sources have been processed up to block number ${height}. Exiting gracefully...`);
await this.storeCacheService.flushCache(false, true);
await this.storeCacheService.flushCache(false);
process.exit(0);
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/node-core/src/indexer/project.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ export abstract class BaseProjectService<
}

// Flush any pending operations to set up DB
await this.storeService.storeCache.flushCache(true, true);
await this.storeService.storeCache.flushCache(true);
} else {
assert(startHeight, 'ProjectService must be initalized with a start height in workers');
this.projectUpgradeService.initWorker(startHeight, this.handleProjectChange.bind(this));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@ export abstract class BaseCacheService implements BeforeApplicationShutdown {
private queuedFlush?: Promise<void>;
protected logger: Pino.Logger;

protected constructor(private loggerName: string) {
protected constructor(loggerName: string) {
this.logger = getLogger(loggerName);
}

@profiler()
async flushCache(forceFlush?: boolean, flushAll?: boolean): Promise<void> {
async flushCache(forceFlush?: boolean): Promise<void> {
const flushCacheGuarded = async (forceFlush?: boolean): Promise<void> => {
// When we force flush, this will ensure not interrupt current block flushing,
// Force flush will continue after last block flush tx committed.
if (this.pendingFlush !== undefined) {
await this.pendingFlush;
}
if ((this.isFlushable() || forceFlush) && this.flushableRecords > 0) {
this.pendingFlush = this._flushCache(flushAll);
this.pendingFlush = this._flushCache();
// Remove reference to pending flush once it completes
this.pendingFlush.finally(() => (this.pendingFlush = undefined));
await this.pendingFlush;
Expand All @@ -45,7 +45,7 @@ export abstract class BaseCacheService implements BeforeApplicationShutdown {
async resetCache(): Promise<void> {
await this._resetCache();
}
abstract _flushCache(flushAll?: boolean): Promise<void>;
abstract _flushCache(): Promise<void>;
abstract _resetCache(): Promise<void> | void;
abstract isFlushable(): boolean;
abstract get flushableRecords(): number;
Expand Down
2 changes: 2 additions & 0 deletions packages/node-core/src/indexer/storeCache/cacheModel.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ describe('cacheModel', () => {
throw new Error('Entity should exist');
}

// updated height to 2
blockHeight = 2;
testModel.set(
'entity1_id_0x01',
{
Expand Down
10 changes: 3 additions & 7 deletions packages/node-core/src/indexer/storeCache/cacheModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@ export class CachedModel<
readonly model: ModelStatic<Model<T, T>>,
private readonly historical = true,
config: NodeConfig,
private getNextStoreOperationIndex: () => number,
// This is used by methods such as getByFields which don't support caches
private readonly useCockroachDb = false
private getNextStoreOperationIndex: () => number
) {
super();
// In case, this might be want to be 0
Expand Down Expand Up @@ -275,11 +273,9 @@ export class CachedModel<
return !!Object.keys(this.setCache).length || !!Object.keys(this.removeCache).length;
}

async runFlush(tx: Transaction, blockHeight?: number): Promise<void> {
async runFlush(tx: Transaction, blockHeight: number): Promise<void> {
// Get records relevant to the block height
const {removeRecords, setRecords} = blockHeight
? this.filterRecordsWithHeight(blockHeight)
: {removeRecords: this.removeCache, setRecords: this.setCache};
const {removeRecords, setRecords} = this.filterRecordsWithHeight(blockHeight);
// Filter non-historical could return undefined due to it been removed
let records = this.applyBlockRange(setRecords).filter((r) => !!r);
let dbOperation: Promise<unknown>;
Expand Down
5 changes: 1 addition & 4 deletions packages/node-core/src/indexer/storeCache/cacheable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,11 @@ export abstract class Cacheable {

try {
tx.afterCommit(() => {
this.clear(blockHeight);
release();
});

const pendingFlush = this.runFlush(tx, blockHeight);

// Don't await DB operations to complete before clearing.
// This allows new data to be cached while flushing
this.clear(blockHeight);
await pendingFlush;
} catch (e) {
release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ describe('Store cache upper threshold', () => {

beforeEach(() => {
storeService = new StoreCacheService(sequelize, nodeConfig, eventEmitter);
storeService.init(false, false, {} as any, undefined);
storeService.init(false, false, {findByPk: () => Promise.resolve({toJSON: () => 1})} as any, undefined);
});

it('doesnt wait for flushing cache when threshold not met', async () => {
Expand All @@ -261,7 +261,7 @@ describe('Store cache upper threshold', () => {
}

const start = new Date().getTime();
await storeService.flushAndWaitForCapacity(true, true);
await storeService.flushAndWaitForCapacity(true);
const end = new Date().getTime();

// Should be less than 1s, we're not waiting
Expand All @@ -283,7 +283,7 @@ describe('Store cache upper threshold', () => {
}

const start = new Date().getTime();
await storeService.flushAndWaitForCapacity(true, true);
await storeService.flushAndWaitForCapacity(true);
const end = new Date().getTime();

// Should be more than 1s, we set the db tx.commit to take 1s
Expand Down
11 changes: 5 additions & 6 deletions packages/node-core/src/indexer/storeCache/storeCache.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ export class StoreCacheService extends BaseCacheService {
model,
this._historical,
this.config,
this.getNextStoreOperationIndex.bind(this),
this._useCockroachDb
this.getNextStoreOperationIndex.bind(this)
);
if (this.config.csvOutDir) {
const exporterStore = new CsvStoreService(entityName, this.config.csvOutDir);
Expand Down Expand Up @@ -140,15 +139,15 @@ export class StoreCacheService extends BaseCacheService {
}

@profiler()
async _flushCache(flushAll?: boolean): Promise<void> {
async _flushCache(): Promise<void> {
this.logger.debug('Flushing cache');
// With historical disabled we defer the constraints check so that it doesn't matter what order entities are modified
const tx = await this.sequelize.transaction({
deferrable: this._historical || this._useCockroachDb ? undefined : Deferrable.SET_DEFERRED(),
});
try {
// Get the block height of all data we want to flush up to
const blockHeight = flushAll ? undefined : await this.metadata.find('lastProcessedHeight');
const blockHeight = await this.metadata.find('lastProcessedHeight');
// Get models that have data to flush
const updatableModels = Object.values(this.cachedModels).filter((m) => m.isFlushable);
if (this._useCockroachDb) {
Expand Down Expand Up @@ -179,10 +178,10 @@ export class StoreCacheService extends BaseCacheService {
}
}

async flushAndWaitForCapacity(forceFlush?: boolean, flushAll?: boolean): Promise<void> {
async flushAndWaitForCapacity(forceFlush?: boolean): Promise<void> {
const flushableRecords = this.flushableRecords;

const pendingFlush = this.flushCache(forceFlush, flushAll);
const pendingFlush = this.flushCache(forceFlush);

if (flushableRecords >= this.cacheUpperLimit) {
await pendingFlush;
Expand Down
4 changes: 2 additions & 2 deletions packages/node-core/src/indexer/test.runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export class TestRunner<A, SA, B, DS> {

try {
await indexBlock(block, test.handler, this.indexerManager, this.apiService);
await this.storeService.storeCache.flushCache(true, true);
await this.storeService.storeCache.flushCache(true);
} catch (e: any) {
logger.warn(`Test: ${test.name} field due to runtime error`, e);
this.failedTestSummary = {
Expand Down Expand Up @@ -125,7 +125,7 @@ export class TestRunner<A, SA, B, DS> {
}
}

await this.storeService.storeCache.flushCache(true, true);
await this.storeService.storeCache.flushCache(true);
logger.info(
`Test: ${test.name} completed with ${chalk.green(`${this.passedTests} passed`)} and ${chalk.red(
`${this.failedTests} failed`
Expand Down
2 changes: 1 addition & 1 deletion packages/node-core/src/subcommands/reindex.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,6 @@ export class ReindexService<P extends ISubqueryProject, DS extends BaseDataSourc
this.forceCleanService
);

await this.storeService.storeCache.flushCache(true, true);
await this.storeService.storeCache.flushCache(true);
}
}
2 changes: 1 addition & 1 deletion packages/node-core/src/utils/reindex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export async function reindex(
await forceCleanService.forceClean();
} else {
logger.info(`Reindexing to block: ${targetBlockHeight}`);
await storeService.storeCache.flushCache(true, false);
await storeService.storeCache.flushCache(true);
await storeService.storeCache.resetCache();
const transaction = await sequelize.transaction();
try {
Expand Down
Loading