feat: cleaner & more efficient direct insert logic #1125
Conversation
CREATE OR REPLACE TABLE {STAGING_CLICKHOUSE_DATABASE}._tmp_loadable_keys ENGINE = Set AS (
    SELECT {clickhouse_table.key_field}
    FROM {src_table} src
    LEFT ANTI JOIN {dst_table} dst
- The first pass at this used a more efficient mechanism for determining which variants to load (if the max variant id in the loading pipeline run is already present, don't load anything).
- This didn't work for the data migrations, so we opted to just re-load all variants and let RocksDB compaction handle the de-duplication. This causes serious memory pressure under certain situations.
- The proposed solution here duplicates the anti join that happens upstream in the pipeline so that it also happens here. This is potentially unnecessary in the long run, but is theoretically correct behavior.
- We need a temp table because the expected
  INSERT INTO table1 SELECT * FROM table2 LEFT ANTI JOIN table1
  runs into trouble: table1 would be both joined against and inserted into in the same query.
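The anti-join semantics can be illustrated in plain Python (a sketch only; in the pipeline the filtering happens inside ClickHouse, and the table/key names here are illustrative):

```python
# Sketch: LEFT ANTI JOIN keeps rows from src whose key has no match in dst,
# i.e. only variants not already present in the destination get loaded.

def loadable_keys(src_keys, dst_keys):
    """Return keys present in src but absent from dst (LEFT ANTI JOIN)."""
    dst = set(dst_keys)
    return [k for k in src_keys if k not in dst]

src = ["v1", "v2", "v3", "v4"]
dst = ["v2", "v4"]
print(loadable_keys(src, dst))  # ['v1', 'v3']
```

Materializing this key set into a separate temp table first sidesteps the read-while-writing problem described above.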
Had a second thought that we might see memory pressure with the Set
implementation, but validated that we should not:
CREATE TABLE T ENGINE = Set AS SELECT randomString(500) FROM numbers(20000000);

SELECT
    database,
    name,
    formatReadableSize(total_bytes)
FROM system.tables
WHERE engine IN ('Memory', 'Set', 'Join')
  AND name = 'T';

│ default │ T │ 1.50 GiB │
drop_staging_db()
logged_query(
    f"""
    CREATE DATABASE {STAGING_CLICKHOUSE_DATABASE}
Should this be a CREATE OR REPLACE?
Not in this case. This is the database, not a table, and one line above we delete the database.
I find it confusing that we CREATE a database and then on the next line CREATE OR REPLACE a table in that db - if the db is new every time, then shouldn't we always create the table and never replace it? And if the db can persist, shouldn't we fail gracefully if it does exist, which CREATE won't do?
👍 that's fair! I think the best course of action is removing the REPLACE on the table creation. It's sort of an artifact of the testing I've been doing, but I did explicitly leave it because the table name is hardcoded and there are scenarios where the staging environment doesn't get deleted if the pod is killed. If the staging environment is wiped, that won't be an issue.
I wasn't 100% sure, while implementing this, whether the staging environment should be used for this table, or if an entirely different "database" should be created.
Even better: there's no reason to use a hardcoded table name here. There's a utility already present in this file that produces a proper staging prefix, which includes the run id & dataset type.
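A hypothetical sketch of the kind of prefix utility being referred to (the actual helper's name and naming format in the file may differ):

```python
# Hypothetical helper: derive a per-run staging table name instead of a
# hardcoded one, so concurrent or leftover runs can't collide.

def staging_table_name(run_id: str, dataset_type: str,
                       base: str = "loadable_keys") -> str:
    """Build a staging table name that embeds the run id and dataset type."""
    return f"_tmp_{dataset_type}_{run_id}_{base}"

print(staging_table_name("20240101", "variants"))
# _tmp_variants_20240101_loadable_keys
```

Embedding the run id also makes orphaned staging tables easy to attribute and clean up if a pod dies mid-run.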