Jelajahi Sumber

Flatten nested ifs with DeMorgan.

Antoine Grondin 12 tahun lalu
induk
melakukan
b08596183c
7 mengubah file dengan 189 tambahan dan 170 penghapusan
  1. 10 10
      client.go
  2. 11 11
      consumer.go
  3. 38 39
      fetch_response_test.go
  4. 67 57
      metadata_response_test.go
  5. 16 13
      offset_commit_response_test.go
  6. 23 19
      offset_fetch_response_test.go
  7. 24 21
      offset_response_test.go

+ 10 - 10
client.go

@@ -34,7 +34,7 @@ type Client struct {
 // NewClient creates a new Client with the given client ID. It connects to one of the given broker addresses
 // and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot
 // be retrieved from any of the given broker addresses, the client is not created.
-func NewClient(id string, addrs []string, config *ClientConfig) (client *Client, err error) {
+func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error) {
 	if config == nil {
 		config = new(ClientConfig)
 	}
@@ -47,18 +47,18 @@ func NewClient(id string, addrs []string, config *ClientConfig) (client *Client,
 		return nil, ConfigurationError("You must provide at least one broker address")
 	}
 
-	client = new(Client)
-	client.id = id
-	client.config = *config
-	client.extraBrokerAddrs = addrs
-	client.extraBroker = NewBroker(client.extraBrokerAddrs[0])
+	client := &Client{
+		id:               id,
+		config:           *config,
+		extraBrokerAddrs: addrs,
+		extraBroker:      NewBroker(addrs[0]),
+		brokers:          make(map[int32]*Broker),
+		leaders:          make(map[string]map[int32]int32),
+	}
 	client.extraBroker.Open()
 
-	client.brokers = make(map[int32]*Broker)
-	client.leaders = make(map[string]map[int32]int32)
-
 	// do an initial fetch of all cluster metadata by specifing an empty list of topics
-	err = client.RefreshAllMetadata()
+	err := client.RefreshAllMetadata()
 	if err != nil {
 		client.Close() // this closes tmp, since it's still in the brokers hash
 		return nil, err

+ 11 - 11
consumer.go

@@ -101,13 +101,17 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
 		return nil, err
 	}
 
-	c := new(Consumer)
-	c.client = client
-	c.topic = topic
-	c.partition = partition
-	c.group = group
-	c.config = *config
-	c.broker = broker
+	c := &Consumer{
+		client:    client,
+		topic:     topic,
+		partition: partition,
+		group:     group,
+		config:    *config,
+		broker:    broker,
+		stopper:   make(chan bool),
+		done:      make(chan bool),
+		events:    make(chan *ConsumerEvent, config.EventBufferSize),
+	}
 
 	switch config.OffsetMethod {
 	case OffsetMethodManual:
@@ -129,10 +133,6 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
 		return nil, ConfigurationError("Invalid OffsetMethod")
 	}
 
-	c.stopper = make(chan bool)
-	c.done = make(chan bool)
-	c.events = make(chan *ConsumerEvent, config.EventBufferSize)
-
 	go c.fetchMessages()
 
 	return c, nil

+ 38 - 39
fetch_response_test.go

@@ -42,44 +42,43 @@ func TestOneMessageFetchResponse(t *testing.T) {
 	response := FetchResponse{}
 	testDecodable(t, "one message", &response, oneMessageFetchResponse)
 
-	if len(response.Blocks) == 1 {
-		if len(response.Blocks["topic"]) == 1 {
-			block := response.GetBlock("topic", 5)
-			if block != nil {
-				if block.Err != OffsetOutOfRange {
-					t.Error("Decoding didn't produce correct error code.")
-				}
-				if block.HighWaterMarkOffset != 0x10101010 {
-					t.Error("Decoding didn't produce correct high water mark offset.")
-				}
-				if block.MsgSet.PartialTrailingMessage {
-					t.Error("Decoding detected a partial trailing message where there wasn't one.")
-				}
-				if len(block.MsgSet.Messages) == 1 {
-					msgBlock := block.MsgSet.Messages[0]
-					if msgBlock.Offset != 0x550000 {
-						t.Error("Decoding produced incorrect message offset.")
-					}
-					msg := msgBlock.Msg
-					if msg.Codec != CompressionNone {
-						t.Error("Decoding produced incorrect message compression.")
-					}
-					if msg.Key != nil {
-						t.Error("Decoding produced message key where there was none.")
-					}
-					if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
-						t.Error("Decoding produced incorrect message value.")
-					}
-				} else {
-					t.Error("Decoding produced incorrect number of messages.")
-				}
-			} else {
-				t.Error("GetBlock didn't return block.")
-			}
-		} else {
-			t.Error("Decoding produced incorrect number of partition blocks for topic.")
-		}
-	} else {
-		t.Error("Decoding produced incorrect number of topic blocks.")
+	if len(response.Blocks) != 1 {
+		t.Fatal("Decoding produced incorrect number of topic blocks.")
+	}
+
+	if len(response.Blocks["topic"]) != 1 {
+		t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
+	}
+
+	block := response.GetBlock("topic", 5)
+	if block == nil {
+		t.Fatal("GetBlock didn't return block.")
+	}
+	if block.Err != OffsetOutOfRange {
+		t.Error("Decoding didn't produce correct error code.")
+	}
+	if block.HighWaterMarkOffset != 0x10101010 {
+		t.Error("Decoding didn't produce correct high water mark offset.")
+	}
+	if block.MsgSet.PartialTrailingMessage {
+		t.Error("Decoding detected a partial trailing message where there wasn't one.")
+	}
+
+	if len(block.MsgSet.Messages) != 1 {
+		t.Fatal("Decoding produced incorrect number of messages.")
+	}
+	msgBlock := block.MsgSet.Messages[0]
+	if msgBlock.Offset != 0x550000 {
+		t.Error("Decoding produced incorrect message offset.")
+	}
+	msg := msgBlock.Msg
+	if msg.Codec != CompressionNone {
+		t.Error("Decoding produced incorrect message compression.")
+	}
+	if msg.Key != nil {
+		t.Error("Decoding produced message key where there was none.")
+	}
+	if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
+		t.Error("Decoding produced incorrect message value.")
 	}
 }

+ 67 - 57
metadata_response_test.go

@@ -54,22 +54,23 @@ func TestMetadataResponseWithBrokers(t *testing.T) {
 	response := MetadataResponse{}
 
 	testDecodable(t, "brokers, no topics", &response, brokersNoTopicsMetadataResponse)
-	if len(response.Brokers) == 2 {
-		if response.Brokers[0].id != 0xabff {
-			t.Error("Decoding produced invalid broker 0 id.")
-		}
-		if response.Brokers[0].addr != "localhost:51" {
-			t.Error("Decoding produced invalid broker 0 address.")
-		}
-		if response.Brokers[1].id != 0x010203 {
-			t.Error("Decoding produced invalid broker 1 id.")
-		}
-		if response.Brokers[1].addr != "google.com:273" {
-			t.Error("Decoding produced invalid broker 1 address.")
-		}
-	} else {
-		t.Error("Decoding produced", len(response.Brokers), "brokers where there were two!")
+	if len(response.Brokers) != 2 {
+		t.Fatal("Decoding produced", len(response.Brokers), "brokers where there were two!")
+	}
+
+	if response.Brokers[0].id != 0xabff {
+		t.Error("Decoding produced invalid broker 0 id.")
+	}
+	if response.Brokers[0].addr != "localhost:51" {
+		t.Error("Decoding produced invalid broker 0 address.")
+	}
+	if response.Brokers[1].id != 0x010203 {
+		t.Error("Decoding produced invalid broker 1 id.")
 	}
+	if response.Brokers[1].addr != "google.com:273" {
+		t.Error("Decoding produced invalid broker 1 address.")
+	}
+
 	if len(response.Topics) != 0 {
 		t.Error("Decoding produced", len(response.Topics), "topics where there were none!")
 	}
@@ -82,48 +83,57 @@ func TestMetadataResponseWithTopics(t *testing.T) {
 	if len(response.Brokers) != 0 {
 		t.Error("Decoding produced", len(response.Brokers), "brokers where there were none!")
 	}
-	if len(response.Topics) == 2 {
-		if response.Topics[0].Err != NoError {
-			t.Error("Decoding produced invalid topic 0 error.")
-		}
-		if response.Topics[0].Name != "foo" {
-			t.Error("Decoding produced invalid topic 0 name.")
-		}
-		if len(response.Topics[0].Partitions) == 1 {
-			if response.Topics[0].Partitions[0].Err != InvalidMessageSize {
-				t.Error("Decoding produced invalid topic 0 partition 0 error.")
-			}
-			if response.Topics[0].Partitions[0].ID != 0x01 {
-				t.Error("Decoding produced invalid topic 0 partition 0 id.")
-			}
-			if response.Topics[0].Partitions[0].Leader != 0x07 {
-				t.Error("Decoding produced invalid topic 0 partition 0 leader.")
-			}
-			if len(response.Topics[0].Partitions[0].Replicas) == 3 {
-				for i := 0; i < 3; i++ {
-					if response.Topics[0].Partitions[0].Replicas[i] != int32(i+1) {
-						t.Error("Decoding produced invalid topic 0 partition 0 replica", i)
-					}
-				}
-			} else {
-				t.Error("Decoding produced invalid topic 0 partition 0 replicas.")
-			}
-			if len(response.Topics[0].Partitions[0].Isr) != 0 {
-				t.Error("Decoding produced invalid topic 0 partition 0 isr length.")
-			}
-		} else {
-			t.Error("Decoding produced invalid partition count for topic 0.")
-		}
-		if response.Topics[1].Err != NoError {
-			t.Error("Decoding produced invalid topic 1 error.")
-		}
-		if response.Topics[1].Name != "bar" {
-			t.Error("Decoding produced invalid topic 0 name.")
-		}
-		if len(response.Topics[1].Partitions) != 0 {
-			t.Error("Decoding produced invalid partition count for topic 1.")
+
+	if len(response.Topics) != 2 {
+		t.Fatal("Decoding produced", len(response.Topics), "topics where there were two!")
+	}
+
+	if response.Topics[0].Err != NoError {
+		t.Error("Decoding produced invalid topic 0 error.")
+	}
+
+	if response.Topics[0].Name != "foo" {
+		t.Error("Decoding produced invalid topic 0 name.")
+	}
+
+	if len(response.Topics[0].Partitions) != 1 {
+		t.Fatal("Decoding produced invalid partition count for topic 0.")
+	}
+
+	if response.Topics[0].Partitions[0].Err != InvalidMessageSize {
+		t.Error("Decoding produced invalid topic 0 partition 0 error.")
+	}
+
+	if response.Topics[0].Partitions[0].ID != 0x01 {
+		t.Error("Decoding produced invalid topic 0 partition 0 id.")
+	}
+
+	if response.Topics[0].Partitions[0].Leader != 0x07 {
+		t.Error("Decoding produced invalid topic 0 partition 0 leader.")
+	}
+
+	if len(response.Topics[0].Partitions[0].Replicas) != 3 {
+		t.Fatal("Decoding produced invalid topic 0 partition 0 replicas.")
+	}
+	for i := 0; i < 3; i++ {
+		if response.Topics[0].Partitions[0].Replicas[i] != int32(i+1) {
+			t.Error("Decoding produced invalid topic 0 partition 0 replica", i)
 		}
-	} else {
-		t.Error("Decoding produced", len(response.Topics), "topics where there were two!")
+	}
+
+	if len(response.Topics[0].Partitions[0].Isr) != 0 {
+		t.Error("Decoding produced invalid topic 0 partition 0 isr length.")
+	}
+
+	if response.Topics[1].Err != NoError {
+		t.Error("Decoding produced invalid topic 1 error.")
+	}
+
+	if response.Topics[1].Name != "bar" {
+		t.Error("Decoding produced invalid topic 0 name.")
+	}
+
+	if len(response.Topics[1].Partitions) != 0 {
+		t.Error("Decoding produced invalid partition count for topic 1.")
 	}
 }

+ 16 - 13
offset_commit_response_test.go

@@ -39,18 +39,21 @@ func TestNormalOffsetCommitResponse(t *testing.T) {
 	if response.ClientID != "az" {
 		t.Error("Decoding produced wrong client ID.")
 	}
-	if len(response.Errors) == 2 {
-		if len(response.Errors["m"]) != 0 {
-			t.Error("Decoding produced errors for topic 'm' where there were none.")
-		}
-		if len(response.Errors["t"]) == 1 {
-			if response.Errors["t"][0] != NotLeaderForPartition {
-				t.Error("Decoding produced wrong error for topic 't' partition 0.")
-			}
-		} else {
-			t.Error("Decoding produced wrong number of errors for topic 't'.")
-		}
-	} else {
-		t.Error("Decoding produced wrong number of errors.")
+
+	if len(response.Errors) != 2 {
+		t.Fatal("Decoding produced wrong number of errors.")
+	}
+
+	if len(response.Errors["m"]) != 0 {
+		t.Error("Decoding produced errors for topic 'm' where there were none.")
+	}
+
+	if len(response.Errors["t"]) != 1 {
+		t.Fatal("Decoding produced wrong number of errors for topic 't'.")
+	}
+
+	if response.Errors["t"][0] != NotLeaderForPartition {
+		t.Error("Decoding produced wrong error for topic 't' partition 0.")
 	}
+
 }

+ 23 - 19
offset_fetch_response_test.go

@@ -41,24 +41,28 @@ func TestNormalOffsetFetchResponse(t *testing.T) {
 	if response.ClientID != "za" {
 		t.Error("Decoding produced wrong client ID.")
 	}
-	if len(response.Blocks) == 2 {
-		if len(response.Blocks["m"]) != 0 {
-			t.Error("Decoding produced partitions for topic 'm' where there were none.")
-		}
-		if len(response.Blocks["t"]) == 1 {
-			if response.Blocks["t"][0].Offset != 0 {
-				t.Error("Decoding produced wrong offset for topic 't' partition 0.")
-			}
-			if response.Blocks["t"][0].Metadata != "md" {
-				t.Error("Decoding produced wrong metadata for topic 't' partition 0.")
-			}
-			if response.Blocks["t"][0].Err != RequestTimedOut {
-				t.Error("Decoding produced wrong error for topic 't' partition 0.")
-			}
-		} else {
-			t.Error("Decoding produced wrong number of blocks for topic 't'.")
-		}
-	} else {
-		t.Error("Decoding produced wrong number of blocks.")
+
+	if len(response.Blocks) != 2 {
+		t.Fatal("Decoding produced wrong number of blocks.")
+	}
+
+	if len(response.Blocks["m"]) != 0 {
+		t.Error("Decoding produced partitions for topic 'm' where there were none.")
+	}
+
+	if len(response.Blocks["t"]) != 1 {
+		t.Fatal("Decoding produced wrong number of blocks for topic 't'.")
+	}
+
+	if response.Blocks["t"][0].Offset != 0 {
+		t.Error("Decoding produced wrong offset for topic 't' partition 0.")
+	}
+
+	if response.Blocks["t"][0].Metadata != "md" {
+		t.Error("Decoding produced wrong metadata for topic 't' partition 0.")
+	}
+
+	if response.Blocks["t"][0].Err != RequestTimedOut {
+		t.Error("Decoding produced wrong error for topic 't' partition 0.")
 	}
 }

+ 24 - 21
offset_response_test.go

@@ -34,26 +34,29 @@ func TestNormalOffsetResponse(t *testing.T) {
 	response := OffsetResponse{}
 
 	testDecodable(t, "normal", &response, normalOffsetResponse)
-	if len(response.Blocks) == 2 {
-		if len(response.Blocks["a"]) != 0 {
-			t.Error("Decoding produced", len(response.Blocks["a"]), "partitions for topic 'a' where there were none.")
-		}
-
-		if len(response.Blocks["z"]) == 1 {
-			if response.Blocks["z"][2].Err != NoError {
-				t.Error("Decoding produced invalid error for topic z partition 2.")
-			}
-			if len(response.Blocks["z"][2].Offsets) == 2 {
-				if response.Blocks["z"][2].Offsets[0] != 5 || response.Blocks["z"][2].Offsets[1] != 6 {
-					t.Error("Decoding produced invalid offsets for topic z partition 2.")
-				}
-			} else {
-				t.Error("Decoding produced invalid number of offsets for topic z partition 2.")
-			}
-		} else {
-			t.Error("Decoding produced", len(response.Blocks["z"]), "partitions for topic 'z' where there was one.")
-		}
-	} else {
-		t.Error("Decoding produced", len(response.Blocks), "topics where there were two.")
+
+	if len(response.Blocks) != 2 {
+		t.Fatal("Decoding produced", len(response.Blocks), "topics where there were two.")
+	}
+
+	if len(response.Blocks["a"]) != 0 {
+		t.Fatal("Decoding produced", len(response.Blocks["a"]), "partitions for topic 'a' where there were none.")
+	}
+
+	if len(response.Blocks["z"]) != 1 {
+		t.Fatal("Decoding produced", len(response.Blocks["z"]), "partitions for topic 'z' where there was one.")
+	}
+
+	if response.Blocks["z"][2].Err != NoError {
+		t.Fatal("Decoding produced invalid error for topic z partition 2.")
+	}
+
+	if len(response.Blocks["z"][2].Offsets) != 2 {
+		t.Fatal("Decoding produced invalid number of offsets for topic z partition 2.")
 	}
+
+	if response.Blocks["z"][2].Offsets[0] != 5 || response.Blocks["z"][2].Offsets[1] != 6 {
+		t.Fatal("Decoding produced invalid offsets for topic z partition 2.")
+	}
+
 }