Browse Source

Merge pull request #214 from Shopify/retryable-producer

producer: ensure returned messages have no flags
Evan Huus 11 years ago
parent
commit
b5c1471686
2 changed files with 50 additions and 14 deletions
  1. 15 9
      producer.go
  2. 35 5
      producer_test.go

+ 15 - 9
producer.go

@@ -257,7 +257,7 @@ func (p *Producer) topicDispatcher() {
 		if (p.config.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.config.MaxMessageBytes) ||
 			(msg.byteSize() > p.config.MaxMessageBytes) {
 
-			p.errors <- &ProduceError{Msg: msg, Err: MessageSizeTooLarge}
+			p.returnError(msg, MessageSizeTooLarge)
 			continue
 		}
 
@@ -281,7 +281,7 @@ func (p *Producer) topicDispatcher() {
 	p.retries <- &MessageToSend{flags: shutdown}
 
 	for msg := range p.input {
-		p.errors <- &ProduceError{Msg: msg, Err: ShuttingDown}
+		p.returnError(msg, ShuttingDown)
 	}
 
 	close(p.errors)
@@ -297,7 +297,7 @@ func (p *Producer) partitionDispatcher(topic string, input chan *MessageToSend)
 		if msg.flags&retried == 0 {
 			err := p.assignPartition(partitioner, msg)
 			if err != nil {
-				p.errors <- &ProduceError{Msg: msg, Err: err}
+				p.returnError(msg, err)
 				continue
 			}
 		}
@@ -383,14 +383,14 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
 			if backlog != nil {
 				err = p.client.RefreshTopicMetadata(topic)
 				if err != nil {
-					p.errors <- &ProduceError{Msg: msg, Err: err}
+					p.returnError(msg, err)
 					continue
 				}
 			}
 
 			leader, err = p.client.Leader(topic, partition)
 			if err != nil {
-				p.errors <- &ProduceError{Msg: msg, Err: err}
+				p.returnError(msg, err)
 				continue
 			}
 
@@ -653,13 +653,13 @@ func (p *Producer) buildRequest(batch map[string]map[int32][]*MessageToSend) *Pr
 				var err error
 				if msg.Key != nil {
 					if keyBytes, err = msg.Key.Encode(); err != nil {
-						p.errors <- &ProduceError{Msg: msg, Err: err}
+						p.returnError(msg, err)
 						continue
 					}
 				}
 				if msg.Value != nil {
 					if valBytes, err = msg.Value.Encode(); err != nil {
-						p.errors <- &ProduceError{Msg: msg, Err: err}
+						p.returnError(msg, err)
 						continue
 					}
 				}
@@ -701,10 +701,15 @@ func (p *Producer) buildRequest(batch map[string]map[int32][]*MessageToSend) *Pr
 	return req
 }
 
+func (p *Producer) returnError(msg *MessageToSend, err error) {
+	msg.flags = 0
+	p.errors <- &ProduceError{Msg: msg, Err: err}
+}
+
 func (p *Producer) returnErrors(batch []*MessageToSend, err error) {
 	for _, msg := range batch {
 		if msg != nil {
-			p.errors <- &ProduceError{Msg: msg, Err: err}
+			p.returnError(msg, err)
 		}
 	}
 }
@@ -712,6 +717,7 @@ func (p *Producer) returnErrors(batch []*MessageToSend, err error) {
 func (p *Producer) returnSuccesses(batch []*MessageToSend) {
 	for _, msg := range batch {
 		if msg != nil {
+			msg.flags = 0
 			p.successes <- msg
 		}
 	}
@@ -723,7 +729,7 @@ func (p *Producer) retryMessages(batch []*MessageToSend, err error) {
 			continue
 		}
 		if msg.flags&retried == retried {
-			p.errors <- &ProduceError{Msg: msg, Err: err}
+			p.returnError(msg, err)
 		} else {
 			msg.flags |= retried
 			p.retries <- msg

+ 35 - 5
producer_test.go

@@ -88,7 +88,13 @@ func TestProducer(t *testing.T) {
 		select {
 		case msg := <-producer.Errors():
 			t.Error(msg.Err)
-		case <-producer.Successes():
+			if msg.Msg.flags != 0 {
+				t.Error("Message had flags set")
+			}
+		case msg := <-producer.Successes():
+			if msg.flags != 0 {
+				t.Error("Message had flags set")
+			}
 		}
 	}
 }
@@ -133,7 +139,13 @@ func TestProducerMultipleFlushes(t *testing.T) {
 			select {
 			case msg := <-producer.Errors():
 				t.Error(msg.Err)
-			case <-producer.Successes():
+				if msg.Msg.flags != 0 {
+					t.Error("Message had flags set")
+				}
+			case msg := <-producer.Successes():
+				if msg.flags != 0 {
+					t.Error("Message had flags set")
+				}
 			}
 		}
 	}
@@ -185,7 +197,13 @@ func TestProducerMultipleBrokers(t *testing.T) {
 		select {
 		case msg := <-producer.Errors():
 			t.Error(msg.Err)
-		case <-producer.Successes():
+			if msg.Msg.flags != 0 {
+				t.Error("Message had flags set")
+			}
+		case msg := <-producer.Successes():
+			if msg.flags != 0 {
+				t.Error("Message had flags set")
+			}
 		}
 	}
 }
@@ -233,7 +251,13 @@ func TestProducerFailureRetry(t *testing.T) {
 		select {
 		case msg := <-producer.Errors():
 			t.Error(msg.Err)
-		case <-producer.Successes():
+			if msg.Msg.flags != 0 {
+				t.Error("Message had flags set")
+			}
+		case msg := <-producer.Successes():
+			if msg.flags != 0 {
+				t.Error("Message had flags set")
+			}
 		}
 	}
 	broker2.Close()
@@ -246,7 +270,13 @@ func TestProducerFailureRetry(t *testing.T) {
 		select {
 		case msg := <-producer.Errors():
 			t.Error(msg.Err)
-		case <-producer.Successes():
+			if msg.Msg.flags != 0 {
+				t.Error("Message had flags set")
+			}
+		case msg := <-producer.Successes():
+			if msg.flags != 0 {
+				t.Error("Message had flags set")
+			}
 		}
 	}