Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.baidu.bifromq.baseenv.EnvProvider;
import com.baidu.bifromq.deliverer.MessageDeliverer;
import com.baidu.bifromq.retain.RPCBluePrint;
import com.baidu.bifromq.retain.server.scheduler.DeleteCallScheduler;
import com.baidu.bifromq.retain.server.scheduler.MatchCallScheduler;
import com.baidu.bifromq.retain.server.scheduler.RetainCallScheduler;
import com.baidu.bifromq.retain.store.gc.RetainStoreGCProcessor;
Expand All @@ -39,7 +40,8 @@ class RetainServer implements IRetainServer {
new RetainStoreGCProcessor(builder.retainStoreClient, null),
new MessageDeliverer(builder.subBrokerManager),
new MatchCallScheduler(builder.retainStoreClient, builder.settingProvider),
new RetainCallScheduler(builder.retainStoreClient));
new RetainCallScheduler(builder.retainStoreClient),
new DeleteCallScheduler(builder.retainStoreClient));
if (builder.workerThreads == 0) {
rpcExecutor = MoreExecutors.newDirectExecutorService();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@
import com.baidu.bifromq.type.TopicMessagePack;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;

import lombok.extern.slf4j.Slf4j;

@Slf4j
Expand All @@ -47,35 +51,45 @@ public class RetainService extends RetainServiceGrpc.RetainServiceImplBase {
private final IMessageDeliverer messageDeliverer;
private final IMatchCallScheduler matchCallScheduler;
private final IRetainCallScheduler retainCallScheduler;
private final IRetainCallScheduler deleteCallScheduler;

RetainService(IRetainStoreGCProcessor gcProcessor,
IMessageDeliverer messageDeliverer,
IMatchCallScheduler matchCallScheduler,
IRetainCallScheduler retainCallScheduler) {
IRetainCallScheduler retainCallScheduler,
IRetainCallScheduler deleteCallScheduler) {
this.gcProcessor = gcProcessor;
this.messageDeliverer = messageDeliverer;
this.matchCallScheduler = matchCallScheduler;
this.retainCallScheduler = retainCallScheduler;
this.deleteCallScheduler = deleteCallScheduler;
}

@Override
public void retain(RetainRequest request, StreamObserver<RetainReply> responseObserver) {
log.trace("Handling retain request:\n{}", request);
response((tenantId, metadata) -> retainCallScheduler.schedule(request)
.exceptionally(e -> {
response((tenantId, metadata) -> {
CompletionStage<RetainReply> completionStage;
if (request.getMessage().getPayload().isEmpty()) {
completionStage = deleteCallScheduler.schedule(request);
}else {
completionStage = retainCallScheduler.schedule(request);
}
return completionStage.exceptionally(e -> {
if (e instanceof BackPressureException || e.getCause() instanceof BackPressureException) {
return RetainReply.newBuilder()
.setReqId(request.getReqId())
.setResult(RetainReply.Result.BACK_PRESSURE_REJECTED)
.build();
.setReqId(request.getReqId())
.setResult(RetainReply.Result.BACK_PRESSURE_REJECTED)
.build();
}
log.debug("Retain failed", e);
return RetainReply.newBuilder()
.setReqId(request.getReqId())
.setResult(RetainReply.Result.ERROR)
.build();
.setReqId(request.getReqId())
.setResult(RetainReply.Result.ERROR)
.build();

}), responseObserver);
});
}, responseObserver);
}

@Override
Expand Down Expand Up @@ -167,5 +181,7 @@ public void close() {
matchCallScheduler.close();
log.debug("Stop retain call scheduler");
retainCallScheduler.close();
log.debug("Stop delete call scheduler");
deleteCallScheduler.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
public class BatchRetainCall extends BatchMutationCall<RetainRequest, RetainReply> {

protected BatchRetainCall(KVRangeId rangeId,
IBaseKVStoreClient distWorkerClient,
IBaseKVStoreClient retainStoreClient,
Duration pipelineExpiryTime) {
super(rangeId, distWorkerClient, pipelineExpiryTime);
super(rangeId, retainStoreClient, pipelineExpiryTime);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2023. The BifroMQ Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*/

package com.baidu.bifromq.retain.server.scheduler;

import com.baidu.bifromq.basekv.client.IBaseKVStoreClient;
import com.baidu.bifromq.basekv.client.scheduler.MutationCallBatcher;
import com.baidu.bifromq.basekv.client.scheduler.MutationCallBatcherKey;
import com.baidu.bifromq.basekv.client.scheduler.MutationCallScheduler;
import com.baidu.bifromq.basescheduler.Batcher;
import com.baidu.bifromq.basescheduler.IBatchCall;
import com.baidu.bifromq.retain.rpc.proto.RetainReply;
import com.baidu.bifromq.retain.rpc.proto.RetainRequest;
import com.baidu.bifromq.sysprops.props.DataPlaneBurstLatencyMillis;
import com.baidu.bifromq.sysprops.props.DataPlaneTolerableLatencyMillis;
import com.google.protobuf.ByteString;

import java.time.Duration;

import static com.baidu.bifromq.retain.utils.KeyUtil.retainKey;

public class DeleteCallScheduler extends MutationCallScheduler<RetainRequest, RetainReply>
implements IRetainCallScheduler {
private final IBaseKVStoreClient retainStoreClient;

public DeleteCallScheduler(IBaseKVStoreClient retainStoreClient) {
super("retain_server_delete_batcher", retainStoreClient,
Duration.ofMillis(DataPlaneTolerableLatencyMillis.INSTANCE.get()),
Duration.ofMillis(DataPlaneBurstLatencyMillis.INSTANCE.get()));
this.retainStoreClient = retainStoreClient;
}

@Override
protected ByteString rangeKey(RetainRequest request) {
return retainKey(request.getPublisher().getTenantId(), request.getTopic());
}

@Override
protected Batcher<RetainRequest, RetainReply, MutationCallBatcherKey> newBatcher(String name,
long tolerableLatencyNanos,
long burstLatencyNanos,
MutationCallBatcherKey batchKey) {
return new DeleteCallBatcher(batchKey, name, tolerableLatencyNanos, burstLatencyNanos, retainStoreClient);
}

private static class DeleteCallBatcher extends MutationCallBatcher<RetainRequest, RetainReply> {

protected DeleteCallBatcher(MutationCallBatcherKey batchKey,
String name,
long tolerableLatencyNanos,
long burstLatencyNanos,
IBaseKVStoreClient retainStoreClient) {
super(name, tolerableLatencyNanos, burstLatencyNanos, batchKey, retainStoreClient);
}

@Override
protected IBatchCall<RetainRequest, RetainReply, MutationCallBatcherKey> newBatch() {
return new BatchRetainCall(batcherKey.id, storeClient, Duration.ofMinutes(5));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.baidu.bifromq.type.MatchInfo;
import com.baidu.bifromq.type.Message;
import com.baidu.bifromq.type.TopicMessage;
import com.google.protobuf.ByteString;
import io.grpc.Context;
import io.grpc.stub.StreamObserver;
import io.micrometer.core.instrument.Timer;
Expand Down Expand Up @@ -69,6 +70,8 @@ public class RetainServiceTest {
@Mock
private IRetainCallScheduler retainCallScheduler;
@Mock
private IRetainCallScheduler deleteCallScheduler;
@Mock
StreamObserver<RetainReply> retainResponseObserver;
@Mock
StreamObserver<MatchReply> matchResponseObserver;
Expand Down Expand Up @@ -103,7 +106,8 @@ public void recordSummary(RPCMetric metric, int depth) {
.withValue(RPCContext.TENANT_ID_CTX_KEY, tenantId)
.attach();
closeable = MockitoAnnotations.openMocks(this);
service = new RetainService(gcProcessor, messageDeliverer, matchCallScheduler, retainCallScheduler);
service = new RetainService(gcProcessor, messageDeliverer,
matchCallScheduler, retainCallScheduler, deleteCallScheduler);
}

@AfterMethod
Expand All @@ -112,12 +116,25 @@ public void tearDown(Method method) throws Exception {
closeable.close();
}

@Test
public void testDeleteWithException() {
when(deleteCallScheduler.schedule(any())).thenReturn(
CompletableFuture.failedFuture(new RuntimeException("Mocked")));
long reqId = 1;
service.retain(RetainRequest.newBuilder().setReqId(reqId).build(), retainResponseObserver);
verify(retainResponseObserver)
.onNext(argThat(r -> r.getReqId() == reqId && r.getResult() == RetainReply.Result.ERROR));
}

@Test
public void testPutRetainWithException() {
when(retainCallScheduler.schedule(any())).thenReturn(
CompletableFuture.failedFuture(new RuntimeException("Mocked")));
long reqId = 1;
service.retain(RetainRequest.newBuilder().setReqId(reqId).build(), retainResponseObserver);
service.retain(RetainRequest.newBuilder().setReqId(reqId)
.setMessage(Message.newBuilder().setPayload(ByteString.copyFromUtf8("mock"))
.build())
.build(), retainResponseObserver);
verify(retainResponseObserver)
.onNext(argThat(r -> r.getReqId() == reqId && r.getResult() == RetainReply.Result.ERROR));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public Supplier<RWCoProcOutput> mutate(RWCoProcInput input, IKVReader reader, IK
switch (coProcInput.getTypeCase()) {
case BATCHRETAIN -> {
BatchRetainReply.Builder replyBuilder = BatchRetainReply.newBuilder();
afterMutate.set(batchRetain(coProcInput.getBatchRetain(), replyBuilder, reader, writer));
afterMutate.set(batchRetain(coProcInput.getBatchRetain(), replyBuilder, writer));
outputBuilder.setBatchRetain(replyBuilder);
}
case GC -> {
Expand Down Expand Up @@ -186,7 +186,6 @@ private List<TopicMessage> match(String tenantId,

private Runnable batchRetain(BatchRetainRequest request,
BatchRetainReply.Builder replyBuilder,
IKVReader reader,
IKVWriter writer) {
replyBuilder.setReqId(request.getReqId());
Map<String, Map<String, Message>> addTopics = new HashMap<>();
Expand Down
Loading