Browse Source

Merge pull request #451 from Shopify/wait-group-shutdown-error

Fix another hypothetical wait group issue
Evan Huus 10 years ago
parent
commit
e8ad5e2bb9
1 changed files with 9 additions and 2 deletions
  1. 9 2
      async_producer.go

+ 9 - 2
async_producer.go

@@ -224,11 +224,18 @@ func (p *asyncProducer) topicDispatcher() {
 			p.inFlight.Done()
 			continue
 		} else if msg.retries == 0 {
-			p.inFlight.Add(1)
 			if shuttingDown {
-				p.returnError(msg, ErrShuttingDown)
+				// we can't just call returnError here because that decrements the wait group,
+				// which hasn't been incremented yet for this message, and shouldn't be
+				pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
+				if p.conf.Producer.Return.Errors {
+					p.errors <- pErr
+				} else {
+					Logger.Println(pErr)
+				}
 				continue
 			}
+			p.inFlight.Add(1)
 		}
 
 		if (p.conf.Producer.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.conf.Producer.MaxMessageBytes) ||