
[improve][broker] Cursor read triggers readMoreEntry() one extra time after addWaitingCursor and notify #23027

@TakaHiR07

Description


Search before asking

  • I searched in the issues and found nothing similar.

Motivation

As the code below shows, even when transactions are disabled, topic.getMaxReadPosition(), which returns the topic's lastConfirmedEntry, is used as the OpReadEntry's maxPosition.

However, the following sequence can occur:

  1. An OpReadEntry is created with maxPosition set to the topic's current lastConfirmedEntry.
  2. The cursor has no more entries, so it calls addWaitingCursor and the OpReadEntry becomes the waiting read.
  3. A new entry is added to the topic. The topic's lastConfirmedEntry moves forward and notifyEntriesAvailable() is triggered to notify the waiting OpReadEntry.
  4. Because opReadEntry.maxPosition still holds the previous lastConfirmedEntry, the op's readPosition is now beyond its maxPosition. internalReadFromLedger() therefore calls opReadEntry.checkReadCompletion(), which triggers the callback directly without reading any entry.
  5. As a result, the dispatcher has to issue one more readEntries request to actually read the new entry.
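The five steps above can be reproduced with a small, self-contained model. This is a sketch under simplifying assumptions, not Pulsar code: `Pos`, `WaitingRead`, `run` and `readRequestsNeeded` are hypothetical names, a position is modeled as a (ledgerId, entryId) pair, and `run` only mimics the `readPosition > maxPosition` guard at the top of `internalReadFromLedger`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of steps 1-5; none of these types exist in Pulsar.
class StaleMaxPositionDemo {
    // Stand-in for a managed-ledger position, ordered by ledgerId, then entryId.
    record Pos(long ledgerId, long entryId) implements Comparable<Pos> {
        @Override
        public int compareTo(Pos o) {
            int c = Long.compare(ledgerId, o.ledgerId);
            return c != 0 ? c : Long.compare(entryId, o.entryId);
        }

        Pos next() {
            return new Pos(ledgerId, entryId + 1);
        }
    }

    // A read op whose maxPosition is captured once, when the op is created.
    record WaitingRead(Pos readPosition, Pos maxPosition) {}

    // Mimics the guard in internalReadFromLedger: readPosition > maxPosition completes
    // the op with no entries; otherwise read up to min(maxPosition, lac).
    static List<Pos> run(WaitingRead op, Pos lac) {
        List<Pos> entries = new ArrayList<>();
        if (op.readPosition().compareTo(op.maxPosition()) > 0) {
            return entries;
        }
        Pos limit = op.maxPosition().compareTo(lac) < 0 ? op.maxPosition() : lac;
        for (Pos p = op.readPosition(); p.compareTo(limit) <= 0; p = p.next()) {
            entries.add(p);
        }
        return entries;
    }

    // Counts the read requests needed to deliver a single newly added entry.
    static int readRequestsNeeded() {
        Pos lac = new Pos(1, 4);
        Pos readPosition = lac.next();                       // cursor is caught up
        WaitingRead op = new WaitingRead(readPosition, lac); // steps 1-2: op waits, max = old LAC
        lac = lac.next();                                    // step 3: new entry, LAC advances
        int requests = 1;
        List<Pos> entries = run(op, lac);                    // step 4: notified op runs
        while (entries.isEmpty()) {                          // step 5: empty completion, read again
            requests++;
            entries = run(new WaitingRead(readPosition, lac), lac);
        }
        return requests;
    }

    public static void main(String[] args) {
        System.out.println("read requests: " + readRequestsNeeded()); // prints "read requests: 2"
    }
}
```

The second request succeeds only because it is created after the lastConfirmedEntry moved, so its snapshot of maxPosition already covers the new entry.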

This is where we can improve: when the waiting OpReadEntry is notified, it could read the new entries directly instead of completing empty.

For reference, in version 2.9 with transactions disabled, topic.getMaxReadPosition() returns Position.latest, so opReadEntry.maxPosition is Position.latest too. That version does not have this issue, because op.maxPosition is always greater than op.readPosition.
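A quick check of why a `Position.latest` upper bound avoids the early completion. This sketch assumes "latest" is modeled as the sentinel (Long.MAX_VALUE, Long.MAX_VALUE); `LatestSentinelDemo`, `Pos` and `completesEmpty` are hypothetical names.

```java
// Hypothetical sketch: with a "latest" sentinel as maxPosition, the guard
// readPosition > maxPosition never fires for a real position.
class LatestSentinelDemo {
    record Pos(long ledgerId, long entryId) implements Comparable<Pos> {
        @Override
        public int compareTo(Pos o) {
            int c = Long.compare(ledgerId, o.ledgerId);
            return c != 0 ? c : Long.compare(entryId, o.entryId);
        }
    }

    // Assumed sentinel for "latest": compares >= every real position.
    static final Pos LATEST = new Pos(Long.MAX_VALUE, Long.MAX_VALUE);

    // True when a waiting op would complete without reading (the early completion in step 4).
    static boolean completesEmpty(Pos readPosition, Pos maxPosition) {
        return readPosition.compareTo(maxPosition) > 0;
    }

    public static void main(String[] args) {
        Pos staleLac = new Pos(1, 4);
        Pos readPosition = new Pos(1, 5);
        System.out.println(completesEmpty(readPosition, staleLac)); // true: snapshot bound is stale
        System.out.println(completesEmpty(readPosition, LATEST));   // false: sentinel bound
    }
}
```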

@Override
public Position getMaxReadPosition() {
    return topic.getLastPosition();
}

havePendingRead = true;
if (consumer.readCompacted()) {
    boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId())
            && (!cursor.isDurable() || cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)
            || hasValidMarkDeletePosition(cursor));
    TopicCompactionService topicCompactionService = topic.getTopicCompactionService();
    CompactedTopicUtils.asyncReadCompactedEntries(topicCompactionService, cursor, messagesToRead,
            bytesToRead, topic.getMaxReadPosition(), readFromEarliest, this, true, consumer);
} else {
    ReadEntriesCtx readEntriesCtx =
            ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());
    cursor.asyncReadEntriesOrWait(messagesToRead,
            bytesToRead, this, readEntriesCtx, topic.getMaxReadPosition());
}

public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback,
                                           Object ctx, Position maxPosition,
                                           Predicate<Position> skipCondition) {
    checkArgument(maxEntries > 0);
    if (isClosed()) {
        callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx);
        return;
    }
    int numberOfEntriesToRead = applyMaxSizeCap(maxEntries, maxSizeBytes);
    if (hasMoreEntries()) {
        // If we have available entries, we can read them immediately
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Read entries immediately", ledger.getName(), name);
        }
        asyncReadEntriesWithSkip(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx,
                maxPosition, skipCondition);
    } else {
        // Skip deleted entries.
        skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted);
        OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback,
                ctx, maxPosition, skipCondition);
        if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
            op.recycle();
            callback.readEntriesFailed(new ManagedLedgerException.ConcurrentWaitCallbackException(), ctx);
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Deferring retry of read at position {}", ledger.getName(), name, op.readPosition);
        }
        // Check again for new entries after the configured time, then if still no entries are available register
        // to be notified
        if (getConfig().getNewEntriesCheckDelayInMillis() > 0) {
            ledger.getScheduledExecutor()
                    .schedule(() -> checkForNewEntries(op, callback, ctx),
                            getConfig().getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS);
        } else {
            // If there's no delay, check directly from the same thread
            checkForNewEntries(op, callback, ctx);
        }
    }
}
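The `WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)` call above lets at most one read op wait on a cursor at a time; a second concurrent waiter is rejected. The sketch below shows the same pattern with `java.util.concurrent.atomic.AtomicReferenceFieldUpdater`. `WaitingCursor`, `Op`, `tryWait` and `takeWaitingOp` are hypothetical names, and the rejection is signalled with a boolean instead of `ConcurrentWaitCallbackException`.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical sketch of a cursor that allows a single waiting read op.
class WaitingCursor {
    static final class Op {
        final String name;

        Op(String name) {
            this.name = name;
        }
    }

    // The updater performs atomic operations on the volatile field below.
    private static final AtomicReferenceFieldUpdater<WaitingCursor, Op> WAITING_OP_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(WaitingCursor.class, Op.class, "waitingOp");

    private volatile Op waitingOp = null;

    // Registers op as the waiter only if no other op is waiting.
    boolean tryWait(Op op) {
        return WAITING_OP_UPDATER.compareAndSet(this, null, op);
    }

    // Called when new entries arrive: atomically removes and returns the waiter (or null).
    Op takeWaitingOp() {
        return WAITING_OP_UPDATER.getAndSet(this, null);
    }

    public static void main(String[] args) {
        WaitingCursor cursor = new WaitingCursor();
        System.out.println(cursor.tryWait(new Op("first")));  // true
        System.out.println(cursor.tryWait(new Op("second"))); // false: a read is already waiting
        System.out.println(cursor.takeWaitingOp().name);      // first
        System.out.println(cursor.takeWaitingOp());           // null
    }
}
```

A field updater keeps one shared static updater instead of an `AtomicReference` object per cursor, which matters when many cursors exist.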

private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) {
    if (opReadEntry.readPosition.compareTo(opReadEntry.maxPosition) > 0) {
        opReadEntry.checkReadCompletion();
        return;
    }
    // ... (rest of the method omitted)
}

void checkReadCompletion() {
    // op readPosition is smaller or equals maxPosition then can read again
    if (entries.size() < count && cursor.hasMoreEntries()
            && maxPosition.compareTo(readPosition) > 0) {
        // We still have more entries to read from the next ledger, schedule a new async operation
        cursor.ledger.getExecutor().execute(() -> {
            readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition);
            cursor.ledger.asyncReadEntries(OpReadEntry.this);
        });
    } else {
        // The reading was already completed, release resources and trigger callback
        try {
            cursor.readOperationCompleted();
        } finally {
            cursor.ledger.getExecutor().execute(() -> {
                callback.readEntriesComplete(entries, ctx);
                recycle();
            });
        }
    }
}
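The branch in `checkReadCompletion()` keeps reading only when all three conditions hold. The pure function below restates that condition so the stale-maxPosition case can be checked in isolation; `ReadCompletionCheck`, `shouldReadAgain` and its parameters are hypothetical, and positions are reduced to entry ids in a single ledger for brevity.

```java
// Hypothetical restatement of the condition in checkReadCompletion(); not Pulsar code.
// Positions are simplified to entry ids within one ledger.
class ReadCompletionCheck {
    static boolean shouldReadAgain(int entriesRead, int count, boolean cursorHasMoreEntries,
                                   long maxPosition, long readPosition) {
        // Mirrors: entries.size() < count && cursor.hasMoreEntries()
        //          && maxPosition.compareTo(readPosition) > 0
        return entriesRead < count && cursorHasMoreEntries && maxPosition > readPosition;
    }

    public static void main(String[] args) {
        // Notified op from the scenario: nothing read, the cursor now has entry 5,
        // but maxPosition is still the old lastConfirmedEntry 4.
        System.out.println(shouldReadAgain(0, 100, true, 4, 5));   // false: callback fires with 0 entries
        // Bound ahead of readPosition and batch not full: keep reading.
        System.out.println(shouldReadAgain(0, 100, true, 9, 5));   // true
        // Batch already full: complete even though more entries exist.
        System.out.println(shouldReadAgain(100, 100, true, 9, 5)); // false
    }
}
```

In the stale case, the third condition alone makes the op complete, even though the cursor reports more entries.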

Solution

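Update opReadEntry.maxPosition when notifyEntriesAvailable() is triggered, so that the notified read uses the new lastConfirmedEntry as its upper bound and reads the new entries directly.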
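A minimal sketch of the proposed change, under simplifying assumptions: the waiting op keeps a mutable `maxPosition`, and the notify path re-reads the current max read position (modeled as a hypothetical `LongSupplier maxReadPosition`) before executing the op. `NotifyingCursor`, `Op`, `addWaitingOp` and `execute` are illustrative names only, `notifyEntriesAvailable` here merely mirrors the real method's role, and positions are entry ids in one ledger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

// Hypothetical sketch: refresh the waiting op's upper bound when it is notified.
class NotifyingCursor {
    static final class Op {
        final long readPosition;
        long maxPosition;                          // captured at creation, refreshed on notify
        final List<Long> entries = new ArrayList<>();

        Op(long readPosition, long maxPosition) {
            this.readPosition = readPosition;
            this.maxPosition = maxPosition;
        }
    }

    private final AtomicReference<Op> waitingOp = new AtomicReference<>();
    private final LongSupplier maxReadPosition;    // assumed: the topic's current lastConfirmedEntry

    NotifyingCursor(LongSupplier maxReadPosition) {
        this.maxReadPosition = maxReadPosition;
    }

    boolean addWaitingOp(Op op) {
        return waitingOp.compareAndSet(null, op);
    }

    // Called after new entries are persisted; returns the op it executed, or null.
    Op notifyEntriesAvailable() {
        Op op = waitingOp.getAndSet(null);
        if (op != null) {
            // The proposed change: move the bound forward before executing the op.
            op.maxPosition = Math.max(op.maxPosition, maxReadPosition.getAsLong());
            execute(op);
        }
        return op;
    }

    // Simplified read: nothing if readPosition > maxPosition, else entries up to maxPosition.
    private static void execute(Op op) {
        for (long p = op.readPosition; p <= op.maxPosition; p++) {
            op.entries.add(p);
        }
    }

    public static void main(String[] args) {
        long[] lastConfirmed = {4};
        NotifyingCursor cursor = new NotifyingCursor(() -> lastConfirmed[0]);
        cursor.addWaitingOp(new Op(5, lastConfirmed[0])); // waits with maxPosition = 4
        lastConfirmed[0] = 5;                             // a new entry is added
        Op op = cursor.notifyEntriesAvailable();
        System.out.println(op.entries);                   // [5]: read without an extra request
    }
}
```

Refreshing with `Math.max` keeps the bound monotonic in this sketch, so a notification can only widen the op's read range, never shrink it.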

Alternatives

No response

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata


Assignees: No one assigned
Labels: type/enhancement (The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages)
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests
