
High GC pressure due to numerous temporary objects in high-concurrency consumption scenario #2951

Open
xiaomudk opened this issue Aug 5, 2024 · 0 comments


xiaomudk commented Aug 5, 2024

Description

Hello Sarama maintainers,
I'm encountering high GC pressure when consuming Kafka messages with Sarama in a high-concurrency scenario (around 100K TPS).

pprof profile
(CPU profile screenshot omitted)

pprof heap

Type: alloc_objects
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top20
Showing nodes accounting for 345968905, 96.71% of 357746798 total
Dropped 298 nodes (cum <= 1788733)
Showing top 20 nodes out of 34
      flat  flat%   sum%        cum   cum%
 104992531 29.35% 29.35%  214004373 59.82%  github.com/IBM/sarama.(*MessageSet).decode
 101894004 28.48% 57.83%  101894004 28.48%  github.com/IBM/sarama.(*partitionConsumer).parseMessages
  96394846 26.94% 84.78%  211677820 59.17%  github.com/IBM/sarama.(*MessageBlock).decode
   5355012  1.50% 86.27%    5355012  1.50%  github.com/IBM/sarama.(*FetchRequest).AddBlock
   5266207  1.47% 87.74%  224329016 62.71%  github.com/IBM/sarama.(*FetchResponse).decode
   4128776  1.15% 88.90%    5161101  1.44%  github.com/eapache/go-xerial-snappy.DecodeInto
   3522638  0.98% 89.88%    3522638  0.98%  github.com/IBM/sarama.(*realDecoder).push
   3173296  0.89% 90.77%    3173296  0.89%  github.com/IBM/sarama.makeResponsePromise

After analyzing the pprof heap profile, I've identified several areas of concern related to frequent memory allocations during message decoding. Here are the key findings:

  1. In (*MessageSet).decode:
func (ms *MessageSet) decode(pd packetDecoder) (err error) {
	ms.Messages = nil                                 // reset to nil

	for pd.remaining() > 0 {
		.....

		msb := new(MessageBlock)              // new MessageBlock objects are created for each message
		err = msb.decode(pd)
		if err == nil {
			ms.Messages = append(ms.Messages, msb)       // Append will dynamically expand
		} else if errors.Is(err, ErrInsufficientData) {
                         ...
		} else {
			return err
		}
	}

	return nil
}

Additionally, I noticed that the ms.Messages slice is reset to nil in each decode call. This approach might lead to unnecessary allocations and GC pressure. Would it be beneficial to reuse the existing slice instead of creating a new one each time? Also, is there a way to pre-allocate the slice with an estimated capacity to reduce the number of dynamic expansions?
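
For reference, here is a minimal sketch of what the reuse and pre-sizing might look like; estimatedMessageCount is a hypothetical tuning constant, not something that exists in Sarama today:

func (ms *MessageSet) decode(pd packetDecoder) (err error) {
	// Keep the previous backing array instead of handing it to the GC,
	// and pre-size on first use to limit append growth.
	if ms.Messages == nil {
		ms.Messages = make([]*MessageBlock, 0, estimatedMessageCount) // hypothetical capacity guess
	} else {
		ms.Messages = ms.Messages[:0]
	}

	for pd.remaining() > 0 {
		// ... existing per-message decoding unchanged ...
	}

	return nil
}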

  2. In (*MessageBlock).decode:
func (msb *MessageBlock) decode(pd packetDecoder) (err error) {
	......

	msb.Msg = new(Message)                   // a new Message object is created for each message block
	if err = msb.Msg.decode(pd); err != nil {
		return err
	}

	if err = pd.pop(); err != nil {
		return err
	}

	return nil
}
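
To put a number on the per-message cost, here is a small stand-alone micro-benchmark (not Sarama code; block is a placeholder type) comparing one heap object per element, as in the loops above, against taking pointers into a single pre-allocated backing slice:

package main

import (
	"fmt"
	"testing"
)

type block struct {
	offset  int64
	payload []byte
}

func main() {
	const n = 1000

	// One heap object per element, mirroring new(MessageBlock) / new(Message) in a loop.
	perItem := testing.AllocsPerRun(100, func() {
		var out []*block
		for i := 0; i < n; i++ {
			out = append(out, new(block))
		}
		_ = out
	})

	// One backing array shared by all elements.
	backingSlice := testing.AllocsPerRun(100, func() {
		store := make([]block, n)
		out := make([]*block, 0, n)
		for i := range store {
			out = append(out, &store[i])
		}
		_ = out
	})

	fmt.Printf("one object per element: %.0f allocs/run\n", perItem)
	fmt.Printf("single backing slice:   %.0f allocs/run\n", backingSlice)
}

In my understanding, the first version performs roughly one allocation per element (plus append growth), while the second performs a constant two; that is the kind of reduction the suggestions above are aiming for.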
  3. In (*partitionConsumer).parseMessages:
func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMessage, error) {
	var messages []*ConsumerMessage           // should pre-allocate the slice with an estimated capacity
	for _, msgBlock := range msgSet.Messages {
		for _, msg := range msgBlock.Messages() {
			.......
			messages = append(messages, &ConsumerMessage{          // new ConsumerMessage objects are created for each parsed message
				Topic:          child.topic,
				Partition:      child.partition,
				Key:            msg.Msg.Key,
				Value:          msg.Msg.Value,
				Offset:         offset,
				Timestamp:      timestamp,
				BlockTimestamp: msgBlock.Msg.Timestamp,
			})
			child.offset = offset + 1
		}
	}
	if len(messages) == 0 {
		child.offset++
	}
	return messages, nil
}
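
As a concrete illustration of the pre-allocation suggested in the comment above, the slice could be sized from the number of message blocks that were already decoded; this is only a sketch, with the existing loop body left unchanged:

func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMessage, error) {
	// len(msgSet.Messages) is a reasonable lower bound on the number of
	// resulting consumer messages, so use it as the initial capacity.
	messages := make([]*ConsumerMessage, 0, len(msgSet.Messages))
	for _, msgBlock := range msgSet.Messages {
		for _, msg := range msgBlock.Messages() {
			// ... existing offset/timestamp handling and append unchanged ...
		}
	}
	if len(messages) == 0 {
		child.offset++
	}
	return messages, nil
}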

Are there any optimization strategies or best practices you could recommend to reduce these allocations and mitigate the GC pressure? I'm open to suggestions on how to improve the performance in this high-concurrency scenario.
Thank you for your time and assistance.

Versions
Sarama: 1.43.2
Kafka: 0.10.2
Go: 1.22
Configuration
	conf := sarama.NewConfig()
	conf.Version = sarama.V0_10_2_0
	conf.Consumer.Offsets.Initial = sarama.OffsetNewest
Logs


Additional Context