Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17743: Add minBytes implementation to DelayedShareFetch #17539

Open
wants to merge 6 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 77 additions & 42 deletions core/src/main/java/kafka/server/share/DelayedShareFetch.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import scala.Option;
Expand All @@ -53,9 +54,8 @@ public class DelayedShareFetch extends DelayedOperation {

private final ShareFetchData shareFetchData;
private final ReplicaManager replicaManager;

private Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionDataFromTryComplete;
private final SharePartitionManager sharePartitionManager;
private Map<TopicIdPartition, FetchPartitionData> replicaManagerFetchDataFromTryComplete;

DelayedShareFetch(
ShareFetchData shareFetchData,
Expand All @@ -64,8 +64,8 @@ public class DelayedShareFetch extends DelayedOperation {
super(shareFetchData.fetchParams().maxWaitMs, Option.empty());
this.shareFetchData = shareFetchData;
this.replicaManager = replicaManager;
this.topicPartitionDataFromTryComplete = new LinkedHashMap<>();
this.sharePartitionManager = sharePartitionManager;
this.replicaManagerFetchDataFromTryComplete = new HashMap<>();
}

@Override
Expand All @@ -86,57 +86,37 @@ public void onComplete() {
if (shareFetchData.future().isDone())
return;

Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
// tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
if (topicPartitionDataFromTryComplete.isEmpty())
topicPartitionData = acquirablePartitions();
// tryComplete invoked forceComplete, so we can use the data from tryComplete.
else
topicPartitionData = topicPartitionDataFromTryComplete;

if (topicPartitionData.isEmpty()) {
// No locks for share partitions could be acquired, so we complete the request with an empty response.
shareFetchData.future().complete(Collections.emptyMap());
return;
Map<TopicIdPartition, FetchPartitionData> fetchResponseData;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need this extra variable, or can you just assign it later, if needed? Then there would be no need for the else block below.

replicaManagerFetchDataFromTryComplete = replicaManagerFetchData(topicPartitionData, true);

// tryComplete did not invoke forceComplete, so we need to get replica manager response data.
if (replicaManagerFetchDataFromTryComplete.isEmpty()) {
Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
if (topicPartitionData.isEmpty()) {
// No locks for share partitions could be acquired, so we complete the request with an empty response.
shareFetchData.future().complete(Collections.emptyMap());
return;
}
fetchResponseData = replicaManagerFetchData(topicPartitionData, true);
} else {
// tryComplete invoked forceComplete, so we can use the replica manager response data from tryComplete.
fetchResponseData = replicaManagerFetchDataFromTryComplete;
}
log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
topicPartitionData, shareFetchData.groupId(), shareFetchData.fetchParams());

try {
Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
shareFetchData.fetchParams(),
CollectionConverters.asScala(
topicPartitionData.entrySet().stream().map(entry ->
new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList())
),
QuotaFactory.UnboundedQuota$.MODULE$,
true);

Map<TopicIdPartition, FetchPartitionData> responseData = new HashMap<>();
responseLogResult.foreach(tpLogResult -> {
TopicIdPartition topicIdPartition = tpLogResult._1();
LogReadResult logResult = tpLogResult._2();
FetchPartitionData fetchPartitionData = logResult.toFetchPartitionData(false);
responseData.put(topicIdPartition, fetchPartitionData);
return BoxedUnit.UNIT;
});

log.trace("Data successfully retrieved by replica manager: {}", responseData);
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result =
ShareFetchUtils.processFetchResponse(shareFetchData, responseData, sharePartitionManager, replicaManager);
ShareFetchUtils.processFetchResponse(shareFetchData, fetchResponseData, sharePartitionManager, replicaManager);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fetchResponseData can still be empty here. processFetchResponse does handle the empty case, but is this intended? There is no harm in it; I am just checking with you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, since processFetchResponse can handle it, that's why I didn't add any check here

shareFetchData.future().complete(result);
} catch (Exception e) {
log.error("Error processing delayed share fetch request", e);
shareFetchData.future().completeExceptionally(e);
} finally {
// Releasing the lock to move ahead with the next request in queue.
releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());
releasePartitionLocks(shareFetchData.groupId(), fetchResponseData.keySet());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if partitions were locked but no data arrived in the response? Will the locks still be released correctly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for that case, even when the data is not received from replica manager, the fetchResponseData should still have keys as the locked topic partitions and values as empty data, so it should work. Am I wrong in that understanding?

// If we have a fetch request completed for a topic-partition, we release the locks for that partition,
// then we should check if there is a pending share fetch request for the topic-partition and complete it.
// We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
// we directly call delayedShareFetchPurgatory.checkAndComplete
replicaManager.addToActionQueue(() -> {
topicPartitionData.keySet().forEach(topicIdPartition ->
fetchResponseData.keySet().forEach(topicIdPartition ->
replicaManager.completeDelayedShareFetchRequest(
new DelayedShareFetchGroupKey(shareFetchData.groupId(), topicIdPartition.topicId(), topicIdPartition.partition())));
return BoxedUnit.UNIT;
Expand All @@ -153,10 +133,13 @@ public boolean tryComplete() {
shareFetchData.groupId(), shareFetchData.memberId(),
shareFetchData.partitionMaxBytes().keySet());

topicPartitionDataFromTryComplete = acquirablePartitions();
Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();

if (!topicPartitionDataFromTryComplete.isEmpty())
return forceComplete();
if (!topicPartitionData.isEmpty()) {
replicaManagerFetchDataFromTryComplete = replicaManagerFetchData(topicPartitionData, false);
if (!replicaManagerFetchDataFromTryComplete.isEmpty())
return forceComplete();
}
log.info("Can't acquire records for any partition in the share fetch request for group {}, member {}, " +
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now we can come to this code path getting no result from replicaManagerFetchData. Hence is the log line still correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right... I'll change it to log.info("Fetch cannot be completed for the partitions in the share fetch request for group {}, member {}, " + "topic partitions {}", shareFetchData.groupId(), shareFetchData.memberId(), shareFetchData.partitionMaxBytes().keySet())

"topic partitions {}", shareFetchData.groupId(),
shareFetchData.memberId(), shareFetchData.partitionMaxBytes().keySet());
Expand Down Expand Up @@ -204,6 +187,58 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
return topicPartitionData;
}

/**
* Prepare partitions fetch data structure for acquirable partitions in the share fetch request satisfying minBytes criteria.
*/
Map<TopicIdPartition, FetchPartitionData> replicaManagerFetchData(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
boolean hasRequestTimedOut) {
log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", topicPartitionData,
shareFetchData.groupId(), shareFetchData.fetchParams());
Map<TopicIdPartition, FetchPartitionData> replicaManagerFetchSatisfyingMinBytes = new HashMap<>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In most scenarios the request will have minBytes set, so do you always want to initialize a hash map? Mostly it will be overridden with the responseData map. So can't it be null? Moreover, can't it simply be a boolean variable, i.e.

boolean minBytesSatisfied = false

if (accumulatedBytes.get() >= shareFetchData.fetchParams().minBytes)
replicaManagerFetchSatisfyingMinBytes = responseData;
=>
if (accumulatedBytes.get() >= shareFetchData.fetchParams().minBytes)
minBytesSatisfied = true;

if (replicaManagerFetchSatisfyingMinBytes.isEmpty() && !hasRequestTimedOut) {
=>
if (!minBytesSatisfied && !hasRequestTimedOut) {

return replicaManagerFetchSatisfyingMinBytes;
=>
return Collections.emptyMap()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it makes sense. I'll make the change.

Map<TopicIdPartition, FetchPartitionData> responseData = new HashMap<>();
try {
Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
shareFetchData.fetchParams(),
CollectionConverters.asScala(
topicPartitionData.entrySet().stream().map(entry ->
new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList())
),
QuotaFactory.UnboundedQuota$.MODULE$,
true);

AtomicInteger accumulatedBytes = new AtomicInteger(0);

responseLogResult.foreach(tpLogResult -> {
TopicIdPartition topicIdPartition = tpLogResult._1();
LogReadResult logResult = tpLogResult._2();
FetchPartitionData fetchPartitionData = logResult.toFetchPartitionData(false);
responseData.put(topicIdPartition, fetchPartitionData);
accumulatedBytes.addAndGet(logResult.info().records.sizeInBytes());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question for my understanding: do we need to do this calculation? As I can see, the fetch params already carry minBytes in the request to the replica manager, so shouldn't the response from the replica manager be empty if the minBytes criteria is not satisfied?

So the question arises: how do we differentiate between an empty response from the replica manager log read that is because of minBytes and one where there is no data in the log? In either case, should we continue holding the request in purgatory? Wdyt?

Copy link
Contributor Author

@adixitconfluent adixitconfluent Oct 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @apoorvmittal10 , IIUC, minBytes is utilized in replicaManager.fetchMessages functionality here not in replicaManager.readFromLog. The way it calculates the accumulatedBytes is the same way I have done it in my code (original code reference). I don't see the usage of params.minBytes in readFromLog functionality

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are right. I also see the minBytes param referenced only in fetchMessages and not in readFromLog. Also, the description of readFromLog says "up to maximum" and nothing about minBytes.

Then the PR change sounds good but I was wondering why do we accept complete fetchParams in readFromLog when we don't utilize something like minBytes there. Not sure if we should have minBytes support in readFromLog itself. Maybe out of scope of this PR.

@junrao can help us with more context.

return BoxedUnit.UNIT;
});
log.trace("Data successfully retrieved by replica manager: {}", responseData);

if (accumulatedBytes.get() >= shareFetchData.fetchParams().minBytes)
replicaManagerFetchSatisfyingMinBytes = responseData;
} catch (Exception e) {
log.error("Error processing delayed share fetch request", e);
} finally {
// The case when we cannot satisfy the share fetch requests because the response has lesser data than minBytes
// and the call is coming from tryComplete, hence we want to release partitions lock so that the next
// tryComplete/onComplete call complete successfully.
if (replicaManagerFetchSatisfyingMinBytes.isEmpty() && !hasRequestTimedOut) {
// Releasing the lock to move ahead with the next request in queue.
releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());
}
Comment on lines +229 to +232
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And why don't we want to release the partition locks from onComplete?

Copy link
Contributor Author

@adixitconfluent adixitconfluent Oct 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this case, we want to call ShareFetchUtils.processFetchResponse(shareFetchData, fetchResponseData, sharePartitionManager, replicaManager) before we want to release the locks. That part is in onComplete, hence we don't release the lock

}
// This call is coming from onComplete, hence we return the response data irrespective of whether minBytes is
// satisfied or not.
if (hasRequestTimedOut)
return responseData;
Comment on lines +234 to +237
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, is this same for regular fetch operations as well?

Copy link
Contributor Author

@adixitconfluent adixitconfluent Oct 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that I think about it again, I should return a map with the topic partition as key and a fetchPartitionData object containing 0 records as value, since we have not been able to satisfy all the fetch request criteria. @junrao your thoughts?

// Return either empty map if replica manager fetch does not satisfy minBytes OR fetch response map if it satisfies the minBytes criteria.
return replicaManagerFetchSatisfyingMinBytes;
}

private void releasePartitionLocks(String groupId, Set<TopicIdPartition> topicIdPartitions) {
topicIdPartitions.forEach(tp -> {
SharePartition sharePartition = sharePartitionManager.sharePartition(groupId, tp);
Expand Down
55 changes: 51 additions & 4 deletions core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void tearDown() throws Exception {
}

@Test
public void testDelayedShareFetchTryCompleteReturnsFalse() {
public void testDelayedShareFetchTryCompleteReturnsFalseDueToNonAcquirablePartitions() {
String groupId = "grp";
Uuid topicId = Uuid.randomUuid();
TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
Expand Down Expand Up @@ -118,6 +118,51 @@ public void testDelayedShareFetchTryCompleteReturnsFalse() {
assertFalse(delayedShareFetch.isCompleted());
}

// Verifies that tryComplete does not complete the delayed share fetch when the replica manager
// read returns fewer bytes than the request's minBytes, even though a partition lock was acquired.
@Test
public void testDelayedShareFetchTryCompleteReturnsFalseDueToMinBytes() {
String groupId = "grp";
Uuid topicId = Uuid.randomUuid();
ReplicaManager replicaManager = mock(ReplicaManager.class);
TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);

SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);

// Both share partitions report that their fetch lock can be acquired.
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp1.maybeAcquireFetchLock()).thenReturn(true);

SharePartitionManager sharePartitionManager = mock(SharePartitionManager.class);
when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0);
when(sharePartitionManager.sharePartition(groupId, tp1)).thenReturn(sp1);

ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes);

// Only sp0 can acquire records; sp1 cannot.
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
when(sp0.acquire(any(), any())).thenReturn(
Collections.singletonList(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
// Replica Manager fetch response size -> 0 bytes (< minBytes)
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0), 0)).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());

DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetchData)
.withSharePartitionManager(sharePartitionManager)
.withReplicaManager(replicaManager)
.build();
assertFalse(delayedShareFetch.isCompleted());

// The replica manager read returns 0 bytes, which is below minBytes, so tryComplete should
// return false and the delayed share fetch should remain incomplete.
assertFalse(delayedShareFetch.tryComplete());
assertFalse(delayedShareFetch.isCompleted());
}

@Test
public void testDelayedShareFetchTryCompleteReturnsTrue() {
String groupId = "grp";
Expand Down Expand Up @@ -148,7 +193,8 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() {
when(sp1.canAcquireRecords()).thenReturn(false);
when(sp0.acquire(any(), any())).thenReturn(
Collections.singletonList(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
// Replica Manager fetch response size -> 1 bytes (>= minBytes)
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0), 1)).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());

DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetchData)
Expand Down Expand Up @@ -235,7 +281,7 @@ public void testReplicaManagerFetchShouldHappenOnComplete() {
when(sp1.canAcquireRecords()).thenReturn(false);
when(sp0.acquire(any(), any())).thenReturn(
Collections.singletonList(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0), 0)).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetchData)
.withReplicaManager(replicaManager)
Expand All @@ -250,6 +296,7 @@ public void testReplicaManagerFetchShouldHappenOnComplete() {
Mockito.verify(sp0, times(1)).nextFetchOffset();
Mockito.verify(sp1, times(0)).nextFetchOffset();
assertTrue(delayedShareFetch.isCompleted());
assertTrue(shareFetchData.future().isDone());
}

@Test
Expand Down Expand Up @@ -356,7 +403,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes2);

doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1), 0)).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());

Map<SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
Expand Down
Loading