浏览代码

remove deprecated fields

Robin 8 年之前
父节点
当前提交
d0340489e8
共有 4 个文件被更改,包括 55 次插入62 次删除
  1. 32 9
      consumer_metadata_response.go
  2. 11 2
      consumer_metadata_response_test.go
  3. 7 43
      find_coordinator_response.go
  4. 5 8
      find_coordinator_response_test.go

+ 32 - 9
consumer_metadata_response.go

@@ -1,5 +1,10 @@
 package sarama
 
+import (
+	"net"
+	"strconv"
+)
+
 type ConsumerMetadataResponse struct {
 	Err             KError
 	Coordinator     *Broker
@@ -16,22 +21,40 @@ func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err
 	}
 
 	r.Err = tmp.Err
+
 	r.Coordinator = tmp.Coordinator
-	r.CoordinatorID = tmp.CoordinatorID
-	r.CoordinatorHost = tmp.CoordinatorHost
-	r.CoordinatorPort = tmp.CoordinatorPort
+	if tmp.Coordinator == nil {
+		return nil
+	}
+
+	// this can all go away in 2.0, but we have to fill in deprecated fields to maintain
+	// backwards compatibility
+	host, portstr, err := net.SplitHostPort(r.Coordinator.Addr())
+	if err != nil {
+		return err
+	}
+	port, err := strconv.ParseInt(portstr, 10, 32)
+	if err != nil {
+		return err
+	}
+	r.CoordinatorID = r.Coordinator.ID()
+	r.CoordinatorHost = host
+	r.CoordinatorPort = int32(port)
 
 	return nil
 }
 
 func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {
+	if r.Coordinator == nil {
+		r.Coordinator = new(Broker)
+		r.Coordinator.id = r.CoordinatorID
+		r.Coordinator.addr = net.JoinHostPort(r.CoordinatorHost, strconv.Itoa(int(r.CoordinatorPort)))
+	}
+
 	tmp := &FindCoordinatorResponse{
-		Version:         0,
-		Err:             r.Err,
-		Coordinator:     r.Coordinator,
-		CoordinatorID:   r.CoordinatorID,
-		CoordinatorHost: r.CoordinatorHost,
-		CoordinatorPort: r.CoordinatorPort,
+		Version:     0,
+		Err:         r.Err,
+		Coordinator: r.Coordinator,
 	}
 
 	if err := tmp.encode(pe); err != nil {

+ 11 - 2
consumer_metadata_response_test.go

@@ -17,8 +17,17 @@ var (
 )
 
 func TestConsumerMetadataResponseError(t *testing.T) {
-	response := ConsumerMetadataResponse{Err: ErrOffsetsLoadInProgress}
-	testResponse(t, "error", &response, consumerMetadataResponseError)
+	response := &ConsumerMetadataResponse{Err: ErrOffsetsLoadInProgress}
+	testEncodable(t, "", response, consumerMetadataResponseError)
+
+	decodedResp := &ConsumerMetadataResponse{}
+	if err := versionedDecode(consumerMetadataResponseError, decodedResp, 0); err != nil {
+		t.Error("could not decode: ", err)
+	}
+
+	if decodedResp.Err != ErrOffsetsLoadInProgress {
+		t.Errorf("got %s, want %s", decodedResp.Err, ErrOffsetsLoadInProgress)
+	}
 }
 
 func TestConsumerMetadataResponseSuccess(t *testing.T) {

+ 7 - 43
find_coordinator_response.go

@@ -1,20 +1,15 @@
 package sarama
 
 import (
-	"net"
-	"strconv"
 	"time"
 )
 
 type FindCoordinatorResponse struct {
-	Version         int16
-	ThrottleTime    time.Duration
-	Err             KError
-	ErrMsg          *string
-	Coordinator     *Broker
-	CoordinatorID   int32  // deprecated: use Coordinator.ID()
-	CoordinatorHost string // deprecated: use Coordinator.Addr()
-	CoordinatorPort int32  // deprecated: use Coordinator.Addr()
+	Version      int16
+	ThrottleTime time.Duration
+	Err          KError
+	ErrMsg       *string
+	Coordinator  *Broker
 }
 
 func (f *FindCoordinatorResponse) decode(pd packetDecoder, version int16) (err error) {
@@ -49,20 +44,6 @@ func (f *FindCoordinatorResponse) decode(pd packetDecoder, version int16) (err e
 	}
 	f.Coordinator = coordinator
 
-	// this can all go away in 2.0, but we have to fill in deprecated fields to maintain
-	// backwards compatibility
-	host, portstr, err := net.SplitHostPort(f.Coordinator.Addr())
-	if err != nil {
-		return err
-	}
-	port, err := strconv.ParseInt(portstr, 10, 32)
-	if err != nil {
-		return err
-	}
-	f.CoordinatorID = f.Coordinator.ID()
-	f.CoordinatorHost = host
-	f.CoordinatorPort = int32(port)
-
 	return nil
 }
 
@@ -79,27 +60,10 @@ func (f *FindCoordinatorResponse) encode(pe packetEncoder) error {
 		}
 	}
 
-	if f.Coordinator != nil {
-		host, portstr, err := net.SplitHostPort(f.Coordinator.Addr())
-		if err != nil {
-			return err
-		}
-		port, err := strconv.ParseInt(portstr, 10, 32)
-		if err != nil {
-			return err
-		}
-		pe.putInt32(f.Coordinator.ID())
-		if err := pe.putString(host); err != nil {
-			return err
-		}
-		pe.putInt32(int32(port))
-		return nil
-	}
-	pe.putInt32(f.CoordinatorID)
-	if err := pe.putString(f.CoordinatorHost); err != nil {
+	if err := f.Coordinator.encode(pe); err != nil {
 		return err
 	}
-	pe.putInt32(f.CoordinatorPort)
+
 	return nil
 }
 

+ 5 - 8
find_coordinator_response_test.go

@@ -29,14 +29,11 @@ func TestFindCoordinatorResponse(t *testing.T) {
 	broker := NewBroker("host:9092")
 	broker.id = 1
 	resp := &FindCoordinatorResponse{
-		Version:         1,
-		ThrottleTime:    100 * time.Millisecond,
-		Err:             ErrNoError,
-		ErrMsg:          nil,
-		CoordinatorID:   1,
-		CoordinatorHost: "host",
-		CoordinatorPort: 9092,
-		Coordinator:     broker,
+		Version:      1,
+		ThrottleTime: 100 * time.Millisecond,
+		Err:          ErrNoError,
+		ErrMsg:       nil,
+		Coordinator:  broker,
 	}
 
 	testResponse(t, "version 1 - no error", resp, findCoordinatorResponse)