Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17743: Add minBytes implementation to DelayedShareFetch #17539

Open
wants to merge 6 commits into
base: trunk
Choose a base branch
from

Conversation

adixitconfluent
Copy link
Contributor

@adixitconfluent adixitconfluent commented Oct 18, 2024

About

minBytes is a constraint that should be used to delay ShareFetch requests. Hence, I have added the support for minBytes in DelayedShareFetch class

Testing

The added code has been tested with the help of unit tests.

@github-actions github-actions bot added core Kafka Broker KIP-932 Queues for Kafka small Small PRs labels Oct 18, 2024
@github-actions github-actions bot removed the small Small PRs label Oct 18, 2024
@adixitconfluent adixitconfluent marked this pull request as ready for review October 18, 2024 12:18
LogReadResult logResult = tpLogResult._2();
FetchPartitionData fetchPartitionData = logResult.toFetchPartitionData(false);
responseData.put(topicIdPartition, fetchPartitionData);
accumulatedBytes.addAndGet(logResult.info().records.sizeInBytes());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question for my understanding: Do we need to this calculation? As I can see fetch params already has minBytes in request to replica manager hence isn't the response from replica manager should be empty if minBytes criteria is not satisfied?

So the question arise that how do we differentiate between empty reponse from replica manager log read, if that's beacus of min bytes or there is no data in the log? In either case we should continue holding the request in purgatory? Wdyt?

Copy link
Contributor Author

@adixitconfluent adixitconfluent Oct 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @apoorvmittal10 , IIUC, minBytes is utilized in replicaManager.fetchMessages functionality here not in replicaManager.readFromLog. The way it calculates the accumulatedBytes is the same way I have done it in my code (original code reference). I don't see the usage of params.minBytes in readFromLog functionality

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are right. I also see only reference of param minBytes in fetchMessages and not in readFromLog. Also the readFromLog says upto maximum in description and nothing about minBytes.

Then the PR change sounds good but I was wondering why do we accept complete fetchParams in readFromLog when we don't utilize something like minBytes there. Not sure if we should have minBytes support in readFromLog itself. Maybe out of scope of this PR.

@junrao can help is with more context.

Comment on lines +234 to +237
// This call is coming from onComplete, hence we return the response data irrespective of whether minBytes is
// satisfied or not.
if (hasRequestTimedOut)
return responseData;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, is this same for regular fetch operations as well?

Copy link
Contributor Author

@adixitconfluent adixitconfluent Oct 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that I think again, I should return a map with key as topic partition and value as fetchPartitionData object containing 0 records and since we have not been able to satisfy all the fetch request criterias. @junrao your thoughts?

Comment on lines +229 to +232
if (replicaManagerFetchSatisfyingMinBytes.isEmpty() && !hasRequestTimedOut) {
// Releasing the lock to move ahead with the next request in queue.
releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And why don't we want to release the partition locks from onComplete?

Copy link
Contributor Author

@adixitconfluent adixitconfluent Oct 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this case, we want to call ShareFetchUtils.processFetchResponse(shareFetchData, fetchResponseData, sharePartitionManager, replicaManager) before we want to release the locks. That part is in onComplete, hence we don't release the lock

boolean hasRequestTimedOut) {
log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", topicPartitionData,
shareFetchData.groupId(), shareFetchData.fetchParams());
Map<TopicIdPartition, FetchPartitionData> replicaManagerFetchSatisfyingMinBytes = new HashMap<>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In most scenarios the request might have minBytes, hence do you always want to initialize a hash map? Mostly it will be overriden with responseData map. So can't it be null? Moreover can't it be simpy a boolean variable i.e.

boolean minBytesSatisfied = false

if (accumulatedBytes.get() >= shareFetchData.fetchParams().minBytes)
replicaManagerFetchSatisfyingMinBytes = responseData;
=>
if (accumulatedBytes.get() >= shareFetchData.fetchParams().minBytes)
minBytesSatisfied = true;

if (replicaManagerFetchSatisfyingMinBytes.isEmpty() && !hasRequestTimedOut) {
=>
if (!minBytesSatisfied && !hasRequestTimedOut) {

return replicaManagerFetchSatisfyingMinBytes;
=>
return Collections.emptyMap()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it makes sense. I'll make the change.

replicaManagerFetchDataFromTryComplete = replicaManagerFetchData(topicPartitionData, false);
if (!replicaManagerFetchDataFromTryComplete.isEmpty())
return forceComplete();
}
log.info("Can't acquire records for any partition in the share fetch request for group {}, member {}, " +
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now we can come to this code path getting no result from replicaManagerFetchData. Hence is the log line still correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right... I'll change it to log.info("Fetch cannot be completed for the partitions in the share fetch request for group {}, member {}, " + "topic partitions {}", shareFetchData.groupId(), shareFetchData.memberId(), shareFetchData.partitionMaxBytes().keySet())

// No locks for share partitions could be acquired, so we complete the request with an empty response.
shareFetchData.future().complete(Collections.emptyMap());
return;
Map<TopicIdPartition, FetchPartitionData> fetchResponseData;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need this extra variable or can just write later, if needed? And then no need of else block below.

replicaManagerFetchDataFromTryComplete = replicaManagerFetchData(topicPartitionData, true);

Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result =
ShareFetchUtils.processFetchResponse(shareFetchData, responseData, sharePartitionManager, replicaManager);
ShareFetchUtils.processFetchResponse(shareFetchData, fetchResponseData, sharePartitionManager, replicaManager);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fetchResponseData can still be empty, though processFetchResponse handles the empty check, is it intended? Though no harm, just checking with you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, since processFetchResponse can handle it, that's why I didn't add any check here

shareFetchData.future().complete(result);
} catch (Exception e) {
log.error("Error processing delayed share fetch request", e);
shareFetchData.future().completeExceptionally(e);
} finally {
// Releasing the lock to move ahead with the next request in queue.
releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());
releasePartitionLocks(shareFetchData.groupId(), fetchResponseData.keySet());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about if partitions were locked but no response in data aarived then will the lock be correctly released?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for that case, even when the data is not received from replica manager, the fetchResponseData should still have keys as the locked topic partitions and values as empty data, so it should work. Am I wrong in that understanding?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ci-approved core Kafka Broker KIP-932 Queues for Kafka
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants