diff --git a/ci/environment-3.10.yaml b/ci/environment-3.10.yaml index ca3045d..7c19bcf 100644 --- a/ci/environment-3.10.yaml +++ b/ci/environment-3.10.yaml @@ -6,13 +6,9 @@ dependencies: - python=3.10 - dask - distributed - # `pandas=2.2` dropped support for `sqlalchemy<2`, but `snowflake-sqlalchemy` - # doesn't support `sqlalchemy>=2` yet. Temporarily pinning `pandas<2.2` for now. - # xref https://github.com/pandas-dev/pandas/issues/57049 - # xref https://github.com/snowflakedb/snowflake-sqlalchemy/issues/380 - - pandas<2.2 + - pandas - pyarrow - - snowflake-connector-python >=2.6.0 + - snowflake-connector-python >=2.7.3 - snowflake-sqlalchemy # Testing - pytest \ No newline at end of file diff --git a/ci/environment-3.11.yaml b/ci/environment-3.11.yaml index 2e5e6eb..d1ac68e 100644 --- a/ci/environment-3.11.yaml +++ b/ci/environment-3.11.yaml @@ -6,13 +6,9 @@ dependencies: - python=3.11 - dask - distributed - # `pandas=2.2` dropped support for `sqlalchemy<2`, but `snowflake-sqlalchemy` - # doesn't support `sqlalchemy>=2` yet. Temporarily pinning `pandas<2.2` for now. - # xref https://github.com/pandas-dev/pandas/issues/57049 - # xref https://github.com/snowflakedb/snowflake-sqlalchemy/issues/380 - - pandas<2.2 + - pandas - pyarrow - - snowflake-connector-python >=2.6.0 + - snowflake-connector-python >=2.7.3 - snowflake-sqlalchemy # Testing - pytest \ No newline at end of file diff --git a/ci/environment-3.7.yaml b/ci/environment-3.7.yaml index 5176e7e..b247107 100644 --- a/ci/environment-3.7.yaml +++ b/ci/environment-3.7.yaml @@ -6,13 +6,9 @@ dependencies: - python=3.7 - dask - distributed - # `pandas=2.2` dropped support for `sqlalchemy<2`, but `snowflake-sqlalchemy` - # doesn't support `sqlalchemy>=2` yet. Temporarily pinning `pandas<2.2` for now. - # xref https://github.com/pandas-dev/pandas/issues/57049 - # xref https://github.com/snowflakedb/snowflake-sqlalchemy/issues/380 - - pandas<2.2 + - pandas - pyarrow - - snowflake-connector-python >=2.6.0 + - snowflake-connector-python >=2.7.3 - snowflake-sqlalchemy # Testing - pytest \ No newline at end of file diff --git a/ci/environment-3.8.yaml b/ci/environment-3.8.yaml index 845e546..a82e82d 100644 --- a/ci/environment-3.8.yaml +++ b/ci/environment-3.8.yaml @@ -6,13 +6,9 @@ dependencies: - python=3.8 - dask - distributed - # `pandas=2.2` dropped support for `sqlalchemy<2`, but `snowflake-sqlalchemy` - # doesn't support `sqlalchemy>=2` yet. Temporarily pinning `pandas<2.2` for now. - # xref https://github.com/pandas-dev/pandas/issues/57049 - # xref https://github.com/snowflakedb/snowflake-sqlalchemy/issues/380 - - pandas<2.2 + - pandas - pyarrow - - snowflake-connector-python >=2.6.0 + - snowflake-connector-python >=2.7.3 - snowflake-sqlalchemy # Testing - pytest \ No newline at end of file diff --git a/ci/environment-3.9.yaml b/ci/environment-3.9.yaml index 916924a..7e34c8e 100644 --- a/ci/environment-3.9.yaml +++ b/ci/environment-3.9.yaml @@ -6,13 +6,9 @@ dependencies: - python=3.9 - dask - distributed - # `pandas=2.2` dropped support for `sqlalchemy<2`, but `snowflake-sqlalchemy` - # doesn't support `sqlalchemy>=2` yet. Temporarily pinning `pandas<2.2` for now. - # xref https://github.com/pandas-dev/pandas/issues/57049 - # xref https://github.com/snowflakedb/snowflake-sqlalchemy/issues/380 - - pandas<2.2 + - pandas - pyarrow - - snowflake-connector-python >=2.6.0 + - snowflake-connector-python >=2.7.3 - snowflake-sqlalchemy # Testing - pytest \ No newline at end of file diff --git a/dask_snowflake/core.py b/dask_snowflake/core.py index 9059d6c..85d128b 100644 --- a/dask_snowflake/core.py +++ b/dask_snowflake/core.py @@ -6,10 +6,8 @@ import pandas as pd import pyarrow as pa import snowflake.connector -from snowflake.connector.pandas_tools import pd_writer, write_pandas +from snowflake.connector.pandas_tools import write_pandas from snowflake.connector.result_batch import ArrowResultBatch -from snowflake.sqlalchemy import URL -from sqlalchemy import create_engine import dask import dask.dataframe as dd @@ -37,40 +35,13 @@ def write_snowflake( conn=conn, df=df, schema=connection_kwargs.get("schema", None), - # NOTE: since ensure_db_exists uses uppercase for the table name table_name=name.upper(), quote_identifiers=False, + auto_create_table=True, **(write_pandas_kwargs or {}), ) -@delayed -def ensure_db_exists( - df: pd.DataFrame, - name: str, - connection_kwargs, -): - connection_kwargs = { - **{"application": dask.config.get("snowflake.partner", "dask")}, - **connection_kwargs, - } - # NOTE: we have a separate `ensure_db_exists` function in order to use - # pandas' `to_sql` which will create a table if the requested one doesn't - # already exist. However, we don't always want to use Snowflake's `pd_writer` - # approach because it doesn't allow us disable parallel file uploading. - # For these cases we use a separate `write_snowflake` function. - engine = create_engine(URL(**connection_kwargs)) - # # NOTE: pd_writer will automatically uppercase the table name - df.to_sql( - name=name, - schema=connection_kwargs.get("schema", None), - con=engine, - index=False, - if_exists="append", - method=pd_writer, - ) - - def to_snowflake( df: dd.DataFrame, name: str, @@ -111,13 +82,6 @@ def to_snowflake( ... ) """ - # Write the DataFrame meta to ensure table exists before - # trying to write all partitions in parallel. Otherwise - # we run into race conditions around creating a new table. - # Also, some clusters will overwrite the `snowflake.partner` configuration value. - # We run `ensure_db_exists` on the cluster to ensure we capture the - # right partner application ID. - ensure_db_exists(df._meta, name, connection_kwargs).compute() parts = [ write_snowflake(partition, name, connection_kwargs, write_pandas_kwargs) for partition in df.to_delayed() diff --git a/dask_snowflake/tests/test_core.py b/dask_snowflake/tests/test_core.py index 70a054c..ca31c53 100644 --- a/dask_snowflake/tests/test_core.py +++ b/dask_snowflake/tests/test_core.py @@ -159,8 +159,8 @@ def mock_connect(**kwargs): monkeypatch.setattr(snowflake.connector, "connect", mock_connect) to_snowflake(ddf, name=table, connection_kwargs=connection_kwargs) - # One extra connection is made to ensure the DB table exists - count_after_write = ddf.npartitions + 1 + # One connection is made for writing each partition + count_after_write = ddf.npartitions assert count == count_after_write ddf_out = read_snowflake( @@ -184,8 +184,8 @@ def mock_connect(**kwargs): monkeypatch.setattr(snowflake.connector, "connect", mock_connect) to_snowflake(ddf, name=table, connection_kwargs=connection_kwargs) - # One extra connection is made to ensure the DB table exists - count_after_write = ddf.npartitions + 1 + # One connection is made for writing each partition + count_after_write = ddf.npartitions assert count == count_after_write ddf_out = read_snowflake( @@ -221,8 +221,8 @@ def mock_connect(**kwargs): client.run(patch_snowflake_connect) to_snowflake(ddf, name=table, connection_kwargs=connection_kwargs) - # One extra connection is made to ensure the DB table exists - count_after_write = ddf.npartitions + 1 + # One connection is made for writing each partition + count_after_write = ddf.npartitions ddf_out = read_snowflake( f"SELECT * FROM {table}", connection_kwargs=connection_kwargs, npartitions=2 @@ -250,8 +250,8 @@ def mock_connect(**kwargs): monkeypatch.setattr(snowflake.connector, "connect", mock_connect) to_snowflake(ddf, name=table, connection_kwargs=connection_kwargs) - # One extra connection is made to ensure the DB table exists - count_after_write = ddf.npartitions + 1 + # One connection is made for writing each partition + count_after_write = ddf.npartitions assert count == count_after_write ddf_out = read_snowflake( diff --git a/requirements.txt b/requirements.txt index f0e9132..84d5afd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,3 @@ dask>=2021.05.0 distributed -snowflake-connector-python[pandas]>=2.6.0 -snowflake-sqlalchemy -# `pandas=2.2` dropped support for `sqlalchemy<2`, but `snowflake-sqlalchemy` -# doesn't support `sqlalchemy>=2` yet. Temporarily pinning `pandas<2.2` for now. -# xref https://github.com/pandas-dev/pandas/issues/57049 -# xref https://github.com/snowflakedb/snowflake-sqlalchemy/issues/380 -pandas<2.2 +snowflake-connector-python[pandas]>=2.7.3