@@ -1,13 +1,12 @@
-import sys
 import os
 import json
 import logging
 import warnings
-from typing import Literal, Dict, Optional
+from typing import Literal, Optional
 
 import pandas as pd
 import pyarrow.parquet as pq
-from typing import Any, Generator, Iterator, List, Dict, Optional, Tuple, NamedTuple
+from .tqdm import tqdm
 
 from .cfg import Schema
 from .dataset_metadata import DatasetMetadata
@@ -75,33 +74,41 @@ def _safe_read_from_path(
     read_path_str = os.path.join(dataset_path, data_type, "*.parquet")
     read_path = fs.glob(read_path_str)
     if DatasetFSReader._does_datatype_exist(fs, dataset_path, data_type):
-        dataset = pq.ParquetDataset(read_path, filesystem=fs)
-        dataset_schema_names = dataset.schema.names
+        # First, collect all the dataframes
+        dfs = []
+        for path in tqdm(read_path, desc=f"Loading {data_type} parquet files"):
+            piece = pq.read_pandas(path, filesystem=fs)
+            df_piece = piece.to_pandas()
+            dfs.append(df_piece)
+
+        if not dfs:
+            raise ValueError(f"No parquet files found in {read_path_str}")
+
+        # Combine all dataframes
+        df = pd.concat(dfs, ignore_index=True)
+
+        # Validate schema
+        dataset_schema_names = df.columns.tolist()
         columns_to_null = []
         columns_not_null = []
         for column_name, is_nullable, null_value in getattr(
             Schema.Names, data_type
         ):
             if column_name not in dataset_schema_names and not is_nullable:
                 raise ValueError(
-                    f"error, file is not matching Pinecone Datasets Schmea: {column_name} not found"
+                    f"error, file is not matching Pinecone Datasets Schema: {column_name} not found"
                 )
             elif column_name not in dataset_schema_names and is_nullable:
                 columns_to_null.append((column_name, null_value))
             else:
                 columns_not_null.append(column_name)
-        try:
-            # TODO: use of the columns_not_null and columns_to_null is only a workaround for proper schema validation and versioning
-            df = dataset.read_pandas(columns=columns_not_null).to_pandas()
-
-            for column_name, null_value in columns_to_null:
-                df[column_name] = null_value
-            return df
-
-        # TODO: add more specific error handling, explain what is wrong
-        except Exception as e:
-            print("error, no exception: {}".format(e), file=sys.stderr)
-            raise (e)
+
+        # Add null columns if needed
+        for column_name, null_value in columns_to_null:
+            df[column_name] = null_value
+
+        return df[columns_not_null + [col for col, _ in columns_to_null]]
+
     else:
         warnings.warn(
             "WARNING: No data found at: {}. Returning empty dataframe".format(
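For readers skimming the hunk: the per-file loop replaces the old single `pq.ParquetDataset` read so a progress bar can advance as each file loads. Below is a minimal, self-contained sketch of that pattern, assuming the standard `tqdm` package in place of the repo's local `.tqdm` wrapper, a local `fsspec` filesystem, and a hypothetical `my_dataset/documents` layout:

```python
import fsspec
import pandas as pd
import pyarrow.parquet as pq
from tqdm import tqdm  # assumption: stand-in for the repo's .tqdm wrapper

fs = fsspec.filesystem("file")  # assumption: local filesystem
paths = fs.glob("my_dataset/documents/*.parquet")  # hypothetical layout

dfs = []
for path in tqdm(paths, desc="Loading documents parquet files"):
    # Read one file at a time so the progress bar advances per file
    dfs.append(pq.read_pandas(path, filesystem=fs).to_pandas())

if not dfs:
    raise ValueError("No parquet files found")

df = pd.concat(dfs, ignore_index=True)
```

The trade-off versus reading the whole `ParquetDataset` at once is some per-file overhead in exchange for user-visible progress and an explicit empty-result check.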
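The validation loop treats each `Schema.Names.<data_type>` entry as a `(column_name, is_nullable, null_value)` triple: a missing non-nullable column raises, a missing nullable column is filled with a constant, and the final selection fixes column order. A hedged sketch of that fill-and-reorder step, with a made-up schema standing in for `Schema.Names`:

```python
import pandas as pd

# Made-up triples for illustration; the real ones come from Schema.Names
schema = [("id", False, None), ("values", False, None), ("metadata", True, None)]
df = pd.DataFrame({"id": ["a", "b"], "values": [[0.1], [0.2]]})

columns_to_null, columns_not_null = [], []
for column_name, is_nullable, null_value in schema:
    if column_name not in df.columns and not is_nullable:
        raise ValueError(f"{column_name} not found")
    elif column_name not in df.columns and is_nullable:
        columns_to_null.append((column_name, null_value))
    else:
        columns_not_null.append(column_name)

for column_name, null_value in columns_to_null:
    df[column_name] = null_value  # constant fill for absent nullable columns

# Reorder so required columns come first, filled nullable columns last
df = df[columns_not_null + [col for col, _ in columns_to_null]]
```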