diff --git a/apps/proxy/entrypoint.sh b/apps/proxy/entrypoint.sh
index 81d657c0..3132f815 100755
--- a/apps/proxy/entrypoint.sh
+++ b/apps/proxy/entrypoint.sh
@@ -21,7 +21,7 @@ trap 'cleanup' EXIT
 mkdir -p $S3FS_MOUNT
 
 # Mount the S3 bucket
-s3fs $AWS_S3_BUCKET $S3FS_MOUNT -o use_path_request_style -o url=$AWS_ENDPOINT_URL_S3 -o endpoint=$AWS_REGION
+s3fs $AWS_S3_BUCKET $S3FS_MOUNT -o use_path_request_style -o url=$AWS_ENDPOINT_URL_S3 -o endpoint=$AWS_REGION -o use_cache=/tmp
 
 # Check if the mount was successful
 if mountpoint -q $S3FS_MOUNT; then
diff --git a/apps/proxy/src/index.ts b/apps/proxy/src/index.ts
index b8939c2f..2a72f1ec 100644
--- a/apps/proxy/src/index.ts
+++ b/apps/proxy/src/index.ts
@@ -1,11 +1,7 @@
 import { PGlite, PGliteInterface } from '@electric-sql/pglite'
 import { vector } from '@electric-sql/pglite/vector'
-import { mkdir, readFile, access, rm } from 'node:fs/promises'
+import { mkdir, readFile } from 'node:fs/promises'
 import net from 'node:net'
-import { createReadStream } from 'node:fs'
-import { pipeline } from 'node:stream/promises'
-import { createGunzip } from 'node:zlib'
-import { extract } from 'tar'
 import { PostgresConnection, ScramSha256Data, TlsOptions } from 'pg-gateway'
 import { createClient } from '@supabase/supabase-js'
 import type { Database } from '@postgres-new/supabase'
@@ -13,7 +9,6 @@ import { findUp } from 'find-up'
 
 const supabaseUrl = process.env.SUPABASE_URL ?? 'http://127.0.0.1:54321'
 const supabaseKey = process.env.SUPABASE_SERVICE_ROLE_KEY ?? ''
-const dataMount = process.env.DATA_MOUNT ?? './data'
 const s3fsMount = process.env.S3FS_MOUNT ?? './s3'
 const wildcardDomain = process.env.WILDCARD_DOMAIN ?? 'db.example.com'
 const packageLockJsonPath = await findUp('package-lock.json')
@@ -29,11 +24,9 @@ const packageLockJson = JSON.parse(await readFile(packageLockJsonPath, 'utf8'))
 }
 
 const pgliteVersion = `(PGlite ${packageLockJson.packages['node_modules/@electric-sql/pglite'].version})`
 
-const dumpDir = `${s3fsMount}/dbs`
+const dbDir = `${s3fsMount}/dbs`
 const tlsDir = `${s3fsMount}/tls`
-const dbDir = `${dataMount}/dbs`
 
-await mkdir(dumpDir, { recursive: true })
 await mkdir(dbDir, { recursive: true })
 await mkdir(tlsDir, { recursive: true })
@@ -63,15 +56,6 @@ function sendFatalError(connection: PostgresConnection, code: string, message: s
   return new Error(message)
 }
 
-async function fileExists(path: string): Promise<boolean> {
-  try {
-    await access(path)
-    return true
-  } catch {
-    return false
-  }
-}
-
 const supabase = createClient(supabaseUrl, supabaseKey)
 
 const server = net.createServer((socket) => {
@@ -159,44 +143,11 @@ const server = net.createServer((socket) => {
       // at this point we know sniServerName is set
      const databaseId = getIdFromServerName(tlsInfo!.sniServerName!)
 
-      console.log(`Serving database '${databaseId}'`)
+      log(`Serving database '${databaseId}'`)
 
       const dbPath = `${dbDir}/${databaseId}`
 
-      if (!(await fileExists(dbPath))) {
-        console.log(`Database '${databaseId}' is not cached, downloading...`)
-
-        const dumpPath = `${dumpDir}/${databaseId}.tar.gz`
-
-        if (!(await fileExists(dumpPath))) {
-          connection.sendError({
-            severity: 'FATAL',
-            code: 'XX000',
-            message: `database ${databaseId} not found`,
-          })
-          connection.socket.end()
-          return
-        }
-
-        // Create a directory for the database
-        await mkdir(dbPath, { recursive: true })
-
-        try {
-          // Extract the .tar.gz file
-          await pipeline(createReadStream(dumpPath), createGunzip(), extract({ cwd: dbPath }))
-        } catch (error) {
-          console.error(error)
-          await rm(dbPath, { recursive: true, force: true }) // Clean up the partially created directory
-          connection.sendError({
-            severity: 'FATAL',
-            code: 'XX000',
-            message: `Error extracting database: ${(error as Error).message}`,
-          })
-          connection.socket.end()
-          return
-        }
-      }
-
+      log('opening database...')
       db = new PGlite({
         dataDir: dbPath,
         extensions: {
@@ -204,6 +155,7 @@ const server = net.createServer((socket) => {
         },
       })
       await db.waitReady
+      log('database open and ready')
       const { rows } = await db.query("SELECT 1 FROM pg_roles WHERE rolname = 'readonly_postgres';")
       if (rows.length === 0) {
         await db.exec(`
@@ -212,6 +164,7 @@ const server = net.createServer((socket) => {
         `)
       }
       await db.close()
+      log('reopening database...')
       db = new PGlite({
         dataDir: dbPath,
         username: 'readonly_postgres',
@@ -220,6 +173,7 @@ const server = net.createServer((socket) => {
         },
       })
       await db.waitReady
+      log('database reopened and ready')
     },
     async onMessage(data, { isAuthenticated }) {
       // Only forward messages to PGlite after authentication
@@ -239,11 +193,16 @@ const server = net.createServer((socket) => {
   })
 
   socket.on('close', async () => {
-    console.log('Client disconnected')
+    log('Client disconnected')
     await db?.close()
   })
 })
 
 server.listen(5432, async () => {
-  console.log('Server listening on port 5432')
+  log('Server listening on port 5432')
 })
+
+function log(message: string) {
+  const timestamp = new Date().toISOString()
+  console.log(`[${timestamp}] ${message}`)
+}
diff --git a/apps/web/app/api/databases/[id]/route.ts b/apps/web/app/api/databases/[id]/route.ts
index fe6c7046..432ddf8d 100644
--- a/apps/web/app/api/databases/[id]/route.ts
+++ b/apps/web/app/api/databases/[id]/route.ts
@@ -1,4 +1,4 @@
-import { DeleteObjectCommand, S3Client } from '@aws-sdk/client-s3'
+import { S3Client, ListObjectsV2Command, DeleteObjectsCommand } from '@aws-sdk/client-s3'
 import { NextRequest, NextResponse } from 'next/server'
 import { createClient } from '~/utils/supabase/server'
 
@@ -53,17 +53,40 @@ export async function DELETE(
 
   await supabase.from('deployed_databases').delete().eq('database_id', databaseId)
 
-  const key = `dbs/${databaseId}.tar.gz`
-
-  try {
-    await s3Client.send(
-      new DeleteObjectCommand({
+  async function recursiveDelete(token?: string) {
+    // get the files
+    const listCommand = new ListObjectsV2Command({
+      Bucket: process.env.AWS_S3_BUCKET,
+      Prefix: `dbs/${databaseId}`,
+      ContinuationToken: token,
+    })
+    let list = await s3Client.send(listCommand)
+    if (list.KeyCount) {
+      // if items to delete
+      // delete the files
+      const deleteCommand = new DeleteObjectsCommand({
         Bucket: process.env.AWS_S3_BUCKET,
-        Key: key,
+        Delete: {
+          Objects: list.Contents!.map((item) => ({ Key: item.Key })),
+          Quiet: false,
+        },
       })
-    )
-  } catch (error) {
-    console.error(`Error deleting S3 object ${key}:`, error)
+      let deleted = await s3Client.send(deleteCommand)
+
+      // log any errors deleting files
+      if (deleted.Errors) {
+        deleted.Errors.map((error) =>
+          console.log(`${error.Key} could not be deleted - ${error.Code}`)
+        )
+      }
+    }
+    // repeat if more files to delete
+    if (list.NextContinuationToken) {
+      await recursiveDelete(list.NextContinuationToken)
+    }
   }
+  // start the recursive function
+  await recursiveDelete()
 
   return NextResponse.json({
     success: true,
diff --git a/apps/web/app/api/databases/[id]/upload/route.ts b/apps/web/app/api/databases/[id]/upload/route.ts
index d2fe9392..e5939321 100644
--- a/apps/web/app/api/databases/[id]/upload/route.ts
+++ b/apps/web/app/api/databases/[id]/upload/route.ts
@@ -1,11 +1,10 @@
-import { S3Client } from '@aws-sdk/client-s3'
+import { S3Client, CompleteMultipartUploadCommandOutput } from '@aws-sdk/client-s3'
 import { Upload } from '@aws-sdk/lib-storage'
 import { NextRequest, NextResponse } from 'next/server'
-import { createGzip } from 'zlib'
-import { Readable } from 'stream'
 import { createClient } from '~/utils/supabase/server'
 import { createScramSha256Data } from 'pg-gateway'
 import { generateDatabasePassword } from '~/utils/generate-database-password'
+import { entries } from 'streaming-tar'
 
 const wildcardDomain = process.env.NEXT_PUBLIC_WILDCARD_DOMAIN ?? 'db.example.com'
 const s3Client = new S3Client({ forcePathStyle: true })
@@ -62,6 +61,7 @@ export async function POST(
     )
   }
 
+  // TODO: we should check the size of the uncompressed tarball
   const dumpSizeInMB = dump.size / (1024 * 1024)
   if (dumpSizeInMB > 100) {
     return NextResponse.json(
@@ -74,21 +74,51 @@
   }
 
   const databaseId = params.id
-  const key = `dbs/${databaseId}.tar.gz`
-
-  const gzip = createGzip()
-  const body = Readable.from(streamToAsyncIterable(dump.stream()))
+  const directoryPrefix = `dbs/${databaseId}`
+  const tarEntryStream =
+    dump.type === 'application/x-gzip'
+      ? dump.stream().pipeThrough(new DecompressionStream('gzip'))
+      : dump.stream()
+  const uploads: Promise<CompleteMultipartUploadCommandOutput>[] = []
+
+  for await (const entry of entries(tarEntryStream)) {
+    let upload: Upload
+
+    switch (entry.type) {
+      case 'file': {
+        const buffer = new Uint8Array(await entry.arrayBuffer())
+        upload = new Upload({
+          client: s3Client,
+          params: {
+            Bucket: process.env.AWS_S3_BUCKET,
+            Key: `${directoryPrefix}${entry.name}`,
+            Body: buffer,
+          },
+        })
+        break
+      }
+      case 'directory': {
+        // Directories end in '/' and have an empty body
+        upload = new Upload({
+          client: s3Client,
+          params: {
+            Bucket: process.env.AWS_S3_BUCKET,
+            Key: `${directoryPrefix}${entry.name}/`,
+            Body: new Uint8Array(),
+          },
+        })
+        await entry.skip()
+        break
+      }
+      default: {
+        continue
+      }
+    }
 
-  const upload = new Upload({
-    client: s3Client,
-    params: {
-      Bucket: process.env.AWS_S3_BUCKET,
-      Key: key,
-      Body: body.pipe(gzip),
-    },
-  })
+    uploads.push(upload.done())
+  }
 
-  await upload.done()
+  await Promise.all(uploads)
   const { data: existingDeployedDatabase } = await supabase
     .from('deployed_databases')
     .select()
@@ -127,16 +157,3 @@ export async function POST(
     },
   })
 }
-
-async function* streamToAsyncIterable(stream: ReadableStream) {
-  const reader = stream.getReader()
-  try {
-    while (true) {
-      const { done, value } = await reader.read()
-      if (done) return
-      yield value
-    }
-  } finally {
-    reader.releaseLock()
-  }
-}