Skip to content

Conversation

@jasonhernandez
Copy link

Description of changes

Adds a Materialize backend for Ibis

This PR introduces a new backend for Materialize, a live data layer with native support for generalized incremental view maintenance. Materialize strives to maintain PostgreSQL wire compatibility, but it offers a number of unique capabilities that require expanding beyond the existing Postgres backend for Materialize users. In particular, Materialize provides:

  • Indexes on views, enabling fast, incremental computation over complex SQL queries.
  • Robust materialized view support
  • Clusters for providing physical workload isolation between multiple concurrent use cases within the same database.

The Materialize team is excited to collaborate with the Ibis community and is committed to maintaining and improving this backend going forward.

@github-actions github-actions bot added docs Documentation related issues or PRs tests Issues or PRs related to tests ci Continuous Integration issues or PRs dependencies Issues or PRs related to dependencies sql Backends that generate SQL labels Oct 17, 2025
@jasonhernandez jasonhernandez force-pushed the feat/add-Materialize-backend branch 3 times, most recently from 3debfb6 to 35b0a11 Compare October 17, 2025 23:08
This commit adds comprehensive support for Materialize, including:
- Core backend implementation with cluster and connection management
- Support for materialized views, indexes, sources, sinks, and secrets
- Idiomatic Materialize SQL patterns (mz_now(), SUBSCRIBE, etc.)
- Test coverage for Materialize-specific features
- Integration with existing Ibis test suite
@jasonhernandez jasonhernandez force-pushed the feat/add-Materialize-backend branch from 35b0a11 to 349400c Compare October 20, 2025 17:24
@cpcloud
Copy link
Member

cpcloud commented Oct 24, 2025

@jasonhernandez Thanks for the PR! I will try to give it a review this weekend or later this evening.

It looks like the materialize test suite is timing out at 60 minutes. Any idea what's going on there?

Copy link
Member

@cpcloud cpcloud left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks very good! This is one of the most thorough initial backend PRs I've ever seen, great work.

A few comments, mostly around exception handling and too-broad-try clauses.

Comment on lines +87 to +100
schema = self.connection.get_schema(table_name)
columns = list(schema.keys())
col_list = ", ".join(f'"{c}"' for c in columns)

# Use COPY FROM STDIN with CSV format (psycopg3 API)
copy_sql = f'COPY "{table_name}" ({col_list}) FROM STDIN WITH (FORMAT CSV, HEADER true)'

with con.cursor() as cur:
# Open CSV file and use copy() context manager for psycopg3
with open(csv_file) as f:
with cur.copy(copy_sql) as copy:
while data := f.read(8192):
copy.write(data)
con.commit()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move most of this outside the exception block? It's hard to tell what could raise an exception indicating a table already exists.

Comment on lines +102 to +105
# Log but don't fail - some tables might be pre-populated
import warnings

warnings.warn(f"Could not load {table_name}: {e}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this warning isn't actionable, let's just figure out what conditions cause the exception to be raised and either detect that or isolate as little code as possible under a contextlib.suppress context manager invocation.



@pytest.mark.usefixtures("con")
class TestDistinctOnEdgeCases:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the rest of the codebase doesn't use classes for testing, even to organize tests, let's avoid introducing them in this PR.

If you want to isolate things, you can put these in separate modules (under a directory if you'd like, but not necessary if you don't want to).

"""

def test_kafka_connection_documented(self):
"""Document creating Kafka connection with SASL authentication.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to add tests that don't make any assertions. You can always add them in a follow-up.

Comment on lines +55 to +56
con.drop_index(idx_name, force=True)
con.drop_materialized_view(mv_name, force=True)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can only execute this finally if the corresponding create_materialized_view/create_index have successfully executed, so you need to move those create_* calls outside of the try block. Otherwise an exception will be raised when the finally block is hit, because you'll then be trying to drop things that don't exist.

expected.sort_values("string_col").reset_index(drop=True),
)
finally:
con.drop_materialized_view(mv_name, force=True)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment from another test file: you have to move the create_materialized_view outside of the try block.

Also, let's try to really keep as little code as possible in a try block in general, so that we can easily see what's expected to potentially raise an exception.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another way you can handle this, which would be preferable, is to create a fixture like this:

@pytest.fixture
def mat_view(con):
	name = gen_name(...)
    yield name
    con.drop_materialized_view(name)

pytest will handle any exceptions raised (without having to write try:finally: anywhere including in the fixture itself) and invoke the code after the yield point.

healthcheck:
test:
- CMD-SHELL
- bash -c 'printf "GET / HTTP/1.1\n\n" > /dev/tcp/127.0.0.1/6875; exit $$?;'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do the same for the other ports (or perhaps just at least the HTTP port)?

"""
# Materialize has mz_version() function
try:
with self.begin() as cur:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect that we can isolate this to:

try:
	cur.execute(...)
except SomeSpecificException:
	return super().version

so that bugs in any string processing code are not swallowed by the ultra-generic Exception.

# Fetch results in batches
while True:
# Fetch a batch of rows
cursor.execute(f"FETCH {batch_size} {cursor_name}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there no way to use the pattern while batch := cur.fetchmany(batch_size) here?

- For SSH key rotation, update bastion server keys manually after rotation
"""
if not any([set_options, reset_options, rotate_keys]):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if not any([set_options, reset_options, rotate_keys]):
if not (set_options or reset_options or rotate_keys):

might actually be shorter and it's not constructing a list.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ci Continuous Integration issues or PRs dependencies Issues or PRs related to dependencies docs Documentation related issues or PRs sql Backends that generate SQL tests Issues or PRs related to tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants