Open
Labels: feat-dataframe-api (Everything related to the dataframe API)
Description
Some queries are failing with errors like:
DataFusion error: Internal("Invalid HashJoinExec, partition count mismatch 2038!=2059,consider using RepartitionExec")
After some investigating: this happens when the queries against the server return different numbers of partitions, because of either entity filtering or time filtering. That in turn means the two aggregations end up on different numbers of engines, and we then try to use these two providers, with mismatched partition counts, in a single join.
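A toy Python model of why the partition counts must match (not DataFusion's implementation): in a partitioned hash join, partition i of one input is only ever probed against partition i of the other, so both inputs must be hash-partitioned with the same function and the same count. The helpers `partition_of`, `hash_partition` and `join_by_partition_index` are hypothetical, and `key % n` stands in for a real hash function.

```python
from typing import List


def partition_of(key: int, num_partitions: int) -> int:
    # Toy stand-in for a hash function: deterministic, so equal keys on both
    # sides land on the same partition index only if the counts match.
    return key % num_partitions


def hash_partition(keys: List[int], num_partitions: int) -> List[List[int]]:
    parts: List[List[int]] = [[] for _ in range(num_partitions)]
    for key in keys:
        parts[partition_of(key, num_partitions)].append(key)
    return parts


def join_by_partition_index(left_parts, right_parts, check=True):
    # Partitioned hash join: partition i of the left input is probed only
    # against partition i of the right input.
    if check and len(left_parts) != len(right_parts):
        raise ValueError(
            f"partition count mismatch {len(left_parts)}!={len(right_parts)}"
        )
    matches = []
    # zip() stops at the shorter list, so extra partitions are ignored.
    for left_part, right_part in zip(left_parts, right_parts):
        build_side = set(right_part)
        matches.extend(k for k in left_part if k in build_side)
    return sorted(matches)


keys = list(range(10))
print(join_by_partition_index(hash_partition(keys, 4), hash_partition(keys, 4)))
# prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
print(join_by_partition_index(hash_partition(keys, 3), hash_partition(keys, 4), check=False))
# prints [0, 1, 2]
```

With equal counts every matching key is found; with 3 vs. 4 partitions, index-by-index pairing silently drops most matches. That is presumably why DataFusion refuses to run such a plan and suggests inserting a RepartitionExec instead.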
Here's a minimal repro:
```python
import datafusion as dfn
from rerun.catalog import CatalogClient

catalog = CatalogClient("rerun+http://localhost:51234")
dataset = catalog.get_dataset(name="droid:sample50")

df1 = dataset.dataframe_query_view(index="real_time", contents="/observation/joint_positions").df()
# Use filter_range_nanos to exclude some of the partitions from df2
df2 = (
    dataset.dataframe_query_view(index="real_time", contents="/action/joint_positions")
    .filter_range_nanos(start=0, end=1689823898752999936)
    .df()
)

df1 = df1.aggregate(
    [dfn.col("rerun_partition_id").alias("left_rerun_partition_id")],
    [dfn.functions.first_value(dfn.col("/observation/joint_positions:Scalars:scalars"))],
)
df2 = df2.aggregate(
    [dfn.col("rerun_partition_id").alias("right_rerun_partition_id")],
    [dfn.functions.last_value(dfn.col("/action/joint_positions:Scalars:scalars"))],
)

# df1 carries the "left_" key and df2 the "right_" key
df1.join(df2, left_on="left_rerun_partition_id", right_on="right_rerun_partition_id").count()
```