Browse Source

better test

Burke Libbey 12 năm trước cách đây
mục cha
commit
11f0f9f698
2 tập tin đã thay đổi với 40 bổ sung28 xóa
  1. 1 0
      .gitignore
  2. 39 28
      multiproducer_test.go

+ 1 - 0
.gitignore

@@ -2,6 +2,7 @@
 *.o
 *.a
 *.so
+*.test
 
 # Folders
 _obj

+ 39 - 28
multiproducer_test.go

@@ -64,20 +64,8 @@ func TestSimpleMultiProducer(t *testing.T) {
 		}
 	}
 
-	select {
-	case err = <-producer.Errors():
-		if err != nil {
-			t.Error(err)
-		}
-	case <-time.After(1 * time.Second):
-		t.Error(fmt.Errorf("Message was never received"))
-	}
-
-	select {
-	case <-producer.Errors():
-		t.Error(fmt.Errorf("too many values returned"))
-	default:
-	}
+	readMessage(t, producer.Errors())
+	assertNoMessages(t, producer.Errors())
 
 	// TODO: This doesn't really test that we ONLY flush once.
 	// For example, change the MaxBufferBytes to be much lower.
@@ -94,12 +82,10 @@ func TestMultipleMultiProducer(t *testing.T) {
 	defer mockBrokerA.Close()
 	defer mockBrokerB.Close()
 
-	// TODO: remove this.
-	time.Sleep(10 * time.Millisecond)
-
 	// We're going to return:
 	// topic: topic_a; partition: 0; brokerID: 1
 	// topic: topic_b; partition: 0; brokerID: 2
+	// topic: topic_c; partition: 0; brokerID: 2
 
 	// Return the extra broker metadata so that the producer will send
 	// requests to mockBrokerA and mockBrokerB.
@@ -114,7 +100,7 @@ func TestMultipleMultiProducer(t *testing.T) {
 		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // 27:37 hostname
 		0xFF, 0xFF, 0xFF, 0xFF, // 38:41 port will be written here.
 
-		0x00, 0x00, 0x00, 0x02, // number of topic metadata records
+		0x00, 0x00, 0x00, 0x03, // number of topic metadata records
 
 		0x00, 0x00, // error: 0 means no error
 		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'a', // topic name
@@ -133,6 +119,16 @@ func TestMultipleMultiProducer(t *testing.T) {
 		0x00, 0x00, 0x00, 0x02, // broker ID of leader
 		0x00, 0x00, 0x00, 0x00, // replica set
 		0x00, 0x00, 0x00, 0x00, // ISR set
+
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'c', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x00, 0x00, 0x00, // partition ID
+		0x00, 0x00, 0x00, 0x02, // broker ID of leader
+		0x00, 0x00, 0x00, 0x00, // replica set
+		0x00, 0x00, 0x00, 0x00, // ISR set
+
 	}
 	binary.BigEndian.PutUint32(response[19:], uint32(mockBrokerA.Port()))
 	binary.BigEndian.PutUint32(response[38:], uint32(mockBrokerB.Port()))
@@ -153,12 +149,19 @@ func TestMultipleMultiProducer(t *testing.T) {
 
 	go func() {
 		msg := []byte{
-			0x00, 0x00, 0x00, 0x01, // 0:3 number of topics
+			0x00, 0x00, 0x00, 0x02, // 0:3 number of topics
+
 			0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // 4:12 topic name
 			0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
 			0x00, 0x00, 0x00, 0x00, // 17:20 partition id
 			0x00, 0x00, // 21:22 error: 0 means no error
 			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, // 23:30 offset
+
+			0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'c', // 4:12 topic name
+			0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
+			0x00, 0x00, 0x00, 0x00, // 17:20 partition id
+			0x00, 0x00, // 21:22 error: 0 means no error
+			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, // 23:30 offset
 		}
 		binary.BigEndian.PutUint64(msg[23:], 0)
 		responsesB <- msg
@@ -189,35 +192,43 @@ func TestMultipleMultiProducer(t *testing.T) {
 		}
 	}
 
-	for i := 0; i < 10; i++ {
+	for i := 0; i < 5; i++ {
 		err = producer.SendMessage("topic_b", nil, StringEncoder("ABC THE MESSAGE"))
 		if err != nil {
 			t.Error(err)
 		}
 	}
 
-	select {
-	case err = <-producer.Errors():
+	for i := 0; i < 5; i++ {
+		err = producer.SendMessage("topic_c", nil, StringEncoder("ABC THE MESSAGE"))
 		if err != nil {
 			t.Error(err)
 		}
-	case <-time.After(1 * time.Second):
-		t.Error(fmt.Errorf("Message was never received"))
 	}
 
+	// read three messages for topics A, B, and C. Assert they are nil.
+	readMessage(t, producer.Errors())
+	readMessage(t, producer.Errors())
+	readMessage(t, producer.Errors())
+
+	assertNoMessages(t, producer.Errors())
+}
+
+func readMessage(t *testing.T, ch chan error) {
 	select {
-	case err = <-producer.Errors():
+	case err := <-ch:
 		if err != nil {
 			t.Error(err)
 		}
 	case <-time.After(1 * time.Second):
 		t.Error(fmt.Errorf("Message was never received"))
 	}
+}
 
+func assertNoMessages(t *testing.T, ch chan error) {
 	select {
-	case <-producer.Errors():
+	case <-ch:
 		t.Error(fmt.Errorf("too many values returned"))
-	default:
+	case <-time.After(5 * time.Millisecond):
 	}
-
 }