Skip to content

Rewrite #116

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 100 additions & 77 deletions level-ttl.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ function expiryKey (db, exp, key) {
function buildQuery (db) {
const encode = db._ttl.encoding.encode
const expiryNs = db._ttl._expiryNs

// TODO: add limit (esp. important when checkInterval is long)
return {
keyEncoding: 'binary',
valueEncoding: 'binary',
Expand All @@ -24,58 +26,62 @@ function buildQuery (db) {
}
}

function startTtl (db, checkFrequency) {
function startTtl (db, checkFrequency, setInterval) {
db._ttl.intervalId = setInterval(function () {
if (db._ttl._checkInProgress) return

const batch = []
const subBatch = []
const sub = db._ttl.sub
const query = buildQuery(db)
const decode = db._ttl.encoding.decode
var createReadStream
const it = (sub || db).iterator(query)

db._ttl._checkInProgress = true
next()

if (sub) {
createReadStream = sub.createReadStream.bind(sub)
} else {
createReadStream = db.createReadStream.bind(db)
}
function next () {
it.next(function (err, key, value) {
if (err) {
it.end(function () {
doneReading(err)
})
} else if (key === undefined) {
it.end(doneReading)
} else {
// the value is the key!
const realKey = decode(value)

createReadStream(query)
.on('data', function (data) {
// the value is the key!
const key = decode(data.value)
// expiryKey that matches this query
subBatch.push({ type: 'del', key: data.key })
subBatch.push({ type: 'del', key: prefixKey(db, key) })
// the actual data that should expire now!
batch.push({ type: 'del', key: key })
})
.on('error', db.emit.bind(db, 'error'))
.on('end', function () {
if (!batch.length) return
// expiryKey that matches this query
subBatch.push({ type: 'del', key: key })
subBatch.push({ type: 'del', key: prefixKey(db, realKey) })

if (sub) {
sub.batch(subBatch, { keyEncoding: 'binary' }, function (err) {
if (err) db.emit('error', err)
})
// the actual data that should expire now!
batch.push({ type: 'del', key: realKey })

db._ttl.batch(batch, { keyEncoding: 'binary' }, function (err) {
if (err) db.emit('error', err)
})
} else {
db._ttl.batch(subBatch.concat(batch), { keyEncoding: 'binary' }, function (err) {
if (err) db.emit('error', err)
})
}
})
.on('close', function () {
db._ttl._checkInProgress = false
if (db._ttl._stopAfterCheck) {
stopTtl(db, db._ttl._stopAfterCheck)
db._ttl._stopAfterCheck = null
next()
}
})
}

function doneReading (err) {
if (err || !batch.length) {
doneWriting(err)
} else if (sub) {
const next = after(2, doneWriting)
sub.batch(subBatch, { keyEncoding: 'binary' }, next)
db._ttl.batch(batch, { keyEncoding: 'binary' }, next)
} else {
db._ttl.batch(subBatch.concat(batch), { keyEncoding: 'binary' }, doneWriting)
}
}

function doneWriting (err) {
if (err) db.emit('error', err)

db._ttl._checkInProgress = false
db.emit('ttl:sweep')
}
}, checkFrequency)

if (db._ttl.intervalId.unref) {
Expand All @@ -84,34 +90,34 @@ function startTtl (db, checkFrequency) {
}

function stopTtl (db, callback) {
db._ttl.options.clearInterval.call(null, db._ttl.intervalId)

// can't close a db while an interator is in progress
// so if one is, defer
if (db._ttl._checkInProgress) {
db._ttl._stopAfterCheck = callback
// TODO do we really need to return the callback here?
return db._ttl._stopAfterCheck
db.once('ttl:sweep', callback)
} else {
process.nextTick(callback)
}
clearInterval(db._ttl.intervalId)
callback && callback()
}

function ttlon (db, keys, ttl, callback) {
if (!keys.length) return process.nextTick(callback)

const exp = new Date(Date.now() + ttl)
const batch = []
const sub = db._ttl.sub
const batchFn = (sub ? sub.batch.bind(sub) : db._ttl.batch)
const encode = db._ttl.encoding.encode

db._ttl._lock(keys, function (release) {
callback = release(callback || function () {})
callback = release(callback)
ttloff(db, keys, function () {
keys.forEach(function (key) {
batch.push({ type: 'put', key: expiryKey(db, exp, key), value: encode(key) })
batch.push({ type: 'put', key: prefixKey(db, key), value: encode(exp) })
})

if (!batch.length) return callback()

batchFn(batch, { keyEncoding: 'binary', valueEncoding: 'binary' }, function (err) {
if (err) { db.emit('error', err) }
callback()
Expand All @@ -121,6 +127,8 @@ function ttlon (db, keys, ttl, callback) {
}

function ttloff (db, keys, callback) {
if (!keys.length) return process.nextTick(callback)

const batch = []
const sub = db._ttl.sub
const getFn = (sub ? sub.get.bind(sub) : db.get.bind(db))
Expand All @@ -129,11 +137,9 @@ function ttloff (db, keys, callback) {
const done = after(keys.length, function (err) {
if (err) db.emit('error', err)

if (!batch.length) return callback && callback()

batchFn(batch, { keyEncoding: 'binary', valueEncoding: 'binary' }, function (err) {
if (err) { db.emit('error', err) }
callback && callback()
callback()
})
})

Expand All @@ -153,6 +159,8 @@ function put (db, key, value, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
} else if (typeof callback !== 'function') {
throw new Error('put() requires a callback argument')
}

options || (options = {})
Expand All @@ -161,15 +169,16 @@ function put (db, key, value, options, callback) {
options.ttl = db._ttl.options.defaultTTL
}

var done
var _callback = callback

if (options.ttl > 0 && key != null && value != null) {
done = after(2, _callback || function () {})
callback = done
ttlon(db, [key], options.ttl, done)
// TODO: batch together with actual key
return ttlon(db, [key], options.ttl, function (err) {
if (err) return callback(err)

db._ttl.put.call(db, key, value, options, callback)
})
}

// TODO: remove existing TTL if any?
db._ttl.put.call(db, key, value, options, callback)
}

Expand All @@ -180,13 +189,21 @@ function setTtl (db, key, ttl, callback) {
}

function del (db, key, options, callback) {
var done
var _callback = callback
if (typeof options === 'function') {
callback = options
options = {}
} else if (typeof callback !== 'function') {
throw new Error('del() requires a callback argument')
}

if (key != null) {
done = after(2, _callback || function () {})
callback = done
ttloff(db, [key], done)
// TODO: batch together with actual key
// TODO: or even skip this, should get swept up anyway
return ttloff(db, [key], function (err) {
if (err) return callback(err)

db._ttl.del.call(db, key, options, callback)
})
}

db._ttl.del.call(db, key, options, callback)
Expand All @@ -196,6 +213,8 @@ function batch (db, arr, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
} else if (typeof callback !== 'function') {
throw new Error('batch() requires a callback argument')
}

options || (options = {})
Expand All @@ -207,11 +226,9 @@ function batch (db, arr, options, callback) {
var done
var on
var off
var _callback = callback

if (options.ttl > 0 && Array.isArray(arr)) {
done = after(3, _callback || function () {})
callback = done
done = after(2, write)

on = []
off = []
Expand All @@ -222,28 +239,32 @@ function batch (db, arr, options, callback) {
if (entry.type === 'del') off.push(entry.key)
})

if (on.length) {
ttlon(db, on, options.ttl, done)
} else {
done()
}

if (off.length) {
ttloff(db, off, done)
} else {
done()
}
// TODO: batch could contain a key twice. perhaps do on and off sequentially.
// TODO: better yet, perform both in one batch.
ttlon(db, on, options.ttl, done)
ttloff(db, off, done)
} else {
write()
}

db._ttl.batch.call(db, arr, options, callback)
function write (err) {
if (err) return callback(err)

db._ttl.batch.call(db, arr, options, callback)
}
}

function close (db, callback) {
if (typeof callback !== 'function') {
throw new Error('close() requires a callback argument')
}

stopTtl(db, function () {
// TODO: when/why is db._ttl not defined?
if (db._ttl && typeof db._ttl.close === 'function') {
return db._ttl.close.call(db, callback)
}
callback && callback()
process.nextTick(callback)
})
}

Expand All @@ -258,6 +279,8 @@ function setup (db, options) {
expiryNamespace: 'x',
separator: '!',
checkFrequency: 10000,
setInterval: global.setInterval,
clearInterval: global.clearInterval,
defaultTTL: 0
}, options)

Expand All @@ -284,7 +307,7 @@ function setup (db, options) {
// we must intercept close()
db.close = close.bind(null, db)

startTtl(db, options.checkFrequency)
startTtl(db, options.checkFrequency, options.setInterval)

return db
}
Expand Down
Loading