Skip to content

Commit 8d425c9

Browse files
cleve-faunafireridlleshestakovgGennadii Shestakoverickpintor
authored
v4 mergeInMain - release set streaming (#352)
* debug auth0 (#330) * OSS-895 [java] Driver needs to detect and throw an error upon loss of networking (#323) * watch on connection * modified parallel scala test * reduced query's quantity * changed fauna query call Co-authored-by: Gennadii Shestakov <shestakov,g@gmail.com> * Revert "OSS-895 [java] Driver needs to detect and throw an error upon loss of networking (#323)" (#351) This reverts commit e7d38cf. * ENG-3044: Introduce set streaming to the java driver (#346) * introduce set streaming to the java driver * Update circleci to use latest public FaunaDB image * Remove unneed test on 100 concurrent streams Co-authored-by: Cleve Stuart <cleve.stuart@fauna.com> * ENG-3044: Introduce set streaming to the Scala driver (#347) * introduce set streaming to the Scala driver * Update circleci config * Remove unneed test on 100 concurrent streams Co-authored-by: Cleve Stuart <cleve.stuart@fauna.com> Co-authored-by: Cleve Stuart <90649124+cleve-fauna@users.noreply.github.com> * Bump keep alive to 4 seconds Co-authored-by: Sergii Zinkevych <s.zinkevych@gmail.com> Co-authored-by: shestakovg <shestakov.g@gmail.com> Co-authored-by: Gennadii Shestakov <shestakov,g@gmail.com> Co-authored-by: Erick Pintor <erickpintor@gmail.com>
1 parent 1780983 commit 8d425c9

File tree

7 files changed

+90
-37
lines changed

7 files changed

+90
-37
lines changed

.circleci/config.yml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,9 @@ executors:
1010
resource_class: large
1111
docker:
1212
- image: openjdk:11
13-
- image: gcr.io/faunadb-cloud/faunadb/enterprise:latest
13+
- image: fauna/faunadb
1414
name: core
15-
auth:
16-
username: _json_key
17-
password: $GCR_KEY
15+
1816
environment:
1917
SBT_VERSION: 1.4.7
2018
FAUNA_ROOT_KEY: secret

faunadb-common/src/main/java/com/faunadb/common/Connection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ public Builder withCustomHeaders(Map<String, String> headers) {
291291
* the settings of the {@link Builder} instance.
292292
*/
293293
public Connection build() {
294-
System.setProperty("jdk.httpclient.keepalive.timeout", "3");
294+
System.setProperty("jdk.httpclient.keepalive.timeout", "4");
295295
MetricRegistry registry;
296296
registry = Objects.requireNonNullElseGet(metricRegistry, MetricRegistry::new);
297297

faunadb-java/src/main/java/com/faunadb/client/streaming/EventFields.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,11 @@ public String value() {
2929
return "action";
3030
}
3131
};
32+
33+
public static EventField IndexField = new EventField() {
34+
@Override
35+
public String value() {
36+
return "index";
37+
}
38+
};
3239
}

faunadb-java/src/test/java/com/faunadb/client/ClientSpec.java

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2312,7 +2312,7 @@ public void shouldTestIdentity() throws Exception {
23122312
@Test
23132313
public void shouldTestCreateAccessProvider() throws Exception {
23142314
String roleName = randomStartingWith("role_");
2315-
String providerName = randomStartingWith("provider_");
2315+
String providerName = randomStartingWith("provider_jvm_");
23162316
String issuerName = randomStartingWith("issuer_");
23172317
String fullUri = randomStartingWith("https://") + ".auth0.com";
23182318

@@ -3277,6 +3277,62 @@ public void onComplete() {
32773277
assertThat(e3.at("event", "document", "data").to(OBJECT).get(), is(Collections.singletonMap("testField", Value("testValue3"))));
32783278
}
32793279

3280+
@Test
3281+
public void shouldStreamSetEvents() throws Exception {
3282+
String coll = randomStartingWith("collection_");
3283+
query(CreateCollection(Obj("name", Value(coll))));
3284+
3285+
Flow.Publisher<Value> pub = adminClient.stream(Documents(Collection(coll))).get();
3286+
CompletableFuture<List<Value>> capturedEvents = new CompletableFuture<>();
3287+
3288+
Flow.Subscriber<Value> sub = new Flow.Subscriber<>() {
3289+
List<Value> captured = new ArrayList<>();
3290+
Flow.Subscription sub = null;
3291+
3292+
@Override
3293+
public void onSubscribe(Flow.Subscription sub) {
3294+
this.sub = sub;
3295+
this.sub.request(1);
3296+
}
3297+
3298+
@Override
3299+
public void onNext(Value v) {
3300+
captured.add(v);
3301+
if (captured.size() == 3) {
3302+
capturedEvents.complete(captured);
3303+
sub.cancel();
3304+
} else {
3305+
sub.request(1);
3306+
}
3307+
}
3308+
3309+
@Override
3310+
public void onError(Throwable throwable) {
3311+
capturedEvents.completeExceptionally(throwable);
3312+
}
3313+
3314+
@Override
3315+
public void onComplete() {
3316+
capturedEvents.completeExceptionally(
3317+
new IllegalStateException("not expecting the stream to complete"));
3318+
}
3319+
};
3320+
3321+
// subscribe to publisher
3322+
pub.subscribe(sub);
3323+
3324+
// push 2 events
3325+
Value doc = adminClient.query(Create(Collection(coll), Obj())).get();
3326+
adminClient.query(Delete(doc.at("ref"))).get();
3327+
3328+
List<Value> events = capturedEvents.get();
3329+
assertThat(events.get(0).at("type").to(STRING).get(), equalTo("start"));
3330+
assertThat(events.get(1).at("type").to(STRING).get(), equalTo("set"));
3331+
assertThat(events.get(1).at("event").at("action").to(STRING).get(), equalTo("add"));
3332+
assertThat(events.get(2).at("type").to(STRING).get(), equalTo("set"));
3333+
assertThat(events.get(2).at("event").at("action").to(STRING).get(), equalTo("remove"));
3334+
}
3335+
32803336
@Test
32813337
public void streamEventsWithSnapshotData() throws Exception {
32823338
String collectionName = randomStartingWith("collection_");

faunadb-scala/src/load/scala/faunadb/FaunaClientLoadSpec.scala

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -190,37 +190,6 @@ class FaunaClientLoadSpec extends FixtureAsyncWordSpec with Matchers with ScalaF
190190
}
191191
}
192192

193-
"fail to handle more than 100 concurrent streams on the same client" in { client =>
194-
def setup(): Future[Result[Value]] = {
195-
val collectionName = RandomGenerator.aRandomString
196-
for {
197-
_ <- client.query(CreateCollection(Obj("name" -> collectionName)))
198-
createdDoc <- client.query(Create(Collection(collectionName), Obj("credentials" -> Obj("password" -> "abcdefg"))))
199-
} yield createdDoc("ref")
200-
}
201-
202-
def run(docRef: Value): Future[Unit] = {
203-
val maxConcurrentStreamCount = 100
204-
for {
205-
// create a first publisher to setup the connection that will be reused by all the other publishers
206-
_ <- client.stream(docRef)
207-
// adding 99 publisher (makes 100 in total)
208-
_ <- Future.traverse(List.fill(maxConcurrentStreamCount - 1)(docRef))(ref => client.stream(ref))
209-
// adding 101st subscriber creates the error
210-
_ <- client.stream(docRef)
211-
} yield ()
212-
}
213-
214-
val result =
215-
for {
216-
docRef <- setup()
217-
result <- run(docRef)
218-
} yield result
219-
220-
recoverToExceptionIf[BadRequestException](result).map { e =>
221-
e.getMessage should include("the maximum number of streams has been reached for this client")
222-
}
223-
}
224193
}
225194

226195
def testSubscriber(messageCount: Int, publisher: Flow.Publisher[Value]): Future[List[Value]] = {

faunadb-scala/src/main/scala/faunadb/FaunaClient.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ object FaunaClient {
7373
case object PrevField extends EventField("prev")
7474
case object DiffField extends EventField("diff")
7575
case object ActionField extends EventField("action")
76+
case object IndexField extends EventField("index")
7677
}
7778

7879
/**

faunadb-scala/src/test/scala/faunadb/ClientSpec.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2081,6 +2081,28 @@ class ClientSpec
20812081
}
20822082
}
20832083

2084+
it should "stream set events" in {
2085+
client.query(CreateCollection(Obj("name" -> "foo"))).futureValue
2086+
val publisherValue = client.stream(Documents(Collection("foo"))).futureValue
2087+
val events = testSubscriber(3, publisherValue)
2088+
2089+
// push 3 updates
2090+
val doc = client.query(Create(Collection("foo"), Obj())).futureValue
2091+
client.query(Delete(doc("ref"))).futureValue
2092+
2093+
// assertion 3 events (start + 2 updates)
2094+
events.futureValue match {
2095+
case start :: e1 :: e2 :: Nil =>
2096+
start("type").get shouldBe StringV("start")
2097+
e1("type").get shouldBe StringV("set")
2098+
e1("event", "action").get shouldBe StringV("add")
2099+
e2("type").get shouldBe StringV("set")
2100+
e2("event", "action").get shouldBe StringV("remove")
2101+
case _ =>
2102+
fail("expected 3 events")
2103+
}
2104+
}
2105+
20842106
it should "stream on document reference with opt-in fields" in {
20852107
val createdDoc = client.query(Create(Collection("spells"), Obj("data" -> Obj("testField" -> "testValue0")))).futureValue
20862108
val docRef = createdDoc(RefField)

0 commit comments

Comments
 (0)