diff --git a/backend/src/main/java/com/cloudera/cai/rag/Types.java b/backend/src/main/java/com/cloudera/cai/rag/Types.java index f2eec834f..22dfe1ac5 100644 --- a/backend/src/main/java/com/cloudera/cai/rag/Types.java +++ b/backend/src/main/java/com/cloudera/cai/rag/Types.java @@ -59,7 +59,7 @@ public record RagDocument( Long dataSourceId, String documentId, String s3Path, - Instant vectorUploadTimestamp, + @With Instant vectorUploadTimestamp, Long sizeInBytes, String extension, Instant timeCreated, diff --git a/backend/src/main/java/com/cloudera/cai/rag/files/RagFileIndexReconciler.java b/backend/src/main/java/com/cloudera/cai/rag/files/RagFileIndexReconciler.java index f5cc9fba7..d8201e902 100644 --- a/backend/src/main/java/com/cloudera/cai/rag/files/RagFileIndexReconciler.java +++ b/backend/src/main/java/com/cloudera/cai/rag/files/RagFileIndexReconciler.java @@ -66,6 +66,7 @@ public class RagFileIndexReconciler extends BaseReconciler { private final Jdbi jdbi; private final RagBackendClient ragBackendClient; private final RagDataSourceRepository ragDataSourceRepository; + private final RagFileRepository ragFileRepository; @Autowired public RagFileIndexReconciler( @@ -74,12 +75,14 @@ public RagFileIndexReconciler( RagBackendClient ragBackendClient, RagDataSourceRepository ragDataSourceRepository, @Qualifier("singleWorkerReconcilerConfig") ReconcilerConfig reconcilerConfig, + RagFileRepository ragFileRepository, OpenTelemetry openTelemetry) { super(reconcilerConfig, openTelemetry); this.bucketName = bucketName; this.jdbi = jdbi; this.ragBackendClient = ragBackendClient; this.ragDataSourceRepository = ragDataSourceRepository; + this.ragFileRepository = ragFileRepository; } @Override @@ -106,6 +109,12 @@ public void resync() { public ReconcileResult reconcile(Set documents) { for (RagDocument document : documents) { log.info("starting indexing document: {}", document); + var currentDocumentState = ragFileRepository.findDocumentByDocumentId(document.documentId()); + if (currentDocumentState.vectorUploadTimestamp() != null) { + log.info("Document already indexed: {}", document.filename()); + continue; + } + IndexConfiguration indexConfiguration = fetchIndexConfiguration(document.dataSourceId()); Instant updateTimestamp = indexFile(document, indexConfiguration); String updateSql = @@ -150,6 +159,7 @@ public static RagFileIndexReconciler createNull() { RagBackendClient.createNull(), RagDataSourceRepository.createNull(), ReconcilerConfig.builder().isTestReconciler(true).build(), + RagFileRepository.createNull(), OpenTelemetry.noop()); } } diff --git a/backend/src/test/java/com/cloudera/cai/rag/files/RagFileIndexReconcilerTest.java b/backend/src/test/java/com/cloudera/cai/rag/files/RagFileIndexReconcilerTest.java index 14d0becbc..e29ea9269 100644 --- a/backend/src/test/java/com/cloudera/cai/rag/files/RagFileIndexReconcilerTest.java +++ b/backend/src/test/java/com/cloudera/cai/rag/files/RagFileIndexReconcilerTest.java @@ -99,6 +99,9 @@ void reconcile() { .isNull(); reconciler.submit(document.withId(id)); + // add a copy that has already been indexed to make sure we don't try to re-index with + // long-running index jobs + reconciler.submit(document.withId(id).withVectorUploadTimestamp(Instant.now())); await().until(reconciler::isEmpty); await() .untilAsserted( @@ -108,6 +111,7 @@ void reconcile() { assertThat(updatedDocument.vectorUploadTimestamp()).isNotNull(); }); assertThat(requestTracker.getValues()) + .hasSize(1) .contains( new RagBackendClient.TrackedRequest<>( new TrackedIndexRequest( @@ -177,6 +181,7 @@ private RagFileIndexReconciler createTestInstance( RagBackendClient.createNull(tracker, exceptions), RagDataSourceRepository.createNull(), reconcilerConfig, + RagFileRepository.createNull(), OpenTelemetry.noop()); reconciler.init(); return reconciler;