
Plain text keys as S3 filenames, and grouping of S3 files by multitenancy column #68

Open · wants to merge 7 commits into base: master
2 changes: 2 additions & 0 deletions .gitignore
@@ -1,2 +1,4 @@
node_modules
.DS_Store
package
LambdaFunction.zip
11 changes: 10 additions & 1 deletion README.md
@@ -1,6 +1,6 @@
# dynamodb-replicator

[dynamodb-replicator](https://github.com/mapbox/dynamodb-replicator) offers several different mechanisms to manage redundancy and recoverability on [DynamoDB](http://aws.amazon.com/documentation/dynamodb) tables.
[dynamodb-replicator](https://github.com/mapbox/dynamodb-replicator) offers several different mechanisms to manage redundancy and recoverability on [DynamoDB](http://aws.amazon.com/documentation/dynamodb) tables. This version adds support for multitenanted tables: incremental backups can be split into per-tenant prefixes on S3.

- A **replicator** function that processes events from a DynamoDB stream, replaying changes made to the primary table and onto a replica table. The function is designed to be run as an [AWS Lambda function](http://aws.amazon.com/documentation/lambda/), optionally with deployment assistance from [streambot](https://github.com/mapbox/streambot).
- An **incremental backup** function that processes events from a DynamoDB stream, replaying them as writes to individual objects on S3. The function is designed to be run as an [AWS Lambda function](http://aws.amazon.com/documentation/lambda/), optionally with deployment assistance from [streambot](https://github.com/mapbox/streambot).
@@ -16,6 +16,15 @@ Managing table redundancy and backups involves many moving parts. Please read [D

[dynamodb-replicator](https://github.com/mapbox/dynamodb-replicator) provides several CLI tools to help manage your DynamoDB table.

### Config and Packaging

For Windows users: a PowerShell script is available to package the Lambda function together with all the dependencies it needs.

```
> npm install
> .\package.ps1
```
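
The script produces `LambdaFunction.zip` in the repository root. One way to push it, assuming an existing Lambda function named `dynamodb-replicator` (the function name here is a placeholder):

```
> aws lambda update-function-code --function-name dynamodb-replicator --zip-file fileb://LambdaFunction.zip
```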

### diff-record

Given two tables and an item's key, this script looks up the record in both tables and checks for consistency.
22 changes: 12 additions & 10 deletions backup.js
@@ -3,13 +3,17 @@ var Dyno = require('dyno');
var stream = require('stream');
var zlib = require('zlib');

module.exports = function(config, done) {
module.exports = function(config, context, done) {
function next(err) {
if (err) return done(err);
done(null, { size: size, count: count });
}

var primary = Dyno(config);
var s3 = new AWS.S3();

var log = config.log || console.log;
var scanOpts = config.hasOwnProperty('segment') && config.segments ?
{ Segment: config.segment, TotalSegments: config.segments } : undefined;
var scanOpts = config.hasOwnProperty('segment') && config.segments ? { Segment: config.segment, TotalSegments: config.segments } : undefined;

if (config.backup)
if (!config.backup.bucket || !config.backup.prefix || !config.backup.jobid)
@@ -33,16 +37,17 @@ module.exports = function(config, done) {

var data = primary.scanStream(scanOpts)
.on('error', next)
.pipe(stringify)
.pipe(stringify)
.on('error', next)
.pipe(zlib.createGzip());
.pipe(zlib.createGzip());

log('[segment %s] Starting backup job %s of %s', index, config.backup.jobid, config.region + '/' + config.table);

s3.upload({
Bucket: config.backup.bucket,
Key: key,
Body: data
Body: data,
ACL: 'bucket-owner-full-control'
}, function(err) {
if (err) return next(err);
log('[segment %s] Uploaded dynamo backup to s3://%s/%s', index, config.backup.bucket, key);
@@ -53,8 +58,5 @@ module.exports = function(config, done) {
size = progress.total;
});

function next(err) {
if (err) return done(err);
done(null, { size: size, count: count });
}

};
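
With this change the exported backup function takes a Lambda-style `context` argument between `config` and the callback. A minimal invocation sketch (the table name, bucket, prefix, and job id below are placeholders):

```
var backup = require('./backup');

backup({
  table: 'my-table',                      // placeholder table name
  region: 'us-east-1',
  backup: {
    bucket: 'my-backup-bucket',           // placeholder bucket
    prefix: 'dynamodb-replicator/backup',
    jobid: 'job-0001'
  }
}, {}, function(err, details) {           // context is unused here; {} suffices
  if (err) throw err;
  console.log('Backed up %d records (%d bytes)', details.count, details.size);
});
```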
55 changes: 46 additions & 9 deletions index.js
@@ -9,7 +9,7 @@ module.exports.streambotReplicate = streambot(replicate);
module.exports.backup = incrementalBackup;
module.exports.streambotBackup = streambot(incrementalBackup);

function replicate(event, callback) {
function replicate(event, context, callback) {
var replicaConfig = {
table: process.env.ReplicaTable,
region: process.env.ReplicaRegion,
@@ -71,8 +71,10 @@ function replicate(event, callback) {

if (errs) {
var messages = errs
.filter(function(err) { return !!err; })
.map(function(err) { return err.message; })
.filter(function(err) {
return !!err; })
.map(function(err) {
return err.message; })
.join(' | ');
console.log('[error] %s', messages);
return callback(errs);
@@ -88,7 +90,7 @@ function replicate(event, callback) {
})(replica.batchWriteItemRequests(params), 0);
}

function incrementalBackup(event, callback) {
function incrementalBackup(event, context, callback) {
var allRecords = event.Records.reduce(function(allRecords, action) {
var id = JSON.stringify(action.dynamodb.Keys);

@@ -115,7 +117,7 @@ function incrementalBackup(event, callback) {
});

q.awaitAll(function(err) {
if (err) throw err;
if (err) return callback(err);
callback();
});

@@ -124,19 +126,54 @@ function incrementalBackup(event, callback) {

changes.forEach(function(change) {
q.defer(function(next) {
var id = crypto.createHash('md5')
var id;
if (process.env.PlainTextKeyAsFilename) {
id = '';
Object.keys(change.dynamodb.Keys).sort().forEach(function(current) {
if (id !== '')
id += ' | ';

var key = change.dynamodb.Keys[current];
var value = key[Object.keys(key)[0]];
id += value;
});
} else {
id = crypto.createHash('md5')
.update(JSON.stringify(change.dynamodb.Keys))
.digest('hex');
}

var table = change.eventSourceARN.split('/')[1];

var key;
if (process.env.MultiTenancyColumn) {
var mtCol;
if (change.dynamodb.NewImage)
mtCol = change.dynamodb.NewImage[process.env.MultiTenancyColumn];
else
mtCol = change.dynamodb.OldImage[process.env.MultiTenancyColumn];

if (!mtCol) {
var message = '[error] MultiTenancyColumn ' + process.env.MultiTenancyColumn + ' does not exist.';
console.log(message);
return next(new Error(message));
}
var mtColDt = Object.keys(mtCol)[0];
key = [process.env.BackupPrefix, table, mtCol[mtColDt], id].join('/');
} else {
key = [process.env.BackupPrefix, table, id].join('/');
}

var params = {
Bucket: process.env.BackupBucket,
Key: [process.env.BackupPrefix, table, id].join('/')
Key: key
};

var req = change.eventName === 'REMOVE' ? 'deleteObject' : 'putObject';
if (req === 'putObject') params.Body = JSON.stringify(change.dynamodb.NewImage);

if (req === 'putObject') {
params.Body = JSON.stringify(change.dynamodb.NewImage);
params.ACL = 'bucket-owner-full-control';
}

s3[req](params, function(err) {
if (err) console.log(
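
Taken together, the two new environment variables determine where each change lands in S3. A sketch of the key construction above, run against a hypothetical stream record and assuming `BackupPrefix=backups` and `MultiTenancyColumn=tenantId` (the record, attribute names, and values are made up for illustration):

```
// Hypothetical stream record
var record = {
  eventSourceARN: 'arn:aws:dynamodb:us-east-1:123456789012:table/users/stream/label',
  dynamodb: {
    Keys: { hash: { S: 'hash1' }, range: { S: 'range1' } },
    NewImage: { tenantId: { S: 'acme' }, hash: { S: 'hash1' }, range: { S: 'range1' } }
  }
};

// With PlainTextKeyAsFilename set: key attribute values, sorted by
// attribute name, joined with ' | ' (instead of an md5 of the key JSON).
var id = Object.keys(record.dynamodb.Keys).sort().map(function(name) {
  var attr = record.dynamodb.Keys[name];
  return attr[Object.keys(attr)[0]];
}).join(' | ');                                    // 'hash1 | range1'

var table = record.eventSourceARN.split('/')[1];   // 'users'
var tenant = record.dynamodb.NewImage.tenantId.S;  // 'acme'

// With MultiTenancyColumn set, the tenant value becomes an extra path segment:
console.log(['backups', table, tenant, id].join('/'));
// => backups/users/acme/hash1 | range1
// Without either variable: backups/users/<md5-of-key-json>
```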
31 changes: 31 additions & 0 deletions package.ps1
@@ -0,0 +1,31 @@
Write-Output "Packaging Lambda app"
if (Test-Path .\package) {
Remove-Item .\package -Recurse -Force
}

New-Item .\package -type directory -f | Out-Null
New-Item .\package\temp -type directory -f | Out-Null
Write-Output "Copying dependencies..."
Copy-Item .\backup.js .\package\temp\
Copy-Item .\diff.js .\package\temp\
Copy-Item .\s3-backfill.js .\package\temp\
Copy-Item .\s3-snapshot.js .\package\temp\
robocopy .\node_modules\ .\package\temp\ /E | Out-Null
Write-Output "Dependencies sorted"

Write-Output "Generating output..."
Copy-Item .\index.js .\package\temp\
Write-Output "Output generated"

Add-Type -assembly "system.io.compression.filesystem"
$currentPath = (Get-Item -Path ".\" -Verbose).FullName
$sourcePath = $currentPath + "\package\temp"
$outputFile = $currentPath + "\LambdaFunction.zip"

if (Test-Path $outputFile) {
Remove-Item $outputFile -Force
}

[io.compression.zipfile]::CreateFromDirectory($sourcePath, $outputFile)

Write-Output "λ function ready to be uploaded at: $($outputFile)"
3 changes: 2 additions & 1 deletion s3-snapshot.js
@@ -47,7 +47,8 @@ module.exports = function(config, done) {
var upload = s3.upload({
Bucket: config.destination.bucket,
Key: config.destination.key,
Body: gzip
Body: gzip,
ACL: 'bucket-owner-full-control'
}).on('httpUploadProgress', function(details) {
if (details.part !== partsLoaded) {
log(
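
The `bucket-owner-full-control` canned ACL added here, and in `backup.js` and `index.js` above, matters when the writer's IAM role belongs to a different AWS account than the destination bucket: without it, the bucket owner gets no access to the objects written into its own bucket.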
20 changes: 11 additions & 9 deletions test/backup.test.js
@@ -8,6 +8,8 @@ var s3 = new AWS.S3();
var queue = require('queue-async');
var zlib = require('zlib');

var context = {};

var primaryItems = [
{hash: 'hash1', range: 'range1', other:1},
{hash: 'hash1', range: 'range2', other:2},
@@ -27,7 +29,7 @@ dynamodb.test('backup: one segment', primaryItems, function(assert) {
dynamodb.test('backup: one segment', primaryItems, function(assert) {
var config = {
backup: {
bucket: 'mapbox',
bucket: 'pageup-mapbox',
prefix: 'dynamodb-replicator/test',
jobid: crypto.randomBytes(4).toString('hex')
},
@@ -38,15 +40,15 @@ dynamodb.test('backup: one segment', primaryItems, function(assert) {
endpoint: 'http://localhost:4567'
};

backup(config, function(err, details) {
backup(config, context, function(err, details) {
assert.ifError(err, 'backup completed');
if (err) return assert.end();

assert.equal(details.count, 3, 'reported 3 records');
assert.equal(details.size, 98, 'reported 98 bytes');

s3.getObject({
Bucket: 'mapbox',
Bucket: 'pageup-mapbox',
Key: [config.backup.prefix, config.backup.jobid, '0'].join('/')
}, function(err, data) {
assert.ifError(err, 'retrieved backup from S3');
@@ -72,12 +74,12 @@ dynamodb.test('backup: one segment', primaryItems, function(assert) {
dynamodb.test('backup: parallel', records, function(assert) {
var config = {
backup: {
bucket: 'mapbox',
bucket: 'pageup-mapbox',
prefix: 'dynamodb-replicator/test',
jobid: crypto.randomBytes(4).toString('hex')
},
table: dynamodb.tableName,
region: 'us-east-1',
region: 'ap-southeast-2',
accessKeyId: 'fake',
secretAccessKey: 'fake',
endpoint: 'http://localhost:4567',
@@ -90,10 +92,10 @@ dynamodb.test('backup: parallel', records, function(assert) {
var secondKey = [config.backup.prefix, config.backup.jobid, secondConfig.segment].join('/');

queue(1)
.defer(backup, firstConfig)
.defer(backup, secondConfig)
.defer(s3.getObject.bind(s3), { Bucket: 'mapbox', Key: firstKey })
.defer(s3.getObject.bind(s3), { Bucket: 'mapbox', Key: secondKey })
.defer(backup, firstConfig, context)
.defer(backup, secondConfig, context)
.defer(s3.getObject.bind(s3), { Bucket: 'pageup-mapbox', Key: firstKey })
.defer(s3.getObject.bind(s3), { Bucket: 'pageup-mapbox', Key: secondKey })
.awaitAll(function(err, results) {
assert.ifError(err, 'all requests completed');
if (err) return assert.end();
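
The test fixtures now point at a `pageup-mapbox` bucket and the `ap-southeast-2` region rather than the upstream `mapbox` bucket, so anyone running the suite against real S3 will need a bucket they control and should adjust these values accordingly.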