Skip to content

Commit fbc1734

Browse files
author
Alexander Lavrukov
committed
better-spliterator: Better spliterator
1 parent edaa8c4 commit fbc1734

File tree

5 files changed

+404
-0
lines changed

5 files changed

+404
-0
lines changed
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package tech.ydb.yoj.repository.ydb.spliterator;
2+
3+
import tech.ydb.proto.ValueProtos;
4+
5+
import java.util.List;
6+
7+
@FunctionalInterface
8+
public interface ResultConverter<V> {
9+
V convert(List<ValueProtos.Column> columns, ValueProtos.Value value);
10+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package tech.ydb.yoj.repository.ydb.spliterator;
2+
3+
import tech.ydb.proto.ValueProtos;
4+
import tech.ydb.table.result.ResultSetReader;
5+
import tech.ydb.yoj.repository.ydb.client.YdbConverter;
6+
7+
import java.util.ArrayList;
8+
import java.util.Iterator;
9+
import java.util.List;
10+
import java.util.NoSuchElementException;
11+
12+
/* package */ final class ResultSetIterator<V> implements Iterator<V> {
13+
private final ResultSetReader resultSet;
14+
private final List<ValueProtos.Column> columns;
15+
16+
private final ResultConverter<V> converter;
17+
18+
private int position = 0;
19+
20+
public ResultSetIterator(ResultConverter<V> converter, ResultSetReader resultSet) {
21+
this.converter = converter;
22+
this.resultSet = resultSet;
23+
24+
if (resultSet.getRowCount() > 0) {
25+
columns = getColumns(resultSet);
26+
} else {
27+
columns = new ArrayList<>();
28+
}
29+
30+
this.resultSet.setRowIndex(0);
31+
}
32+
33+
@Override
34+
public boolean hasNext() {
35+
return position < resultSet.getRowCount();
36+
}
37+
38+
@Override
39+
public V next() {
40+
if (!hasNext()) {
41+
throw new NoSuchElementException();
42+
}
43+
44+
ValueProtos.Value value = buildValue(position++);
45+
46+
return converter.convert(columns, value);
47+
}
48+
49+
private ValueProtos.Value buildValue(int rowIndex) {
50+
resultSet.setRowIndex(rowIndex);
51+
ValueProtos.Value.Builder value = ValueProtos.Value.newBuilder();
52+
for (int col = 0; col < columns.size(); col++) {
53+
value.addItems(YdbConverter.convertValueToProto(resultSet.getColumn(col)));
54+
}
55+
return value.build();
56+
}
57+
58+
private static List<ValueProtos.Column> getColumns(ResultSetReader resultSet) {
59+
resultSet.setRowIndex(0);
60+
List<ValueProtos.Column> result = new ArrayList<>();
61+
for (int i = 0; i < resultSet.getColumnCount(); i++) {
62+
result.add(ValueProtos.Column.newBuilder()
63+
.setName(resultSet.getColumnName(i))
64+
.build());
65+
}
66+
return result;
67+
}
68+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package tech.ydb.yoj.repository.ydb.spliterator;
2+
3+
import tech.ydb.table.result.ResultSetReader;
4+
import tech.ydb.yoj.ExperimentalApi;
5+
6+
import java.time.Duration;
7+
import java.util.Spliterator;
8+
import java.util.function.Consumer;
9+
import java.util.stream.Stream;
10+
import java.util.stream.StreamSupport;
11+
12+
@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/42")
13+
public final class YdbSpliterator<V> implements Spliterator<V> {
14+
private static final Duration DEFAULT_STREAM_WORK_TIMEOUT = Duration.ofMinutes(5);
15+
16+
private final ResultConverter<V> converter;
17+
private final int flags;
18+
private final YdbSpliteratorQueue<ResultSetReader> queue;
19+
20+
private ResultSetIterator<V> resultIterator;
21+
22+
private boolean closed = false;
23+
24+
public YdbSpliterator(ResultConverter<V> converter, boolean isOrdered) {
25+
this(converter, isOrdered, DEFAULT_STREAM_WORK_TIMEOUT);
26+
}
27+
28+
private YdbSpliterator(ResultConverter<V> converter, boolean isOrdered, Duration streamWorkTimeout) {
29+
this.converter = converter;
30+
this.flags = (isOrdered ? ORDERED : 0) | NONNULL;
31+
this.queue = new YdbSpliteratorQueue<>(1, streamWorkTimeout);
32+
}
33+
34+
// Correct way to create stream with YdbSpliterator. onClose call is important for avoid supplier thread leak.
35+
public Stream<V> createStream() {
36+
return StreamSupport.stream(this, false).onClose(this::close);
37+
}
38+
39+
@Override
40+
public boolean tryAdvance(Consumer<? super V> action) {
41+
if (closed) {
42+
return false;
43+
}
44+
45+
if (resultIterator == null || !resultIterator.hasNext()) {
46+
ResultSetReader resultSet = queue.poll();
47+
if (resultSet == null) {
48+
closed = true;
49+
return false;
50+
}
51+
resultIterator = new ResultSetIterator<>(converter, resultSet);
52+
}
53+
54+
V value = resultIterator.next();
55+
56+
action.accept(value);
57+
58+
return true;
59+
}
60+
61+
public void close() {
62+
closed = true;
63+
queue.close();
64+
}
65+
66+
@Override
67+
public Spliterator<V> trySplit() {
68+
return null;
69+
}
70+
71+
@Override
72+
public long estimateSize() {
73+
return Long.MAX_VALUE;
74+
}
75+
76+
@Override
77+
public long getExactSizeIfKnown() {
78+
return -1;
79+
}
80+
81+
@Override
82+
public int characteristics() {
83+
return flags;
84+
}
85+
86+
}
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package tech.ydb.yoj.repository.ydb.spliterator;
2+
3+
import com.google.common.base.Preconditions;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
import tech.ydb.yoj.ExperimentalApi;
7+
import tech.ydb.yoj.repository.db.exception.DeadlineExceededException;
8+
import tech.ydb.yoj.repository.db.exception.QueryInterruptedException;
9+
10+
import java.time.Duration;
11+
import java.util.ArrayDeque;
12+
import java.util.concurrent.TimeUnit;
13+
import java.util.concurrent.locks.Condition;
14+
import java.util.concurrent.locks.Lock;
15+
import java.util.concurrent.locks.ReentrantLock;
16+
17+
@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/42")
18+
/* package */ final class YdbSpliteratorQueue<V> {
19+
private static final Logger log = LoggerFactory.getLogger(YdbSpliteratorQueue.class);
20+
21+
private static final SupplierStatus UNDONE_SUPPLIER_STATUS = () -> false;
22+
23+
private final int maxQueueSize;
24+
private final ArrayDeque<V> queue;
25+
private final long streamWorkDeadlineNanos;
26+
27+
private final Lock lock = new ReentrantLock();
28+
private final Condition newElement = lock.newCondition();
29+
private final Condition queueIsNotFull = lock.newCondition();
30+
31+
private SupplierStatus supplierStatus = UNDONE_SUPPLIER_STATUS;
32+
private boolean closed = false;
33+
34+
public YdbSpliteratorQueue(int maxQueueSize, Duration streamWorkTimeout) {
35+
Preconditions.checkArgument(maxQueueSize > 0, "maxQueueSize must be greater than 0");
36+
this.maxQueueSize = maxQueueSize;
37+
this.queue = new ArrayDeque<>(maxQueueSize);
38+
this.streamWorkDeadlineNanos = System.nanoTime() + TimeUnit.NANOSECONDS.toNanos(saturatedToNanos(streamWorkTimeout));
39+
}
40+
41+
public boolean onNext(V value) {
42+
Preconditions.checkState(supplierStatus.equals(UNDONE_SUPPLIER_STATUS),
43+
"can't call onNext after supplierDone"
44+
);
45+
46+
lock.lock();
47+
try {
48+
if (closed) {
49+
return false;
50+
}
51+
52+
// Only one supplier is possible, queue can't be full in this situation
53+
queue.add(value);
54+
55+
newElement.signal();
56+
57+
if (queue.size() == maxQueueSize) {
58+
try {
59+
if (!queueIsNotFull.await(calculateTimeout(), TimeUnit.NANOSECONDS)) {
60+
throw new OfferDeadlineExceededException();
61+
}
62+
} catch (InterruptedException e) {
63+
Thread.currentThread().interrupt();
64+
throw new QueryInterruptedException("Supplier thread interrupted", e);
65+
}
66+
67+
if (closed) {
68+
return false;
69+
}
70+
}
71+
} finally {
72+
lock.unlock();
73+
}
74+
75+
return true;
76+
}
77+
78+
// (supplier thread) Send knowledge to stream when data is over.
79+
public void supplierDone(SupplierStatus status) {
80+
lock.lock();
81+
try {
82+
if (closed) {
83+
return;
84+
}
85+
86+
supplierStatus = status;
87+
88+
newElement.signal();
89+
} finally {
90+
lock.unlock();
91+
}
92+
}
93+
94+
public boolean isClosed() {
95+
lock.lock();
96+
try {
97+
return closed;
98+
} finally {
99+
lock.unlock();
100+
}
101+
}
102+
103+
public V poll() {
104+
lock.lock();
105+
try {
106+
if (closed) {
107+
return null;
108+
}
109+
110+
if (queue.isEmpty()) {
111+
if (supplierStatus.isDone()) {
112+
return null;
113+
}
114+
115+
try {
116+
if (!newElement.await(calculateTimeout(), TimeUnit.NANOSECONDS)) {
117+
log.warn("Supplier thread was closed because consumer didn't poll an element of stream on timeout");
118+
throw new DeadlineExceededException("Stream deadline exceeded on poll");
119+
}
120+
} catch (InterruptedException e) {
121+
Thread.currentThread().interrupt();
122+
throw new QueryInterruptedException("Consumer thread interrupted", e);
123+
}
124+
125+
if (closed || supplierStatus.isDone()) {
126+
return null;
127+
}
128+
}
129+
130+
V value = queue.pop();
131+
132+
queueIsNotFull.signal();
133+
134+
return value;
135+
} finally {
136+
lock.unlock();
137+
}
138+
}
139+
140+
public void close() {
141+
lock.lock();
142+
try {
143+
if (closed) {
144+
return;
145+
}
146+
147+
closed = true;
148+
149+
queueIsNotFull.signal();
150+
newElement.signalAll();
151+
} finally {
152+
lock.unlock();
153+
}
154+
}
155+
156+
private long calculateTimeout() {
157+
return TimeUnit.NANOSECONDS.toNanos(streamWorkDeadlineNanos - System.nanoTime());
158+
}
159+
160+
public static final class OfferDeadlineExceededException extends RuntimeException {
161+
}
162+
163+
// copy-paste from com.google.common.util.concurrent.Uninterruptibles
164+
private static long saturatedToNanos(Duration duration) {
165+
try {
166+
return duration.toNanos();
167+
} catch (ArithmeticException ignore) {
168+
return duration.isNegative() ? -9223372036854775808L : 9223372036854775807L;
169+
}
170+
}
171+
172+
@FunctionalInterface
173+
public interface SupplierStatus {
174+
boolean isDone();
175+
}
176+
}

0 commit comments

Comments
 (0)