Browse Source

Fix another hypothetical wait group issue

Don't increment and immediately decrement the wait group on a message which we
just return with ErrShuttingDown. Otherwise:
```
prod.AsyncClose()
prod.Input() <- msg
```
1. shutdown() goroutine calls Wait()
2. while in the call to Wait(), Go switches contexts
3. the message is sent on Input(), is received, and causes the wait counter to
   increment from 0 while being waited on

Where the WaitGroup docs say "calls with a positive delta that occur when the
counter is zero must happen before a Wait".
Evan Huus 10 năm trước cách đây
mục cha
commit
6b85c47d32
1 tập tin đã thay đổi với 9 bổ sung2 xóa
  1. 9 2
      async_producer.go

+ 9 - 2
async_producer.go

@@ -224,11 +224,18 @@ func (p *asyncProducer) topicDispatcher() {
 			p.inFlight.Done()
 			continue
 		} else if msg.retries == 0 {
-			p.inFlight.Add(1)
 			if shuttingDown {
-				p.returnError(msg, ErrShuttingDown)
+				// we can't just call returnError here because that decrements the wait group,
+				// which hasn't been incremented yet for this message, and shouldn't be
+				pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
+				if p.conf.Producer.Return.Errors {
+					p.errors <- pErr
+				} else {
+					Logger.Println(pErr)
+				}
 				continue
 			}
+			p.inFlight.Add(1)
 		}
 
 		if (p.conf.Producer.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.conf.Producer.MaxMessageBytes) ||