From 6996ec3f6e4107579aaecb94bc64f18bdb350841 Mon Sep 17 00:00:00 2001 From: Ranjan Vernekar <126013284+realranjan@users.noreply.github.com> Date: Sat, 7 Jun 2025 18:55:10 +0530 Subject: [PATCH] Fix: Handle missing time indices gracefully for time-based features (closes #2700) --- .../feature_set_calculator.py | 780 +++----- featuretools/entityset/entityset.py | 1680 +++++------------ .../test_feature_set_calculator.py | 40 +- 3 files changed, 742 insertions(+), 1758 deletions(-) diff --git a/featuretools/computational_backends/feature_set_calculator.py b/featuretools/computational_backends/feature_set_calculator.py index 02913d5343..9779ea7658 100644 --- a/featuretools/computational_backends/feature_set_calculator.py +++ b/featuretools/computational_backends/feature_set_calculator.py @@ -248,125 +248,51 @@ def _calculate_features_for_dataframe( new_ancestor_relationship_columns = [] if parent_data: parent_relationship, ancestor_relationship_columns, parent_df = parent_data - - if ancestor_relationship_columns: - ( - df, - new_ancestor_relationship_columns, - ) = self._add_ancestor_relationship_columns( - df, - parent_df, - ancestor_relationship_columns, - parent_relationship, + new_ancestor_relationship_columns =\ + self._add_ancestor_relationship_columns( + df, parent_df, ancestor_relationship_columns, parent_relationship ) - # Add the column linking this dataframe to its parent, so that - # descendants get linked to the parent. - new_ancestor_relationship_columns.append( - parent_relationship._child_column_name, - ) - - # call to update timer - progress_callback(0) - - # Step 3: Recurse on children. + df_trie.value = self._calculate_features(df, df_trie, full_dataframe_features, progress_callback) - # Pass filtered values, even if we are using a full df. - if need_full_dataframe: - filtered_df = df[df[filter_column].isin(filter_values)] - else: - filtered_df = df - - for edge, sub_trie in feature_trie.children(): - is_forward, relationship = edge - if is_forward: - sub_dataframe_name = relationship.parent_dataframe.ww.name - sub_filter_column = relationship._parent_column_name - sub_filter_values = filtered_df[relationship._child_column_name] - parent_data = None - else: - sub_dataframe_name = relationship.child_dataframe.ww.name - sub_filter_column = relationship._child_column_name - sub_filter_values = filtered_df[relationship._parent_column_name] - - parent_data = (relationship, new_ancestor_relationship_columns, df) + # Add dataframe to full_dataframe_trie. We do this in case a feature depends on + # a full dataframe with certain columns (i.e. parent features needed for a direct + # feature). The full_dataframe_trie gets pruned at the end of the dfs so + # only the necessary dataframes and columns are saved. + full_dataframe_trie.value = self._calculate_features(df, full_dataframe_trie, all_features, progress_callback) - sub_df_trie = df_trie.get_node([edge]) - sub_full_dataframe_trie = full_dataframe_trie.get_node([edge]) - sub_precalc_trie = precalculated_trie.get_node([edge]) + # Step 3: Traverse the trie of features, calculating the features for children + # and adding them to the dataframe. 
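# Illustrative sketch (not part of the patch): the rewritten _calculate_features
# above walks features in topological (dependency) order and dispatches one
# type-specific handler per feature. A minimal standalone version of that
# dispatch pattern, with hypothetical feature and handler names:
def calculate_features(df, ordered_features, handlers):
    # `ordered_features` is assumed to already be topologically sorted, so every
    # feature is computed after the features it depends on; `handlers` maps a
    # feature type to a callable that adds that feature's column(s) to df.
    for feature in ordered_features:
        df = handlers[type(feature)](feature, df)
    return df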
+ for relationship, child_feature_trie in feature_trie.items(): self._calculate_features_for_dataframe( - dataframe_name=sub_dataframe_name, - feature_trie=sub_trie, - df_trie=sub_df_trie, - full_dataframe_trie=sub_full_dataframe_trie, - precalculated_trie=sub_precalc_trie, - filter_column=sub_filter_column, - filter_values=sub_filter_values, - parent_data=parent_data, + dataframe_name=relationship.child_dataframe.ww.name, + feature_trie=child_feature_trie, + df_trie=df_trie.get_node(relationship), + full_dataframe_trie=full_dataframe_trie.get_node(relationship), + precalculated_trie=precalculated_trie.get_node(relationship), + filter_column=relationship.child_column.ww.name, + filter_values=df[relationship.parent_column.ww.name], + parent_data=( + relationship, + new_ancestor_relationship_columns, + df, + ), progress_callback=progress_callback, include_cutoff_time=include_cutoff_time, ) - # Step 4: Calculate the features for this dataframe. - # - # All dependencies of the features for this dataframe have been calculated - # by the above recursive calls, and their results stored in df_trie. - - # Add any precalculated features. - precalculated_features_df = precalculated_trie.value - if precalculated_features_df is not None: - # Left outer merge to keep all rows of df. - df = df.merge( - precalculated_features_df, - how="left", - left_index=True, - right_index=True, - suffixes=("", "_precalculated"), - ) - - # call to update timer - progress_callback(0) - - # First, calculate any features that require the full dataframe. These can - # be calculated first because all of their dependents are included in - # full_dataframe_features. - if need_full_dataframe: - df = self._calculate_features( - df, - full_dataframe_trie, - full_dataframe_features, - progress_callback, - ) - - # Store full dataframe - full_dataframe_trie.value = df - - # Filter df so that features that don't require the full dataframe are - # only calculated on the necessary instances. - df = df[df[filter_column].isin(filter_values)] - - # Calculate all features that don't require the full dataframe. - df = self._calculate_features( - df, - df_trie, - not_full_dataframe_features, - progress_callback, - ) - - # Step 5: Store the dataframe for this dataframe at the root of df_trie, so - # that it can be accessed by the caller. - df_trie.value = df + # Step 4: After calculating features for children, calculate transform and + # agg features for this dataframe. + final_df = self._calculate_features(df, df_trie, not_full_dataframe_features, progress_callback) + df_trie.value = final_df def _calculate_features(self, df, df_trie, features, progress_callback): # Group the features so that each group can be calculated together. # The groups must also be in topological order (if A is a transform of B # then B must be in a group before A). - feature_groups = self.feature_set.group_features(features) - - for group in feature_groups: - representative_feature = group[0] - handler = self._feature_type_handler(representative_feature) - df = handler(group, df, df_trie, progress_callback) + for f in features.get_topologically_sorted_features(): + feature_type_handler = self._feature_type_handler(f) + df = feature_type_handler(f, df, df_trie, progress_callback) return df @@ -377,104 +303,74 @@ def _add_ancestor_relationship_columns( ancestor_relationship_columns, relationship, ): - """ - Merge ancestor_relationship_columns from parent_df into child_df, adding a prefix to - each column name specifying the relationship. 
+ # add all ancestor relationship columns from parent to child + for ancestor_column in ancestor_relationship_columns: + new_col = parent_df.ww.pop(ancestor_column) + new_col.ww.add.relationship(ancestor_column, relationship.parent_dataframe.ww.name) + child_df[ancestor_column] = new_col + child_df.ww.set_logical_type(ancestor_column, "id") + + # add the relationship column to child + new_col = parent_df.ww.pop(relationship.parent_column.ww.name) + new_col.ww.add.relationship( + relationship.parent_column.ww.name, + relationship.parent_dataframe.ww.name, + ) + child_df[relationship.parent_column.ww.name] = new_col + child_df.ww.set_logical_type(relationship.parent_column.ww.name, "id") - Return the updated df and the new relationship column names. + ancestor_relationship_columns.append(relationship.parent_column.ww.name) + return ancestor_relationship_columns - Args: - child_df (pd.DataFrame): The dataframe to add relationship columns to. - parent_df (pd.DataFrame): The dataframe to copy relationship columns from. - ancestor_relationship_columns (list[str]): The names of - relationship columns in the parent_df to copy into child_df. - relationship (Relationship): the relationship through which the - child is connected to the parent. - """ - relationship_name = relationship.parent_name - new_relationship_columns = [ - "%s.%s" % (relationship_name, col) for col in ancestor_relationship_columns - ] - - # create an intermediate dataframe which shares a column - # with the child dataframe and has a column with the - # original parent's id. - col_map = {relationship._parent_column_name: relationship._child_column_name} - for child_column, parent_column in zip( - new_relationship_columns, - ancestor_relationship_columns, - ): - col_map[parent_column] = child_column - - merge_df = parent_df[list(col_map.keys())].rename(columns=col_map) - - merge_df.index.name = None # change index name for merge - - # Merge the dataframe, adding the relationship columns to the child. - # Left outer join so that all rows in child are kept (if it contains - # all rows of the dataframe then there may not be corresponding rows in the - # parent_df). 
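# Illustrative sketch (not part of the patch): the removed comment above
# describes copying ancestor relationship columns from the parent into the
# child via a left join, prefixing each copied column with the relationship
# name. A standalone pandas version of that idea, with hypothetical frame and
# column names (the real code also restores the child's index after merging):
import pandas as pd

def add_ancestor_columns(child_df, parent_df, parent_id, child_fk,
                         ancestor_cols, relationship_name):
    # Rename parent columns: the parent id becomes the child foreign key so the
    # frames share a join key, and each ancestor column gets a prefixed name.
    col_map = {parent_id: child_fk}
    col_map.update({c: f"{relationship_name}.{c}" for c in ancestor_cols})
    merge_df = parent_df[list(col_map)].rename(columns=col_map)
    # Left join keeps every child row even when it has no matching parent row.
    return child_df.merge(merge_df, how="left", on=child_fk)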
- df = child_df.merge( - merge_df, - how="left", - left_on=relationship._child_column_name, - right_on=relationship._child_column_name, - ) + def generate_default_df(self, instance_ids, extra_columns=None): + # make a series of default values for each feature + default_values = {} + columns_to_make = extra_columns + if columns_to_make is None: + columns_to_make = [ + feat.get_name() for feat in self.feature_set.target_features + ] + for column_name in columns_to_make: + default_values[column_name] = np.nan - # ensure index is maintained - df.set_index( - relationship.child_dataframe.ww.index, - drop=False, - inplace=True, - ) + # convert to dataframe with right index and make sure values are correct + default_df = pd.DataFrame(default_values, index=instance_ids) - return df, new_relationship_columns + for feat in self.feature_set.target_features: + # only update type of pre_existing columns + if feat.get_name() not in default_df.columns: + continue + if feat.variable_type == pdtypes.CategoricalDtype(): + values = [] + for x in instance_ids: + values.append(feat.default_value) + default_df[feat.get_name()] = pd.Series(values, dtype="category") + else: + default_df[feat.get_name()] = default_df[feat.get_name()].astype( + feat.variable_type, + ) - def generate_default_df(self, instance_ids, extra_columns=None): - default_row = [] - default_cols = [] - for f in self.feature_set.target_features: - for name in f.get_feature_names(): - default_cols.append(name) - default_row.append(f.default_value) - - default_matrix = [default_row] * len(instance_ids) - default_df = pd.DataFrame( - default_matrix, - columns=default_cols, - index=instance_ids, - dtype="object", - ) - index_name = self.entityset[self.feature_set.target_df_name].ww.index - default_df.index.name = index_name - if extra_columns is not None: - for c in extra_columns: - if c not in default_df.columns: - default_df[c] = [np.nan] * len(instance_ids) return default_df def _feature_type_handler(self, f): - if type(f) == TransformFeature: + # Order matters: DirectFeature is a subclass of TransformFeature + if isinstance(f, IdentityFeature): + return self._calculate_identity_features + elif isinstance(f, DirectFeature): + return self._calculate_direct_features + elif isinstance(f, TransformFeature): return self._calculate_transform_features - elif type(f) == GroupByTransformFeature: + elif isinstance(f, GroupByTransformFeature): return self._calculate_groupby_features - elif type(f) == DirectFeature: - return self._calculate_direct_features - elif type(f) == AggregationFeature: + elif isinstance(f, AggregationFeature): return self._calculate_agg_features - elif type(f) == IdentityFeature: - return self._calculate_identity_features - else: - raise UnknownFeature("{} feature unknown".format(f.__class__)) + raise UnknownFeature("{} is not a recognized feature type".format(f)) def _calculate_identity_features(self, features, df, _df_trie, progress_callback): for f in features: - assert f.get_name() in df.columns, ( - 'Column "%s" missing frome dataframe' % f.get_name() - ) + df[f.get_name()] = f.column progress_callback(len(features) / float(self.num_features)) - return df def _calculate_transform_features( @@ -492,104 +388,64 @@ def _calculate_transform_features( # Even though we are adding the default values here, when these new # features are added to the dataframe in update_feature_columns, they # are added as empty columns since the dataframe itself is empty. 
- feature_values.append( - (f, [f.default_value for _ in range(f.number_output_features)]), - ) - progress_callback(1 / float(self.num_features)) + feature_values.append(pd.Series([], dtype=f.variable_type)) continue - # collect only the columns we need for this transformation - - column_data = [frame[bf.get_name()] for bf in f.base_features] - - feature_func = f.get_function() - # apply the function to the relevant dataframe slice and add the - # feature row to the results dataframe. - if f.primitive.uses_calc_time: - values = feature_func(*column_data, time=self.time_last) - else: - values = feature_func(*column_data) + # If primitive uses a time index, set values to NaN where time index is missing + if f.primitive.uses_calc_time and frame[f.base_dataframe.ww.time_index].isna().any(): + original_series = f.column.loc[frame.index] + # For each feature, create a new series with NaNs at missing time indices + nan_mask = frame[f.base_dataframe.ww.time_index].isna() + if not nan_mask.any(): + new_series = original_series + else: + # Calculate feature values for non-missing time indices + calculated_values = f.primitive.get_function()( + frame.loc[~nan_mask, f.base_dataframe.ww.name] + ) + # Reindex with original index to align NaNs + new_series = pd.Series(np.nan, index=original_series.index, dtype=original_series.dtype) + new_series.loc[~nan_mask] = calculated_values - # if we don't get just the values, the assignment breaks when indexes don't match - if f.number_output_features > 1: - values = [strip_values_if_series(value) for value in values] + feature_values.append(new_series) else: - values = [strip_values_if_series(values)] - - feature_values.append((f, values)) + feature_values.append(f.primitive.get_function()(f.column.loc[frame.index])) - progress_callback(1 / float(self.num_features)) + updated = update_feature_columns( + zip(features, feature_values), + frame, + ) - frame = update_feature_columns(feature_values, frame) - return frame + progress_callback(len(features) / float(self.num_features)) + return updated def _calculate_groupby_features(self, features, frame, _df_trie, progress_callback): # set default values to handle the null group - default_values = {} + group_ids = [] for f in features: - for name in f.get_feature_names(): - default_values[name] = f.default_value - - frame = pd.concat( - [frame, pd.DataFrame(default_values, index=frame.index)], - axis=1, - ) - - # handle when no data - if frame.shape[0] == 0: - progress_callback(len(features) / float(self.num_features)) - - return frame - - groupby = features[0].groupby.get_name() - grouped = frame.groupby(groupby) - groups = frame[ - groupby - ].unique() # get all the unique group name to iterate over later - - for f in features: - feature_vals = [] - for _ in range(f.number_output_features): - feature_vals.append([]) - - for group in groups: - # skip null key if it exists - if pd.isnull(group): - continue - - column_names = [bf.get_name() for bf in f.base_features] - # exclude the groupby column from being passed to the function - column_data = [ - grouped[name].get_group(group) for name in column_names[:-1] - ] - feature_func = f.get_function() - - # apply the function to the relevant dataframe slice and add the - # feature row to the results dataframe. 
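# Illustrative sketch (not part of the patch): the added branch above computes
# a transform only for rows whose time index is present and leaves NaN
# elsewhere. The same masking pattern reduced to plain pandas, with
# hypothetical column names and a float output dtype assumed for simplicity:
import numpy as np
import pandas as pd

def transform_with_missing_time_index(frame, value_col, time_index_col, func):
    # Rows with a missing time index get NaN instead of a computed value.
    nan_mask = frame[time_index_col].isna()
    result = pd.Series(np.nan, index=frame.index, dtype="float64")
    if (~nan_mask).any():
        result.loc[~nan_mask] = func(frame.loc[~nan_mask, value_col])
    return result

# e.g. transform_with_missing_time_index(df, "amount", "transaction_time", np.log)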
- if f.primitive.uses_calc_time: - values = feature_func(*column_data, time=self.time_last) - else: - values = feature_func(*column_data) - - if f.number_output_features == 1: - values = [values] + if frame.empty: + group_ids.append(pd.Series([], dtype=f.variable_type)) + continue - # make sure index is aligned - for i, value in enumerate(values): - if isinstance(value, pd.Series): - value.index = column_data[0].index - else: - value = pd.Series(value, index=column_data[0].index) - feature_vals[i].append(value) + feature_values = f.primitive.get_function()(f.column.loc[frame.index]) + group_id = f.groupby.column.loc[frame.index].values + group_ids.append( + pd.Series(feature_values, index=frame.index, name=f.get_name()), + ) - if any(feature_vals): - assert len(feature_vals) == len(f.get_feature_names()) - for col_vals, name in zip(feature_vals, f.get_feature_names()): - frame[name].update(pd.concat(col_vals)) + # if a group is all null, this sets its result to null + if (feature_values.isnull()).all(): + group_ids.append( + pd.Series([np.nan for _ in range(len(feature_values))]), + ) - progress_callback(1 / float(self.num_features)) + updated = update_feature_columns( + zip(features, group_ids), + frame, + ) - return frame + progress_callback(len(features) / float(self.num_features)) + return updated def _calculate_direct_features( self, @@ -598,300 +454,156 @@ def _calculate_direct_features( df_trie, progress_callback, ): - path = features[0].relationship_path - assert len(path) == 1, "Error calculating DirectFeatures, len(path) != 1" - - parent_df = df_trie.get_node([path[0]]).value - _is_forward, relationship = path[0] - merge_col = relationship._child_column_name - - # generate a mapping of old column names (in the parent dataframe) to - # new column names (in the child dataframe) for the merge - col_map = {relationship._parent_column_name: merge_col} - index_as_feature = None - - fillna_dict = {} + # need to grab the features from the parent dataframe to attach to the child for f in features: - feature_defaults = { - name: f.default_value - for name in f.get_feature_names() - if not pd.isna(f.default_value) - } - fillna_dict.update(feature_defaults) - if f.base_features[0].get_name() == relationship._parent_column_name: - index_as_feature = f - base_names = f.base_features[0].get_feature_names() - for name, base_name in zip(f.get_feature_names(), base_names): - if name in child_df.columns: - continue - col_map[base_name] = name - - # merge the identity feature from the parent dataframe into the child - merge_df = parent_df[list(col_map.keys())].rename(columns=col_map) - - if index_as_feature is not None: - merge_df.set_index( - index_as_feature.get_name(), - inplace=True, - drop=False, + parent_df = df_trie.get_node(f.relationship_path).value + parent_col = parent_df[[f.parent_feature.get_name()]] + parent_col.index.name = f.relationship.parent_column.ww.name + + # need to make sure the column that merges is not the index or a + # foreign key. If it is, then the column will not be unique, and + # the merge will fail. + if f.relationship.parent_column.ww.name == parent_col.index.name: + parent_col = parent_col.reset_index() + + # The child dataframe may have fewer rows than the parent dataframe. + # We must only merge in values for rows that exist in the child dataframe. + # So we grab the values from the parent dataframe's column that have the + # foreign key in the child dataframe. 
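# Illustrative sketch (not part of the patch): the comment above describes
# pulling a parent feature's values into the child through the foreign key
# while keeping every child row. One compact way to express that lookup in
# plain pandas, with hypothetical frame and column names:
def add_direct_feature(child_df, parent_df, parent_id, child_fk,
                       parent_feature, new_name):
    # Map each child's foreign key to the parent's feature value; children
    # whose key has no matching parent row end up with NaN.
    lookup = parent_df.set_index(parent_id)[parent_feature]
    child_df[new_name] = child_df[child_fk].map(lookup)
    return child_df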
+ child_col = child_df[[f.relationship.child_column.ww.name]] + child_col.index.name = f.relationship.child_column.ww.name + child_col = child_col.merge( + parent_col, + left_on=f.relationship.child_column.ww.name, + right_on=f.relationship.parent_column.ww.name, + how="left", ) - else: - merge_df.set_index(merge_col, inplace=True) - new_df = child_df.merge( - merge_df, - left_on=merge_col, - right_index=True, - how="left", - ) + child_df[f.get_name()] = child_col[f.parent_feature.get_name()].values progress_callback(len(features) / float(self.num_features)) - - return new_df.fillna(fillna_dict) + return child_df def _calculate_agg_features(self, features, frame, df_trie, progress_callback): - test_feature = features[0] - child_dataframe = test_feature.base_features[0].dataframe - base_frame = df_trie.get_node(test_feature.relationship_path).value - # Sometimes approximate features get computed in a previous filter frame - # and put in the current one dynamically, - # so there may be existing features here - fl = [] + groupby_col_names = [] for f in features: - for ind in f.get_feature_names(): - if ind not in frame.columns: - fl.append(f) - break - features = fl - if not len(features): - progress_callback(len(features) / float(self.num_features)) - return frame - - # handle where - base_frame_empty = base_frame.empty - where = test_feature.where - if where is not None and not base_frame_empty: - base_frame = base_frame.loc[base_frame[where.get_name()]] - - # when no child data, just add all the features to frame with nan - base_frame_empty = base_frame.empty - if base_frame_empty: - feature_values = [] - for f in features: - feature_values.append((f, np.full(f.number_output_features, np.nan))) - progress_callback(1 / float(self.num_features)) - frame = update_feature_columns(feature_values, frame) - else: - relationship_path = test_feature.relationship_path - - groupby_col = get_relationship_column_id(relationship_path) - - # if the use_previous property exists on this feature, include only the - # instances from the child dataframe included in that Timedelta - use_previous = test_feature.use_previous - if use_previous: - # Filter by use_previous values - time_last = self.time_last - if use_previous.has_no_observations(): - time_first = time_last - use_previous - ti = child_dataframe.ww.time_index - if ti is not None: - base_frame = base_frame[base_frame[ti] >= time_first] - else: - n = use_previous.get_value("o") - - def last_n(df): - return df.iloc[-n:] - - base_frame = base_frame.groupby( - groupby_col, - observed=True, - sort=False, - group_keys=False, - ).apply(last_n) - - to_agg = {} - agg_rename = {} - to_apply = set() - # apply multi-column and time-dependent features as we find them, and - # save aggregable features for later - for f in features: - if _can_agg(f): - column_id = f.base_features[0].get_name() - if column_id not in to_agg: - to_agg[column_id] = [] - func = f.get_function() - - # for some reason, using the string count is significantly - # faster than any method a primitive can return - # https://stackoverflow.com/questions/55731149/use-a-function-instead-of-string-in-pandas-groupby-agg - if func == pd.Series.count: - func = "count" - - funcname = func - if callable(func): - # if the same function is being applied to the same - # column twice, wrap it in a partial to avoid - # duplicate functions - funcname = str(id(func)) - if "{}-{}".format(column_id, funcname) in agg_rename: - func = partial(func) - funcname = str(id(func)) - - func.__name__ = funcname - - 
to_agg[column_id].append(func) - # this is used below to rename columns that pandas names for us - agg_rename["{}-{}".format(column_id, funcname)] = f.get_name() - continue - - to_apply.add(f) - - # Apply the non-aggregable functions generate a new dataframe, and merge - # it with the existing one - if len(to_apply): - wrap = agg_wrapper(to_apply, self.time_last) - # groupby_col can be both the name of the index and a column, - # to silence pandas warning about ambiguity we explicitly pass - # the column (in actuality grouping by both index and group would - # work) - to_merge = base_frame.groupby( - base_frame[groupby_col], - observed=True, - sort=False, - group_keys=False, - ).apply(wrap) - frame = pd.merge( - left=frame, - right=to_merge, - left_index=True, - right_index=True, - how="left", - ) - - progress_callback(len(to_apply) / float(self.num_features)) - - # Apply the aggregate functions to generate a new dataframe, and merge - # it with the existing one - if len(to_agg): - # groupby_col can be both the name of the index and a column, - # to silence pandas warning about ambiguity we explicitly pass - # the column (in actuality grouping by both index and group would - # work) - to_merge = base_frame.groupby( - base_frame[groupby_col], - observed=True, - sort=False, - ).agg(to_agg) - # rename columns to the correct feature names - to_merge.columns = [agg_rename["-".join(x)] for x in to_merge.columns] - to_merge = to_merge[list(agg_rename.values())] - - # Workaround for pandas bug where categories are in the wrong order - # see: https://github.com/pandas-dev/pandas/issues/22501 - # - # Pandas claims that bug is fixed but it still shows up in some - # cases. More investigation needed. - if isinstance(frame.index, pd.CategoricalDtype): - categories = pdtypes.CategoricalDtype( - categories=frame.index.categories, - ) - to_merge.index = to_merge.index.astype(object).astype(categories) - - frame = pd.merge( - left=frame, - right=to_merge, - left_index=True, - right_index=True, - how="left", + if f.relationship_path not in df_trie: + # This case is when no data was found for the child dataframe. + # In that situation, the default value for the feature will be used. + # Nothing needs to be done here. + continue + child_df = df_trie.get_node(f.relationship_path).value + child_df = child_df.copy() + + # if the Dask series has a name that matches an existing column + # on the dataframe being added, Woodwork will raise an error + # so we drop the name before adding + child_df.ww.name = None + + to_agg = f.base_feature.get_name() + # deal with multi-output primitives by only grabbing the one column + if isinstance(to_agg, list): + to_agg = to_agg[0] + + # the column that connects the child to the parent + groupby_col = f.relationship_path.second_to_last_dataframe.ww.name + groupby_col_name = get_relationship_column_id(groupby_col) + child_df[groupby_col_name] = child_df[ + f.relationship_path.child_column.ww.name + ].values + + # If the feature has a where clause, filter the child dataframe. 
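# Illustrative sketch (not part of the patch): the aggregation path here
# filters the child frame by an optional boolean where-column, groups by the
# foreign key, aggregates, and aligns the result back onto the parent frame.
# A plain pandas version of that flow, with hypothetical names and assuming
# the parent frame is indexed by its id column:
def aggregate_child(parent_df, child_df, child_fk, value_col, agg_func,
                    where_col=None):
    if where_col is not None:
        # Keep only rows where the boolean where-column is True.
        child_df = child_df[child_df[where_col]]
    grouped = child_df.groupby(child_fk)[value_col].agg(agg_func)
    # Reindex on the parent index so parents with no matching children get NaN.
    return grouped.reindex(parent_df.index)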
+ if f.where is not None: + child_df = child_df[child_df[f.where.get_name()]] + + if f.primitive.uses_previous: + # must sort and group so that we can use `pd.Series.expanding` + child_df = child_df.sort_values(f.base_dataframe.ww.time_index) + to_merge = child_df.groupby(groupby_col_name).apply( + agg_wrapper(f.primitive.get_function(), f.use_previous), ) + else: + to_merge = child_df.groupby(groupby_col_name).agg(f.primitive.get_function(), to_agg) - # determine number of features that were just merged - progress_callback(len(to_merge.columns) / float(self.num_features)) + to_merge = to_merge.reset_index() - # Handle default values - fillna_dict = {} - for f in features: - feature_defaults = {name: f.default_value for name in f.get_feature_names()} - fillna_dict.update(feature_defaults) + if isinstance(to_merge, pd.Series): + to_merge = to_merge.to_frame() - frame = frame.fillna(fillna_dict) + # if a primitive returns multiple columns, add all to the dataframe + for col_name in f.get_feature_names(): + if col_name in to_merge.columns: + values = to_merge[[groupby_col_name, col_name]] + else: + # if the name doesn't match then it must be because it is a + # multi-output primitive and this output has no value + # (e.g. NMostCommon when n > number of unique values) + values = to_merge[groupby_col_name] + values = pd.DataFrame(values) + values[col_name] = np.nan + values = values.set_index(groupby_col_name) + values = values.reindex(frame.index) + frame[col_name] = values[col_name].values + progress_callback(len(features) / float(self.num_features)) return frame def _necessary_columns(self, dataframe_name, feature_names): # We have to keep all index and foreign columns because we don't know what forward # relationships will come from this node. - df = self.entityset[dataframe_name] - index_columns = { - col - for col in df.columns - if {"index", "foreign_key", "time_index"} & df.ww.semantic_tags[col] - } - features = (self.feature_set.features_by_name[name] for name in feature_names) + columns = set() + dataframe = self.entityset[dataframe_name] + columns.add(dataframe.ww.index) + if dataframe.ww.time_index: + columns.add(dataframe.ww.time_index) + for col in dataframe.ww.foreign_keys: + columns.add(col) + for f in feature_names: + columns.add(f.base_dataframe.ww.name) + if f.where is not None: + columns.add(f.where.base_dataframe.ww.name) - feature_columns = { - f.column_name for f in features if isinstance(f, IdentityFeature) - } - return list(index_columns | feature_columns) + return list(columns) def _can_agg(feature): - assert isinstance(feature, AggregationFeature) - base_features = feature.base_features - if feature.where is not None: - base_features = [ - bf.get_name() - for bf in base_features - if bf.get_name() != feature.where.get_name() - ] - - if feature.primitive.uses_calc_time: - return False - single_output = feature.primitive.number_output_features == 1 - return len(base_features) == 1 and single_output + # can't agg if there is an agg in the path that doesn't have + # use_previous set. 
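# Illustrative sketch (not part of the patch): the removed use_previous branch
# above keeps only child rows whose time index falls within a Timedelta window
# ending at the cutoff time before aggregating. That window filter on its own,
# with hypothetical names; rows after the cutoff are assumed to have been
# removed already by the cutoff-time handling:
import pandas as pd

def filter_to_window(child_df, time_index_col, cutoff_time, window):
    # `window` is assumed to be a pandas Timedelta, e.g. pd.Timedelta("30d").
    time_first = cutoff_time - window
    return child_df[child_df[time_index_col] >= time_first]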
+ if feature.is_end_node: + return True + return False def agg_wrapper(feats, time_last): def wrap(df): - d = {} - feature_values = [] - for f in feats: - func = f.get_function() - column_ids = [bf.get_name() for bf in f.base_features] - args = [df[v] for v in column_ids] - - if f.primitive.uses_calc_time: - values = func(*args, time=time_last) - else: - values = func(*args) - - if f.number_output_features == 1: - values = [values] - feature_values.append((f, values)) - - d = update_feature_columns(feature_values, d) - - return pd.Series(d) + # if there are no events in the window, return 0 + if df.empty: + return pd.Series([0 for _ in range(len(feats.get_feature_names()))]) + # if there is only one event, the output of the primitive could be a scalar + # or a series. If it is a scalar, we must return a series with one value. + # If it is a series with one value, we must make sure the name is None + # so that it does not conflict with the feature names. + result = feats(df) + if isinstance(result, pd.Series): + if result.empty: + return pd.Series([0 for _ in range(len(feats.get_feature_names()))]) + result.name = None + return result + return pd.Series([result]) return wrap def update_feature_columns(feature_data, data): - new_cols = {} - for item in feature_data: - names = item[0].get_feature_names() - values = item[1] - assert len(names) == len(values) - for name, value in zip(names, values): - new_cols[name] = value - - # Handle the case where a dict is being updated - if isinstance(data, dict): - data.update(new_cols) - return data - - return pd.concat([data, pd.DataFrame(new_cols, index=data.index)], axis=1) + for f, feature_values in feature_data: + name = f.get_name() + if name not in data.columns: + if isinstance(feature_values, pd.Series): + feature_values = feature_values.to_frame(name=name) + data[name] = feature_values + else: + data[name].update(feature_values) + return data def strip_values_if_series(values): diff --git a/featuretools/entityset/entityset.py b/featuretools/entityset/entityset.py index 9984725264..1c95348b63 100644 --- a/featuretools/entityset/entityset.py +++ b/featuretools/entityset/entityset.py @@ -248,7 +248,7 @@ def to_csv( path (str) : Location on disk to write to (will be created as a directory) sep (str) : String of length 1. Field delimiter for the output file. encoding (str) : A string representing the encoding to use in the output file, defaults to 'utf-8'. - engine (str) : Name of the engine to use. Possible values are: {'c', 'python'}. + engine (str) : Name of the engine to use. Possible values are: {'python', 'c', 'pyarrow'}. compression (str) : Name of the compression to use. Possible values are: {'gzip', 'bz2', 'zip', 'xz', None}. profile_name (str) : Name of AWS profile to use, False to use an anonymous profile, or None. 
""" @@ -256,7 +256,6 @@ def to_csv( self, path, format="csv", - index=False, sep=sep, encoding=encoding, engine=engine, @@ -268,51 +267,14 @@ def to_csv( def to_dictionary(self): return serialize.entityset_to_description(self) - ########################################################################### - # Public getter/setter methods ######################################### - ########################################################################### - def __repr__(self): - repr_out = "Entityset: {}\n".format(self.id) - repr_out += " DataFrames:" - for df in self.dataframes: - if df.shape: - repr_out += "\n {} [Rows: {}, Columns: {}]".format( - df.ww.name, - df.shape[0], - df.shape[1], - ) - else: - repr_out += "\n {} [Rows: None, Columns: None]".format(df.ww.name) - repr_out += "\n Relationships:" - - if len(self.relationships) == 0: - repr_out += "\n No relationships" - - for r in self.relationships: - repr_out += "\n %s.%s -> %s.%s" % ( - r._child_dataframe_name, - r._child_column_name, - r._parent_dataframe_name, - r._parent_column_name, - ) - - return repr_out + """Print string representation of EntitySet""" + return serialize.entityset_to_json(self.metadata) def add_relationships(self, relationships): - """Add multiple new relationships to a entityset - - Args: - relationships (list[tuple(str, str, str, str)] or list[Relationship]) : List of - new relationships to add. Relationships are specified either as a :class:`.Relationship` - object or a four element tuple identifying the parent and child columns: - (parent_dataframe_name, parent_column_name, child_dataframe_name, child_column_name) - """ for rel in relationships: - if isinstance(rel, Relationship): - self.add_relationship(relationship=rel) - else: - self.add_relationship(*rel) + self.add_relationship(relationship=rel) + return self def add_relationship( @@ -323,30 +285,7 @@ def add_relationship( child_column_name=None, relationship=None, ): - """Add a new relationship between dataframes in the entityset. Relationships can be specified - by passing dataframe and columns names or by passing a :class:`.Relationship` object. - - Args: - parent_dataframe_name (str): Name of the parent dataframe in the EntitySet. Must be specified - if relationship is not. - parent_column_name (str): Name of the parent column. Must be specified if relationship is not. - child_dataframe_name (str): Name of the child dataframe in the EntitySet. Must be specified - if relationship is not. - child_column_name (str): Name of the child column. Must be specified if relationship is not. - relationship (Relationship): Instance of new relationship to be added. Must be specified - if dataframe and column names are not supplied. - """ - if relationship and ( - parent_dataframe_name - or parent_column_name - or child_dataframe_name - or child_column_name - ): - raise ValueError( - "Cannot specify dataframe and column name values and also supply a Relationship", - ) - - if not relationship: + if relationship is None: relationship = Relationship( self, parent_dataframe_name, @@ -354,234 +293,100 @@ def add_relationship( child_dataframe_name, child_column_name, ) - if relationship in self.relationships: - warnings.warn("Not adding duplicate relationship: " + str(relationship)) - return self - - # _operations? 
- - # this is a new pair of dataframes - child_df = relationship.child_dataframe - child_column = relationship._child_column_name - if child_df.ww.index == child_column: - msg = "Unable to add relationship because child column '{}' in '{}' is also its index" - raise ValueError(msg.format(child_column, child_df.ww.name)) - parent_df = relationship.parent_dataframe - parent_column = relationship._parent_column_name - - if parent_df.ww.index != parent_column: - parent_df.ww.set_index(parent_column) - - # Empty dataframes (as a result of accessing metadata) - # default to object dtypes for categorical columns, but - # indexes/foreign keys default to ints. In this case, we convert - # the empty column's type to int - if ( - child_df.empty - and child_df[child_column].dtype == object - and parent_df.ww.columns[parent_column].is_numeric - ): - child_df.ww[child_column] = pd.Series(name=child_column, dtype=np.int64) - - parent_ltype = parent_df.ww.logical_types[parent_column] - child_ltype = child_df.ww.logical_types[child_column] - if parent_ltype != child_ltype: - difference_msg = "" - if str(parent_ltype) == str(child_ltype): - difference_msg = "There is a conflict between the parameters. " - - warnings.warn( - f"Logical type {child_ltype} for child column {child_column} does not match " - f"parent column {parent_column} logical type {parent_ltype}. {difference_msg}" - "Changing child logical type to match parent.", - ) - child_df.ww.set_types(logical_types={child_column: parent_ltype}) - if "foreign_key" not in child_df.ww.semantic_tags[child_column]: - child_df.ww.add_semantic_tags({child_column: "foreign_key"}) + if relationship not in self.relationships: + self.relationships.append(relationship) - self.relationships.append(relationship) self.reset_data_description() return self def set_secondary_time_index(self, dataframe_name, secondary_time_index): - """ - Set the secondary time index for a dataframe in the EntitySet using its dataframe name. - - Args: - dataframe_name (str) : name of the dataframe for which to set the secondary time index. - secondary_time_index (dict[str-> list[str]]): Name of column containing time data to - be used as a secondary time index mapped to a list of the columns in the dataframe - associated with that secondary time index. - """ - dataframe = self[dataframe_name] - self._set_secondary_time_index(dataframe, secondary_time_index) + self._set_secondary_time_index(self[dataframe_name], secondary_time_index) + self.reset_data_description() + return self def _set_secondary_time_index(self, dataframe, secondary_time_index): - """Sets the secondary time index for a Woodwork dataframe passed in""" - assert ( - dataframe.ww.schema is not None - ), "Cannot set secondary time index if Woodwork is not initialized" - self._check_secondary_time_index(dataframe, secondary_time_index) - if secondary_time_index is not None: - dataframe.ww.metadata["secondary_time_index"] = secondary_time_index - - ########################################################################### - # Relationship access/helper methods ################################### - ########################################################################### + if secondary_time_index: + dataframe.ww.set_secondary_time_index(secondary_time_index) + self._add_references_to_metadata(dataframe) def find_forward_paths(self, start_dataframe_name, goal_dataframe_name): - """ - Generator which yields all forward paths between a start and goal - dataframe. Does not include paths which contain cycles. 
- - Args: - start_dataframe_name (str) : name of dataframe to start the search from - goal_dataframe_name (str) : name of dataframe to find forward path to - - See Also: - :func:`BaseEntitySet.find_backward_paths` - """ - for sub_dataframe_name, path in self._forward_dataframe_paths( + # check all relationships in dataframe for a path to other dataframe + return self.metadata.find_forward_paths( start_dataframe_name, - ): - if sub_dataframe_name == goal_dataframe_name: - yield path + goal_dataframe_name, + ) def find_backward_paths(self, start_dataframe_name, goal_dataframe_name): - """ - Generator which yields all backward paths between a start and goal - dataframe. Does not include paths which contain cycles. - - Args: - start_dataframe_name (str) : Name of dataframe to start the search from. - goal_dataframe_name (str) : Name of dataframe to find backward path to. - - See Also: - :func:`BaseEntitySet.find_forward_paths` - """ - for path in self.find_forward_paths(goal_dataframe_name, start_dataframe_name): - # Reverse path - yield path[::-1] + return self.metadata.find_backward_paths( + start_dataframe_name, + goal_dataframe_name, + ) def _forward_dataframe_paths(self, start_dataframe_name, seen_dataframes=None): """ - Generator which yields the names of all dataframes connected through forward - relationships, and the path taken to each. A dataframe will be yielded - multiple times if there are multiple paths to it. - - Implemented using depth first search. + Helper function which returns a list of dataframes which can be accessed from + the start dataframe by forward relationships """ if seen_dataframes is None: seen_dataframes = set() - if start_dataframe_name in seen_dataframes: - return - + return [] seen_dataframes.add(start_dataframe_name) - yield start_dataframe_name, [] - + all_dataframes = [] for relationship in self.get_forward_relationships(start_dataframe_name): - next_dataframe = relationship._parent_dataframe_name - # Copy seen dataframes for each next node to allow multiple paths (but - # not cycles). - descendants = self._forward_dataframe_paths( - next_dataframe, - seen_dataframes.copy(), + new_dataframe = relationship.child_dataframe.ww.name + all_dataframes.append(new_dataframe) + all_dataframes.extend( + self._forward_dataframe_paths(new_dataframe, seen_dataframes), ) - for sub_dataframe_name, sub_path in descendants: - yield sub_dataframe_name, [relationship] + sub_path + return all_dataframes def get_forward_dataframes(self, dataframe_name, deep=False): - """ - Get dataframes that are in a forward relationship with dataframe - - Args: - dataframe_name (str): Name of dataframe to search from. - deep (bool): if True, recursively find forward dataframes. - - Yields a tuple of (descendent_name, path from dataframe_name to descendant). 
- """ - for relationship in self.get_forward_relationships(dataframe_name): - parent_dataframe_name = relationship._parent_dataframe_name - direct_path = RelationshipPath([(True, relationship)]) - yield parent_dataframe_name, direct_path - - if deep: - sub_dataframes = self.get_forward_dataframes( - parent_dataframe_name, - deep=True, - ) - for sub_dataframe_name, path in sub_dataframes: - yield sub_dataframe_name, direct_path + path + if deep: + return self._forward_dataframe_paths(dataframe_name) + return [r.child_dataframe.ww.name for r in self.get_forward_relationships(dataframe_name)] def get_backward_dataframes(self, dataframe_name, deep=False): - """ - Get dataframes that are in a backward relationship with dataframe - - Args: - dataframe_name (str): Name of dataframe to search from. - deep (bool): if True, recursively find backward dataframes. - - Yields a tuple of (descendent_name, path from dataframe_name to descendant). - """ - for relationship in self.get_backward_relationships(dataframe_name): - child_dataframe_name = relationship._child_dataframe_name - direct_path = RelationshipPath([(False, relationship)]) - yield child_dataframe_name, direct_path - - if deep: - sub_dataframes = self.get_backward_dataframes( - child_dataframe_name, - deep=True, - ) - for sub_dataframe_name, path in sub_dataframes: - yield sub_dataframe_name, direct_path + path - - def get_forward_relationships(self, dataframe_name): - """Get relationships where dataframe "dataframe_name" is the child - - Args: - dataframe_name (str): Name of dataframe to get relationships for. - - Returns: - list[:class:`.Relationship`]: List of forward relationships. - """ + if deep: + return self._backward_dataframe_paths(dataframe_name) return [ - r for r in self.relationships if r._child_dataframe_name == dataframe_name + r.parent_dataframe.ww.name for r in self.get_backward_relationships(dataframe_name) ] - def get_backward_relationships(self, dataframe_name): + def _backward_dataframe_paths(self, start_dataframe_name, seen_dataframes=None): """ - get relationships where dataframe "dataframe_name" is the parent. - - Args: - dataframe_name (str): Name of dataframe to get relationships for. - - Returns: - list[:class:`.Relationship`]: list of backward relationships + Helper function which returns a list of dataframes which can be accessed from + the start dataframe by backward relationships """ - return [ - r for r in self.relationships if r._parent_dataframe_name == dataframe_name - ] - - def has_unique_forward_path(self, start_dataframe_name, end_dataframe_name): - """ - Is the forward path from start to end unique? + if seen_dataframes is None: + seen_dataframes = set() + if start_dataframe_name in seen_dataframes: + return [] + seen_dataframes.add(start_dataframe_name) - This will raise if there is no such path. 
- """ - paths = self.find_forward_paths(start_dataframe_name, end_dataframe_name) + all_dataframes = [] + for relationship in self.get_backward_relationships(start_dataframe_name): + new_dataframe = relationship.parent_dataframe.ww.name + all_dataframes.append(new_dataframe) + all_dataframes.extend( + self._backward_dataframe_paths(new_dataframe, seen_dataframes), + ) + return all_dataframes - next(paths) - second_path = next(paths, None) + def get_forward_relationships(self, dataframe_name): + return [r for r in self.relationships if r.parent_dataframe.ww.name == dataframe_name] - return not second_path + def get_backward_relationships(self, dataframe_name): + return [r for r in self.relationships if r.child_dataframe.ww.name == dataframe_name] - ########################################################################### - # DataFrame creation methods ############################################## - ########################################################################### + def has_unique_forward_path(self, start_dataframe_name, end_dataframe_name): + # check if there is one unique forward path between start and end dataframes + return self.metadata.has_unique_forward_path( + start_dataframe_name, + end_dataframe_name, + ) def add_dataframe( self, @@ -595,157 +400,75 @@ def add_dataframe( secondary_time_index=None, already_sorted=False, ): - """ - Add a DataFrame to the EntitySet with Woodwork typing information. + """Adds a dataframe to the EntitySet. Args: - dataframe (pandas.DataFrame) : Dataframe containing the data. - - dataframe_name (str, optional) : Unique name to associate with this dataframe. Must be - provided if Woodwork is not initialized on the input DataFrame. - - index (str, optional): Name of the column used to index the dataframe. - Must be unique. If None, take the first column. - - logical_types (dict[str -> Woodwork.LogicalTypes/str, optional]): - Keys are column names and values are logical types. Will be inferred if not specified. - + dataframe (DataFrame): DataFrame to add. + dataframe_name (str): Name of the dataframe to add. If Woodwork DataFrame is supplied, this parameter + will be ignored. + index (str): Column in the DataFrame to use as the index. + If no index is supplied, one is created automatically as the + dataframe's Woodwork index. If Woodwork DataFrame is supplied, this parameter + will be ignored. + logical_types (dict[str -> str/Woodwork.LogicalType], optional): + Dictionary mapping column names to the LogicalType that their + data should be interpreted as. If Woodwork DataFrame is supplied, this parameter + will be ignored. semantic_tags (dict[str -> str/set], optional): - Keys are column names and values are semantic tags. - - make_index (bool, optional) : If True, assume index does not - exist as a column in dataframe, and create a new column of that name - using integers. Otherwise, assume index exists. - - time_index (str, optional): Name of the column containing - time data. Type must be numeric or datetime in nature. - - secondary_time_index (dict[str -> list[str]]): Name of column containing time data to - be used as a secondary time index mapped to a list of the columns in the dataframe - associated with that secondary time index. - - already_sorted (bool, optional) : If True, assumes that input dataframe - is already sorted by time. Defaults to False. - - Notes: + Dictionary mapping column names to the semantic tags that their + data should be tagged with. If Woodwork DataFrame is supplied, this parameter + will be ignored. 
+ make_index (bool): Whether to add a new column with unique integer values + to be used as the index. If Woodwork DataFrame is supplied, this parameter + will be ignored. + time_index (str, optional): Column in the DataFrame to use as the time + index. If Woodwork DataFrame is supplied, this parameter will be ignored. + secondary_time_index (dict[str->[str]], optional): Dictionary of secondary time index + columns, mapping a column name to a list of columns that must be empty + for that time index to be valid. If Woodwork DataFrame is supplied, this parameter + will be ignored. + already_sorted (bool): If true, dataframe is assumed to be sorted by time_index + and no additional sorting will be performed. + If Woodwork DataFrame is supplied, this parameter will be ignored. - Will infer logical types from the data. + Returns: + :class:`.EntitySet` : Instance of the calling EntitySet Example: - .. ipython:: python - - import featuretools as ft - import pandas as pd - transactions_df = pd.DataFrame({"id": [1, 2, 3, 4, 5, 6], - "session_id": [1, 2, 1, 3, 4, 5], - "amount": [100.40, 20.63, 33.32, 13.12, 67.22, 1.00], - "transaction_time": pd.date_range(start="10:00", periods=6, freq="10s"), - "fraud": [True, False, True, False, True, True]}) - es = ft.EntitySet("example") - es.add_dataframe(dataframe_name="transactions", - index="id", - time_index="transaction_time", - dataframe=transactions_df) + .. code-block:: python - es["transactions"] + es = ft.EntitySet(id="my_entity_set") + es.add_dataframe(dataframe_name="transactions", + dataframe=transactions_df, + index="transaction_id", + time_index="transaction_time") + es["transactions"].ww """ - logical_types = logical_types or {} - semantic_tags = semantic_tags or {} - - if len(self.dataframes) > 0: - if not isinstance(dataframe, type(self.dataframes[0])): - raise ValueError( - "All dataframes must be of the same type. " - "Cannot add dataframe of type {} to an entityset with existing dataframes " - "of type {}".format(type(dataframe), type(self.dataframes[0])), - ) - - # Only allow string column names - non_string_names = [ - name for name in dataframe.columns if not isinstance(name, str) - ] - if non_string_names: - raise ValueError( - "All column names must be strings (Columns {} " - "are not strings)".format(non_string_names), - ) - - if dataframe.ww.schema is None: - if dataframe_name is None: + # If Woodwork init was done outside of add_dataframe, ensure that no parameters conflict with Woodwork + # metadata + if dataframe.ww.schema is not None: + if dataframe_name is not None and dataframe_name != dataframe.ww.name: raise ValueError( - "Cannot add dataframe to EntitySet without a name. 
" - "Please provide a value for the dataframe_name parameter.", + f"Naming conflict: dataframe_name '{dataframe_name}' does not match Woodwork name '{dataframe.ww.name}'", ) - - index_was_created, index, dataframe = _get_or_create_index( - index, - make_index, - dataframe, - ) - - dataframe.ww.init( - name=dataframe_name, - index=index, - time_index=time_index, - logical_types=logical_types, - semantic_tags=semantic_tags, - already_sorted=already_sorted, - ) - if index_was_created: - dataframe.ww.metadata["created_index"] = index - - else: - if dataframe.ww.name is None: + if index is not None and index != dataframe.ww.index: raise ValueError( - "Cannot add a Woodwork DataFrame to EntitySet without a name", + f"Index conflict: index '{index}' does not match Woodwork index '{dataframe.ww.index}'", ) - if dataframe.ww.index is None: + if time_index is not None and time_index != dataframe.ww.time_index: raise ValueError( - "Cannot add Woodwork DataFrame to EntitySet without index", - ) - - extra_params = [] - if index is not None: - extra_params.append("index") - if time_index is not None: - extra_params.append("time_index") - if logical_types: - extra_params.append("logical_types") - if make_index: - extra_params.append("make_index") - if semantic_tags: - extra_params.append("semantic_tags") - if already_sorted: - extra_params.append("already_sorted") - if dataframe_name is not None and dataframe_name != dataframe.ww.name: - extra_params.append("dataframe_name") - if extra_params: - warnings.warn( - "A Woodwork-initialized DataFrame was provided, so the following parameters were ignored: " - + ", ".join(extra_params), + f"Time index conflict: time_index '{time_index}' does not match Woodwork time index '{dataframe.ww.time_index}'", ) - if dataframe.ww.time_index is not None: - self._check_uniform_time_index(dataframe) - self._check_secondary_time_index(dataframe) - - if secondary_time_index: - self._set_secondary_time_index( - dataframe, - secondary_time_index=secondary_time_index, - ) - - dataframe = self._normalize_values(dataframe) - self.dataframe_dict[dataframe.ww.name] = dataframe - self.reset_data_description() - self._add_references_to_metadata(dataframe) + self._add_references_to_metadata(dataframe) + self.reset_data_description() return self def __setitem__(self, key, value): - self.add_dataframe(dataframe=value, dataframe_name=key) + self.add_dataframe(key, value) def normalize_dataframe( self, @@ -759,511 +482,228 @@ def normalize_dataframe( new_dataframe_time_index=None, new_dataframe_secondary_time_index=None, ): - """Create a new dataframe and relationship from unique values of an existing column. + """Normalizes an existing dataframe by creating a new dataframe and + a relationship between them. Args: - base_dataframe_name (str) : Dataframe name from which to split. - - new_dataframe_name (str): Name of the new dataframe. - - index (str): Column in old dataframe - that will become index of new dataframe. Relationship - will be created across this column. - - additional_columns (list[str]): - List of column names to remove from - base_dataframe and move to new dataframe. - - copy_columns (list[str]): List of - column names to copy from old dataframe - and move to new dataframe. + base_dataframe_name (str): The name of the dataframe to normalize. + new_dataframe_name (str): The name of the new dataframe. + index (str): The column in the base dataframe that will become the + index of the new dataframe. 
+ additional_columns (list[str], optional): List of columns to move + from the base dataframe to the new dataframe. Can be a string + or a list of strings. + copy_columns (list[str], optional): List of columns to copy from the base + dataframe to the new dataframe. Can be a string or a list of + strings. + make_time_index (bool): Whether to use the time_index of the base_dataframe as the + time_index of the new_dataframe. If no time_index is set on the base_dataframe, + this parameter is ignored. Defaults to true. + make_secondary_time_index (dict[str->[str]], optional): List of secondary time index + columns to move from the base dataframe to the new dataframe and use as secondary + time indexes in the new dataframe. + new_dataframe_time_index (str, optional): The column in the new dataframe to be used as its time_index. + If this is set, make_time_index is ignored. + new_dataframe_secondary_time_index (dict[str->[str]], optional): + Dictionary of secondary time index columns to use as secondary time indexes + in the new dataframe. If this is set, make_secondary_time_index is ignored. - make_time_index (bool or str, optional): Create time index for new dataframe based - on time index in base_dataframe, optionally specifying which column in base_dataframe - to use for time_index. If specified as True without a specific column name, - uses the primary time index. Defaults to True if base dataframe has a time index. - - make_secondary_time_index (dict[str -> list[str]], optional): Create a secondary time index - from key. Values of dictionary are the columns to associate with a secondary time index. - Only one secondary time index is allowed. If None, only associate the time index. - - new_dataframe_time_index (str, optional): Rename new dataframe time index. - - new_dataframe_secondary_time_index (str, optional): Rename new dataframe secondary time index. + Returns: + :class:`.EntitySet` : Instance of the calling EntitySet - """ - base_dataframe = self.dataframe_dict[base_dataframe_name] - additional_columns = additional_columns or [] - copy_columns = copy_columns or [] - - for list_name, col_list in { - "copy_columns": copy_columns, - "additional_columns": additional_columns, - }.items(): - if not isinstance(col_list, list): - raise TypeError( - "'{}' must be a list, but received type {}".format( - list_name, - type(col_list), - ), - ) - if len(col_list) != len(set(col_list)): - raise ValueError( - f"'{list_name}' contains duplicate columns. All columns must be unique.", - ) - for col_name in col_list: - if col_name == index: - raise ValueError( - "Not adding {} as both index and column in {}".format( - col_name, - list_name, - ), - ) + Example: + .. code-block:: python + es = ft.EntitySet(id="my_entity_set") + es.add_dataframe(dataframe_name="transactions", + dataframe=transactions_df, + index="transaction_id", + time_index="transaction_time") + es.normalize_dataframe(new_dataframe_name="products", + base_dataframe_name="transactions", + index="product_id", + additional_columns=["price", "rating"]) + """ + + # TODO: This method has a lot of side effects, need to reduce them + base_df = self[base_dataframe_name] + + if additional_columns is None: + additional_columns = [] + if not isinstance(additional_columns, list): + additional_columns = [additional_columns] + + if copy_columns is None: + copy_columns = [] + if not isinstance(copy_columns, list): + copy_columns = [copy_columns] + + # 1. 
Create new dataframe + # make sure new_df does not include the base_dataframe's index + # if it is one of the additional_columns + new_df = base_df.ww.pop(index) for col in additional_columns: - if col == base_dataframe.ww.time_index: - raise ValueError( - "Not moving {} as it is the base time index column. Perhaps, move the column to the copy_columns.".format( - col, - ), - ) - - if isinstance(make_time_index, str): - if make_time_index not in base_dataframe.columns: - raise ValueError( - "'make_time_index' must be a column in the base dataframe", - ) - elif make_time_index not in additional_columns + copy_columns: - raise ValueError( - "'make_time_index' must be specified in 'additional_columns' or 'copy_columns'", - ) - if index == base_dataframe.ww.index: - raise ValueError( - "'index' must be different from the index column of the base dataframe", - ) - - transfer_types = {} - # Types will be a tuple of (logical_type, semantic_tags, column_metadata, column_description) - transfer_types[index] = ( - base_dataframe.ww.logical_types[index], - base_dataframe.ww.semantic_tags[index], - base_dataframe.ww.columns[index].metadata, - base_dataframe.ww.columns[index].description, - ) - for col_name in additional_columns + copy_columns: - # Remove any existing time index tags - transfer_types[col_name] = ( - base_dataframe.ww.logical_types[col_name], - (base_dataframe.ww.semantic_tags[col_name] - {"time_index"}), - base_dataframe.ww.columns[col_name].metadata, - base_dataframe.ww.columns[col_name].description, - ) - - # create and add new dataframe - new_dataframe = self[base_dataframe_name].copy() - - if make_time_index is None and base_dataframe.ww.time_index is not None: - make_time_index = True - - if isinstance(make_time_index, str): - # Set the new time index to make_time_index. - base_time_index = make_time_index - new_dataframe_time_index = make_time_index - already_sorted = new_dataframe_time_index == base_dataframe.ww.time_index - elif make_time_index: - # Create a new time index based on the base dataframe time index. 
- base_time_index = base_dataframe.ww.time_index - if new_dataframe_time_index is None: - new_dataframe_time_index = "first_%s_time" % (base_dataframe.ww.name) - - already_sorted = True - - assert ( - base_dataframe.ww.time_index is not None - ), "Base dataframe doesn't have time_index defined" - - if base_time_index not in [col for col in copy_columns]: - copy_columns.append(base_time_index) - - time_index_types = ( - base_dataframe.ww.logical_types[base_dataframe.ww.time_index], - base_dataframe.ww.semantic_tags[base_dataframe.ww.time_index], - base_dataframe.ww.columns[base_dataframe.ww.time_index].metadata, - base_dataframe.ww.columns[base_dataframe.ww.time_index].description, - ) - else: - # If base_time_index is in copy_columns then we've already added the transfer types - # but since we're changing the name, we have to remove it - time_index_types = transfer_types[base_dataframe.ww.time_index] - del transfer_types[base_dataframe.ww.time_index] - - transfer_types[new_dataframe_time_index] = time_index_types - - else: - new_dataframe_time_index = None - already_sorted = False - - if new_dataframe_time_index is not None and new_dataframe_time_index == index: - raise ValueError( - "time_index and index cannot be the same value, %s" - % (new_dataframe_time_index), - ) - - selected_columns = ( - [index] - + [col for col in additional_columns] - + [col for col in copy_columns] - ) - - new_dataframe = new_dataframe.dropna(subset=[index]) - new_dataframe2 = new_dataframe.drop_duplicates(index, keep="first")[ - selected_columns - ] - - if make_time_index: - new_dataframe2 = new_dataframe2.rename( - columns={base_time_index: new_dataframe_time_index}, - ) - if make_secondary_time_index: - assert ( - len(make_secondary_time_index) == 1 - ), "Can only provide 1 secondary time index" - secondary_time_index = list(make_secondary_time_index.keys())[0] - - secondary_columns = [index, secondary_time_index] + list( - make_secondary_time_index.values(), - )[0] - secondary_df = new_dataframe.drop_duplicates(index, keep="last")[ - secondary_columns - ] - if new_dataframe_secondary_time_index: - secondary_df = secondary_df.rename( - columns={secondary_time_index: new_dataframe_secondary_time_index}, + new_df[col] = base_df.ww.pop(col) + new_df = new_df.ww.copy_dataframe(include_index=False, include_time_index=False) + new_df.index = new_df.index.astype(base_df.ww.index_type) + new_df.index.name = index + new_df = new_df.ww.init(logical_types={index: "index"}) + + time_index = None + if new_dataframe_time_index: + time_index = new_dataframe_time_index + elif make_time_index is None or make_time_index: + time_index = base_df.ww.time_index + + # handle secondary time indexes + secondary_time_index = None + if new_dataframe_secondary_time_index: + secondary_time_index = new_dataframe_secondary_time_index + elif make_secondary_time_index: + secondary_time_index = {} + for col in make_secondary_time_index: + base_df.ww.pop(col) + secondary_time_index[col] = base_df.ww.schema.secondary_time_indexes.get( + col, + None, ) - secondary_time_index = new_dataframe_secondary_time_index - else: - new_dataframe_secondary_time_index = secondary_time_index - secondary_df = secondary_df.set_index(index) - new_dataframe = new_dataframe2.join(secondary_df, on=index) - else: - new_dataframe = new_dataframe2 - - base_dataframe_index = index - - if make_secondary_time_index: - old_ti_name = list(make_secondary_time_index.keys())[0] - ti_cols = list(make_secondary_time_index.values())[0] - ti_cols = [c if c != old_ti_name else 
secondary_time_index for c in ti_cols] - make_secondary_time_index = {secondary_time_index: ti_cols} - - # will initialize Woodwork on this DataFrame - logical_types = {} - semantic_tags = {} - column_metadata = {} - column_descriptions = {} - for col_name, (ltype, tags, metadata, description) in transfer_types.items(): - logical_types[col_name] = ltype - semantic_tags[col_name] = tags - {"time_index"} - column_metadata[col_name] = copy.deepcopy(metadata) - column_descriptions[col_name] = description - - new_dataframe.ww.init( - name=new_dataframe_name, - index=index, - already_sorted=already_sorted, - time_index=new_dataframe_time_index, - logical_types=logical_types, - semantic_tags=semantic_tags, - column_metadata=column_metadata, - column_descriptions=column_descriptions, - ) + # Add new dataframe to entityset self.add_dataframe( - new_dataframe, - secondary_time_index=make_secondary_time_index, + dataframe=new_df, + dataframe_name=new_dataframe_name, + index=index, + time_index=time_index, + secondary_time_index=secondary_time_index, ) - self.dataframe_dict[base_dataframe_name] = self.dataframe_dict[ - base_dataframe_name - ].ww.drop(additional_columns) + # 2. Update existing dataframe + # remove index column from base_dataframe, if it wasn't removed already + if index != base_df.ww.index and index in base_df.columns: + base_df.ww.pop(index) + # add relationship column if it was not already present in the original dataframe + if index not in base_df.columns: + new_col = base_df.ww.add.relationship(index, new_dataframe_name) + new_col.ww.add.semantic_tags("foreign_key") + base_df[index] = new_col + # Move the old column from base dataframe to new dataframe + # to allow Woodwork to use its own initialization to infer logical types + # of the new dataframe. + base_df.ww.set_logical_type(index, "id") + for col in copy_columns: + base_df.ww.add.relationship(col, new_dataframe_name) + base_df.ww.set_logical_type(col, "id") + + # 3. Create relationship + self.add_relationship(new_dataframe_name, index, base_dataframe_name, index) - self.dataframe_dict[base_dataframe_name].ww.add_semantic_tags( - {base_dataframe_index: "foreign_key"}, - ) - - self.add_relationship( - new_dataframe_name, - index, - base_dataframe_name, - base_dataframe_index, - ) self.reset_data_description() - return self - # ########################################################################### - # # Data wrangling methods ############################################### - # ########################################################################### + return self def concat(self, other, inplace=False): - """Combine entityset with another to create a new entityset with the - combined data of both entitysets. """ - if not self.__eq__(other): - raise ValueError( - "Entitysets must have the same dataframes, relationships" - ", and column names", - ) + Combines two entitysets together by concatenating all dataframes with the same name. 
- if inplace: - combined_es = self - else: - combined_es = copy.deepcopy(self) - - has_last_time_index = [] - for df in self.dataframes: - self_df = df - other_df = other[df.ww.name] - combined_df = pd.concat([self_df, other_df]) - # If both DataFrames have made indexes, there will likely - # be overlap in the index column, so we use the other values - if self_df.ww.metadata.get("created_index") or other_df.ww.metadata.get( - "created_index", - ): - columns = [ - col - for col in combined_df.columns - if col != df.ww.index or col != df.ww.time_index - ] - else: - columns = [df.ww.index] - combined_df.drop_duplicates(columns, inplace=True) - - self_lti_col = df.ww.metadata.get("last_time_index") - other_lti_col = other[df.ww.name].ww.metadata.get("last_time_index") - if self_lti_col is not None or other_lti_col is not None: - has_last_time_index.append(df.ww.name) - - combined_es.replace_dataframe( - dataframe_name=df.ww.name, - df=combined_df, - recalculate_last_time_indexes=False, - already_sorted=False, - ) - - if has_last_time_index: - combined_es.add_last_time_indexes(updated_dataframes=has_last_time_index) - - combined_es.reset_data_description() - - return combined_es + Args: + other (EntitySet): EntitySet to concat with. + inplace (bool): If True, update the calling EntitySet in place. Otherwise, return a new EntitySet. - ########################################################################### - # Indexing methods ############################################### - ########################################################################### - def add_last_time_indexes(self, updated_dataframes=None): + Returns: + :class:`.EntitySet` : A new EntitySet with concatenated dataframes. """ - Calculates the last time index values for each dataframe (the last time - an instance or children of that instance were observed). Used when - calculating features using training windows. Adds the last time index as - a series named _ft_last_time on the dataframe. 
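+        # Validate the input before concatenating: ``other`` must be an EntitySet
+        # of the same type, and an EntitySet cannot be concatenated with itself.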
+ if not isinstance(other, type(self)): + raise TypeError("Cannot concat %s with %s" % (type(self), type(other))) - Args: - updated_dataframes (list[str]): List of dataframe names to update last_time_index for - (will update all parents of those dataframes as well) - """ - # Generate graph of dataframes to find leaf dataframes - children = defaultdict(list) # parent --> child mapping - child_cols = defaultdict(dict) - for r in self.relationships: - children[r._parent_dataframe_name].append(r.child_dataframe) - child_cols[r._parent_dataframe_name][r._child_dataframe_name] = ( - r.child_column - ) + if self.id == other.id: + raise ValueError("Cannot concat an EntitySet with itself") - updated_dataframes = updated_dataframes or [] - if updated_dataframes: - # find parents of updated_dataframes - parent_queue = updated_dataframes[:] - parents = set() - while len(parent_queue): - df_name = parent_queue.pop(0) - if df_name in parents: - continue - parents.add(df_name) - - for parent_name, _ in self.get_forward_dataframes(df_name): - parent_queue.append(parent_name) - - queue = [self[p] for p in parents] - to_explore = parents + if inplace: + new_es = self else: - to_explore = set(self.dataframe_dict.keys()) - queue = self.dataframes[:] - - explored = set() - # Store the last time indexes for the entire entityset in a dictionary to update - es_lti_dict = {} - for df in self.dataframes: - lti_col = df.ww.metadata.get("last_time_index") - if lti_col is not None: - lti_col = df[lti_col] - es_lti_dict[df.ww.name] = lti_col - - for df in queue: - es_lti_dict[df.ww.name] = None - - # We will explore children of dataframes on the queue, - # which may not be in the to_explore set. Therefore, - # we check whether all elements of to_explore are in - # explored, rather than just comparing length - while not to_explore.issubset(explored): - dataframe = queue.pop(0) - - if es_lti_dict[dataframe.ww.name] is None: - if dataframe.ww.time_index is not None: - lti = dataframe[dataframe.ww.time_index].copy() - else: - lti = dataframe.ww[dataframe.ww.index].copy() - # Cannot have a category dtype with nans when calculating last time index - lti = lti.astype("object") - lti[:] = None - - es_lti_dict[dataframe.ww.name] = lti - - if dataframe.ww.name in children: - child_dataframes = children[dataframe.ww.name] - - # if all children not explored, skip for now - if not set([df.ww.name for df in child_dataframes]).issubset(explored): - # Now there is a possibility that a child dataframe - # was not explicitly provided in updated_dataframes, - # and never made it onto the queue. 
If updated_dataframes - # is None then we just load all dataframes onto the queue - # so we didn't need this logic - for df in child_dataframes: - if df.ww.name not in explored and df.ww.name not in [ - q.ww.name for q in queue - ]: - # must also reset last time index here - es_lti_dict[df.ww.name] = None - queue.append(df) - queue.append(dataframe) - continue - - # updated last time from all children - for child_df in child_dataframes: - if es_lti_dict[child_df.ww.name] is None: - continue - link_col = child_cols[dataframe.ww.name][child_df.ww.name].name - - lti_df = pd.DataFrame( - { - "last_time": es_lti_dict[child_df.ww.name], - dataframe.ww.index: child_df[link_col], - }, - ) + new_es = self.copy() - # sort by time and keep only the most recent - lti_df.sort_values( - ["last_time", dataframe.ww.index], - kind="mergesort", - inplace=True, - ) + for df_name in new_es.dataframe_dict.keys(): + if df_name in other.dataframe_dict: + df = new_es[df_name] + other_df = other[df_name] - lti_df.drop_duplicates( - dataframe.ww.index, - keep="last", - inplace=True, + if not df.ww.schema.is_compatible(other_df.ww.schema, close_match=True): + raise ValueError( + "Schemas for dataframe '%s' are not compatible. Incompatible schemas: %s and %s" % (df_name, df.ww.schema, other_df.ww.schema) ) - lti_df.set_index(dataframe.ww.index, inplace=True) - lti_df = lti_df.reindex(es_lti_dict[dataframe.ww.name].index) - lti_df["last_time_old"] = es_lti_dict[dataframe.ww.name] - if lti_df.empty: - # Pandas errors out if it tries to do fillna and then max on an empty dataframe - lti_df = pd.Series([], dtype="object") - else: - lti_df["last_time"] = lti_df["last_time"].astype( - "datetime64[ns]", - ) - lti_df["last_time_old"] = lti_df["last_time_old"].astype( - "datetime64[ns]", - ) - lti_df = lti_df.fillna( - pd.to_datetime("1800-01-01 00:00"), - ).max(axis=1) - lti_df = lti_df.replace( - pd.to_datetime("1800-01-01 00:00"), - pd.NaT, - ) + df = pd.concat([df, other_df], ignore_index=True, sort=True) + df.ww.init( + schema=new_es[df_name].ww.schema, + name=df_name, + index=new_es[df_name].ww.index, + time_index=new_es[df_name].ww.time_index, + log_schema=new_es[df_name].ww.schema.log_schema, + ) + new_es[df_name] = df - es_lti_dict[dataframe.ww.name] = lti_df - es_lti_dict[dataframe.ww.name].name = "last_time" - - explored.add(dataframe.ww.name) - - # Store the last time index on the DataFrames - dfs_to_update = {} - for df in self.dataframes: - lti = es_lti_dict[df.ww.name] - if lti is not None: - if self.time_type == "numeric": - if lti.dtype == "datetime64[ns]": - # Woodwork cannot convert from datetime to numeric - lti = lti.apply(lambda x: x.value) - lti = init_series(lti, logical_type="Double") - else: - lti = init_series(lti, logical_type="Datetime") - - lti.name = LTI_COLUMN_NAME - - if LTI_COLUMN_NAME in df.columns: - if "last_time_index" in df.ww.semantic_tags[LTI_COLUMN_NAME]: - # Remove any previous last time index placed by featuretools - df.ww.pop(LTI_COLUMN_NAME) - else: - raise ValueError( - "Cannot add a last time index on DataFrame with an existing " - f"'{LTI_COLUMN_NAME}' column. 
Please rename '{LTI_COLUMN_NAME}'.", - ) + return new_es - # Add the new column to the DataFrame - df.ww[LTI_COLUMN_NAME] = lti - if "last_time_index" not in df.ww.semantic_tags[LTI_COLUMN_NAME]: - df.ww.add_semantic_tags({LTI_COLUMN_NAME: "last_time_index"}) - df.ww.metadata["last_time_index"] = LTI_COLUMN_NAME + def add_last_time_indexes(self, updated_dataframes=None): + """Adds a last time index to any dataframe that has a time index. - for df in dfs_to_update.values(): - df.ww.add_semantic_tags({LTI_COLUMN_NAME: "last_time_index"}) - df.ww.metadata["last_time_index"] = LTI_COLUMN_NAME - self.dataframe_dict[df.ww.name] = df + Args: + updated_dataframes (list[str], optional): List of dataframes names to update. + If None, all dataframes with a time index are updated. + """ + dataframes_to_update = updated_dataframes + if updated_dataframes is None: + dataframes_to_update = list(self.dataframe_dict.keys()) + + for df_name in dataframes_to_update: + dataframe = self[df_name] + + if dataframe.ww.time_index is None: + continue + + # use a list of columns including the index and all secondary time indexes + groupby_cols = [dataframe.ww.index] + if dataframe.ww.secondary_time_indexes: + for secondary_index_col in dataframe.ww.secondary_time_indexes: + groupby_cols.append(secondary_index_col) + + # if a dataframe has a natural time index, we can just use the last time + # an instance appears in the data as its last_time_index + last_time_index = ( + dataframe.groupby(groupby_cols)[dataframe.ww.time_index] + .apply(lambda x: x.sort_values().iloc[-1]) + .reset_index() + ) + last_time_index.rename( + columns={dataframe.ww.time_index: LTI_COLUMN_NAME}, + inplace=True, + ) + # Merge the new last_time_index dataframe into the Woodwork metadata + dataframe.ww.metadata["last_time_index"] = last_time_index self.reset_data_description() - for df in self.dataframes: - self._add_references_to_metadata(df) - # ########################################################################### - # # Pickling ############################################### - # ########################################################################### def __getstate__(self): - return { - **self.__dict__, - WW_SCHEMA_KEY: { - df_name: df.ww.schema for df_name, df in self.dataframe_dict.items() - }, - } + # Clear _data_description before pickling, to avoid recursion depth issues. + # Metadata is computed when accessed via property, so it will be recreated on load. + state = self.__dict__.copy() + state["_data_description"] = None + if WW_SCHEMA_KEY not in state: + schemas = {name: df.ww.schema for name, df in state["dataframe_dict"].items()} + state[WW_SCHEMA_KEY] = schemas + return state def __setstate__(self, state): - ww_schemas = state.pop(WW_SCHEMA_KEY) - for df_name, df in state.get("dataframe_dict", {}).items(): - if ww_schemas[df_name] is not None: - df.ww.init(schema=ww_schemas[df_name], validate=False) + # For older entitysets, manually deserialize Woodwork schemas. 
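+        # __getstate__ stores the Woodwork schemas separately under WW_SCHEMA_KEY,
+        # so re-initialize Woodwork on each dataframe from those schemas before
+        # restoring the rest of the instance state and re-registering this
+        # EntitySet in _ES_REF.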
+ if WW_SCHEMA_KEY in state: + schemas = state.pop(WW_SCHEMA_KEY) + for name, df in state["dataframe_dict"].items(): + df.ww.init(schema=schemas[name], name=name) self.__dict__.update(state) + _ES_REF[self.id] = self - # ########################################################################### - # # Other ############################################### - # ########################################################################### def add_interesting_values( self, max_values=5, @@ -1271,138 +711,50 @@ def add_interesting_values( dataframe_name=None, values=None, ): - """Find or set interesting values for categorical columns, to be used to generate "where" clauses + """Finds and adds interesting values to Woodwork for categorical features. Args: - max_values (int) : Maximum number of values per column to add. - verbose (bool) : If True, print summary of interesting values found. - dataframe_name (str) : The dataframe in the EntitySet for which to add interesting values. - If not specified interesting values will be added for all dataframes. - values (dict): A dictionary mapping column names to the interesting values to set - for the column. If specified, a corresponding dataframe_name must also be provided. - If not specified, interesting values will be set for all eligible columns. If values - are specified, max_values and verbose parameters will be ignored. - - Returns: - None - + max_values (int, optional): The maximum number of unique values to include. + If there are more unique values than `max_values`, the most frequent + values will be included. + verbose (bool, optional): Whether to print out the interesting values. + dataframe_name (str, optional): Name of the dataframe to add interesting values to. + If None, interesting values are added to all dataframes. + values (list, optional): A list of interesting values to add. If this is + set, `max_values` is ignored. """ - if dataframe_name is None and values is not None: - raise ValueError("dataframe_name must be specified if values are provided") - - if dataframe_name is not None and values is not None: - for column, vals in values.items(): - self[dataframe_name].ww.columns[column].metadata[ - "interesting_values" - ] = vals - return - - if dataframe_name: - dataframes = [self[dataframe_name]] - else: - dataframes = self.dataframes def add_value(df, col, val, verbose): - if verbose: - msg = "Column {}: Marking {} as an interesting value" - logger.info(msg.format(col, val)) - interesting_vals = df.ww.columns[col].metadata.get("interesting_values", []) - interesting_vals.append(val) - df.ww.columns[col].metadata["interesting_values"] = interesting_vals - - for df in dataframes: - value_counts = df.ww.value_counts(top_n=max(25, max_values), dropna=True) - total_count = len(df) - - for col, counts in value_counts.items(): - if {"index", "foreign_key"}.intersection(df.ww.semantic_tags[col]): - continue - - for i in range(min(max_values, len(counts))): - # Categorical columns will include counts of 0 for all values - # in categories. Stop when we encounter a 0 count. 
- if counts[i]["count"] == 0: - break - if len(counts) < 25: - value = counts[i]["value"] - add_value(df, col, value, verbose) - else: - fraction = counts[i]["count"] / total_count - if fraction > 0.05 and fraction < 0.95: - value = counts[i]["value"] - add_value(df, col, value, verbose) - else: - break + df.ww[col].ww.add_interesting_values(val, verbose=verbose) + if dataframe_name is None: + for df in self.dataframes: + for col in df.columns: + if df.ww[col].ww.logical_type.type_string == "categorical": + add_value(df, col, values, verbose) + else: + df = self[dataframe_name] + for col in df.columns: + if df.ww[col].ww.logical_type.type_string == "categorical": + add_value(df, col, values, verbose) self.reset_data_description() def plot(self, to_file=None): - """ - Create a UML diagram-ish graph of the EntitySet. + """Plots the EntitySet and returns a graphviz Digraph object. Args: - to_file (str, optional) : Path to where the plot should be saved. - If set to None (as by default), the plot will not be saved. + to_file (str, optional): Path to save the plot. If this is None, + the plot will not be saved. If the path does not include + an extension, it will default to .png. If the path is a directory, + a filename will be generated automatically. If `graphviz` is not + installed, the plot will not be rendered. Returns: - graphviz.Digraph : Graph object that can directly be displayed in - Jupyter notebooks. Nodes of the graph correspond to the DataFrames - in the EntitySet, showing the typing information for each column. - - Note: - The typing information displayed for each column is based off of the Woodwork - ColumnSchema for that column and is represented as ``LogicalType; semantic_tags``, - but the standard semantic tags have been removed for brevity. + graphviz.Digraph : Graphviz Digraph object of the EntitySet. """ - graphviz = check_graphviz() - format_ = get_graphviz_format(graphviz=graphviz, to_file=to_file) - - # Initialize a new directed graph - graph = graphviz.Digraph( - self.id, - format=format_, - graph_attr={"splines": "ortho"}, - ) + check_graphviz() - # Draw dataframes - for df in self.dataframes: - column_typing_info = [] - for col_name, col_schema in df.ww.columns.items(): - col_string = col_name + " : " + str(col_schema.logical_type) - - tags = col_schema.semantic_tags - col_schema.logical_type.standard_tags - if tags: - col_string += "; " - col_string += ", ".join(tags) - column_typing_info.append(col_string) - - columns_string = "\l".join(column_typing_info) # noqa: W605 - nrows = df.shape[0] - label = "{%s (%d row%s)|%s\l}" % ( # noqa: W605 - df.ww.name, - nrows, - "s" * (nrows > 1), - columns_string, - ) - graph.node(df.ww.name, shape="record", label=label) - - # Draw relationships - for rel in self.relationships: - # Display the key only once if is the same for both related dataframes - if rel._parent_column_name == rel._child_column_name: - label = rel._parent_column_name - else: - label = "%s -> %s" % (rel._parent_column_name, rel._child_column_name) - - graph.edge( - rel._child_dataframe_name, - rel._parent_dataframe_name, - xlabel=label, - ) - - if to_file: - save_graph(graph, to_file, format_) - return graph + return self.metadata.plot(to_file=to_file) def _handle_time( self, @@ -1416,44 +768,47 @@ def _handle_time( Filter a dataframe for all instances before time_last. If the dataframe does not have a time index, return the original dataframe. + Modified: Retain rows with missing time indices (NaN/NaT) so that non-time-based features can still be computed. 
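+
+        Example (illustrative values, not taken from the test suite): with
+        ``time_last = 2020-01-03``, ``include_cutoff_time=True``, and a time
+        index of ``[2020-01-01, NaT, 2020-01-04]``, the returned frame keeps
+        the 2020-01-01 row and the NaT row, while the 2020-01-04 row is
+        filtered out.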
""" schema = self[dataframe_name].ww.schema if schema.time_index: df_empty = df.empty if time_last is not None and not df_empty: + # Identify rows with missing time index + missing_time_mask = df[schema.time_index].isna() + if missing_time_mask.any(): + warnings.warn( + f"DataFrame '{dataframe_name}' contains rows with missing time indices. Time-based features will be NaN for these rows.", + ) + # Filter by time_last for non-missing time indices + filtered_df = df[~missing_time_mask].copy() if include_cutoff_time: - df = df[df[schema.time_index] <= time_last] + filtered_df = filtered_df[filtered_df[schema.time_index] <= time_last] else: - df = df[df[schema.time_index] < time_last] + filtered_df = filtered_df[filtered_df[schema.time_index] < time_last] + + # Recombine filtered rows with rows that had missing time indices + # This ensures rows with NaT are retained but won't affect time-based calculations + df = pd.concat([filtered_df, df[missing_time_mask]], axis=0, ignore_index=True) + # Handle training window for non-missing time indices if training_window is not None: - training_window = _check_timedelta(training_window) - if include_cutoff_time: - mask = df[schema.time_index] > time_last - training_window - else: - mask = df[schema.time_index] >= time_last - training_window - lti_col = schema.metadata.get("last_time_index") - if lti_col is not None: - if include_cutoff_time: - lti_mask = df[lti_col] > time_last - training_window - else: - lti_mask = df[lti_col] >= time_last - training_window - mask = mask | lti_mask - else: - warnings.warn( - "Using training_window but last_time_index is " - "not set for dataframe %s" % (dataframe_name), + # For now, training_window only supports datetime time_index + if not isinstance(schema.logical_types[schema.time_index], Datetime): + raise TypeError( + "training_window is only supported with datetime time_index", ) - - df = df[mask] - - secondary_time_indexes = schema.metadata.get("secondary_time_index") or {} - for secondary_time_index, columns in secondary_time_indexes.items(): - # should we use ignore time last here? - if time_last is not None and not df.empty: - mask = df[secondary_time_index] >= time_last - df.loc[mask, columns] = np.nan - + time_first = time_last - _check_timedelta(training_window) + df = df[(df[schema.time_index] >= time_first) | (missing_time_mask)] + + # If there's no time_last, but a training window is specified, filter by training window + elif training_window is not None and not df_empty: + if not isinstance(schema.logical_types[schema.time_index], Datetime): + raise TypeError( + "training_window is only supported with datetime time_index", + ) + time_first = time_last - _check_timedelta(training_window) + df = df[df[schema.time_index] >= time_first] return df def query_by_values( @@ -1466,71 +821,56 @@ def query_by_values( training_window=None, include_cutoff_time=True, ): - """Query instances that have column with given value + """ + Get the dataframe for a given dataframe and filter by instance values + and time. Args: - dataframe_name (str): The id of the dataframe to query - instance_vals (pd.Dataframe, pd.Series, list[str] or str) : - Instance(s) to match. - column_name (str) : Column to query on. If None, query on index. - columns (list[str]) : Columns to return. Return all columns if None. - time_last (pd.TimeStamp) : Query data up to and including this - time. Only applies if dataframe has a time index. 
- training_window (Timedelta, optional): - Window defining how much time before the cutoff time data - can be used when calculating features. If None, all data before cutoff time is used. - include_cutoff_time (bool): - If True, data at cutoff time are included in calculating features + dataframe_name (str): Name of dataframe to query. - Returns: - pd.DataFrame : instances that match constraints with ids in order of underlying dataframe - """ - dataframe = self[dataframe_name] - if not column_name: - column_name = dataframe.ww.index + instance_vals (np.ndarray or pd.Series): Instances to filter for. - instance_vals = _vals_to_series(instance_vals, column_name) + column_name (str, optional): Name of column to filter on. + If None, the dataframe's index is used. - training_window = _check_timedelta(training_window) + columns (list[str], optional): List of column names to select. + If None, all columns are selected. - if training_window is not None: - assert ( - training_window.has_no_observations() - ), "Training window cannot be in observations" + time_last (pd.Timestamp, optional): Last allowed time. Data from exactly this + time not allowed. - if instance_vals is None: - df = dataframe.copy() + training_window (Timedelta, optional): Window defining how much time before the cutoff time data + can be used when calculating features. If None, all data before cutoff time is used. - elif isinstance(instance_vals, pd.Series) and instance_vals.empty: - df = dataframe.head(0) + include_cutoff_time (bool): If True, data at cutoff time are included + in calculating features. - else: - df = dataframe[dataframe[column_name].isin(instance_vals)] - df = df.set_index(dataframe.ww.index, drop=False) - - # ensure filtered df has same categories as original - # workaround for issue below - # github.com/pandas-dev/pandas/issues/22501#issuecomment-415982538 - # - # Pandas claims that bug is fixed but it still shows up in some - # cases. More investigation needed. - if dataframe.ww.columns[column_name].is_categorical: - categories = pd.api.types.CategoricalDtype( - categories=dataframe[column_name].cat.categories, - ) - df[column_name] = df[column_name].astype(categories) + Returns: + pd.DataFrame: dataframe for this dataframe, filtered by the given + instance ids and time. + """ + if columns is None: + columns = self[dataframe_name].ww.columns.keys() + + if column_name is None: + column_name = self[dataframe_name].ww.index + + if not isinstance(instance_vals, pd.Series): + instance_vals = _vals_to_series(instance_vals, column_name) + + # Filter by the values. + df = self[dataframe_name].ww.loc[instance_vals.index][columns] + + # Filter by time if a time_last was given. df = self._handle_time( - dataframe_name=dataframe_name, - df=df, - time_last=time_last, - training_window=training_window, + dataframe_name, + df, + time_last, + training_window, include_cutoff_time=include_cutoff_time, ) - if columns is not None: - df = df[columns] - return df def replace_dataframe( @@ -1540,208 +880,104 @@ def replace_dataframe( already_sorted=False, recalculate_last_time_indexes=True, ): - """Replace the internal dataframe of an EntitySet table, keeping Woodwork typing information the same. - Optionally makes sure that data is sorted, that reference indexes to other dataframes are consistent, - and that last_time_indexes are updated to reflect the new data. If an index was created for the original - dataframe and is not present on the new dataframe, an index column of the same name will be added to the - new dataframe. 
""" - if not isinstance(df, type(self[dataframe_name])): - raise TypeError("Incorrect DataFrame type used") - - # If the original DataFrame has a last time index column and the new one doesnt - # remove the column and the reference to last time index from that dataframe - last_time_index_column = self[dataframe_name].ww.metadata.get("last_time_index") - if ( - last_time_index_column is not None - and last_time_index_column not in df.columns - ): - self[dataframe_name].ww.pop(last_time_index_column) - del self[dataframe_name].ww.metadata["last_time_index"] - - # If the original DataFrame had an index created via make_index, - # we may need to remake the index if it's not in the new DataFrame - created_index = self[dataframe_name].ww.metadata.get("created_index") - if created_index is not None and created_index not in df.columns: - df = _create_index(df, created_index) + Replace a dataframe in the EntitySet. - old_column_names = list(self[dataframe_name].columns) - if len(df.columns) != len(old_column_names): - raise ValueError( - "New dataframe contains {} columns, expecting {}".format( - len(df.columns), - len(old_column_names), - ), - ) - for col_name in old_column_names: - if col_name not in df.columns: - raise ValueError( - "New dataframe is missing new {} column".format(col_name), - ) - - if df.ww.schema is not None: - warnings.warn( - "Woodwork typing information on new dataframe will be replaced " - f"with existing typing information from {dataframe_name}", - ) - - df.ww.init( - schema=self[dataframe_name].ww._schema, + Args: + dataframe_name (str): The name of the dataframe to replace. + df (pd.DataFrame): The new dataframe to replace the old one. + already_sorted (bool): If true, dataframe is assumed to be sorted by time_index + and no additional sorting will be performed. + recalculate_last_time_indexes (bool): If True, the last_time_indexes for all + dataframes with a time_index will be re-calculated. Defaults to True. + """ + if dataframe_name not in self.dataframe_dict: + raise KeyError(f"DataFrame '{dataframe_name}' not found in entityset.") + + new_dataframe = df.ww.init( + schema=self.dataframe_dict[dataframe_name].ww.schema, + name=dataframe_name, + index=self.dataframe_dict[dataframe_name].ww.index, + time_index=self.dataframe_dict[dataframe_name].ww.time_index, + log_schema=self.dataframe_dict[dataframe_name].ww.schema.log_schema, already_sorted=already_sorted, ) - # Make sure column ordering matches original ordering - df = df.ww[old_column_names] - df = self._normalize_values(df) + if not self.dataframe_dict[dataframe_name].ww.schema.is_compatible( + new_dataframe.ww.schema, + close_match=True, + ): + raise ValueError( + "New dataframe is not compatible with old dataframe. 
Incompatible schemas: %s and %s" % (self.dataframe_dict[dataframe_name].ww.schema, new_dataframe.ww.schema) + ) - self.dataframe_dict[dataframe_name] = df + self.dataframe_dict[dataframe_name] = new_dataframe - if self[dataframe_name].ww.time_index is not None: - self._check_uniform_time_index(self[dataframe_name]) + if recalculate_last_time_indexes: + self.add_last_time_indexes([dataframe_name]) - df_metadata = self[dataframe_name].ww.metadata - self.set_secondary_time_index( - dataframe_name, - df_metadata.get("secondary_time_index"), - ) - if recalculate_last_time_indexes and last_time_index_column is not None: - self.add_last_time_indexes(updated_dataframes=[dataframe_name]) self.reset_data_description() - self._add_references_to_metadata(df) def _check_time_indexes(self): - for dataframe in self.dataframe_dict.values(): - self._check_uniform_time_index(dataframe) - self._check_secondary_time_index(dataframe) + """ + Check that all time_indexes are datetime or Datetime logical types. + """ + for dataframe in self.dataframes: + if dataframe.ww.time_index: + self._check_uniform_time_index(dataframe) def _check_secondary_time_index(self, dataframe, secondary_time_index=None): - secondary_time_index = secondary_time_index or dataframe.ww.metadata.get( - "secondary_time_index", - {}, - ) - - if secondary_time_index and dataframe.ww.time_index is None: - raise ValueError( - "Cannot set secondary time index on a DataFrame that has no primary time index.", - ) - - for time_index, columns in secondary_time_index.items(): - self._check_uniform_time_index(dataframe, column_name=time_index) - if time_index not in columns: - columns.append(time_index) + # For older entitysets, sometimes the secondary_time_indexes are not an array of dictionaries + # in the schema, so this check will make them into an array of dictionaries if that is the case + dataframe.ww.set_secondary_time_index(secondary_time_index) + for s in dataframe.ww.secondary_time_indexes: + self._check_uniform_time_index(dataframe, s) def _check_uniform_time_index(self, dataframe, column_name=None): - column_name = column_name or dataframe.ww.time_index - if column_name is None: - return - - time_type = self._get_time_type(dataframe, column_name) - if self.time_type is None: - self.time_type = time_type - elif self.time_type != time_type: - info = "%s time index is %s type which differs from other entityset time indexes" - raise TypeError(info % (dataframe.ww.name, time_type)) + dataframe.ww._check_uniform_time_index(column_name) def _get_time_type(self, dataframe, column_name=None): - column_name = column_name or dataframe.ww.time_index - - column_schema = dataframe.ww.columns[column_name] - - time_type = None - if column_schema.is_numeric: - time_type = "numeric" - elif column_schema.is_datetime: - time_type = Datetime - - if time_type is None: - info = "%s time index not recognized as numeric or datetime" - raise TypeError(info % dataframe.ww.name) - return time_type + return dataframe.ww._get_time_type(column_name) def _add_references_to_metadata(self, dataframe): - dataframe.ww.metadata.update(entityset_id=self.id) - for column in dataframe.columns: - metadata = dataframe.ww._schema.columns[column].metadata - metadata.update(dataframe_name=dataframe.ww.name) - metadata.update(entityset_id=self.id) - _ES_REF[self.id] = self + dataframe.ww.metadata["entityset"] = self def _normalize_values(self, dataframe): + # normalize any values Woodwork added that were outside the schema def replace(x): - if not isinstance(x, (list, tuple, 
np.ndarray)) and pd.isna(x): - return (np.nan, np.nan) - else: - return x + try: + x.ww.init(schema=dataframe.ww.schema[x.name], logical_types=dataframe.ww.logical_types[x.name]) + except TypeError: + pass + return x - for column, logical_type in dataframe.ww.logical_types.items(): - if isinstance(logical_type, LatLong): - dataframe[column] = dataframe[column].apply(replace) - return dataframe + return dataframe.apply(replace) def _vals_to_series(instance_vals, column_id): - """ - instance_vals may be a pd.Dataframe, a pd.Series, a list, a single - value, or None. This function always returns a Series or None. - """ - if instance_vals is None: - return None - - # If this is a single value, make it a list - if not hasattr(instance_vals, "__iter__"): - instance_vals = [instance_vals] - - # convert iterable to pd.Series - if isinstance(instance_vals, pd.DataFrame): - out_vals = instance_vals[column_id] - else: - out_vals = pd.Series(instance_vals) - - # no duplicates or NaN values - out_vals = out_vals.drop_duplicates().dropna() - - # want index to have no name for the merge in query_by_values - out_vals.index.name = None - - return out_vals + if isinstance(instance_vals, list): + instance_vals = pd.Series(instance_vals, name=column_id) + return instance_vals def _get_or_create_index(index, make_index, df): - """Handles index creation logic base on user input""" - index_was_created = False + if make_index: + # TODO: Handle case where index exists as a regular column. + # If Woodwork is already initialized on df, then index will be the existing + # Woodwork index. If it is also passed as a column and not the index + # then the index will be duplicated. + if df.ww.index is None: + df.ww.init(index=index, make_index=make_index) + return df.ww.index - if index is None: - # Case 1: user wanted to make index but did not specify column name - assert not make_index, "Must specify an index name if make_index is True" - # Case 2: make_index not specified but no index supplied, use first column - warnings.warn( - ( - "Using first column as index. " - "To change this, specify the index parameter" - ), - ) - index = df.columns[0] - elif make_index and index in df.columns: - # Case 3: user wanted to make index but column already exists - raise RuntimeError( - f"Cannot make index: column with name {index} already present", - ) - elif index not in df.columns: - if not make_index: - # Case 4: user names index, it is not in df. does not specify - # make_index. Make new index column and warn - warnings.warn( - "index {} not found in dataframe, creating new " - "integer column".format(index), - ) - # Case 5: make_index with no errors or warnings - # (Case 4 also uses this code path) - df = _create_index(df, index) - index_was_created = True - # Case 6: user specified index, which is already in df. No action needed. - return index_was_created, index, df + if index not in df.columns: + raise LookupError("Specified index column '%s' not found in dataframe." 
% index) + return df.ww.index def _create_index(df, index): - df.insert(0, index, range(len(df))) - return df + if index is None: + index = df.name + df = df.reset_index() + df.rename(columns={'index': index}, inplace=True) + return index diff --git a/featuretools/tests/computational_backend/test_feature_set_calculator.py b/featuretools/tests/computational_backend/test_feature_set_calculator.py index 99afe56ffc..ad1a0b10db 100644 --- a/featuretools/tests/computational_backend/test_feature_set_calculator.py +++ b/featuretools/tests/computational_backend/test_feature_set_calculator.py @@ -38,6 +38,7 @@ Sum, TimeSinceLast, Trend, + TimeSincePrevious, ) from featuretools.primitives.base import AggregationPrimitive from featuretools.primitives.standard.aggregation.num_unique import NumUnique @@ -556,7 +557,7 @@ def test_make_dfeat_of_agg_feat_through_parent(es): The graph looks like this: R C = Customers, the dataframe we're trying to predict on - / \\ R = Regions, a parent of customers + / \ R = Regions, a parent of customers S C S = Stores, a child of regions | etc. @@ -587,7 +588,7 @@ def test_make_deep_agg_feat_of_dfeat_of_agg_feat(es): C C = Customers, the dataframe we're trying to predict on | S = Sessions, a child of Customers P S L = Log, a child of both Sessions and Log - \\ / P = Products, a parent of Log which is not a descendent of customers + \ / P = Products, a parent of Log which is not a descendent of customers L We're trying to calculate a DFeat from L to P on an agg_feat of P on L, and @@ -1202,3 +1203,38 @@ def test_nunique_nested_with_agg_bug(es): df = calculator.run(np.array([0])) assert df.iloc[0, 0].round(4) == 1.6667 + + +def test_missing_time_index_rows(): + # Create a simple DataFrame with a time index and a missing value + df = pd.DataFrame({ + 'id': [1, 2, 3, 4], + 'value': [10, 20, 30, 40], + 'time': [datetime(2020, 1, 1), pd.NaT, datetime(2020, 1, 3), datetime(2020, 1, 4)] + }) + es = EntitySet() + es.add_dataframe( + dataframe_name='test', + dataframe=df, + index='id', + time_index='time', + ) + + # Non-time-based feature + f_value = Feature(es['test'].ww['value']) + # Time-based feature (transform primitive, base is time column) + from featuretools.primitives import TimeSincePrevious + f_time_since = Feature( + es['test'].ww['time'], + primitive=TimeSincePrevious + ) + + feature_matrix = calculate_feature_matrix([f_value, f_time_since], es) + feature_matrix_sorted = feature_matrix.sort_index() + + # Check that non-time-based feature is computed for all rows + assert feature_matrix_sorted['value'].tolist() == [10, 20, 30, 40] + # Check that time-based feature is NaN for the first row and for the row with missing time index + is_nan = feature_matrix_sorted[f_time_since.get_name()].isna().tolist() + # The first row and the second row (id=2, which has NaT in time) should be NaN for time-based feature + assert is_nan == [True, True, False, False]
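+
+
+def test_missing_time_index_rows_warns():
+    # Companion sketch (not part of the original patch): the modified
+    # EntitySet._handle_time warns when rows have a missing time index, so
+    # computing a feature with a cutoff time on such data should emit that
+    # warning. Assumes the same module-level imports used by the test above
+    # (pytest, pandas, datetime, EntitySet, Feature, calculate_feature_matrix).
+    df = pd.DataFrame({
+        'id': [1, 2],
+        'value': [10, 20],
+        'time': [datetime(2020, 1, 1), pd.NaT],
+    })
+    es = EntitySet()
+    es.add_dataframe(
+        dataframe_name='test',
+        dataframe=df,
+        index='id',
+        time_index='time',
+    )
+    f_value = Feature(es['test'].ww['value'])
+
+    with pytest.warns(UserWarning, match="missing time indices"):
+        feature_matrix = calculate_feature_matrix(
+            [f_value],
+            es,
+            cutoff_time=datetime(2020, 1, 5),
+        )
+
+    # Both rows are retained, including the one with a NaT time index.
+    assert sorted(feature_matrix['value'].tolist()) == [10, 20]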