
Missing data during bulk write #11

@Uijeong97

Description


Hi, I'm using the spark-milvus connector to do a bulk insert, but some of the data goes missing.

I'm getting an error from the proxy: a describe-collection failure.
I think missing data is fatal, so please take a look.

milvus_writer = df.write \
    .format("milvus") \
    .mode("append") \
    .option("milvus.host", host) \
    .option("milvus.port", port) \
    .option("milvus.database.name", db_name) \
    .option("milvus.collection.name", collection_name) \
    .option("milvus.collection.vectorField", "embedding") \
    .option("milvus.collection.vectorDim", "768") \
    .option("milvus.collection.primaryKeyField", "poi_id")

milvus_writer.save()

I made sure all segments were flushed, and ran a count check after enough time had passed.

  • expected collection count (df count)
(screenshot taken 2024-03-27 10:06 PM)
  • real collection count (after bulk write using spark-milvus connector)
(screenshot taken 2024-03-27 10:06 PM)
  • spark error log
2024-03-27 21:21:40,046 ERROR client.AbstractMilvusGrpcClient: DescribeCollectionRequest failed:can't find collection collection not found[collection=448192185218736664]
2024-03-27 21:21:40,062 ERROR client.AbstractMilvusGrpcClient: Failed to describe collection: cp_poi_embedding
  • This shows a failed request from the proxy. The errors typically occur on the describe-collection call.
(screenshot taken 2024-03-27 10:07 PM)
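For reference, the post-write count check can be reproduced directly with pymilvus. This is only a sketch: the host/port and default collection name are placeholders for my deployment, and `missing_rows` is a hypothetical helper, not a connector API:

```python
def missing_rows(expected: int, actual: int) -> int:
    """How many rows the collection is short of the Spark-side df.count()."""
    return max(expected - actual, 0)

def check_collection_count(expected: int, collection_name: str = "cp_poi_embedding") -> int:
    # Requires a running Milvus; host/port below are placeholders.
    from pymilvus import connections, Collection

    connections.connect(host="localhost", port="19530")
    coll = Collection(collection_name)
    coll.flush()  # seal growing segments so the count is stable
    return missing_rows(expected, coll.num_entities)
```

Comparing `coll.num_entities` against `df.count()` on the Spark side is what produced the mismatch shown in the screenshots above.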

And here is another log:

2024-03-27 21:21:43,242 ERROR internal.ManagedChannelOrphanWrapper: *~*~*~ Channel ManagedChannelImpl{logId=135, target=gateway-pai-milvus.pai-staging-milvus.svc.pr1.io.navercorp.com:10001} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
	at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:93)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
	at io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:630)
	at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:297)
	at io.milvus.client.MilvusServiceClient.<init>(MilvusServiceClient.java:143)
	at zilliztech.spark.milvus.MilvusConnection$.acquire(MilvusConnection.scala:30)
	at zilliztech.spark.milvus.writer.MilvusDataWriter.<init>(MilvusDataWriter.scala:18)
	at zilliztech.spark.milvus.writer.MilvusDataWriterFactory.createWriter(MilvusDataWriterFactory.scala:11)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:459)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:448)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:514)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:411)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
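The describe-collection failures above look transient (the collection clearly exists, and most rows do land). As a stopgap on the Spark side, the bulk write could be wrapped in a generic retry with exponential backoff. This is a hypothetical helper of my own, not part of the connector:

```python
import time

def retry_with_backoff(fn, attempts=3, base_delay=1.0, retriable=(RuntimeError,)):
    """Call fn(); on a retriable error, wait and retry with exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except retriable:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the original error
            time.sleep(base_delay * (2 ** attempt))

# Usage sketch: the write itself is the retried callable, e.g.
# retry_with_backoff(lambda: milvus_writer.save())
```

This does not address the leaked `ManagedChannel` itself, which is created per write task in `MilvusDataWriter` and apparently never shut down; that part would need a connector-side fix.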
