From 2a3cae19102b09704340a8d5fa1c38eb48c45d70 Mon Sep 17 00:00:00 2001 From: Onur Temizkan Date: Fri, 28 Mar 2025 18:41:54 +0000 Subject: [PATCH 01/11] feat(core): Add Supabase Queues support --- .../integrations/supabase/queues-rpc/init.js | 34 ++++++++ .../integrations/supabase/queues-rpc/test.ts | 64 ++++++++++++++ .../supabase/queues-schema/init.js | 38 ++++++++ .../supabase/queues-schema/test.ts | 63 ++++++++++++++ packages/core/src/integrations/supabase.ts | 86 +++++++++++++++++-- 5 files changed, 279 insertions(+), 6 deletions(-) create mode 100644 dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/init.js create mode 100644 dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/test.ts create mode 100644 dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js create mode 100644 dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/test.ts diff --git a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/init.js b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/init.js new file mode 100644 index 000000000000..7b0fdb096ebc --- /dev/null +++ b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/init.js @@ -0,0 +1,34 @@ +import * as Sentry from '@sentry/browser'; + +import { createClient } from '@supabase/supabase-js'; +window.Sentry = Sentry; + +const queues = createClient('https://test.supabase.co', 'test-key', { + db: { + schema: 'pgmq_public', + }, +}); + +Sentry.init({ + dsn: 'https://public@dsn.ingest.sentry.io/1337', + integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration(queues)], + tracesSampleRate: 1.0, +}); + +// Simulate queue operations +async function performQueueOperations() { + try { + await queues.rpc('enqueue', { + queue_name: 'todos', + msg: { title: 'Test Todo' }, + }); + + await queues.rpc('dequeue', { + queue_name: 'todos', + }); + } catch (error) { + Sentry.captureException(error); + } +} + +performQueueOperations(); diff --git a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/test.ts b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/test.ts new file mode 100644 index 000000000000..8b6ee89e9f81 --- /dev/null +++ b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/test.ts @@ -0,0 +1,64 @@ +import type { Page} from '@playwright/test'; +import { expect } from '@playwright/test'; +import type { Event } from '@sentry/core'; + +import { sentryTest } from '../../../../utils/fixtures'; +import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers'; + +async function mockSupabaseRoute(page: Page) { + await page.route('**/rest/v1/rpc**', route => { + return route.fulfill({ + status: 200, + body: JSON.stringify({ + foo: ['bar', 'baz'], + }), + headers: { + 'Content-Type': 'application/json', + }, + }); + }); +} + +sentryTest('should capture Supabase queue spans from client.rpc', async ({ getLocalTestUrl, page }) => { + await mockSupabaseRoute(page); + + if (shouldSkipTracingTest()) { + return; + } + + const url = await getLocalTestUrl({ testDir: __dirname }); + + const event = await getFirstSentryEnvelopeRequest(page, url); + const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue')); + + expect(queueSpans).toHaveLength(2); + + expect(queueSpans![0]).toMatchObject({ + description: 'supabase.db.rpc', + parent_span_id: event.contexts?.trace?.span_id, + span_id: expect.any(String), + start_timestamp: expect.any(Number), + timestamp: expect.any(Number), + trace_id: event.contexts?.trace?.trace_id, + data: expect.objectContaining({ + 'sentry.op': 'queue.publish', + 'sentry.origin': 'auto.db.supabase', + 'messaging.destination.name': 'todos', + 'messaging.message.id': 'Test Todo', + }), + }); + + expect(queueSpans![1]).toMatchObject({ + description: 'supabase.db.rpc', + parent_span_id: event.contexts?.trace?.span_id, + span_id: expect.any(String), + start_timestamp: expect.any(Number), + timestamp: expect.any(Number), + trace_id: event.contexts?.trace?.trace_id, + data: expect.objectContaining({ + 'sentry.op': 'queue.process', + 'sentry.origin': 'auto.db.supabase', + 'messaging.destination.name': 'todos', + }), + }); +}); diff --git a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js new file mode 100644 index 000000000000..43c50357f1eb --- /dev/null +++ b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js @@ -0,0 +1,38 @@ +import * as Sentry from '@sentry/browser'; + +import { createClient } from '@supabase/supabase-js'; +window.Sentry = Sentry; + +const queues = createClient('https://test.supabase.co', 'test-key', { + db: { + schema: 'pgmq_public', + }, +}); + +Sentry.init({ + dsn: 'https://public@dsn.ingest.sentry.io/1337', + integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration(queues)], + tracesSampleRate: 1.0, +}); + +// Simulate queue operations +async function performQueueOperations() { + try { + await queues + .schema('pgmq_public') + .rpc('enqueue', { + queue_name: 'todos', + msg: { title: 'Test Todo' }, + }); + + await queues + .schema('pgmq_public') + .rpc('dequeue', { + queue_name: 'todos', + }); + } catch (error) { + Sentry.captureException(error); + } +} + +performQueueOperations(); diff --git a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/test.ts b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/test.ts new file mode 100644 index 000000000000..8070a1b17357 --- /dev/null +++ b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/test.ts @@ -0,0 +1,63 @@ +import { type Page, expect } from '@playwright/test'; +import type { Event } from '@sentry/core'; + +import { sentryTest } from '../../../../utils/fixtures'; +import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers'; + +async function mockSupabaseRoute(page: Page) { + await page.route('**/rest/v1/rpc**', route => { + return route.fulfill({ + status: 200, + body: JSON.stringify({ + foo: ['bar', 'baz'], + }), + headers: { + 'Content-Type': 'application/json', + }, + }); + }); +} + +sentryTest('should capture Supabase queue spans from client.schema(...).rpc', async ({ getLocalTestUrl, page }) => { + await mockSupabaseRoute(page); + + if (shouldSkipTracingTest()) { + return; + } + + const url = await getLocalTestUrl({ testDir: __dirname }); + + const event = await getFirstSentryEnvelopeRequest(page, url); + const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue')); + + expect(queueSpans).toHaveLength(2); + + expect(queueSpans![0]).toMatchObject({ + description: 'supabase.db.rpc', + parent_span_id: event.contexts?.trace?.span_id, + span_id: expect.any(String), + start_timestamp: expect.any(Number), + timestamp: expect.any(Number), + trace_id: event.contexts?.trace?.trace_id, + data: expect.objectContaining({ + 'sentry.op': 'queue.publish', + 'sentry.origin': 'auto.db.supabase', + 'messaging.destination.name': 'todos', + 'messaging.message.id': 'Test Todo', + }), + }); + + expect(queueSpans![1]).toMatchObject({ + description: 'supabase.db.rpc', + parent_span_id: event.contexts?.trace?.span_id, + span_id: expect.any(String), + start_timestamp: expect.any(Number), + timestamp: expect.any(Number), + trace_id: event.contexts?.trace?.trace_id, + data: expect.objectContaining({ + 'sentry.op': 'queue.process', + 'sentry.origin': 'auto.db.supabase', + 'messaging.destination.name': 'todos', + }), + }); +}); diff --git a/packages/core/src/integrations/supabase.ts b/packages/core/src/integrations/supabase.ts index ac781e95ece6..39db5082f02e 100644 --- a/packages/core/src/integrations/supabase.ts +++ b/packages/core/src/integrations/supabase.ts @@ -12,6 +12,14 @@ import type { IntegrationFn } from '../types-hoist/integration'; import { isPlainObject } from '../utils/is'; import { logger } from '../utils/logger'; +export interface SupabaseClientConstructor { + prototype: { + from: (table: string) => PostgRESTQueryBuilder; + schema: (schema: string) => { rpc: (...args: unknown[]) => Promise }; + }; + rpc: (fn: string, params: Record) => Promise; +} + const AUTH_OPERATIONS_TO_INSTRUMENT = [ 'reauthenticate', 'signInAnonymously', @@ -113,12 +121,6 @@ export interface SupabaseBreadcrumb { }; } -export interface SupabaseClientConstructor { - prototype: { - from: (table: string) => PostgRESTQueryBuilder; - }; -} - export interface PostgRESTProtoThenable { then: ( onfulfilled?: ((value: T) => T | PromiseLike) | null, @@ -214,6 +216,76 @@ export function translateFiltersIntoMethods(key: string, query: string): string return `${method}(${key}, ${value.join('.')})`; } +function instrumentRpcReturnedFromSchemaCall(SupabaseClient: unknown): void { + (SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema = new Proxy( + (SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema, + { + apply(target, thisArg, argumentsList) { + const rv = Reflect.apply(target, thisArg, argumentsList); + + return instrumentRpc(rv); + }, + }, + ); +} + +function instrumentRpc(SupabaseClient: unknown): unknown { + (SupabaseClient as unknown as SupabaseClientConstructor).rpc = new Proxy( + (SupabaseClient as unknown as SupabaseClientConstructor).rpc, + { + apply(target, thisArg, argumentsList) { + const isProducerSpan = argumentsList[0] === 'enqueue'; + const isConsumerSpan = argumentsList[0] === 'dequeue'; + + const maybeQueueParams = argumentsList[1]; + + // If the second argument is not an object, it's not a queue operation + if (!isPlainObject(maybeQueueParams)) { + return Reflect.apply(target, thisArg, argumentsList); + } + + const msg = maybeQueueParams?.msg as { title: string }; + + const messageId = msg?.title; + const queueName = maybeQueueParams?.queue_name as string; + + const op = isProducerSpan ? 'queue.publish' : isConsumerSpan ? 'queue.process' : ''; + + // If the operation is not a queue operation, return the original function + if (!op) { + return Reflect.apply(target, thisArg, argumentsList); + } + + return startSpan( + { + name: 'supabase.db.rpc', + attributes: { + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.supabase', + [SEMANTIC_ATTRIBUTE_SENTRY_OP]: op, + }, + }, + async span => { + return (Reflect.apply(target, thisArg, argumentsList) as Promise).then((res: unknown) => { + if (messageId) { + span.setAttribute('messaging.message.id', messageId); + } + + if (queueName) { + span.setAttribute('messaging.destination.name', queueName); + } + + span.end(); + return res; + }); + }, + ); + }, + }, + ); + + return SupabaseClient; +} + function instrumentAuthOperation(operation: AuthOperationFn, isAdmin = false): AuthOperationFn { return new Proxy(operation, { apply(target, thisArg, argumentsList) { @@ -503,6 +575,8 @@ export const instrumentSupabaseClient = (supabaseClient: unknown): void => { supabaseClient.constructor === Function ? supabaseClient : supabaseClient.constructor; instrumentSupabaseClientConstructor(SupabaseClientConstructor); + instrumentRpcReturnedFromSchemaCall(SupabaseClientConstructor); + instrumentRpc(supabaseClient as SupabaseClientInstance); instrumentSupabaseAuthClient(supabaseClient as SupabaseClientInstance); }; From 2ba74cc1fc91315b582eb3a232d7f6867419eb07 Mon Sep 17 00:00:00 2001 From: Onur Temizkan Date: Tue, 22 Apr 2025 15:31:06 +0100 Subject: [PATCH 02/11] Skip tests on bundles --- .../suites/integrations/supabase/queues-rpc/test.ts | 6 ++++++ .../suites/integrations/supabase/queues-schema/test.ts | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/test.ts b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/test.ts index 8b6ee89e9f81..2bd3f9bd4b1e 100644 --- a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/test.ts +++ b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/test.ts @@ -19,6 +19,12 @@ async function mockSupabaseRoute(page: Page) { }); } +const bundle = process.env.PW_BUNDLE || ''; +// We only want to run this in non-CDN bundle mode +if (bundle.startsWith('bundle')) { + sentryTest.skip(); +} + sentryTest('should capture Supabase queue spans from client.rpc', async ({ getLocalTestUrl, page }) => { await mockSupabaseRoute(page); diff --git a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/test.ts b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/test.ts index 8070a1b17357..c08022acaa47 100644 --- a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/test.ts +++ b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/test.ts @@ -18,6 +18,12 @@ async function mockSupabaseRoute(page: Page) { }); } +const bundle = process.env.PW_BUNDLE || ''; +// We only want to run this in non-CDN bundle mode +if (bundle.startsWith('bundle')) { + sentryTest.skip(); +} + sentryTest('should capture Supabase queue spans from client.schema(...).rpc', async ({ getLocalTestUrl, page }) => { await mockSupabaseRoute(page); From e9f78e9c0201d5203642e18474ed4f3457141474 Mon Sep 17 00:00:00 2001 From: Onur Temizkan Date: Tue, 22 Apr 2025 16:30:35 +0100 Subject: [PATCH 03/11] Update test usage --- .../integrations/supabase/queues-rpc/init.js | 18 +++++++-------- .../supabase/queues-schema/init.js | 22 ++++++++----------- 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/init.js b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/init.js index 7b0fdb096ebc..4ee653480bf5 100644 --- a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/init.js +++ b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/init.js @@ -3,7 +3,7 @@ import * as Sentry from '@sentry/browser'; import { createClient } from '@supabase/supabase-js'; window.Sentry = Sentry; -const queues = createClient('https://test.supabase.co', 'test-key', { +const supabaseClient = createClient('https://test.supabase.co', 'test-key', { db: { schema: 'pgmq_public', }, @@ -11,21 +11,21 @@ const queues = createClient('https://test.supabase.co', 'test-key', { Sentry.init({ dsn: 'https://public@dsn.ingest.sentry.io/1337', - integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration(queues)], + integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration({ supabaseClient })], tracesSampleRate: 1.0, }); // Simulate queue operations async function performQueueOperations() { try { - await queues.rpc('enqueue', { - queue_name: 'todos', - msg: { title: 'Test Todo' }, - }); + await supabaseClient.rpc('enqueue', { + queue_name: 'todos', + msg: { title: 'Test Todo' }, + }); - await queues.rpc('dequeue', { - queue_name: 'todos', - }); + await supabaseClient.rpc('dequeue', { + queue_name: 'todos', + }); } catch (error) { Sentry.captureException(error); } diff --git a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js index 43c50357f1eb..fa2c38cb4f43 100644 --- a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js +++ b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js @@ -3,7 +3,7 @@ import * as Sentry from '@sentry/browser'; import { createClient } from '@supabase/supabase-js'; window.Sentry = Sentry; -const queues = createClient('https://test.supabase.co', 'test-key', { +const supabaseClient = createClient('https://test.supabase.co', 'test-key', { db: { schema: 'pgmq_public', }, @@ -11,25 +11,21 @@ const queues = createClient('https://test.supabase.co', 'test-key', { Sentry.init({ dsn: 'https://public@dsn.ingest.sentry.io/1337', - integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration(queues)], + integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration({ supabaseClient })], tracesSampleRate: 1.0, }); // Simulate queue operations async function performQueueOperations() { try { - await queues - .schema('pgmq_public') - .rpc('enqueue', { - queue_name: 'todos', - msg: { title: 'Test Todo' }, - }); + await supabaseClient.schema('pgmq_public').rpc('enqueue', { + queue_name: 'todos', + msg: { title: 'Test Todo' }, + }); - await queues - .schema('pgmq_public') - .rpc('dequeue', { - queue_name: 'todos', - }); + await supabaseClient.schema('pgmq_public').rpc('dequeue', { + queue_name: 'todos', + }); } catch (error) { Sentry.captureException(error); } From ae2cd7c21d8acac6a2df6bec95c3dd8b3f2eb28f Mon Sep 17 00:00:00 2001 From: Onur Temizkan Date: Tue, 13 May 2025 12:45:54 +0100 Subject: [PATCH 04/11] Lint --- .../suites/integrations/supabase/queues-rpc/init.js | 2 +- .../suites/integrations/supabase/queues-rpc/test.ts | 3 +-- .../suites/integrations/supabase/queues-schema/init.js | 2 +- .../suites/integrations/supabase/queues-schema/test.ts | 1 - 4 files changed, 3 insertions(+), 5 deletions(-) diff --git a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/init.js b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/init.js index 4ee653480bf5..45c335254887 100644 --- a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/init.js +++ b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/init.js @@ -1,6 +1,6 @@ import * as Sentry from '@sentry/browser'; - import { createClient } from '@supabase/supabase-js'; + window.Sentry = Sentry; const supabaseClient = createClient('https://test.supabase.co', 'test-key', { diff --git a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/test.ts b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/test.ts index 2bd3f9bd4b1e..0f11708bbedd 100644 --- a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/test.ts +++ b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/test.ts @@ -1,7 +1,6 @@ -import type { Page} from '@playwright/test'; +import type { Page } from '@playwright/test'; import { expect } from '@playwright/test'; import type { Event } from '@sentry/core'; - import { sentryTest } from '../../../../utils/fixtures'; import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers'; diff --git a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js index fa2c38cb4f43..fbdbd38a4ccc 100644 --- a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js +++ b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js @@ -1,6 +1,6 @@ import * as Sentry from '@sentry/browser'; - import { createClient } from '@supabase/supabase-js'; + window.Sentry = Sentry; const supabaseClient = createClient('https://test.supabase.co', 'test-key', { diff --git a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/test.ts b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/test.ts index c08022acaa47..e7ad4154f87b 100644 --- a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/test.ts +++ b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/test.ts @@ -1,6 +1,5 @@ import { type Page, expect } from '@playwright/test'; import type { Event } from '@sentry/core'; - import { sentryTest } from '../../../../utils/fixtures'; import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers'; From 494fe7e44cb17bc3cefbdaf7ac886cf8c98871d1 Mon Sep 17 00:00:00 2001 From: Onur Temizkan Date: Tue, 27 May 2025 14:26:33 +0100 Subject: [PATCH 05/11] Update implementation --- .../supabase-nextjs/package.json | 4 +- .../pages/api/batch_enqueue.ts | 37 ++ .../pages/api/dequeue-error.ts | 27 ++ .../supabase-nextjs/pages/api/dequeue-rpc.ts | 31 ++ .../pages/api/dequeue-schema.ts | 25 ++ .../supabase-nextjs/pages/api/enqueue-rpc.ts | 32 ++ .../pages/api/enqueue-schema.ts | 28 ++ .../supabase-nextjs/pages/api/queue_read.ts | 31 ++ .../supabase-nextjs/supabase/config.toml | 9 +- .../migrations/20230712094349_init.sql | 2 +- .../20250515080602_enable-queues.sql | 182 ++++++++++ .../supabase-nextjs/supabase/seed.sql | 2 - .../supabase-nextjs/tests/performance.test.ts | 335 +++++++++++++++++- packages/core/src/integrations/supabase.ts | 225 ++++++++---- 14 files changed, 898 insertions(+), 72 deletions(-) create mode 100644 dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/batch_enqueue.ts create mode 100644 dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/dequeue-error.ts create mode 100644 dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/dequeue-rpc.ts create mode 100644 dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/dequeue-schema.ts create mode 100644 dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/enqueue-rpc.ts create mode 100644 dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/enqueue-schema.ts create mode 100644 dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue_read.ts create mode 100644 dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/migrations/20250515080602_enable-queues.sql diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/package.json b/dev-packages/e2e-tests/test-applications/supabase-nextjs/package.json index a46519e9c75d..7f505c88743d 100644 --- a/dev-packages/e2e-tests/test-applications/supabase-nextjs/package.json +++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/package.json @@ -7,7 +7,7 @@ "build": "next build", "start": "next start", "clean": "npx rimraf node_modules pnpm-lock.yaml .next", - "start-local-supabase": "supabase init --force --workdir . && supabase start -o env && supabase db reset", + "start-local-supabase": "supabase start -o env && supabase db reset", "test:prod": "TEST_ENV=production playwright test", "test:build": "pnpm install && pnpm start-local-supabase && pnpm build", "test:assert": "pnpm test:prod" @@ -25,7 +25,7 @@ "next": "14.2.25", "react": "18.2.0", "react-dom": "18.2.0", - "supabase": "2.19.7", + "supabase": "2.22.12", "typescript": "4.9.5" }, "devDependencies": { diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/batch_enqueue.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/batch_enqueue.ts new file mode 100644 index 000000000000..14208a00f450 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/batch_enqueue.ts @@ -0,0 +1,37 @@ +import { NextApiRequest, NextApiResponse } from 'next'; +import { createClient } from '@supabase/supabase-js'; +import * as Sentry from '@sentry/nextjs'; + +// These are the default development keys for a local Supabase instance +const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321'; +const SUPABASE_SERVICE_ROLE_KEY = + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU'; + +const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, { + db: { + schema: 'pgmq_public', + }, +}); + +Sentry.instrumentSupabaseClient(supabaseClient); + +export default async function handler(req: NextApiRequest, res: NextApiResponse) { + // Enqueue a job to the queue + const { data, error } = await supabaseClient.rpc('send_batch', { + queue_name: 'todos', + messages: [ + { + title: 'Test Todo 1', + }, + { + title: 'Test Todo 2', + }, + ], + }); + + if (error) { + return res.status(500).json({ error: error.message }); + } + + return res.status(200).json({ data }); +} diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/dequeue-error.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/dequeue-error.ts new file mode 100644 index 000000000000..d6543c0d2ede --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/dequeue-error.ts @@ -0,0 +1,27 @@ +// Enqueue a job to the queue + +import { NextApiRequest, NextApiResponse } from 'next'; +import { createClient } from '@supabase/supabase-js'; +import * as Sentry from '@sentry/nextjs'; + +// These are the default development keys for a local Supabase instance +const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321'; +const SUPABASE_SERVICE_ROLE_KEY = + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU'; + +const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY); + +Sentry.instrumentSupabaseClient(supabaseClient); + +export default async function handler(req: NextApiRequest, res: NextApiResponse) { + // Enqueue a job to the queue + const { data, error } = await supabaseClient.schema('pgmq_public').rpc('pop', { + queue_name: 'non-existing-queue', + }); + + if (error) { + return res.status(500).json({ error: error.message }); + } + + return res.status(200).json({ data }); +} diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/dequeue-rpc.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/dequeue-rpc.ts new file mode 100644 index 000000000000..e1c7caa0c6d0 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/dequeue-rpc.ts @@ -0,0 +1,31 @@ +// Enqueue a job to the queue + +import { NextApiRequest, NextApiResponse } from 'next'; +import { createClient } from '@supabase/supabase-js'; +import * as Sentry from '@sentry/nextjs'; + +// These are the default development keys for a local Supabase instance +const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321'; +const SUPABASE_SERVICE_ROLE_KEY = + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU'; + +const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, { + db: { + schema: 'pgmq_public', + }, +}); + +Sentry.instrumentSupabaseClient(supabaseClient); + +export default async function handler(req: NextApiRequest, res: NextApiResponse) { + // Enqueue a job to the queue + const { data, error } = await supabaseClient.rpc('pop', { + queue_name: 'todos', + }); + + if (error) { + return res.status(500).json({ error: error.message }); + } + + return res.status(200).json({ data }); +} diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/dequeue-schema.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/dequeue-schema.ts new file mode 100644 index 000000000000..ec77e7258e1e --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/dequeue-schema.ts @@ -0,0 +1,25 @@ +import { NextApiRequest, NextApiResponse } from 'next'; +import { createClient } from '@supabase/supabase-js'; +import * as Sentry from '@sentry/nextjs'; + +// These are the default development keys for a local Supabase instance +const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321'; +const SUPABASE_SERVICE_ROLE_KEY = + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU'; + +const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY); + +Sentry.instrumentSupabaseClient(supabaseClient); + +export default async function handler(req: NextApiRequest, res: NextApiResponse) { + // Process a job from the queue + const { data, error } = await supabaseClient.schema('pgmq_public').rpc('pop', { + queue_name: 'todos', + }); + + if (error) { + return res.status(500).json({ error: error.message }); + } + + return res.status(200).json({ data }); +} diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/enqueue-rpc.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/enqueue-rpc.ts new file mode 100644 index 000000000000..a4d161fc224e --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/enqueue-rpc.ts @@ -0,0 +1,32 @@ +import { NextApiRequest, NextApiResponse } from 'next'; +import { createClient } from '@supabase/supabase-js'; +import * as Sentry from '@sentry/nextjs'; + +// These are the default development keys for a local Supabase instance +const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321'; +const SUPABASE_SERVICE_ROLE_KEY = + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU'; + +const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, { + db: { + schema: 'pgmq_public', + }, +}); + +Sentry.instrumentSupabaseClient(supabaseClient); + +export default async function handler(req: NextApiRequest, res: NextApiResponse) { + // Enqueue a job to the queue + const { data, error } = await supabaseClient.rpc('send', { + queue_name: 'todos', + message: { + title: 'Test Todo', + }, + }); + + if (error) { + return res.status(500).json({ error: error.message }); + } + + return res.status(200).json({ data }); +} diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/enqueue-schema.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/enqueue-schema.ts new file mode 100644 index 000000000000..92f81f27d49e --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/enqueue-schema.ts @@ -0,0 +1,28 @@ +import { NextApiRequest, NextApiResponse } from 'next'; +import { createClient } from '@supabase/supabase-js'; +import * as Sentry from '@sentry/nextjs'; + +// These are the default development keys for a local Supabase instance +const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321'; +const SUPABASE_SERVICE_ROLE_KEY = + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU'; + +const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY); + +Sentry.instrumentSupabaseClient(supabaseClient); + +export default async function handler(req: NextApiRequest, res: NextApiResponse) { + // Enqueue a job to the queue + const { data, error } = await supabaseClient.schema('pgmq_public').rpc('send', { + queue_name: 'todos', + message: { + title: 'Test Todo', + }, + }); + + if (error) { + return res.status(500).json({ error: error.message }); + } + + return res.status(200).json({ data }); +} diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue_read.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue_read.ts new file mode 100644 index 000000000000..8fbc98584128 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue_read.ts @@ -0,0 +1,31 @@ +import { NextApiRequest, NextApiResponse } from 'next'; +import { createClient } from '@supabase/supabase-js'; +import * as Sentry from '@sentry/nextjs'; + +// These are the default development keys for a local Supabase instance +const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321'; +const SUPABASE_SERVICE_ROLE_KEY = + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU'; + +const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, { + db: { + schema: 'pgmq_public', + }, +}); + +Sentry.instrumentSupabaseClient(supabaseClient); + +export default async function handler(req: NextApiRequest, res: NextApiResponse) { + // Read from queue + const { data, error } = await supabaseClient.rpc('read', { + queue_name: 'todos', + n: 2, + sleep_seconds: 0, + }); + + if (error) { + return res.status(500).json({ error: error.message }); + } + + return res.status(200).json({ data }); +} diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/config.toml b/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/config.toml index 35dcff35bec4..6d003c8a64fd 100644 --- a/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/config.toml +++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/config.toml @@ -10,9 +10,9 @@ enabled = true port = 54321 # Schemas to expose in your API. Tables, views and stored procedures in this schema will get API # endpoints. `public` and `graphql_public` schemas are included by default. -schemas = ["public", "graphql_public"] +schemas = ["public", "graphql_public", "storage", "pgmq_public"] # Extra schemas to add to the search_path of every request. -extra_search_path = ["public", "extensions"] +extra_search_path = ["public", "extensions", "pgmq_public"] # The maximum number of rows returns from a view, table, or stored procedure. Limits payload size # for accidental or malicious requests. max_rows = 1000 @@ -28,7 +28,7 @@ port = 54322 shadow_port = 54320 # The database major version to use. This has to be the same as your remote database's. Run `SHOW # server_version;` on the remote database to check. -major_version = 15 +major_version = 17 [db.pooler] enabled = false @@ -141,7 +141,6 @@ sign_in_sign_ups = 30 # Number of OTP / Magic link verifications that can be made in a 5 minute interval per IP address. token_verifications = 30 - # Configure one of the supported captcha providers: `hcaptcha`, `turnstile`. # [auth.captcha] # enabled = true @@ -283,6 +282,8 @@ enabled = true policy = "oneshot" # Port to attach the Chrome inspector for debugging edge functions. inspector_port = 8083 +# The Deno major version to use. +deno_version = 1 # [edge_runtime.secrets] # secret_key = "env(SECRET_VALUE)" diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/migrations/20230712094349_init.sql b/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/migrations/20230712094349_init.sql index 1b1a98ace2e4..2af0497506c6 100644 --- a/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/migrations/20230712094349_init.sql +++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/migrations/20230712094349_init.sql @@ -13,4 +13,4 @@ create policy "Individuals can view their own todos. " on todos for create policy "Individuals can update their own todos." on todos for update using (auth.uid() = user_id); create policy "Individuals can delete their own todos." on todos for - delete using (auth.uid() = user_id); \ No newline at end of file + delete using (auth.uid() = user_id); diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/migrations/20250515080602_enable-queues.sql b/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/migrations/20250515080602_enable-queues.sql new file mode 100644 index 000000000000..8eba5c8de3a4 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/migrations/20250515080602_enable-queues.sql @@ -0,0 +1,182 @@ + +-- Enable queues +create extension if not exists "pgmq"; +select pgmq.create('todos'); +alter table "pgmq"."q_todos" enable row level security; + +--- The following code is vendored in from the supabase implementation for now +--- By default, the pgmq schema is not exposed to the public +--- And there is no other way to enable access locally without using the UI +--- Vendored from: https://github.com/supabase/supabase/blob/aa9070c9087ce8c37a27e7c74ea0353858aed6c2/apps/studio/data/database-queues/database-queues-toggle-postgrest-mutation.ts#L18-L191 +create schema if not exists pgmq_public; +grant usage on schema pgmq_public to postgres, anon, authenticated, service_role; + +create or replace function pgmq_public.pop( + queue_name text +) + returns setof pgmq.message_record + language plpgsql + set search_path = '' +as $$ +begin + return query + select * + from pgmq.pop( + queue_name := queue_name + ); +end; +$$; + +comment on function pgmq_public.pop(queue_name text) is 'Retrieves and locks the next message from the specified queue.'; + + +create or replace function pgmq_public.send( + queue_name text, + message jsonb, + sleep_seconds integer default 0 -- renamed from 'delay' +) + returns setof bigint + language plpgsql + set search_path = '' +as $$ +begin + return query + select * + from pgmq.send( + queue_name := queue_name, + msg := message, + delay := sleep_seconds + ); +end; +$$; + +comment on function pgmq_public.send(queue_name text, message jsonb, sleep_seconds integer) is 'Sends a message to the specified queue, optionally delaying its availability by a number of seconds.'; + + +create or replace function pgmq_public.send_batch( + queue_name text, + messages jsonb[], + sleep_seconds integer default 0 -- renamed from 'delay' +) + returns setof bigint + language plpgsql + set search_path = '' +as $$ +begin + return query + select * + from pgmq.send_batch( + queue_name := queue_name, + msgs := messages, + delay := sleep_seconds + ); +end; +$$; + +comment on function pgmq_public.send_batch(queue_name text, messages jsonb[], sleep_seconds integer) is 'Sends a batch of messages to the specified queue, optionally delaying their availability by a number of seconds.'; + + +create or replace function pgmq_public.archive( + queue_name text, + message_id bigint +) + returns boolean + language plpgsql + set search_path = '' +as $$ +begin + return + pgmq.archive( + queue_name := queue_name, + msg_id := message_id + ); +end; +$$; + +comment on function pgmq_public.archive(queue_name text, message_id bigint) is 'Archives a message by moving it from the queue to a permanent archive.'; + + +create or replace function pgmq_public.delete( + queue_name text, + message_id bigint +) + returns boolean + language plpgsql + set search_path = '' +as $$ +begin + return + pgmq.delete( + queue_name := queue_name, + msg_id := message_id + ); +end; +$$; + +comment on function pgmq_public.delete(queue_name text, message_id bigint) is 'Permanently deletes a message from the specified queue.'; + +create or replace function pgmq_public.read( + queue_name text, + sleep_seconds integer, + n integer +) + returns setof pgmq.message_record + language plpgsql + set search_path = '' +as $$ +begin + return query + select * + from pgmq.read( + queue_name := queue_name, + vt := sleep_seconds, + qty := n + ); +end; +$$; + +comment on function pgmq_public.read(queue_name text, sleep_seconds integer, n integer) is 'Reads up to "n" messages from the specified queue with an optional "sleep_seconds" (visibility timeout).'; + +-- Grant execute permissions on wrapper functions to roles +grant execute on function pgmq_public.pop(text) to postgres, service_role, anon, authenticated; +grant execute on function pgmq.pop(text) to postgres, service_role, anon, authenticated; + +grant execute on function pgmq_public.send(text, jsonb, integer) to postgres, service_role, anon, authenticated; +grant execute on function pgmq.send(text, jsonb, integer) to postgres, service_role, anon, authenticated; + +grant execute on function pgmq_public.send_batch(text, jsonb[], integer) to postgres, service_role, anon, authenticated; +grant execute on function pgmq.send_batch(text, jsonb[], integer) to postgres, service_role, anon, authenticated; + +grant execute on function pgmq_public.archive(text, bigint) to postgres, service_role, anon, authenticated; +grant execute on function pgmq.archive(text, bigint) to postgres, service_role, anon, authenticated; + +grant execute on function pgmq_public.delete(text, bigint) to postgres, service_role, anon, authenticated; +grant execute on function pgmq.delete(text, bigint) to postgres, service_role, anon, authenticated; + +grant execute on function pgmq_public.read(text, integer, integer) to postgres, service_role, anon, authenticated; +grant execute on function pgmq.read(text, integer, integer) to postgres, service_role, anon, authenticated; + +-- For the service role, we want full access +-- Grant permissions on existing tables +grant all privileges on all tables in schema pgmq to postgres, service_role; + +-- Ensure service_role has permissions on future tables +alter default privileges in schema pgmq grant all privileges on tables to postgres, service_role; + +grant usage on schema pgmq to postgres, anon, authenticated, service_role; + + +/* + Grant access to sequences to API roles by default. Existing table permissions + continue to enforce insert restrictions. This is necessary to accommodate the + on-backup hook that rebuild queue table primary keys to avoid a pg_dump segfault. + This can be removed once logical backups are completely retired. +*/ +grant usage, select, update +on all sequences in schema pgmq +to anon, authenticated, service_role; + +alter default privileges in schema pgmq +grant usage, select, update +on sequences +to anon, authenticated, service_role; diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/seed.sql b/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/seed.sql index 57b5c4d07e05..e69de29bb2d1 100644 --- a/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/seed.sql +++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/supabase/seed.sql @@ -1,2 +0,0 @@ -TRUNCATE auth.users CASCADE; -TRUNCATE auth.identities CASCADE; diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/tests/performance.test.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/tests/performance.test.ts index cfb66b372420..c8485cff264a 100644 --- a/dev-packages/e2e-tests/test-applications/supabase-nextjs/tests/performance.test.ts +++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/tests/performance.test.ts @@ -1,5 +1,5 @@ import { expect, test } from '@playwright/test'; -import { waitForTransaction } from '@sentry-internal/test-utils'; +import { waitForError, waitForTransaction } from '@sentry-internal/test-utils'; // This test should be run in serial mode to ensure that the test user is created before the other tests test.describe.configure({ mode: 'serial' }); @@ -210,3 +210,336 @@ test('Sends server-side Supabase auth admin `listUsers` span', async ({ page, ba origin: 'auto.db.supabase', }); }); + +test('Sends queue publish spans with `schema(...).rpc(...)`', async ({ page, baseURL }) => { + const httpTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => { + return ( + transactionEvent?.contexts?.trace?.op === 'http.server' && + transactionEvent?.transaction === 'GET /api/enqueue-schema' + ); + }); + + const result = await fetch(`${baseURL}/api/enqueue-schema`); + + expect(result.status).toBe(200); + expect(await result.json()).toEqual({ data: [1] }); + + const transactionEvent = await httpTransactionPromise; + + expect(transactionEvent.spans).toHaveLength(2); + expect(transactionEvent.spans).toContainEqual({ + data: { + 'messaging.destination.name': 'todos', + 'messaging.system': 'supabase', + 'messaging.message.id': '1', + 'sentry.op': 'queue.publish', + 'sentry.origin': 'auto.db.supabase', + }, + description: 'supabase.db.rpc', + op: 'queue.publish', + origin: 'auto.db.supabase', + parent_span_id: expect.stringMatching(/[a-f0-9]{16}/), + span_id: expect.stringMatching(/[a-f0-9]{16}/), + start_timestamp: expect.any(Number), + status: 'ok', + timestamp: expect.any(Number), + trace_id: expect.stringMatching(/[a-f0-9]{32}/), + }); + + expect(transactionEvent.breadcrumbs).toContainEqual({ + timestamp: expect.any(Number), + type: 'supabase', + category: 'db.rpc.send', + message: 'rpc(send)', + data: { + 'messaging.destination.name': 'todos', + 'messaging.message.id': '1', + }, + }); +}); + +test('Sends queue publish spans with `rpc(...)`', async ({ page, baseURL }) => { + const httpTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => { + return ( + transactionEvent?.contexts?.trace?.op === 'http.server' && + transactionEvent?.transaction === 'GET /api/enqueue-rpc' + ); + }); + + const result = await fetch(`${baseURL}/api/enqueue-rpc`); + const transactionEvent = await httpTransactionPromise; + + expect(result.status).toBe(200); + expect(await result.json()).toEqual({ data: [2] }); + + expect(transactionEvent.spans).toHaveLength(2); + expect(transactionEvent.spans).toContainEqual({ + data: { + 'messaging.destination.name': 'todos', + 'messaging.system': 'supabase', + 'messaging.message.id': '2', + 'sentry.op': 'queue.publish', + 'sentry.origin': 'auto.db.supabase', + }, + description: 'supabase.db.rpc', + op: 'queue.publish', + origin: 'auto.db.supabase', + parent_span_id: expect.stringMatching(/[a-f0-9]{16}/), + span_id: expect.stringMatching(/[a-f0-9]{16}/), + start_timestamp: expect.any(Number), + status: 'ok', + timestamp: expect.any(Number), + trace_id: expect.stringMatching(/[a-f0-9]{32}/), + }); + + expect(transactionEvent.breadcrumbs).toContainEqual({ + timestamp: expect.any(Number), + type: 'supabase', + category: 'db.rpc.send', + message: 'rpc(send)', + data: { + 'messaging.destination.name': 'todos', + 'messaging.message.id': '2', + }, + }); +}); + +test('Sends queue process spans with `schema(...).rpc(...)`', async ({ page, baseURL }) => { + const httpTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => { + return ( + transactionEvent?.contexts?.trace?.op === 'http.server' && + transactionEvent?.transaction === 'GET /api/dequeue-schema' + ); + }); + + const result = await fetch(`${baseURL}/api/dequeue-schema`); + const transactionEvent = await httpTransactionPromise; + + expect(result.status).toBe(200); + expect(await result.json()).toEqual( + expect.objectContaining({ data: [expect.objectContaining({ message: { title: 'Test Todo' }, msg_id: 1 })] }), + ); + + expect(transactionEvent.spans).toHaveLength(2); + expect(transactionEvent.spans).toContainEqual({ + data: { + 'messaging.destination.name': 'todos', + 'messaging.system': 'supabase', + 'messaging.message.id': '1', + 'sentry.op': 'queue.process', + 'sentry.origin': 'auto.db.supabase', + }, + description: 'supabase.db.rpc', + op: 'queue.process', + origin: 'auto.db.supabase', + parent_span_id: expect.stringMatching(/[a-f0-9]{16}/), + span_id: expect.stringMatching(/[a-f0-9]{16}/), + start_timestamp: expect.any(Number), + status: 'ok', + timestamp: expect.any(Number), + trace_id: expect.stringMatching(/[a-f0-9]{32}/), + }); + + expect(transactionEvent.breadcrumbs).toContainEqual({ + timestamp: expect.any(Number), + type: 'supabase', + category: 'db.rpc.pop', + message: 'rpc(pop)', + data: { + 'messaging.destination.name': 'todos', + 'messaging.message.id': '1', + }, + }); +}); + +test('Sends queue process spans with `rpc(...)`', async ({ page, baseURL }) => { + const httpTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => { + return ( + transactionEvent?.contexts?.trace?.op === 'http.server' && + transactionEvent?.transaction === 'GET /api/dequeue-rpc' + ); + }); + + const result = await fetch(`${baseURL}/api/dequeue-rpc`); + const transactionEvent = await httpTransactionPromise; + + expect(result.status).toBe(200); + expect(await result.json()).toEqual( + expect.objectContaining({ data: [expect.objectContaining({ message: { title: 'Test Todo' }, msg_id: 2 })] }), + ); + + expect(transactionEvent.spans).toHaveLength(2); + expect(transactionEvent.spans).toContainEqual({ + data: { + 'messaging.destination.name': 'todos', + 'messaging.system': 'supabase', + 'messaging.message.id': '2', + 'sentry.op': 'queue.process', + 'sentry.origin': 'auto.db.supabase', + }, + description: 'supabase.db.rpc', + op: 'queue.process', + origin: 'auto.db.supabase', + parent_span_id: expect.stringMatching(/[a-f0-9]{16}/), + span_id: expect.stringMatching(/[a-f0-9]{16}/), + start_timestamp: expect.any(Number), + status: 'ok', + timestamp: expect.any(Number), + trace_id: expect.stringMatching(/[a-f0-9]{32}/), + }); + + expect(transactionEvent.breadcrumbs).toContainEqual({ + timestamp: expect.any(Number), + type: 'supabase', + category: 'db.rpc.pop', + message: 'rpc(pop)', + data: { + 'messaging.destination.name': 'todos', + 'messaging.message.id': '2', + }, + }); +}); + +test('Sends queue process error spans with `rpc(...)`', async ({ page, baseURL }) => { + const httpTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => { + return ( + transactionEvent?.contexts?.trace?.op === 'http.server' && + transactionEvent?.transaction === 'GET /api/dequeue-error' + ); + }); + + const errorEventPromise = waitForError('supabase-nextjs', errorEvent => { + return errorEvent?.exception?.values?.[0]?.value?.includes('pgmq.q_non-existing-queue'); + }); + + const result = await fetch(`${baseURL}/api/dequeue-error`); + const transactionEvent = await httpTransactionPromise; + + expect(result.status).toBe(500); + expect(await result.json()).toEqual( + expect.objectContaining({ + error: expect.stringContaining('relation "pgmq.q_non-existing-queue" does not exist'), + }), + ); + + const errorEvent = await errorEventPromise; + expect(errorEvent).toBeDefined(); + + expect(errorEvent.exception?.values?.[0].value).toBe('relation "pgmq.q_non-existing-queue" does not exist'); + expect(errorEvent.contexts?.supabase).toEqual({ + queueName: 'non-existing-queue', + }); + + expect(errorEvent.breadcrumbs).toContainEqual( + expect.objectContaining({ + type: 'supabase', + category: 'db.rpc.pop', + message: 'rpc(pop)', + data: { + 'messaging.destination.name': 'non-existing-queue', + }, + }), + ); + + expect(transactionEvent.spans).toContainEqual({ + data: { + 'messaging.destination.name': 'non-existing-queue', + 'messaging.system': 'supabase', + 'sentry.op': 'queue.process', + 'sentry.origin': 'auto.db.supabase', + }, + description: 'supabase.db.rpc', + op: 'queue.process', + origin: 'auto.db.supabase', + parent_span_id: expect.stringMatching(/[a-f0-9]{16}/), + span_id: expect.stringMatching(/[a-f0-9]{16}/), + start_timestamp: expect.any(Number), + status: 'unknown_error', + timestamp: expect.any(Number), + trace_id: expect.stringMatching(/[a-f0-9]{32}/), + }); +}); + +test('Sends queue batch publish spans with `rpc(...)`', async ({ page, baseURL }) => { + const httpTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => { + return ( + transactionEvent?.contexts?.trace?.op === 'http.server' && + transactionEvent?.transaction === 'GET /api/batch_enqueue' + ); + }); + + const result = await fetch(`${baseURL}/api/batch_enqueue`); + const transactionEvent = await httpTransactionPromise; + + expect(result.status).toBe(200); + expect(await result.json()).toEqual({ data: [3, 4] }); + + expect(transactionEvent.spans).toHaveLength(2); + expect(transactionEvent.spans).toContainEqual({ + data: { + 'messaging.destination.name': 'todos', + 'messaging.system': 'supabase', + 'messaging.message.id': '3,4', + 'sentry.op': 'queue.publish', + 'sentry.origin': 'auto.db.supabase', + }, + description: 'supabase.db.rpc', + op: 'queue.publish', + origin: 'auto.db.supabase', + parent_span_id: expect.stringMatching(/[a-f0-9]{16}/), + span_id: expect.stringMatching(/[a-f0-9]{16}/), + start_timestamp: expect.any(Number), + status: 'ok', + timestamp: expect.any(Number), + trace_id: expect.stringMatching(/[a-f0-9]{32}/), + }); + + expect(transactionEvent.breadcrumbs).toContainEqual({ + timestamp: expect.any(Number), + type: 'supabase', + category: 'db.rpc.send_batch', + message: 'rpc(send_batch)', + data: { + 'messaging.destination.name': 'todos', + 'messaging.message.id': '3,4', + }, + }); +}); + +test('Sends `read` queue operation spans with `rpc(...)`', async ({ page, baseURL }) => { + const httpTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => { + return ( + transactionEvent?.contexts?.trace?.op === 'http.server' && transactionEvent?.transaction === 'GET /api/queue_read' + ); + }); + const result = await fetch(`${baseURL}/api/queue_read`); + const transactionEvent = await httpTransactionPromise; + + expect(result.status).toBe(200); + expect(await result.json()).toEqual( + expect.objectContaining({ data: [ + expect.objectContaining({ message: { title: 'Test Todo 1' }, msg_id: 3 }), + expect.objectContaining({ message: { title: 'Test Todo 2' }, msg_id: 4 }), + ] }), + ); + + expect(transactionEvent.spans).toHaveLength(2); + expect(transactionEvent.spans).toContainEqual({ + data: { + 'messaging.destination.name': 'todos', + 'messaging.system': 'supabase', + 'messaging.message.id': '3,4', + 'sentry.op': 'queue.receive', + 'sentry.origin': 'auto.db.supabase', + }, + description: 'supabase.db.rpc', + op: 'queue.receive', + origin: 'auto.db.supabase', + parent_span_id: expect.stringMatching(/[a-f0-9]{16}/), + span_id: expect.stringMatching(/[a-f0-9]{16}/), + start_timestamp: expect.any(Number), + status: 'ok', + timestamp: expect.any(Number), + trace_id: expect.stringMatching(/[a-f0-9]{32}/), + }); +}); diff --git a/packages/core/src/integrations/supabase.ts b/packages/core/src/integrations/supabase.ts index 39db5082f02e..a14dd9d570ea 100644 --- a/packages/core/src/integrations/supabase.ts +++ b/packages/core/src/integrations/supabase.ts @@ -80,6 +80,7 @@ type AuthAdminOperationName = (typeof AUTH_ADMIN_OPERATIONS_TO_INSTRUMENT)[numbe type PostgRESTQueryOperationFn = (...args: unknown[]) => PostgRESTFilterBuilder; export interface SupabaseClientInstance { + rpc: (fn: string, params: Record) => Promise; auth: { admin: Record; } & Record; @@ -99,6 +100,12 @@ export interface PostgRESTFilterBuilder { export interface SupabaseResponse { status?: number; + data?: Array< + | number + | { + msg_id?: number; + } + >; error?: { message: string; code?: string; @@ -217,73 +224,160 @@ export function translateFiltersIntoMethods(key: string, query: string): string } function instrumentRpcReturnedFromSchemaCall(SupabaseClient: unknown): void { + if (isInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema)) { + return; + } + (SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema = new Proxy( (SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema, { apply(target, thisArg, argumentsList) { - const rv = Reflect.apply(target, thisArg, argumentsList); + const supabaseInstance = Reflect.apply(target, thisArg, argumentsList); - return instrumentRpc(rv); + (supabaseInstance as unknown as SupabaseClientConstructor).rpc = new Proxy( + (supabaseInstance as unknown as SupabaseClientInstance).rpc, + { + apply(target, thisArg, argumentsList) { + return instrumentRpcImpl(target, thisArg, argumentsList); + }, + }, + ); + + return supabaseInstance; }, }, ); + + markAsInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema); } -function instrumentRpc(SupabaseClient: unknown): unknown { - (SupabaseClient as unknown as SupabaseClientConstructor).rpc = new Proxy( - (SupabaseClient as unknown as SupabaseClientConstructor).rpc, +const instrumentRpcImpl = (target: any, thisArg: any, argumentsList: any[]): Promise => { + const isProducerSpan = argumentsList[0] === 'send' || argumentsList[0] === 'send_batch'; + const isConsumerSpan = argumentsList[0] === 'pop'; + const isReceiverSpan = argumentsList[0] === 'read'; + + if (!isProducerSpan && !isConsumerSpan && !isReceiverSpan) { + return Reflect.apply(target, thisArg, argumentsList); + } + + const maybeQueueParams = argumentsList[1]; + + // If the second argument is not an object, it's not a queue operation + if (!isPlainObject(maybeQueueParams)) { + return Reflect.apply(target, thisArg, argumentsList); + } + + const queueName = maybeQueueParams?.queue_name as string; + + const op = isProducerSpan + ? 'queue.publish' + : isConsumerSpan + ? 'queue.process' + : isReceiverSpan + ? 'queue.receive' + : ''; + + // If the operation is not a queue operation, return the original function + if (!op) { + return Reflect.apply(target, thisArg, argumentsList); + } + + return startSpan( { - apply(target, thisArg, argumentsList) { - const isProducerSpan = argumentsList[0] === 'enqueue'; - const isConsumerSpan = argumentsList[0] === 'dequeue'; + name: 'supabase.db.rpc', + attributes: { + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.supabase', + [SEMANTIC_ATTRIBUTE_SENTRY_OP]: op, + 'messaging.system': 'supabase', + }, + }, + async span => { + try { + return (Reflect.apply(target, thisArg, argumentsList) as Promise).then( + (res: SupabaseResponse) => { + const messageId = + res?.data?.map(item => (typeof item === 'number' ? item : item.msg_id)).join(',') || undefined; - const maybeQueueParams = argumentsList[1]; + if (messageId) { + span.setAttribute('messaging.message.id', messageId); + } - // If the second argument is not an object, it's not a queue operation - if (!isPlainObject(maybeQueueParams)) { - return Reflect.apply(target, thisArg, argumentsList); - } + if (queueName) { + span.setAttribute('messaging.destination.name', queueName); + } - const msg = maybeQueueParams?.msg as { title: string }; + const breadcrumb: SupabaseBreadcrumb = { + type: 'supabase', + category: `db.rpc.${argumentsList[0]}`, + message: `rpc(${argumentsList[0]})`, + }; - const messageId = msg?.title; - const queueName = maybeQueueParams?.queue_name as string; + const data: Record = {}; - const op = isProducerSpan ? 'queue.publish' : isConsumerSpan ? 'queue.process' : ''; + if (messageId) { + data['messaging.message.id'] = messageId; + } - // If the operation is not a queue operation, return the original function - if (!op) { - return Reflect.apply(target, thisArg, argumentsList); - } + if (queueName) { + data['messaging.destination.name'] = queueName; + } - return startSpan( - { - name: 'supabase.db.rpc', - attributes: { - [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.supabase', - [SEMANTIC_ATTRIBUTE_SENTRY_OP]: op, - }, - }, - async span => { - return (Reflect.apply(target, thisArg, argumentsList) as Promise).then((res: unknown) => { - if (messageId) { - span.setAttribute('messaging.message.id', messageId); + if (Object.keys(data).length) { + breadcrumb.data = data; + } + + addBreadcrumb(breadcrumb); + + if (res.error) { + const err = new Error(res.error.message) as SupabaseError; + + if (res.error.code) { + err.code = res.error.code; } - if (queueName) { - span.setAttribute('messaging.destination.name', queueName); + if (res.error.details) { + err.details = res.error.details; } - span.end(); - return res; - }); + captureException(err, { + contexts: { + supabase: { + queueName, + messageId, + }, + }, + }); + + span.setStatus({ code: SPAN_STATUS_ERROR }); + } + + span.end(); + + return res; }, ); - }, + } catch (err: unknown) { + span.setStatus({ code: SPAN_STATUS_ERROR }); + span.end(); + captureException(err, { + mechanism: { + handled: false, + }, + }); + } }, ); +}; - return SupabaseClient; +function instrumentRpc(SupabaseClient: unknown): void { + (SupabaseClient as unknown as SupabaseClientInstance).rpc = new Proxy( + (SupabaseClient as unknown as SupabaseClientInstance).rpc, + { + apply(target, thisArg, argumentsList) { + return instrumentRpcImpl(target, thisArg, argumentsList); + }, + }, + ); } function instrumentAuthOperation(operation: AuthOperationFn, isAdmin = false): AuthOperationFn { @@ -414,6 +508,13 @@ function instrumentPostgRESTFilterBuilder(PostgRESTFilterBuilder: PostgRESTFilte } const pathParts = typedThis.url.pathname.split('/'); + + if (pathParts.includes('rpc')) { + // RPC calls are instrumented in the `instrumentRpc` function + // and should not be instrumented here. + return Reflect.apply(target, thisArg, argumentsList); + } + const table = pathParts.length > 0 ? pathParts[pathParts.length - 1] : ''; const queryItems: string[] = []; @@ -471,6 +572,28 @@ function instrumentPostgRESTFilterBuilder(PostgRESTFilterBuilder: PostgRESTFilte span.end(); } + const breadcrumb: SupabaseBreadcrumb = { + type: 'supabase', + category: `db.${operation}`, + message: description, + }; + + const data: Record = {}; + + if (queryItems.length) { + data.query = queryItems; + } + + if (Object.keys(body).length) { + data.body = body; + } + + if (Object.keys(data).length) { + breadcrumb.data = data; + } + + addBreadcrumb(breadcrumb); + if (res.error) { const err = new Error(res.error.message) as SupabaseError; if (res.error.code) { @@ -495,28 +618,6 @@ function instrumentPostgRESTFilterBuilder(PostgRESTFilterBuilder: PostgRESTFilte }); } - const breadcrumb: SupabaseBreadcrumb = { - type: 'supabase', - category: `db.${operation}`, - message: description, - }; - - const data: Record = {}; - - if (queryItems.length) { - data.query = queryItems; - } - - if (Object.keys(body).length) { - data.body = body; - } - - if (Object.keys(data).length) { - breadcrumb.data = data; - } - - addBreadcrumb(breadcrumb); - return res; }, (err: Error) => { From 18400c8895197d92a4510fbf57b1e4073422c84b Mon Sep 17 00:00:00 2001 From: Onur Temizkan Date: Tue, 27 May 2025 15:21:19 +0100 Subject: [PATCH 06/11] Update playwright tests --- .../integrations/supabase/queues-rpc/init.js | 4 +-- .../integrations/supabase/queues-rpc/test.ts | 29 +++++++++++++----- .../supabase/queues-schema/init.js | 4 +-- .../supabase/queues-schema/test.ts | 30 ++++++++++++++----- 4 files changed, 47 insertions(+), 20 deletions(-) diff --git a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/init.js b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/init.js index 45c335254887..15309015bbd9 100644 --- a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/init.js +++ b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/init.js @@ -18,12 +18,12 @@ Sentry.init({ // Simulate queue operations async function performQueueOperations() { try { - await supabaseClient.rpc('enqueue', { + await supabaseClient.rpc('send', { queue_name: 'todos', msg: { title: 'Test Todo' }, }); - await supabaseClient.rpc('dequeue', { + await supabaseClient.rpc('pop', { queue_name: 'todos', }); } catch (error) { diff --git a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/test.ts b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/test.ts index 0f11708bbedd..d0f1534a404a 100644 --- a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/test.ts +++ b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/test.ts @@ -5,12 +5,24 @@ import { sentryTest } from '../../../../utils/fixtures'; import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers'; async function mockSupabaseRoute(page: Page) { - await page.route('**/rest/v1/rpc**', route => { + await page.route('**/rpc/**/send', route => { return route.fulfill({ status: 200, - body: JSON.stringify({ - foo: ['bar', 'baz'], - }), + body: JSON.stringify([0]), + headers: { + 'Content-Type': 'application/json', + }, + }); + }); + + await page.route('**/rpc/**/pop', route => { + return route.fulfill({ + status: 200, + body: JSON.stringify([ + { + msg_id: 0, + }, + ]), headers: { 'Content-Type': 'application/json', }, @@ -25,16 +37,16 @@ if (bundle.startsWith('bundle')) { } sentryTest('should capture Supabase queue spans from client.rpc', async ({ getLocalTestUrl, page }) => { - await mockSupabaseRoute(page); - if (shouldSkipTracingTest()) { return; } + await mockSupabaseRoute(page); + const url = await getLocalTestUrl({ testDir: __dirname }); const event = await getFirstSentryEnvelopeRequest(page, url); - const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue')); + const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue.')); expect(queueSpans).toHaveLength(2); @@ -49,7 +61,7 @@ sentryTest('should capture Supabase queue spans from client.rpc', async ({ getLo 'sentry.op': 'queue.publish', 'sentry.origin': 'auto.db.supabase', 'messaging.destination.name': 'todos', - 'messaging.message.id': 'Test Todo', + 'messaging.message.id': '0', }), }); @@ -64,6 +76,7 @@ sentryTest('should capture Supabase queue spans from client.rpc', async ({ getLo 'sentry.op': 'queue.process', 'sentry.origin': 'auto.db.supabase', 'messaging.destination.name': 'todos', + 'messaging.message.id': '0', }), }); }); diff --git a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js index fbdbd38a4ccc..0cbc629a2b3e 100644 --- a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js +++ b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js @@ -18,12 +18,12 @@ Sentry.init({ // Simulate queue operations async function performQueueOperations() { try { - await supabaseClient.schema('pgmq_public').rpc('enqueue', { + await supabaseClient.schema('pgmq_public').rpc('send', { queue_name: 'todos', msg: { title: 'Test Todo' }, }); - await supabaseClient.schema('pgmq_public').rpc('dequeue', { + await supabaseClient.schema('pgmq_public').rpc('pop', { queue_name: 'todos', }); } catch (error) { diff --git a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/test.ts b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/test.ts index e7ad4154f87b..6417f7796964 100644 --- a/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/test.ts +++ b/dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/test.ts @@ -4,12 +4,24 @@ import { sentryTest } from '../../../../utils/fixtures'; import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers'; async function mockSupabaseRoute(page: Page) { - await page.route('**/rest/v1/rpc**', route => { + await page.route('**/rpc/**/send', route => { return route.fulfill({ status: 200, - body: JSON.stringify({ - foo: ['bar', 'baz'], - }), + body: JSON.stringify([0]), + headers: { + 'Content-Type': 'application/json', + }, + }); + }); + + await page.route('**/rpc/**/pop', route => { + return route.fulfill({ + status: 200, + body: JSON.stringify([ + { + msg_id: 0, + }, + ]), headers: { 'Content-Type': 'application/json', }, @@ -24,16 +36,17 @@ if (bundle.startsWith('bundle')) { } sentryTest('should capture Supabase queue spans from client.schema(...).rpc', async ({ getLocalTestUrl, page }) => { - await mockSupabaseRoute(page); - if (shouldSkipTracingTest()) { return; } + await mockSupabaseRoute(page); + const url = await getLocalTestUrl({ testDir: __dirname }); const event = await getFirstSentryEnvelopeRequest(page, url); - const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue')); + + const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue.')); expect(queueSpans).toHaveLength(2); @@ -48,7 +61,7 @@ sentryTest('should capture Supabase queue spans from client.schema(...).rpc', as 'sentry.op': 'queue.publish', 'sentry.origin': 'auto.db.supabase', 'messaging.destination.name': 'todos', - 'messaging.message.id': 'Test Todo', + 'messaging.message.id': '0', }), }); @@ -63,6 +76,7 @@ sentryTest('should capture Supabase queue spans from client.schema(...).rpc', as 'sentry.op': 'queue.process', 'sentry.origin': 'auto.db.supabase', 'messaging.destination.name': 'todos', + 'messaging.message.id': '0', }), }); }); From 1e15e150301aacedba9daa2ba7e591fc58ce5bee Mon Sep 17 00:00:00 2001 From: Onur Temizkan Date: Wed, 28 May 2025 12:25:03 +0100 Subject: [PATCH 07/11] Tidy up test endpoints --- .../consumer-error.ts} | 0 .../{dequeue-rpc.ts => queue/consumer-rpc.ts} | 0 .../consumer-schema.ts} | 0 .../producer-batch.ts} | 0 .../{enqueue-rpc.ts => queue/producer-rpc.ts} | 0 .../producer-schema.ts} | 0 .../{queue_read.ts => queue/receiver-rpc.ts} | 0 .../supabase-nextjs/tests/performance.test.ts | 28 +++++++++---------- 8 files changed, 14 insertions(+), 14 deletions(-) rename dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/{dequeue-error.ts => queue/consumer-error.ts} (100%) rename dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/{dequeue-rpc.ts => queue/consumer-rpc.ts} (100%) rename dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/{dequeue-schema.ts => queue/consumer-schema.ts} (100%) rename dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/{batch_enqueue.ts => queue/producer-batch.ts} (100%) rename dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/{enqueue-rpc.ts => queue/producer-rpc.ts} (100%) rename dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/{enqueue-schema.ts => queue/producer-schema.ts} (100%) rename dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/{queue_read.ts => queue/receiver-rpc.ts} (100%) diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/dequeue-error.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/consumer-error.ts similarity index 100% rename from dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/dequeue-error.ts rename to dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/consumer-error.ts diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/dequeue-rpc.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/consumer-rpc.ts similarity index 100% rename from dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/dequeue-rpc.ts rename to dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/consumer-rpc.ts diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/dequeue-schema.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/consumer-schema.ts similarity index 100% rename from dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/dequeue-schema.ts rename to dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/consumer-schema.ts diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/batch_enqueue.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/producer-batch.ts similarity index 100% rename from dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/batch_enqueue.ts rename to dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/producer-batch.ts diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/enqueue-rpc.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/producer-rpc.ts similarity index 100% rename from dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/enqueue-rpc.ts rename to dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/producer-rpc.ts diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/enqueue-schema.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/producer-schema.ts similarity index 100% rename from dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/enqueue-schema.ts rename to dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/producer-schema.ts diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue_read.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/receiver-rpc.ts similarity index 100% rename from dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue_read.ts rename to dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/receiver-rpc.ts diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/tests/performance.test.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/tests/performance.test.ts index c8485cff264a..f3b46ad0f4e4 100644 --- a/dev-packages/e2e-tests/test-applications/supabase-nextjs/tests/performance.test.ts +++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/tests/performance.test.ts @@ -215,11 +215,11 @@ test('Sends queue publish spans with `schema(...).rpc(...)`', async ({ page, bas const httpTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => { return ( transactionEvent?.contexts?.trace?.op === 'http.server' && - transactionEvent?.transaction === 'GET /api/enqueue-schema' + transactionEvent?.transaction === 'GET /api/queue/producer-schema' ); }); - const result = await fetch(`${baseURL}/api/enqueue-schema`); + const result = await fetch(`${baseURL}/api/queue/producer-schema`); expect(result.status).toBe(200); expect(await result.json()).toEqual({ data: [1] }); @@ -262,11 +262,11 @@ test('Sends queue publish spans with `rpc(...)`', async ({ page, baseURL }) => { const httpTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => { return ( transactionEvent?.contexts?.trace?.op === 'http.server' && - transactionEvent?.transaction === 'GET /api/enqueue-rpc' + transactionEvent?.transaction === 'GET /api/queue/producer-rpc' ); }); - const result = await fetch(`${baseURL}/api/enqueue-rpc`); + const result = await fetch(`${baseURL}/api/queue/producer-rpc`); const transactionEvent = await httpTransactionPromise; expect(result.status).toBe(200); @@ -308,11 +308,11 @@ test('Sends queue process spans with `schema(...).rpc(...)`', async ({ page, bas const httpTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => { return ( transactionEvent?.contexts?.trace?.op === 'http.server' && - transactionEvent?.transaction === 'GET /api/dequeue-schema' + transactionEvent?.transaction === 'GET /api/queue/consumer-schema' ); }); - const result = await fetch(`${baseURL}/api/dequeue-schema`); + const result = await fetch(`${baseURL}/api/queue/consumer-schema`); const transactionEvent = await httpTransactionPromise; expect(result.status).toBe(200); @@ -356,11 +356,11 @@ test('Sends queue process spans with `rpc(...)`', async ({ page, baseURL }) => { const httpTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => { return ( transactionEvent?.contexts?.trace?.op === 'http.server' && - transactionEvent?.transaction === 'GET /api/dequeue-rpc' + transactionEvent?.transaction === 'GET /api/queue/consumer-rpc' ); }); - const result = await fetch(`${baseURL}/api/dequeue-rpc`); + const result = await fetch(`${baseURL}/api/queue/consumer-rpc`); const transactionEvent = await httpTransactionPromise; expect(result.status).toBe(200); @@ -404,7 +404,7 @@ test('Sends queue process error spans with `rpc(...)`', async ({ page, baseURL } const httpTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => { return ( transactionEvent?.contexts?.trace?.op === 'http.server' && - transactionEvent?.transaction === 'GET /api/dequeue-error' + transactionEvent?.transaction === 'GET /api/queue/consumer-error' ); }); @@ -412,7 +412,7 @@ test('Sends queue process error spans with `rpc(...)`', async ({ page, baseURL } return errorEvent?.exception?.values?.[0]?.value?.includes('pgmq.q_non-existing-queue'); }); - const result = await fetch(`${baseURL}/api/dequeue-error`); + const result = await fetch(`${baseURL}/api/queue/consumer-error`); const transactionEvent = await httpTransactionPromise; expect(result.status).toBe(500); @@ -464,11 +464,11 @@ test('Sends queue batch publish spans with `rpc(...)`', async ({ page, baseURL } const httpTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => { return ( transactionEvent?.contexts?.trace?.op === 'http.server' && - transactionEvent?.transaction === 'GET /api/batch_enqueue' + transactionEvent?.transaction === 'GET /api/queue/producer-batch' ); }); - const result = await fetch(`${baseURL}/api/batch_enqueue`); + const result = await fetch(`${baseURL}/api/queue/producer-batch`); const transactionEvent = await httpTransactionPromise; expect(result.status).toBe(200); @@ -509,10 +509,10 @@ test('Sends queue batch publish spans with `rpc(...)`', async ({ page, baseURL } test('Sends `read` queue operation spans with `rpc(...)`', async ({ page, baseURL }) => { const httpTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => { return ( - transactionEvent?.contexts?.trace?.op === 'http.server' && transactionEvent?.transaction === 'GET /api/queue_read' + transactionEvent?.contexts?.trace?.op === 'http.server' && transactionEvent?.transaction === 'GET /api/queue/receiver-rpc' ); }); - const result = await fetch(`${baseURL}/api/queue_read`); + const result = await fetch(`${baseURL}/api/queue/receiver-rpc`); const transactionEvent = await httpTransactionPromise; expect(result.status).toBe(200); From 5a312c98f8f99fc193818be6b83a34792f5fcbac Mon Sep 17 00:00:00 2001 From: Onur Temizkan Date: Sat, 31 May 2025 21:08:24 +0100 Subject: [PATCH 08/11] Refactor / reimplement --- .../supabase-nextjs/package.json | 2 +- .../pages/api/queue/receiver-rpc.ts | 31 --- .../supabase-nextjs/tests/performance.test.ts | 107 +++----- packages/core/src/integrations/supabase.ts | 257 ++++++++++++++---- 4 files changed, 246 insertions(+), 151 deletions(-) delete mode 100644 dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/receiver-rpc.ts diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/package.json b/dev-packages/e2e-tests/test-applications/supabase-nextjs/package.json index 7f505c88743d..6429b0e8b9c2 100644 --- a/dev-packages/e2e-tests/test-applications/supabase-nextjs/package.json +++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/package.json @@ -25,7 +25,7 @@ "next": "14.2.25", "react": "18.2.0", "react-dom": "18.2.0", - "supabase": "2.22.12", + "supabase": "2.23.4", "typescript": "4.9.5" }, "devDependencies": { diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/receiver-rpc.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/receiver-rpc.ts deleted file mode 100644 index 8fbc98584128..000000000000 --- a/dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/receiver-rpc.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { NextApiRequest, NextApiResponse } from 'next'; -import { createClient } from '@supabase/supabase-js'; -import * as Sentry from '@sentry/nextjs'; - -// These are the default development keys for a local Supabase instance -const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321'; -const SUPABASE_SERVICE_ROLE_KEY = - 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU'; - -const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, { - db: { - schema: 'pgmq_public', - }, -}); - -Sentry.instrumentSupabaseClient(supabaseClient); - -export default async function handler(req: NextApiRequest, res: NextApiResponse) { - // Read from queue - const { data, error } = await supabaseClient.rpc('read', { - queue_name: 'todos', - n: 2, - sleep_seconds: 0, - }); - - if (error) { - return res.status(500).json({ error: error.message }); - } - - return res.status(200).json({ data }); -} diff --git a/dev-packages/e2e-tests/test-applications/supabase-nextjs/tests/performance.test.ts b/dev-packages/e2e-tests/test-applications/supabase-nextjs/tests/performance.test.ts index f3b46ad0f4e4..c91e9f50bb98 100644 --- a/dev-packages/e2e-tests/test-applications/supabase-nextjs/tests/performance.test.ts +++ b/dev-packages/e2e-tests/test-applications/supabase-nextjs/tests/performance.test.ts @@ -305,42 +305,48 @@ test('Sends queue publish spans with `rpc(...)`', async ({ page, baseURL }) => { }); test('Sends queue process spans with `schema(...).rpc(...)`', async ({ page, baseURL }) => { - const httpTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => { + const consumerSpanPromise = waitForTransaction('supabase-nextjs', transactionEvent => { return ( - transactionEvent?.contexts?.trace?.op === 'http.server' && - transactionEvent?.transaction === 'GET /api/queue/consumer-schema' + transactionEvent?.contexts?.trace?.op === 'queue.process' && transactionEvent?.transaction === 'supabase.db.rpc' ); }); const result = await fetch(`${baseURL}/api/queue/consumer-schema`); - const transactionEvent = await httpTransactionPromise; + const consumerEvent = await consumerSpanPromise; expect(result.status).toBe(200); expect(await result.json()).toEqual( - expect.objectContaining({ data: [expect.objectContaining({ message: { title: 'Test Todo' }, msg_id: 1 })] }), + expect.objectContaining({ + data: [ + expect.objectContaining({ + message: { + title: 'Test Todo', + }, + msg_id: expect.any(Number), + }), + ], + }), ); - expect(transactionEvent.spans).toHaveLength(2); - expect(transactionEvent.spans).toContainEqual({ + expect(consumerEvent.contexts.trace).toEqual({ data: { 'messaging.destination.name': 'todos', 'messaging.system': 'supabase', 'messaging.message.id': '1', + 'messaging.message.receive.latency': expect.any(Number), 'sentry.op': 'queue.process', 'sentry.origin': 'auto.db.supabase', + 'sentry.source': 'route', }, - description: 'supabase.db.rpc', op: 'queue.process', origin: 'auto.db.supabase', - parent_span_id: expect.stringMatching(/[a-f0-9]{16}/), - span_id: expect.stringMatching(/[a-f0-9]{16}/), - start_timestamp: expect.any(Number), + parent_span_id: expect.any(String), + span_id: expect.any(String), status: 'ok', - timestamp: expect.any(Number), - trace_id: expect.stringMatching(/[a-f0-9]{32}/), + trace_id: expect.any(String), }); - expect(transactionEvent.breadcrumbs).toContainEqual({ + expect(consumerEvent.breadcrumbs).toContainEqual({ timestamp: expect.any(Number), type: 'supabase', category: 'db.rpc.pop', @@ -353,42 +359,47 @@ test('Sends queue process spans with `schema(...).rpc(...)`', async ({ page, bas }); test('Sends queue process spans with `rpc(...)`', async ({ page, baseURL }) => { - const httpTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => { + const consumerSpanPromise = waitForTransaction('supabase-nextjs', transactionEvent => { return ( - transactionEvent?.contexts?.trace?.op === 'http.server' && - transactionEvent?.transaction === 'GET /api/queue/consumer-rpc' + transactionEvent?.contexts?.trace?.op === 'queue.process' && transactionEvent?.transaction === 'supabase.db.rpc' ); }); const result = await fetch(`${baseURL}/api/queue/consumer-rpc`); - const transactionEvent = await httpTransactionPromise; + const consumerEvent = await consumerSpanPromise; expect(result.status).toBe(200); expect(await result.json()).toEqual( - expect.objectContaining({ data: [expect.objectContaining({ message: { title: 'Test Todo' }, msg_id: 2 })] }), + expect.objectContaining({ + data: [ + expect.objectContaining({ + message: { + title: 'Test Todo', + }, + msg_id: expect.any(Number), + }), + ], + }), ); - - expect(transactionEvent.spans).toHaveLength(2); - expect(transactionEvent.spans).toContainEqual({ + expect(consumerEvent.contexts.trace).toEqual({ data: { 'messaging.destination.name': 'todos', 'messaging.system': 'supabase', 'messaging.message.id': '2', + 'messaging.message.receive.latency': expect.any(Number), 'sentry.op': 'queue.process', 'sentry.origin': 'auto.db.supabase', + 'sentry.source': 'route', }, - description: 'supabase.db.rpc', op: 'queue.process', origin: 'auto.db.supabase', - parent_span_id: expect.stringMatching(/[a-f0-9]{16}/), - span_id: expect.stringMatching(/[a-f0-9]{16}/), - start_timestamp: expect.any(Number), + parent_span_id: expect.any(String), + span_id: expect.any(String), status: 'ok', - timestamp: expect.any(Number), - trace_id: expect.stringMatching(/[a-f0-9]{32}/), + trace_id: expect.any(String), }); - expect(transactionEvent.breadcrumbs).toContainEqual({ + expect(consumerEvent.breadcrumbs).toContainEqual({ timestamp: expect.any(Number), type: 'supabase', category: 'db.rpc.pop', @@ -505,41 +516,3 @@ test('Sends queue batch publish spans with `rpc(...)`', async ({ page, baseURL } }, }); }); - -test('Sends `read` queue operation spans with `rpc(...)`', async ({ page, baseURL }) => { - const httpTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => { - return ( - transactionEvent?.contexts?.trace?.op === 'http.server' && transactionEvent?.transaction === 'GET /api/queue/receiver-rpc' - ); - }); - const result = await fetch(`${baseURL}/api/queue/receiver-rpc`); - const transactionEvent = await httpTransactionPromise; - - expect(result.status).toBe(200); - expect(await result.json()).toEqual( - expect.objectContaining({ data: [ - expect.objectContaining({ message: { title: 'Test Todo 1' }, msg_id: 3 }), - expect.objectContaining({ message: { title: 'Test Todo 2' }, msg_id: 4 }), - ] }), - ); - - expect(transactionEvent.spans).toHaveLength(2); - expect(transactionEvent.spans).toContainEqual({ - data: { - 'messaging.destination.name': 'todos', - 'messaging.system': 'supabase', - 'messaging.message.id': '3,4', - 'sentry.op': 'queue.receive', - 'sentry.origin': 'auto.db.supabase', - }, - description: 'supabase.db.rpc', - op: 'queue.receive', - origin: 'auto.db.supabase', - parent_span_id: expect.stringMatching(/[a-f0-9]{16}/), - span_id: expect.stringMatching(/[a-f0-9]{16}/), - start_timestamp: expect.any(Number), - status: 'ok', - timestamp: expect.any(Number), - trace_id: expect.stringMatching(/[a-f0-9]{32}/), - }); -}); diff --git a/packages/core/src/integrations/supabase.ts b/packages/core/src/integrations/supabase.ts index a14dd9d570ea..b04e860e32ab 100644 --- a/packages/core/src/integrations/supabase.ts +++ b/packages/core/src/integrations/supabase.ts @@ -7,7 +7,7 @@ import { DEBUG_BUILD } from '../debug-build'; import { captureException } from '../exports'; import { defineIntegration } from '../integration'; import { SEMANTIC_ATTRIBUTE_SENTRY_OP, SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN } from '../semanticAttributes'; -import { setHttpStatus, SPAN_STATUS_ERROR, SPAN_STATUS_OK, startSpan } from '../tracing'; +import { continueTrace, setHttpStatus, SPAN_STATUS_ERROR, SPAN_STATUS_OK, startSpan } from '../tracing'; import type { IntegrationFn } from '../types-hoist/integration'; import { isPlainObject } from '../utils/is'; import { logger } from '../utils/logger'; @@ -100,12 +100,16 @@ export interface PostgRESTFilterBuilder { export interface SupabaseResponse { status?: number; - data?: Array< - | number - | { - msg_id?: number; - } - >; + data?: Array<{ + msg_id?: number; + enqueued_at?: string; + message?: { + _sentry?: { + sentry_trace?: string; + baggage?: string; + }; + }; + }>; error?: { message: string; code?: string; @@ -238,7 +242,21 @@ function instrumentRpcReturnedFromSchemaCall(SupabaseClient: unknown): void { (supabaseInstance as unknown as SupabaseClientInstance).rpc, { apply(target, thisArg, argumentsList) { - return instrumentRpcImpl(target, thisArg, argumentsList); + const isProducerSpan = argumentsList[0] === 'send' || argumentsList[0] === 'send_batch'; + const isConsumerSpan = argumentsList[0] === 'pop'; + + if (!isProducerSpan && !isConsumerSpan) { + return Reflect.apply(target, thisArg, argumentsList); + } + + if (isProducerSpan) { + return instrumentRpcProducer(target, thisArg, argumentsList); + } else if (isConsumerSpan) { + return instrumentRpcConsumer(target, thisArg, argumentsList); + } + + // If the operation is not a queue operation, return the original function + return Reflect.apply(target, thisArg, argumentsList); }, }, ); @@ -251,50 +269,58 @@ function instrumentRpcReturnedFromSchemaCall(SupabaseClient: unknown): void { markAsInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema); } -const instrumentRpcImpl = (target: any, thisArg: any, argumentsList: any[]): Promise => { - const isProducerSpan = argumentsList[0] === 'send' || argumentsList[0] === 'send_batch'; - const isConsumerSpan = argumentsList[0] === 'pop'; - const isReceiverSpan = argumentsList[0] === 'read'; - - if (!isProducerSpan && !isConsumerSpan && !isReceiverSpan) { - return Reflect.apply(target, thisArg, argumentsList); +function extractTraceAndBaggageFromMessage(message: { _sentry?: { sentry_trace?: string; baggage?: string } }): { + sentryTrace?: string; + baggage?: string; +} { + if (message?._sentry) { + return { + sentryTrace: message._sentry.sentry_trace, + baggage: message._sentry.baggage, + }; } + return {}; +} - const maybeQueueParams = argumentsList[1]; +const instrumentRpcConsumer = (target: any, thisArg: any, argumentsList: any[]): Promise => { + const [operationName, queueParams] = argumentsList as [ + 'pop', + { + queue_name?: string; + }, + ]; - // If the second argument is not an object, it's not a queue operation - if (!isPlainObject(maybeQueueParams)) { - return Reflect.apply(target, thisArg, argumentsList); + const isConsumerSpan = operationName === 'pop'; + const queueName = queueParams?.queue_name; + + if (!isConsumerSpan) { + return Reflect.apply(target, thisArg, argumentsList); // Not a consumer operation } - const queueName = maybeQueueParams?.queue_name as string; + return (Reflect.apply(target, thisArg, argumentsList) as Promise).then((res: SupabaseResponse) => { + const latency = res.data?.[0]?.enqueued_at ? Date.now() - Date.parse(res.data?.[0]?.enqueued_at) : undefined; - const op = isProducerSpan - ? 'queue.publish' - : isConsumerSpan - ? 'queue.process' - : isReceiverSpan - ? 'queue.receive' - : ''; + const { sentryTrace, baggage } = extractTraceAndBaggageFromMessage(res.data?.[0]?.message || {}); - // If the operation is not a queue operation, return the original function - if (!op) { - return Reflect.apply(target, thisArg, argumentsList); - } + // Remove Sentry metadata from the returned message + delete res.data?.[0]?.message?._sentry; - return startSpan( - { - name: 'supabase.db.rpc', - attributes: { - [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.supabase', - [SEMANTIC_ATTRIBUTE_SENTRY_OP]: op, - 'messaging.system': 'supabase', + return continueTrace( + { + sentryTrace, + baggage, }, - }, - async span => { - try { - return (Reflect.apply(target, thisArg, argumentsList) as Promise).then( - (res: SupabaseResponse) => { + () => { + return startSpan( + { + name: 'supabase.db.rpc', + attributes: { + [SEMANTIC_ATTRIBUTE_SENTRY_OP]: 'queue.process', + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.supabase', + 'messaging.system': 'supabase', + }, + }, + span => { const messageId = res?.data?.map(item => (typeof item === 'number' ? item : item.msg_id)).join(',') || undefined; @@ -306,6 +332,10 @@ const instrumentRpcImpl = (target: any, thisArg: any, argumentsList: any[]): Pro span.setAttribute('messaging.destination.name', queueName); } + if (latency) { + span.setAttribute('messaging.message.receive.latency', latency); + } + const breadcrumb: SupabaseBreadcrumb = { type: 'supabase', category: `db.rpc.${argumentsList[0]}`, @@ -349,6 +379,8 @@ const instrumentRpcImpl = (target: any, thisArg: any, argumentsList: any[]): Pro }); span.setStatus({ code: SPAN_STATUS_ERROR }); + } else { + span.setStatus({ code: SPAN_STATUS_OK }); } span.end(); @@ -356,25 +388,146 @@ const instrumentRpcImpl = (target: any, thisArg: any, argumentsList: any[]): Pro return res; }, ); - } catch (err: unknown) { - span.setStatus({ code: SPAN_STATUS_ERROR }); - span.end(); - captureException(err, { - mechanism: { - handled: false, - }, + }, + ); + }); +}; + +function instrumentRpcProducer(target: any, thisArg: any, argumentsList: any[]): Promise { + const maybeQueueParams = argumentsList[1]; + + // If the second argument is not an object, it's not a queue operation + if (!isPlainObject(maybeQueueParams)) { + return Reflect.apply(target, thisArg, argumentsList); + } + + const queueName = maybeQueueParams?.queue_name as string; + + // If the queue name is not provided, return the original function + if (!queueName) { + return Reflect.apply(target, thisArg, argumentsList); + } + + return startSpan( + { + name: 'supabase.db.rpc', + attributes: { + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.supabase', + [SEMANTIC_ATTRIBUTE_SENTRY_OP]: 'queue.publish', + 'messaging.system': 'supabase', + }, + }, + span => { + const { 'sentry-trace': sentryTrace, baggage: sentryBaggage } = getTraceData(); + const [, sentryArgumentsQueueParams] = argumentsList as [ + 'send' | 'send_batch', + { + queue_name: string; + messages?: Array<{ _sentry?: { sentry_trace?: string; baggage?: string } }>; + message?: { _sentry?: { sentry_trace?: string; baggage?: string } }; + }, + ]; + + if (sentryArgumentsQueueParams?.message) { + sentryArgumentsQueueParams.message._sentry = { + sentry_trace: sentryTrace, + baggage: sentryBaggage, + }; + } else if (sentryArgumentsQueueParams?.messages) { + sentryArgumentsQueueParams.messages = sentryArgumentsQueueParams.messages.map(message => { + message._sentry = { + sentry_trace: sentryTrace, + baggage: sentryBaggage, + }; + return message; }); } + + argumentsList[1] = sentryArgumentsQueueParams; + + return (Reflect.apply(target, thisArg, argumentsList) as Promise) + .then((res: SupabaseResponse) => { + const messageId = + res?.data?.map(item => (typeof item === 'number' ? item : item.msg_id)).join(',') || undefined; + + if (messageId) { + span.setAttribute('messaging.message.id', messageId || ''); + } + + if (queueName) { + span.setAttribute('messaging.destination.name', queueName || ''); + } + + const breadcrumb: SupabaseBreadcrumb = { + type: 'supabase', + category: `db.rpc.${argumentsList[0]}`, + message: `rpc(${argumentsList[0]})`, + }; + const data: Record = {}; + if (messageId) { + data['messaging.message.id'] = messageId; + } + if (queueName) { + data['messaging.destination.name'] = queueName; + } + if (Object.keys(data).length) { + breadcrumb.data = data; + } + addBreadcrumb(breadcrumb); + if (res.error) { + const err = new Error(res.error.message) as SupabaseError; + if (res.error.code) { + err.code = res.error.code; + } + if (res.error.details) { + err.details = res.error.details; + } + captureException(err, { + contexts: { + supabase: { + queueName, + messageId, + }, + }, + }); + span.setStatus({ code: SPAN_STATUS_ERROR }); + } else { + span.setStatus({ code: SPAN_STATUS_OK }); + } + span.end(); + + return res; + }) + .catch((err: unknown) => { + span.setStatus({ code: SPAN_STATUS_ERROR }); + span.end(); + captureException(err, { + mechanism: { + handled: false, + }, + }); + throw err; + }); }, ); -}; +} function instrumentRpc(SupabaseClient: unknown): void { (SupabaseClient as unknown as SupabaseClientInstance).rpc = new Proxy( (SupabaseClient as unknown as SupabaseClientInstance).rpc, { apply(target, thisArg, argumentsList) { - return instrumentRpcImpl(target, thisArg, argumentsList); + let result: Promise; + + if (argumentsList[0] === 'send' || argumentsList[0] === 'send_batch') { + result = instrumentRpcProducer(target, thisArg, argumentsList); + } else if (argumentsList[0] === 'pop') { + result = instrumentRpcConsumer(target, thisArg, argumentsList); + } else { + result = Reflect.apply(target, thisArg, argumentsList) as Promise; + } + + return result; }, }, ); From 54be67261cfc0ab4a69ec58c8e067e1ae8bdcede Mon Sep 17 00:00:00 2001 From: Onur Temizkan Date: Fri, 20 Jun 2025 21:25:43 +0100 Subject: [PATCH 09/11] Add missing import --- packages/core/src/integrations/supabase.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/core/src/integrations/supabase.ts b/packages/core/src/integrations/supabase.ts index b04e860e32ab..e0c61b1a9270 100644 --- a/packages/core/src/integrations/supabase.ts +++ b/packages/core/src/integrations/supabase.ts @@ -11,6 +11,7 @@ import { continueTrace, setHttpStatus, SPAN_STATUS_ERROR, SPAN_STATUS_OK, startS import type { IntegrationFn } from '../types-hoist/integration'; import { isPlainObject } from '../utils/is'; import { logger } from '../utils/logger'; +import { getTraceData } from '../utils/traceData'; export interface SupabaseClientConstructor { prototype: { From 5e6db7374b05bdc69b59fd3719b2a841d8b31ca3 Mon Sep 17 00:00:00 2001 From: Onur Temizkan Date: Fri, 27 Jun 2025 14:08:20 +0100 Subject: [PATCH 10/11] Rename `SupabaseClientConstructor` to `SupabaseClientConstructorType` --- packages/core/src/integrations/supabase.ts | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/packages/core/src/integrations/supabase.ts b/packages/core/src/integrations/supabase.ts index e0c61b1a9270..4ae2245fb9eb 100644 --- a/packages/core/src/integrations/supabase.ts +++ b/packages/core/src/integrations/supabase.ts @@ -13,7 +13,7 @@ import { isPlainObject } from '../utils/is'; import { logger } from '../utils/logger'; import { getTraceData } from '../utils/traceData'; -export interface SupabaseClientConstructor { +export interface SupabaseClientConstructorType { prototype: { from: (table: string) => PostgRESTQueryBuilder; schema: (schema: string) => { rpc: (...args: unknown[]) => Promise }; @@ -229,17 +229,17 @@ export function translateFiltersIntoMethods(key: string, query: string): string } function instrumentRpcReturnedFromSchemaCall(SupabaseClient: unknown): void { - if (isInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema)) { + if (isInstrumented((SupabaseClient as unknown as SupabaseClientConstructorType).prototype.schema)) { return; } - (SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema = new Proxy( - (SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema, + (SupabaseClient as unknown as SupabaseClientConstructorType).prototype.schema = new Proxy( + (SupabaseClient as unknown as SupabaseClientConstructorType).prototype.schema, { apply(target, thisArg, argumentsList) { const supabaseInstance = Reflect.apply(target, thisArg, argumentsList); - (supabaseInstance as unknown as SupabaseClientConstructor).rpc = new Proxy( + (supabaseInstance as unknown as SupabaseClientConstructorType).rpc = new Proxy( (supabaseInstance as unknown as SupabaseClientInstance).rpc, { apply(target, thisArg, argumentsList) { @@ -267,7 +267,7 @@ function instrumentRpcReturnedFromSchemaCall(SupabaseClient: unknown): void { }, ); - markAsInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema); + markAsInstrumented((SupabaseClient as unknown as SupabaseClientConstructorType).prototype.schema); } function extractTraceAndBaggageFromMessage(message: { _sentry?: { sentry_trace?: string; baggage?: string } }): { @@ -619,12 +619,12 @@ function instrumentSupabaseAuthClient(supabaseClientInstance: SupabaseClientInst } function instrumentSupabaseClientConstructor(SupabaseClient: unknown): void { - if (isInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.from)) { + if (isInstrumented((SupabaseClient as unknown as SupabaseClientConstructorType).prototype.from)) { return; } - (SupabaseClient as unknown as SupabaseClientConstructor).prototype.from = new Proxy( - (SupabaseClient as unknown as SupabaseClientConstructor).prototype.from, + (SupabaseClient as unknown as SupabaseClientConstructorType).prototype.from = new Proxy( + (SupabaseClient as unknown as SupabaseClientConstructorType).prototype.from, { apply(target, thisArg, argumentsList) { const rv = Reflect.apply(target, thisArg, argumentsList); @@ -637,7 +637,7 @@ function instrumentSupabaseClientConstructor(SupabaseClient: unknown): void { }, ); - markAsInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.from); + markAsInstrumented((SupabaseClient as unknown as SupabaseClientConstructorType).prototype.from); } function instrumentPostgRESTFilterBuilder(PostgRESTFilterBuilder: PostgRESTFilterBuilder['constructor']): void { From 1c518a2728c23c3b8a71c323e4c2690f45a7fc22 Mon Sep 17 00:00:00 2001 From: Onur Temizkan Date: Fri, 27 Jun 2025 14:30:09 +0100 Subject: [PATCH 11/11] Extract `instrumentRpcMethod` --- packages/core/src/integrations/supabase.ts | 45 +++++++++------------- 1 file changed, 19 insertions(+), 26 deletions(-) diff --git a/packages/core/src/integrations/supabase.ts b/packages/core/src/integrations/supabase.ts index 4ae2245fb9eb..a2ed866e8430 100644 --- a/packages/core/src/integrations/supabase.ts +++ b/packages/core/src/integrations/supabase.ts @@ -232,44 +232,37 @@ function instrumentRpcReturnedFromSchemaCall(SupabaseClient: unknown): void { if (isInstrumented((SupabaseClient as unknown as SupabaseClientConstructorType).prototype.schema)) { return; } - (SupabaseClient as unknown as SupabaseClientConstructorType).prototype.schema = new Proxy( (SupabaseClient as unknown as SupabaseClientConstructorType).prototype.schema, { apply(target, thisArg, argumentsList) { const supabaseInstance = Reflect.apply(target, thisArg, argumentsList); - - (supabaseInstance as unknown as SupabaseClientConstructorType).rpc = new Proxy( - (supabaseInstance as unknown as SupabaseClientInstance).rpc, - { - apply(target, thisArg, argumentsList) { - const isProducerSpan = argumentsList[0] === 'send' || argumentsList[0] === 'send_batch'; - const isConsumerSpan = argumentsList[0] === 'pop'; - - if (!isProducerSpan && !isConsumerSpan) { - return Reflect.apply(target, thisArg, argumentsList); - } - - if (isProducerSpan) { - return instrumentRpcProducer(target, thisArg, argumentsList); - } else if (isConsumerSpan) { - return instrumentRpcConsumer(target, thisArg, argumentsList); - } - - // If the operation is not a queue operation, return the original function - return Reflect.apply(target, thisArg, argumentsList); - }, - }, - ); - + instrumentRpcMethod(supabaseInstance as unknown as SupabaseClientConstructorType); return supabaseInstance; }, }, ); - markAsInstrumented((SupabaseClient as unknown as SupabaseClientConstructorType).prototype.schema); } +function instrumentRpcMethod(supabaseInstance: SupabaseClientConstructorType): void { + supabaseInstance.rpc = new Proxy((supabaseInstance as unknown as SupabaseClientInstance).rpc, { + apply(target, thisArg, argumentsList) { + const isProducerSpan = argumentsList[0] === 'send' || argumentsList[0] === 'send_batch'; + const isConsumerSpan = argumentsList[0] === 'pop'; + if (!isProducerSpan && !isConsumerSpan) { + return Reflect.apply(target, thisArg, argumentsList); + } + if (isProducerSpan) { + return instrumentRpcProducer(target, thisArg, argumentsList); + } else if (isConsumerSpan) { + return instrumentRpcConsumer(target, thisArg, argumentsList); + } + return Reflect.apply(target, thisArg, argumentsList); + }, + }); +} + function extractTraceAndBaggageFromMessage(message: { _sentry?: { sentry_trace?: string; baggage?: string } }): { sentryTrace?: string; baggage?: string;