Parcourir la source

Merge pull request #189 from Shopify/fix-mockbroker-race

Fix test raciness
Willem van Bergen il y a 11 ans
Parent
commit
69691f45f2
2 fichiers modifiés avec 10 ajouts et 17 suppressions
  1. 3 11
      mockbroker.go
  2. 7 6
      producer_test.go

+ 3 - 11
mockbroker.go

@@ -12,6 +12,7 @@ import (
 type TestState interface {
 	Error(args ...interface{})
 	Fatal(args ...interface{})
+	Errorf(format string, args ...interface{})
 	Fatalf(format string, args ...interface{})
 }
 
@@ -32,7 +33,6 @@ type MockBroker struct {
 	expectations chan encoder
 	listener     net.Listener
 	t            TestState
-	expecting    encoder
 }
 
 func (b *MockBroker) BrokerID() int32 {
@@ -47,15 +47,9 @@ func (b *MockBroker) Addr() string {
 	return b.listener.Addr().String()
 }
 
-type rawExpectation []byte
-
-func (r rawExpectation) ResponseBytes() []byte {
-	return r
-}
-
 func (b *MockBroker) Close() {
-	if b.expecting != nil {
-		b.t.Fatalf("Not all expectations were satisfied in mockBroker with ID=%d! Still waiting on %#v", b.BrokerID(), b.expecting)
+	if len(b.expectations) > 0 {
+		b.t.Errorf("Not all expectations were satisfied in mockBroker with ID=%d! Still waiting on %d", b.BrokerID(), len(b.expectations))
 	}
 	close(b.expectations)
 	<-b.stopper
@@ -74,9 +68,7 @@ func (b *MockBroker) serverLoop() (ok bool) {
 	reqHeader := make([]byte, 4)
 	resHeader := make([]byte, 8)
 	for expectation := range b.expectations {
-		b.expecting = expectation
 		_, err = io.ReadFull(conn, reqHeader)
-		b.expecting = nil
 		if err != nil {
 			return b.serverError(err, conn)
 		}

+ 7 - 6
producer_test.go

@@ -194,8 +194,6 @@ func TestProducerFailureRetry(t *testing.T) {
 	broker1 := NewMockBroker(t, 1)
 	broker2 := NewMockBroker(t, 2)
 	broker3 := NewMockBroker(t, 3)
-	defer broker1.Close()
-	defer broker3.Close()
 
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
@@ -206,7 +204,6 @@ func TestProducerFailureRetry(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer safeClose(t, client)
 
 	config := NewProducerConfig()
 	config.FlushMsgCount = 10
@@ -215,7 +212,7 @@ func TestProducerFailureRetry(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer safeClose(t, producer)
+	broker1.Close()
 
 	for i := 0; i < 10; i++ {
 		producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
@@ -223,12 +220,11 @@ func TestProducerFailureRetry(t *testing.T) {
 	response2 := new(ProduceResponse)
 	response2.AddTopicPartition("my_topic", 0, NotLeaderForPartition)
 	broker2.Returns(response2)
-	broker2.Close()
 
 	response3 := new(MetadataResponse)
 	response3.AddBroker(broker3.Addr(), broker3.BrokerID())
 	response3.AddTopicPartition("my_topic", 0, broker3.BrokerID())
-	broker1.Returns(response3)
+	broker2.Returns(response3)
 
 	response4 := new(ProduceResponse)
 	response4.AddTopicPartition("my_topic", 0, NoError)
@@ -240,6 +236,7 @@ func TestProducerFailureRetry(t *testing.T) {
 		case <-producer.Successes():
 		}
 	}
+	broker2.Close()
 
 	for i := 0; i < 10; i++ {
 		producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
@@ -252,6 +249,10 @@ func TestProducerFailureRetry(t *testing.T) {
 		case <-producer.Successes():
 		}
 	}
+
+	broker3.Close()
+	safeClose(t, producer)
+	safeClose(t, client)
 }
 
 func ExampleProducer() {