浏览代码

Merge pull request #77 from snaury/protocol-0.8.1

Fix offset commit/fetch compatibility with kafka 0.8.1
Willem van Bergen 11 年之前
父节点
当前提交
5cf9fad3e8

+ 2 - 2
broker_test.go

@@ -128,7 +128,7 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
+	{[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := OffsetFetchRequest{}
 			response, err := broker.FetchOffset("clientID", &request)
@@ -140,7 +140,7 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
+	{[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := OffsetCommitRequest{}
 			response, err := broker.CommitOffset("clientID", &request)

+ 1 - 1
offset_commit_request.go

@@ -45,7 +45,7 @@ func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
 }
 
 func (r *OffsetCommitRequest) key() int16 {
-	return 6
+	return 8
 }
 
 func (r *OffsetCommitRequest) version() int16 {

+ 1 - 7
offset_commit_response.go

@@ -1,16 +1,10 @@
 package sarama
 
 type OffsetCommitResponse struct {
-	ClientID string
-	Errors   map[string]map[int32]KError
+	Errors map[string]map[int32]KError
 }
 
 func (r *OffsetCommitResponse) decode(pd packetDecoder) (err error) {
-	r.ClientID, err = pd.getString()
-	if err != nil {
-		return err
-	}
-
 	numTopics, err := pd.getArrayLength()
 	if err != nil {
 		return err

+ 1 - 8
offset_commit_response_test.go

@@ -4,11 +4,9 @@ import "testing"
 
 var (
 	emptyOffsetCommitResponse = []byte{
-		0xFF, 0xFF,
 		0x00, 0x00, 0x00, 0x00}
 
 	normalOffsetCommitResponse = []byte{
-		0x00, 0x02, 'a', 'z',
 		0x00, 0x00, 0x00, 0x02,
 
 		0x00, 0x01, 'm',
@@ -24,9 +22,7 @@ func TestEmptyOffsetCommitResponse(t *testing.T) {
 	response := OffsetCommitResponse{}
 
 	testDecodable(t, "empty", &response, emptyOffsetCommitResponse)
-	if response.ClientID != "" {
-		t.Error("Decoding produced client ID where there was none.")
-	}
+
 	if len(response.Errors) != 0 {
 		t.Error("Decoding produced errors where there were none.")
 	}
@@ -36,9 +32,6 @@ func TestNormalOffsetCommitResponse(t *testing.T) {
 	response := OffsetCommitResponse{}
 
 	testDecodable(t, "normal", &response, normalOffsetCommitResponse)
-	if response.ClientID != "az" {
-		t.Error("Decoding produced wrong client ID.")
-	}
 
 	if len(response.Errors) != 2 {
 		t.Fatal("Decoding produced wrong number of errors.")

+ 1 - 1
offset_fetch_request.go

@@ -25,7 +25,7 @@ func (r *OffsetFetchRequest) encode(pe packetEncoder) error {
 }
 
 func (r *OffsetFetchRequest) key() int16 {
-	return 7
+	return 9
 }
 
 func (r *OffsetFetchRequest) version() int16 {

+ 1 - 7
offset_fetch_response.go

@@ -40,16 +40,10 @@ func (r *OffsetFetchResponseBlock) encode(pe packetEncoder) (err error) {
 }
 
 type OffsetFetchResponse struct {
-	ClientID string
-	Blocks   map[string]map[int32]*OffsetFetchResponseBlock
+	Blocks map[string]map[int32]*OffsetFetchResponseBlock
 }
 
 func (r *OffsetFetchResponse) decode(pd packetDecoder) (err error) {
-	r.ClientID, err = pd.getString()
-	if err != nil {
-		return err
-	}
-
 	numTopics, err := pd.getArrayLength()
 	if err != nil {
 		return err

+ 1 - 8
offset_fetch_response_test.go

@@ -4,11 +4,9 @@ import "testing"
 
 var (
 	emptyOffsetFetchResponse = []byte{
-		0xFF, 0xFF,
 		0x00, 0x00, 0x00, 0x00}
 
 	normalOffsetFetchResponse = []byte{
-		0x00, 0x02, 'z', 'a',
 		0x00, 0x00, 0x00, 0x02,
 
 		0x00, 0x01, 'm',
@@ -26,9 +24,7 @@ func TestEmptyOffsetFetchResponse(t *testing.T) {
 	response := OffsetFetchResponse{}
 
 	testDecodable(t, "empty", &response, emptyOffsetFetchResponse)
-	if response.ClientID != "" {
-		t.Error("Decoding produced client ID where there was none.")
-	}
+
 	if len(response.Blocks) != 0 {
 		t.Error("Decoding produced topic blocks where there were none.")
 	}
@@ -38,9 +34,6 @@ func TestNormalOffsetFetchResponse(t *testing.T) {
 	response := OffsetFetchResponse{}
 
 	testDecodable(t, "normal", &response, normalOffsetFetchResponse)
-	if response.ClientID != "za" {
-		t.Error("Decoding produced wrong client ID.")
-	}
 
 	if len(response.Blocks) != 2 {
 		t.Fatal("Decoding produced wrong number of blocks.")