Browse Source

Merge pull request #1050 from mailgun/maxim/develop

 Fix FindCoordinatorResponse.encode to allow nil Coordinator
Evan Huus 7 years ago
parent
commit
f93325f470
3 changed files with 139 additions and 39 deletions
  1. 8 3
      find_coordinator_response.go
  2. 77 36
      find_coordinator_response_test.go
  3. 54 0
      mockresponses.go

+ 8 - 3
find_coordinator_response.go

@@ -4,6 +4,8 @@ import (
 	"time"
 	"time"
 )
 )
 
 
+var NoNode = &Broker{id: -1, addr: ":-1"}
+
 type FindCoordinatorResponse struct {
 type FindCoordinatorResponse struct {
 	Version      int16
 	Version      int16
 	ThrottleTime time.Duration
 	ThrottleTime time.Duration
@@ -36,7 +38,7 @@ func (f *FindCoordinatorResponse) decode(pd packetDecoder, version int16) (err e
 	}
 	}
 
 
 	coordinator := new(Broker)
 	coordinator := new(Broker)
-	if err := coordinator.decode(pd, 0); err != nil {
+	if err := coordinator.decode(pd, version); err != nil {
 		return err
 		return err
 	}
 	}
 	if coordinator.addr == ":0" {
 	if coordinator.addr == ":0" {
@@ -60,10 +62,13 @@ func (f *FindCoordinatorResponse) encode(pe packetEncoder) error {
 		}
 		}
 	}
 	}
 
 
-	if err := f.Coordinator.encode(pe, 0); err != nil {
+	coordinator := f.Coordinator
+	if coordinator == nil {
+		coordinator = NoNode
+	}
+	if err := coordinator.encode(pe, f.Version); err != nil {
 		return err
 		return err
 	}
 	}
-
 	return nil
 	return nil
 }
 }
 
 

+ 77 - 36
find_coordinator_response_test.go

@@ -5,42 +5,83 @@ import (
 	"time"
 	"time"
 )
 )
 
 
-var (
-	findCoordinatorResponse = []byte{
-		0, 0, 0, 100,
-		0, 0,
-		255, 255, // empty ErrMsg
-		0, 0, 0, 1,
-		0, 4, 'h', 'o', 's', 't',
-		0, 0, 35, 132,
-	}
-
-	findCoordinatorResponseError = []byte{
-		0, 0, 0, 100,
-		0, 15,
-		0, 3, 'm', 's', 'g',
-		0, 0, 0, 1,
-		0, 4, 'h', 'o', 's', 't',
-		0, 0, 35, 132,
-	}
-)
-
 func TestFindCoordinatorResponse(t *testing.T) {
 func TestFindCoordinatorResponse(t *testing.T) {
-	broker := NewBroker("host:9092")
-	broker.id = 1
-	resp := &FindCoordinatorResponse{
-		Version:      1,
-		ThrottleTime: 100 * time.Millisecond,
-		Err:          ErrNoError,
-		ErrMsg:       nil,
-		Coordinator:  broker,
-	}
-
-	testResponse(t, "version 1 - no error", resp, findCoordinatorResponse)
+	errMsg := "kaboom"
+	brokerRack := "foo"
 
 
-	msg := "msg"
-	resp.Err = ErrConsumerCoordinatorNotAvailable
-	resp.ErrMsg = &msg
-
-	testResponse(t, "version 1 - error", resp, findCoordinatorResponseError)
+	for _, tc := range []struct {
+		desc     string
+		response *FindCoordinatorResponse
+		encoded  []byte
+	}{{
+		desc: "version 0 - no error",
+		response: &FindCoordinatorResponse{
+			Version: 0,
+			Err:     ErrNoError,
+			Coordinator: &Broker{
+				id:   7,
+				addr: "host:9092",
+			},
+		},
+		encoded: []byte{
+			0, 0, // Err
+			0, 0, 0, 7, // Coordinator.ID
+			0, 4, 'h', 'o', 's', 't', // Coordinator.Host
+			0, 0, 35, 132, // Coordinator.Port
+		},
+	}, {
+		desc: "version 1 - no error",
+		response: &FindCoordinatorResponse{
+			Version:      1,
+			ThrottleTime: 100 * time.Millisecond,
+			Err:          ErrNoError,
+			Coordinator: &Broker{
+				id:   7,
+				addr: "host:9092",
+				rack: &brokerRack,
+			},
+		},
+		encoded: []byte{
+			0, 0, 0, 100, // ThrottleTime
+			0, 0, // Err
+			255, 255, // ErrMsg: empty
+			0, 0, 0, 7, // Coordinator.ID
+			0, 4, 'h', 'o', 's', 't', // Coordinator.Host
+			0, 0, 35, 132, // Coordinator.Port
+			0, 3, 'f', 'o', 'o', // Coordinator.Rack
+		},
+	}, {
+		desc: "version 0 - error",
+		response: &FindCoordinatorResponse{
+			Version:     0,
+			Err:         ErrConsumerCoordinatorNotAvailable,
+			Coordinator: NoNode,
+		},
+		encoded: []byte{
+			0, 15, // Err
+			255, 255, 255, 255, // Coordinator.ID: -1
+			0, 0, // Coordinator.Host: ""
+			255, 255, 255, 255, // Coordinator.Port: -1
+		},
+	}, {
+		desc: "version 1 - error",
+		response: &FindCoordinatorResponse{
+			Version:      1,
+			ThrottleTime: 100 * time.Millisecond,
+			Err:          ErrConsumerCoordinatorNotAvailable,
+			ErrMsg:       &errMsg,
+			Coordinator:  NoNode,
+		},
+		encoded: []byte{
+			0, 0, 0, 100, // ThrottleTime
+			0, 15, // Err
+			0, 6, 'k', 'a', 'b', 'o', 'o', 'm', // ErrMsg
+			255, 255, 255, 255, // Coordinator.ID: -1
+			0, 0, // Coordinator.Host: ""
+			255, 255, 255, 255, // Coordinator.Port: -1
+			255, 255, // Coordinator.Rack: empty
+		},
+	}} {
+		testResponse(t, tc.desc, tc.response, tc.encoded)
+	}
 }
 }

+ 54 - 0
mockresponses.go

@@ -326,6 +326,60 @@ func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder {
 	return res
 	return res
 }
 }
 
 
+// MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.
+type MockFindCoordinatorResponse struct {
+	groupCoordinators map[string]interface{}
+	transCoordinators map[string]interface{}
+	t                 TestReporter
+}
+
+func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse {
+	return &MockFindCoordinatorResponse{
+		groupCoordinators: make(map[string]interface{}),
+		transCoordinators: make(map[string]interface{}),
+		t:                 t,
+	}
+}
+
+func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse {
+	switch coordinatorType {
+	case CoordinatorGroup:
+		mr.groupCoordinators[group] = broker
+	case CoordinatorTransaction:
+		mr.transCoordinators[group] = broker
+	}
+	return mr
+}
+
+func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse {
+	switch coordinatorType {
+	case CoordinatorGroup:
+		mr.groupCoordinators[group] = kerror
+	case CoordinatorTransaction:
+		mr.transCoordinators[group] = kerror
+	}
+	return mr
+}
+
+func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoder {
+	req := reqBody.(*FindCoordinatorRequest)
+	res := &FindCoordinatorResponse{}
+	var v interface{}
+	switch req.CoordinatorType {
+	case CoordinatorGroup:
+		v = mr.groupCoordinators[req.CoordinatorKey]
+	case CoordinatorTransaction:
+		v = mr.transCoordinators[req.CoordinatorKey]
+	}
+	switch v := v.(type) {
+	case *MockBroker:
+		res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
+	case KError:
+		res.Err = v
+	}
+	return res
+}
+
 // MockOffsetCommitResponse is a `OffsetCommitResponse` builder.
 // MockOffsetCommitResponse is a `OffsetCommitResponse` builder.
 type MockOffsetCommitResponse struct {
 type MockOffsetCommitResponse struct {
 	errors map[string]map[string]map[int32]KError
 	errors map[string]map[string]map[int32]KError