Skip to content
This repository was archived by the owner on May 28, 2018. It is now read-only.

Commit 5854fbe

Browse files
Adam LindenthalGerrit Code Review
authored andcommitted
Merge "JAX-RS 2.1 SSE API Implementation"
2 parents 835a78f + 893dfe6 commit 5854fbe

File tree

33 files changed

+4660
-24
lines changed

33 files changed

+4660
-24
lines changed

bundles/jaxrs-ri/pom.xml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
44
DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
55
6-
Copyright (c) 2012-2016 Oracle and/or its affiliates. All rights reserved.
6+
Copyright (c) 2012-2017 Oracle and/or its affiliates. All rights reserved.
77
88
The contents of this file are subject to the terms of either the GNU
99
General Public License Version 2 only ("GPL") or the Common Development
@@ -105,6 +105,11 @@
105105
<artifactId>jersey-container-servlet</artifactId>
106106
<version>${project.version}</version>
107107
</dependency>
108+
<dependency>
109+
<groupId>org.glassfish.jersey.media</groupId>
110+
<artifactId>jersey-media-sse</artifactId>
111+
<version>${project.version}</version>
112+
</dependency>
108113

109114
<!-- JAX-RS RI Sources -->
110115
<dependency>
Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
/*
2+
* Written by Doug Lea with assistance from members of JCP JSR-166
3+
* Expert Group and released to the public domain, as explained at
4+
* http://creativecommons.org/publicdomain/zero/1.0/
5+
*/
6+
7+
package org.glassfish.jersey.internal.jsr166;
8+
9+
/**
10+
* Interrelated interfaces and static methods for establishing
11+
* flow-controlled components in which {@link Publisher Publishers}
12+
* produce items consumed by one or more {@link Subscriber
13+
* Subscribers}, each managed by a {@link Subscription
14+
* Subscription}.
15+
*
16+
* <p>These interfaces correspond to the <a
17+
* href="http://www.reactive-streams.org/"> reactive-streams</a>
18+
* specification. They apply in both concurrent and distributed
19+
* asynchronous settings: All (seven) methods are defined in {@code
20+
* void} "one-way" message style. Communication relies on a simple form
21+
* of flow control (method {@link Subscription#request}) that can be
22+
* used to avoid resource management problems that may otherwise occur
23+
* in "push" based systems.
24+
*
25+
* <p><b>Examples.</b> A {@link Publisher} usually defines its own
26+
* {@link Subscription} implementation; constructing one in method
27+
* {@code subscribe} and issuing it to the calling {@link
28+
* Subscriber}. It publishes items to the subscriber asynchronously,
29+
* normally using an {@link Executor}. For example, here is a very
30+
* simple publisher that only issues (when requested) a single {@code
31+
* TRUE} item to a single subscriber. Because the subscriber receives
32+
* only a single item, this class does not use buffering and ordering
33+
* control required in most implementations (for example {@link
34+
* SubmissionPublisher}).
35+
*
36+
* <pre> {@code
37+
* class OneShotPublisher implements Publisher<Boolean> {
38+
* private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
39+
* private boolean subscribed; // true after first subscribe
40+
* public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
41+
* if (subscribed)
42+
* subscriber.onError(new IllegalStateException()); // only one allowed
43+
* else {
44+
* subscribed = true;
45+
* subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
46+
* }
47+
* }
48+
* static class OneShotSubscription implements Subscription {
49+
* private final Subscriber<? super Boolean> subscriber;
50+
* private final ExecutorService executor;
51+
* private Future<?> future; // to allow cancellation
52+
* private boolean completed;
53+
* OneShotSubscription(Subscriber<? super Boolean> subscriber,
54+
* ExecutorService executor) {
55+
* this.subscriber = subscriber;
56+
* this.executor = executor;
57+
* }
58+
* public synchronized void request(long n) {
59+
* if (n != 0 && !completed) {
60+
* completed = true;
61+
* if (n < 0) {
62+
* IllegalArgumentException ex = new IllegalArgumentException();
63+
* executor.execute(() -> subscriber.onError(ex));
64+
* } else {
65+
* future = executor.submit(() -> {
66+
* subscriber.onNext(Boolean.TRUE);
67+
* subscriber.onComplete();
68+
* });
69+
* }
70+
* }
71+
* }
72+
* public synchronized void cancel() {
73+
* completed = true;
74+
* if (future != null) future.cancel(false);
75+
* }
76+
* }
77+
* }}</pre>
78+
*
79+
* <p>A {@link Subscriber} arranges that items be requested and
80+
* processed. Items (invocations of {@link Subscriber#onNext}) are
81+
* not issued unless requested, but multiple items may be requested.
82+
* Many Subscriber implementations can arrange this in the style of
83+
* the following example, where a buffer size of 1 single-steps, and
84+
* larger sizes usually allow for more efficient overlapped processing
85+
* with less communication; for example with a value of 64, this keeps
86+
* total outstanding requests between 32 and 64.
87+
* Because Subscriber method invocations for a given {@link
88+
* Subscription} are strictly ordered, there is no need for these
89+
* methods to use locks or volatiles unless a Subscriber maintains
90+
* multiple Subscriptions (in which case it is better to instead
91+
* define multiple Subscribers, each with its own Subscription).
92+
*
93+
* <pre> {@code
94+
* class SampleSubscriber<T> implements Subscriber<T> {
95+
* final Consumer<? super T> consumer;
96+
* Subscription subscription;
97+
* final long bufferSize;
98+
* long count;
99+
* SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
100+
* this.bufferSize = bufferSize;
101+
* this.consumer = consumer;
102+
* }
103+
* public void onSubscribe(Subscription subscription) {
104+
* long initialRequestSize = bufferSize;
105+
* count = bufferSize - bufferSize / 2; // re-request when half consumed
106+
* (this.subscription = subscription).request(initialRequestSize);
107+
* }
108+
* public void onNext(T item) {
109+
* if (--count <= 0)
110+
* subscription.request(count = bufferSize - bufferSize / 2);
111+
* consumer.accept(item);
112+
* }
113+
* public void onError(Throwable ex) { ex.printStackTrace(); }
114+
* public void onComplete() {}
115+
* }}</pre>
116+
*
117+
* <p>The default value of {@link #defaultBufferSize} may provide a
118+
* useful starting point for choosing request sizes and capacities in
119+
* Flow components based on expected rates, resources, and usages.
120+
* Or, when flow control is never needed, a subscriber may initially
121+
* request an effectively unbounded number of items, as in:
122+
*
123+
* <pre> {@code
124+
* class UnboundedSubscriber<T> implements Subscriber<T> {
125+
* public void onSubscribe(Subscription subscription) {
126+
* subscription.request(Long.MAX_VALUE); // effectively unbounded
127+
* }
128+
* public void onNext(T item) { use(item); }
129+
* public void onError(Throwable ex) { ex.printStackTrace(); }
130+
* public void onComplete() {}
131+
* void use(T item) { ... }
132+
* }}</pre>
133+
*
134+
* @author Doug Lea
135+
* @since 9
136+
*/
137+
public final class Flow {
138+
139+
private Flow() {} // uninstantiable
140+
141+
/**
142+
* A producer of items (and related control messages) received by
143+
* Subscribers. Each current {@link Subscriber} receives the same
144+
* items (via method {@code onNext}) in the same order, unless
145+
* drops or errors are encountered. If a Publisher encounters an
146+
* error that does not allow items to be issued to a Subscriber,
147+
* that Subscriber receives {@code onError}, and then receives no
148+
* further messages. Otherwise, when it is known that no further
149+
* messages will be issued to it, a subscriber receives {@code
150+
* onComplete}. Publishers ensure that Subscriber method
151+
* invocations for each subscription are strictly ordered in <a
152+
* href="package-summary.html#MemoryVisibility"><i>happens-before</i></a>
153+
* order.
154+
*
155+
* <p>Publishers may vary in policy about whether drops (failures
156+
* to issue an item because of resource limitations) are treated
157+
* as unrecoverable errors. Publishers may also vary about
158+
* whether Subscribers receive items that were produced or
159+
* available before they subscribed.
160+
*
161+
* @param <T> the published item type
162+
*/
163+
@FunctionalInterface
164+
public static interface Publisher<T> {
165+
/**
166+
* Adds the given Subscriber if possible. If already
167+
* subscribed, or the attempt to subscribe fails due to policy
168+
* violations or errors, the Subscriber's {@code onError}
169+
* method is invoked with an {@link IllegalStateException}.
170+
* Otherwise, the Subscriber's {@code onSubscribe} method is
171+
* invoked with a new {@link Subscription}. Subscribers may
172+
* enable receiving items by invoking the {@code request}
173+
* method of this Subscription, and may unsubscribe by
174+
* invoking its {@code cancel} method.
175+
*
176+
* @param subscriber the subscriber
177+
* @throws NullPointerException if subscriber is null
178+
*/
179+
public void subscribe(Subscriber<? super T> subscriber);
180+
}
181+
182+
/**
183+
* A receiver of messages. The methods in this interface are
184+
* invoked in strict sequential order for each {@link
185+
* Subscription}.
186+
*
187+
* @param <T> the subscribed item type
188+
*/
189+
public static interface Subscriber<T> {
190+
/**
191+
* Method invoked prior to invoking any other Subscriber
192+
* methods for the given Subscription. If this method throws
193+
* an exception, resulting behavior is not guaranteed, but may
194+
* cause the Subscription not to be established or to be cancelled.
195+
*
196+
* <p>Typically, implementations of this method invoke {@code
197+
* subscription.request} to enable receiving items.
198+
*
199+
* @param subscription a new subscription
200+
*/
201+
public void onSubscribe(Subscription subscription);
202+
203+
/**
204+
* Method invoked with a Subscription's next item. If this
205+
* method throws an exception, resulting behavior is not
206+
* guaranteed, but may cause the Subscription to be cancelled.
207+
*
208+
* @param item the item
209+
*/
210+
public void onNext(T item);
211+
212+
/**
213+
* Method invoked upon an unrecoverable error encountered by a
214+
* Publisher or Subscription, after which no other Subscriber
215+
* methods are invoked by the Subscription. If this method
216+
* itself throws an exception, resulting behavior is
217+
* undefined.
218+
*
219+
* @param throwable the exception
220+
*/
221+
public void onError(Throwable throwable);
222+
223+
/**
224+
* Method invoked when it is known that no additional
225+
* Subscriber method invocations will occur for a Subscription
226+
* that is not already terminated by error, after which no
227+
* other Subscriber methods are invoked by the Subscription.
228+
* If this method throws an exception, resulting behavior is
229+
* undefined.
230+
*/
231+
public void onComplete();
232+
}
233+
234+
/**
235+
* Message control linking a {@link Publisher} and {@link
236+
* Subscriber}. Subscribers receive items only when requested,
237+
* and may cancel at any time. The methods in this interface are
238+
* intended to be invoked only by their Subscribers; usages in
239+
* other contexts have undefined effects.
240+
*/
241+
public static interface Subscription {
242+
/**
243+
* Adds the given number {@code n} of items to the current
244+
* unfulfilled demand for this subscription. If {@code n} is
245+
* negative, the Subscriber will receive an {@code onError}
246+
* signal with an {@link IllegalArgumentException} argument.
247+
* Otherwise, the Subscriber will receive up to {@code n}
248+
* additional {@code onNext} invocations (or fewer if
249+
* terminated).
250+
*
251+
* @param n the increment of demand; a value of {@code
252+
* Long.MAX_VALUE} may be considered as effectively unbounded
253+
*/
254+
public void request(long n);
255+
256+
/**
257+
* Causes the Subscriber to (eventually) stop receiving
258+
* messages. Implementation is best-effort -- additional
259+
* messages may be received after invoking this method.
260+
* A cancelled subscription need not ever receive an
261+
* {@code onComplete} or {@code onError} signal.
262+
*/
263+
public void cancel();
264+
}
265+
266+
/**
267+
* A component that acts as both a Subscriber and Publisher.
268+
*
269+
* @param <T> the subscribed item type
270+
* @param <R> the published item type
271+
*/
272+
public static interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
273+
}
274+
275+
static final int DEFAULT_BUFFER_SIZE = 256;
276+
277+
/**
278+
* Returns a default value for Publisher or Subscriber buffering,
279+
* that may be used in the absence of other constraints.
280+
*
281+
* @implNote
282+
* The current value returned is 256.
283+
*
284+
* @return the buffer size value
285+
*/
286+
public static int defaultBufferSize() {
287+
return DEFAULT_BUFFER_SIZE;
288+
}
289+
290+
}

0 commit comments

Comments
 (0)