
Cannot insert a Spark DataFrame into a collection in batches #14

@timtimich35


Dependencies:
python 3.8.12
pyspark 3.5.0
pymilvus 2.4.1
grpcio-tools 1.60.0
protobuf 4.25.3
Milvus cluster deployed in k8s with Milvus Operator 0.9.13

SparkSession setup:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
                    .master("local[*]") \
                    .appName("similarity_search_mech") \
                    .config("spark.driver.extraClassPath", '/data/notebook_files/clickhouse-native-jdbc-shaded-2.6.5.jar') \
                    .config("spark.jars", '/data/notebook_files/spark-milvus-1.0.0-SNAPSHOT.jar') \
                    .config("spark.driver.memory", "4g") \
                    .getOrCreate()

Milvus setup:

from pymilvus import MilvusClient, DataType

client = MilvusClient(uri="http://cluster_address:19530")

# create schema
schema = MilvusClient.create_schema(auto_id=False, enable_dynamic_field=True)

# add fields to schema
schema.add_field(field_name="id", 
                 datatype=DataType.INT64, 
                 is_primary=True)

schema.add_field(field_name="vec", 
                 datatype=DataType.FLOAT_VECTOR, 
                 dim=3000)

# prepare index parameters
index_params = client.prepare_index_params()

# add indexes
index_params.add_index(field_name="id", 
                       index_type="STL_SORT")

index_params.add_index(field_name="vec", 
                       index_type="IVF_FLAT", 
                       metric_type="IP", 
                       params={"nlist": 128})

# create a collection
client.create_collection(collection_name="similarity_search_mech",
                         schema=schema,
                         index_params=index_params)

Given:
I work in a Datalore IDE deployed in k8s alongside Milvus and Spark.
I have a Spark DataFrame with 2.5 million rows and two columns: an id and a 3000-element vector of floats.
I try to load it in batches of 100,000 records each, which should take 25 iterations in total.
None of the batches is inserted.
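The batch arithmetic above (2,500,000 rows / 100,000 per batch = 25 batches) can be sketched as a small helper that produces half-open row ranges. This is a minimal illustration; `batch_ranges` is a hypothetical name, not part of PySpark or the Milvus connector.

```python
def batch_ranges(total_rows, batch_size):
    """Return half-open (start, end) offsets covering total_rows in chunks of batch_size."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    # The last chunk is clipped to total_rows so no range overshoots the data.
    return [(start, min(start + batch_size, total_rows))
            for start in range(0, total_rows, batch_size)]


ranges = batch_ranges(2_500_000, 100_000)
print(len(ranges))            # 25
print(ranges[0], ranges[-1])  # (0, 100000) (2400000, 2500000)
```

Assuming (this is an assumption, not stated above) that the `id` values are contiguous integers starting at 0, each range could be turned into a batch with `df.filter((df.id >= start) & (df.id < end))`.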

Insert operation:

    # `batch` is one 100,000-row slice of the 2.5-million-row DataFrame
    batch.write \
    .mode("append") \
    .option("milvus.host", "cluster_address") \
    .option("milvus.port", "19530") \
    .option("milvus.collection.name", "similarity_search_mech") \
    .option("milvus.collection.vectorField", "vec") \
    .option("milvus.collection.vectorDim", "3000") \
    .option("milvus.collection.primaryKeyField", "id") \
    .format("milvus") \
    .save()
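Milvus rejects rows that do not match the collection schema, for example a vector whose length differs from the declared `dim` (3000 here). Whether that is the cause in this case is an assumption, but a quick check on a sample of rows can rule it out. The sketch below uses a hypothetical helper, `find_bad_rows`, that takes `(id, vector)` pairs.

```python
def find_bad_rows(rows, dim):
    """Return the ids of rows whose id is not an int or whose vector length is not dim."""
    bad = []
    for row_id, vec in rows:
        # The INT64 primary key needs an integer id; the FLOAT_VECTOR field needs exactly `dim` values.
        if not isinstance(row_id, int) or len(vec) != dim:
            bad.append(row_id)
    return bad


sample = [(1, [0.1] * 3000), (2, [0.2] * 2999), ("3", [0.0] * 3000)]
print(find_bad_rows(sample, 3000))  # [2, '3']
```

With PySpark, a sample could be collected with `batch.select("id", "vec").limit(1000).collect()`, assuming the DataFrame columns are named `id` and `vec` (an assumption); PySpark `Row` objects are tuples, so they unpack directly in the loop.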

Can you please help me understand what I am doing wrong?
