Commit c7547f9

feat: semantic type annotations for sql schema (#2)
* fix detection of protobuf field (causing panic)
* inform changelog for v4.6.1
* feat: add risingwave support to substreams-sink-sql (#1)
* feat: implement risingwave sql dialect
* fix: migrate all pg types to risingwave types
* feat: dialect and driver selection working
* fix: risingwave create table
* feat: add id generation and migrate json methods
* chore: logs everywhere
* chore: logs everywhere
* chore: logs everywhere
* feat: working risingwave ingestion
* chore: remove debug logs
* chore: remove risingwave connection overrides
* chore(ci): enable manual dispatch
* feat: add RisingWave database support
  - Rebase RisingWave implementation onto upstream develop branch
  - Adapt to new database/dialect/inserter architecture pattern
  - Implement UseVersionField() and UseDeletedField() dialect methods
  - Add pgInserter/pgFlusher interfaces for database operations
  - Refactor inserters to use init pattern with separate construction/initialization
* fix: align RisingWave transaction handling with autocommit-only behavior
  - Fix 'unexpected transaction status idle' error by avoiding explicit transactions
  - RisingWave does not support read-write transactions per documentation
  - Use autocommit mode for all operations as intended by RisingWave design
  - Add comprehensive logging and documentation about transaction limitations
  - Update README to clearly explain RisingWave's streaming-first architecture
* fix: replace PostgreSQL ON CONFLICT syntax with RisingWave table-level conflict handling
  - Remove 'ON CONFLICT (name) DO UPDATE SET' from INSERT statements
  - Add 'ON CONFLICT OVERWRITE' to _cursor_ table definition
  - RisingWave defines conflict behavior at table creation, not per-INSERT
  - This resolves 'sql parser error: expected end of statement, found: ON'
* refactor: remove legacy PostgreSQL compatibility cruft from RisingWave
  - Remove legacy Insert(table, values, txWrapper) methods
  - Remove legacy Flush(tx) methods
  - Keep only clean modern interfaces: Insert(table, values) and flush(database)
  - RisingWave now implements only the interfaces it needs without PostgreSQL baggage
  - Makes codebase cleaner and forces use of modern patterns
* fix: use execSql helper instead of direct tx.Exec in accumulator flush
  - Replace database.tx.Exec() with database.execSql() in flush method
  - Prevents nil pointer dereference since tx is nil in RisingWave autocommit mode
  - Ensures consistent SQL execution through the autocommit wrapper
* fix: use autocommit mode for HandleBlocksUndo in RisingWave
  - Replace transaction-based block undo with direct SQL execution
  - Use execSql helper instead of db.Begin() and tx.Exec()
  - Eliminates 'unexpected transaction status idle' error in block reorg handling
  - Maintains same functionality with RisingWave's streaming architecture
* fix: handle RisingWave SQL compatibility limitations
  - Keep RETURNING clause support (verified in RisingWave release #7094)
  - Document ON CONFLICT limitation: only supported in CREATE TABLE, not INSERT
  - Add comment explaining UPSERT requires tables created with ON CONFLICT OVERWRITE
  - JSONB functions (jsonb_populate_record, to_jsonb) are supported by RisingWave
* refactor: simplify RisingWave UPSERT to rely on table-level conflict handling
  - Remove unused EXCLUDED syntax building for UPSERT operations
  - RisingWave handles conflicts via ON CONFLICT OVERWRITE at table creation
  - Simple INSERT statements automatically overwrite on primary key conflicts
  - Ensures complete reorg support equivalent to PostgreSQL
* feat: implement semantic type annotations for custom SQL types

  This implementation introduces a comprehensive semantic type annotation system that enables dialect-specific SQL type mapping, with special support for RisingWave's rw_int256 type for large blockchain integers.

  Key Features:
  - Semantic type annotations in protobuf schema (semantic_type, format_hint)
  - RisingWave rw_int256 support for uint256/int256 semantic types
  - Dialect-agnostic type mapping with graceful fallbacks
  - Comprehensive value conversion with format validation
  - Support for blockchain types (address, hash, signature, pubkey)
  - Precision decimal types (decimal18, decimal6, decimal8, money)
  - Time and JSON type optimizations

  Implementation Details:
  - Extended protobuf schema with semantic_type and format_hint fields
  - Created semantic type registry with dialect-specific mappings
  - Updated RisingWave dialect to support rw_int256 type casting
  - Added comprehensive test coverage for all semantic types
  - Implemented value conversion with validation and error handling

  Benefits:
  - Leverages RisingWave's native 256-bit integer arithmetic
  - Optimized storage for blockchain addresses and hashes
  - Proper decimal precision for DeFi token amounts
  - Maintains compatibility across PostgreSQL, RisingWave, and ClickHouse
  - Extensible design for future semantic types

  Usage:
  ```protobuf
  string value = 1 [(sf.substreams.sink.sql.schema.v1.field) = {
    semantic_type: "uint256",
    format_hint: "decimal"
  }]; // → rw_int256 in RisingWave, NUMERIC in PostgreSQL

  string address = 2 [(sf.substreams.sink.sql.schema.v1.field) = {
    semantic_type: "address"
  }]; // → VARCHAR(42) with validation
  ```
* fix: remove unsupported decimal semantic types and fix RisingWave type mappings
  - Remove decimal6, decimal8, decimal18, and money semantic types from all dialects
  - Fix RisingWave types to use correct CHARACTER VARYING without length specs
  - Clean up unused convertToDecimal functions
  - Update documentation to reflect supported types only
  - Ensure consistent semantic type support across PostgreSQL, ClickHouse, and RisingWave
* docs: remove remaining references to deleted decimal semantic types
  - Remove stale decimal18/decimal6 examples from ClickHouse SQL output
  - Update best practices to recommend uint256 instead of decimal types
  - Ensure documentation is fully consistent with implementation

---------

Co-authored-by: Stéphane Duchesneau <stephane.duchesneau@streamingfast.io>
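
The dialect-specific mapping with graceful fallback that the commit message describes can be pictured with a minimal Go sketch. This is an illustration only, not the repository's code: `Dialect`, `semanticTypes`, and `ResolveSQLType` are hypothetical names, and the table holds only the mappings named in the message (`uint256` as `rw_int256` in RisingWave and `NUMERIC` in PostgreSQL, `address` as `CHARACTER VARYING` in RisingWave). Assigning `VARCHAR(42)` to PostgreSQL is an assumption, since the message does not say which dialect that mapping belongs to. ClickHouse is left unmapped on purpose to exercise the fallback.

```go
package main

import (
	"fmt"
	"strings"
)

// Dialect is a hypothetical identifier for a target database.
type Dialect string

const (
	Postgres   Dialect = "postgres"
	RisingWave Dialect = "risingwave"
	ClickHouse Dialect = "clickhouse"
)

// semanticTypes is a hypothetical registry: semantic_type -> dialect -> SQL type.
// Only mappings mentioned in the commit message are listed.
var semanticTypes = map[string]map[Dialect]string{
	"uint256": {
		Postgres:   "NUMERIC",
		RisingWave: "rw_int256",
	},
	"address": {
		Postgres:   "VARCHAR(42)", // assumed to be the PostgreSQL mapping
		RisingWave: "CHARACTER VARYING",
	},
}

// ResolveSQLType returns the SQL type registered for semanticType in the
// given dialect. When the semantic type or the dialect has no entry, it
// returns fallback, so an unknown annotation never blocks table creation.
func ResolveSQLType(semanticType string, d Dialect, fallback string) string {
	byDialect, ok := semanticTypes[strings.ToLower(semanticType)]
	if !ok {
		return fallback
	}
	if sqlType, ok := byDialect[d]; ok {
		return sqlType
	}
	return fallback
}

func main() {
	fmt.Println(ResolveSQLType("uint256", RisingWave, "TEXT")) // rw_int256
	fmt.Println(ResolveSQLType("uint256", Postgres, "TEXT"))   // NUMERIC
	fmt.Println(ResolveSQLType("uint256", ClickHouse, "TEXT")) // TEXT (fallback)
	fmt.Println(ResolveSQLType("unknown", RisingWave, "TEXT")) // TEXT (fallback)
}
```

Keeping the fallback as an explicit argument lets the caller pass whatever type the plain protobuf field would have received without an annotation.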
1 parent c6a7a0e commit c7547f9

39 files changed: 6220 additions and 901 deletions

.github/workflows/docker.yml

Lines changed: 1 addition & 0 deletions
@@ -6,6 +6,7 @@ on:
       - "v*"
     branches:
       - "*"
+  workflow_dispatch:
 
 env:
   REGISTRY: ghcr.io
