Skip to content

Commit de31f59

Browse files
committed
1.2.2 snapshot
fix cached uid block error!
1 parent 7852a05 commit de31f59

File tree

18 files changed

+82
-65
lines changed

18 files changed

+82
-65
lines changed

.idea/misc.xml

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

example/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<parent>
77
<groupId>io.github.cooperlyt</groupId>
88
<artifactId>uid-reactive-generator-spring</artifactId>
9-
<version>1.2.1</version>
9+
<version>1.2.2-snapshot</version>
1010
</parent>
1111

1212
<artifactId>example</artifactId>
@@ -82,7 +82,7 @@
8282
<dependency>
8383
<groupId>io.github.cooperlyt</groupId>
8484
<artifactId>uid-reactive-generator-db-spring-boot-starter</artifactId>
85-
<version>1.2.1</version>
85+
<version>1.2.2-snapshot</version>
8686
</dependency>
8787

8888
<!-- jdbc -->

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>io.github.cooperlyt</groupId>
88
<artifactId>uid-reactive-generator-spring</artifactId>
9-
<version>1.2.1</version>
9+
<version>1.2.2-snapshot</version>
1010
<packaging>pom</packaging>
1111

1212
<name>uid-reactive-generator-spring</name>
@@ -65,7 +65,7 @@
6565
<dependency>
6666
<groupId>org.springframework.boot</groupId>
6767
<artifactId>spring-boot-dependencies</artifactId>
68-
<version>3.0.3</version>
68+
<version>3.1.4</version>
6969
<type>pom</type>
7070
<scope>import</scope>
7171
</dependency>

uid-reactive-generator-api/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<parent>
77
<groupId>io.github.cooperlyt</groupId>
88
<artifactId>uid-reactive-generator-spring</artifactId>
9-
<version>1.2.1</version>
9+
<version>1.2.2-snapshot</version>
1010
</parent>
1111

1212
<artifactId>uid-reactive-generator-api</artifactId>

uid-reactive-generator-db-spring-boot-starter/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<parent>
77
<groupId>io.github.cooperlyt</groupId>
88
<artifactId>uid-reactive-generator-spring</artifactId>
9-
<version>1.2.1</version>
9+
<version>1.2.2-snapshot</version>
1010
</parent>
1111

1212
<artifactId>uid-reactive-generator-db-spring-boot-starter</artifactId>
@@ -15,7 +15,7 @@
1515
<dependency>
1616
<groupId>io.github.cooperlyt</groupId>
1717
<artifactId>uid-reactive-generator-spring-boot-starter</artifactId>
18-
<version>1.2.1</version>
18+
<version>1.2.2-snapshot</version>
1919
</dependency>
2020

2121
<dependency>

uid-reactive-generator-db-spring-boot-starter/src/main/java/io/github/cooperlyt/cloud/uid/worker/jpa/R2DBCWorkerIdAssigner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public R2DBCWorkerIdAssigner(WorkerNodeIdent workerNodeIdent,
2424

2525
@Transactional(rollbackFor = Exception.class)
2626
@Override
27-
protected Mono<Long> assignWorkerId(WorkerNodeIdent workerNodeIdent) {
27+
public Mono<Long> assignWorkerId(WorkerNodeIdent workerNodeIdent) {
2828
return workerNodeRepository
2929
.getWorkerNodeByHostAndPort(workerNodeIdent.getHost(),workerNodeIdent.getPort())
3030
.map(Optional::of)

uid-reactive-generator-spring-boot-starter/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<parent>
77
<groupId>io.github.cooperlyt</groupId>
88
<artifactId>uid-reactive-generator-spring</artifactId>
9-
<version>1.2.1</version>
9+
<version>1.2.2-snapshot</version>
1010
</parent>
1111

1212
<artifactId>uid-reactive-generator-spring-boot-starter</artifactId>
@@ -21,7 +21,7 @@
2121
<dependency>
2222
<groupId>io.github.cooperlyt</groupId>
2323
<artifactId>uid-reactive-generator</artifactId>
24-
<version>1.2.1</version>
24+
<version>1.2.2-snapshot</version>
2525
</dependency>
2626

2727

uid-reactive-generator-spring-cloud-starter-discovery/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<parent>
77
<groupId>io.github.cooperlyt</groupId>
88
<artifactId>uid-reactive-generator-spring</artifactId>
9-
<version>1.2.1</version>
9+
<version>1.2.2-snapshot</version>
1010
</parent>
1111

1212
<artifactId>uid-reactive-generator-spring-cloud-starter-discovery</artifactId>
@@ -16,7 +16,7 @@
1616
<dependency>
1717
<groupId>io.github.cooperlyt</groupId>
1818
<artifactId>uid-reactive-generator-spring-boot-starter</artifactId>
19-
<version>1.2.1</version>
19+
<version>1.2.2-snapshot</version>
2020
</dependency>
2121

2222
<dependency>

uid-reactive-generator-spring-cloud-starter-discovery/src/main/java/io/github/cooperlyt/cloud/uid/worker/DiscoveryWorkerIdAssigner.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public void onRegisteredEvent(InstanceRegisteredEvent<?> event){
5151
sink.tryEmitValue(event).orThrow();
5252
}
5353

54+
5455
@EventListener
5556
public void onPreRegisteredEvent(InstancePreRegisteredEvent event) {
5657

@@ -59,6 +60,7 @@ public void onPreRegisteredEvent(InstancePreRegisteredEvent event) {
5960
prepareWorkerId = discoveryClientAdapter.getInstances(serverId)
6061
.filter(instance -> !instance.getInstanceId().equals(instanceId))
6162
.map(this::getOrder)
63+
6264
.reduce(0L,(max,order) -> (order > max) ? order : max)
6365
.blockOptional().orElse(0L) + 1L;
6466

uid-reactive-generator-spring-cloud-starter-discovery/src/main/java/io/github/cooperlyt/cloud/uid/worker/WorkerIdAssignerAutoConfigure.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.github.cooperlyt.cloud.uid.worker.client.DiscoveryClientAdapter;
44
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
5+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
56
import org.springframework.cloud.client.ConditionalOnDiscoveryEnabled;
67
import org.springframework.context.annotation.Bean;
78
import org.springframework.context.annotation.Configuration;
@@ -14,6 +15,7 @@ public class WorkerIdAssignerAutoConfigure {
1415
@Bean
1516
@Lazy
1617
@ConditionalOnMissingBean
18+
// @ConditionalOnProperty(prefix = "spring.cloud.service-registry.auto-registration", name = "enabled")
1719
public WorkerIdAssigner workerIdAssigner(DiscoveryClientAdapter discoveryClientAdapter){
1820
return new DiscoveryWorkerIdAssigner(discoveryClientAdapter);
1921
}

uid-reactive-generator-spring-cloud-starter-discovery/src/main/java/io/github/cooperlyt/cloud/uid/worker/client/BlockingDiscoveryClientAdapter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
package io.github.cooperlyt.cloud.uid.worker.client;
22

3+
import lombok.extern.slf4j.Slf4j;
34
import org.springframework.cloud.client.ServiceInstance;
45
import org.springframework.cloud.client.discovery.DiscoveryClient;
56
import reactor.core.publisher.Flux;
67

8+
@Slf4j
79
public class BlockingDiscoveryClientAdapter implements DiscoveryClientAdapter {
810

911
private final DiscoveryClient discoveryClient;
1012

1113
public BlockingDiscoveryClientAdapter(DiscoveryClient discoveryClient) {
1214
this.discoveryClient = discoveryClient;
15+
log.info("init by DiscoveryClient");
1316
}
1417

1518
@Override

uid-reactive-generator-spring-cloud-starter-discovery/src/main/java/io/github/cooperlyt/cloud/uid/worker/client/ReactiveDiscoveryClientAdapter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
package io.github.cooperlyt.cloud.uid.worker.client;
22

3+
import lombok.extern.slf4j.Slf4j;
34
import org.springframework.cloud.client.ServiceInstance;
45
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;
56
import reactor.core.publisher.Flux;
67

8+
@Slf4j
79
public class ReactiveDiscoveryClientAdapter implements DiscoveryClientAdapter {
810

911
private final ReactiveDiscoveryClient discoveryClient;
1012

1113
public ReactiveDiscoveryClientAdapter(ReactiveDiscoveryClient discoveryClient) {
1214
this.discoveryClient = discoveryClient;
15+
log.info("init by ReactiveDiscoveryClient");
1316
}
1417

1518
@Override

uid-reactive-generator/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<parent>
77
<groupId>io.github.cooperlyt</groupId>
88
<artifactId>uid-reactive-generator-spring</artifactId>
9-
<version>1.2.1</version>
9+
<version>1.2.2-snapshot</version>
1010
</parent>
1111

1212
<artifactId>uid-reactive-generator</artifactId>
@@ -26,7 +26,7 @@
2626
<dependency>
2727
<groupId>io.github.cooperlyt</groupId>
2828
<artifactId>uid-reactive-generator-api</artifactId>
29-
<version>1.2.1</version>
29+
<version>1.2.2-snapshot</version>
3030
</dependency>
3131
<dependency>
3232
<groupId>io.projectreactor</groupId>

uid-reactive-generator/src/main/java/io/github/cooperlyt/cloud/uid/buffer/BufferPaddingExecutor.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.github.cooperlyt.cloud.uid.utils.NamingThreadFactory;
1919
import io.github.cooperlyt.cloud.uid.utils.PaddedAtomicLong;
20+
import lombok.Getter;
2021
import lombok.extern.slf4j.Slf4j;
2122
import reactor.core.publisher.Mono;
2223

@@ -93,13 +94,16 @@ public BufferPaddingExecutor(RingBuffer ringBuffer, BufferedUidProvider uidProvi
9394
}
9495
}
9596

97+
private long workerId;
9698
/**
9799
* Start executors such as schedule
98100
*/
99-
public void start() {
101+
public void start(long workerId) {
102+
this.workerId = workerId;
100103
if (bufferPadSchedule != null) {
101104
bufferPadSchedule.scheduleWithFixedDelay(this::asyncPadding, scheduleInterval, scheduleInterval, TimeUnit.SECONDS);
102105
}
106+
paddingFuture = CompletableFuture.runAsync(this::paddingBuffer,bufferPadExecutors);
103107
}
104108

105109
/**
@@ -109,7 +113,6 @@ public void shutdown() {
109113
if (!bufferPadExecutors.isShutdown()) {
110114
bufferPadExecutors.shutdownNow();
111115
}
112-
113116
if (bufferPadSchedule != null && !bufferPadSchedule.isShutdown()) {
114117
bufferPadSchedule.shutdownNow();
115118
}
@@ -125,26 +128,38 @@ public boolean isRunning() {
125128
}
126129

127130

128-
private Mono<Void> paddingFuture = Mono.empty();
131+
//@Getter
132+
//private Mono<Void> paddingFuture = Mono.empty();
133+
134+
private CompletableFuture<Void> paddingFuture = null;
129135

130-
public Mono<Void> getPaddingFuture() {
131-
return paddingFuture;
136+
public Mono<Void> requestPadding(long workerId){
137+
this.workerId = workerId;
138+
if (paddingFuture != null && isRunning() && !paddingFuture.isDone()){
139+
return Mono.fromFuture(paddingFuture);
140+
}else
141+
return Mono.fromFuture(CompletableFuture.runAsync(this::paddingBuffer,bufferPadExecutors));
132142
}
133143

144+
public void asyncPadding(long workerId) {
145+
this.workerId = workerId;
146+
asyncPadding();
147+
}
134148
/**
135149
* Padding buffer in the thread pool
136150
*/
137-
public void asyncPadding() {
138-
// Mono.fromFuture(CompletableFuture.runAsync(this::paddingBuffer,bufferPadExecutors)).
139-
paddingFuture = Mono.fromFuture(CompletableFuture.runAsync(this::paddingBuffer,bufferPadExecutors)).cache();
151+
private void asyncPadding() {
152+
// paddingFuture = Mono.fromFuture(CompletableFuture.runAsync(this::paddingBuffer,bufferPadExecutors)).cache();
140153
// t.isDone()
141-
// bufferPadExecutors.submit(this::paddingBuffer).;
154+
paddingFuture = CompletableFuture.runAsync(this::paddingBuffer,bufferPadExecutors);
155+
//bufferPadExecutors.submit(this::paddingBuffer);
156+
142157
}
143158

144159
/**
145160
* Padding buffer fill the slots until to catch the cursor
146161
*/
147-
public void paddingBuffer() {
162+
private void paddingBuffer() {
148163

149164
log.info("Ready to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);
150165

@@ -162,7 +177,7 @@ public void paddingBuffer() {
162177
if (providerTime > currentTime){
163178
timeIsFutureHandler.timeIsFuture(providerTime,currentTime);
164179
}
165-
List<Long> uidList = uidProvider.provide(providerTime);
180+
List<Long> uidList = uidProvider.provide(workerId,providerTime);
166181
for (Long uid : uidList) {
167182
isFullRingBuffer = !ringBuffer.put(uid);
168183
if (isFullRingBuffer) {

uid-reactive-generator/src/main/java/io/github/cooperlyt/cloud/uid/buffer/BufferedUidProvider.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package io.github.cooperlyt.cloud.uid.buffer;
1717

18+
import reactor.core.publisher.Mono;
19+
1820
import java.util.List;
1921

2022
/**
@@ -31,5 +33,5 @@ public interface BufferedUidProvider {
3133
* @param momentInSecond
3234
* @return
3335
*/
34-
List<Long> provide(long momentInSecond);
36+
List<Long> provide(long workerId, long momentInSecond);
3537
}

uid-reactive-generator/src/main/java/io/github/cooperlyt/cloud/uid/buffer/RingBuffer.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.github.cooperlyt.cloud.uid.buffer;
1717

1818
import io.github.cooperlyt.cloud.uid.utils.PaddedAtomicLong;
19+
import lombok.Getter;
1920
import lombok.extern.slf4j.Slf4j;
2021
import reactor.core.publisher.Mono;
2122

@@ -44,6 +45,7 @@ public class RingBuffer {
4445
public static final int DEFAULT_PADDING_PERCENT = 50;
4546

4647
/** The size of RingBuffer's slots, each slot hold a UID */
48+
@Getter
4749
private final int bufferSize;
4850
private final long indexMask;
4951
private final long[] slots;
@@ -147,7 +149,7 @@ public synchronized boolean put(long uid) {
147149
* @return UID
148150
* @throws IllegalStateException if the cursor moved back
149151
*/
150-
public Mono<Long> take() {
152+
public Mono<Long> take(long workerId) {
151153
// spin get next available cursor
152154
long currentCursor = cursor.get();
153155
long nextCursor = cursor.updateAndGet(old -> old == tail.get() ? old : old + 1);
@@ -161,13 +163,13 @@ public Mono<Long> take() {
161163

162164
log.info("Reach the padding threshold:{}. tail:{}, cursor:{}, rest:{}", paddingThreshold, currentTail,
163165
nextCursor, currentTail - nextCursor);
164-
bufferPaddingExecutor.asyncPadding();
166+
bufferPaddingExecutor.asyncPadding(workerId);
165167
}
166168

167169
// cursor catch the tail, means that there is no more available UID to take
168170
if (nextCursor == currentCursor) {
169171
log.info("Buffer empty waiting put");
170-
return bufferPaddingExecutor.getPaddingFuture().map(v -> -1L).switchIfEmpty(take());
172+
return bufferPaddingExecutor.requestPadding(workerId).then(take(workerId));
171173
//rejectedTakeHandler.rejectTakeBuffer(this);
172174
}
173175

@@ -225,10 +227,6 @@ public long getCursor() {
225227
return cursor.get();
226228
}
227229

228-
public int getBufferSize() {
229-
return bufferSize;
230-
}
231-
232230
/**
233231
* Setters
234232
*/
@@ -242,13 +240,10 @@ public void setRejectedPutHandler(RejectedPutBufferHandler rejectedPutHandler) {
242240

243241
@Override
244242
public String toString() {
245-
StringBuilder builder = new StringBuilder();
246-
builder.append("RingBuffer [bufferSize=").append(bufferSize)
247-
.append(", tail=").append(tail)
248-
.append(", cursor=").append(cursor)
249-
.append(", paddingThreshold=").append(paddingThreshold).append("]");
250-
251-
return builder.toString();
243+
return "RingBuffer [bufferSize=" + bufferSize +
244+
", tail=" + tail +
245+
", cursor=" + cursor +
246+
", paddingThreshold=" + paddingThreshold + "]";
252247
}
253248

254249
}

0 commit comments

Comments
 (0)