Kaynağa Gözat

fix brokerProducer goroutine leak (#1442)

* fix brokerProducer goroutine leak

Every creation of a brokerProducer spins off a goroutine for the run()
loop func, but this goroutine appeared to just be abandoned if the
brokerProducer was shut down, and would manifest as orphaned goroutine(s)
with a long select time, leaking over time:

e.g.,
```
goroutine 6982 [select, 1868 minutes]:
github.com/Shopify/sarama.(*brokerProducer).run(0xc420c4ac00)
	/home/travis/build/org/repo/vendor/src/github.com/Shopify/sarama/async_producer.go:672 +0x258
github.com/Shopify/sarama.(*brokerProducer).(github.com/Shopify/sarama.run)-fm()
	/home/travis/build/org/repo/vendor/src/github.com/Shopify/sarama/async_producer.go:622 +0x2a
github.com/Shopify/sarama.withRecover(0xc4207aa3f0)
	/home/travis/build/org/repo/vendor/src/github.com/Shopify/sarama/utils.go:45 +0x43
created by github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer
	/home/travis/build/org/repo/vendor/src/github.com/Shopify/sarama/async_producer.go:622 +0x1b8
```

Tidied up some chan waits that weren't checking the ok state, and also
added an explicit stopchan for the run loop and a unit test to cover the
leak case.

Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>

* Bump up the test timeout (really for the functional tests)

The functional tests seem to have hovered around the 4 minute mark for a
while now. I think bumping the timeout up to 6m should make the Travis
runs less brittle.

* Fix coverage concat and show per-func in travis

- the existing coverage.txt concatenation included multiple `mode:
  atomic` lines which fail to parse correctly in the `go tool cover`
  tooling, so update Makefile so only one modeline exists

- add a call to `go tool cover -func coverage.txt` to output a
  per-function summary of the coverage in the Travis build log
Dominic Evans 6 yıl önce
ebeveyn
işleme
9f8650a8fa
6 değiştirilmiş dosya ile 79 ekleme ve 10 silme
  1. 1 0
      .travis.yml
  2. 3 3
      Makefile
  3. 18 6
      async_producer.go
  4. 51 1
      async_producer_test.go
  5. 2 0
      go.mod
  6. 4 0
      go.sum

+ 1 - 0
.travis.yml

@@ -32,6 +32,7 @@ script:
 - if [[ "$TRAVIS_GO_VERSION" == 1.12* ]]; then make fmt; fi
 - if [[ "$TRAVIS_GO_VERSION" == 1.12* ]]; then make fmt; fi
 
 
 after_success:
 after_success:
+- go tool cover -func coverage.txt
 - bash <(curl -s https://codecov.io/bash)
 - bash <(curl -s https://codecov.io/bash)
 
 
 after_script: vagrant/halt_cluster.sh
 after_script: vagrant/halt_cluster.sh

+ 3 - 3
Makefile

@@ -5,11 +5,11 @@ default: fmt vet errcheck test lint
 # Taken from https://github.com/codecov/example-go#caveat-multiple-files
 # Taken from https://github.com/codecov/example-go#caveat-multiple-files
 .PHONY: test
 .PHONY: test
 test:
 test:
-	echo "" > coverage.txt
+	echo "mode: atomic" > coverage.txt
 	for d in `go list ./...`; do \
 	for d in `go list ./...`; do \
-		go test -p 1 -v -timeout 240s -race -coverprofile=profile.out -covermode=atomic $$d || exit 1; \
+		go test -p 1 -v -timeout 6m -race -coverprofile=profile.out -covermode=atomic $$d || exit 1; \
 		if [ -f profile.out ]; then \
 		if [ -f profile.out ]; then \
-			cat profile.out >> coverage.txt; \
+			tail +2 profile.out >> coverage.txt; \
 			rm profile.out; \
 			rm profile.out; \
 		fi \
 		fi \
 	done
 	done

+ 18 - 6
async_producer.go

@@ -520,7 +520,6 @@ func (pp *partitionProducer) dispatch() {
 	}()
 	}()
 
 
 	for msg := range pp.input {
 	for msg := range pp.input {
-
 		if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
 		if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
 			select {
 			select {
 			case <-pp.brokerProducer.abandoned:
 			case <-pp.brokerProducer.abandoned:
@@ -652,6 +651,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
 		input:          input,
 		input:          input,
 		output:         bridge,
 		output:         bridge,
 		responses:      responses,
 		responses:      responses,
+		stopchan:       make(chan struct{}),
 		buffer:         newProduceSet(p),
 		buffer:         newProduceSet(p),
 		currentRetries: make(map[string]map[int32]error),
 		currentRetries: make(map[string]map[int32]error),
 	}
 	}
@@ -696,6 +696,7 @@ type brokerProducer struct {
 	output    chan<- *produceSet
 	output    chan<- *produceSet
 	responses <-chan *brokerProducerResponse
 	responses <-chan *brokerProducerResponse
 	abandoned chan struct{}
 	abandoned chan struct{}
+	stopchan  chan struct{}
 
 
 	buffer     *produceSet
 	buffer     *produceSet
 	timer      <-chan time.Time
 	timer      <-chan time.Time
@@ -711,12 +712,17 @@ func (bp *brokerProducer) run() {
 
 
 	for {
 	for {
 		select {
 		select {
-		case msg := <-bp.input:
-			if msg == nil {
+		case msg, ok := <-bp.input:
+			if !ok {
+				Logger.Printf("producer/broker/%d input chan closed\n", bp.broker.ID())
 				bp.shutdown()
 				bp.shutdown()
 				return
 				return
 			}
 			}
 
 
+			if msg == nil {
+				continue
+			}
+
 			if msg.flags&syn == syn {
 			if msg.flags&syn == syn {
 				Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
 				Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
 					bp.broker.ID(), msg.Topic, msg.Partition)
 					bp.broker.ID(), msg.Topic, msg.Partition)
@@ -760,8 +766,14 @@ func (bp *brokerProducer) run() {
 			bp.timerFired = true
 			bp.timerFired = true
 		case output <- bp.buffer:
 		case output <- bp.buffer:
 			bp.rollOver()
 			bp.rollOver()
-		case response := <-bp.responses:
-			bp.handleResponse(response)
+		case response, ok := <-bp.responses:
+			if ok {
+				bp.handleResponse(response)
+			}
+		case <-bp.stopchan:
+			Logger.Printf(
+				"producer/broker/%d run loop asked to stop\n", bp.broker.ID())
+			return
 		}
 		}
 
 
 		if bp.timerFired || bp.buffer.readyToFlush() {
 		if bp.timerFired || bp.buffer.readyToFlush() {
@@ -785,7 +797,7 @@ func (bp *brokerProducer) shutdown() {
 	for response := range bp.responses {
 	for response := range bp.responses {
 		bp.handleResponse(response)
 		bp.handleResponse(response)
 	}
 	}
-
+	close(bp.stopchan)
 	Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
 	Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
 }
 }
 
 

+ 51 - 1
async_producer_test.go

@@ -9,6 +9,9 @@ import (
 	"sync/atomic"
 	"sync/atomic"
 	"testing"
 	"testing"
 	"time"
 	"time"
+
+	"github.com/fortytw2/leaktest"
+	metrics "github.com/rcrowley/go-metrics"
 )
 )
 
 
 const TestMessage = "ABC THE MESSAGE"
 const TestMessage = "ABC THE MESSAGE"
@@ -308,6 +311,22 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
 	closeProducer(t, producer)
 	closeProducer(t, producer)
 }
 }
 
 
+type testLogger struct {
+	t *testing.T
+}
+
+func (l *testLogger) Print(v ...interface{}) {
+	l.t.Log(v...)
+}
+
+func (l *testLogger) Printf(format string, v ...interface{}) {
+	l.t.Logf(format, v...)
+}
+
+func (l *testLogger) Println(v ...interface{}) {
+	l.t.Log(v...)
+}
+
 func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
 func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
 
 
 	tt := func(t *testing.T, kErr KError) {
 	tt := func(t *testing.T, kErr KError) {
@@ -331,7 +350,6 @@ func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
 		if err != nil {
 		if err != nil {
 			t.Fatal(err)
 			t.Fatal(err)
 		}
 		}
-		seedBroker.Close()
 
 
 		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
 		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
 		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
 		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
@@ -356,6 +374,7 @@ func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
 		leader2.Returns(prodSuccess)
 		leader2.Returns(prodSuccess)
 		expectResults(t, producer, 2, 0)
 		expectResults(t, producer, 2, 0)
 
 
+		seedBroker.Close()
 		leader1.Close()
 		leader1.Close()
 		leader2.Close()
 		leader2.Close()
 		closeProducer(t, producer)
 		closeProducer(t, producer)
@@ -1128,6 +1147,37 @@ func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) {
 	closeProducer(t, producer)
 	closeProducer(t, producer)
 }
 }
 
 
+// TestBrokerProducerShutdown ensures that a call to shutdown stops the
+// brokerProducer run() loop and doesn't leak any goroutines
+func TestBrokerProducerShutdown(t *testing.T) {
+	defer leaktest.Check(t)()
+	metrics.UseNilMetrics = true // disable Sarama's go-metrics library
+	defer func() {
+		metrics.UseNilMetrics = false
+	}()
+
+	mockBroker := NewMockBroker(t, 1)
+	metadataResponse := &MetadataResponse{}
+	metadataResponse.AddBroker(mockBroker.Addr(), mockBroker.BrokerID())
+	metadataResponse.AddTopicPartition(
+		"my_topic", 0, mockBroker.BrokerID(), nil, nil, nil, ErrNoError)
+	mockBroker.Returns(metadataResponse)
+
+	producer, err := NewAsyncProducer([]string{mockBroker.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	broker := &Broker{
+		addr: mockBroker.Addr(),
+		id:   mockBroker.BrokerID(),
+	}
+	bp := producer.(*asyncProducer).newBrokerProducer(broker)
+
+	bp.shutdown()
+	_ = producer.Close()
+	mockBroker.Close()
+}
+
 // This example shows how to use the producer while simultaneously
 // This example shows how to use the producer while simultaneously
 // reading the Errors channel to know about any failures.
 // reading the Errors channel to know about any failures.
 func ExampleAsyncProducer_select() {
 func ExampleAsyncProducer_select() {

+ 2 - 0
go.mod

@@ -7,6 +7,7 @@ require (
 	github.com/eapache/go-resiliency v1.1.0
 	github.com/eapache/go-resiliency v1.1.0
 	github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21
 	github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21
 	github.com/eapache/queue v1.1.0
 	github.com/eapache/queue v1.1.0
+	github.com/fortytw2/leaktest v1.3.0
 	github.com/golang/snappy v0.0.1 // indirect
 	github.com/golang/snappy v0.0.1 // indirect
 	github.com/hashicorp/go-uuid v1.0.1 // indirect
 	github.com/hashicorp/go-uuid v1.0.1 // indirect
 	github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03
 	github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03
@@ -19,6 +20,7 @@ require (
 	golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3
 	golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3
 	gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect
 	gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect
 	gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect
 	gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect
+	gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
 	gopkg.in/jcmturner/gokrb5.v7 v7.2.3
 	gopkg.in/jcmturner/gokrb5.v7 v7.2.3
 	gopkg.in/jcmturner/rpc.v1 v1.1.0 // indirect
 	gopkg.in/jcmturner/rpc.v1 v1.1.0 // indirect
 )
 )

+ 4 - 0
go.sum

@@ -11,6 +11,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8
 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
 github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
 github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
 github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
 github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
+github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
+github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
 github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
 github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
 github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE=
 github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE=
@@ -45,6 +47,8 @@ gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hr
 gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo=
 gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo=
 gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM=
 gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM=
 gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q=
 gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q=
+gopkg.in/jcmturner/goidentity.v3 v3.0.0 h1:1duIyWiTaYvVx3YX2CYtpJbUFd7/UuPYCfgXtQ3VTbI=
+gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4=
 gopkg.in/jcmturner/gokrb5.v7 v7.2.3 h1:hHMV/yKPwMnJhPuPx7pH2Uw/3Qyf+thJYlisUc44010=
 gopkg.in/jcmturner/gokrb5.v7 v7.2.3 h1:hHMV/yKPwMnJhPuPx7pH2Uw/3Qyf+thJYlisUc44010=
 gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM=
 gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM=
 gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU=
 gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU=