1
- import glob
2
1
import sys
3
2
import os
4
- import itertools
5
- import time
6
3
import json
7
- import asyncio
4
+ import time
8
5
import warnings
9
6
from urllib .parse import urlparse
10
7
from dataclasses import dataclass
11
- from importlib .metadata import version
12
8
13
- import gcsfs
14
- import s3fs
15
9
import pandas as pd
16
- from tqdm .auto import tqdm
17
10
import pyarrow .parquet as pq
18
11
from pydantic import ValidationError
19
- from typing import Any , Generator , Iterator , List , Union , Dict , Optional , Tuple
12
+ from typing import Any , Generator , Iterator , List , Dict , Optional , Tuple , NamedTuple
20
13
21
14
from pinecone_datasets import cfg
22
15
from pinecone_datasets .catalog import DatasetMetadata
23
- from pinecone_datasets .fs import get_cloud_fs , LocalFileSystem
16
+ from pinecone_datasets .fs import get_cloud_fs
24
17
25
18
import pinecone as pc
26
- from pinecone import Index
19
+ from pinecone import Index , ServerlessSpec , PodSpec
27
20
28
21
29
22
class DatasetInitializationError (Exception ):
@@ -446,7 +439,7 @@ def to_catalog(
446
439
def _upsert_to_index (
447
440
self , index_name : str , namespace : str , batch_size : int , show_progress : bool
448
441
):
449
- pinecone_index = Index (index_name = index_name )
442
+ pinecone_index = self . _pinecone_client . Index (index_name )
450
443
451
444
res = pinecone_index .upsert_from_dataframe (
452
445
self .documents [self ._config .Schema .documents_select_columns ].dropna (
@@ -461,41 +454,51 @@ def _upsert_to_index(
461
454
def _set_pinecone_index(
    self,
    api_key: Optional[str] = None,
    **kwargs,
) -> None:
    """
    Creates the Pinecone client used by the other index helpers.

    The client is built with ``pc.Pinecone`` and stored on
    ``self._pinecone_client``. No index is created or selected here.

    Args:
        api_key (str, optional): forwarded to ``pc.Pinecone``. Defaults to None.
        **kwargs: forwarded unchanged to ``pc.Pinecone``.
    """
    # The client object (not the `pinecone` module) is what the other helpers
    # call `.Index(...)`, `.list_indexes()` and `.create_index(...)` on.
    self._pinecone_client = pc.Pinecone(api_key=api_key, **kwargs)
460
+
461
def _get_index_list(self) -> List[str]:
    """
    Returns the names of the indexes reported by the Pinecone client.

    Requires ``self._pinecone_client`` to have been set beforehand
    (``_set_pinecone_index`` does this).
    """
    listing = self._pinecone_client.list_indexes()
    index_names = listing.names()
    return index_names
469
463
470
464
def _create_index(
    self,
    index_name: str,
    api_key: Optional[str] = None,
    spec: Optional[NamedTuple] = None,
    **kwargs,
) -> bool:
    """
    Creates a new Pinecone index sized for this dataset's dense model.

    A client is initialised first, then the index is created and the call
    blocks until ``_wait_for_index_creation`` returns.

    Args:
        index_name (str): the name of the index to create
        api_key (str, optional): passed to ``_set_pinecone_index``. Defaults to None.
        spec (optional): passed as ``spec`` to the client's ``create_index``. Defaults to None.
        **kwargs: forwarded to the client's ``create_index`` only (they are not
            passed to the client constructor).

    Returns:
        bool: True when the index was created and the wait completed, False if
        creation or the wait raised (the error is printed).

    Raises:
        ValueError: if an index named ``index_name`` already exists.
    """
    self._set_pinecone_index(api_key=api_key)

    # Guard clause: datasets are only ever upserted into a fresh index.
    if index_name in self._get_index_list():
        raise ValueError(
            f"index {index_name} already exists, Pinecone Datasets can only be upserted to a new index"
        )

    try:
        self._pinecone_client.create_index(
            name=index_name,
            dimension=self.metadata.dense_model.dimension,
            spec=spec,
            **kwargs,
        )
        self._wait_for_index_creation(index_name)
    except Exception as e:
        # Deliberate best-effort: the caller turns False into a RuntimeError,
        # so report the cause here instead of propagating it.
        print(f"error creating index: {e}")
        return False
    return True
498
492
493
def _wait_for_index_creation(self, index_name: str, timeout: int = 60):
    """
    Blocks until the index answers a ``describe_index_stats`` call.

    The index is probed roughly once per second. The wait is bounded by
    wall-clock time, so slow probes count against ``timeout`` as well.

    Args:
        index_name (str): the name of the index to wait for
        timeout (int, optional): maximum time to wait, in seconds. Defaults to 60.

    Raises:
        TimeoutError: if no probe succeeded before the deadline; the last
            probe error is attached as the cause.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            self._pinecone_client.Index(index_name).describe_index_stats()
            return
        except Exception as e:
            # Any failure is treated as "not ready yet"; keep it for the
            # final error so the reason is not lost.
            last_error = e

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(1, remaining))

    raise TimeoutError(
        f"Index creation timed out after {timeout} seconds"
    ) from last_error
501
+
499
502
def to_pinecone_index (
500
503
self ,
501
504
index_name : str ,
@@ -505,20 +508,31 @@ def to_pinecone_index(
505
508
show_progress : bool = True ,
506
509
api_key : Optional [str ] = None ,
507
510
environment : Optional [str ] = None ,
511
+ region : Optional [str ] = None ,
512
+ cloud : Optional [str ] = None ,
513
+ serverless : Optional [bool ] = None ,
508
514
** kwargs ,
509
515
):
510
516
"""
511
517
Saves the dataset to a Pinecone index.
512
518
513
- this function will look for two environment variables:
519
+ this function will look for four environment variables:
520
+ - SERVERLESS
514
521
- PINECONE_API_KEY
522
+ - PINECONE_REGION
523
+ - PINECONE_CLOUD
515
524
- PINECONE_ENVIRONMENT
516
525
517
526
Then, it will init a Pinecone Client and will perform an upsert to the index.
518
527
The upsert will be using async batches to increase performance.
519
528
520
529
Args:
521
530
index_name (str): the name of the index to upsert to
531
+ api_key (str, optional): the api key to use for the upsert. Defaults to None.
532
+ region (str, optional): the region to use for the upsert for serverless. Defaults to None.
533
+ cloud (str, optional): the cloud to use for the upsert for serverless. Defaults to None.
534
+ environment (str, optional): the environment to use for the upsert for pod-based. Defaults to None.
535
+ serverless (bool, optional): whether to use serverless or pod-based. Defaults to None.
522
536
namespace (str, optional): the namespace to use for the upsert. Defaults to "".
523
537
batch_size (int, optional): the batch size to use for the upsert. Defaults to 100.
524
538
show_progress (bool, optional): whether to show a progress bar while upserting. Defaults to True.
@@ -536,13 +550,21 @@ def to_pinecone_index(
536
550
result = dataset.to_pinecone_index(index_name="my_index")
537
551
```
538
552
"""
553
+ serverless = serverless or os .environ .get ("SERVERLESS" , False )
554
+ if serverless :
555
+ spec = ServerlessSpec (
556
+ cloud = cloud or os .getenv ("PINECONE_CLOUD" , "aws" ),
557
+ region = region or os .getenv ("PINECONE_REGION" , "us-west2" ),
558
+ )
559
+ else :
560
+ spec = PodSpec (
561
+ environment = environment or os .environ ["PINECONE_ENVIRONMENT" ],
562
+ )
539
563
if should_create_index :
540
- if not self ._create_index (
541
- index_name , api_key = api_key , environment = environment , ** kwargs
542
- ):
564
+ if not self ._create_index (index_name , api_key = api_key , spec = spec , ** kwargs ):
543
565
raise RuntimeError ("index creation failed" )
544
566
else :
545
- self ._set_pinecone_index (api_key = api_key , environment = environment , ** kwargs )
567
+ self ._set_pinecone_index (api_key = api_key , ** kwargs )
546
568
547
569
return self ._upsert_to_index (
548
570
index_name = index_name ,
0 commit comments