package sarama

import (
	"encoding/binary"
	"errors"
	"time"
)
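
// partitionSet buffers the messages bound for a single topic/partition,
// together with the encoded Records that will go on the wire and a running
// total of their estimated byte size.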
type partitionSet struct {
	msgs          []*ProducerMessage
	recordsToSend Records
	bufferBytes   int
}
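
// produceSet accumulates messages grouped by topic and partition until the
// producer flushes them as a single ProduceRequest, keeping running byte and
// message counts so the flush triggers can be checked cheaply.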
type produceSet struct {
	parent *asyncProducer
	msgs   map[string]map[int32]*partitionSet

	bufferBytes int
	bufferCount int
}
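
// newProduceSet returns an empty produceSet buffering messages on behalf of
// the given asyncProducer.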
func newProduceSet(parent *asyncProducer) *produceSet {
	return &produceSet{
		msgs:   make(map[string]map[int32]*partitionSet),
		parent: parent,
	}
}
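
// add encodes the message's key and value, stamps it with the current time if
// it does not already carry a timestamp, and appends it to the batch for its
// partition, updating the estimated on-the-wire size as it goes. On brokers
// speaking protocol 0.11+ the message becomes a Record in a RecordBatch;
// older brokers get a legacy MessageSet entry instead.
//
// A typical driver loop (a sketch only; the real one lives in
// async_producer.go, and flush here is a placeholder for handing the set to
// the broker) looks roughly like:
//
//	for msg := range input {
//		if set.wouldOverflow(msg) {
//			flush(set)
//		}
//		_ = set.add(msg)
//		if set.readyToFlush() {
//			flush(set)
//		}
//	}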
func (ps *produceSet) add(msg *ProducerMessage) error {
	var err error
	var key, val []byte

	if msg.Key != nil {
		if key, err = msg.Key.Encode(); err != nil {
			return err
		}
	}

	if msg.Value != nil {
		if val, err = msg.Value.Encode(); err != nil {
			return err
		}
	}

	timestamp := msg.Timestamp
	if timestamp.IsZero() {
		timestamp = time.Now()
	}
	timestamp = timestamp.Truncate(time.Millisecond)

	partitions := ps.msgs[msg.Topic]
	if partitions == nil {
		partitions = make(map[int32]*partitionSet)
		ps.msgs[msg.Topic] = partitions
	}

	var size int

	set := partitions[msg.Partition]
	if set == nil {
		if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
			batch := &RecordBatch{
				FirstTimestamp:   timestamp,
				Version:          2,
				Codec:            ps.parent.conf.Producer.Compression,
				CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
				ProducerID:       ps.parent.txnmgr.producerID,
				ProducerEpoch:    ps.parent.txnmgr.producerEpoch,
			}
			if ps.parent.conf.Producer.Idempotent {
				batch.FirstSequence = msg.sequenceNumber
			}
			set = &partitionSet{recordsToSend: newDefaultRecords(batch)}
			size = recordBatchOverhead
		} else {
			set = &partitionSet{recordsToSend: newLegacyRecords(new(MessageSet))}
		}
		partitions[msg.Partition] = set
	}

	set.msgs = append(set.msgs, msg)

	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
		if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence {
			return errors.New("assertion failed: message out of sequence added to a batch")
		}

		// We are being conservative here to avoid having to pre-encode the record
		size += maximumRecordOverhead
		rec := &Record{
			Key:            key,
			Value:          val,
			TimestampDelta: timestamp.Sub(set.recordsToSend.RecordBatch.FirstTimestamp),
		}
		size += len(key) + len(val)
		if len(msg.Headers) > 0 {
			rec.Headers = make([]*RecordHeader, len(msg.Headers))
			for i := range msg.Headers {
				rec.Headers[i] = &msg.Headers[i]
				size += len(rec.Headers[i].Key) + len(rec.Headers[i].Value) + 2*binary.MaxVarintLen32
			}
		}
		set.recordsToSend.RecordBatch.addRecord(rec)
	} else {
		msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val}
		if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
			msgToSend.Timestamp = timestamp
			msgToSend.Version = 1
		}
		set.recordsToSend.MsgSet.addMessage(msgToSend)
		size = producerMessageOverhead + len(key) + len(val)
	}

	set.bufferBytes += size
	ps.bufferBytes += size
	ps.bufferCount++

	return nil
}
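
// buildRequest assembles a single ProduceRequest from everything currently
// buffered, picking the request version that matches the configured Kafka
// version and compressing legacy message sets when a codec is configured.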
func (ps *produceSet) buildRequest() *ProduceRequest {
	req := &ProduceRequest{
		RequiredAcks: ps.parent.conf.Producer.RequiredAcks,
		Timeout:      int32(ps.parent.conf.Producer.Timeout / time.Millisecond),
	}
	if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
		req.Version = 2
	}
	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
		req.Version = 3
	}
	if ps.parent.conf.Producer.Compression == CompressionZSTD && ps.parent.conf.Version.IsAtLeast(V2_1_0_0) {
		req.Version = 7
	}

	for topic, partitionSets := range ps.msgs {
		for partition, set := range partitionSets {
			if req.Version >= 3 {
				// If the API version we're hitting is 3 or greater, we need to calculate
				// offsets for each record in the batch relative to FirstOffset.
				// Additionally, we must set LastOffsetDelta to the value of the last offset
				// in the batch. Since the OffsetDelta of the first record is 0, we know that the
				// final record of any batch will have an offset of (# of records in batch) - 1.
				// (See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
				// under the RecordBatch section for details.)
				rb := set.recordsToSend.RecordBatch
				if len(rb.Records) > 0 {
					rb.LastOffsetDelta = int32(len(rb.Records) - 1)
					for i, record := range rb.Records {
						record.OffsetDelta = int64(i)
					}
				}
				req.AddBatch(topic, partition, rb)
				continue
			}

			if ps.parent.conf.Producer.Compression == CompressionNone {
				req.AddSet(topic, partition, set.recordsToSend.MsgSet)
			} else {
				// When compression is enabled, the entire set for each partition is compressed
				// and sent as the payload of a single fake "message" with the appropriate codec
				// set and no key. When the server sees a message with a compression codec, it
				// decompresses the payload and treats the result as its message set.

				if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
					// If our version is 0.10 or later, assign relative offsets
					// to the inner messages. This lets the broker avoid
					// recompressing the message set.
					// (See https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets
					// for details on relative offsets.)
					for i, msg := range set.recordsToSend.MsgSet.Messages {
						msg.Offset = int64(i)
					}
				}

				payload, err := encode(set.recordsToSend.MsgSet, ps.parent.conf.MetricRegistry)
				if err != nil {
					Logger.Println(err) // if this happens, it's basically our fault.
					panic(err)
				}
				compMsg := &Message{
					Codec:            ps.parent.conf.Producer.Compression,
					CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
					Key:              nil,
					Value:            payload,
					Set:              set.recordsToSend.MsgSet, // Provide the underlying message set for accurate metrics
				}
				if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
					compMsg.Version = 1
					compMsg.Timestamp = set.recordsToSend.MsgSet.Messages[0].Msg.Timestamp
				}
				req.AddMessage(topic, partition, compMsg)
			}
		}
	}

	return req
}
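
// eachPartition invokes cb once for every topic/partition pair that currently
// has buffered messages.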
func (ps *produceSet) eachPartition(cb func(topic string, partition int32, pSet *partitionSet)) {
	for topic, partitionSet := range ps.msgs {
		for partition, set := range partitionSet {
			cb(topic, partition, set)
		}
	}
}
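
// dropPartition removes everything buffered for the given topic/partition,
// adjusts the byte and message counters to match, and returns the dropped
// messages so the caller can retry or fail them.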
func (ps *produceSet) dropPartition(topic string, partition int32) []*ProducerMessage {
	if ps.msgs[topic] == nil {
		return nil
	}
	set := ps.msgs[topic][partition]
	if set == nil {
		return nil
	}
	ps.bufferBytes -= set.bufferBytes
	ps.bufferCount -= len(set.msgs)
	delete(ps.msgs[topic], partition)
	return set.msgs
}
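
// wouldOverflow reports whether adding msg would breach a hard limit: the
// maximum request size on the wire, the per-partition batch size limit
// (Producer.MaxMessageBytes), or the message-count cap
// (Producer.Flush.MaxMessages).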
func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool {
	version := 1
	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
		version = 2
	}

	switch {
	// Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
	case ps.bufferBytes+msg.byteSize(version) >= int(MaxRequestSize-(10*1024)):
		return true
	// Would we overflow the size-limit of a message-batch for this partition?
	case ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil &&
		ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes:
		return true
	// Would we overflow simply in number of messages?
	case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages:
		return true
	default:
		return false
	}
}
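
// readyToFlush reports whether the buffered messages should be flushed now.
// With no flush thresholds configured at all, any non-empty set is ready;
// otherwise the Producer.Flush.Messages and Producer.Flush.Bytes trigger
// points are checked. The Producer.Flush.Frequency timer is handled by the
// caller, not here.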
func (ps *produceSet) readyToFlush() bool {
	switch {
	// If we don't have any messages, nothing else matters
	case ps.empty():
		return false
	// If all three config values are 0, we always flush as-fast-as-possible
	case ps.parent.conf.Producer.Flush.Frequency == 0 && ps.parent.conf.Producer.Flush.Bytes == 0 && ps.parent.conf.Producer.Flush.Messages == 0:
		return true
	// If we've passed the message trigger-point
	case ps.parent.conf.Producer.Flush.Messages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.Messages:
		return true
	// If we've passed the byte trigger-point
	case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes:
		return true
	default:
		return false
	}
}
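
// empty reports whether the set currently holds no buffered messages.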
func (ps *produceSet) empty() bool {
	return ps.bufferCount == 0
}