Browse Source

Merge pull request #1661 from zendesk/ktsanaktsidis/fix_seq_out_of_order

Fix "broker received out of order sequence" when brokers die
Dominic Evans 5 years ago
parent
commit
f7b64cfa12
4 changed files with 213 additions and 18 deletions
  1. 49 11
      async_producer.go
  2. 69 0
      async_producer_test.go
  3. 78 0
      functional_producer_test.go
  4. 17 7
      produce_set.go

+ 49 - 11
async_producer.go

@@ -60,13 +60,28 @@ const (
 	noProducerEpoch = -1
 )
 
-func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) int32 {
+func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) {
 	key := fmt.Sprintf("%s-%d", topic, partition)
 	t.mutex.Lock()
 	defer t.mutex.Unlock()
 	sequence := t.sequenceNumbers[key]
 	t.sequenceNumbers[key] = sequence + 1
-	return sequence
+	return sequence, t.producerEpoch
+}
+
+func (t *transactionManager) bumpEpoch() {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+	t.producerEpoch++
+	for k := range t.sequenceNumbers {
+		t.sequenceNumbers[k] = 0
+	}
+}
+
+func (t *transactionManager) getProducerID() (int64, int16) {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+	return t.producerID, t.producerEpoch
 }
 
 func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
@@ -208,6 +223,8 @@ type ProducerMessage struct {
 	flags          flagSet
 	expectation    chan *ProducerError
 	sequenceNumber int32
+	producerEpoch  int16
+	hasSequence    bool
 }
 
 const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
@@ -234,6 +251,9 @@ func (m *ProducerMessage) byteSize(version int) int {
 func (m *ProducerMessage) clear() {
 	m.flags = 0
 	m.retries = 0
+	m.sequenceNumber = 0
+	m.producerEpoch = 0
+	m.hasSequence = false
 }
 
 // ProducerError is the type of error generated when the producer fails to deliver a message.
@@ -388,10 +408,6 @@ func (tp *topicProducer) dispatch() {
 				continue
 			}
 		}
-		// All messages being retried (sent or not) have already had their retry count updated
-		if tp.parent.conf.Producer.Idempotent && msg.retries == 0 {
-			msg.sequenceNumber = tp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
-		}
 
 		handler := tp.handlers[msg.Partition]
 		if handler == nil {
@@ -570,6 +586,15 @@ func (pp *partitionProducer) dispatch() {
 			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
 		}
 
+		// Now that we know we have a broker to actually try and send this message to, generate the sequence
+		// number for it.
+		// All messages being retried (sent or not) have already had their retry count updated
+		// Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer.
+		if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
+			msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
+			msg.hasSequence = true
+		}
+
 		pp.brokerProducer.input <- msg
 	}
 }
@@ -748,12 +773,21 @@ func (bp *brokerProducer) run() {
 			}
 
 			if bp.buffer.wouldOverflow(msg) {
-				if err := bp.waitForSpace(msg); err != nil {
+				Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
+				if err := bp.waitForSpace(msg, false); err != nil {
 					bp.parent.retryMessage(msg, err)
 					continue
 				}
 			}
 
+			if bp.parent.txnmgr.producerID != noProducerID && bp.buffer.producerEpoch != msg.producerEpoch {
+				// The epoch was reset, need to roll the buffer over
+				Logger.Printf("producer/broker/%d detected epoch rollover, waiting for new buffer\n", bp.broker.ID())
+				if err := bp.waitForSpace(msg, true); err != nil {
+					bp.parent.retryMessage(msg, err)
+					continue
+				}
+			}
 			if err := bp.buffer.add(msg); err != nil {
 				bp.parent.returnError(msg, err)
 				continue
@@ -809,9 +843,7 @@ func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
 	return bp.currentRetries[msg.Topic][msg.Partition]
 }
 
-func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error {
-	Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
-
+func (bp *brokerProducer) waitForSpace(msg *ProducerMessage, forceRollover bool) error {
 	for {
 		select {
 		case response := <-bp.responses:
@@ -819,7 +851,7 @@ func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error {
 			// handling a response can change our state, so re-check some things
 			if reason := bp.needsRetry(msg); reason != nil {
 				return reason
-			} else if !bp.buffer.wouldOverflow(msg) {
+			} else if !bp.buffer.wouldOverflow(msg) && !forceRollover {
 				return nil
 			}
 		case bp.output <- bp.buffer:
@@ -1030,6 +1062,12 @@ func (p *asyncProducer) shutdown() {
 }
 
 func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
+	// We need to reset the producer ID epoch if we set a sequence number on it, because the broker
+	// will never see a message with this number, so we can never continue the sequence.
+	if msg.hasSequence {
+		Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition)
+		p.txnmgr.bumpEpoch()
+	}
 	msg.clear()
 	pErr := &ProducerError{Msg: msg, Err: err}
 	if p.conf.Producer.Return.Errors {

+ 69 - 0
async_producer_test.go

@@ -1130,6 +1130,75 @@ func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) {
 	closeProducer(t, producer)
 }
 
+func TestAsyncProducerIdempotentEpochRollover(t *testing.T) {
+	broker := NewMockBroker(t, 1)
+	defer broker.Close()
+
+	metadataResponse := &MetadataResponse{
+		Version:      1,
+		ControllerID: 1,
+	}
+	metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
+	broker.Returns(metadataResponse)
+
+	initProducerID := &InitProducerIDResponse{
+		ThrottleTime:  0,
+		ProducerID:    1000,
+		ProducerEpoch: 1,
+	}
+	broker.Returns(initProducerID)
+
+	config := NewConfig()
+	config.Producer.Flush.Messages = 10
+	config.Producer.Flush.Frequency = 10 * time.Millisecond
+	config.Producer.Return.Successes = true
+	config.Producer.Retry.Max = 1 // This test needs to exercise what happens when retries exhaust
+	config.Producer.RequiredAcks = WaitForAll
+	config.Producer.Retry.Backoff = 0
+	config.Producer.Idempotent = true
+	config.Net.MaxOpenRequests = 1
+	config.Version = V0_11_0_0
+
+	producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer closeProducer(t, producer)
+
+	producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}
+	prodError := &ProduceResponse{
+		Version:      3,
+		ThrottleTime: 0,
+	}
+	prodError.AddTopicPartition("my_topic", 0, ErrBrokerNotAvailable)
+	broker.Returns(prodError)
+	<-producer.Errors()
+
+	lastReqRes := broker.history[len(broker.history)-1]
+	lastProduceBatch := lastReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch
+	if lastProduceBatch.FirstSequence != 0 {
+		t.Error("first sequence not zero")
+	}
+	if lastProduceBatch.ProducerEpoch != 1 {
+		t.Error("first epoch was not one")
+	}
+
+	// Now if we produce again, the epoch should have rolled over.
+	producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}
+	broker.Returns(prodError)
+	<-producer.Errors()
+
+	lastReqRes = broker.history[len(broker.history)-1]
+	lastProduceBatch = lastReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch
+	if lastProduceBatch.FirstSequence != 0 {
+		t.Error("second sequence not zero")
+	}
+	if lastProduceBatch.ProducerEpoch <= 1 {
+		t.Error("second epoch was not > 1")
+	}
+}
+
 // TestBrokerProducerShutdown ensures that a call to shutdown stops the
 // brokerProducer run() loop and doesn't leak any goroutines
 func TestBrokerProducerShutdown(t *testing.T) {

+ 78 - 0
functional_producer_test.go

@@ -3,6 +3,7 @@ package sarama
 import (
 	"fmt"
 	"os"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -96,6 +97,83 @@ func TestFuncProducingToInvalidTopic(t *testing.T) {
 	safeClose(t, producer)
 }
 
+func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
+	setupFunctionalTest(t)
+	defer teardownFunctionalTest(t)
+
+	config := NewConfig()
+	config.Producer.Flush.Frequency = 250 * time.Millisecond
+	config.Producer.Idempotent = true
+	config.Producer.Timeout = 500 * time.Millisecond
+	config.Producer.Retry.Max = 1
+	config.Producer.Retry.Backoff = 500 * time.Millisecond
+	config.Producer.Return.Successes = true
+	config.Producer.Return.Errors = true
+	config.Producer.RequiredAcks = WaitForAll
+	config.Net.MaxOpenRequests = 1
+	config.Version = V0_11_0_0
+
+	producer, err := NewSyncProducer(kafkaBrokers, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer safeClose(t, producer)
+
+	// Successfully publish a few messages
+	for i := 0; i < 10; i++ {
+		_, _, err = producer.SendMessage(&ProducerMessage{
+			Topic: "test.1",
+			Value: StringEncoder(fmt.Sprintf("%d message", i)),
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// break the brokers.
+	for proxyName, proxy := range Proxies {
+		if !strings.Contains(proxyName, "kafka") {
+			continue
+		}
+		if err := proxy.Disable(); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// This should fail hard now
+	for i := 10; i < 20; i++ {
+		_, _, err = producer.SendMessage(&ProducerMessage{
+			Topic: "test.1",
+			Value: StringEncoder(fmt.Sprintf("%d message", i)),
+		})
+		if err == nil {
+			t.Fatal(err)
+		}
+	}
+
+	// Now bring the proxy back up
+	for proxyName, proxy := range Proxies {
+		if !strings.Contains(proxyName, "kafka") {
+			continue
+		}
+		if err := proxy.Enable(); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// We should be able to publish again (once everything calms down)
+	// (otherwise it times out)
+	for {
+		_, _, err = producer.SendMessage(&ProducerMessage{
+			Topic: "test.1",
+			Value: StringEncoder("comeback message"),
+		})
+		if err == nil {
+			break
+		}
+	}
+}
+
 func testProducingMessages(t *testing.T, config *Config) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)

+ 17 - 7
produce_set.go

@@ -13,17 +13,22 @@ type partitionSet struct {
 }
 
 type produceSet struct {
-	parent *asyncProducer
-	msgs   map[string]map[int32]*partitionSet
+	parent        *asyncProducer
+	msgs          map[string]map[int32]*partitionSet
+	producerID    int64
+	producerEpoch int16
 
 	bufferBytes int
 	bufferCount int
 }
 
 func newProduceSet(parent *asyncProducer) *produceSet {
+	pid, epoch := parent.txnmgr.getProducerID()
 	return &produceSet{
-		msgs:   make(map[string]map[int32]*partitionSet),
-		parent: parent,
+		msgs:          make(map[string]map[int32]*partitionSet),
+		parent:        parent,
+		producerID:    pid,
+		producerEpoch: epoch,
 	}
 }
 
@@ -65,8 +70,8 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
 				Version:          2,
 				Codec:            ps.parent.conf.Producer.Compression,
 				CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
-				ProducerID:       ps.parent.txnmgr.producerID,
-				ProducerEpoch:    ps.parent.txnmgr.producerEpoch,
+				ProducerID:       ps.producerID,
+				ProducerEpoch:    ps.producerEpoch,
 			}
 			if ps.parent.conf.Producer.Idempotent {
 				batch.FirstSequence = msg.sequenceNumber
@@ -78,12 +83,17 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
 		}
 		partitions[msg.Partition] = set
 	}
-	set.msgs = append(set.msgs, msg)
 
 	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
 		if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence {
 			return errors.New("assertion failed: message out of sequence added to a batch")
 		}
+	}
+
+	// Past this point we can't return an error, because we've already added the message to the set.
+	set.msgs = append(set.msgs, msg)
+
+	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
 		// We are being conservative here to avoid having to prep encode the record
 		size += maximumRecordOverhead
 		rec := &Record{