Browse Source

Refactor the producer, part 1

First stand-alone chunk extracted from #544. This introduces and uses a
`produceSet` structure that takes care of collecting messages, rejecting those
that fail to encode, and turning the rest into a `ProduceRequest`.

This has several knock-on effects:
- we no longer nil out array entries, so nil checks in several places (e.g.
  `returnErrors`) can be removed
- parseResponse gets re-indented since it now loops over the partitions in the set
  via a callback rather than a pair of nested loops
- groupAndFilter is much simpler; a lot of its logic now lives in the produceSet
- the flusher has to use the set to return errors/successes/retries, rather than
  `batch`, which may now contain messages not in the eventual request
- keyCache and valueCache can be removed
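
For orientation, here is a condensed sketch of how the flusher drives the new `produceSet` (this paraphrases the diff below rather than reproducing it; `flushSketch` is a made-up name and the error handling is trimmed to the happy path plus a single retry branch):

	// Hypothetical condensed view of the new flush path; see the real diff below.
	func (f *flusher) flushSketch(batch []*ProducerMessage) {
		set := newProduceSet(f.parent)
		for _, msg := range batch {
			// add() encodes Key/Value up front and rejects messages that fail to encode
			if err := set.add(msg); err != nil {
				f.parent.returnError(msg, err)
			}
		}
		if set.empty() {
			return
		}
		response, err := f.broker.Produce(set.buildRequest())
		if err != nil {
			// errors are now reported per partition via the set, not via the raw batch slice
			set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
				f.parent.retryMessages(msgs, err)
			})
			return
		}
		if response == nil {
			// RequiredAcks == NoResponse: assume success
			set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
				f.parent.returnSuccesses(msgs)
			})
			return
		}
		f.parseResponse(set, response)
	}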
Evan Huus 10 years ago
parent
commit
aa044e2295
1 changed file with 167 additions and 131 deletions
  1. async_producer.go  +167  −131

async_producer.go  +167  −131

@@ -137,12 +137,12 @@ type ProducerMessage struct {
 
 	retries int
 	flags   flagSet
-
-	keyCache, valueCache []byte
 }
 
+const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
+
 func (m *ProducerMessage) byteSize() int {
-	size := 26 // the metadata overhead of CRC, flags, etc.
+	size := producerMessageOverhead
 	if m.Key != nil {
 		size += m.Key.Length()
 	}
@@ -155,8 +155,6 @@ func (m *ProducerMessage) byteSize() int {
 func (m *ProducerMessage) clear() {
 	m.flags = 0
 	m.retries = 0
-	m.keyCache = nil
-	m.valueCache = nil
 }
 
 // ProducerError is the type of error generated when the producer fails to deliver a message.
@@ -645,50 +643,54 @@ func (f *flusher) run() {
 			continue
 		}
 
-		msgSets := f.groupAndFilter(batch)
-		request := f.parent.buildRequest(msgSets)
-		if request == nil {
+		set := f.groupAndFilter(batch)
+		if set.empty() {
 			continue
 		}
 
+		request := set.buildRequest()
 		response, err := f.broker.Produce(request)
 
 		switch err.(type) {
 		case nil:
 			break
 		case PacketEncodingError:
-			f.parent.returnErrors(batch, err)
+			set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
+				f.parent.returnErrors(msgs, err)
+			})
 			continue
 		default:
 			Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", f.broker.ID(), err)
 			f.parent.abandonBrokerConnection(f.broker)
 			_ = f.broker.Close()
 			closing = err
-			f.parent.retryMessages(batch, err)
+			set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
+				f.parent.retryMessages(msgs, err)
+			})
 			continue
 		}
 
 		if response == nil {
 			// this only happens when RequiredAcks is NoResponse, so we have to assume success
-			f.parent.returnSuccesses(batch)
+			set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
+				f.parent.returnSuccesses(msgs)
+			})
 			continue
 		}
 
-		f.parseResponse(msgSets, response)
+		f.parseResponse(set, response)
 	}
 	Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID())
 }
 
-func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32][]*ProducerMessage {
-	var err error
-	msgSets := make(map[string]map[int32][]*ProducerMessage)
+func (f *flusher) groupAndFilter(batch []*ProducerMessage) *produceSet {
+	set := newProduceSet(f.parent)
 
-	for i, msg := range batch {
+	for _, msg := range batch {
 
 		if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil {
 			// we're currently retrying this partition so we need to filter out this message
 			f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition])
-			batch[i] = nil
 
 			if msg.flags&chaser == chaser {
 				// ...but now we can start processing future messages again
@@ -700,68 +702,47 @@ func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32]
 			continue
 		}
 
-		if msg.Key != nil {
-			if msg.keyCache, err = msg.Key.Encode(); err != nil {
-				f.parent.returnError(msg, err)
-				batch[i] = nil
-				continue
-			}
-		}
-
-		if msg.Value != nil {
-			if msg.valueCache, err = msg.Value.Encode(); err != nil {
-				f.parent.returnError(msg, err)
-				batch[i] = nil
-				continue
-			}
-		}
-
-		partitionSet := msgSets[msg.Topic]
-		if partitionSet == nil {
-			partitionSet = make(map[int32][]*ProducerMessage)
-			msgSets[msg.Topic] = partitionSet
+		if err := set.add(msg); err != nil {
+			f.parent.returnError(msg, err)
+			continue
 		}
-
-		partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg)
 	}
 
-	return msgSets
+	return set
 }
 
-func (f *flusher) parseResponse(msgSets map[string]map[int32][]*ProducerMessage, response *ProduceResponse) {
+func (f *flusher) parseResponse(set *produceSet, response *ProduceResponse) {
 	// we iterate through the blocks in the request set, not the response, so that we notice
 	// if the response is missing a block completely
-	for topic, partitionSet := range msgSets {
-		for partition, msgs := range partitionSet {
-			block := response.GetBlock(topic, partition)
-			if block == nil {
-				f.parent.returnErrors(msgs, ErrIncompleteResponse)
-				continue
-			}
+	set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
+		block := response.GetBlock(topic, partition)
+		if block == nil {
+			f.parent.returnErrors(msgs, ErrIncompleteResponse)
+			return
+		}
 
-			switch block.Err {
-			// Success
-			case ErrNoError:
-				for i := range msgs {
-					msgs[i].Offset = block.Offset + int64(i)
-				}
-				f.parent.returnSuccesses(msgs)
-			// Retriable errors
-			case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
-				ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
-				Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
-					f.broker.ID(), topic, partition, block.Err)
-				if f.currentRetries[topic] == nil {
-					f.currentRetries[topic] = make(map[int32]error)
-				}
-				f.currentRetries[topic][partition] = block.Err
-				f.parent.retryMessages(msgs, block.Err)
-			// Other non-retriable errors
-			default:
-				f.parent.returnErrors(msgs, block.Err)
+		switch block.Err {
+		// Success
+		case ErrNoError:
+			for i := range msgs {
+				msgs[i].Offset = block.Offset + int64(i)
+			}
+			f.parent.returnSuccesses(msgs)
+		// Retriable errors
+		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
+			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
+			Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
+				f.broker.ID(), topic, partition, block.Err)
+			if f.currentRetries[topic] == nil {
+				f.currentRetries[topic] = make(map[int32]error)
 			}
+			f.currentRetries[topic][partition] = block.Err
+			f.parent.retryMessages(msgs, block.Err)
+		// Other non-retriable errors
+		default:
+			f.parent.returnErrors(msgs, block.Err)
 		}
-	}
+	})
 }
 
 // singleton
@@ -791,6 +772,112 @@ func (p *asyncProducer) retryHandler() {
 	}
 }
 
+// produceSet
+
+type partitionSet struct {
+	msgs        []*ProducerMessage
+	setToSend   *MessageSet
+	bufferBytes int
+}
+
+type produceSet struct {
+	parent *asyncProducer
+	msgs   map[string]map[int32]*partitionSet
+
+	bufferBytes int
+	bufferCount int
+}
+
+func newProduceSet(parent *asyncProducer) *produceSet {
+	return &produceSet{
+		msgs:   make(map[string]map[int32]*partitionSet),
+		parent: parent,
+	}
+}
+
+func (ps *produceSet) add(msg *ProducerMessage) error {
+	var err error
+	var key, val []byte
+
+	if msg.Key != nil {
+		if key, err = msg.Key.Encode(); err != nil {
+			return err
+		}
+	}
+
+	if msg.Value != nil {
+		if val, err = msg.Value.Encode(); err != nil {
+			return err
+		}
+	}
+
+	partitions := ps.msgs[msg.Topic]
+	if partitions == nil {
+		partitions = make(map[int32]*partitionSet)
+		ps.msgs[msg.Topic] = partitions
+	}
+
+	set := partitions[msg.Partition]
+	if set == nil {
+		set = &partitionSet{setToSend: new(MessageSet)}
+		partitions[msg.Partition] = set
+	}
+
+	set.msgs = append(set.msgs, msg)
+	set.setToSend.addMessage(&Message{Codec: CompressionNone, Key: key, Value: val})
+
+	size := producerMessageOverhead + len(key) + len(val)
+	set.bufferBytes += size
+	ps.bufferBytes += size
+	ps.bufferCount++
+
+	return nil
+}
+
+func (ps *produceSet) buildRequest() *ProduceRequest {
+	req := &ProduceRequest{
+		RequiredAcks: ps.parent.conf.Producer.RequiredAcks,
+		Timeout:      int32(ps.parent.conf.Producer.Timeout / time.Millisecond),
+	}
+
+	for topic, partitionSet := range ps.msgs {
+		for partition, set := range partitionSet {
+			if ps.parent.conf.Producer.Compression == CompressionNone {
+				req.AddSet(topic, partition, set.setToSend)
+			} else {
+				// When compression is enabled, the entire set for each partition is compressed
+				// and sent as the payload of a single fake "message" with the appropriate codec
+				// set and no key. When the server sees a message with a compression codec, it
+				// decompresses the payload and treats the result as its message set.
+				payload, err := encode(set.setToSend)
+				if err != nil {
+					Logger.Println(err) // if this happens, it's basically our fault.
+					panic(err)
+				}
+				req.AddMessage(topic, partition, &Message{
+					Codec: ps.parent.conf.Producer.Compression,
+					Key:   nil,
+					Value: payload,
+				})
+			}
+		}
+	}
+
+	return req
+}
+
+func (ps *produceSet) eachPartition(cb func(topic string, partition int32, msgs []*ProducerMessage)) {
+	for topic, partitionSet := range ps.msgs {
+		for partition, set := range partitionSet {
+			cb(topic, partition, set.msgs)
+		}
+	}
+}
+
+func (ps *produceSet) empty() bool {
+	return ps.bufferCount == 0
+}
+
 // utility functions
 
 func (p *asyncProducer) shutdown() {
@@ -813,53 +900,6 @@ func (p *asyncProducer) shutdown() {
 	close(p.successes)
 }
 
-func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {
-
-	req := &ProduceRequest{RequiredAcks: p.conf.Producer.RequiredAcks, Timeout: int32(p.conf.Producer.Timeout / time.Millisecond)}
-	empty := true
-
-	for topic, partitionSet := range batch {
-		for partition, msgSet := range partitionSet {
-			setToSend := new(MessageSet)
-			setSize := 0
-			for _, msg := range msgSet {
-				if p.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > p.conf.Producer.MaxMessageBytes {
-					// compression causes message-sets to be wrapped as single messages, which have tighter
-					// size requirements, so we have to respect those limits
-					valBytes, err := encode(setToSend)
-					if err != nil {
-						Logger.Println(err) // if this happens, it's basically our fault.
-						panic(err)
-					}
-					req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes})
-					setToSend = new(MessageSet)
-					setSize = 0
-				}
-				setSize += msg.byteSize()
-
-				setToSend.addMessage(&Message{Codec: CompressionNone, Key: msg.keyCache, Value: msg.valueCache})
-				empty = false
-			}
-
-			if p.conf.Producer.Compression == CompressionNone {
-				req.AddSet(topic, partition, setToSend)
-			} else {
-				valBytes, err := encode(setToSend)
-				if err != nil {
-					Logger.Println(err) // if this happens, it's basically our fault.
-					panic(err)
-				}
-				req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes})
-			}
-		}
-	}
-
-	if empty {
-		return nil
-	}
-	return req
-}
-
 func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
 	msg.clear()
 	pErr := &ProducerError{Msg: msg, Err: err}
@@ -873,17 +913,12 @@ func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
 
 func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
 	for _, msg := range batch {
-		if msg != nil {
-			p.returnError(msg, err)
-		}
+		p.returnError(msg, err)
 	}
 }
 
 func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
 	for _, msg := range batch {
-		if msg == nil {
-			continue
-		}
 		if p.conf.Producer.Return.Successes {
 			msg.clear()
 			p.successes <- msg
@@ -892,17 +927,18 @@ func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
 	}
 }
 
+func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
+	if msg.retries >= p.conf.Producer.Retry.Max {
+		p.returnError(msg, err)
+	} else {
+		msg.retries++
+		p.retries <- msg
+	}
+}
+
 func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
 	for _, msg := range batch {
-		if msg == nil {
-			continue
-		}
-		if msg.retries >= p.conf.Producer.Retry.Max {
-			p.returnError(msg, err)
-		} else {
-			msg.retries++
-			p.retries <- msg
-		}
+		p.retryMessage(msg, err)
 	}
 }