Jelajahi Sumber

Fix "broker received out of order sequence" when brokers die

When the following three conditions are satisfied, the producer code can
skip message sequence numbers and cause the broker to complain that the
sequences are out of order:

* config.Producer.Idempotent is set
* The producer loses, and then regains, its connection to a broker
* The client code continues to attempt to produce messages whilst the
broker is unavailable.

For every message the client attempted to send while the broker is
unavailable, the transaction manager sequence number will be
incremented, however these messages will eventually fail and return an
error to the caller. When the broker re-appears, and another message is
published, it's sequence number is higher than the last one the broker
remembered - the values that were attempted while it was down were never
seen. Thus, from the broker's perspective, it's seeing out-of-order
sequence numbers.

The fix to this has a few parts:

* Don't obtain a sequence number from the transaction manager until
we're sure we want to try publishing the message
* Affix the producer ID and epoch to the message once the sequence is
generated
* Increment the transaction manager epoch (and reset all sequence
numbers to zero) when we permenantly fail to publish a message. That
represents a sequence that the broker will never see, so the only safe
thing to do is to roll over the epoch number.
* Ensure we don't publish message sets that contain messages from
multiple transaction manager epochs.
KJ Tsanaktsidis 4 tahun lalu
induk
melakukan
9df3038ad4
3 mengubah file dengan 135 tambahan dan 18 penghapusan
  1. 49 11
      async_producer.go
  2. 69 0
      async_producer_test.go
  3. 17 7
      produce_set.go

+ 49 - 11
async_producer.go

@@ -60,13 +60,28 @@ const (
 	noProducerEpoch = -1
 )
 
-func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) int32 {
+func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) {
 	key := fmt.Sprintf("%s-%d", topic, partition)
 	t.mutex.Lock()
 	defer t.mutex.Unlock()
 	sequence := t.sequenceNumbers[key]
 	t.sequenceNumbers[key] = sequence + 1
-	return sequence
+	return sequence, t.producerEpoch
+}
+
+func (t *transactionManager) bumpEpoch() {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+	t.producerEpoch++
+	for k := range t.sequenceNumbers {
+		t.sequenceNumbers[k] = 0
+	}
+}
+
+func (t *transactionManager) getProducerID() (int64, int16) {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+	return t.producerID, t.producerEpoch
 }
 
 func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
@@ -208,6 +223,8 @@ type ProducerMessage struct {
 	flags          flagSet
 	expectation    chan *ProducerError
 	sequenceNumber int32
+	producerEpoch  int16
+	hasSequence    bool
 }
 
 const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
@@ -234,6 +251,9 @@ func (m *ProducerMessage) byteSize(version int) int {
 func (m *ProducerMessage) clear() {
 	m.flags = 0
 	m.retries = 0
+	m.sequenceNumber = 0
+	m.producerEpoch = 0
+	m.hasSequence = false
 }
 
 // ProducerError is the type of error generated when the producer fails to deliver a message.
@@ -388,10 +408,6 @@ func (tp *topicProducer) dispatch() {
 				continue
 			}
 		}
-		// All messages being retried (sent or not) have already had their retry count updated
-		if tp.parent.conf.Producer.Idempotent && msg.retries == 0 {
-			msg.sequenceNumber = tp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
-		}
 
 		handler := tp.handlers[msg.Partition]
 		if handler == nil {
@@ -570,6 +586,15 @@ func (pp *partitionProducer) dispatch() {
 			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
 		}
 
+		// Now that we know we have a broker to actually try and send this message to, generate the sequence
+		// number for it.
+		// All messages being retried (sent or not) have already had their retry count updated
+		// Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer.
+		if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
+			msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
+			msg.hasSequence = true
+		}
+
 		pp.brokerProducer.input <- msg
 	}
 }
@@ -748,12 +773,21 @@ func (bp *brokerProducer) run() {
 			}
 
 			if bp.buffer.wouldOverflow(msg) {
-				if err := bp.waitForSpace(msg); err != nil {
+				Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
+				if err := bp.waitForSpace(msg, false); err != nil {
 					bp.parent.retryMessage(msg, err)
 					continue
 				}
 			}
 
+			if bp.parent.txnmgr.producerID != noProducerID && bp.buffer.producerEpoch != msg.producerEpoch {
+				// The epoch was reset, need to roll the buffer over
+				Logger.Printf("producer/broker/%d detected epoch rollover, waiting for new buffer\n", bp.broker.ID())
+				if err := bp.waitForSpace(msg, true); err != nil {
+					bp.parent.retryMessage(msg, err)
+					continue
+				}
+			}
 			if err := bp.buffer.add(msg); err != nil {
 				bp.parent.returnError(msg, err)
 				continue
@@ -809,9 +843,7 @@ func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
 	return bp.currentRetries[msg.Topic][msg.Partition]
 }
 
-func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error {
-	Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
-
+func (bp *brokerProducer) waitForSpace(msg *ProducerMessage, forceRollover bool) error {
 	for {
 		select {
 		case response := <-bp.responses:
@@ -819,7 +851,7 @@ func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error {
 			// handling a response can change our state, so re-check some things
 			if reason := bp.needsRetry(msg); reason != nil {
 				return reason
-			} else if !bp.buffer.wouldOverflow(msg) {
+			} else if !bp.buffer.wouldOverflow(msg) && !forceRollover {
 				return nil
 			}
 		case bp.output <- bp.buffer:
@@ -1030,6 +1062,12 @@ func (p *asyncProducer) shutdown() {
 }
 
 func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
+	// We need to reset the producer ID epoch if we set a sequence number on it, because the broker
+	// will never see a message with this number, so we can never continue the sequence.
+	if msg.hasSequence {
+		Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition)
+		p.txnmgr.bumpEpoch()
+	}
 	msg.clear()
 	pErr := &ProducerError{Msg: msg, Err: err}
 	if p.conf.Producer.Return.Errors {

+ 69 - 0
async_producer_test.go

@@ -1130,6 +1130,75 @@ func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) {
 	closeProducer(t, producer)
 }
 
+func TestAsyncProducerIdempotentEpochRollover(t *testing.T) {
+	broker := NewMockBroker(t, 1)
+	defer broker.Close()
+
+	metadataResponse := &MetadataResponse{
+		Version:      1,
+		ControllerID: 1,
+	}
+	metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
+	broker.Returns(metadataResponse)
+
+	initProducerID := &InitProducerIDResponse{
+		ThrottleTime:  0,
+		ProducerID:    1000,
+		ProducerEpoch: 1,
+	}
+	broker.Returns(initProducerID)
+
+	config := NewConfig()
+	config.Producer.Flush.Messages = 10
+	config.Producer.Flush.Frequency = 10 * time.Millisecond
+	config.Producer.Return.Successes = true
+	config.Producer.Retry.Max = 1 // This test needs to exercise what happens when retries exhaust
+	config.Producer.RequiredAcks = WaitForAll
+	config.Producer.Retry.Backoff = 0
+	config.Producer.Idempotent = true
+	config.Net.MaxOpenRequests = 1
+	config.Version = V0_11_0_0
+
+	producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer closeProducer(t, producer)
+
+	producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}
+	prodError := &ProduceResponse{
+		Version:      3,
+		ThrottleTime: 0,
+	}
+	prodError.AddTopicPartition("my_topic", 0, ErrBrokerNotAvailable)
+	broker.Returns(prodError)
+	<-producer.Errors()
+
+	lastReqRes := broker.history[len(broker.history)-1]
+	lastProduceBatch := lastReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch
+	if lastProduceBatch.FirstSequence != 0 {
+		t.Error("first sequence not zero")
+	}
+	if lastProduceBatch.ProducerEpoch != 1 {
+		t.Error("first epoch was not one")
+	}
+
+	// Now if we produce again, the epoch should have rolled over.
+	producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}
+	broker.Returns(prodError)
+	<-producer.Errors()
+
+	lastReqRes = broker.history[len(broker.history)-1]
+	lastProduceBatch = lastReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch
+	if lastProduceBatch.FirstSequence != 0 {
+		t.Error("second sequence not zero")
+	}
+	if lastProduceBatch.ProducerEpoch <= 1 {
+		t.Error("second epoch was not > 1")
+	}
+}
+
 // TestBrokerProducerShutdown ensures that a call to shutdown stops the
 // brokerProducer run() loop and doesn't leak any goroutines
 func TestBrokerProducerShutdown(t *testing.T) {

+ 17 - 7
produce_set.go

@@ -13,17 +13,22 @@ type partitionSet struct {
 }
 
 type produceSet struct {
-	parent *asyncProducer
-	msgs   map[string]map[int32]*partitionSet
+	parent        *asyncProducer
+	msgs          map[string]map[int32]*partitionSet
+	producerID    int64
+	producerEpoch int16
 
 	bufferBytes int
 	bufferCount int
 }
 
 func newProduceSet(parent *asyncProducer) *produceSet {
+	pid, epoch := parent.txnmgr.getProducerID()
 	return &produceSet{
-		msgs:   make(map[string]map[int32]*partitionSet),
-		parent: parent,
+		msgs:          make(map[string]map[int32]*partitionSet),
+		parent:        parent,
+		producerID:    pid,
+		producerEpoch: epoch,
 	}
 }
 
@@ -65,8 +70,8 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
 				Version:          2,
 				Codec:            ps.parent.conf.Producer.Compression,
 				CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
-				ProducerID:       ps.parent.txnmgr.producerID,
-				ProducerEpoch:    ps.parent.txnmgr.producerEpoch,
+				ProducerID:       ps.producerID,
+				ProducerEpoch:    ps.producerEpoch,
 			}
 			if ps.parent.conf.Producer.Idempotent {
 				batch.FirstSequence = msg.sequenceNumber
@@ -78,12 +83,17 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
 		}
 		partitions[msg.Partition] = set
 	}
-	set.msgs = append(set.msgs, msg)
 
 	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
 		if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence {
 			return errors.New("assertion failed: message out of sequence added to a batch")
 		}
+	}
+
+	// Past this point we can't return an error, because we've already added the message to the set.
+	set.msgs = append(set.msgs, msg)
+
+	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
 		// We are being conservative here to avoid having to prep encode the record
 		size += maximumRecordOverhead
 		rec := &Record{