Browse Source

Got last test passing.

Burke Libbey 12 years ago
parent
commit
74d42616ce
4 changed files with 63 additions and 40 deletions
  1. 4 3
      fetch_request_expectation.go
  2. 1 0
      mockbroker.go
  3. 2 2
      produce_request_expectation_test.go
  4. 56 35
      producer_test.go

+ 4 - 3
fetch_request_expectation.go

@@ -10,19 +10,20 @@ type FetchRequestExpectation struct {
 	messages []fetchResponseMessage
 }
 
-type encoder interface {
+// this is why single-namespace projects are literally retarded.
+type encoder2 interface {
 	Encode() ([]byte, error)
 }
 
 type fetchResponseMessage struct {
 	topic      string
 	partition  int32
-	key, value encoder
+	key, value encoder2
 	offset     uint64
 }
 
 func (e *FetchRequestExpectation) AddMessage(
-	topic string, partition int32, key, value encoder, offset uint64,
+	topic string, partition int32, key, value encoder2, offset uint64,
 ) *FetchRequestExpectation {
 	e.messages = append(e.messages, fetchResponseMessage{
 		topic:     topic,

+ 1 - 0
mockbroker.go

@@ -89,6 +89,7 @@ func (b *MockBroker) serverLoop() (ok bool) {
 		if _, err = io.ReadFull(conn, body); err != nil {
 			return b.serverError(err, conn)
 		}
+
 		response := expectation.ResponseBytes()
 		if len(response) == 0 {
 			continue

+ 2 - 2
produce_request_expectation_test.go

@@ -8,8 +8,8 @@ import (
 func TestProduceRequestSerialization(t *testing.T) {
 
 	exp := new(ProduceRequestExpectation).
-		AddTopicPartition("topic_b", 0, 1, nil).
-		AddTopicPartition("topic_c", 0, 1, nil)
+		AddTopicPartition("topic_b", 0, 1, NoError).
+		AddTopicPartition("topic_c", 0, 1, NoError)
 
 	expected := []byte{
 		0x00, 0x00, 0x00, 0x02, // 0:3 number of topics

+ 56 - 35
producer_test.go

@@ -20,7 +20,7 @@ func TestSimpleProducer(t *testing.T) {
 		AddTopicPartition("my_topic", 0, 2)
 
 	mb2.ExpectProduceRequest().
-		AddTopicPartition("my_topic", 0, 1, nil)
+		AddTopicPartition("my_topic", 0, 1, NoError)
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
@@ -55,7 +55,7 @@ func TestSimpleSyncProducer(t *testing.T) {
 
 	for i := 0; i < 10; i++ {
 		mb2.ExpectProduceRequest().
-			AddTopicPartition("my_topic", 1, 10, nil)
+			AddTopicPartition("my_topic", 1, 10, NoError)
 	}
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
@@ -88,12 +88,12 @@ func TestMultipleFlushes(t *testing.T) {
 		AddTopicPartition("my_topic", 0, 2)
 
 	mb2.ExpectProduceRequest().
-		AddTopicPartition("my_topic", 0, 1, nil).
-		AddTopicPartition("my_topic", 0, 1, nil)
+		AddTopicPartition("my_topic", 0, 1, NoError).
+		AddTopicPartition("my_topic", 0, 1, NoError)
 
 	mb2.ExpectProduceRequest().
-		AddTopicPartition("my_topic", 0, 1, nil).
-		AddTopicPartition("my_topic", 0, 1, nil)
+		AddTopicPartition("my_topic", 0, 1, NoError).
+		AddTopicPartition("my_topic", 0, 1, NoError)
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
@@ -131,11 +131,11 @@ func TestMultipleProducer(t *testing.T) {
 		AddTopicPartition("topic_c", 0, 3)
 
 	mb2.ExpectProduceRequest().
-		AddTopicPartition("topic_a", 0, 1, nil)
+		AddTopicPartition("topic_a", 0, 1, NoError)
 
 	mb3.ExpectProduceRequest().
-		AddTopicPartition("topic_b", 0, 1, nil).
-		AddTopicPartition("topic_c", 0, 1, nil)
+		AddTopicPartition("topic_b", 0, 1, NoError).
+		AddTopicPartition("topic_c", 0, 1, NoError)
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
@@ -174,28 +174,33 @@ func TestMultipleProducer(t *testing.T) {
 // happens correctly; that is, the first messages are retried before the next
 // batch is allowed to submit.
 func TestFailureRetry(t *testing.T) {
+	println("=============================================")
 
 	mb1 := NewMockBroker(t, 1)
 	mb2 := NewMockBroker(t, 2)
-	defer mb1.Close()
-	defer mb2.Close()
+	mb3 := NewMockBroker(t, 3)
 
 	mb1.ExpectMetadataRequest().
 		AddBroker(mb2).
-		AddTopicPartition("topic_a", 0, 1).
+		AddBroker(mb3).
+		AddTopicPartition("topic_a", 0, 2).
 		AddTopicPartition("topic_b", 0, 2).
-		AddTopicPartition("topic_c", 0, 2)
+		AddTopicPartition("topic_c", 0, 3)
 
 	mb2.ExpectProduceRequest().
-		AddTopicPartition("topic_b", 0, 1, NotLeaderForPartition).
-		AddTopicPartition("topic_c", 0, 1, NoError)
+		AddTopicPartition("topic_a", 0, 1, NoError).
+		AddTopicPartition("topic_b", 0, 1, NotLeaderForPartition)
 
-	mb1.ExpectMetadataRequest().
-		AddBroker(mb2).
-		AddTopicPartition("topic_b", 0, 1)
+	// The fact that mb2 is chosen here is not well-defined. In theory,
+	// it's a random choice between mb1, mb2, and mb3. Go's hash iteration
+	// isn't quite as random as claimed, though, it seems. Maybe because
+	// the same random seed is used each time?
+	mb2.ExpectMetadataRequest().
+		AddBroker(mb3).
+		AddTopicPartition("topic_b", 0, 3)
 
-	mb1.ExpectProduceRequest().
-		AddTopicPartition("topic_a", 0, 1, NoError).
+	mb3.ExpectProduceRequest().
+		AddTopicPartition("topic_c", 0, 1, NoError).
 		AddTopicPartition("topic_b", 0, 1, NoError)
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
@@ -216,23 +221,39 @@ func TestFailureRetry(t *testing.T) {
 	}
 	defer producer.Close()
 
-	// Sent to the first BP; does not flush because it's only half the cap.
-	println("WTF1")
+	// Sent to mb3; does not flush because it's only half the cap.
+	// mb1: [__]
+	// mb2: [__]
+	// mb3: [__]
+	sendMessage(t, producer, "topic_c", TestMessage, 0)
+	// mb1: [__]
+	// mb2: [__]
+	// mb3: [X_]
+
+	// Sent to mb2; does not flush because it's only half the cap.
 	sendMessage(t, producer, "topic_a", TestMessage, 0)
-	// Sent to the first BP; flushes, errors (retriable).
-	// There's a delay, during which the next message is enqueued to the first BP,
-	// after which the BP is closed and the message is re-enqueued to the second
-	// BP. This BP is not flushed immediately because it is only at half-cap.
-	println("WTF2")
-	sendMessage(t, producer, "topic_b", TestMessage, 1)
-	// This happens before the BP is terminated, and the message is enqueued to
-	// the first BP. It is not immediately flushed, because it is at half-cap.
-	println("WTF")
-	sendMessage(t, producer, "topic_b", TestMessage, 1)
-
-	// Now the Close() runs on the first BP. The BP has buffered the second
-	// message (which previously failed). This forces a flush.
+	// mb1: [__]
+	// mb2: [X_]
+	// mb3: [X_]
+
+	// Sent to mb2; flushes, errors (retriable).
+	// Three messages will be received:
+	//   * NoError for topic_a;
+	//   * NoError for topic_b;
+	//   * NoError for topic_c.
+	sendMessage(t, producer, "topic_b", TestMessage, 3)
+	// mb1: [__]
+	// mb2: [XX] <- flush!
+	// mb3: [X_]
+
+	// The topic_b message errors, and we should wind up here:
+
+	// mb1: [__]
+	// mb2: [__]
+	// mb3: [XX] <- topic_b reassigned to mb3 by metadata refresh, flushes.
 
+	defer mb1.Close()
+	defer mb2.Close()
 }
 
 func readMessage(t *testing.T, ch chan error) {