Burke Libbey 12 gadi atpakaļ
vecāks
revīzija
e5d35d0be8
3 mainītis faili ar 63 papildinājumiem un 213 dzēšanām
  1. 1 1
      mockbroker/metadata_request_expectation.go
  2. 2 0
      producer.go
  3. 60 212
      producer_test.go

+ 1 - 1
mockbroker/metadata_request_expectation.go

@@ -60,7 +60,7 @@ func (e *MetadataRequestExpectation) ResponseBytes() []byte {
 		// we don't support mocking errors at topic-level; only partition-level
 		binary.Write(buf, binary.BigEndian, uint16(0))
 		binary.Write(buf, binary.BigEndian, uint16(len(topic)))
-		buf.Write([]byte(topic)) // TODO: Does this write a null?
+		buf.Write([]byte(topic))
 		binary.Write(buf, binary.BigEndian, uint32(len(tps)))
 		for _, tp := range tps {
 			binary.Write(buf, binary.BigEndian, uint16(0)) // TODO: Write the error code instead

+ 2 - 0
producer.go

@@ -236,7 +236,9 @@ func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
 		for {
 			select {
 			case <-bp.flushNow:
+				println("{{")
 				bp.flush(p)
+				println("}}")
 			case <-timer.C:
 				bp.flushIfAnyMessages(p)
 			case <-bp.stopper:

+ 60 - 212
producer_test.go

@@ -77,47 +77,23 @@ func TestSimpleSyncProducer(t *testing.T) {
 	}
 }
 
-/*
 func TestMultipleFlushes(t *testing.T) {
-	responses := make(chan []byte, 1)
-	extraResponses := make(chan []byte)
-	mockBroker := mockbroker.New(t, responses)
-	mockExtra := mockbroker.New(t, extraResponses)
-	defer mockBroker.Close()
-	defer mockExtra.Close()
-
-	// return the extra mock as another available broker
-	response := []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00}
-	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
-	responses <- response
-	go func() {
-		msg := []byte{
-			0x00, 0x00, 0x00, 0x01,
-			0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-			0x00, 0x00, 0x00, 0x01,
-			0x00, 0x00, 0x00, 0x00,
-			0x00, 0x00,
-			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
-		}
-		binary.BigEndian.PutUint64(msg[23:], 0)
-		extraResponses <- msg
-		extraResponses <- msg
-	}()
+	t.Fatal("pending")
+
+	mb1 := mockbroker.New(t, 1)
+	mb2 := mockbroker.New(t, 2)
+	defer mb1.Close()
+	defer mb2.Close()
+
+	mb1.ExpectMetadataRequest().
+		AddBroker(mb2).
+		AddTopicPartition("my_topic", 0, 2)
+
+	mb2.ExpectProduceRequest().
+		AddTopicPartition("my_topic", 0, 1, nil).
+		AddTopicPartition("my_topic", 0, 1, nil)
 
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -137,103 +113,30 @@ func TestMultipleFlushes(t *testing.T) {
 }
 
 func TestMultipleProducer(t *testing.T) {
+	t.Fatal("pending")
 
-	responses := make(chan []byte, 1)
-	responsesA := make(chan []byte)
-	responsesB := make(chan []byte)
-	mockBroker := mockbroker.New(t, responses)
-	mockBrokerA := mockbroker.New(t, responsesA)
-	mockBrokerB := mockbroker.New(t, responsesB)
-	defer mockBroker.Close()
-	defer mockBrokerA.Close()
-	defer mockBrokerB.Close()
-
-	// We're going to return:
-	// topic: topic_a; partition: 0; brokerID: 1
-	// topic: topic_b; partition: 0; brokerID: 2
-	// topic: topic_c; partition: 0; brokerID: 2
-
-	// Return the extra broker metadata so that the producer will send
-	// requests to mockBrokerA and mockBrokerB.
-	response := []byte{
-		0x00, 0x00, 0x00, 0x02, // 0:3 number of brokers
-
-		0x00, 0x00, 0x00, 0x01, // 4:7 broker ID
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // 8:18 hostname
-		0xFF, 0xFF, 0xFF, 0xFF, // 19:22 port will be written here.
-
-		0x00, 0x00, 0x00, 0x02, // 23:26 broker ID
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // 27:37 hostname
-		0xFF, 0xFF, 0xFF, 0xFF, // 38:41 port will be written here.
-
-		0x00, 0x00, 0x00, 0x03, // number of topic metadata records
-
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'a', // topic name
-		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x00, 0x00, 0x00, // partition ID
-		0x00, 0x00, 0x00, 0x01, // broker ID of leader
-		0x00, 0x00, 0x00, 0x00, // replica set
-		0x00, 0x00, 0x00, 0x00, // ISR set
-
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // topic name
-		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x00, 0x00, 0x00, // partition ID
-		0x00, 0x00, 0x00, 0x02, // broker ID of leader
-		0x00, 0x00, 0x00, 0x00, // replica set
-		0x00, 0x00, 0x00, 0x00, // ISR set
-
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'c', // topic name
-		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x00, 0x00, 0x00, // partition ID
-		0x00, 0x00, 0x00, 0x02, // broker ID of leader
-		0x00, 0x00, 0x00, 0x00, // replica set
-		0x00, 0x00, 0x00, 0x00, // ISR set
+	mb1 := mockbroker.New(t, 1)
+	mb2 := mockbroker.New(t, 2)
+	mb3 := mockbroker.New(t, 3)
+	defer mb1.Close()
+	defer mb2.Close()
+	defer mb3.Close()
 
-	}
-	binary.BigEndian.PutUint32(response[19:], uint32(mockBrokerA.Port()))
-	binary.BigEndian.PutUint32(response[38:], uint32(mockBrokerB.Port()))
-	responses <- response
-
-	go func() {
-		msg := []byte{
-			0x00, 0x00, 0x00, 0x01, // 0:3 number of topics
-			0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'a', // 4:12 topic name
-			0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
-			0x00, 0x00, 0x00, 0x00, // 17:20 partition id
-			0x00, 0x00, // 21:22 error: 0 means no error
-			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, // 23:30 offset
-		}
-		binary.BigEndian.PutUint64(msg[23:], 0)
-		responsesA <- msg
-	}()
-
-	go func() {
-		msg := []byte{
-			0x00, 0x00, 0x00, 0x02, // 0:3 number of topics
-
-			0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // 4:12 topic name
-			0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
-			0x00, 0x00, 0x00, 0x00, // 17:20 partition id
-			0x00, 0x00, // 21:22 error: 0 means no error
-			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, // 23:30 offset
-
-			0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'c', // 4:12 topic name
-			0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
-			0x00, 0x00, 0x00, 0x00, // 17:20 partition id
-			0x00, 0x00, // 21:22 error: 0 means no error
-			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, // 23:30 offset
-		}
-		binary.BigEndian.PutUint64(msg[23:], 0)
-		responsesB <- msg
-	}()
+	mb1.ExpectMetadataRequest().
+		AddBroker(mb2).
+		AddBroker(mb3).
+		AddTopicPartition("topic_a", 0, 1).
+		AddTopicPartition("topic_b", 0, 2).
+		AddTopicPartition("topic_c", 0, 2)
 
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
+	mb2.ExpectProduceRequest().
+		AddTopicPartition("topic_a", 0, 1, nil)
+
+	mb3.ExpectProduceRequest().
+		AddTopicPartition("topic_b", 0, 1, nil).
+		AddTopicPartition("topic_c", 0, 1, nil)
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -270,86 +173,32 @@ func TestMultipleProducer(t *testing.T) {
 // happens correctly; that is, the first messages are retried before the next
 // batch is allowed to submit.
 func TestFailureRetry(t *testing.T) {
-	responses := make(chan []byte, 1)
-	responsesA := make(chan []byte)
-	mockBroker := mockbroker.New(t, responses)
-	mockBrokerA := mockbroker.New(t, responsesA)
-	defer mockBroker.Close()
-	defer mockBrokerA.Close()
-
-	// We're going to return:
-	// topic: topic_a; partition: 0; brokerID: 1
-	// topic: topic_b; partition: 0; brokerID: 2
-	// topic: topic_c; partition: 0; brokerID: 2
-
-	// Return the extra broker metadata so that the producer will send
-	// requests to mockBrokerA and mockBrokerB.
-	metadataResponse := []byte{
-		0x00, 0x00, 0x00, 0x01, // 0:3 number of brokers
-
-		0x00, 0x00, 0x00, 0x01, // 4:7 broker ID
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // 8:18 hostname
-		0xFF, 0xFF, 0xFF, 0xFF, // 19:22 port will be written here.
-
-		0x00, 0x00, 0x00, 0x02, // number of topic metadata records
-
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'a', // topic name
-		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x00, 0x00, 0x00, // partition ID
-		0x00, 0x00, 0x00, 0x01, // broker ID of leader
-		0x00, 0x00, 0x00, 0x00, // replica set
-		0x00, 0x00, 0x00, 0x00, // ISR set
-
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // topic name
-		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x00, 0x00, 0x00, // partition ID
-		0x00, 0x00, 0x00, 0x01, // broker ID of leader
-		0x00, 0x00, 0x00, 0x00, // replica set
-		0x00, 0x00, 0x00, 0x00, // ISR set
+	t.Fatal("pending")
 
-	}
-	binary.BigEndian.PutUint32(metadataResponse[19:], uint32(mockBrokerA.Port()))
-	responses <- metadataResponse
-
-	go func() {
-		responses <- metadataResponse
-		responses <- metadataResponse
-	}()
-
-	go func() {
-		responsesA <- []byte{
-			0x00, 0x00, 0x00, 0x02, // 0:3 number of topics
-
-			0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'a', // 4:12 topic name
-			0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
-			0x00, 0x00, 0x00, 0x00, // 17:20 partition id
-			0x00, 0x00, // 21:22 error: 0 means no error
-			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // 23:30 offset
-
-			0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // 4:12 topic name
-			0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
-			0x00, 0x00, 0x00, 0x00, // 17:20 partition id
-			0x00, 0x03, // 21:22 error: UnknownTopicOrPartition
-			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // 23:30 offset
-		}
-		responsesA <- metadataResponse
-		successResponse := []byte{
-			0x00, 0x00, 0x00, 0x01, // 0:3 number of topics
-
-			0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // 4:12 topic name
-			0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
-			0x00, 0x00, 0x00, 0x00, // 17:20 partition id
-			0x00, 0x00, // 21:22 error: no error
-			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // 23:30 offset
-		}
-		_ = successResponse
-	}()
+	mb1 := mockbroker.New(t, 1)
+	mb2 := mockbroker.New(t, 2)
+	defer mb1.Close()
+	defer mb2.Close()
 
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
+	mb1.ExpectMetadataRequest().
+		AddBroker(mb2).
+		AddTopicPartition("topic_a", 0, 1).
+		AddTopicPartition("topic_b", 0, 2).
+		AddTopicPartition("topic_c", 0, 2)
+
+	mb2.ExpectProduceRequest().
+		AddTopicPartition("topic_b", 0, 1, NotLeaderForPartition).
+		AddTopicPartition("topic_c", 0, 1, nil)
+
+	mb1.ExpectMetadataRequest().
+		AddBroker(mb2).
+		AddTopicPartition("topic_b", 0, 1)
+
+	mb1.ExpectProduceRequest().
+		AddTopicPartition("topic_a", 0, 1, nil).
+		AddTopicPartition("topic_b", 0, 1, nil)
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -385,7 +234,6 @@ func TestFailureRetry(t *testing.T) {
 	// message (which previously failed). This forces a flush.
 
 }
-*/
 
 func readMessage(t *testing.T, ch chan error) {
 	select {