Procházet zdrojové kódy

refactor ConsumerMetadataRequest/Response to FindCoordinatorRequest/Response

Robin před 6 roky
rodič
revize
a5a9b835af

+ 12 - 0
broker.go

@@ -230,6 +230,18 @@ func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*Consume
 	return response, nil
 }
 
+func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
+	response := new(FindCoordinatorResponse)
+
+	err := b.sendAndReceive(request, response)
+
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
 func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
 	response := new(OffsetResponse)
 

+ 6 - 5
client.go

@@ -735,8 +735,8 @@ func (client *client) cachedCoordinator(consumerGroup string) *Broker {
 	return nil
 }
 
-func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*ConsumerMetadataResponse, error) {
-	retry := func(err error) (*ConsumerMetadataResponse, error) {
+func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) {
+	retry := func(err error) (*FindCoordinatorResponse, error) {
 		if attemptsRemaining > 0 {
 			Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
 			time.Sleep(client.conf.Metadata.Retry.Backoff)
@@ -748,10 +748,11 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin
 	for broker := client.any(); broker != nil; broker = client.any() {
 		Logger.Printf("client/coordinator requesting coordinator for consumergroup %s from %s\n", consumerGroup, broker.Addr())
 
-		request := new(ConsumerMetadataRequest)
-		request.ConsumerGroup = consumerGroup
+		request := new(FindCoordinatorRequest)
+		request.CoordinatorKey = consumerGroup
+		request.CoordinatorType = CoordinatorGroup
 
-		response, err := broker.GetConsumerMetadata(request)
+		response, err := broker.FindCoordinator(request)
 
 		if err != nil {
 			Logger.Printf("client/coordinator request to broker %s failed: %s\n", broker.Addr(), err)

+ 10 - 3
consumer_metadata_request.go

@@ -5,12 +5,19 @@ type ConsumerMetadataRequest struct {
 }
 
 func (r *ConsumerMetadataRequest) encode(pe packetEncoder) error {
-	return pe.putString(r.ConsumerGroup)
+	tmp := new(FindCoordinatorRequest)
+	tmp.CoordinatorKey = r.ConsumerGroup
+	tmp.CoordinatorType = CoordinatorGroup
+	return tmp.encode(pe)
 }
 
 func (r *ConsumerMetadataRequest) decode(pd packetDecoder, version int16) (err error) {
-	r.ConsumerGroup, err = pd.getString()
-	return err
+	tmp := new(FindCoordinatorRequest)
+	if err := tmp.decode(pd, version); err != nil {
+		return err
+	}
+	r.ConsumerGroup = tmp.CoordinatorKey
+	return nil
 }
 
 func (r *ConsumerMetadataRequest) key() int16 {

+ 7 - 3
consumer_metadata_request_test.go

@@ -1,6 +1,8 @@
 package sarama
 
-import "testing"
+import (
+	"testing"
+)
 
 var (
 	consumerMetadataRequestEmpty = []byte{
@@ -12,8 +14,10 @@ var (
 
 func TestConsumerMetadataRequest(t *testing.T) {
 	request := new(ConsumerMetadataRequest)
-	testRequest(t, "empty string", request, consumerMetadataRequestEmpty)
+	testEncodable(t, "empty string", request, consumerMetadataRequestEmpty)
+	testVersionDecodable(t, "empty string", request, consumerMetadataRequestEmpty, 0)
 
 	request.ConsumerGroup = "foobar"
-	testRequest(t, "with string", request, consumerMetadataRequestString)
+	testEncodable(t, "with string", request, consumerMetadataRequestString)
+	testVersionDecodable(t, "with string", request, consumerMetadataRequestString, 0)
 }

+ 17 - 48
consumer_metadata_response.go

@@ -1,10 +1,5 @@
 package sarama
 
-import (
-	"net"
-	"strconv"
-)
-
 type ConsumerMetadataResponse struct {
 	Err             KError
 	Coordinator     *Broker
@@ -14,61 +9,35 @@ type ConsumerMetadataResponse struct {
 }
 
 func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err error) {
-	tmp, err := pd.getInt16()
-	if err != nil {
-		return err
-	}
-	r.Err = KError(tmp)
+	tmp := new(FindCoordinatorResponse)
 
-	coordinator := new(Broker)
-	if err := coordinator.decode(pd); err != nil {
+	if err := tmp.decode(pd, version); err != nil {
 		return err
 	}
-	if coordinator.addr == ":0" {
-		return nil
-	}
-	r.Coordinator = coordinator
 
-	// this can all go away in 2.0, but we have to fill in deprecated fields to maintain
-	// backwards compatibility
-	host, portstr, err := net.SplitHostPort(r.Coordinator.Addr())
-	if err != nil {
-		return err
-	}
-	port, err := strconv.ParseInt(portstr, 10, 32)
-	if err != nil {
-		return err
-	}
-	r.CoordinatorID = r.Coordinator.ID()
-	r.CoordinatorHost = host
-	r.CoordinatorPort = int32(port)
+	r.Err = tmp.Err
+	r.Coordinator = tmp.Coordinator
+	r.CoordinatorID = tmp.CoordinatorID
+	r.CoordinatorHost = tmp.CoordinatorHost
+	r.CoordinatorPort = tmp.CoordinatorPort
 
 	return nil
 }
 
 func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {
-	pe.putInt16(int16(r.Err))
-	if r.Coordinator != nil {
-		host, portstr, err := net.SplitHostPort(r.Coordinator.Addr())
-		if err != nil {
-			return err
-		}
-		port, err := strconv.ParseInt(portstr, 10, 32)
-		if err != nil {
-			return err
-		}
-		pe.putInt32(r.Coordinator.ID())
-		if err := pe.putString(host); err != nil {
-			return err
-		}
-		pe.putInt32(int32(port))
-		return nil
+	tmp := &FindCoordinatorResponse{
+		Version:         0,
+		Err:             r.Err,
+		Coordinator:     r.Coordinator,
+		CoordinatorID:   r.CoordinatorID,
+		CoordinatorHost: r.CoordinatorHost,
+		CoordinatorPort: r.CoordinatorPort,
 	}
-	pe.putInt32(r.CoordinatorID)
-	if err := pe.putString(r.CoordinatorHost); err != nil {
+
+	if err := tmp.encode(pe); err != nil {
 		return err
 	}
-	pe.putInt32(r.CoordinatorPort)
+
 	return nil
 }
 

+ 61 - 0
find_coordinator_request.go

@@ -0,0 +1,61 @@
+package sarama
+
+type CoordinatorType int8
+
+const (
+	CoordinatorGroup       CoordinatorType = 0
+	CoordinatorTransaction CoordinatorType = 1
+)
+
+type FindCoordinatorRequest struct {
+	Version         int16
+	CoordinatorKey  string
+	CoordinatorType CoordinatorType
+}
+
+func (f *FindCoordinatorRequest) encode(pe packetEncoder) error {
+	if err := pe.putString(f.CoordinatorKey); err != nil {
+		return err
+	}
+
+	if f.Version >= 1 {
+		pe.putInt8(int8(f.CoordinatorType))
+	}
+
+	return nil
+}
+
+func (f *FindCoordinatorRequest) decode(pd packetDecoder, version int16) (err error) {
+	if f.CoordinatorKey, err = pd.getString(); err != nil {
+		return err
+	}
+
+	if version >= 1 {
+		f.Version = version
+		coordinatorType, err := pd.getInt8()
+		if err != nil {
+			return err
+		}
+
+		f.CoordinatorType = CoordinatorType(coordinatorType)
+	}
+
+	return nil
+}
+
+func (f *FindCoordinatorRequest) key() int16 {
+	return 10
+}
+
+func (f *FindCoordinatorRequest) version() int16 {
+	return f.Version
+}
+
+func (f *FindCoordinatorRequest) requiredVersion() KafkaVersion {
+	switch f.Version {
+	case 1:
+		return V0_11_0_0
+	default:
+		return V0_8_2_0
+	}
+}

+ 33 - 0
find_coordinator_request_test.go

@@ -0,0 +1,33 @@
+package sarama
+
+import "testing"
+
+var (
+	findCoordinatorRequestConsumerGroup = []byte{
+		0, 5, 'g', 'r', 'o', 'u', 'p',
+		0,
+	}
+
+	findCoordinatorRequestTransaction = []byte{
+		0, 13, 't', 'r', 'a', 'n', 's', 'a', 'c', 't', 'i', 'o', 'n', 'i', 'd',
+		1,
+	}
+)
+
+func TestFindCoordinatorRequest(t *testing.T) {
+	req := &FindCoordinatorRequest{
+		Version:         1,
+		CoordinatorKey:  "group",
+		CoordinatorType: CoordinatorGroup,
+	}
+
+	testRequest(t, "version 1 - group", req, findCoordinatorRequestConsumerGroup)
+
+	req = &FindCoordinatorRequest{
+		Version:         1,
+		CoordinatorKey:  "transactionid",
+		CoordinatorType: CoordinatorTransaction,
+	}
+
+	testRequest(t, "version 1 - transaction", req, findCoordinatorRequestTransaction)
+}

+ 121 - 0
find_coordinator_response.go

@@ -0,0 +1,121 @@
+package sarama
+
+import (
+	"net"
+	"strconv"
+	"time"
+)
+
+type FindCoordinatorResponse struct {
+	Version         int16
+	ThrottleTime    time.Duration
+	Err             KError
+	ErrMsg          *string
+	Coordinator     *Broker
+	CoordinatorID   int32  // deprecated: use Coordinator.ID()
+	CoordinatorHost string // deprecated: use Coordinator.Addr()
+	CoordinatorPort int32  // deprecated: use Coordinator.Addr()
+}
+
+func (f *FindCoordinatorResponse) decode(pd packetDecoder, version int16) (err error) {
+	if version >= 1 {
+		f.Version = version
+
+		throttleTime, err := pd.getInt32()
+		if err != nil {
+			return err
+		}
+		f.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
+	}
+
+	tmp, err := pd.getInt16()
+	if err != nil {
+		return err
+	}
+	f.Err = KError(tmp)
+
+	if version >= 1 {
+		if f.ErrMsg, err = pd.getNullableString(); err != nil {
+			return err
+		}
+	}
+
+	coordinator := new(Broker)
+	if err := coordinator.decode(pd); err != nil {
+		return err
+	}
+	if coordinator.addr == ":0" {
+		return nil
+	}
+	f.Coordinator = coordinator
+
+	// this can all go away in 2.0, but we have to fill in deprecated fields to maintain
+	// backwards compatibility
+	host, portstr, err := net.SplitHostPort(f.Coordinator.Addr())
+	if err != nil {
+		return err
+	}
+	port, err := strconv.ParseInt(portstr, 10, 32)
+	if err != nil {
+		return err
+	}
+	f.CoordinatorID = f.Coordinator.ID()
+	f.CoordinatorHost = host
+	f.CoordinatorPort = int32(port)
+
+	return nil
+}
+
+func (f *FindCoordinatorResponse) encode(pe packetEncoder) error {
+	if f.Version >= 1 {
+		pe.putInt32(int32(f.ThrottleTime / time.Millisecond))
+	}
+
+	pe.putInt16(int16(f.Err))
+
+	if f.Version >= 1 {
+		if err := pe.putNullableString(f.ErrMsg); err != nil {
+			return err
+		}
+	}
+
+	if f.Coordinator != nil {
+		host, portstr, err := net.SplitHostPort(f.Coordinator.Addr())
+		if err != nil {
+			return err
+		}
+		port, err := strconv.ParseInt(portstr, 10, 32)
+		if err != nil {
+			return err
+		}
+		pe.putInt32(f.Coordinator.ID())
+		if err := pe.putString(host); err != nil {
+			return err
+		}
+		pe.putInt32(int32(port))
+		return nil
+	}
+	pe.putInt32(f.CoordinatorID)
+	if err := pe.putString(f.CoordinatorHost); err != nil {
+		return err
+	}
+	pe.putInt32(f.CoordinatorPort)
+	return nil
+}
+
+func (f *FindCoordinatorResponse) key() int16 {
+	return 10
+}
+
+func (f *FindCoordinatorResponse) version() int16 {
+	return f.Version
+}
+
+func (f *FindCoordinatorResponse) requiredVersion() KafkaVersion {
+	switch f.Version {
+	case 1:
+		return V0_11_0_0
+	default:
+		return V0_8_2_0
+	}
+}

+ 49 - 0
find_coordinator_response_test.go

@@ -0,0 +1,49 @@
+package sarama
+
+import (
+	"testing"
+	"time"
+)
+
+var (
+	findCoordinatorResponse = []byte{
+		0, 0, 0, 100,
+		0, 0,
+		255, 255, // empty ErrMsg
+		0, 0, 0, 1,
+		0, 4, 'h', 'o', 's', 't',
+		0, 0, 35, 132,
+	}
+
+	findCoordinatorResponseError = []byte{
+		0, 0, 0, 100,
+		0, 15,
+		0, 3, 'm', 's', 'g',
+		0, 0, 0, 1,
+		0, 4, 'h', 'o', 's', 't',
+		0, 0, 35, 132,
+	}
+)
+
+func TestFindCoordinatorResponse(t *testing.T) {
+	broker := NewBroker("host:9092")
+	broker.id = 1
+	resp := &FindCoordinatorResponse{
+		Version:         1,
+		ThrottleTime:    100 * time.Millisecond,
+		Err:             ErrNoError,
+		ErrMsg:          nil,
+		CoordinatorID:   1,
+		CoordinatorHost: "host",
+		CoordinatorPort: 9092,
+		Coordinator:     broker,
+	}
+
+	testResponse(t, "version 1 - no error", resp, findCoordinatorResponse)
+
+	msg := "msg"
+	resp.Err = ErrConsumerCoordinatorNotAvailable
+	resp.ErrMsg = &msg
+
+	testResponse(t, "version 1 - error", resp, findCoordinatorResponseError)
+}

+ 1 - 1
request.go

@@ -97,7 +97,7 @@ func allocateBody(key, version int16) protocolBody {
 	case 9:
 		return &OffsetFetchRequest{}
 	case 10:
-		return &ConsumerMetadataRequest{}
+		return &FindCoordinatorRequest{}
 	case 11:
 		return &JoinGroupRequest{}
 	case 12: