Browse Source

Address code inspection comments

Maxim Vladimirskiy 7 years ago
parent
commit
0a875d8931
2 changed files with 84 additions and 50 deletions
  1. 7 14
      find_coordinator_response.go
  2. 77 36
      find_coordinator_response_test.go

+ 7 - 14
find_coordinator_response.go

@@ -4,6 +4,8 @@ import (
 	"time"
 )
 
+var NoNode = &Broker{id: -1, addr: ":-1"}
+
 type FindCoordinatorResponse struct {
 	Version      int16
 	ThrottleTime time.Duration
@@ -36,7 +38,7 @@ func (f *FindCoordinatorResponse) decode(pd packetDecoder, version int16) (err e
 	}
 
 	coordinator := new(Broker)
-	if err := coordinator.decode(pd, 0); err != nil {
+	if err := coordinator.decode(pd, version); err != nil {
 		return err
 	}
 	if coordinator.addr == ":0" {
@@ -60,20 +62,11 @@ func (f *FindCoordinatorResponse) encode(pe packetEncoder) error {
 		}
 	}
 
-	if f.Coordinator == nil {
-		pe.putInt32(0)
-		if err := pe.putString(""); err != nil {
-			return nil
-		}
-		pe.putInt32(0)
-		if f.Version >= 1 {
-			if err := pe.putNullableString(nil); err != nil {
-				return nil
-			}
-		}
-		return nil
+	coordinator := f.Coordinator
+	if coordinator == nil {
+		coordinator = NoNode
 	}
-	if err := f.Coordinator.encode(pe, 0); err != nil {
+	if err := coordinator.encode(pe, f.Version); err != nil {
 		return err
 	}
 	return nil

+ 77 - 36
find_coordinator_response_test.go

@@ -5,42 +5,83 @@ import (
 	"time"
 )
 
-var (
-	findCoordinatorResponse = []byte{
-		0, 0, 0, 100,
-		0, 0,
-		255, 255, // empty ErrMsg
-		0, 0, 0, 1,
-		0, 4, 'h', 'o', 's', 't',
-		0, 0, 35, 132,
-	}
-
-	findCoordinatorResponseError = []byte{
-		0, 0, 0, 100,
-		0, 15,
-		0, 3, 'm', 's', 'g',
-		0, 0, 0, 1,
-		0, 4, 'h', 'o', 's', 't',
-		0, 0, 35, 132,
-	}
-)
-
 func TestFindCoordinatorResponse(t *testing.T) {
-	broker := NewBroker("host:9092")
-	broker.id = 1
-	resp := &FindCoordinatorResponse{
-		Version:      1,
-		ThrottleTime: 100 * time.Millisecond,
-		Err:          ErrNoError,
-		ErrMsg:       nil,
-		Coordinator:  broker,
-	}
-
-	testResponse(t, "version 1 - no error", resp, findCoordinatorResponse)
+	errMsg := "kaboom"
+	brokerRack := "foo"
 
-	msg := "msg"
-	resp.Err = ErrConsumerCoordinatorNotAvailable
-	resp.ErrMsg = &msg
-
-	testResponse(t, "version 1 - error", resp, findCoordinatorResponseError)
+	for _, tc := range []struct {
+		desc     string
+		response *FindCoordinatorResponse
+		encoded  []byte
+	}{{
+		desc: "version 0 - no error",
+		response: &FindCoordinatorResponse{
+			Version: 0,
+			Err:     ErrNoError,
+			Coordinator: &Broker{
+				id:   7,
+				addr: "host:9092",
+			},
+		},
+		encoded: []byte{
+			0, 0, // Err
+			0, 0, 0, 7, // Coordinator.ID
+			0, 4, 'h', 'o', 's', 't', // Coordinator.Host
+			0, 0, 35, 132, // Coordinator.Port
+		},
+	}, {
+		desc: "version 1 - no error",
+		response: &FindCoordinatorResponse{
+			Version:      1,
+			ThrottleTime: 100 * time.Millisecond,
+			Err:          ErrNoError,
+			Coordinator: &Broker{
+				id:   7,
+				addr: "host:9092",
+				rack: &brokerRack,
+			},
+		},
+		encoded: []byte{
+			0, 0, 0, 100, // ThrottleTime
+			0, 0, // Err
+			255, 255, // ErrMsg: empty
+			0, 0, 0, 7, // Coordinator.ID
+			0, 4, 'h', 'o', 's', 't', // Coordinator.Host
+			0, 0, 35, 132, // Coordinator.Port
+			0, 3, 'f', 'o', 'o', // Coordinator.Rack
+		},
+	}, {
+		desc: "version 0 - error",
+		response: &FindCoordinatorResponse{
+			Version:     0,
+			Err:         ErrConsumerCoordinatorNotAvailable,
+			Coordinator: NoNode,
+		},
+		encoded: []byte{
+			0, 15, // Err
+			255, 255, 255, 255, // Coordinator.ID: -1
+			0, 0, // Coordinator.Host: ""
+			255, 255, 255, 255, // Coordinator.Port: -1
+		},
+	}, {
+		desc: "version 1 - error",
+		response: &FindCoordinatorResponse{
+			Version:      1,
+			ThrottleTime: 100 * time.Millisecond,
+			Err:          ErrConsumerCoordinatorNotAvailable,
+			ErrMsg:       &errMsg,
+			Coordinator:  NoNode,
+		},
+		encoded: []byte{
+			0, 0, 0, 100, // ThrottleTime
+			0, 15, // Err
+			0, 6, 'k', 'a', 'b', 'o', 'o', 'm', // ErrMsg
+			255, 255, 255, 255, // Coordinator.ID: -1
+			0, 0, // Coordinator.Host: ""
+			255, 255, 255, 255, // Coordinator.Port: -1
+			255, 255, // Coordinator.Rack: empty
+		},
+	}} {
+		testResponse(t, tc.desc, tc.response, tc.encoded)
+	}
 }