Skip to content

Enable huge 'select' statements #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 3 additions & 26 deletions django_bulk_load/bulk_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,10 +513,7 @@ def bulk_select_model_dicts(
the filter_field_names keys in addition to any fields in select_field_names
:param filter_data: Values (normally tuples) of the filter_field_names. For instance if filter_field_names=["field1", "field2"],
filter_data may be [(12, "hello"), (23, "world"), (35, "fun"), ...]
:param skip_filter_transform: Normally the function converts the filter_data into DB specific values. This is useful
for datetimes or other complex values that have different representation in the DB. The downside is the transform
can be slow. If you know your data is simple values (strings, integers, etc.) and don't need
transformation, you can pass True.
:param skip_filter_transform: DEPRECATED.
:param select_for_update: Use `FOR UPDATE` clause in select query. This will lock the rows.

:return: List of dictionaries that match the model_data. Returns dictionaries for performance reasons
Expand All @@ -539,32 +536,13 @@ def bulk_select_model_dicts(
connection = connections[db_name]

with connection.cursor() as cursor:
Copy link
Author

@wchinfeman-cedar wchinfeman-cedar Jun 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I need a transaction here now that I am creating a temp table, or is there an explicit cleanup step?

# Grab all the filter data, so we can know the length
filter_data = list(filter_data)
if not skip_filter_transform:
filter_data_transformed = []
for filter_vals in filter_data:
filter_data_transformed.append(
[
django_field_to_query_value(filter_fields[i], value)
for i, value in enumerate(filter_vals)
]
)
filter_data = filter_data_transformed

sql = generate_values_select_query(
table_name=table_name,
select_fields=select_fields,
filter_fields=filter_fields,
select_for_update=select_for_update
)
sql_string = sql.as_string(cursor.connection)
models = [model_class(**dict(zip(filter_field_names, x))) for x in filter_data]
cursor.execute(generate_select_query(table_name, create_temp_table_and_load(models, connection, cursor, filter_field_names), filter_fields, select_fields, for_update=select_for_update))

logger.info(
"Starting selecting models",
extra=dict(query_dict_count=len(filter_data), table_name=table_name),
)
execute_values(cursor, sql_string, filter_data, page_size=len(filter_data))
columns = [col[0] for col in cursor.description]

# Map columns to fields so we can later correctly interpret column values
Expand All @@ -590,5 +568,4 @@ def bulk_select_model_dicts(
duration=monotonic() - start_time,
),
)

return results
27 changes: 19 additions & 8 deletions django_bulk_load/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ def generate_select_query(
loading_table_name: str,
join_fields: Sequence[models.Field],
select_fields: Sequence[models.Field] = None,
for_update=False
) -> Composable:
join_clause = generate_join_condition(
source_table_name=loading_table_name,
Expand All @@ -326,14 +327,24 @@ def generate_select_query(
else:
fields = SQL("{table_name}.*").format(table_name=Identifier(table_name))

return SQL(
"SELECT {fields} FROM {table_name} INNER JOIN {loading_table_name} ON {join_clause}"
).format(
loading_table_name=Identifier(loading_table_name),
join_clause=join_clause,
fields=fields,
table_name=Identifier(table_name),
)
if for_update:
return SQL(
"SELECT {fields} FROM {table_name} INNER JOIN {loading_table_name} ON {join_clause} FOR UPDATE"
).format(
loading_table_name=Identifier(loading_table_name),
join_clause=join_clause,
fields=fields,
table_name=Identifier(table_name),
)
else:
return SQL(
"SELECT {fields} FROM {table_name} INNER JOIN {loading_table_name} ON {join_clause}"
).format(
loading_table_name=Identifier(loading_table_name),
join_clause=join_clause,
fields=fields,
table_name=Identifier(table_name),
)


def generate_values_select_query(
Expand Down
Loading