diff --git a/.gitignore b/.gitignore
index fd4f2b0..6f1ff99 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,4 @@
 node_modules
 .DS_Store
+package
+LambdaFunction.zip
\ No newline at end of file
diff --git a/README.md b/README.md
index f48825d..b66260d 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
 # dynamodb-replicator
 
-[dynamodb-replicator](https://github.com/mapbox/dynamodb-replicator) offers several different mechanisms to manage redundancy and recoverability on [DynamoDB](http://aws.amazon.com/documentation/dynamodb) tables.
+[dynamodb-replicator](https://github.com/mapbox/dynamodb-replicator) offers several different mechanisms to manage redundancy and recoverability on [DynamoDB](http://aws.amazon.com/documentation/dynamodb) tables. This version is designed for multitenanted tables, which can be split when storing incremental backups.
 
 - A **replicator** function that processes events from a DynamoDB stream, replaying changes made to the primary table and onto a replica table. The function is designed to be run as an [AWS Lambda function](http://aws.amazon.com/documentation/lambda/), optionally with deployment assistance from [streambot](https://github.com/mapbox/streambot).
 - An **incremental backup** function that processes events from a DynamoDB stream, replaying them as writes to individual objects on S3. The function is designed to be run as an [AWS Lambda function](http://aws.amazon.com/documentation/lambda/), optionally with deployment assistance from [streambot](https://github.com/mapbox/streambot).
@@ -16,6 +16,15 @@ Managing table redundancy and backups involves many moving parts. Please read [D
 
 [dynamodb-replicator](https://github.com/mapbox/dynamodb-replicator) provides several CLI tools to help manage your DynamoDB table.
 
+### Config and Packaging
+
+For Windows users: a PowerShell script is available to package the Lambda function with all the dependencies it needs.
+
+```
+> npm install
+> .\package.ps1
+```
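+
+The incremental backup function reads its settings from Lambda environment variables. A minimal sketch of a multitenanted configuration (the bucket name, prefix, and tenant column below are placeholders):
+
+```
+BackupBucket=my-backup-bucket
+BackupPrefix=dynamodb-backups
+MultiTenancyColumn=tenantId
+PlainTextKeyAsFilename=true
+```
+
+With `MultiTenancyColumn` set, each incremental backup object is written under `<BackupPrefix>/<table>/<tenant value>/<item id>`, where the item id is an md5 hash of the item's key, or the plain key values when `PlainTextKeyAsFilename` is set. This is what lets a single multitenanted table be split into per-tenant backups.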
+
 ### diff-record
 
 Given two tables and an item's key, this script looks up the record in both tables and checks for consistency.
diff --git a/backup.js b/backup.js
index 79e2e9a..f73cff3 100644
--- a/backup.js
+++ b/backup.js
@@ -3,13 +3,17 @@ var Dyno = require('dyno');
 var stream = require('stream');
 var zlib = require('zlib');
 
-module.exports = function(config, done) {
+module.exports = function(config, context, done) {
+    function next(err) {
+        if (err) return done(err);
+        done(null, { size: size, count: count });
+    }
+
     var primary = Dyno(config);
     var s3 = new AWS.S3();
     var log = config.log || console.log;
 
-    var scanOpts = config.hasOwnProperty('segment') && config.segments ?
-        { Segment: config.segment, TotalSegments: config.segments } : undefined;
+    var scanOpts = config.hasOwnProperty('segment') && config.segments ?
+        { Segment: config.segment, TotalSegments: config.segments } : undefined;
 
     if (config.backup)
         if (!config.backup.bucket || !config.backup.prefix || !config.backup.jobid)
@@ -33,16 +37,17 @@ module.exports = function(config, done) {
 
     var data = primary.scanStream(scanOpts)
         .on('error', next)
-        .pipe(stringify)
+        .pipe(stringify)
         .on('error', next)
-        .pipe(zlib.createGzip());
+        .pipe(zlib.createGzip());
 
     log('[segment %s] Starting backup job %s of %s', index, config.backup.jobid, config.region + '/' + config.table);
 
     s3.upload({
         Bucket: config.backup.bucket,
         Key: key,
-        Body: data
+        Body: data,
+        ACL: 'bucket-owner-full-control'
     }, function(err) {
         if (err) return next(err);
         log('[segment %s] Uploaded dynamo backup to s3://%s/%s', index, config.backup.bucket, key);
@@ -53,8 +58,5 @@ module.exports = function(config, done) {
         size = progress.total;
     });
 
-    function next(err) {
-        if (err) return done(err);
-        done(null, { size: size, count: count });
-    }
+
 };
diff --git a/index.js b/index.js
index 90bc126..5230dc9 100644
--- a/index.js
+++ b/index.js
@@ -9,7 +9,7 @@ module.exports.streambotReplicate = streambot(replicate);
 module.exports.backup = incrementalBackup;
 module.exports.streambotBackup = streambot(incrementalBackup);
 
-function replicate(event, callback) {
+function replicate(event, context, callback) {
     var replicaConfig = {
         table: process.env.ReplicaTable,
         region: process.env.ReplicaRegion,
@@ -71,8 +71,10 @@ function replicate(event, callback) {
 
     if (errs) {
         var messages = errs
-            .filter(function(err) { return !!err; })
-            .map(function(err) { return err.message; })
+            .filter(function(err) {
+                return !!err; })
+            .map(function(err) {
+                return err.message; })
             .join(' | ');
         console.log('[error] %s', messages);
         return callback(errs);
@@ -88,7 +90,7 @@ function replicate(event, callback) {
     })(replica.batchWriteItemRequests(params), 0);
 }
 
-function incrementalBackup(event, callback) {
+function incrementalBackup(event, context, callback) {
     var allRecords = event.Records.reduce(function(allRecords, action) {
         var id = JSON.stringify(action.dynamodb.Keys);
 
@@ -115,7 +117,7 @@ function incrementalBackup(event, callback) {
     });
 
     q.awaitAll(function(err) {
-        if (err) throw err;
+        if (err) return callback(err);
         callback();
     });
 
@@ -124,19 +126,54 @@ function incrementalBackup(event, callback) {
 
     changes.forEach(function(change) {
         q.defer(function(next) {
-            var id = crypto.createHash('md5')
+            var id;
+            if (process.env.PlainTextKeyAsFilename) {
+                id = '';
+                Object.keys(change.dynamodb.Keys).sort().forEach(function(current) {
+                    if (id !== '')
+                        id += ' | ';
+
+                    var key = change.dynamodb.Keys[current];
+                    var value = key[Object.keys(key)[0]];
+                    id += value;
+                });
+            } else {
+                id = crypto.createHash('md5')
                 .update(JSON.stringify(change.dynamodb.Keys))
                 .digest('hex');
+            }
 
             var table = change.eventSourceARN.split('/')[1];
-
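+            // Choose the S3 key for this change: with MultiTenancyColumn set the key is
+            // <BackupPrefix>/<table>/<tenant value>/<id>; otherwise <BackupPrefix>/<table>/<id>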
+            var key;
+            if (process.env.MultiTenancyColumn) {
+                var mtCol;
+                if (change.dynamodb.NewImage)
+                    mtCol = change.dynamodb.NewImage[process.env.MultiTenancyColumn];
+                else
+                    mtCol = change.dynamodb.OldImage[process.env.MultiTenancyColumn];
+
+                if (!mtCol) {
+                    var message = 'MultiTenancyColumn ' + process.env.MultiTenancyColumn + ' does not exist.';
+                    console.log('[error] %s', message);
+                    return next(new Error(message));
+                }
+                var mtColDt = Object.keys(mtCol)[0];
+                key = [process.env.BackupPrefix, table, mtCol[mtColDt], id].join('/');
+            } else {
+                key = [process.env.BackupPrefix, table, id].join('/');
+            }
+
             var params = {
                 Bucket: process.env.BackupBucket,
-                Key: [process.env.BackupPrefix, table, id].join('/')
+                Key: key
             };
 
             var req = change.eventName === 'REMOVE' ? 'deleteObject' : 'putObject';
 
-            if (req === 'putObject') params.Body = JSON.stringify(change.dynamodb.NewImage);
+
+            if (req === 'putObject') {
+                params.Body = JSON.stringify(change.dynamodb.NewImage);
+                params.ACL = 'bucket-owner-full-control';
+            }
 
             s3[req](params, function(err) {
                 if (err) console.log(
diff --git a/package.ps1 b/package.ps1
new file mode 100644
index 0000000..2be14de
--- /dev/null
+++ b/package.ps1
@@ -0,0 +1,31 @@
+Write-Output "Packaging Lambda app"
+if (Test-Path .\package) {
+    Remove-Item .\package -Recurse -Force
+}
+
+New-Item .\package -type directory -f | Out-Null
+New-Item .\package\temp -type directory -f | Out-Null
+Write-Output "Copying dependencies..."
+Copy-Item .\backup.js .\package\temp\
+Copy-Item .\diff.js .\package\temp\
+Copy-Item .\s3-backfill.js .\package\temp\
+Copy-Item .\s3-snapshot.js .\package\temp\
+robocopy .\node_modules\ .\package\temp\ /E | Out-Null
+Write-Output "Dependencies sorted"
+
+Write-Output "Generating output..."
+Copy-Item .\index.js .\package\temp\
+Write-Output "Output generated"
+
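+# Zip the staged files with .NET's ZipFile (System.IO.Compression.FileSystem requires .NET 4.5 or later)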
+Add-Type -assembly "system.io.compression.filesystem"
+$currentPath = (Get-Item -Path ".\" -Verbose).FullName
+$sourcePath = $currentPath + "\package\temp"
+$outputFile = $currentPath + "\LambdaFunction.zip"
+
+if (Test-Path $outputFile) {
+    Remove-Item $outputFile -Force
+}
+
+[io.compression.zipfile]::CreateFromDirectory($sourcePath, $outputFile)
+
+Write-Output "λ function ready to be uploaded at: $($outputFile)"
\ No newline at end of file
diff --git a/s3-snapshot.js b/s3-snapshot.js
index fe3b3d0..c3ddc0b 100644
--- a/s3-snapshot.js
+++ b/s3-snapshot.js
@@ -47,7 +47,8 @@ module.exports = function(config, done) {
     var upload = s3.upload({
         Bucket: config.destination.bucket,
         Key: config.destination.key,
-        Body: gzip
+        Body: gzip,
+        ACL: 'bucket-owner-full-control'
     }).on('httpUploadProgress', function(details) {
         if (details.part !== partsLoaded) {
             log(
diff --git a/test/backup.test.js b/test/backup.test.js
index 99e4ca3..50f1679 100644
--- a/test/backup.test.js
+++ b/test/backup.test.js
@@ -8,6 +8,8 @@ var s3 = new AWS.S3();
 var queue = require('queue-async');
 var zlib = require('zlib');
 
+var context = {};
+
 var primaryItems = [
     {hash: 'hash1', range: 'range1', other:1},
     {hash: 'hash1', range: 'range2', other:2},
@@ -27,7 +29,7 @@ dynamodb.start();
 dynamodb.test('backup: one segment', primaryItems, function(assert) {
     var config = {
         backup: {
-            bucket: 'mapbox',
+            bucket: 'pageup-mapbox',
             prefix: 'dynamodb-replicator/test',
             jobid: crypto.randomBytes(4).toString('hex')
         },
@@ -38,7 +40,7 @@ dynamodb.test('backup: one segment', primaryItems, function(assert) {
         endpoint: 'http://localhost:4567'
     };
 
-    backup(config, function(err, details) {
+    backup(config, context, function(err, details) {
         assert.ifError(err, 'backup completed');
         if (err) return assert.end();
 
@@ -46,7 +48,7 @@ dynamodb.test('backup: one segment', primaryItems, function(assert) {
         assert.equal(details.size, 98, 'reported 98 bytes');
 
         s3.getObject({
-            Bucket: 'mapbox',
+            Bucket: 'pageup-mapbox',
             Key: [config.backup.prefix, config.backup.jobid, '0'].join('/')
         }, function(err, data) {
             assert.ifError(err, 'retrieved backup from S3');
@@ -72,12 +74,12 @@ dynamodb.test('backup: one segment', primaryItems, function(assert) {
 dynamodb.test('backup: parallel', records, function(assert) {
     var config = {
         backup: {
-            bucket: 'mapbox',
+            bucket: 'pageup-mapbox',
             prefix: 'dynamodb-replicator/test',
             jobid: crypto.randomBytes(4).toString('hex')
         },
         table: dynamodb.tableName,
-        region: 'us-east-1',
+        region: 'ap-southeast-2',
         accessKeyId: 'fake',
         secretAccessKey: 'fake',
         endpoint: 'http://localhost:4567',
@@ -90,10 +92,10 @@ dynamodb.test('backup: parallel', records, function(assert) {
     var secondKey = [config.backup.prefix, config.backup.jobid, secondConfig.segment].join('/');
 
     queue(1)
-        .defer(backup, firstConfig)
-        .defer(backup, secondConfig)
-        .defer(s3.getObject.bind(s3), { Bucket: 'mapbox', Key: firstKey })
-        .defer(s3.getObject.bind(s3), { Bucket: 'mapbox', Key: secondKey })
+        .defer(backup, firstConfig, context)
+        .defer(backup, secondConfig, context)
+        .defer(s3.getObject.bind(s3), { Bucket: 'pageup-mapbox', Key: firstKey })
+        .defer(s3.getObject.bind(s3), { Bucket: 'pageup-mapbox', Key: secondKey })
         .awaitAll(function(err, results) {
             assert.ifError(err, 'all requests completed');
             if (err) return assert.end();
diff --git a/test/fixtures/events/adjust-many-multi-key.json b/test/fixtures/events/adjust-many-multi-key.json
new file mode 100644
index 0000000..17f25cf
--- /dev/null
+++ b/test/fixtures/events/adjust-many-multi-key.json
@@ -0,0 +1,232 @@
+{
+    "Records": [{
+        "eventName": "INSERT",
+        "eventVersion": "1.0",
+        "eventSource": "aws:dynamodb",
+        "dynamodb": {
+            "NewImage": {
+                "range": {
+                    "N": "1"
+                },
+                "id": {
+                    "S": "record-1"
+                }
+            },
+            "SizeBytes": 26,
+            "StreamViewType": "NEW_AND_OLD_IMAGES",
+            "SequenceNumber": "111",
+            "Keys": {
+                "id": {
+                    "S": "record-1"
+                },
+                "range": {
+                    "N": "1"
+                }
+            }
+        },
+        "eventID": "1",
+        "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake",
+        "awsRegion": "us-east-1"
+    }, {
+        "eventName": "MODIFY",
+        "eventVersion": "1.0",
+        "eventSource": "aws:dynamodb",
+        "dynamodb": {
+            "NewImage": {
+                "range": {
+                    "N": "1"
+                },
+                "id": {
+                    "S": "record-1"
+                },
+                "blah": {
+                    "S": "Another property"
+                }
+            },
+            "SizeBytes": 59,
+            "StreamViewType": "NEW_AND_OLD_IMAGES",
+            "SequenceNumber": "222",
+            "OldImage": {
+                "range": {
+                    "N": "1"
+                },
+                "id": {
+                    "S": "record-1"
+                }
+            },
+            "Keys": {
+                "id": {
+                    "S": "record-1"
+                },
+                "range": {
+                    "N": "1"
+                }
+            }
+        },
+        "eventID": "2",
+        "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake",
+        "awsRegion": "us-east-1"
+    }, {
+        "eventName": "INSERT",
+        "eventVersion": "1.0",
+        "eventSource": "aws:dynamodb",
+        "dynamodb": {
+            "NewImage": {
+                "range": {
+                    "N": "3"
+                },
+                "id": {
+                    "S": "record-3"
+                }
+            },
+            "SizeBytes": 26,
+            "StreamViewType": "NEW_AND_OLD_IMAGES",
+            "SequenceNumber": "111",
+            "Keys": {
+                "id": {
+                    "S": "record-3"
+                },
+                "range": {
+                    "N": "3"
+                }
+            }
+        },
+        "eventID": "1",
+        "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake",
+        "awsRegion": "us-east-1"
+    }, {
+        "eventName": "INSERT",
+        "eventVersion": "1.0",
+        "eventSource": "aws:dynamodb",
+        "dynamodb": {
+            "NewImage": {
+                "range": {
+                    "N": "2"
+                },
+                "id": {
+                    "S": "record-2"
+                }
+            },
+            "SizeBytes": 26,
+            "StreamViewType": "NEW_AND_OLD_IMAGES",
+            "SequenceNumber": "111",
+            "Keys": {
+                "id": {
+                    "S": "record-2"
+                },
+                "range": {
+                    "N": "2"
+                }
+            }
+        },
+        "eventID": "1",
+        "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake",
+        "awsRegion": "us-east-1"
+    }, {
"record-2" + } + }, + "Keys": { + "id": { + "S": "record-2" + }, + "range": { + "N": "2" + } + } + }, + "eventID": "2", + "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake", + "awsRegion": "us-east-1" + }, { + "eventName": "REMOVE", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "dynamodb": { + "SizeBytes": 38, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "333", + "OldImage": { + "range": { + "N": "1" + }, + "id": { + "S": "record-1" + } + }, + "Keys": { + "id": { + "S": "record-1" + }, + "range": { + "N": "1" + } + } + }, + "eventID": "3", + "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake", + "awsRegion": "us-east-1" + }, { + "eventName": "MODIFY", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "dynamodb": { + "NewImage": { + "range": { + "N": "3" + }, + "id": { + "S": "record-3" + }, + "blah": { + "S": "Another property over there" + } + }, + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "222", + "OldImage": { + "range": { + "N": "3" + }, + "id": { + "S": "record-3" + } + }, + "Keys": { + "id": { + "S": "record-3" + }, + "range": { + "N": "3" + } + } + }, + "eventID": "2", + "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake", + "awsRegion": "us-east-1" + }] +} diff --git a/test/incremental.test.js b/test/incremental.test.js index 894ba09..053d39f 100644 --- a/test/incremental.test.js +++ b/test/incremental.test.js @@ -14,7 +14,7 @@ var fs = require('fs'); var backfill = require('../s3-backfill'); var snapshot = require('../s3-snapshot'); -var bucket = 'mapbox'; +var bucket = 'pageup-mapbox'; var prefix = 'dynamodb-replicator/test/' + crypto.randomBytes(4).toString('hex'); var records = require('./fixtures/records')(50); diff --git a/test/index.test.js b/test/index.test.js index 90e316b..3b94a96 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -25,12 +25,16 @@ var dyno = Dyno({ endpoint: 'http://localhost:4567' }); +var context = {}; + process.env.ReplicaTable = replica.tableName; process.env.ReplicaRegion = 'mock'; process.env.ReplicaEndpoint = 'http://localhost:4567'; -process.env.AWS_ACCESS_KEY_ID = 'mock'; -process.env.AWS_SECRET_ACCESS_KEY = 'mock'; -process.env.BackupBucket = 'mapbox'; +process.env.AWS_ACCESS_KEY_ID = 'fake'; +process.env.AWS_SECRET_ACCESS_KEY = 'fake'; +process.env.AWS_SESSION_TOKEN = 'fake'; +process.env.AWS_REGION = 'ap-southeast-2' +process.env.BackupBucket = 'pageup-mapbox'; test('[agent] use http agent for replication tests', function(assert) { streambot.agent = require('http').globalAgent; @@ -39,7 +43,7 @@ test('[agent] use http agent for replication tests', function(assert) { replica.test('[replicate] insert', function(assert) { var event = require(path.join(events, 'insert.json')); - replicate(event, function(err) { + replicate(event, context, function(err) { assert.ifError(err, 'success'); dyno.scan(function(err, data) { if (err) throw err; @@ -51,7 +55,7 @@ replica.test('[replicate] insert', function(assert) { replica.test('[replicate] insert & modify', function(assert) { var event = require(path.join(events, 'insert-modify.json')); - replicate(event, function(err) { + replicate(event, context, function(err) { assert.ifError(err, 'success'); dyno.scan(function(err, data) { if (err) throw err; @@ -63,7 +67,7 @@ replica.test('[replicate] insert & modify', function(assert) { replica.test('[replicate] insert, modify & delete', function(assert) { var event = require(path.join(events, 
@@ -63,7 +67,7 @@ replica.test('[replicate] insert & modify', function(assert) {
 
 replica.test('[replicate] insert, modify & delete', function(assert) {
     var event = require(path.join(events, 'insert-modify-delete.json'));
-    replicate(event, function(err) {
+    replicate(event, context, function(err) {
         assert.ifError(err, 'success');
         dyno.scan(function(err, data) {
             if (err) throw err;
@@ -75,7 +79,7 @@ replica.test('[replicate] insert, modify & delete', function(assert) {
 
 replica.test('[replicate] adjust many', function(assert) {
     var event = require(path.join(events, 'adjust-many.json'));
-    replicate(event, function(err) {
+    replicate(event, context, function(err) {
         assert.ifError(err, 'success');
         dyno.scan(function(err, data) {
             if (err) throw err;
@@ -101,7 +105,7 @@ replica.test('[replicate] adjust many', function(assert) {
 
 replica.test('[lambda] insert with buffers', function(assert) {
     var event = require(path.join(events, 'insert-buffer.json'));
-    replicate(event, function(err) {
+    replicate(event, context, function(err) {
         assert.ifError(err, 'success');
         dyno.scan(function(err, data) {
             if (err) throw err;
@@ -142,7 +146,7 @@ test('[incremental backup] configurable region', function(assert) {
         assert.equal(config.region, 'fake', 'configured region on S3 client');
     };
 
-    backup({ Records: [] }, function(err) {
+    backup({ Records: [] }, context, function(err) {
         assert.ifError(err, 'backup success');
         AWS.S3 = S3;
         delete process.env.BackupRegion;
@@ -158,7 +162,7 @@ test('[incremental backup] insert', function(assert) {
         .update(JSON.stringify(event.Records[0].dynamodb.Keys))
         .digest('hex');
 
-    backup(event, function(err) {
+    backup(event, context, function(err) {
         assert.ifError(err, 'success');
 
         s3.getObject({
@@ -185,7 +189,7 @@ test('[incremental backup] insert & modify', function(assert) {
         .update(JSON.stringify(event.Records[0].dynamodb.Keys))
         .digest('hex');
 
-    backup(event, function(err) {
+    backup(event, context, function(err) {
         assert.ifError(err, 'success');
 
         s3.getObject({
@@ -212,7 +216,7 @@ test('[incremental backup] insert, modify & delete', function(assert) {
         .update(JSON.stringify(event.Records[0].dynamodb.Keys))
         .digest('hex');
 
-    backup(event, function(err) {
+    backup(event, context, function(err) {
         assert.ifError(err, 'success');
 
         s3.getObject({
@@ -236,7 +240,7 @@ test('[incremental backup] adjust many', function(assert) {
         { range: { N: '33' }, id: { S: 'record-3' } }
     ];
 
-    backup(event, function(err) {
+    backup(event, context, function(err) {
         assert.ifError(err, 'success');
         var q = queue();
 
@@ -282,4 +286,182 @@ test('[incremental backup] adjust many', function(assert) {
     });
 });
 
+test('[incremental backup] adjust many with multitenancy column', function(assert) {
+    process.env.BackupPrefix = 'dynamodb-replicator/test/' + crypto.randomBytes(4).toString('hex');
+    process.env.MultiTenancyColumn = 'id';
+
+    var event = require(path.join(events, 'adjust-many.json'));
+    var table = event.Records[0].eventSourceARN.split('/')[1];
+
+    var expected = [
+        { range: { N: '22' }, id: { S: 'record-2' } },
+        { range: { N: '33' }, id: { S: 'record-3' } }
+    ];
+
+    backup(event, context, function(err) {
+        assert.ifError(err, 'success');
+        var q = queue();
+
+        expected.forEach(function(record) {
+            q.defer(function(next) {
+                var key = { id: record.id };
+                var id = crypto.createHash('md5')
+                    .update(JSON.stringify(key))
+                    .digest('hex');
+
+                var mtCol = record.id.S;
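+                // mirrors index.js: the tenant value becomes a path segment between the table name and the hashed key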
+                var s3Key = [process.env.BackupPrefix, table, mtCol, id].join('/');
+
+                s3.getObject({
+                    Bucket: process.env.BackupBucket,
+                    Key: s3Key
+                }, function(err, data) {
+                    assert.ifError(err, 'no S3 error for ' + JSON.stringify(key));
+                    if (!data) return next();
+                    assert.ok(data.Body, 'got S3 object for ' + JSON.stringify(key));
+
+                    var found = JSON.parse(data.Body.toString());
+                    assert.deepEqual(found, record, 'expected item modified on S3 for ' + JSON.stringify(key));
+                    next();
+                });
+            });
+        });
+
+        q.defer(function(next) {
+            var id = crypto.createHash('md5')
+                .update(JSON.stringify({ id: { S: 'record-1' } }))
+                .digest('hex');
+
+            s3.getObject({
+                Bucket: process.env.BackupBucket,
+                Key: [process.env.BackupPrefix, table, 'record-1', id].join('/')
+            }, function(err) {
+                assert.equal(err.code, 'NoSuchKey', 'object was deleted');
+                next();
+            });
+        });
+
+        q.awaitAll(function() {
+            assert.end();
+            delete process.env.MultiTenancyColumn;
+        });
+    });
+});
+
+test('[incremental backup] adjust many with hash key used as filename', function(assert) {
+    process.env.BackupPrefix = 'dynamodb-replicator/test/' + crypto.randomBytes(4).toString('hex');
+    process.env.PlainTextKeyAsFilename = true;
+
+    var event = require(path.join(events, 'adjust-many.json'));
+    var table = event.Records[0].eventSourceARN.split('/')[1];
+
+    var expected = [
+        { range: { N: '22' }, id: { S: 'record-2' } },
+        { range: { N: '33' }, id: { S: 'record-3' } }
+    ];
+
+    backup(event, context, function(err) {
+        assert.ifError(err, 'success');
+        var q = queue();
+
+        expected.forEach(function(record) {
+            q.defer(function(next) {
+                var key = { id: record.id };
+                var id = record.id.S;
+
+                var s3Key = [process.env.BackupPrefix, table, id].join('/');
+
+                s3.getObject({
+                    Bucket: process.env.BackupBucket,
+                    Key: s3Key
+                }, function(err, data) {
+                    assert.ifError(err, 'no S3 error for ' + JSON.stringify(key));
+                    if (!data) return next();
+                    assert.ok(data.Body, 'got S3 object for ' + JSON.stringify(key));
+
+                    var found = JSON.parse(data.Body.toString());
+                    assert.deepEqual(found, record, 'expected item modified on S3 for ' + JSON.stringify(key));
+                    next();
+                });
+            });
+        });
+
+        q.defer(function(next) {
+            var id = 'record-1';
+
+            s3.getObject({
+                Bucket: process.env.BackupBucket,
+                Key: [process.env.BackupPrefix, table, id].join('/')
+            }, function(err) {
+                assert.equal(err.code, 'NoSuchKey', 'object was deleted');
+                next();
+            });
+        });
+
+        q.awaitAll(function() {
+            assert.end();
+            delete process.env.PlainTextKeyAsFilename;
+        });
+    });
+
+});
+
+test('[incremental backup] adjust many with hash and range keys used as filename', function(assert) {
+    process.env.BackupPrefix = 'dynamodb-replicator/test/' + crypto.randomBytes(4).toString('hex');
+    process.env.PlainTextKeyAsFilename = true;
+
+    var event = require(path.join(events, 'adjust-many-multi-key.json'));
+    var table = event.Records[0].eventSourceARN.split('/')[1];
+
+    var expected = [
+        { range: { N: '2' }, id: { S: 'record-2' }, 'blah': { 'S': 'Another property here' } },
+        { range: { N: '3' }, id: { S: 'record-3' }, 'blah': { 'S': 'Another property over there' } }
+    ];
+
+    backup(event, context, function(err) {
+        assert.ifError(err, 'success');
+        var q = queue();
+
+        expected.forEach(function(record) {
+            q.defer(function(next) {
+                var key = { id: record.id };
+                var id = record.id.S + ' | ' + record.range.N;
+
+                var s3Key = [process.env.BackupPrefix, table, id].join('/');
+
+                s3.getObject({
+                    Bucket: process.env.BackupBucket,
+                    Key: s3Key
+                }, function(err, data) {
+                    assert.ifError(err, 'no S3 error for ' + JSON.stringify(key));
+                    if (!data) return next();
+                    assert.ok(data.Body, 'got S3 object for ' + JSON.stringify(key));
+
+                    var found = JSON.parse(data.Body.toString());
+                    assert.deepEqual(found, record, 'expected item modified on S3 for ' + JSON.stringify(key));
+                    next();
+                });
+            });
+        });
+
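+        // record-1 was REMOVEd in the fixture, so its backup object should be gone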
+        q.defer(function(next) {
+            var id = 'record-1 | 1';
+
+            s3.getObject({
+                Bucket: process.env.BackupBucket,
+                Key: [process.env.BackupPrefix, table, id].join('/')
+            }, function(err) {
+                assert.equal(err.code, 'NoSuchKey', 'object was deleted');
+                next();
+            });
+        });
+
+        q.awaitAll(function() {
+            assert.end();
+            delete process.env.PlainTextKeyAsFilename;
+        });
+    });
+
+});
+
 replica.close();
diff --git a/test/live-test.backup-table.js b/test/live-test.backup-table.js
index efa525d..1b94a8c 100644
--- a/test/live-test.backup-table.js
+++ b/test/live-test.backup-table.js
@@ -40,7 +40,7 @@ dynamodb.test('backup-table shell script', primaryItems, function(assert) {
     queue()
         .defer(function(next) {
             s3.getObject({
-                Bucket: 'mapbox',
+                Bucket: 'pageup-mapbox',
                 Key: 'dynamodb-replicator/test/' + jobid + '/0'
             }, function(err, data) {
                 assert.ifError(err, 'S3 getObject success');