Skip to content

Commit 9f219a4

Browse files
authored
Add context object to handlers for platform resource access (#185)
* feat: add context utilities, memoization, and supabase client factories - Introduced createQueueWorkerContext and createFlowWorkerContext for worker contexts - Added core/memoize.ts for simple function memoization - Implemented supabase-factories.ts for lazy-loaded supabase clients - Enhanced supabase-utils.ts with memoized singleton clients and reset function - Updated handler-types.ts for backward-compatible step handler signatures - Expanded platform adapters with environment variable access methods - Added comprehensive unit tests for supabase-utils functions - Exported context types and utility functions for better modularity and testing * refactor: update EdgeWorker start method to return void and encapsulate connection string access - Changed EdgeWorker.start() to return Promise<void> instead of Promise<PlatformAdapter> - Removed returning platform from start methods to improve encapsulation - Updated internal calls to startQueueWorker and startFlowWorker accordingly - Modified platform DenoAdapter to use getter for connectionString instead of method - Updated platform types to reflect connectionString as a getter - Overall, improved API encapsulation and consistency in platform configuration access * chore: update dependencies, enhance context handling, and extend environment support - Bumped @supabase/supabase-js to v2.50.3 and added related dependencies - Updated deno.lock and test JSON to include latest @supabase dependencies - Added env property to createFlowWorker configuration for environment variables - Extended EdgeWorker, StepTaskExecutor, MessageExecutor, and createQueueWorker to pass env and sql - Modified MessageHandlerFn to support both legacy and new handler signatures with context - Improved queue and flow worker creation functions to accept optional env and sql parameters - Enhanced queue message processing to handle context-aware handlers and environment variables * test: add comprehensive tests for message and step task executor contexts - Introduced integration tests for message executor context handling, including payload and raw message verification - Added tests for backward compatibility with single-argument handlers in message and step executors - Included tests for context properties such as environment variables, SQL, abort signals, and Supabase clients - Created unit tests for context utility functions to ensure correct creation and memoization of clients - Cover scenarios with environment variables missing and ensure rawMessage is handled appropriately in different contexts * refactor: enhance context creation and handler invocation across worker modules - Updated StepDefinition handler to accept context parameter - Modified Flow class to pass context to step handlers - Changed MessageExecutor to always create and pass context to handlers - Added platform-resources interface for platform-specific dependencies - Updated queue message handler type to always include context parameter - Removed redundant context creation functions for flow workers - Improved code consistency and future-proofing for context handling in workers * feat: Add utility type AllStepInputs and enhance context types for flow workers - Introduced AllStepInputs to extract all step input types from a flow - Updated context interfaces to include rawMessage as PgmqMessageRecord<AllStepInputs<TFlow>> - Refined StepTaskWithMessage to pair messages with step tasks - Modified poller to return message-task pairs, avoiding race conditions - Added createSql factory for PostgreSQL client with dedicated connection pool - Enhanced createContext to lazy-load Supabase clients with environment validation - Updated handler types to support optional context parameter for backward compatibility - Added Supabase platform adapter and related types for better platform abstraction - Included new sql-factory.ts for SQL client creation and Supabase platform integration - Overall improvements facilitate more robust flow handling and platform extensibility * refactor: replace platform's abort signals with centralized shutdown signal and update context handling - Replaced individual AbortController signals with platform's shutdownSignal for graceful shutdown - Updated createFlowWorker to create context at factory level using platformAdapter.createStepTaskContext - Modified createQueueWorker and MessageExecutor to use platform's context creation - Removed unnecessary platform adapter imports and references in EdgeWorker and related files - Ensured consistent use of context objects for step tasks and message processing across modules - Minor type adjustments for clarity and alignment with new context handling approach * refactor(platform): replace DenoAdapter with SupabasePlatformAdapter and update worker setup - Replaced DenoAdapter with SupabasePlatformAdapter for enhanced platform-specific functionality - Added methods to spawn new edge functions and extend their lifetime using EdgeRuntime - Updated startWorker to manage resources and handle shutdown signals properly - Modified context creation to include SQL and Supabase clients for message handling - Export index now consolidates platform adapters, removing legacy DenoAdapter references - Overall improvements enable better resource management and platform integration in edge environment * refactor: improve type safety and code clarity in executor classes - Introduced generic TContext with default types for StepTaskExecutor and MessageExecutor - Added convenience getters for context properties to avoid deep property access - Updated references to context properties to use getters for better readability - Minor adjustments to logging messages to use consistent identifiers - Enhanced type annotations for context parameters to improve type safety and maintainability * refactor: update import types and add context creation utilities for queue and flow workers - Removed unused AllStepInputs import from PgflowSqlClient.ts - Added new context utility functions for queue workers, flow workers, and step tasks - Implemented simplified context creation functions with environment, SQL, and Supabase client setup - Improved modularity and testability of worker context initialization * refactor: update Supabase client factory imports and memoization, improve context types and tests - Replaced deprecated createAnonSupabaseClient and createServiceSupabaseClient with get* variants - Removed unused supabase-factories.ts file - Updated platform adapter to use new client getters with memoization - Enhanced context interfaces with more precise typing and backward compatibility - Fixed test assertions to match new context properties and message handling - Cleaned up test files, including removing obsolete supabase factory tests - Overall improvements streamline client creation, context handling, and testing consistency * refactor: enhance type safety and context handling in queue worker modules - Updated QueueWorkerContextParams to default TPayload to Json with explicit typing - Modified MessageExecutor class to accept more precise context type with TResources - Improved createQueueWorker to build context with platform resources and correct typing - Changed import of MessageHandlerContext to use the unified type across modules - Refined MessageHandlerFn to support generic context parameter for flexibility * refactor: update supabase client creation functions and optimize resource initialization - Renamed getAnonSupabaseClient and getServiceSupabaseClient to createAnonSupabaseClient and createServiceSupabaseClient for clarity - Changed functions to throw errors if required environment variables are missing, removing memoization - Updated platform adapter to initialize and expose platform resources once, improving efficiency - Modified tests to reflect new behavior, ensuring proper error handling and instance creation - Overall improvements enhance code clarity, error handling, and resource management in Supabase integrations * chore: improve environment validation and refactor environment variable handling - Add env-validation.ts for centralized validation of required environment variables - Update platform classes to use validated environment data, ensuring consistency - Enhance createSql to accept environment object directly, removing dependency on process env - Improve type safety and robustness of Supabase client creation functions - Update tests to reflect validation logic and ensure environment variables are correctly handled - Overall, strengthen environment management and reduce runtime errors related to missing configs * feat: add environment validation and update SupabasePlatformAdapter - Introduced runtime environment validation for required Supabase variables - Added assertSupabaseEnv function to ensure all necessary env vars are present - Updated SupabasePlatformAdapter to validate env variables at startup - Removed deprecated env-validation.ts file - Enhanced type safety for environment variables in platform adapter * refactor: improve context utility functions and test setup for message handlers - Updated context creation functions to handle missing environment variables gracefully - Added mock Supabase client creation for testing scenarios - Enhanced test helpers to generate full contexts for message and step task handlers - Changed test imports to use the new test context utilities - Improved type definitions for context objects to ensure better type safety and flexibility * refactor: update Flow type definitions for improved flexibility and context extraction - Changed EmptyFlow to use an empty object for deps in dsl.ts - Added more permissive type parameters to Flow class for input, context, steps, deps - Enhanced type extraction utilities for flow input, output, steps, deps, and context - Updated context type to be more minimal and backward compatible in context.ts - Improved type inference for handler context and step dependencies - Overall, these changes make flow typing more flexible and better suited for complex workflows * test: add comprehensive type inference tests for Flow context extraction and handling - Introduced new test files with detailed scenarios for context type inference - Verified default minimal context and explicit type annotations - Ensured accumulation of context properties across multiple steps - Tested explicit, inferred, and mixed context configurations - Included type assertions to validate correct inference and extraction - Covered custom context-only handlers and preservation of step type inference - Added type-level assertions for correctness of context extraction logic * refactor: update context inference and flow type handling for platform resources - Changed ExtractFlowContext to return base Context instead of empty object - Updated tests to reflect that context always includes base properties - Modified flow class to merge context with platform resources and explicit types - Added flow compatibility type guard to ensure context requirements are met - Enhanced type definitions for better resource and context management - Updated flow and worker start functions to accept compatible flows with platform resources - Introduced current platform resource types and compatibility checks - Adjusted default flow type to include base Context for consistency - Added new flowCompatibility and index type exports for improved type safety - Updated EdgeWorker to support flow compatibility with current platform resources - Added example and test files demonstrating type safety and resource constraints - Refactored type files to centralize resource and context logic for better maintainability * feat: add Supabase platform support with full context typing and resource integration - Introduced src/platforms/supabase.ts for Supabase-specific resources and context - Updated dsl.ts to export a specialized Flow class with Supabase resources - Modified context.ts to include Supabase resources and extend base context - Re-exported platform types in platforms/index.ts for platform implementations - Updated package.json to include @supabase/supabase-js dependency - Added comprehensive tests for context inference and flow resource access - Enhanced existing tests to verify full autocomplete and resource availability - Improved type safety for handlers accessing platform-specific resources - Refactored code to ensure consistent context merging and resource typing across flows - Marked as a feature addition to support seamless Supabase integration in flows * feat(dsl): add environment validation tests and enhance type safety for UserEnv and platform contexts - Introduced new test file to verify environment type validation system - Ensured Supabase flows get typed environment variables correctly - Verified UserEnv augmentation works as intended - Updated platform and context types to enforce environment constraints - Improved type safety and validation for environment variables across platform implementations * refactor: replace type assertions with void expressions in test files and add empty interface for user environment - Updated test files to use void expressions instead of type assertions for better type safety - Added empty interface UserEnv to allow user augmentation without errors - Ensured consistent typing practices across environment validation and platform resource tests - Minor code style adjustments for clarity and maintainability * refactor: update type definitions to use Record<string, unknown> for better type safety - Changed Context<T> to default to Record<string, never> for consistency - Updated Extra type parameter in Flow class to default to Record<string, never> - Modified Custom type parameter in flowCompatibility to default to Record<string, never> - Ensured all type aliases now use Record<string, unknown> instead of object or empty object types for improved type safety and clarity * fix: update type assertions and pubsub mock data in context inference tests Refactors test assertions for more precise type matching and corrects mock pubsub data to improve test accuracy. * refactor: improve type safety and update import statements across test and core files - Changed import syntax to use 'type' for better type-only imports - Updated context and resource mocks to use 'unknown' instead of 'any' for safety - Removed redundant imports and cleaned up import statements - Adjusted test helpers to handle type assertions more accurately - Minor formatting and consistency improvements throughout test and core modules * refactor: update test files to comment out unused imports and improve type safety - Commented out unused imports in supabase-test-utils.ts to clean up code - Modified test files to replace 'any' types with 'unknown' for better type safety - Commented out deprecated or unused handlers and variables to improve test clarity - Adjusted handler signatures to use more precise context types - Minor formatting and consistency improvements across test files * feat: add Supabase resource re-export and update test context typings - Introduced a new supabase.ts file to re-export platform-specific resources - Updated various test utility functions to cast environment properties as never - Modified lock files to update dependency versions and remove redundant entries - Added new mock message and step task structures for improved test coverage - Enhanced createSupabaseMessageContext and createSupabaseStepTaskContext functions for better type safety * refactor: improve environment type handling and fix env property typings across modules - Updated Env and UserEnv interfaces for better extensibility and type safety - Removed redundant Env type and ValidEnv utility from dsl.ts - Corrected env property typings in test context creation functions for consistency - Fixed env assignment in createQueueWorker to ensure proper typing - Adjusted rawMessage typings in test helpers for better type inference - Overall improvements enhance type safety and consistency in environment-related code * chore: update std/async dependency version and add its integrity hash - Added new dependency entry for @std/async@0.224.2 with integrity hash - Updated deno.lock to include the new version and its checksum - Minor version bump for std/async to ensure consistency in dependencies * chore: add createAdapter to tests and update context types for Supabase integration - Updated test helpers to include createAdapter for platform-specific setup - Modified context types to improve type safety and backward compatibility - Ensured all test files instantiate adapter for consistent environment setup - Refactored context creation functions for message and step task handlers - Minor adjustments to flow example to handle input types correctly * refactor: update context type imports and remove legacy alias for better type safety - Removed deprecated legacy context alias in context.ts for clarity - Updated test files to import Context type from @pgflow/dsl instead of local - Changed context variable types in tests to use specific context interfaces - Removed unused import of Sql type in test-context-utils.ts - Improved consistency and maintainability of context handling across tests * refactor: update type annotations and remove unnecessary underscores in test code - Changed context parameter types from Context to SupabaseStepTaskContext in multiple tests - Removed leading underscores from input parameters in several step functions for consistency - Simplified function signatures by removing optional chaining where context is guaranteed - Improved type clarity and code readability in integration test files * refactor(dsl): improve type handling and unwrapping of Promise return types - Added AwaitedReturn utility to unwrap Promise types while keeping plain values - Updated Flow class to use AwaitedReturn for handler return types - Simplified type definitions for step handlers and dependencies - Minor adjustments to type assertions for better type inference and safety * docs: add development guidelines and update instructions in CLAUDE.md - Added a new section with development guidelines - Included instruction to run 'nx' as 'pnpm nx' in the documentation * test: add comprehensive type tests for Supabase Flow integration and context composition - Introduced detailed type assertions for default and custom resources in Supabase flows - Verified environment variable typings and resource structure correctness - Ensured explicit and mixed context types are properly inferred and combined - Added tests for step input inference, context accumulation, and context-specific resource isolation - Confirmed the structure of SupabaseResources and SupabasePlatformContext interfaces - Included flow behavior tests to validate type safety across multiple steps and dependencies - Overall enhances type safety and correctness assurance for Supabase flow implementations * refactor(tests): replace test context creation with proper queue and flow worker context functions - Updated message executor context test to use createQueueWorkerContext - Changed step task executor test to utilize createFlowWorkerContext - Removed unused imports related to test context utils - Improved consistency and clarity in test setup for worker contexts * test: add comprehensive integration tests for message handling, context propagation, and environment variable access in flow and worker contexts * chore: add global EdgeRuntime declaration, enhance type safety, and update test utils - Declare global EdgeRuntime for Supabase Edge Functions compatibility - Extend SupabaseEnv interface with environment variables - Update createAdapter to return PlatformAdapter with correct resources - Refactor startWorker to use test-specific platform adapter - Modify test helpers and mocks for consistent environment setup - Add new integration test for verifying context resources in EdgeWorker * fix: update type annotations and environment typings in test utils and tests - Corrected environment type annotations to use SupabaseEnv across multiple files - Updated context type in message executor tests for better type inference - Ensured consistent usage of environment types in test setup and context creation - Minor adjustments to mock message definitions for clarity and correctness * refactor: replace direct createAdapter imports with test-specific createTestPlatformAdapter - Updated multiple test files to import createTestPlatformAdapter from helpers - Ensured consistent use of test adapter across integration tests for platform interactions - Simplifies test setup and improves test isolation by using dedicated test adapter * feat(Documentation): Add comprehensive guide on context object and its usage in pgflow - Introduces new documentation page explaining the context object and its core principles - Details available resources, including core and Supabase-specific ones - Provides usage examples for database operations, environment access, Supabase integration, and graceful shutdown - Highlights type safety benefits with code examples for flow and queue handlers - Outlines migration steps from manual connection management to context-based approach - Discusses future extensibility with custom resources and plugin system - Enhances overall developer guidance on leveraging context for robust handler development * docs: update context documentation with resource details and usage examples Expand the documentation to include core and platform-specific resources such as env, shutdownSignal, rawMessage, stepTask, and Supabase clients. Clarify their types, availability, and usage with code examples. Also, improve flow handler examples demonstrating context parameter usage and resource access, emphasizing backward compatibility and platform adapter centralization. * docs: add note about future extensibility of context system and remove example usage from documentation - Added a note indicating that the context system will support custom resources in future versions - Removed extensive usage examples from the documentation to streamline the content - Updated the documentation to focus on current features and future plans for context extensibility * refactor: improve database save function and update flow to inject dependencies - Removed redundant singleton pattern for Supabase client in saveWebsite - Added explicit supabase parameter to saveWebsite for better dependency injection - Updated analyze_website flow to pass serviceSupabase to saveWebsite - Enhanced code clarity and testability by decoupling database client instantiation * docs: update README files with detailed context resource descriptions and usage examples - Expanded documentation on context object resources for both pgflow DSL and edge-worker - Added code snippets illustrating platform resource access and flow configuration - Clarified environment variables and platform-specific resource availability - Improved clarity on optional context parameter and resource types in handlers - Ensured comprehensive guidance for users integrating platform resources into flows and workers * feat: add context parameter to handlers for platform resources and update documentation - Introduce optional context parameter for queue and flow handlers to access platform resources - Update usage examples demonstrating new context parameter with platform services - Maintain backward compatibility for existing handlers - Add details about core and Supabase-specific resources available in context * refactor: simplify type imports and improve type safety across modules - Removed redundant or unused type imports to streamline code - Updated waitUntil declaration to use unknown for better type safety - Changed context variables to unknown for consistency and clarity - Minor formatting adjustments for consistency and readability - No functional changes, focusing on code cleanup and type correctness * fix: add environment variable validation in test helpers and correct supabase client initialization - Validate presence of SUPABASE_URL, SUPABASE_ANON_KEY, and SUPABASE_SERVICE_ROLE_KEY in test helper functions - Throw errors if required environment variables are missing - Corrected the creation of supabase clients to use validated variables - Improved robustness of test setup code to prevent runtime errors due to missing env vars * chore: update import paths to TypeScript extensions and improve test context typings Refactors import statements to use .ts extensions for consistency and correctness. Enhances test code by typing received context as SupabasePlatformContext. Adds missing type annotations for context variables in integration tests. Ensures compatibility with TypeScript module resolution and improves code clarity. * refactor: update import types and remove unused imports in test files - Removed unused imports from platform/types.ts in _helpers.ts - Simplified import statements in stepTaskExecutorContext.test.ts by removing unused type imports - Minor cleanup to improve code clarity and maintainability in test setup files * chore: update dependencies and fix import paths in createQueueWorker.ts - Added @supabase/supabase-js to package dependencies and lock files - Updated package.json to include the new dependency - Corrected import statements in createQueueWorker.ts to use .js extensions - Fixed retry strategy configuration comment for clarity
1 parent 1f74f6d commit 9f219a4

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+3766
-256
lines changed

.changeset/clean-zoos-smoke.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
'@pgflow/edge-worker': patch
3+
---
4+
5+
Remove PlatformAdapter from EdgeWorker.start() return type
6+
7+
The `EdgeWorker.start()` method now returns `Promise<void>` instead of `Promise<PlatformAdapter>`. The PlatformAdapter was never intended to be part of the public API and was accidentally exposed. This change properly encapsulates the internal implementation details.
8+
9+
**Breaking Change**: If you were using the returned PlatformAdapter (which was undocumented), you'll need to refactor your code. However, this is unlikely as the EdgeWorker is designed to be started and run without further interaction.

.changeset/little-seas-lick.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
---
2+
'@pgflow/edge-worker': patch
3+
'@pgflow/dsl': patch
4+
---
5+
6+
Add context object as second parameter to handlers
7+
8+
Queue and flow handlers now receive an optional context parameter that provides platform resources like database connections, environment variables, and Supabase clients - eliminating boilerplate and connection management.
9+
10+
```typescript
11+
// Queue handler
12+
EdgeWorker.start(async (payload, context) => {
13+
await context.sql`INSERT INTO tasks (data) VALUES (${payload})`;
14+
});
15+
16+
// Flow step handler
17+
.step({ slug: 'process' }, async (input, context) => {
18+
const result = await context.serviceSupabase.from('users').select();
19+
})
20+
```
21+
22+
**Core resources** (always available):
23+
24+
- `context.env` - Environment variables
25+
- `context.shutdownSignal` - AbortSignal for graceful shutdown
26+
- `context.rawMessage` - Original pgmq message with metadata
27+
- `context.stepTask` - Current step task details (flow handlers only)
28+
29+
**Supabase platform resources**:
30+
31+
- `context.sql` - PostgreSQL client (postgres.js)
32+
- `context.anonSupabase` - Supabase client with anonymous key
33+
- `context.serviceSupabase` - Supabase client with service role key
34+
35+
To use Supabase resources in flows, import from the Supabase preset:
36+
37+
```typescript
38+
import { Flow } from '@pgflow/dsl/supabase';
39+
```
40+
41+
The context parameter is optional for backward compatibility - existing single-parameter handlers continue to work unchanged.

CLAUDE.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,6 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
1616
@.claude/testing_guidelines.md
1717
@.claude/mcp_crawler.md
1818

19+
## Development Guidelines
20+
21+
- Always run 'nx' as 'pnpm nx'

examples/playground/supabase/functions/_flows/analyze_website.ts

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,17 @@ export default new Flow<Input>({
3030
const { keywords } = await extractTags(input.website.content);
3131
return keywords;
3232
})
33-
.step({ slug: 'saveToDb', dependsOn: ['summary', 'tags'] }, async (input) => {
34-
const websiteData = {
35-
user_id: input.run.user_id,
36-
website_url: input.run.url,
37-
summary: input.summary,
38-
tags: input.tags,
39-
};
40-
const { website } = await saveWebsite(websiteData);
33+
.step(
34+
{ slug: 'saveToDb', dependsOn: ['summary', 'tags'] },
35+
async (input, { serviceSupabase }) => {
36+
const websiteData = {
37+
user_id: input.run.user_id,
38+
website_url: input.run.url,
39+
summary: input.summary,
40+
tags: input.tags,
41+
};
42+
const { website } = await saveWebsite(websiteData, serviceSupabase);
4143

42-
return website;
43-
});
44+
return website;
45+
},
46+
);

examples/playground/supabase/functions/_tasks/saveWebsite.ts

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,18 @@
1-
import { createClient, type SupabaseClient } from '@supabase/supabase-js';
1+
import type { SupabaseClient } from '@supabase/supabase-js';
22
import type { Database } from '../database-types.d.ts';
33

4-
let _supabase: SupabaseClient<Database> | undefined;
5-
6-
function getSupabase(): SupabaseClient<Database> {
7-
if (!_supabase) {
8-
const SUPABASE_URL = Deno.env.get('SUPABASE_URL')!;
9-
const SUPABASE_SERVICE_ROLE_KEY = Deno.env.get(
10-
'SUPABASE_SERVICE_ROLE_KEY',
11-
)!;
12-
_supabase = createClient<Database, 'public'>(
13-
SUPABASE_URL,
14-
SUPABASE_SERVICE_ROLE_KEY,
15-
);
16-
}
17-
18-
return _supabase;
19-
}
20-
21-
export default async (websiteData: {
4+
interface WebsiteData {
225
user_id: string;
236
website_url: string;
247
summary: string;
258
tags: string[];
26-
}) => {
27-
const { data } = await getSupabase()
9+
}
10+
11+
export default async (
12+
websiteData: WebsiteData,
13+
supabase: SupabaseClient<Database>,
14+
) => {
15+
const { data } = await context.supabase
2816
.schema('public')
2917
.from('websites')
3018
.insert([websiteData])

pkgs/dsl/README.md

Lines changed: 88 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ Key features:
1313

1414
- **Type Safety** - Complete TypeScript type checking from flow inputs to outputs
1515
- **Fluent Interface** - Chainable method calls for defining steps and dependencies
16-
- **Functional Approach** - Clean separation between task implementation and flow orchestration
16+
- **Functional Approach** - Clean separation between task implementation and flow orchestration
1717
- **JSON-Compatible** - All inputs and outputs are JSON-serializable for database storage
1818
- **Immutable Flow Definitions** - Each step operation returns a new Flow instance
1919

@@ -68,21 +68,101 @@ In pgflow, each step receives an `input` object that contains:
6868
2. **`input.{stepName}`** - Outputs from dependency steps
6969

7070
This design ensures:
71+
7172
- Original flow parameters are accessible throughout the entire flow
7273
- Data doesn't need to be manually forwarded through intermediate steps
7374
- Steps can combine original input with processed data from previous steps
7475

76+
### Context Object
77+
78+
Step handlers can optionally receive a second parameter - the **context object** - which provides access to platform resources and runtime information.
79+
80+
```typescript
81+
.step(
82+
{ slug: 'saveToDb' },
83+
async (input, context) => {
84+
// Access platform resources through context
85+
const result = await context.sql`SELECT * FROM users WHERE id = ${input.userId}`;
86+
return result[0];
87+
}
88+
)
89+
```
90+
91+
#### Core Context Resources
92+
93+
All platforms provide these core resources:
94+
95+
- **`context.env`** - Environment variables (`Record<string, string | undefined>`)
96+
- **`context.shutdownSignal`** - AbortSignal for graceful shutdown handling
97+
- **`context.rawMessage`** - Original pgmq message with metadata
98+
```typescript
99+
interface PgmqMessageRecord<T> {
100+
msg_id: number;
101+
read_ct: number;
102+
enqueued_at: Date;
103+
vt: Date;
104+
message: T; // <-- this is your 'input'
105+
}
106+
```
107+
- **`context.stepTask`** - Current step task details (flow handlers only)
108+
```typescript
109+
interface StepTaskRecord<TFlow> {
110+
flow_slug: string;
111+
run_id: string;
112+
step_slug: string;
113+
input: StepInput<TFlow, StepSlug>; // <-- this is handler 'input'
114+
msg_id: number;
115+
}
116+
```
117+
118+
#### Supabase Platform Resources
119+
120+
When using the Supabase platform with EdgeWorker, additional resources are available:
121+
122+
- **`context.sql`** - PostgreSQL client (postgres.js)
123+
- **`context.anonSupabase`** - Supabase client with anonymous key
124+
- **`context.serviceSupabase`** - Supabase client with service role key
125+
126+
To use Supabase resources, import the `Flow` class from the Supabase preset:
127+
128+
```typescript
129+
import { Flow } from '@pgflow/dsl/supabase';
130+
131+
const MyFlow = new Flow<{ userId: string }>({
132+
slug: 'my_flow',
133+
}).step({ slug: 'process' }, async (input, context) => {
134+
// TypeScript knows context includes Supabase resources
135+
const { data } = await context.serviceSupabase
136+
.from('users')
137+
.select('*')
138+
.eq('id', input.userId);
139+
140+
// Use SQL directly
141+
const stats = await context.sql`
142+
SELECT COUNT(*) as total FROM events
143+
WHERE user_id = ${input.userId}
144+
`;
145+
146+
return { user: data[0], eventCount: stats[0].total };
147+
});
148+
```
149+
150+
> [!NOTE]
151+
> Context is optional - handlers that don't need platform resources can omit the second parameter for backward compatibility.
152+
153+
For more details on available resources and platform configuration, see the [@pgflow/edge-worker documentation](https://github.com/pgflow-org/pgflow/tree/main/pkgs/edge-worker#context-resources).
154+
75155
### Flow Configuration
76156

77157
Configure flows and steps with runtime options:
78158

79159
```typescript
80160
new Flow<Input>({
81-
slug: 'my_flow', // Required: Unique flow identifier
82-
maxAttempts: 3, // Optional: Maximum retry attempts (default: 1)
83-
baseDelay: 5, // Optional: Base delay in seconds for retries (default: 1)
84-
timeout: 10, // Optional: Task timeout in seconds (default: 30)
85-
})
161+
slug: 'my_flow', // Required: Unique flow identifier
162+
maxAttempts: 3, // Optional: Maximum retry attempts (default: 1)
163+
baseDelay: 5, // Optional: Base delay in seconds for retries (default: 1)
164+
timeout: 10, // Optional: Task timeout in seconds (default: 30)
165+
});
86166
```
87167

88168
## Compiling Flows
@@ -120,4 +200,5 @@ Run `nx test dsl` to execute the unit tests via [Vitest](https://vitest.dev/).
120200
## Documentation
121201

122202
For detailed documentation on the Flow DSL, visit:
123-
- [Understanding the Flow DSL](https://pgflow.dev/explanations/flow-dsl/)
203+
204+
- [Understanding the Flow DSL](https://pgflow.dev/explanations/flow-dsl/)
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
import { describe, it, expectTypeOf } from 'vitest';
2+
import { Flow } from '../src/platforms/supabase.js';
3+
4+
/**
5+
* This test file verifies the environment type validation system.
6+
* It tests that:
7+
* 1. Supabase flows get typed environment variables
8+
* 2. UserEnv augmentation works correctly
9+
* 3. Invalid UserEnv types cause compilation errors
10+
*/
11+
12+
describe('Environment Type Validation', () => {
13+
it('should provide typed Supabase environment variables', () => {
14+
const flow = new Flow({ slug: 'test_flow' })
15+
.step({ slug: 'process' }, async (input, ctx) => {
16+
// Supabase-specific env vars should be typed
17+
expectTypeOf(ctx.env.EDGE_WORKER_DB_URL).toEqualTypeOf<string>();
18+
expectTypeOf(ctx.env.EDGE_WORKER_LOG_LEVEL).toEqualTypeOf<string | undefined>();
19+
20+
// Random env vars should still be accessible
21+
expectTypeOf(ctx.env.RANDOM_VAR).toEqualTypeOf<string | undefined>();
22+
23+
return { processed: true };
24+
});
25+
26+
void flow;
27+
});
28+
29+
it('should work with custom resources', () => {
30+
interface CustomResources {
31+
logger: { log: (msg: string) => void };
32+
cache: { get: (key: string) => string | null };
33+
}
34+
35+
const flow = new Flow<{ input: string }, CustomResources>({ slug: 'custom_flow' })
36+
.step({ slug: 'process' }, async (input, ctx) => {
37+
// Should have all platform resources
38+
void ctx.sql;
39+
void ctx.anonSupabase;
40+
void ctx.serviceSupabase;
41+
42+
// Plus custom resources
43+
ctx.logger.log('test');
44+
ctx.cache.get('key');
45+
46+
// And typed env vars
47+
expectTypeOf(ctx.env.EDGE_WORKER_DB_URL).toEqualTypeOf<string>();
48+
49+
return { done: true };
50+
});
51+
52+
void flow;
53+
});
54+
55+
it('should validate that Extra extends Record<string, unknown>', () => {
56+
// These should compile fine
57+
type ValidExtra1 = { redis: unknown };
58+
type ValidExtra2 = { logger: unknown; cache: unknown };
59+
type ValidExtra3 = Record<string, unknown>;
60+
61+
type TestInput = { data: string };
62+
63+
// This demonstrates the constraint works
64+
const flow1 = new Flow<TestInput, ValidExtra1>({ slug: 'test1' });
65+
const flow2 = new Flow<TestInput, ValidExtra2>({ slug: 'test2' });
66+
const flow3 = new Flow<TestInput, ValidExtra3>({ slug: 'test3' });
67+
68+
// Just verify they exist
69+
void flow1;
70+
void flow2;
71+
void flow3;
72+
73+
// Note: Invalid types like string, number, etc. would cause compilation errors
74+
// but we can't easily test compilation failures in vitest
75+
});
76+
});
77+
78+
// This demonstrates how users would augment UserEnv
79+
// In a real project, this would be in a separate .d.ts file
80+
declare module '@pgflow/dsl' {
81+
interface UserEnv {
82+
DATABASE_URL: string;
83+
API_KEY: string;
84+
OPTIONAL_FLAG?: string;
85+
}
86+
}
87+
88+
describe('UserEnv Augmentation', () => {
89+
it('should support user-defined environment variables', () => {
90+
const flow = new Flow({ slug: 'user_env_flow' })
91+
.step({ slug: 'process' }, async (input, ctx) => {
92+
// User-defined env vars should be typed
93+
expectTypeOf(ctx.env.DATABASE_URL).toEqualTypeOf<string>();
94+
expectTypeOf(ctx.env.API_KEY).toEqualTypeOf<string>();
95+
expectTypeOf(ctx.env.OPTIONAL_FLAG).toEqualTypeOf<string | undefined>();
96+
97+
// Platform env vars should still be there
98+
expectTypeOf(ctx.env.EDGE_WORKER_DB_URL).toEqualTypeOf<string>();
99+
100+
// Random vars should still work
101+
expectTypeOf(ctx.env.UNKNOWN_VAR).toEqualTypeOf<string | undefined>();
102+
103+
return { processed: true };
104+
});
105+
106+
void flow;
107+
});
108+
});

0 commit comments

Comments
 (0)