浏览代码

Test multiple flushes

Burke Libbey 12 年之前
父节点
当前提交
c1c656e90c
共有 2 个文件被更改,包括 96 次插入44 次删除
  1. 12 7
      producer.go
  2. 84 37
      producer_test.go

+ 12 - 7
producer.go

@@ -235,7 +235,6 @@ func (bp *brokerProducer) addMessage(msg *produceMessage, maxBufferBytes uint32,
 
 	bp.mapM.Unlock()
 	if bp.bufferedBytes > maxBufferBytes {
-		// TODO: decrement this later on
 		bp.tryFlush()
 	}
 }
@@ -292,6 +291,14 @@ func (bp *brokerProducer) Close() error {
 func (bp *brokerProducer) flushRequest(p *Producer, request *ProduceRequest, messages []*produceMessage) {
 	response, err := bp.broker.Produce(p.client.id, request)
 
+	size := 0
+	for _, m := range messages {
+		size += len(m.key) + len(m.value)
+	}
+	bp.mapM.Lock()
+	bp.bufferedBytes -= uint32(size)
+	bp.mapM.Unlock()
+
 	switch err {
 	case nil:
 		break
@@ -302,8 +309,6 @@ func (bp *brokerProducer) flushRequest(p *Producer, request *ProduceRequest, mes
 		p.errors <- err
 		goto releaseAllLocks
 	default:
-		// TODO: Now we have to sift through the messages and determine which should be retried.
-
 		p.client.disconnectBroker(bp.broker)
 		bp.Close()
 
@@ -335,7 +340,7 @@ func (bp *brokerProducer) flushRequest(p *Producer, request *ProduceRequest, mes
 			if block == nil {
 				// IncompleteResponse. Here we just drop all the messages; we don't know whether
 				// they were successfully sent or not. Non-ideal, but how often does it happen?
-				// Log angrily.
+				// TODO Log angrily.
 			}
 			switch block.Err {
 			case NoError:
@@ -343,7 +348,7 @@ func (bp *brokerProducer) flushRequest(p *Producer, request *ProduceRequest, mes
 				// Unlock delivery for this topic-partition and discard the produceMessage objects.
 				p.errors <- nil
 			case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
-				// TODO: should we refresh metadata for this topic?
+				p.client.RefreshTopicMetadata(topic)
 
 				// ie. for msg := range reverse(messages)
 				for i := len(messages) - 1; i >= 0; i-- {
@@ -356,12 +361,12 @@ func (bp *brokerProducer) flushRequest(p *Producer, request *ProduceRequest, mes
 							// to preserve ordering, we have to prepend the items starting from the last one.
 							p.addMessage(msg, true)
 						} else {
-							// dropping message; log angrily maybe.
+							// TODO dropping message; log angrily maybe.
 						}
 					}
 				}
 			default:
-				// non-retriable error. Drop the messages and log angrily.
+				// TODO non-retriable error. Drop the messages and log angrily.
 			}
 			p.releaseDeliveryLock(topic, partition)
 		}

+ 84 - 37
producer_test.go

@@ -57,18 +57,80 @@ func TestSimpleProducer(t *testing.T) {
 	})
 	defer producer.Close()
 
-	for i := 0; i < 10; i++ {
-		err = producer.SendMessage("my_topic", nil, StringEncoder("ABC THE MESSAGE"))
-		if err != nil {
-			t.Error(err)
-		}
+	// flush only on 10th and final message
+	returns := []int{0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
+	for _, f := range returns {
+		sendMessage(t, producer, "my_topic", "ABC THE MESSAGE", f)
 	}
+}
 
-	readMessage(t, producer.Errors())
+func sendMessage(t *testing.T, producer *Producer, topic string, key string, expectedResponses int) {
+	err := producer.SendMessage(topic, nil, StringEncoder(key))
+	if err != nil {
+		t.Error(err)
+	}
+	for i := 0; i < expectedResponses; i++ {
+		readMessage(t, producer.Errors())
+	}
 	assertNoMessages(t, producer.Errors())
+}
+
+func TestMultipleFlushes(t *testing.T) {
+	responses := make(chan []byte, 1)
+	extraResponses := make(chan []byte)
+	mockBroker := NewMockBroker(t, responses)
+	mockExtra := NewMockBroker(t, extraResponses)
+	defer mockBroker.Close()
+	defer mockExtra.Close()
+
+	// return the extra mock as another available broker
+	response := []byte{
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00,
+		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00}
+	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
+	responses <- response
+	go func() {
+		msg := []byte{
+			0x00, 0x00, 0x00, 0x01,
+			0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
+			0x00, 0x00, 0x00, 0x01,
+			0x00, 0x00, 0x00, 0x00,
+			0x00, 0x00,
+			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
+		}
+		binary.BigEndian.PutUint64(msg[23:], 0)
+		extraResponses <- msg
+		extraResponses <- msg
+	}()
 
-	// TODO: This doesn't really test that we ONLY flush once.
-	// For example, change the MaxBufferBytes to be much lower.
+	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	producer, err := NewProducer(client, &ProducerConfig{
+		RequiredAcks:  WaitForLocal,
+		MaxBufferTime: 1000000, // "never"
+		// So that we flush once, after the 5th message.
+		MaxBufferBytes: uint32((len("ABC THE MESSAGE") * 5) - 1),
+	})
+	defer producer.Close()
+
+	returns := []int{0, 0, 0, 0, 1, 0, 0, 0, 0, 1}
+	for _, f := range returns {
+		sendMessage(t, producer, "my_topic", "ABC THE MESSAGE", f)
+	}
 }
 
 func TestMultipleProducer(t *testing.T) {
@@ -167,11 +229,6 @@ func TestMultipleProducer(t *testing.T) {
 		responsesB <- msg
 	}()
 
-	// TODO: Submit events to 3 different topics on 2 different brokers.
-	// Need to figure out how the request format works to return the broker
-	// info for those two new brokers, and how to return multiple blocks in
-	// a ProduceRespose
-
 	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
@@ -185,33 +242,23 @@ func TestMultipleProducer(t *testing.T) {
 	})
 	defer producer.Close()
 
-	for i := 0; i < 10; i++ {
-		err = producer.SendMessage("topic_a", nil, StringEncoder("ABC THE MESSAGE"))
-		if err != nil {
-			t.Error(err)
-		}
+	// flush only on 10th and final message
+	returns := []int{0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
+	for _, f := range returns {
+		sendMessage(t, producer, "topic_a", "ABC THE MESSAGE", f)
 	}
 
-	for i := 0; i < 5; i++ {
-		err = producer.SendMessage("topic_b", nil, StringEncoder("ABC THE MESSAGE"))
-		if err != nil {
-			t.Error(err)
-		}
+	// no flushes
+	returns = []int{0, 0, 0, 0, 0}
+	for _, f := range returns {
+		sendMessage(t, producer, "topic_b", "ABC THE MESSAGE", f)
 	}
 
-	for i := 0; i < 5; i++ {
-		err = producer.SendMessage("topic_c", nil, StringEncoder("ABC THE MESSAGE"))
-		if err != nil {
-			t.Error(err)
-		}
+	// flush both topic_b and topic_c on 5th (ie. 10th for this broker)
+	returns = []int{0, 0, 0, 0, 2}
+	for _, f := range returns {
+		sendMessage(t, producer, "topic_c", "ABC THE MESSAGE", f)
 	}
-
-	// read three messages for topics A, B, and C. Assert they are nil.
-	readMessage(t, producer.Errors())
-	readMessage(t, producer.Errors())
-	readMessage(t, producer.Errors())
-
-	assertNoMessages(t, producer.Errors())
 }
 
 func readMessage(t *testing.T, ch chan error) {
@@ -227,8 +274,8 @@ func readMessage(t *testing.T, ch chan error) {
 
 func assertNoMessages(t *testing.T, ch chan error) {
 	select {
-	case <-ch:
-		t.Error(fmt.Errorf("too many values returned"))
+	case x := <-ch:
+		t.Error(fmt.Errorf("unexpected value received: %#v", x))
 	case <-time.After(5 * time.Millisecond):
 	}
 }