MQTT5 client, unable to get data, seems like store gets corrupted #112

@saraheem

  • BifroMQ 3.3.4
  • Deployed in a Docker container using the latest image.
  • Using a webhook for auth.
  • Here is the docker command to run the container:

docker run -d --name bifromq -p 1883:1883 -p 1884:1884 -v /home/root/bifromq/conf:/home/bifromq/conf -v /home/root/bifromq/data:/home/bifromq/data -v /home/root/bifromq/plugins:/home/bifromq/plugins -v /home/root/bifromq/logs:/home/bifromq/logs -e EXTRA_JVM_OPTS="-Dplugin.hdcauthprovider.url=https:/localhost/Auth -DMaxTopicFiltersPerSub=50 -DRetainMessageMatchLimit=1000" bifromq/bifromq:latest

Steps to reproduce:

  • Publish 5000+ retained messages to various topics using MQTT5 publish.
  • Subscribing to specific topics does not return the retained messages.
  • Subscribing to # does not return any messages either.
  • The KV store logs contain no errors, only INFO entries.
  • The logs/error.log file shows the following errors repeatedly:

2024-12-23 19:28:13.476 ERROR [mqtt-worker-elg-3] --- c.baidu.bifromq.basescheduler.Batcher [Batcher.java:142] Unexpected exception
java.lang.ClassCastException: class com.baidu.bifromq.basekv.proto.Boundary cannot be cast to class java.lang.Comparable (com.baidu.bifromq.basekv.proto.Boundary is in unnamed module of loader 'app'; java.lang.Comparable is in module java.base of loader 'bootstrap')
at java.base/java.util.TreeMap.getEntry(TreeMap.java:347)
at java.base/java.util.TreeMap.containsKey(TreeMap.java:233)
at java.base/java.util.Collections$UnmodifiableMap.containsKey(Collections.java:1500)
at com.baidu.bifromq.dist.server.scheduler.DistCallScheduler$DistWorkerCallBatcher$BatchDistCall.rangeLookup(DistCallScheduler.java:240)
at com.baidu.bifromq.dist.server.scheduler.DistCallScheduler$DistWorkerCallBatcher$BatchDistCall.execute(DistCallScheduler.java:150)
at com.baidu.bifromq.basescheduler.Batcher.batchAndEmit(Batcher.java:170)
at com.baidu.bifromq.basescheduler.Batcher.trigger(Batcher.java:139)
at com.baidu.bifromq.basescheduler.Batcher.submit(Batcher.java:111)
at com.baidu.bifromq.basescheduler.BatchCallScheduler.lambda$schedule$2(BatchCallScheduler.java:119)
at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187)
at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309)
at com.baidu.bifromq.basescheduler.BatchCallScheduler.schedule(BatchCallScheduler.java:113)
at com.baidu.bifromq.dist.server.DistResponsePipeline.handleRequest(DistResponsePipeline.java:66)
at com.baidu.bifromq.dist.server.DistResponsePipeline.handleRequest(DistResponsePipeline.java:42)
at com.baidu.bifromq.baserpc.AbstractResponsePipeline.startHandlingRequest(AbstractResponsePipeline.java:87)
at com.baidu.bifromq.baserpc.ResponsePipeline.onNext(ResponsePipeline.java:76)
at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262)
at io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
at io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
at io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:334)
at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:319)
at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:834)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at com.google.common.util.concurrent.DirectExecutorService.execute(DirectExecutorService.java:51)
at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:102)
at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:95)
at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener.messagesAvailable(ServerImpl.java:842)
at io.grpc.inprocess.InProcTransport$InProcessStream$InProcessClientStream.lambda$writeMessage$3(InProcTransport.java:803)
at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:94)
at io.grpc.inprocess.InProcTransport$InProcessStream$InProcessClientStream.writeMessage(InProcTransport.java:808)
at io.grpc.internal.ForwardingClientStream.writeMessage(ForwardingClientStream.java:37)
at io.grpc.internal.RetriableStream$1SendMessageEntry.runWith(RetriableStream.java:582)
at io.grpc.internal.RetriableStream.delayOrExecute(RetriableStream.java:559)
at io.grpc.internal.RetriableStream.sendMessage(RetriableStream.java:590)
at io.grpc.internal.ClientCallImpl.sendMessageInternal(ClientCallImpl.java:522)
at io.grpc.internal.ClientCallImpl.sendMessage(ClientCallImpl.java:510)
at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:37)
at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.onNext(ClientCalls.java:368)
at com.baidu.bifromq.baserpc.ManagedRequestPipeline.sendUntilStreamNotReadyOrNoTask(ManagedRequestPipeline.java:364)
at com.baidu.bifromq.baserpc.ManagedRequestPipeline.invoke(ManagedRequestPipeline.java:255)
at com.baidu.bifromq.dist.client.scheduler.DistServerCallScheduler$DistServerCallBatcher$DistServerBatchCall.execute(DistServerCallScheduler.java:100)
at com.baidu.bifromq.basescheduler.Batcher.batchAndEmit(Batcher.java:170)
at com.baidu.bifromq.basescheduler.Batcher.trigger(Batcher.java:139)
at com.baidu.bifromq.basescheduler.Batcher.submit(Batcher.java:111)
at com.baidu.bifromq.basescheduler.BatchCallScheduler.lambda$schedule$2(BatchCallScheduler.java:119)
at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187)
at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309)
at com.baidu.bifromq.basescheduler.BatchCallScheduler.schedule(BatchCallScheduler.java:113)
at com.baidu.bifromq.dist.client.DistClient.pub(DistClient.java:55)
at com.baidu.bifromq.mqtt.handler.MQTTSessionHandler.doPub(MQTTSessionHandler.java:1500)
at com.baidu.bifromq.mqtt.handler.MQTTSessionHandler.doPub(MQTTSessionHandler.java:1402)
at com.baidu.bifromq.mqtt.handler.MQTTSessionHandler.lambda$handleQoS1Pub$37(MQTTSessionHandler.java:1204)
at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187)
at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309)
at com.baidu.bifromq.mqtt.handler.MQTTSessionHandler.handleQoS1Pub(MQTTSessionHandler.java:1200)
at com.baidu.bifromq.mqtt.handler.MQTTSessionHandler.handlePubMsg(MQTTSessionHandler.java:459)
at com.baidu.bifromq.mqtt.handler.MQTTSessionHandler.channelRead(MQTTSessionHandler.java:414)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at com.baidu.bifromq.mqtt.handler.ConditionalSlowDownHandler.channelRead(ConditionalSlowDownHandler.java:78)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at com.baidu.bifromq.mqtt.handler.MQTTMessageDebounceHandler.channelRead(MQTTMessageDebounceHandler.java:61)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.traffic.AbstractTrafficShapingHandler.channelRead(AbstractTrafficShapingHandler.java:506)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1473)
at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1347)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1387)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:530)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:469)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:501)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:399)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.base/java.lang.Thread.run(Thread.java:833)
2024-12-23 19:28:14.131 ERROR [basekv-client-executor-1] --- c.b.b.r.s.scheduler.BatchRetainCall [BatchRetainCall.java:68] topic not found in result map, tenantId: global, topic: Tenant/1/Station/19/Unit/10078/Device/10522/Auth
2024-12-23 19:40:50.895 ERROR [basekv-server-executor] --- c.b.b.baserpc.AbstractResponsePipeline [AbstractResponsePipeline.java:93] Request handling with error in pipeline@556979744
java.util.concurrent.CancellationException: null
at java.base/java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2478)
at com.baidu.bifromq.baserpc.utils.FutureTracker.stop(FutureTracker.java:38)
at com.baidu.bifromq.baserpc.AbstractResponsePipeline.cleanup(AbstractResponsePipeline.java:138)
at com.baidu.bifromq.baserpc.AbstractResponsePipeline.close(AbstractResponsePipeline.java:68)
at com.baidu.bifromq.baserpc.AbstractResponsePipeline.onError(AbstractResponsePipeline.java:42)
at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:288)
at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
at io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:375)
at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:364)
at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:910)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at io.micrometer.core.instrument.internal.TimedRunnable.run(TimedRunnable.java:49)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
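
For context on the first trace: the ClassCastException is raised inside TreeMap.getEntry, reached through Collections$UnmodifiableMap.containsKey. That is the standard JDK behaviour when a TreeMap is created without a Comparator and is asked about a key whose class does not implement Comparable. The sketch below only reproduces that JDK behaviour in isolation. It is not BifroMQ code: the Range class and both method names are hypothetical stand-ins, and it makes no claim about how the map in DistCallScheduler.rangeLookup is actually built.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

public class TreeMapKeyDemo {
    // Hypothetical stand-in for a key type that does not implement Comparable.
    static final class Range {
        final String start;

        Range(String start) {
            this.start = start;
        }
    }

    // Looks up a key in a read-only view of a TreeMap built WITHOUT a comparator.
    // Returns true if the lookup fails with ClassCastException.
    static boolean lookupFailsWithoutComparator() {
        Map<Range, String> map =
            Collections.unmodifiableMap(new TreeMap<Range, String>());
        try {
            // TreeMap casts the key to Comparable before searching,
            // so this throws even though the map is empty.
            map.containsKey(new Range("a"));
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    // Same lookup, but the TreeMap is given an explicit comparator,
    // so the key class does not need to implement Comparable.
    static boolean lookupWorksWithComparator() {
        TreeMap<Range, String> tree =
            new TreeMap<>(Comparator.comparing((Range r) -> r.start));
        tree.put(new Range("a"), "store-1");
        Map<Range, String> map = Collections.unmodifiableMap(tree);
        return map.containsKey(new Range("a"));
    }

    public static void main(String[] args) {
        System.out.println(lookupFailsWithoutComparator());
        System.out.println(lookupWorksWithComparator());
    }
}
```

Both methods return true here. If the same pattern applies in the broker, it would suggest the lookup map was created without its comparator, rather than the on-disk store being corrupted, but that is an assumption the trace alone does not confirm.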
