-
Couldn't load subscription status.
- Fork 680
feat(materialize): add new backend for Materialize streaming database #11703
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat(materialize): add new backend for Materialize streaming database #11703
Conversation
3debfb6 to
35b0a11
Compare
This commit adds comprehensive support for Materialize, including: - Core backend implementation with cluster and connection management - Support for materialized views, indexes, sources, sinks, and secrets - Idiomatic Materialize SQL patterns (mz_now(), SUBSCRIBE, etc.) - Test coverage for Materialize-specific features - Integration with existing Ibis test suite
35b0a11 to
349400c
Compare
|
@jasonhernandez Thanks for the PR! I will try to give it a review this weekend or later this evening. It looks like the materialize test suite is timing out at 60 minutes. Any idea what's going on there? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks very good! This is one of the most thorough initial backend PRs I've ever seen, great work.
A few comments, mostly around exception handling and too-broad-try clauses.
| schema = self.connection.get_schema(table_name) | ||
| columns = list(schema.keys()) | ||
| col_list = ", ".join(f'"{c}"' for c in columns) | ||
|
|
||
| # Use COPY FROM STDIN with CSV format (psycopg3 API) | ||
| copy_sql = f'COPY "{table_name}" ({col_list}) FROM STDIN WITH (FORMAT CSV, HEADER true)' | ||
|
|
||
| with con.cursor() as cur: | ||
| # Open CSV file and use copy() context manager for psycopg3 | ||
| with open(csv_file) as f: | ||
| with cur.copy(copy_sql) as copy: | ||
| while data := f.read(8192): | ||
| copy.write(data) | ||
| con.commit() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we move most of this outside the exception block? It's hard to tell what could raise an exception indicating a table already exists.
| # Log but don't fail - some tables might be pre-populated | ||
| import warnings | ||
|
|
||
| warnings.warn(f"Could not load {table_name}: {e}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this warning isn't actionable, let's just figure out what conditions cause the exception to be raised and either detect that or isolate as little code as possible under a contextlib.suppress context manager invocation.
|
|
||
|
|
||
| @pytest.mark.usefixtures("con") | ||
| class TestDistinctOnEdgeCases: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the rest of the codebase doesn't use classes for testing, even to organize tests, let's avoid introducing them in this PR.
If you want to isolate things, you can put these in separate modules (under a directory if you'd like, but not necessary if you don't want to).
| """ | ||
|
|
||
| def test_kafka_connection_documented(self): | ||
| """Document creating Kafka connection with SASL authentication. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to add tests that don't make any assertions. You can always add them in a follow-up.
| con.drop_index(idx_name, force=True) | ||
| con.drop_materialized_view(mv_name, force=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can only execute this finally if the corresponding create_materialized_view/create_index have successfully executed, so you need to move those create_* calls outside of the try block. Otherwise an exception will be raised when the finally block is hit, because you'll then be trying to drop things that don't exist.
| expected.sort_values("string_col").reset_index(drop=True), | ||
| ) | ||
| finally: | ||
| con.drop_materialized_view(mv_name, force=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment from another test file: you have to move the create_materialized_view outside of the try block.
Also, let's try to really keep as little code as possible in a try block in general, so that we can easily see what's expected to potentially raise an exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another way you can handle this, which would be preferable, is to create a fixture like this:
@pytest.fixture
def mat_view(con):
name = gen_name(...)
yield name
con.drop_materialized_view(name)pytest will handle any exceptions raised (without having to write try:finally: anywhere including in the fixture itself) and invoke the code after the yield point.
| healthcheck: | ||
| test: | ||
| - CMD-SHELL | ||
| - bash -c 'printf "GET / HTTP/1.1\n\n" > /dev/tcp/127.0.0.1/6875; exit $$?;' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we do the same for the other ports (or perhaps just at least the HTTP port)?
| """ | ||
| # Materialize has mz_version() function | ||
| try: | ||
| with self.begin() as cur: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suspect that we can isolate this to:
try:
cur.execute(...)
except SomeSpecificException:
return super().versionso that bugs in any string processing code are not swallowed by the ultra-generic Exception.
| # Fetch results in batches | ||
| while True: | ||
| # Fetch a batch of rows | ||
| cursor.execute(f"FETCH {batch_size} {cursor_name}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there no way to use the pattern while batch := cur.fetchmany(batch_size) here?
| - For SSH key rotation, update bastion server keys manually after rotation | ||
| """ | ||
| if not any([set_options, reset_options, rotate_keys]): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| if not any([set_options, reset_options, rotate_keys]): | |
| if not (set_options or reset_options or rotate_keys): |
might actually be shorter and it's not constructing a list.
Description of changes
Adds a Materialize backend for Ibis
This PR introduces a new backend for Materialize, a live data layer with native support for generalized incremental view maintenance. Materialize strives to maintain PostgreSQL wire compatibility, but it offers a number of unique capabilities that require expanding beyond the existing Postgres backend for Materialize users. In particular, Materialize provides:
The Materialize team is excited to collaborate with the Ibis community and is committed to maintaining and improving this backend going forward.