Skip to content
This repository was archived by the owner on Sep 30, 2023. It is now read-only.

Commit 351beac

Browse files
committed
Refactor replication status info
1 parent 49df4ff commit 351beac

File tree

2 files changed

+81
-38
lines changed

2 files changed

+81
-38
lines changed

src/Store.js

Lines changed: 67 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ const mapSeries = require('p-each-series')
66
const Log = require('ipfs-log')
77
const Index = require('./Index')
88
const Replicator = require('./Replicator')
9+
const ReplicationInfo = require('./replication-info')
910

1011
const Logger = require('logplease')
1112
const logger = Logger.create("orbit-db.store", { color: Logger.Colors.Blue })
@@ -60,12 +61,7 @@ class Store {
6061
this._oplog = new Log(this._ipfs, this.id, null, null, null, this._key, this.access.write)
6162

6263
// Replication progress info
63-
this._replicationInfo = {
64-
buffered: 0,
65-
queued: 0,
66-
progress: 0,
67-
max: 0,
68-
}
64+
this._replicationStatus = new ReplicationInfo()
6965

7066
// Statistics
7167
this._stats = {
@@ -82,34 +78,33 @@ class Store {
8278
this._loader = this._replicator
8379
this._replicator.on('load.added', (entry) => {
8480
// Update the latest entry state (latest is the entry with largest clock time)
85-
this._replicationInfo.queued ++
86-
this._replicationInfo.max = Math.max.apply(null, [this._replicationInfo.max, this._oplog.length, entry.clock ? entry.clock.time : 0])
81+
this._replicationStatus.queued ++
82+
this._recalculateReplicationMax(entry.clock ? entry.clock.time : 0)
8783
// logger.debug(`<replicate>`)
8884
this.events.emit('replicate', this.address.toString(), entry)
8985
})
9086
this._replicator.on('load.progress', (id, hash, entry, have, bufferedLength) => {
91-
// console.log(">>", this._oplog.length, this._replicationInfo.progress, this._replicationInfo.buffered, bufferedLength)
92-
if (this._replicationInfo.buffered > bufferedLength) {
93-
this._replicationInfo.progress = this._replicationInfo.progress + bufferedLength
87+
if (this._replicationStatus.buffered > bufferedLength) {
88+
this._recalculateReplicationProgress(this.replicationStatus.progress + bufferedLength)
9489
} else {
95-
this._replicationInfo.progress = Math.max.apply(null, [this._oplog.length, this._replicationInfo.progress, this._oplog.length + bufferedLength])
90+
this._recalculateReplicationProgress(this._oplog.length + bufferedLength)
9691
}
97-
// console.log(">>>", this._replicationInfo.progress)
98-
this._replicationInfo.buffered = bufferedLength
99-
this._replicationInfo.max = Math.max.apply(null, [this._replicationInfo.max, this._replicationInfo.progress])
92+
this._replicationStatus.buffered = bufferedLength
93+
this._recalculateReplicationMax(this.replicationStatus.progress)
10094
// logger.debug(`<replicate.progress>`)
101-
this.events.emit('replicate.progress', this.address.toString(), hash, entry, this._replicationInfo.progress, have)
95+
this.events.emit('replicate.progress', this.address.toString(), hash, entry, this.replicationStatus.progress, null)
10296
})
10397

10498
const onLoadCompleted = async (logs, have) => {
10599
try {
106100
for (let log of logs) {
107101
await this._oplog.join(log, -1, this._oplog.id)
108102
}
109-
this._replicationInfo.max = Math.max(this._replicationInfo.max, this._oplog.length)
103+
this._replicationStatus.queued -= logs.length
104+
this._replicationStatus.buffered = this._replicator._buffer.length
105+
this._recalculateReplicationMax()
110106
this._index.updateIndex(this._oplog)
111-
this._replicationInfo.progress = Math.max.apply(null, [this._replicationInfo.progress, this._oplog.length])
112-
this._replicationInfo.queued -= logs.length
107+
this._recalculateReplicationProgress()
113108
// logger.debug(`<replicated>`)
114109
this.events.emit('replicated', this.address.toString(), logs.length)
115110
} catch (e) {
@@ -136,24 +131,29 @@ class Store {
136131
return this._key
137132
}
138133

134+
/**
135+
* Returns the database's current replication status information
136+
* @return {[Object]} [description]
137+
*/
138+
get replicationStatus () {
139+
return this._replicationStatus
140+
}
141+
139142
async close () {
140143
if (this.options.onClose)
141144
await this.options.onClose(this.address.toString())
142145

143146
// Reset replication statistics
144-
this._replicationInfo = {
145-
buffered: 0,
146-
queued: 0,
147-
progress: 0,
148-
max: 0,
149-
}
147+
this._replicationStatus.reset()
148+
150149
// Reset database statistics
151150
this._stats = {
152151
snapshot: {
153152
bytesLoaded: -1,
154153
},
155154
syncRequestsReceieved: 0,
156155
}
156+
157157
// Remove all event listeners
158158
this.events.removeAllListeners('load')
159159
this.events.removeAllListeners('load.progress')
@@ -173,9 +173,14 @@ class Store {
173173
return Promise.resolve()
174174
}
175175

176+
/**
177+
* Drops a database and removes local data
178+
* @return {[None]}
179+
*/
176180
async drop () {
177181
await this.close()
178182
await this._cache.destroy()
183+
// Reset
179184
this._index = new this.options.Index(this.id)
180185
this._oplog = new Log(this._ipfs, this.id, null, null, null, this._key, this.access.write)
181186
this._cache = this.options.cache
@@ -192,11 +197,10 @@ class Store {
192197
this.events.emit('load', this.address.toString(), heads)
193198

194199
await mapSeries(heads, async (head) => {
195-
this._replicationInfo.max = Math.max(this._replicationInfo.max, head.clock.time)
200+
this._recalculateReplicationMax(head.clock.time)
196201
let log = await Log.fromEntryHash(this._ipfs, head.hash, this._oplog.id, amount, this._oplog.values, this.key, this.access.write, this._onLoadProgress.bind(this))
197202
await this._oplog.join(log, amount, this._oplog.id)
198-
this._replicationInfo.progress = Math.max.apply(null, [this._replicationInfo.progress, this._oplog.length])
199-
this._replicationInfo.max = Math.max.apply(null, [this._replicationInfo.max, this._replicationInfo.progress])
203+
this._recalculateReplicationProgress()
200204
})
201205

202206
// Update the index
@@ -301,6 +305,8 @@ class Store {
301305
async loadFromSnapshot (onProgressCallback) {
302306
this.events.emit('load', this.address.toString())
303307

308+
const maxClock = (res, val) => Math.max(res, val.clock.time)
309+
304310
const queue = await this._cache.get('queue')
305311
this.sync(queue || [])
306312

@@ -364,10 +370,8 @@ class Store {
364370

365371
if (header) {
366372
this._type = header.type
367-
this._replicationInfo.max = Math.max(this._replicationInfo.max, values.reduce((res, val) => Math.max(res, val.clock.time), 0))
368373
resolve({ values: values, id: header.id, heads: header.heads, type: header.type })
369374
} else {
370-
this._replicationInfo.max = 0
371375
resolve({ values: values, id: null, heads: null, type: null })
372376
}
373377
}
@@ -377,34 +381,34 @@ class Store {
377381
}
378382

379383
const onProgress = (hash, entry, count, total) => {
380-
this._replicationInfo.max = Math.max(this._replicationInfo.max, entry.clock.time)
381-
this._replicationInfo.progress = Math.max.apply(null, [this._replicationInfo.progress, count, this._oplog.length])
382-
this._onLoadProgress(hash, entry, this._replicationInfo.progress, this._replicationInfo.max)
384+
this._recalculateReplicationStatus(count, entry.clock.time)
385+
this._onLoadProgress(hash, entry)
383386
}
384387

385388
// Fetch the entries
386389
// Timeout 1 sec to only load entries that are already fetched (in order to not get stuck at loading)
387390
const snapshotData = await loadSnapshotData()
391+
this._recalculateReplicationMax(snapshotData.values.reduce(maxClock, 0))
388392
if (snapshotData) {
389393
const log = await Log.fromJSON(this._ipfs, snapshotData, -1, this._key, this.access.write, 1000, onProgress)
390394
await this._oplog.join(log, -1, this._oplog.id)
391-
this._replicationInfo.max = Math.max.apply(null, [this._replicationInfo.max, this._replicationInfo.progress, this._oplog.length])
392-
this._replicationInfo.progress = Math.max(this._replicationInfo.progress, this._oplog.length)
395+
this._recalculateReplicationMax()
393396
this._index.updateIndex(this._oplog)
397+
this._recalculateReplicationProgress()
394398
this.events.emit('replicated', this.address.toString())
395399
}
396400
this.events.emit('ready', this.address.toString(), this._oplog.heads)
397401
} else {
398402
throw new Error(`Snapshot for ${this.address} not found!`)
399403
}
404+
400405
return this
401406
}
402407

403408
async _addOperation (data, batchOperation, lastOperation, onProgressCallback) {
404409
if (this._oplog) {
405410
const entry = await this._oplog.append(data, this.options.referenceCount)
406-
this._replicationInfo.progress++
407-
this._replicationInfo.max = Math.max.apply(null, [this._replicationInfo.max, this._replicationInfo.progress, entry.clock.time])
411+
this._recalculateReplicationStatus(this.replicationStatus.progress + 1, entry.clock.time)
408412
await this._cache.set('_localHeads', [entry])
409413
this._index.updateIndex(this._oplog)
410414
this.events.emit('write', this.address.toString(), entry, this._oplog.heads)
@@ -418,7 +422,32 @@ class Store {
418422
}
419423

420424
_onLoadProgress (hash, entry, progress, total) {
421-
this.events.emit('load.progress', this.address.toString(), hash, entry, Math.max(this._oplog.length, progress), Math.max(this._oplog.length || 0, this._replicationInfo.max || 0))
425+
this._recalculateReplicationStatus(progress, total)
426+
this.events.emit('load.progress', this.address.toString(), hash, entry, this.replicationStatus.progress, this.replicationStatus.max)
427+
}
428+
429+
/* Replication Status state updates */
430+
431+
_recalculateReplicationProgress (max) {
432+
this._replicationStatus.progress = Math.max.apply(null, [
433+
this._replicationStatus.progress,
434+
this._oplog.length,
435+
max || 0,
436+
])
437+
this._recalculateReplicationMax(this.replicationStatus.progress)
438+
}
439+
440+
_recalculateReplicationMax (max) {
441+
this._replicationStatus.max = Math.max.apply(null, [
442+
this._replicationStatus.max,
443+
this._oplog.length,
444+
max || 0,
445+
])
446+
}
447+
448+
_recalculateReplicationStatus (maxProgress, maxTotal) {
449+
this._recalculateReplicationProgress(maxProgress)
450+
this._recalculateReplicationMax(maxTotal)
422451
}
423452
}
424453

src/replication-info.js

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
class ReplicationInfo {
2+
constructor () {
3+
this.reset()
4+
}
5+
6+
reset () {
7+
this.progress = 0
8+
this.max = 0
9+
this.buffered = 0
10+
this.queued = 0
11+
}
12+
}
13+
14+
module.exports = ReplicationInfo

0 commit comments

Comments
 (0)