Skip to content
This repository was archived by the owner on May 28, 2018. It is now read-only.

Commit e7f5284

Browse files
author
Adam Lindenthal
committed
JerseyPublisher strategies (non-blocking, best-effort)
Change-Id: Ibf20722e66f08cc2df0a1441e18ee6e2bdf80018
1 parent eb092a6 commit e7f5284

File tree

5 files changed

+342
-69
lines changed

5 files changed

+342
-69
lines changed

core-common/src/main/java/org/glassfish/jersey/internal/util/JerseyPublisher.java

Lines changed: 261 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,12 @@
4343
import java.util.concurrent.CompletableFuture;
4444
import java.util.concurrent.Executor;
4545
import java.util.concurrent.ForkJoinPool;
46+
import java.util.concurrent.RejectedExecutionException;
47+
import java.util.concurrent.TimeUnit;
48+
import java.util.function.BiPredicate;
4649
import java.util.function.Consumer;
4750

51+
import org.glassfish.jersey.internal.LocalizationMessages;
4852
import org.glassfish.jersey.internal.jsr166.Flow;
4953
import org.glassfish.jersey.internal.jsr166.SubmissionPublisher;
5054

@@ -61,32 +65,65 @@ public class JerseyPublisher<T> implements javax.ws.rs.Flow.Publisher<T> {
6165
private static final int DEFAULT_BUFFER_CAPACITY = 256;
6266
private SubmissionPublisher<T> submissionPublisher = new SubmissionPublisher<>();
6367

68+
private final PublisherStrategy strategy;
69+
6470
/**
6571
* Creates a new JerseyPublisher using the {@link ForkJoinPool#commonPool()} for async delivery to subscribers
6672
* (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run
67-
* each task), with maximum buffer capacity of {@value DEFAULT_BUFFER_CAPACITY}.
73+
* each task), with maximum buffer capacity of {@value DEFAULT_BUFFER_CAPACITY} and default {@link PublisherStrategy},
74+
* which is {@link PublisherStrategy#BEST_EFFORT}.
6875
*/
6976
public JerseyPublisher() {
70-
this(ForkJoinPool.commonPool(), DEFAULT_BUFFER_CAPACITY);
77+
this(ForkJoinPool.commonPool(), DEFAULT_BUFFER_CAPACITY, PublisherStrategy.BEST_EFFORT);
78+
}
79+
80+
/**
81+
* Creates a new JerseyPublisher using the {@link ForkJoinPool#commonPool()} for async delivery to subscribers
82+
* (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run
83+
* each task), with maximum buffer capacity of {@value DEFAULT_BUFFER_CAPACITY} and given {@link PublisherStrategy}.
84+
*
85+
* @param strategy publisher delivering strategy
86+
*/
87+
public JerseyPublisher(final PublisherStrategy strategy) {
88+
this(ForkJoinPool.commonPool(), DEFAULT_BUFFER_CAPACITY, strategy);
7189
}
7290

7391
/**
7492
* Creates a new JerseyPublisher using the given {@link Executor} for async delivery to subscribers, with the default
75-
* maximum buffer size of {@value DEFAULT_BUFFER_CAPACITY}.
93+
* maximum buffer size of {@value DEFAULT_BUFFER_CAPACITY} and default {@link PublisherStrategy}, which is
94+
* {@link PublisherStrategy#BEST_EFFORT}.
7695
*
7796
* @param executor {@code Executor} the executor to use for async delivery,
7897
* supporting creation of at least one independent thread
7998
* @throws NullPointerException if executor is null
8099
* @throws IllegalArgumentException if maxBufferCapacity not positive
81100
*/
82101
public JerseyPublisher(final Executor executor) {
102+
this(executor, PublisherStrategy.BEST_EFFORT);
103+
}
104+
105+
/**
106+
* Creates a new JerseyPublisher using the given {@link Executor} for async delivery to subscribers, with the default
107+
* maximum buffer size of {@value DEFAULT_BUFFER_CAPACITY} and given {@link PublisherStrategy}.
108+
*
109+
* @param executor {@code Executor} the executor to use for async delivery,
110+
* supporting creation of at least one independent thread
111+
* @param strategy publisher delivering strategy
112+
* @throws NullPointerException if executor is null
113+
* @throws IllegalArgumentException if maxBufferCapacity not positive
114+
*/
115+
public JerseyPublisher(final Executor executor, final PublisherStrategy strategy) {
116+
this.strategy = strategy;
83117
submissionPublisher = new SubmissionPublisher<>(executor, DEFAULT_BUFFER_CAPACITY);
84118
}
85119

120+
121+
86122
/**
87123
* Creates a new JerseyPublisher using the {@link ForkJoinPool#commonPool()} for async delivery to subscribers
88124
* (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run
89-
* each task), with specified maximum buffer capacity.
125+
* each task), with specified maximum buffer capacity and default {@link PublisherStrategy}, which is
126+
* {@link PublisherStrategy#BEST_EFFORT}.
90127
*
91128
* @param maxBufferCapacity the maximum capacity for each
92129
* subscriber's buffer (the enforced capacity may be rounded up to
@@ -95,12 +132,12 @@ public JerseyPublisher(final Executor executor) {
95132
* returns the actual value)
96133
*/
97134
public JerseyPublisher(final int maxBufferCapacity) {
98-
this(ForkJoinPool.commonPool(), maxBufferCapacity);
135+
this(ForkJoinPool.commonPool(), maxBufferCapacity, PublisherStrategy.BEST_EFFORT);
99136
}
100137

101138
/**
102139
* Creates a new JerseyPublisher using the given {@link Executor} for async delivery to subscribers, with the given
103-
* maximum buffer size for each subscriber.
140+
* maximum buffer size for each subscriber and given {@link PublisherStrategy}.
104141
*
105142
* @param executor {@code Executor} the executor to use for async delivery,
106143
* supporting creation of at least one independent thread
@@ -109,48 +146,18 @@ public JerseyPublisher(final int maxBufferCapacity) {
109146
* the nearest power of two and/or bounded by the largest value
110147
* supported by this implementation; method {@link #getMaxBufferCapacity}
111148
* returns the actual value)
149+
* @param strategy publisher delivering strategy
112150
* @throws NullPointerException if executor is null
113151
* @throws IllegalArgumentException if maxBufferCapacity not positive
114152
*/
115-
public JerseyPublisher(final Executor executor, final int maxBufferCapacity) {
153+
public JerseyPublisher(final Executor executor, final int maxBufferCapacity, PublisherStrategy strategy) {
154+
this.strategy = strategy;
116155
submissionPublisher = new SubmissionPublisher<>(executor, maxBufferCapacity);
117156
}
118157

119158
@Override
120159
public void subscribe(final javax.ws.rs.Flow.Subscriber<? super T> subscriber) {
121-
submissionPublisher.subscribe(new Flow.Subscriber<T>() {
122-
123-
@Override
124-
public void onSubscribe(final Flow.Subscription subscription) {
125-
subscriber.onSubscribe(new javax.ws.rs.Flow.Subscription() {
126-
127-
@Override
128-
public void request(final long n) {
129-
subscription.request(n);
130-
}
131-
132-
@Override
133-
public void cancel() {
134-
subscription.cancel();
135-
}
136-
});
137-
}
138-
139-
@Override
140-
public void onNext(final T item) {
141-
subscriber.onNext(item);
142-
}
143-
144-
@Override
145-
public void onError(final Throwable throwable) {
146-
subscriber.onError(throwable);
147-
}
148-
149-
@Override
150-
public void onComplete() {
151-
subscriber.onComplete();
152-
}
153-
});
160+
submissionPublisher.subscribe(new SubscriberWrapper<T>(subscriber));
154161
}
155162

156163
/**
@@ -164,7 +171,7 @@ public void onComplete() {
164171
* @throws NullPointerException if data is null
165172
* @throws java.util.concurrent.RejectedExecutionException if thrown by Executor
166173
*/
167-
public int submit(final T data) {
174+
private int submit(final T data) {
168175
return submissionPublisher.submit(data);
169176
}
170177

@@ -183,6 +190,147 @@ public CompletableFuture<Void> consume(final Consumer<? super T> consumer) {
183190
return submissionPublisher.consume(consumer);
184191
}
185192

193+
/**
194+
* Publishes the given item, if possible, to each current subscriber
195+
* by asynchronously invoking its
196+
* {@link javax.ws.rs.Flow.Subscriber#onNext(Object) onNext} method.
197+
* The item may be dropped by one or more subscribers if resource
198+
* limits are exceeded, in which case the given handler (if non-null)
199+
* is invoked, and if it returns true, retried once. Other calls to
200+
* methods in this class by other threads are blocked while the
201+
* handler is invoked. Unless recovery is assured, options are
202+
* usually limited to logging the error and/or issuing an {@link
203+
* javax.ws.rs.Flow.Subscriber#onError(Throwable) onError}
204+
* signal to the subscriber.
205+
* <p>
206+
* This method returns a status indicator: If negative, it
207+
* represents the (negative) number of drops (failed attempts to
208+
* issue the item to a subscriber). Otherwise it is an estimate of
209+
* the maximum lag (number of items submitted but not yet
210+
* consumed) among all current subscribers. This value is at least
211+
* one (accounting for this submitted item) if there are any
212+
* subscribers, else zero.
213+
* <p>
214+
* If the Executor for this publisher throws a
215+
* RejectedExecutionException (or any other RuntimeException or
216+
* Error) when attempting to asynchronously notify subscribers, or
217+
* the drop handler throws an exception when processing a dropped
218+
* item, then this exception is rethrown.
219+
*
220+
* @param item the (non-null) item to publish
221+
* @param onDrop if non-null, the handler invoked upon a drop to a
222+
* subscriber, with arguments of the subscriber and item; if it
223+
* returns true, an offer is re-attempted (once)
224+
* @return if negative, the (negative) number of drops; otherwise
225+
* an estimate of maximum lag
226+
* @throws IllegalStateException if closed
227+
* @throws NullPointerException if item is null
228+
* @throws RejectedExecutionException if thrown by Executor
229+
*/
230+
private int offer(T item, BiPredicate<javax.ws.rs.Flow.Subscriber<? super T>, ? super T> onDrop) {
231+
return offer(item, 0, TimeUnit.MILLISECONDS, onDrop);
232+
}
233+
234+
/**
235+
* Publishes the given item, if possible, to each current subscriber
236+
* by asynchronously invoking its {@link
237+
* javax.ws.rs.Flow.Subscriber#onNext(Object) onNext} method,
238+
* blocking while resources for any subscription are unavailable,
239+
* up to the specified timeout or until the caller thread is
240+
* interrupted, at which point the given handler (if non-null) is
241+
* invoked, and if it returns true, retried once. (The drop handler
242+
* may distinguish timeouts from interrupts by checking whether
243+
* the current thread is interrupted.)
244+
* Other calls to methods in this class by other
245+
* threads are blocked while the handler is invoked. Unless
246+
* recovery is assured, options are usually limited to logging the
247+
* error and/or issuing an
248+
* {@link javax.ws.rs.Flow.Subscriber#onError(Throwable) onError}
249+
* signal to the subscriber.
250+
* <p>
251+
* This method returns a status indicator: If negative, it
252+
* represents the (negative) number of drops (failed attempts to
253+
* issue the item to a subscriber). Otherwise it is an estimate of
254+
* the maximum lag (number of items submitted but not yet
255+
* consumed) among all current subscribers. This value is at least
256+
* one (accounting for this submitted item) if there are any
257+
* subscribers, else zero.
258+
* <p>
259+
* If the Executor for this publisher throws a
260+
* RejectedExecutionException (or any other RuntimeException or
261+
* Error) when attempting to asynchronously notify subscribers, or
262+
* the drop handler throws an exception when processing a dropped
263+
* item, then this exception is rethrown.
264+
*
265+
* @param item the (non-null) item to publish
266+
* @param timeout how long to wait for resources for any subscriber
267+
* before giving up, in units of {@code unit}
268+
* @param unit a {@code TimeUnit} determining how to interpret the
269+
* {@code timeout} parameter
270+
* @param onDrop if non-null, the handler invoked upon a drop to a
271+
* subscriber, with arguments of the subscriber and item; if it
272+
* returns true, an offer is re-attempted (once)
273+
* @return if negative, the (negative) number of drops; otherwise
274+
* an estimate of maximum lag
275+
* @throws IllegalStateException if closed
276+
* @throws NullPointerException if item is null
277+
* @throws RejectedExecutionException if thrown by Executor
278+
*/
279+
private int offer(T item,
280+
long timeout,
281+
TimeUnit unit,
282+
BiPredicate<javax.ws.rs.Flow.Subscriber<? super T>, ? super T> onDrop) {
283+
284+
285+
BiPredicate<Flow.Subscriber<? super T>, ? super T> callback;
286+
287+
callback = onDrop == null
288+
? this::onDrop
289+
: (BiPredicate<Flow.Subscriber<? super T>, T>)
290+
(subscriber, data) -> {
291+
onDrop.test(getSubscriberWrapper(subscriber).getWrappedSubscriber(), data);
292+
return false;
293+
};
294+
295+
return submissionPublisher.offer(item, timeout, unit, callback);
296+
}
297+
298+
private boolean onDrop(Flow.Subscriber<? super T> subscriber, T t) {
299+
subscriber.onError(new IllegalStateException(LocalizationMessages.SLOW_SUBSCRIBER(t)));
300+
getSubscriberWrapper(subscriber).getSubscription().cancel();
301+
return false;
302+
}
303+
304+
private SubscriberWrapper getSubscriberWrapper(Flow.Subscriber subscriber) {
305+
if (subscriber instanceof SubscriberWrapper) {
306+
return ((SubscriberWrapper) subscriber);
307+
} else {
308+
throw new IllegalArgumentException(LocalizationMessages.UNKNOWN_SUBSCRIBER());
309+
}
310+
311+
}
312+
313+
/**
314+
* Publishes the given item to all current subscribers by invoking its {@code onNext() method} using {@code Executor}
315+
* provided as constructor parameter (or the default {@code Executor} if not provided).
316+
* <p>
317+
* Concrete behaviour is specified by {@link PublisherStrategy} selected upon {@code JerseyPublisher} creation.
318+
*
319+
* @param item the (non-null) item to publish.
320+
* @return if negative, the (negative) number of drops; otherwise an estimate of maximum lag.
321+
* @throws IllegalStateException if closed
322+
* @throws NullPointerException if item is null
323+
* @throws RejectedExecutionException if thrown by {@code Executor}
324+
*/
325+
public int publish(T item) {
326+
if (PublisherStrategy.BLOCKING == strategy) {
327+
return submit(item);
328+
} else {
329+
// PublisherStrategy.BEST_EFFORT
330+
return submissionPublisher.offer(item, this::onDrop);
331+
}
332+
}
333+
186334
/**
187335
* Unless already closed, issues {@code onComplete()} signals to current subscribers, and disallows subsequent
188336
* attempts to publish. Upon return, this method does <em>NOT</em> guarantee that all subscribers have yet
@@ -237,4 +385,76 @@ public Throwable getClosedException() {
237385
public int getMaxBufferCapacity() {
238386
return submissionPublisher.getMaxBufferCapacity();
239387
}
388+
389+
public static class SubscriberWrapper<T> implements Flow.Subscriber<T> {
390+
private javax.ws.rs.Flow.Subscriber<? super T> subscriber;
391+
private Flow.Subscription subscription = null;
392+
393+
public SubscriberWrapper(javax.ws.rs.Flow.Subscriber<? super T> subscriber) {
394+
this.subscriber = subscriber;
395+
}
396+
397+
@Override
398+
public void onSubscribe(final Flow.Subscription subscription) {
399+
this.subscription = subscription;
400+
subscriber.onSubscribe(new javax.ws.rs.Flow.Subscription() {
401+
@Override
402+
public void request(final long n) {
403+
subscription.request(n);
404+
}
405+
406+
@Override
407+
public void cancel() {
408+
subscription.cancel();
409+
}
410+
});
411+
}
412+
413+
@Override
414+
public void onNext(final T item) {
415+
subscriber.onNext(item);
416+
}
417+
418+
@Override
419+
public void onError(final Throwable throwable) {
420+
subscriber.onError(throwable);
421+
}
422+
423+
@Override
424+
public void onComplete() {
425+
subscriber.onComplete();
426+
}
427+
428+
public javax.ws.rs.Flow.Subscriber<? super T> getWrappedSubscriber() {
429+
return subscriber;
430+
}
431+
432+
/**
433+
* Get reference to subscriber's {@link Flow.Subscription}.
434+
*
435+
* @return subscriber's {@code subscription}
436+
*/
437+
public Flow.Subscription getSubscription() {
438+
return this.subscription;
439+
}
440+
}
441+
442+
public enum PublisherStrategy{
443+
/**
444+
* Blocking publisher strategy - tries to deliver to all subscribers regardless the cost.
445+
*
446+
* The thread is blocked uninterruptibly while resources for any subscriber are unavailable.
447+
* This strategy comes with a risk of thread exhaustion, that will lead to publisher being completely blocked by slow
448+
* or incorrectly implemented subscribers.
449+
*/
450+
BLOCKING,
451+
452+
/**
453+
* Best effort publisher strategy - tries to deliver to all subscribers if possible without blocking the processing.
454+
*
455+
* If the buffer is full, publisher invokes {@code onError()} and cancels subscription on a subscriber, that is not
456+
* capable of read the messages at a speed sufficient to unblock the processing.
457+
*/
458+
BEST_EFFORT,
459+
}
240460
}

0 commit comments

Comments
 (0)