From 6261bf139bbe00104170b8b549d7acdbc9149c31 Mon Sep 17 00:00:00 2001 From: Abhaya Chauhan Date: Mon, 18 Apr 2016 16:24:17 +1000 Subject: [PATCH 1/5] Added ability to leverage MultiTenancy Column in incremental backups Also updated callbacks to work with Node 4 --- README.md | 2 +- index.js | 28 +++++++++++++++++++++++++--- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index f48825d..36e89d0 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # dynamodb-replicator -[dynamodb-replicator](https://github.com/mapbox/dynamodb-replicator) offers several different mechanisms to manage redundancy and recoverability on [DynamoDB](http://aws.amazon.com/documentation/dynamodb) tables. +[dynamodb-replicator](https://github.com/mapbox/dynamodb-replicator) offers several different mechanisms to manage redundancy and recoverability on [DynamoDB](http://aws.amazon.com/documentation/dynamodb) tables. This version is designed for multitenanted tables, which can be split when storing incremental backups. - A **replicator** function that processes events from a DynamoDB stream, replaying changes made to the primary table and onto a replica table. The function is designed to be run as an [AWS Lambda function](http://aws.amazon.com/documentation/lambda/), optionally with deployment assistance from [streambot](https://github.com/mapbox/streambot). - An **incremental backup** function that processes events from a DynamoDB stream, replaying them as writes to individual objects on S3. The function is designed to be run as an [AWS Lambda function](http://aws.amazon.com/documentation/lambda/), optionally with deployment assistance from [streambot](https://github.com/mapbox/streambot). diff --git a/index.js b/index.js index 90bc126..de2dbc4 100644 --- a/index.js +++ b/index.js @@ -9,6 +9,10 @@ module.exports.streambotReplicate = streambot(replicate); module.exports.backup = incrementalBackup; module.exports.streambotBackup = streambot(incrementalBackup); +process.env.BackupBucket = 'dynamo-incremental-backups' +process.env.BackupPrefix = 'forms.blue' +process.env.MultiTenancyColumn = 'InstanceId' + function replicate(event, callback) { var replicaConfig = { table: process.env.ReplicaTable, @@ -88,7 +92,7 @@ function replicate(event, callback) { })(replica.batchWriteItemRequests(params), 0); } -function incrementalBackup(event, callback) { +function incrementalBackup(event, context, callback) { var allRecords = event.Records.reduce(function(allRecords, action) { var id = JSON.stringify(action.dynamodb.Keys); @@ -115,7 +119,7 @@ function incrementalBackup(event, callback) { }); q.awaitAll(function(err) { - if (err) throw err; + if (err) callback(err); callback(); }); @@ -124,15 +128,33 @@ function incrementalBackup(event, callback) { changes.forEach(function(change) { q.defer(function(next) { + console.log('Processing: ' + JSON.stringify(change)); var id = crypto.createHash('md5') .update(JSON.stringify(change.dynamodb.Keys)) .digest('hex'); var table = change.eventSourceARN.split('/')[1]; + var key = [process.env.BackupPrefix, table, id].join('/'); + + if (process.env.MultiTenancyColumn) { + var mtCol; + if (change.dynamodb.NewImage) + mtCol = change.dynamodb.NewImage[process.env.MultiTenancyColumn]; + else + mtCol = change.dynamodb.OldImage[process.env.MultiTenancyColumn]; + + if (!mtCol) { + var message = '[error] MultiTenancyColumn %s does not exist.'; + console.log(message, process.env.MultiTenancyColumn); + return next(message); + } + var mtColDt = Object.keys(mtCol)[0]; + key = [process.env.BackupPrefix, table, mtCol[mtColDt], id].join('/'); + } var params = { Bucket: process.env.BackupBucket, - Key: [process.env.BackupPrefix, table, id].join('/') + Key: key }; var req = change.eventName === 'REMOVE' ? 'deleteObject' : 'putObject'; From 49431877abd6de9244c5771a8536582680380114 Mon Sep 17 00:00:00 2001 From: Abhaya Chauhan Date: Tue, 19 Apr 2016 14:53:39 +1000 Subject: [PATCH 2/5] Added multi key filename --- index.js | 28 +- test/fixtures/events/adjust-many.json | 398 ++++++++++++------------ test/index.test.js | 422 ++++++++++++++++++++++++++ 3 files changed, 659 insertions(+), 189 deletions(-) diff --git a/index.js b/index.js index de2dbc4..89a2bca 100644 --- a/index.js +++ b/index.js @@ -128,7 +128,33 @@ function incrementalBackup(event, context, callback) { changes.forEach(function(change) { q.defer(function(next) { - console.log('Processing: ' + JSON.stringify(change)); + + var id; + if (process.env.PlainTextKeyAsFilename) { + id = ''; + Object.keys(change.dynamodb.Keys).sort().forEach(function(current) { + if (id !== '') + id += ' | '; + + var key = change.dynamodb.Keys[current]; + var value = key[Object.keys(key)[0]]; + id += value; + }); + } else { + id = crypto.createHash('md5') + var id; + if (process.env.PlainTextKeyAsFilename) { + id = ''; + Object.keys(change.dynamodb.Keys).sort().forEach(function(current) { + if (id !== '') + id += ' | '; + + var key = change.dynamodb.Keys[current]; + var value = key[Object.keys(key)[0]]; + id += value; + }); + } else { + id = crypto.createHash('md5') var id = crypto.createHash('md5') .update(JSON.stringify(change.dynamodb.Keys)) .digest('hex'); diff --git a/test/fixtures/events/adjust-many.json b/test/fixtures/events/adjust-many.json index 0ee01c0..17f25cf 100644 --- a/test/fixtures/events/adjust-many.json +++ b/test/fixtures/events/adjust-many.json @@ -1,210 +1,232 @@ { - "Records":[ - { - "eventName":"INSERT", - "eventVersion":"1.0", - "eventSource":"aws:dynamodb", - "dynamodb": { - "NewImage":{ - "range": { - "N": "1" - }, - "id": { - "S": "record-1" - } - }, - "SizeBytes":26, - "StreamViewType":"NEW_AND_OLD_IMAGES", - "SequenceNumber":"111", - "Keys":{ - "id": { - "S": "record-1" - } + "Records": [{ + "eventName": "INSERT", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "dynamodb": { + "NewImage": { + "range": { + "N": "1" + }, + "id": { + "S": "record-1" } }, - "eventID":"1", - "eventSourceARN":"arn:aws:dynamodb:us-east-1:123456789012:table/fake", - "awsRegion":"us-east-1" + "SizeBytes": 26, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "111", + "Keys": { + "id": { + "S": "record-1" + }, + "range": { + "N": "1" + } + } }, - { - "eventName":"MODIFY", - "eventVersion":"1.0", - "eventSource":"aws:dynamodb", - "dynamodb": { - "NewImage":{ - "range": { - "N": "11" - }, - "id": { - "S": "record-1" - } - }, - "SizeBytes":59, - "StreamViewType":"NEW_AND_OLD_IMAGES", - "SequenceNumber":"222", - "OldImage":{ - "range": { - "N": "1" - }, - "id": { - "S": "record-1" - } - }, - "Keys":{ - "id": { - "S": "record-1" - } + "eventID": "1", + "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake", + "awsRegion": "us-east-1" + }, { + "eventName": "MODIFY", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "dynamodb": { + "NewImage": { + "range": { + "N": "1" + }, + "id": { + "S": "record-1" + }, + "blah": { + "S": "Another property" } }, - "eventID":"2", - "eventSourceARN":"arn:aws:dynamodb:us-east-1:123456789012:table/fake", - "awsRegion":"us-east-1" + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "222", + "OldImage": { + "range": { + "N": "1" + }, + "id": { + "S": "record-1" + } + }, + "Keys": { + "id": { + "S": "record-1" + }, + "range": { + "N": "1" + } + } }, - { - "eventName":"INSERT", - "eventVersion":"1.0", - "eventSource":"aws:dynamodb", - "dynamodb": { - "NewImage":{ - "range": { - "N": "1" - }, - "id": { - "S": "record-3" - } - }, - "SizeBytes":26, - "StreamViewType":"NEW_AND_OLD_IMAGES", - "SequenceNumber":"111", - "Keys":{ - "id": { - "S": "record-3" - } + "eventID": "2", + "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake", + "awsRegion": "us-east-1" + }, { + "eventName": "INSERT", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "dynamodb": { + "NewImage": { + "range": { + "N": "3" + }, + "id": { + "S": "record-3" } }, - "eventID":"1", - "eventSourceARN":"arn:aws:dynamodb:us-east-1:123456789012:table/fake", - "awsRegion":"us-east-1" + "SizeBytes": 26, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "111", + "Keys": { + "id": { + "S": "record-3" + }, + "range": { + "N": "3" + } + } }, - { - "eventName":"INSERT", - "eventVersion":"1.0", - "eventSource":"aws:dynamodb", - "dynamodb": { - "NewImage":{ - "range": { - "N": "1" - }, - "id": { - "S": "record-2" - } - }, - "SizeBytes":26, - "StreamViewType":"NEW_AND_OLD_IMAGES", - "SequenceNumber":"111", - "Keys":{ - "id": { - "S": "record-2" - } + "eventID": "1", + "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake", + "awsRegion": "us-east-1" + }, { + "eventName": "INSERT", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "dynamodb": { + "NewImage": { + "range": { + "N": "2" + }, + "id": { + "S": "record-2" } }, - "eventID":"1", - "eventSourceARN":"arn:aws:dynamodb:us-east-1:123456789012:table/fake", - "awsRegion":"us-east-1" + "SizeBytes": 26, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "111", + "Keys": { + "id": { + "S": "record-2" + }, + "range": { + "N": "2" + } + } }, - { - "eventName":"MODIFY", - "eventVersion":"1.0", - "eventSource":"aws:dynamodb", - "dynamodb": { - "NewImage":{ - "range": { - "N": "22" - }, - "id": { - "S": "record-2" - } - }, - "SizeBytes":59, - "StreamViewType":"NEW_AND_OLD_IMAGES", - "SequenceNumber":"222", - "OldImage":{ - "range": { - "N": "1" - }, - "id": { - "S": "record-2" - } - }, - "Keys":{ - "id": { - "S": "record-2" - } + "eventID": "1", + "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake", + "awsRegion": "us-east-1" + }, { + "eventName": "MODIFY", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "dynamodb": { + "NewImage": { + "range": { + "N": "2" + }, + "id": { + "S": "record-2" + }, + "blah": { + "S": "Another property here" } }, - "eventID":"2", - "eventSourceARN":"arn:aws:dynamodb:us-east-1:123456789012:table/fake", - "awsRegion":"us-east-1" + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "222", + "OldImage": { + "range": { + "N": "2" + }, + "id": { + "S": "record-2" + } + }, + "Keys": { + "id": { + "S": "record-2" + }, + "range": { + "N": "2" + } + } }, - { - "eventName":"REMOVE", - "eventVersion":"1.0", - "eventSource":"aws:dynamodb", - "dynamodb": { - "SizeBytes":38, - "StreamViewType":"NEW_AND_OLD_IMAGES", - "SequenceNumber":"333", - "OldImage":{ - "range": { - "N": "11" - }, - "id": { - "S": "record-1" - } - }, - "Keys":{ - "id": { - "S": "record-1" - } + "eventID": "2", + "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake", + "awsRegion": "us-east-1" + }, { + "eventName": "REMOVE", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "dynamodb": { + "SizeBytes": 38, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "333", + "OldImage": { + "range": { + "N": "1" + }, + "id": { + "S": "record-1" } }, - "eventID":"3", - "eventSourceARN":"arn:aws:dynamodb:us-east-1:123456789012:table/fake", - "awsRegion":"us-east-1" + "Keys": { + "id": { + "S": "record-1" + }, + "range": { + "N": "1" + } + } }, - { - "eventName":"MODIFY", - "eventVersion":"1.0", - "eventSource":"aws:dynamodb", - "dynamodb": { - "NewImage":{ - "range": { - "N": "33" - }, - "id": { - "S": "record-3" - } - }, - "SizeBytes":59, - "StreamViewType":"NEW_AND_OLD_IMAGES", - "SequenceNumber":"222", - "OldImage":{ - "range": { - "N": "1" - }, - "id": { - "S": "record-3" - } - }, - "Keys":{ - "id": { - "S": "record-3" - } + "eventID": "3", + "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake", + "awsRegion": "us-east-1" + }, { + "eventName": "MODIFY", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "dynamodb": { + "NewImage": { + "range": { + "N": "3" + }, + "id": { + "S": "record-3" + }, + "blah": { + "S": "Another property over there" + } + }, + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "222", + "OldImage": { + "range": { + "N": "3" + }, + "id": { + "S": "record-3" } }, - "eventID":"2", - "eventSourceARN":"arn:aws:dynamodb:us-east-1:123456789012:table/fake", - "awsRegion":"us-east-1" - } - ] + "Keys": { + "id": { + "S": "record-3" + }, + "range": { + "N": "3" + } + } + }, + "eventID": "2", + "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake", + "awsRegion": "us-east-1" + }] } diff --git a/test/index.test.js b/test/index.test.js index 90e316b..3e65b7a 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -282,4 +282,426 @@ test('[incremental backup] adjust many', function(assert) { }); }); +test('[incremental backup] adjust many with multitenancy column', function(assert) { + process.env.BackupPrefix = 'dynamodb-replicator/test/' + crypto.randomBytes(4).toString('hex'); + process.env.MultiTenancyColumn = 'id' + + var event = require(path.join(events, 'adjust-many.json')); + var table = event.Records[0].eventSourceARN.split('/')[1]; + + var expected = [ + { range: { N: '22' }, id: { S: 'record-2' } }, + { range: { N: '33' }, id: { S: 'record-3' } } + ]; + + backup(event, this, function(err) { + assert.ifError(err, 'success'); + var q = queue(); + + expected.forEach(function(record) { + q.defer(function(next) { + var key = { id: record.id }; + var id = crypto.createHash('md5') + .update(JSON.stringify(key)) + .digest('hex'); + + var mtCol = record.id.S; + var s3Key = [process.env.BackupPrefix, table, mtCol, id].join('/'); + + s3.getObject({ + Bucket: process.env.BackupBucket, + Key: s3Key + }, function(err, data) { + assert.ifError(err, 'no S3 error for ' + JSON.stringify(key)); + if (!data) return next(); + assert.ok(data.Body, 'got S3 object for ' + JSON.stringify(key)); + + var found = JSON.parse(data.Body.toString()); + assert.deepEqual(found, record, 'expected item modified on S3 for ' + JSON.stringify(key)); + next(); + }); + }); + }); + + q.defer(function(next) { + var id = crypto.createHash('md5') + .update(JSON.stringify({ id: { S: 'record-1' } })) + .digest('hex'); + + s3.getObject({ + Bucket: process.env.BackupBucket, + Key: [process.env.BackupPrefix, table, 'record-1', id].join('/') + }, function(err) { + assert.equal(err.code, 'NoSuchKey', 'object was deleted'); + next(); + }); + }); + + q.awaitAll(function() { + assert.end(); + }); + }); +}); + +test('[incremental backup] adjust many with multitenancy column', function(assert) { + process.env.BackupPrefix = 'dynamodb-replicator/test/' + crypto.randomBytes(4).toString('hex'); + process.env.MultiTenancyColumn = 'id' + + var event = require(path.join(events, 'adjust-many.json')); + var table = event.Records[0].eventSourceARN.split('/')[1]; + + var expected = [ + { range: { N: '22' }, id: { S: 'record-2' } }, + { range: { N: '33' }, id: { S: 'record-3' } } + ]; + + backup(event, context, function(err) { + assert.ifError(err, 'success'); + var q = queue(); + + expected.forEach(function(record) { + q.defer(function(next) { + var key = { id: record.id }; + var id = crypto.createHash('md5') + .update(JSON.stringify(key)) + .digest('hex'); + + var mtCol = record.id.S; + var s3Key = [process.env.BackupPrefix, table, mtCol, id].join('/'); + console.log('Checking: ' + s3Key); + s3.getObject({ + Bucket: process.env.BackupBucket, + Key: s3Key + }, function(err, data) { + assert.ifError(err, 'no S3 error for ' + JSON.stringify(key)); + if (!data) return next(); + assert.ok(data.Body, 'got S3 object for ' + JSON.stringify(key)); + + var found = JSON.parse(data.Body.toString()); + assert.deepEqual(found, record, 'expected item modified on S3 for ' + JSON.stringify(key)); + next(); + }); + }); + }); + + q.defer(function(next) { + var id = crypto.createHash('md5') + .update(JSON.stringify({ id: { S: 'record-1' } })) + .digest('hex'); + + s3.getObject({ + Bucket: process.env.BackupBucket, + Key: [process.env.BackupPrefix, table, 'record-1', id].join('/') + }, function(err) { + assert.equal(err.code, 'NoSuchKey', 'object was deleted'); + next(); + }); + }); + + q.awaitAll(function() { + assert.end(); + delete process.env.MultiTenancyColumn; + }); + }); +}); + +test('[incremental backup] adjust many with keys used as filename', function(assert) { + process.env.BackupPrefix = 'dynamodb-replicator/test/' + crypto.randomBytes(4).toString('hex'); + process.env.PlainTextKeyAsFilename = true; + + var event = require(path.join(events, 'adjust-many.json')); + var table = event.Records[0].eventSourceARN.split('/')[1]; + + var expected = [ + { range: { N: '22' }, id: { S: 'record-2' } }, + { range: { N: '33' }, id: { S: 'record-3' } } + ]; + + backup(event, context, function(err) { + assert.ifError(err, 'success'); + var q = queue(); + + expected.forEach(function(record) { + q.defer(function(next) { + var key = { id: record.id }; + var id = record.id.S; + + var s3Key = [process.env.BackupPrefix, table, id].join('/'); + + console.log('Checking s3Key: ' + s3Key + ' | ' + process.env.BackupBucket); + + s3.getObject({ + Bucket: process.env.BackupBucket, + Key: s3Key + }, function(err, data) { + assert.ifError(err, 'no S3 error for ' + JSON.stringify(key)); + if (!data) return next(); + assert.ok(data.Body, 'got S3 object for ' + JSON.stringify(key)); + + var found = JSON.parse(data.Body.toString()); + assert.deepEqual(found, record, 'expected item modified on S3 for ' + JSON.stringify(key)); + next(); + }); + }); + }); + + q.defer(function(next) { + var id = 'record-1'; + + s3.getObject({ + Bucket: process.env.BackupBucket, + Key: [process.env.BackupPrefix, table, id].join('/') + }, function(err) { + assert.equal(err.code, 'NoSuchKey', 'object was deleted'); + next(); + }); + }); + + q.awaitAll(function() { + assert.end(); + delete process.env.PlainTextKeyAsFilename; + }); + }); + +}); + +test('[incremental backup] adjust many with multitenancy column', function(assert) { + process.env.BackupPrefix = 'dynamodb-replicator/test/' + crypto.randomBytes(4).toString('hex'); + process.env.MultiTenancyColumn = 'id' + + var event = require(path.join(events, 'adjust-many.json')); + var table = event.Records[0].eventSourceARN.split('/')[1]; + + var expected = [ + { range: { N: '22' }, id: { S: 'record-2' } }, + { range: { N: '33' }, id: { S: 'record-3' } } + ]; + + backup(event, this, function(err) { + assert.ifError(err, 'success'); + var q = queue(); + + expected.forEach(function(record) { + q.defer(function(next) { + var key = { id: record.id }; + var id = crypto.createHash('md5') + .update(JSON.stringify(key)) + .digest('hex'); + + var mtCol = record.id.S; + var s3Key = [process.env.BackupPrefix, table, mtCol, id].join('/'); + + s3.getObject({ + Bucket: process.env.BackupBucket, + Key: s3Key + }, function(err, data) { + assert.ifError(err, 'no S3 error for ' + JSON.stringify(key)); + if (!data) return next(); + assert.ok(data.Body, 'got S3 object for ' + JSON.stringify(key)); + + var found = JSON.parse(data.Body.toString()); + assert.deepEqual(found, record, 'expected item modified on S3 for ' + JSON.stringify(key)); + next(); + }); + }); + }); + + q.defer(function(next) { + var id = crypto.createHash('md5') + .update(JSON.stringify({ id: { S: 'record-1' } })) + .digest('hex'); + + s3.getObject({ + Bucket: process.env.BackupBucket, + Key: [process.env.BackupPrefix, table, 'record-1', id].join('/') + }, function(err) { + assert.equal(err.code, 'NoSuchKey', 'object was deleted'); + next(); + }); + }); + + q.awaitAll(function() { + assert.end(); + }); + }); +}); + +test('[incremental backup] adjust many with multitenancy column', function(assert) { + process.env.BackupPrefix = 'dynamodb-replicator/test/' + crypto.randomBytes(4).toString('hex'); + process.env.MultiTenancyColumn = 'id' + + var event = require(path.join(events, 'adjust-many.json')); + var table = event.Records[0].eventSourceARN.split('/')[1]; + + var expected = [ + { range: { N: '22' }, id: { S: 'record-2' } }, + { range: { N: '33' }, id: { S: 'record-3' } } + ]; + + backup(event, context, function(err) { + assert.ifError(err, 'success'); + var q = queue(); + + expected.forEach(function(record) { + q.defer(function(next) { + var key = { id: record.id }; + var id = crypto.createHash('md5') + .update(JSON.stringify(key)) + .digest('hex'); + + var mtCol = record.id.S; + var s3Key = [process.env.BackupPrefix, table, mtCol, id].join('/'); + + s3.getObject({ + Bucket: process.env.BackupBucket, + Key: s3Key + }, function(err, data) { + assert.ifError(err, 'no S3 error for ' + JSON.stringify(key)); + if (!data) return next(); + assert.ok(data.Body, 'got S3 object for ' + JSON.stringify(key)); + + var found = JSON.parse(data.Body.toString()); + assert.deepEqual(found, record, 'expected item modified on S3 for ' + JSON.stringify(key)); + next(); + }); + }); + }); + + q.defer(function(next) { + var id = crypto.createHash('md5') + .update(JSON.stringify({ id: { S: 'record-1' } })) + .digest('hex'); + + s3.getObject({ + Bucket: process.env.BackupBucket, + Key: [process.env.BackupPrefix, table, 'record-1', id].join('/') + }, function(err) { + assert.equal(err.code, 'NoSuchKey', 'object was deleted'); + next(); + }); + }); + + q.awaitAll(function() { + assert.end(); + delete process.env.MultiTenancyColumn; + }); + }); +}); + +test('[incremental backup] adjust many with hash key used as filename', function(assert) { + process.env.BackupPrefix = 'dynamodb-replicator/test/' + crypto.randomBytes(4).toString('hex'); + process.env.PlainTextKeyAsFilename = true; + + var event = require(path.join(events, 'adjust-many.json')); + var table = event.Records[0].eventSourceARN.split('/')[1]; + + var expected = [ + { range: { N: '22' }, id: { S: 'record-2' } }, + { range: { N: '33' }, id: { S: 'record-3' } } + ]; + + backup(event, context, function(err) { + assert.ifError(err, 'success'); + var q = queue(); + + expected.forEach(function(record) { + q.defer(function(next) { + var key = { id: record.id }; + var id = record.id.S; + + var s3Key = [process.env.BackupPrefix, table, id].join('/'); + + s3.getObject({ + Bucket: process.env.BackupBucket, + Key: s3Key + }, function(err, data) { + assert.ifError(err, 'no S3 error for ' + JSON.stringify(key)); + if (!data) return next(); + assert.ok(data.Body, 'got S3 object for ' + JSON.stringify(key)); + + var found = JSON.parse(data.Body.toString()); + assert.deepEqual(found, record, 'expected item modified on S3 for ' + JSON.stringify(key)); + next(); + }); + }); + }); + + q.defer(function(next) { + var id = 'record-1'; + + s3.getObject({ + Bucket: process.env.BackupBucket, + Key: [process.env.BackupPrefix, table, id].join('/') + }, function(err) { + assert.equal(err.code, 'NoSuchKey', 'object was deleted'); + next(); + }); + }); + + q.awaitAll(function() { + assert.end(); + delete process.env.PlainTextKeyAsFilename; + }); + }); + +}); + +test('[incremental backup] adjust many with hash and range keys used as filename', function(assert) { + process.env.BackupPrefix = 'dynamodb-replicator/test/' + crypto.randomBytes(4).toString('hex'); + process.env.PlainTextKeyAsFilename = true; + + var event = require(path.join(events, 'adjust-many-multi-key.json')); + var table = event.Records[0].eventSourceARN.split('/')[1]; + + var expected = [ + { range: { N: '2' }, id: { S: 'record-2' }, 'blah': { 'S': 'Another property here' } }, + { range: { N: '3' }, id: { S: 'record-3' }, 'blah': { 'S': 'Another property over there' } } + ]; + + backup(event, context, function(err) { + assert.ifError(err, 'success'); + var q = queue(); + + expected.forEach(function(record) { + q.defer(function(next) { + var key = { id: record.id }; + var id = record.id.S + ' | ' + record.range.N; + + var s3Key = [process.env.BackupPrefix, table, id].join('/'); + + s3.getObject({ + Bucket: process.env.BackupBucket, + Key: s3Key + }, function(err, data) { + assert.ifError(err, 'no S3 error for ' + JSON.stringify(key)); + if (!data) return next(); + assert.ok(data.Body, 'got S3 object for ' + JSON.stringify(key)); + + var found = JSON.parse(data.Body.toString()); + assert.deepEqual(found, record, 'expected item modified on S3 for ' + JSON.stringify(key)); + next(); + }); + }); + }); + + q.defer(function(next) { + var id = 'record-1 | 1'; + + s3.getObject({ + Bucket: process.env.BackupBucket, + Key: [process.env.BackupPrefix, table, id].join('/') + }, function(err) { + assert.equal(err.code, 'NoSuchKey', 'object was deleted'); + next(); + }); + }); + + q.awaitAll(function() { + assert.end(); + delete process.env.PlainTextKeyAsFilename; + }); + }); + +}); + replica.close(); From b3af446c2369a81406a6bb8ad9339cee7eaee9d1 Mon Sep 17 00:00:00 2001 From: Abhaya Chauhan Date: Tue, 19 Apr 2016 15:18:21 +1000 Subject: [PATCH 3/5] Updated to Node 4 API in Lambda Also Added tests for Plain Key as filename in S3 --- backup.js | 19 +- index.js | 42 +- test/backup.test.js | 20 +- .../events/adjust-many-multi-key.json | 232 ++++++++++ test/fixtures/events/adjust-many.json | 398 +++++++++--------- test/incremental.test.js | 2 +- test/index.test.js | 274 +----------- test/live-test.backup-table.js | 2 +- 8 files changed, 476 insertions(+), 513 deletions(-) create mode 100644 test/fixtures/events/adjust-many-multi-key.json diff --git a/backup.js b/backup.js index 79e2e9a..bcbf0bd 100644 --- a/backup.js +++ b/backup.js @@ -3,13 +3,17 @@ var Dyno = require('dyno'); var stream = require('stream'); var zlib = require('zlib'); -module.exports = function(config, done) { +module.exports = function(config, context, done) { + function next(err) { + if (err) return done(err); + done(null, { size: size, count: count }); + } + var primary = Dyno(config); var s3 = new AWS.S3(); var log = config.log || console.log; - var scanOpts = config.hasOwnProperty('segment') && config.segments ? - { Segment: config.segment, TotalSegments: config.segments } : undefined; + var scanOpts = config.hasOwnProperty('segment') && config.segments ? { Segment: config.segment, TotalSegments: config.segments } : undefined; if (config.backup) if (!config.backup.bucket || !config.backup.prefix || !config.backup.jobid) @@ -33,9 +37,9 @@ module.exports = function(config, done) { var data = primary.scanStream(scanOpts) .on('error', next) - .pipe(stringify) + .pipe(stringify) .on('error', next) - .pipe(zlib.createGzip()); + .pipe(zlib.createGzip()); log('[segment %s] Starting backup job %s of %s', index, config.backup.jobid, config.region + '/' + config.table); @@ -53,8 +57,5 @@ module.exports = function(config, done) { size = progress.total; }); - function next(err) { - if (err) return done(err); - done(null, { size: size, count: count }); - } + }; diff --git a/index.js b/index.js index 89a2bca..a6d1c32 100644 --- a/index.js +++ b/index.js @@ -9,11 +9,12 @@ module.exports.streambotReplicate = streambot(replicate); module.exports.backup = incrementalBackup; module.exports.streambotBackup = streambot(incrementalBackup); -process.env.BackupBucket = 'dynamo-incremental-backups' -process.env.BackupPrefix = 'forms.blue' -process.env.MultiTenancyColumn = 'InstanceId' +// process.env.BackupBucket = 'dynamo-incremental-backups' +// process.env.BackupPrefix = 'forms.blue' +// process.env.MultiTenancyColumn = 'InstanceId' +//process.env.PlainTextKeyAsFilename = true; -function replicate(event, callback) { +function replicate(event, context, callback) { var replicaConfig = { table: process.env.ReplicaTable, region: process.env.ReplicaRegion, @@ -75,8 +76,10 @@ function replicate(event, callback) { if (errs) { var messages = errs - .filter(function(err) { return !!err; }) - .map(function(err) { return err.message; }) + .filter(function(err) { + return !!err; }) + .map(function(err) { + return err.message; }) .join(' | '); console.log('[error] %s', messages); return callback(errs); @@ -128,20 +131,6 @@ function incrementalBackup(event, context, callback) { changes.forEach(function(change) { q.defer(function(next) { - - var id; - if (process.env.PlainTextKeyAsFilename) { - id = ''; - Object.keys(change.dynamodb.Keys).sort().forEach(function(current) { - if (id !== '') - id += ' | '; - - var key = change.dynamodb.Keys[current]; - var value = key[Object.keys(key)[0]]; - id += value; - }); - } else { - id = crypto.createHash('md5') var id; if (process.env.PlainTextKeyAsFilename) { id = ''; @@ -155,29 +144,30 @@ function incrementalBackup(event, context, callback) { }); } else { id = crypto.createHash('md5') - var id = crypto.createHash('md5') .update(JSON.stringify(change.dynamodb.Keys)) .digest('hex'); + } var table = change.eventSourceARN.split('/')[1]; - var key = [process.env.BackupPrefix, table, id].join('/'); - + var key; if (process.env.MultiTenancyColumn) { var mtCol; if (change.dynamodb.NewImage) mtCol = change.dynamodb.NewImage[process.env.MultiTenancyColumn]; else - mtCol = change.dynamodb.OldImage[process.env.MultiTenancyColumn]; + mtCol = change.dynamodb.OldImage[process.env.MultiTenancyColumn]; - if (!mtCol) { + if (!mtCol) { var message = '[error] MultiTenancyColumn %s does not exist.'; console.log(message, process.env.MultiTenancyColumn); return next(message); } var mtColDt = Object.keys(mtCol)[0]; key = [process.env.BackupPrefix, table, mtCol[mtColDt], id].join('/'); + } else { + key = [process.env.BackupPrefix, table, id].join('/'); } - + var params = { Bucket: process.env.BackupBucket, Key: key diff --git a/test/backup.test.js b/test/backup.test.js index 99e4ca3..50f1679 100644 --- a/test/backup.test.js +++ b/test/backup.test.js @@ -8,6 +8,8 @@ var s3 = new AWS.S3(); var queue = require('queue-async'); var zlib = require('zlib'); +var context = {}; + var primaryItems = [ {hash: 'hash1', range: 'range1', other:1}, {hash: 'hash1', range: 'range2', other:2}, @@ -27,7 +29,7 @@ dynamodb.start(); dynamodb.test('backup: one segment', primaryItems, function(assert) { var config = { backup: { - bucket: 'mapbox', + bucket: 'pageup-mapbox', prefix: 'dynamodb-replicator/test', jobid: crypto.randomBytes(4).toString('hex') }, @@ -38,7 +40,7 @@ dynamodb.test('backup: one segment', primaryItems, function(assert) { endpoint: 'http://localhost:4567' }; - backup(config, function(err, details) { + backup(config, context, function(err, details) { assert.ifError(err, 'backup completed'); if (err) return assert.end(); @@ -46,7 +48,7 @@ dynamodb.test('backup: one segment', primaryItems, function(assert) { assert.equal(details.size, 98, 'reported 98 bytes'); s3.getObject({ - Bucket: 'mapbox', + Bucket: 'pageup-mapbox', Key: [config.backup.prefix, config.backup.jobid, '0'].join('/') }, function(err, data) { assert.ifError(err, 'retrieved backup from S3'); @@ -72,12 +74,12 @@ dynamodb.test('backup: one segment', primaryItems, function(assert) { dynamodb.test('backup: parallel', records, function(assert) { var config = { backup: { - bucket: 'mapbox', + bucket: 'pageup-mapbox', prefix: 'dynamodb-replicator/test', jobid: crypto.randomBytes(4).toString('hex') }, table: dynamodb.tableName, - region: 'us-east-1', + region: 'ap-southeast-2', accessKeyId: 'fake', secretAccessKey: 'fake', endpoint: 'http://localhost:4567', @@ -90,10 +92,10 @@ dynamodb.test('backup: parallel', records, function(assert) { var secondKey = [config.backup.prefix, config.backup.jobid, secondConfig.segment].join('/'); queue(1) - .defer(backup, firstConfig) - .defer(backup, secondConfig) - .defer(s3.getObject.bind(s3), { Bucket: 'mapbox', Key: firstKey }) - .defer(s3.getObject.bind(s3), { Bucket: 'mapbox', Key: secondKey }) + .defer(backup, firstConfig, context) + .defer(backup, secondConfig, context) + .defer(s3.getObject.bind(s3), { Bucket: 'pageup-mapbox', Key: firstKey }) + .defer(s3.getObject.bind(s3), { Bucket: 'pageup-mapbox', Key: secondKey }) .awaitAll(function(err, results) { assert.ifError(err, 'all requests completed'); if (err) return assert.end(); diff --git a/test/fixtures/events/adjust-many-multi-key.json b/test/fixtures/events/adjust-many-multi-key.json new file mode 100644 index 0000000..17f25cf --- /dev/null +++ b/test/fixtures/events/adjust-many-multi-key.json @@ -0,0 +1,232 @@ +{ + "Records": [{ + "eventName": "INSERT", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "dynamodb": { + "NewImage": { + "range": { + "N": "1" + }, + "id": { + "S": "record-1" + } + }, + "SizeBytes": 26, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "111", + "Keys": { + "id": { + "S": "record-1" + }, + "range": { + "N": "1" + } + } + }, + "eventID": "1", + "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake", + "awsRegion": "us-east-1" + }, { + "eventName": "MODIFY", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "dynamodb": { + "NewImage": { + "range": { + "N": "1" + }, + "id": { + "S": "record-1" + }, + "blah": { + "S": "Another property" + } + }, + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "222", + "OldImage": { + "range": { + "N": "1" + }, + "id": { + "S": "record-1" + } + }, + "Keys": { + "id": { + "S": "record-1" + }, + "range": { + "N": "1" + } + } + }, + "eventID": "2", + "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake", + "awsRegion": "us-east-1" + }, { + "eventName": "INSERT", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "dynamodb": { + "NewImage": { + "range": { + "N": "3" + }, + "id": { + "S": "record-3" + } + }, + "SizeBytes": 26, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "111", + "Keys": { + "id": { + "S": "record-3" + }, + "range": { + "N": "3" + } + } + }, + "eventID": "1", + "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake", + "awsRegion": "us-east-1" + }, { + "eventName": "INSERT", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "dynamodb": { + "NewImage": { + "range": { + "N": "2" + }, + "id": { + "S": "record-2" + } + }, + "SizeBytes": 26, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "111", + "Keys": { + "id": { + "S": "record-2" + }, + "range": { + "N": "2" + } + } + }, + "eventID": "1", + "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake", + "awsRegion": "us-east-1" + }, { + "eventName": "MODIFY", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "dynamodb": { + "NewImage": { + "range": { + "N": "2" + }, + "id": { + "S": "record-2" + }, + "blah": { + "S": "Another property here" + } + }, + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "222", + "OldImage": { + "range": { + "N": "2" + }, + "id": { + "S": "record-2" + } + }, + "Keys": { + "id": { + "S": "record-2" + }, + "range": { + "N": "2" + } + } + }, + "eventID": "2", + "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake", + "awsRegion": "us-east-1" + }, { + "eventName": "REMOVE", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "dynamodb": { + "SizeBytes": 38, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "333", + "OldImage": { + "range": { + "N": "1" + }, + "id": { + "S": "record-1" + } + }, + "Keys": { + "id": { + "S": "record-1" + }, + "range": { + "N": "1" + } + } + }, + "eventID": "3", + "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake", + "awsRegion": "us-east-1" + }, { + "eventName": "MODIFY", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "dynamodb": { + "NewImage": { + "range": { + "N": "3" + }, + "id": { + "S": "record-3" + }, + "blah": { + "S": "Another property over there" + } + }, + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "222", + "OldImage": { + "range": { + "N": "3" + }, + "id": { + "S": "record-3" + } + }, + "Keys": { + "id": { + "S": "record-3" + }, + "range": { + "N": "3" + } + } + }, + "eventID": "2", + "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake", + "awsRegion": "us-east-1" + }] +} diff --git a/test/fixtures/events/adjust-many.json b/test/fixtures/events/adjust-many.json index 17f25cf..0ee01c0 100644 --- a/test/fixtures/events/adjust-many.json +++ b/test/fixtures/events/adjust-many.json @@ -1,232 +1,210 @@ { - "Records": [{ - "eventName": "INSERT", - "eventVersion": "1.0", - "eventSource": "aws:dynamodb", - "dynamodb": { - "NewImage": { - "range": { - "N": "1" - }, - "id": { - "S": "record-1" + "Records":[ + { + "eventName":"INSERT", + "eventVersion":"1.0", + "eventSource":"aws:dynamodb", + "dynamodb": { + "NewImage":{ + "range": { + "N": "1" + }, + "id": { + "S": "record-1" + } + }, + "SizeBytes":26, + "StreamViewType":"NEW_AND_OLD_IMAGES", + "SequenceNumber":"111", + "Keys":{ + "id": { + "S": "record-1" + } } }, - "SizeBytes": 26, - "StreamViewType": "NEW_AND_OLD_IMAGES", - "SequenceNumber": "111", - "Keys": { - "id": { - "S": "record-1" - }, - "range": { - "N": "1" - } - } + "eventID":"1", + "eventSourceARN":"arn:aws:dynamodb:us-east-1:123456789012:table/fake", + "awsRegion":"us-east-1" }, - "eventID": "1", - "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake", - "awsRegion": "us-east-1" - }, { - "eventName": "MODIFY", - "eventVersion": "1.0", - "eventSource": "aws:dynamodb", - "dynamodb": { - "NewImage": { - "range": { - "N": "1" - }, - "id": { - "S": "record-1" - }, - "blah": { - "S": "Another property" + { + "eventName":"MODIFY", + "eventVersion":"1.0", + "eventSource":"aws:dynamodb", + "dynamodb": { + "NewImage":{ + "range": { + "N": "11" + }, + "id": { + "S": "record-1" + } + }, + "SizeBytes":59, + "StreamViewType":"NEW_AND_OLD_IMAGES", + "SequenceNumber":"222", + "OldImage":{ + "range": { + "N": "1" + }, + "id": { + "S": "record-1" + } + }, + "Keys":{ + "id": { + "S": "record-1" + } } }, - "SizeBytes": 59, - "StreamViewType": "NEW_AND_OLD_IMAGES", - "SequenceNumber": "222", - "OldImage": { - "range": { - "N": "1" - }, - "id": { - "S": "record-1" - } - }, - "Keys": { - "id": { - "S": "record-1" - }, - "range": { - "N": "1" - } - } + "eventID":"2", + "eventSourceARN":"arn:aws:dynamodb:us-east-1:123456789012:table/fake", + "awsRegion":"us-east-1" }, - "eventID": "2", - "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake", - "awsRegion": "us-east-1" - }, { - "eventName": "INSERT", - "eventVersion": "1.0", - "eventSource": "aws:dynamodb", - "dynamodb": { - "NewImage": { - "range": { - "N": "3" - }, - "id": { - "S": "record-3" + { + "eventName":"INSERT", + "eventVersion":"1.0", + "eventSource":"aws:dynamodb", + "dynamodb": { + "NewImage":{ + "range": { + "N": "1" + }, + "id": { + "S": "record-3" + } + }, + "SizeBytes":26, + "StreamViewType":"NEW_AND_OLD_IMAGES", + "SequenceNumber":"111", + "Keys":{ + "id": { + "S": "record-3" + } } }, - "SizeBytes": 26, - "StreamViewType": "NEW_AND_OLD_IMAGES", - "SequenceNumber": "111", - "Keys": { - "id": { - "S": "record-3" - }, - "range": { - "N": "3" - } - } + "eventID":"1", + "eventSourceARN":"arn:aws:dynamodb:us-east-1:123456789012:table/fake", + "awsRegion":"us-east-1" }, - "eventID": "1", - "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake", - "awsRegion": "us-east-1" - }, { - "eventName": "INSERT", - "eventVersion": "1.0", - "eventSource": "aws:dynamodb", - "dynamodb": { - "NewImage": { - "range": { - "N": "2" - }, - "id": { - "S": "record-2" + { + "eventName":"INSERT", + "eventVersion":"1.0", + "eventSource":"aws:dynamodb", + "dynamodb": { + "NewImage":{ + "range": { + "N": "1" + }, + "id": { + "S": "record-2" + } + }, + "SizeBytes":26, + "StreamViewType":"NEW_AND_OLD_IMAGES", + "SequenceNumber":"111", + "Keys":{ + "id": { + "S": "record-2" + } } }, - "SizeBytes": 26, - "StreamViewType": "NEW_AND_OLD_IMAGES", - "SequenceNumber": "111", - "Keys": { - "id": { - "S": "record-2" - }, - "range": { - "N": "2" - } - } + "eventID":"1", + "eventSourceARN":"arn:aws:dynamodb:us-east-1:123456789012:table/fake", + "awsRegion":"us-east-1" }, - "eventID": "1", - "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake", - "awsRegion": "us-east-1" - }, { - "eventName": "MODIFY", - "eventVersion": "1.0", - "eventSource": "aws:dynamodb", - "dynamodb": { - "NewImage": { - "range": { - "N": "2" - }, - "id": { - "S": "record-2" - }, - "blah": { - "S": "Another property here" + { + "eventName":"MODIFY", + "eventVersion":"1.0", + "eventSource":"aws:dynamodb", + "dynamodb": { + "NewImage":{ + "range": { + "N": "22" + }, + "id": { + "S": "record-2" + } + }, + "SizeBytes":59, + "StreamViewType":"NEW_AND_OLD_IMAGES", + "SequenceNumber":"222", + "OldImage":{ + "range": { + "N": "1" + }, + "id": { + "S": "record-2" + } + }, + "Keys":{ + "id": { + "S": "record-2" + } } }, - "SizeBytes": 59, - "StreamViewType": "NEW_AND_OLD_IMAGES", - "SequenceNumber": "222", - "OldImage": { - "range": { - "N": "2" - }, - "id": { - "S": "record-2" - } - }, - "Keys": { - "id": { - "S": "record-2" - }, - "range": { - "N": "2" - } - } + "eventID":"2", + "eventSourceARN":"arn:aws:dynamodb:us-east-1:123456789012:table/fake", + "awsRegion":"us-east-1" }, - "eventID": "2", - "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake", - "awsRegion": "us-east-1" - }, { - "eventName": "REMOVE", - "eventVersion": "1.0", - "eventSource": "aws:dynamodb", - "dynamodb": { - "SizeBytes": 38, - "StreamViewType": "NEW_AND_OLD_IMAGES", - "SequenceNumber": "333", - "OldImage": { - "range": { - "N": "1" - }, - "id": { - "S": "record-1" + { + "eventName":"REMOVE", + "eventVersion":"1.0", + "eventSource":"aws:dynamodb", + "dynamodb": { + "SizeBytes":38, + "StreamViewType":"NEW_AND_OLD_IMAGES", + "SequenceNumber":"333", + "OldImage":{ + "range": { + "N": "11" + }, + "id": { + "S": "record-1" + } + }, + "Keys":{ + "id": { + "S": "record-1" + } } }, - "Keys": { - "id": { - "S": "record-1" - }, - "range": { - "N": "1" - } - } + "eventID":"3", + "eventSourceARN":"arn:aws:dynamodb:us-east-1:123456789012:table/fake", + "awsRegion":"us-east-1" }, - "eventID": "3", - "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake", - "awsRegion": "us-east-1" - }, { - "eventName": "MODIFY", - "eventVersion": "1.0", - "eventSource": "aws:dynamodb", - "dynamodb": { - "NewImage": { - "range": { - "N": "3" - }, - "id": { - "S": "record-3" - }, - "blah": { - "S": "Another property over there" - } - }, - "SizeBytes": 59, - "StreamViewType": "NEW_AND_OLD_IMAGES", - "SequenceNumber": "222", - "OldImage": { - "range": { - "N": "3" - }, - "id": { - "S": "record-3" + { + "eventName":"MODIFY", + "eventVersion":"1.0", + "eventSource":"aws:dynamodb", + "dynamodb": { + "NewImage":{ + "range": { + "N": "33" + }, + "id": { + "S": "record-3" + } + }, + "SizeBytes":59, + "StreamViewType":"NEW_AND_OLD_IMAGES", + "SequenceNumber":"222", + "OldImage":{ + "range": { + "N": "1" + }, + "id": { + "S": "record-3" + } + }, + "Keys":{ + "id": { + "S": "record-3" + } } }, - "Keys": { - "id": { - "S": "record-3" - }, - "range": { - "N": "3" - } - } - }, - "eventID": "2", - "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/fake", - "awsRegion": "us-east-1" - }] + "eventID":"2", + "eventSourceARN":"arn:aws:dynamodb:us-east-1:123456789012:table/fake", + "awsRegion":"us-east-1" + } + ] } diff --git a/test/incremental.test.js b/test/incremental.test.js index 894ba09..053d39f 100644 --- a/test/incremental.test.js +++ b/test/incremental.test.js @@ -14,7 +14,7 @@ var fs = require('fs'); var backfill = require('../s3-backfill'); var snapshot = require('../s3-snapshot'); -var bucket = 'mapbox'; +var bucket = 'pageup-mapbox'; var prefix = 'dynamodb-replicator/test/' + crypto.randomBytes(4).toString('hex'); var records = require('./fixtures/records')(50); diff --git a/test/index.test.js b/test/index.test.js index 3e65b7a..3b94a96 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -25,12 +25,16 @@ var dyno = Dyno({ endpoint: 'http://localhost:4567' }); +var context = {}; + process.env.ReplicaTable = replica.tableName; process.env.ReplicaRegion = 'mock'; process.env.ReplicaEndpoint = 'http://localhost:4567'; -process.env.AWS_ACCESS_KEY_ID = 'mock'; -process.env.AWS_SECRET_ACCESS_KEY = 'mock'; -process.env.BackupBucket = 'mapbox'; +process.env.AWS_ACCESS_KEY_ID = 'fake'; +process.env.AWS_SECRET_ACCESS_KEY = 'fake'; +process.env.AWS_SESSION_TOKEN = 'fake'; +process.env.AWS_REGION = 'ap-southeast-2' +process.env.BackupBucket = 'pageup-mapbox'; test('[agent] use http agent for replication tests', function(assert) { streambot.agent = require('http').globalAgent; @@ -39,7 +43,7 @@ test('[agent] use http agent for replication tests', function(assert) { replica.test('[replicate] insert', function(assert) { var event = require(path.join(events, 'insert.json')); - replicate(event, function(err) { + replicate(event, context, function(err) { assert.ifError(err, 'success'); dyno.scan(function(err, data) { if (err) throw err; @@ -51,7 +55,7 @@ replica.test('[replicate] insert', function(assert) { replica.test('[replicate] insert & modify', function(assert) { var event = require(path.join(events, 'insert-modify.json')); - replicate(event, function(err) { + replicate(event, context, function(err) { assert.ifError(err, 'success'); dyno.scan(function(err, data) { if (err) throw err; @@ -63,7 +67,7 @@ replica.test('[replicate] insert & modify', function(assert) { replica.test('[replicate] insert, modify & delete', function(assert) { var event = require(path.join(events, 'insert-modify-delete.json')); - replicate(event, function(err) { + replicate(event, context, function(err) { assert.ifError(err, 'success'); dyno.scan(function(err, data) { if (err) throw err; @@ -75,7 +79,7 @@ replica.test('[replicate] insert, modify & delete', function(assert) { replica.test('[replicate] adjust many', function(assert) { var event = require(path.join(events, 'adjust-many.json')); - replicate(event, function(err) { + replicate(event, context, function(err) { assert.ifError(err, 'success'); dyno.scan(function(err, data) { if (err) throw err; @@ -101,7 +105,7 @@ replica.test('[replicate] adjust many', function(assert) { replica.test('[lambda] insert with buffers', function(assert) { var event = require(path.join(events, 'insert-buffer.json')); - replicate(event, function(err) { + replicate(event, context, function(err) { assert.ifError(err, 'success'); dyno.scan(function(err, data) { if (err) throw err; @@ -142,7 +146,7 @@ test('[incremental backup] configurable region', function(assert) { assert.equal(config.region, 'fake', 'configured region on S3 client'); }; - backup({ Records: [] }, function(err) { + backup({ Records: [] }, context, function(err) { assert.ifError(err, 'backup success'); AWS.S3 = S3; delete process.env.BackupRegion; @@ -158,7 +162,7 @@ test('[incremental backup] insert', function(assert) { .update(JSON.stringify(event.Records[0].dynamodb.Keys)) .digest('hex'); - backup(event, function(err) { + backup(event, context, function(err) { assert.ifError(err, 'success'); s3.getObject({ @@ -185,7 +189,7 @@ test('[incremental backup] insert & modify', function(assert) { .update(JSON.stringify(event.Records[0].dynamodb.Keys)) .digest('hex'); - backup(event, function(err) { + backup(event, context, function(err) { assert.ifError(err, 'success'); s3.getObject({ @@ -212,7 +216,7 @@ test('[incremental backup] insert, modify & delete', function(assert) { .update(JSON.stringify(event.Records[0].dynamodb.Keys)) .digest('hex'); - backup(event, function(err) { + backup(event, context, function(err) { assert.ifError(err, 'success'); s3.getObject({ @@ -236,125 +240,6 @@ test('[incremental backup] adjust many', function(assert) { { range: { N: '33' }, id: { S: 'record-3' } } ]; - backup(event, function(err) { - assert.ifError(err, 'success'); - var q = queue(); - - expected.forEach(function(record) { - q.defer(function(next) { - var key = { id: record.id }; - var id = crypto.createHash('md5') - .update(JSON.stringify(key)) - .digest('hex'); - - s3.getObject({ - Bucket: process.env.BackupBucket, - Key: [process.env.BackupPrefix, table, id].join('/') - }, function(err, data) { - assert.ifError(err, 'no S3 error for ' + JSON.stringify(key)); - if (!data) return next(); - assert.ok(data.Body, 'got S3 object for ' + JSON.stringify(key)); - - var found = JSON.parse(data.Body.toString()); - assert.deepEqual(found, record, 'expected item modified on S3 for ' + JSON.stringify(key)); - next(); - }); - }); - }); - - q.defer(function(next) { - var id = crypto.createHash('md5') - .update(JSON.stringify({ id: { S: 'record-1' } })) - .digest('hex'); - - s3.getObject({ - Bucket: process.env.BackupBucket, - Key: [process.env.BackupPrefix, table, id].join('/') - }, function(err) { - assert.equal(err.code, 'NoSuchKey', 'object was deleted'); - next(); - }); - }); - - q.awaitAll(function() { - assert.end(); - }); - }); -}); - -test('[incremental backup] adjust many with multitenancy column', function(assert) { - process.env.BackupPrefix = 'dynamodb-replicator/test/' + crypto.randomBytes(4).toString('hex'); - process.env.MultiTenancyColumn = 'id' - - var event = require(path.join(events, 'adjust-many.json')); - var table = event.Records[0].eventSourceARN.split('/')[1]; - - var expected = [ - { range: { N: '22' }, id: { S: 'record-2' } }, - { range: { N: '33' }, id: { S: 'record-3' } } - ]; - - backup(event, this, function(err) { - assert.ifError(err, 'success'); - var q = queue(); - - expected.forEach(function(record) { - q.defer(function(next) { - var key = { id: record.id }; - var id = crypto.createHash('md5') - .update(JSON.stringify(key)) - .digest('hex'); - - var mtCol = record.id.S; - var s3Key = [process.env.BackupPrefix, table, mtCol, id].join('/'); - - s3.getObject({ - Bucket: process.env.BackupBucket, - Key: s3Key - }, function(err, data) { - assert.ifError(err, 'no S3 error for ' + JSON.stringify(key)); - if (!data) return next(); - assert.ok(data.Body, 'got S3 object for ' + JSON.stringify(key)); - - var found = JSON.parse(data.Body.toString()); - assert.deepEqual(found, record, 'expected item modified on S3 for ' + JSON.stringify(key)); - next(); - }); - }); - }); - - q.defer(function(next) { - var id = crypto.createHash('md5') - .update(JSON.stringify({ id: { S: 'record-1' } })) - .digest('hex'); - - s3.getObject({ - Bucket: process.env.BackupBucket, - Key: [process.env.BackupPrefix, table, 'record-1', id].join('/') - }, function(err) { - assert.equal(err.code, 'NoSuchKey', 'object was deleted'); - next(); - }); - }); - - q.awaitAll(function() { - assert.end(); - }); - }); -}); - -test('[incremental backup] adjust many with multitenancy column', function(assert) { - process.env.BackupPrefix = 'dynamodb-replicator/test/' + crypto.randomBytes(4).toString('hex'); - process.env.MultiTenancyColumn = 'id' - - var event = require(path.join(events, 'adjust-many.json')); - var table = event.Records[0].eventSourceARN.split('/')[1]; - - var expected = [ - { range: { N: '22' }, id: { S: 'record-2' } }, - { range: { N: '33' }, id: { S: 'record-3' } } - ]; - backup(event, context, function(err) { assert.ifError(err, 'success'); var q = queue(); @@ -366,12 +251,9 @@ test('[incremental backup] adjust many with multitenancy column', function(asser .update(JSON.stringify(key)) .digest('hex'); - var mtCol = record.id.S; - var s3Key = [process.env.BackupPrefix, table, mtCol, id].join('/'); - console.log('Checking: ' + s3Key); s3.getObject({ Bucket: process.env.BackupBucket, - Key: s3Key + Key: [process.env.BackupPrefix, table, id].join('/') }, function(err, data) { assert.ifError(err, 'no S3 error for ' + JSON.stringify(key)); if (!data) return next(); @@ -389,65 +271,6 @@ test('[incremental backup] adjust many with multitenancy column', function(asser .update(JSON.stringify({ id: { S: 'record-1' } })) .digest('hex'); - s3.getObject({ - Bucket: process.env.BackupBucket, - Key: [process.env.BackupPrefix, table, 'record-1', id].join('/') - }, function(err) { - assert.equal(err.code, 'NoSuchKey', 'object was deleted'); - next(); - }); - }); - - q.awaitAll(function() { - assert.end(); - delete process.env.MultiTenancyColumn; - }); - }); -}); - -test('[incremental backup] adjust many with keys used as filename', function(assert) { - process.env.BackupPrefix = 'dynamodb-replicator/test/' + crypto.randomBytes(4).toString('hex'); - process.env.PlainTextKeyAsFilename = true; - - var event = require(path.join(events, 'adjust-many.json')); - var table = event.Records[0].eventSourceARN.split('/')[1]; - - var expected = [ - { range: { N: '22' }, id: { S: 'record-2' } }, - { range: { N: '33' }, id: { S: 'record-3' } } - ]; - - backup(event, context, function(err) { - assert.ifError(err, 'success'); - var q = queue(); - - expected.forEach(function(record) { - q.defer(function(next) { - var key = { id: record.id }; - var id = record.id.S; - - var s3Key = [process.env.BackupPrefix, table, id].join('/'); - - console.log('Checking s3Key: ' + s3Key + ' | ' + process.env.BackupBucket); - - s3.getObject({ - Bucket: process.env.BackupBucket, - Key: s3Key - }, function(err, data) { - assert.ifError(err, 'no S3 error for ' + JSON.stringify(key)); - if (!data) return next(); - assert.ok(data.Body, 'got S3 object for ' + JSON.stringify(key)); - - var found = JSON.parse(data.Body.toString()); - assert.deepEqual(found, record, 'expected item modified on S3 for ' + JSON.stringify(key)); - next(); - }); - }); - }); - - q.defer(function(next) { - var id = 'record-1'; - s3.getObject({ Bucket: process.env.BackupBucket, Key: [process.env.BackupPrefix, table, id].join('/') @@ -457,69 +280,6 @@ test('[incremental backup] adjust many with keys used as filename', function(ass }); }); - q.awaitAll(function() { - assert.end(); - delete process.env.PlainTextKeyAsFilename; - }); - }); - -}); - -test('[incremental backup] adjust many with multitenancy column', function(assert) { - process.env.BackupPrefix = 'dynamodb-replicator/test/' + crypto.randomBytes(4).toString('hex'); - process.env.MultiTenancyColumn = 'id' - - var event = require(path.join(events, 'adjust-many.json')); - var table = event.Records[0].eventSourceARN.split('/')[1]; - - var expected = [ - { range: { N: '22' }, id: { S: 'record-2' } }, - { range: { N: '33' }, id: { S: 'record-3' } } - ]; - - backup(event, this, function(err) { - assert.ifError(err, 'success'); - var q = queue(); - - expected.forEach(function(record) { - q.defer(function(next) { - var key = { id: record.id }; - var id = crypto.createHash('md5') - .update(JSON.stringify(key)) - .digest('hex'); - - var mtCol = record.id.S; - var s3Key = [process.env.BackupPrefix, table, mtCol, id].join('/'); - - s3.getObject({ - Bucket: process.env.BackupBucket, - Key: s3Key - }, function(err, data) { - assert.ifError(err, 'no S3 error for ' + JSON.stringify(key)); - if (!data) return next(); - assert.ok(data.Body, 'got S3 object for ' + JSON.stringify(key)); - - var found = JSON.parse(data.Body.toString()); - assert.deepEqual(found, record, 'expected item modified on S3 for ' + JSON.stringify(key)); - next(); - }); - }); - }); - - q.defer(function(next) { - var id = crypto.createHash('md5') - .update(JSON.stringify({ id: { S: 'record-1' } })) - .digest('hex'); - - s3.getObject({ - Bucket: process.env.BackupBucket, - Key: [process.env.BackupPrefix, table, 'record-1', id].join('/') - }, function(err) { - assert.equal(err.code, 'NoSuchKey', 'object was deleted'); - next(); - }); - }); - q.awaitAll(function() { assert.end(); }); diff --git a/test/live-test.backup-table.js b/test/live-test.backup-table.js index efa525d..1b94a8c 100644 --- a/test/live-test.backup-table.js +++ b/test/live-test.backup-table.js @@ -40,7 +40,7 @@ dynamodb.test('backup-table shell script', primaryItems, function(assert) { queue() .defer(function(next) { s3.getObject({ - Bucket: 'mapbox', + Bucket: 'pageup-mapbox', Key: 'dynamodb-replicator/test/' + jobid + '/0' }, function(err, data) { assert.ifError(err, 'S3 getObject success'); From 156f81f971d3f1e53cdc4f37db902d8a13467e14 Mon Sep 17 00:00:00 2001 From: joshuat Date: Tue, 21 Jun 2016 14:14:12 +1000 Subject: [PATCH 4/5] Rewrite to remove sensitive files. Adding script for baking. Adding readme. Making changes for x-account permissions Allowing the app to be packaged with custom env config Renaming config files to be more forms specific Naming packages does nothing for lambda Updating readme to reflect addition config + scripts improvements to powershell script Add a canned ACL for the S3 upload to that we don't have permission issues cross-account. Remove baked config. Add ACL permission. Alter package script --- .gitignore | 2 ++ README.md | 8 ++++++++ backup.js | 3 ++- index.js | 11 +++++------ package.ps1 | 31 +++++++++++++++++++++++++++++++ s3-snapshot.js | 3 ++- 6 files changed, 50 insertions(+), 8 deletions(-) create mode 100644 package.ps1 diff --git a/.gitignore b/.gitignore index fd4f2b0..6f1ff99 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ node_modules .DS_Store +package +LambdaFunction.zip \ No newline at end of file diff --git a/README.md b/README.md index 36e89d0..75d6ee9 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,14 @@ Managing table redundancy and backups involves many moving parts. Please read [D [dynamodb-replicator](https://github.com/mapbox/dynamodb-replicator) provides several CLI tools to help manage your DynamoDB table. +### Config and Packaging + +For windows users: a powershell script is available to packing the lambda function with all the packages it needs. + +``` +> npm install +> .\package.ps1 + ### diff-record Given two tables and an item's key, this script looks up the record in both tables and checks for consistency. diff --git a/backup.js b/backup.js index bcbf0bd..f73cff3 100644 --- a/backup.js +++ b/backup.js @@ -46,7 +46,8 @@ module.exports = function(config, context, done) { s3.upload({ Bucket: config.backup.bucket, Key: key, - Body: data + Body: data, + ACL: 'bucket-owner-full-control' }, function(err) { if (err) return next(err); log('[segment %s] Uploaded dynamo backup to s3://%s/%s', index, config.backup.bucket, key); diff --git a/index.js b/index.js index a6d1c32..5230dc9 100644 --- a/index.js +++ b/index.js @@ -9,11 +9,6 @@ module.exports.streambotReplicate = streambot(replicate); module.exports.backup = incrementalBackup; module.exports.streambotBackup = streambot(incrementalBackup); -// process.env.BackupBucket = 'dynamo-incremental-backups' -// process.env.BackupPrefix = 'forms.blue' -// process.env.MultiTenancyColumn = 'InstanceId' -//process.env.PlainTextKeyAsFilename = true; - function replicate(event, context, callback) { var replicaConfig = { table: process.env.ReplicaTable, @@ -174,7 +169,11 @@ function incrementalBackup(event, context, callback) { }; var req = change.eventName === 'REMOVE' ? 'deleteObject' : 'putObject'; - if (req === 'putObject') params.Body = JSON.stringify(change.dynamodb.NewImage); + + if (req === 'putObject') { + params.Body = JSON.stringify(change.dynamodb.NewImage); + params.ACL = 'bucket-owner-full-control'; + } s3[req](params, function(err) { if (err) console.log( diff --git a/package.ps1 b/package.ps1 new file mode 100644 index 0000000..2be14de --- /dev/null +++ b/package.ps1 @@ -0,0 +1,31 @@ +Write-Output "Packaging Lambda app" +if (Test-Path .\package) { + Remove-Item .\package -Recurse -Force +} + +New-Item .\package -type directory -f | Out-Null +New-Item .\package\temp -type directory -f | Out-Null +Write-Output "Copying dependencies..." +Copy-Item .\backup.js .\package\temp\ +Copy-Item .\diff.js .\package\temp\ +Copy-Item .\s3-backfill.js .\package\temp\ +Copy-Item .\s3-snapshot.js .\package\temp\ +robocopy .\node_modules\ .\package\temp\ /E | Out-Null +Write-Output "Dependencies sorted" + +Write-Output "Generating output..." +Copy-Item .\index.js .\package\temp\ +Write-Output "Output generated" + +Add-Type -assembly "system.io.compression.filesystem" +$currentPath = (Get-Item -Path ".\" -Verbose).FullName +$sourcePath = $currentPath + "\package\temp" +$outputFile = $currentPath + "\LambdaFunction.zip" + +if (Test-Path $outputFile) { + Remove-Item $outputFile -Force +} + +[io.compression.zipfile]::CreateFromDirectory($sourcePath, $outputFile) + +Write-Output "λ function ready to be uploaded at: $($outputFile)" \ No newline at end of file diff --git a/s3-snapshot.js b/s3-snapshot.js index fe3b3d0..c3ddc0b 100644 --- a/s3-snapshot.js +++ b/s3-snapshot.js @@ -47,7 +47,8 @@ module.exports = function(config, done) { var upload = s3.upload({ Bucket: config.destination.bucket, Key: config.destination.key, - Body: gzip + Body: gzip, + ACL: 'bucket-owner-full-control' }).on('httpUploadProgress', function(details) { if (details.part !== partsLoaded) { log( From 17b8991ede7dfa0b14eb6fe1bd4622c06210bd53 Mon Sep 17 00:00:00 2001 From: dixia Date: Thu, 12 Jan 2017 13:43:02 +0100 Subject: [PATCH 5/5] correct Markdown formatting --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 75d6ee9..b66260d 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ For windows users: a powershell script is available to packing the lambda functi ``` > npm install > .\package.ps1 +``` ### diff-record