Prechádzať zdrojové kódy

Fix test raciness

Running the tests with -race indicated a race condition in the mockbroker around
reading/writing b.expecting. Get rid of that value entirely and just use the len
of the channel, which is simpler and less racy.

With that fixed, also fix the hang revealed in TestProducerFailureRetry due to a
mis-ordering and deferal of Close calls. We'd been getting away with it only
because of the race in the mockbroker.

Once this is merged we can do another PR to have CI run with -race.
Evan Huus 10 rokov pred
rodič
commit
f77e4b63b8
2 zmenil súbory, kde vykonal 10 pridanie a 17 odobranie
  1. 3 11
      mockbroker.go
  2. 7 6
      producer_test.go

+ 3 - 11
mockbroker.go

@@ -12,6 +12,7 @@ import (
 type TestState interface {
 	Error(args ...interface{})
 	Fatal(args ...interface{})
+	Errorf(format string, args ...interface{})
 	Fatalf(format string, args ...interface{})
 }
 
@@ -32,7 +33,6 @@ type MockBroker struct {
 	expectations chan encoder
 	listener     net.Listener
 	t            TestState
-	expecting    encoder
 }
 
 func (b *MockBroker) BrokerID() int32 {
@@ -47,15 +47,9 @@ func (b *MockBroker) Addr() string {
 	return b.listener.Addr().String()
 }
 
-type rawExpectation []byte
-
-func (r rawExpectation) ResponseBytes() []byte {
-	return r
-}
-
 func (b *MockBroker) Close() {
-	if b.expecting != nil {
-		b.t.Fatalf("Not all expectations were satisfied in mockBroker with ID=%d! Still waiting on %#v", b.BrokerID(), b.expecting)
+	if len(b.expectations) > 0 {
+		b.t.Errorf("Not all expectations were satisfied in mockBroker with ID=%d! Still waiting on %d", b.BrokerID(), len(b.expectations))
 	}
 	close(b.expectations)
 	<-b.stopper
@@ -74,9 +68,7 @@ func (b *MockBroker) serverLoop() (ok bool) {
 	reqHeader := make([]byte, 4)
 	resHeader := make([]byte, 8)
 	for expectation := range b.expectations {
-		b.expecting = expectation
 		_, err = io.ReadFull(conn, reqHeader)
-		b.expecting = nil
 		if err != nil {
 			return b.serverError(err, conn)
 		}

+ 7 - 6
producer_test.go

@@ -194,8 +194,6 @@ func TestProducerFailureRetry(t *testing.T) {
 	broker1 := NewMockBroker(t, 1)
 	broker2 := NewMockBroker(t, 2)
 	broker3 := NewMockBroker(t, 3)
-	defer broker1.Close()
-	defer broker3.Close()
 
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
@@ -206,7 +204,6 @@ func TestProducerFailureRetry(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer safeClose(t, client)
 
 	config := NewProducerConfig()
 	config.FlushMsgCount = 10
@@ -215,7 +212,7 @@ func TestProducerFailureRetry(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer safeClose(t, producer)
+	broker1.Close()
 
 	for i := 0; i < 10; i++ {
 		producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
@@ -223,12 +220,11 @@ func TestProducerFailureRetry(t *testing.T) {
 	response2 := new(ProduceResponse)
 	response2.AddTopicPartition("my_topic", 0, NotLeaderForPartition)
 	broker2.Returns(response2)
-	broker2.Close()
 
 	response3 := new(MetadataResponse)
 	response3.AddBroker(broker3.Addr(), broker3.BrokerID())
 	response3.AddTopicPartition("my_topic", 0, broker3.BrokerID())
-	broker1.Returns(response3)
+	broker2.Returns(response3)
 
 	response4 := new(ProduceResponse)
 	response4.AddTopicPartition("my_topic", 0, NoError)
@@ -240,6 +236,7 @@ func TestProducerFailureRetry(t *testing.T) {
 		case <-producer.Successes():
 		}
 	}
+	broker2.Close()
 
 	for i := 0; i < 10; i++ {
 		producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
@@ -252,6 +249,10 @@ func TestProducerFailureRetry(t *testing.T) {
 		case <-producer.Successes():
 		}
 	}
+
+	broker3.Close()
+	safeClose(t, producer)
+	safeClose(t, client)
 }
 
 func ExampleProducer() {