-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Description
Search before asking
- I searched in the issues and found nothing similar.
Motivation
As the code shown, even if we disable txn, we use topic.getMaxReadPosition() which is topic.lastConfirmedEntry as OpReadEntry's maxPosition.
However, exist the situation:
- create OpReadEntry, using current topic's lastConfirmedEntry
- cursor hasNoMoreEntry, addWaitingCursor. opReadEntry become waiting.
- topic add new entry, trigger notifyEntriesAvailable() to notify opReadEntry. topic's lastConfirmedEntry move forward.
- Since opReadEntry.maxPosition is the previous lastConfirmedEntry, and now the lastConfirmedEntry has been changed. This op would enter opReadEntry.checkReadCompletion and trigger callback directly without reading any entry.
- Therefore, it would cause one more time to execute readEntry request.
So this is where we can improve, actually we may be able to execute the opReadEntry directly.
By the way, in version-2.9, if we disable txn, topic.getMaxReadPosition() is Position.latest, so opReadEntry.maxPosition is Position.latest too. And it would not have this issue since op.maxPosition is always > op.readPosition.
Lines 109 to 113 in c160cc9
| @Override | |
| public Position getMaxReadPosition() { | |
| return topic.getLastPosition(); | |
| } | |
Lines 368 to 381 in c160cc9
| havePendingRead = true; | |
| if (consumer.readCompacted()) { | |
| boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId()) | |
| && (!cursor.isDurable() || cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION) | |
| || hasValidMarkDeletePosition(cursor)); | |
| TopicCompactionService topicCompactionService = topic.getTopicCompactionService(); | |
| CompactedTopicUtils.asyncReadCompactedEntries(topicCompactionService, cursor, messagesToRead, | |
| bytesToRead, topic.getMaxReadPosition(), readFromEarliest, this, true, consumer); | |
| } else { | |
| ReadEntriesCtx readEntriesCtx = | |
| ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch()); | |
| cursor.asyncReadEntriesOrWait(messagesToRead, | |
| bytesToRead, this, readEntriesCtx, topic.getMaxReadPosition()); | |
| } |
pulsar/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
Lines 934 to 979 in 5dc0304
| public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback, | |
| Object ctx, Position maxPosition, | |
| Predicate<Position> skipCondition) { | |
| checkArgument(maxEntries > 0); | |
| if (isClosed()) { | |
| callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx); | |
| return; | |
| } | |
| int numberOfEntriesToRead = applyMaxSizeCap(maxEntries, maxSizeBytes); | |
| if (hasMoreEntries()) { | |
| // If we have available entries, we can read them immediately | |
| if (log.isDebugEnabled()) { | |
| log.debug("[{}] [{}] Read entries immediately", ledger.getName(), name); | |
| } | |
| asyncReadEntriesWithSkip(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx, | |
| maxPosition, skipCondition); | |
| } else { | |
| // Skip deleted entries. | |
| skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted); | |
| OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback, | |
| ctx, maxPosition, skipCondition); | |
| if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) { | |
| op.recycle(); | |
| callback.readEntriesFailed(new ManagedLedgerException.ConcurrentWaitCallbackException(), ctx); | |
| return; | |
| } | |
| if (log.isDebugEnabled()) { | |
| log.debug("[{}] [{}] Deferring retry of read at position {}", ledger.getName(), name, op.readPosition); | |
| } | |
| // Check again for new entries after the configured time, then if still no entries are available register | |
| // to be notified | |
| if (getConfig().getNewEntriesCheckDelayInMillis() > 0) { | |
| ledger.getScheduledExecutor() | |
| .schedule(() -> checkForNewEntries(op, callback, ctx), | |
| getConfig().getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS); | |
| } else { | |
| // If there's no delay, check directly from the same thread | |
| checkForNewEntries(op, callback, ctx); | |
| } | |
| } | |
| } |
pulsar/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
Lines 2051 to 2056 in 5dc0304
| private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) { | |
| if (opReadEntry.readPosition.compareTo(opReadEntry.maxPosition) > 0) { | |
| opReadEntry.checkReadCompletion(); | |
| return; | |
| } |
pulsar/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
Lines 164 to 186 in 5dc0304
| void checkReadCompletion() { | |
| // op readPosition is smaller or equals maxPosition then can read again | |
| if (entries.size() < count && cursor.hasMoreEntries() | |
| && maxPosition.compareTo(readPosition) > 0) { | |
| // We still have more entries to read from the next ledger, schedule a new async operation | |
| cursor.ledger.getExecutor().execute(() -> { | |
| readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition); | |
| cursor.ledger.asyncReadEntries(OpReadEntry.this); | |
| }); | |
| } else { | |
| // The reading was already completed, release resources and trigger callback | |
| try { | |
| cursor.readOperationCompleted(); | |
| } finally { | |
| cursor.ledger.getExecutor().execute(() -> { | |
| callback.readEntriesComplete(entries, ctx); | |
| recycle(); | |
| }); | |
| } | |
| } | |
| } |
Solution
Update the opReadEntry.maxPosition when trigger notifyEntriesAvailable()
Alternatives
No response
Anything else?
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!