23
23
import java .util .ArrayList ;
24
24
import java .util .HashMap ;
25
25
import java .util .List ;
26
+ import java .util .Map ;
26
27
import java .util .concurrent .*;
27
28
import java .util .function .Consumer ;
28
29
import java .util .stream .Collectors ;
30
+ import java .util .stream .IntStream ;
29
31
import java .util .stream .Stream ;
30
32
31
33
public class OpenFgaClient {
@@ -36,6 +38,7 @@ public class OpenFgaClient {
36
38
private static final String CLIENT_BULK_REQUEST_ID_HEADER = "X-OpenFGA-Client-Bulk-Request-Id" ;
37
39
private static final String CLIENT_METHOD_HEADER = "X-OpenFGA-Client-Method" ;
38
40
private static final int DEFAULT_MAX_METHOD_PARALLEL_REQS = 10 ;
41
+ private static final int DEFAULT_MAX_BATCH_SIZE = 50 ;
39
42
40
43
public OpenFgaClient (ClientConfiguration configuration ) throws FgaInvalidParameterException {
41
44
this (configuration , new ApiClient ());
@@ -574,29 +577,29 @@ public CompletableFuture<ClientCheckResponse> check(ClientCheckRequest request,
574
577
*
575
578
* @throws FgaInvalidParameterException When the Store ID is null, empty, or whitespace
576
579
*/
577
- public CompletableFuture <List <ClientBatchCheckResponse >> batchCheck (List <ClientCheckRequest > requests )
580
+ public CompletableFuture <List <ClientBatchCheckClientResponse >> clientBatchCheck (List <ClientCheckRequest > requests )
578
581
throws FgaInvalidParameterException {
579
- return batchCheck (requests , null );
582
+ return clientBatchCheck (requests , null );
580
583
}
581
584
582
585
/**
583
586
* BatchCheck - Run a set of checks (evaluates)
584
587
*
585
588
* @throws FgaInvalidParameterException When the Store ID is null, empty, or whitespace
586
589
*/
587
- public CompletableFuture <List <ClientBatchCheckResponse >> batchCheck (
588
- List <ClientCheckRequest > requests , ClientBatchCheckOptions batchCheckOptions )
590
+ public CompletableFuture <List <ClientBatchCheckClientResponse >> clientBatchCheck (
591
+ List <ClientCheckRequest > requests , ClientBatchCheckClientOptions batchCheckOptions )
589
592
throws FgaInvalidParameterException {
590
593
configuration .assertValid ();
591
594
configuration .assertValidStoreId ();
592
595
593
596
var options = batchCheckOptions != null
594
597
? batchCheckOptions
595
- : new ClientBatchCheckOptions ().maxParallelRequests (DEFAULT_MAX_METHOD_PARALLEL_REQS );
598
+ : new ClientBatchCheckClientOptions ().maxParallelRequests (DEFAULT_MAX_METHOD_PARALLEL_REQS );
596
599
if (options .getAdditionalHeaders () == null ) {
597
600
options .additionalHeaders (new HashMap <>());
598
601
}
599
- options .getAdditionalHeaders ().putIfAbsent (CLIENT_METHOD_HEADER , "BatchCheck " );
602
+ options .getAdditionalHeaders ().putIfAbsent (CLIENT_METHOD_HEADER , "ClientBatchCheck " );
600
603
options .getAdditionalHeaders ()
601
604
.putIfAbsent (CLIENT_BULK_REQUEST_ID_HEADER , randomUUID ().toString ());
602
605
@@ -606,13 +609,13 @@ public CompletableFuture<List<ClientBatchCheckResponse>> batchCheck(
606
609
var executor = Executors .newScheduledThreadPool (maxParallelRequests );
607
610
var latch = new CountDownLatch (requests .size ());
608
611
609
- var responses = new ConcurrentLinkedQueue <ClientBatchCheckResponse >();
612
+ var responses = new ConcurrentLinkedQueue <ClientBatchCheckClientResponse >();
610
613
611
614
final var clientCheckOptions = options .asClientCheckOptions ();
612
615
613
616
Consumer <ClientCheckRequest > singleClientCheckRequest =
614
617
request -> call (() -> this .check (request , clientCheckOptions ))
615
- .handleAsync (ClientBatchCheckResponse .asyncHandler (request ))
618
+ .handleAsync (ClientBatchCheckClientResponse .asyncHandler (request ))
616
619
.thenAccept (responses ::add )
617
620
.thenRun (latch ::countDown );
618
621
@@ -627,6 +630,117 @@ public CompletableFuture<List<ClientBatchCheckResponse>> batchCheck(
627
630
}
628
631
}
629
632
633
+ /**
634
+ * BatchCheck - Run a set of checks (evaluates)
635
+ *
636
+ * @throws FgaInvalidParameterException When the Store ID is null, empty, or whitespace
637
+ */
638
+ public CompletableFuture <ClientBatchCheckResponse > batchCheck (ClientBatchCheckRequest request )
639
+ throws FgaInvalidParameterException , FgaValidationError {
640
+ return batchCheck (request , null );
641
+ }
642
+
643
+ /**
644
+ * BatchCheck - Run a set of checks (evaluates)
645
+ *
646
+ * @throws FgaInvalidParameterException When the Store ID is null, empty, or whitespace
647
+ */
648
+ public CompletableFuture <ClientBatchCheckResponse > batchCheck (
649
+ ClientBatchCheckRequest requests , ClientBatchCheckOptions batchCheckOptions )
650
+ throws FgaInvalidParameterException , FgaValidationError {
651
+ configuration .assertValid ();
652
+ configuration .assertValidStoreId ();
653
+
654
+ var options = batchCheckOptions != null
655
+ ? batchCheckOptions
656
+ : new ClientBatchCheckOptions ()
657
+ .maxParallelRequests (DEFAULT_MAX_METHOD_PARALLEL_REQS )
658
+ .maxBatchSize (DEFAULT_MAX_BATCH_SIZE );
659
+ if (options .getAdditionalHeaders () == null ) {
660
+ options .additionalHeaders (new HashMap <>());
661
+ }
662
+ options .getAdditionalHeaders ().putIfAbsent (CLIENT_METHOD_HEADER , "BatchCheck" );
663
+ options .getAdditionalHeaders ()
664
+ .putIfAbsent (CLIENT_BULK_REQUEST_ID_HEADER , randomUUID ().toString ());
665
+
666
+ Map <String , ClientBatchCheckItem > correlationIdToCheck = new HashMap <>();
667
+
668
+ List <BatchCheckItem > collect = new ArrayList <>();
669
+ for (ClientBatchCheckItem check : requests .getChecks ()) {
670
+ String correlationId = check .getCorrelationId ();
671
+ correlationId = correlationId == null || correlationId .isBlank ()
672
+ ? randomUUID ().toString ()
673
+ : correlationId ;
674
+
675
+ BatchCheckItem batchCheckItem = new BatchCheckItem ()
676
+ .tupleKey (new CheckRequestTupleKey ()
677
+ .user (check .getUser ())
678
+ .relation (check .getRelation ())
679
+ ._object (check .getObject ()))
680
+ .context (check .getContext ())
681
+ .correlationId (correlationId );
682
+
683
+ List <ClientTupleKey > contextualTuples = check .getContextualTuples ();
684
+ if (contextualTuples != null && !contextualTuples .isEmpty ()) {
685
+ batchCheckItem .contextualTuples (ClientTupleKey .asContextualTupleKeys (contextualTuples ));
686
+ }
687
+
688
+ collect .add (batchCheckItem );
689
+
690
+ if (correlationIdToCheck .containsKey (correlationId )) {
691
+ throw new FgaValidationError (
692
+ "correlationId" , "When calling batchCheck, correlation IDs must be unique" );
693
+ }
694
+
695
+ correlationIdToCheck .put (correlationId , check );
696
+ }
697
+
698
+ int maxBatchSize = options .getMaxBatchSize () != null ? options .getMaxBatchSize () : DEFAULT_MAX_BATCH_SIZE ;
699
+ List <List <BatchCheckItem >> batchedChecks = IntStream .range (
700
+ 0 , (collect .size () + maxBatchSize - 1 ) / maxBatchSize )
701
+ .mapToObj (i -> collect .subList (i * maxBatchSize , Math .min ((i + 1 ) * maxBatchSize , collect .size ())))
702
+ .collect (Collectors .toList ());
703
+
704
+ int maxParallelRequests = options .getMaxParallelRequests () != null
705
+ ? options .getMaxParallelRequests ()
706
+ : DEFAULT_MAX_METHOD_PARALLEL_REQS ;
707
+ var executor = Executors .newScheduledThreadPool (maxParallelRequests );
708
+ var latch = new CountDownLatch (batchedChecks .size ());
709
+
710
+ var responses = new ConcurrentLinkedQueue <ClientBatchCheckSingleResponse >();
711
+
712
+ var override = new ConfigurationOverride ().addHeaders (options );
713
+
714
+ Consumer <List <BatchCheckItem >> singleBatchCheckRequest = request -> call (() ->
715
+ api .batchCheck (configuration .getStoreId (), new BatchCheckRequest ().checks (request ), override ))
716
+ .handleAsync ((batchCheckResponseApiResponse , throwable ) -> {
717
+ Map <String , BatchCheckSingleResult > response =
718
+ batchCheckResponseApiResponse .getData ().getResult ();
719
+
720
+ List <ClientBatchCheckSingleResponse > batchResults = new ArrayList <>();
721
+ response .forEach ((key , result ) -> {
722
+ boolean allowed = Boolean .TRUE .equals (result .getAllowed ());
723
+ ClientBatchCheckItem checkItem = correlationIdToCheck .get (key );
724
+ var singleResponse =
725
+ new ClientBatchCheckSingleResponse (allowed , checkItem , key , result .getError ());
726
+ batchResults .add (singleResponse );
727
+ });
728
+ return batchResults ;
729
+ })
730
+ .thenAccept (responses ::addAll )
731
+ .thenRun (latch ::countDown );
732
+
733
+ try {
734
+ batchedChecks .forEach (batch -> executor .execute (() -> singleBatchCheckRequest .accept (batch )));
735
+ latch .await ();
736
+ return CompletableFuture .completedFuture (new ClientBatchCheckResponse (new ArrayList <>(responses )));
737
+ } catch (Exception e ) {
738
+ return CompletableFuture .failedFuture (e );
739
+ } finally {
740
+ executor .shutdown ();
741
+ }
742
+ }
743
+
630
744
/**
631
745
* Expand - Expands the relationships in userset tree format (evaluates)
632
746
*
@@ -764,7 +878,7 @@ public CompletableFuture<ClientListRelationsResponse> listRelations(
764
878
.context (request .getContext ()))
765
879
.collect (Collectors .toList ());
766
880
767
- return this .batchCheck (batchCheckRequests , options .asClientBatchCheckOptions ())
881
+ return this .clientBatchCheck (batchCheckRequests , options .asClientBatchCheckClientOptions ())
768
882
.thenCompose (responses -> call (() -> ClientListRelationsResponse .fromBatchCheckResponses (responses )));
769
883
}
770
884
0 commit comments