Pārlūkot izejas kodu

Merge pull request #673 from Shopify/versioned-responses

Versioned responses
Evan Huus 9 gadi atpakaļ
vecāks
revīzija
2ff1d9e3b5
48 mainītis faili ar 229 papildinājumiem un 77 dzēšanām
  1. 1 1
      api_versions_request.go
  2. 9 1
      api_versions_response.go
  3. 1 1
      api_versions_response_test.go
  4. 3 3
      broker.go
  5. 1 1
      consumer_metadata_request.go
  6. 9 1
      consumer_metadata_response.go
  7. 1 1
      describe_groups_request.go
  8. 9 1
      describe_groups_response.go
  9. 2 2
      describe_groups_response_test.go
  10. 22 0
      encoder_decoder.go
  11. 1 1
      fetch_request.go
  12. 9 1
      fetch_response.go
  13. 2 2
      fetch_response_test.go
  14. 1 1
      heartbeat_request.go
  15. 9 1
      heartbeat_response.go
  16. 1 1
      heartbeat_response_test.go
  17. 1 1
      join_group_request.go
  18. 9 1
      join_group_response.go
  19. 3 3
      join_group_response_test.go
  20. 1 1
      leave_group_request.go
  21. 9 1
      leave_group_response.go
  22. 2 2
      leave_group_response_test.go
  23. 1 1
      list_groups_request.go
  24. 9 1
      list_groups_response.go
  25. 3 3
      list_groups_response_test.go
  26. 1 1
      metadata_request.go
  27. 9 1
      metadata_response.go
  28. 3 3
      metadata_response_test.go
  29. 1 1
      mockbroker.go
  30. 10 10
      mockresponses.go
  31. 3 1
      offset_commit_request.go
  32. 9 1
      offset_commit_response.go
  33. 2 1
      offset_fetch_request.go
  34. 9 1
      offset_fetch_response.go
  35. 1 1
      offset_request.go
  36. 9 1
      offset_response.go
  37. 2 2
      offset_response_test.go
  38. 1 1
      produce_request.go
  39. 9 1
      produce_response.go
  40. 2 2
      produce_response_test.go
  41. 5 5
      request.go
  42. 11 4
      request_test.go
  43. 1 1
      sasl_handshake_request.go
  44. 9 1
      sasl_handshake_response.go
  45. 1 1
      sasl_handshake_response_test.go
  46. 1 1
      sync_group_request.go
  47. 9 1
      sync_group_response.go
  48. 2 2
      sync_group_response_test.go

+ 1 - 1
api_versions_request.go

@@ -7,7 +7,7 @@ func (r *ApiVersionsRequest) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (r *ApiVersionsRequest) decode(pd packetDecoder) (err error) {
+func (r *ApiVersionsRequest) decode(pd packetDecoder, version int16) (err error) {
 	return nil
 }
 

+ 9 - 1
api_versions_response.go

@@ -49,7 +49,7 @@ func (r *ApiVersionsResponse) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (r *ApiVersionsResponse) decode(pd packetDecoder) error {
+func (r *ApiVersionsResponse) decode(pd packetDecoder, version int16) error {
 	if kerr, err := pd.getInt16(); err != nil {
 		return err
 	} else {
@@ -72,3 +72,11 @@ func (r *ApiVersionsResponse) decode(pd packetDecoder) error {
 
 	return nil
 }
+
+func (r *ApiVersionsResponse) key() int16 {
+	return 18
+}
+
+func (r *ApiVersionsResponse) version() int16 {
+	return 0
+}

+ 1 - 1
api_versions_response_test.go

@@ -16,7 +16,7 @@ func TestApiVersionsResponse(t *testing.T) {
 	var response *ApiVersionsResponse
 
 	response = new(ApiVersionsResponse)
-	testDecodable(t, "no error", response, apiVersionResponse)
+	testVersionDecodable(t, "no error", response, apiVersionResponse, 0)
 	if response.Err != ErrNoError {
 		t.Error("Decoding error failed: no error expected but found", response.Err)
 	}

+ 3 - 3
broker.go

@@ -317,7 +317,7 @@ func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroups
 	return response, nil
 }
 
-func (b *Broker) send(rb requestBody, promiseResponse bool) (*responsePromise, error) {
+func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
 	b.lock.Lock()
 	defer b.lock.Unlock()
 
@@ -355,7 +355,7 @@ func (b *Broker) send(rb requestBody, promiseResponse bool) (*responsePromise, e
 	return &promise, nil
 }
 
-func (b *Broker) sendAndReceive(req requestBody, res decoder) error {
+func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
 	promise, err := b.send(req, res != nil)
 
 	if err != nil {
@@ -368,7 +368,7 @@ func (b *Broker) sendAndReceive(req requestBody, res decoder) error {
 
 	select {
 	case buf := <-promise.packets:
-		return decode(buf, res)
+		return versionedDecode(buf, res, req.version())
 	case err = <-promise.errors:
 		return err
 	}

+ 1 - 1
consumer_metadata_request.go

@@ -8,7 +8,7 @@ func (r *ConsumerMetadataRequest) encode(pe packetEncoder) error {
 	return pe.putString(r.ConsumerGroup)
 }
 
-func (r *ConsumerMetadataRequest) decode(pd packetDecoder) (err error) {
+func (r *ConsumerMetadataRequest) decode(pd packetDecoder, version int16) (err error) {
 	r.ConsumerGroup, err = pd.getString()
 	return err
 }

+ 9 - 1
consumer_metadata_response.go

@@ -13,7 +13,7 @@ type ConsumerMetadataResponse struct {
 	CoordinatorPort int32  // deprecated: use Coordinator.Addr()
 }
 
-func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) {
+func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err error) {
 	tmp, err := pd.getInt16()
 	if err != nil {
 		return err
@@ -71,3 +71,11 @@ func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {
 	pe.putInt32(r.CoordinatorPort)
 	return nil
 }
+
+func (r *ConsumerMetadataResponse) key() int16 {
+	return 10
+}
+
+func (r *ConsumerMetadataResponse) version() int16 {
+	return 0
+}

+ 1 - 1
describe_groups_request.go

@@ -8,7 +8,7 @@ func (r *DescribeGroupsRequest) encode(pe packetEncoder) error {
 	return pe.putStringArray(r.Groups)
 }
 
-func (r *DescribeGroupsRequest) decode(pd packetDecoder) (err error) {
+func (r *DescribeGroupsRequest) decode(pd packetDecoder, version int16) (err error) {
 	r.Groups, err = pd.getStringArray()
 	return
 }

+ 9 - 1
describe_groups_response.go

@@ -18,7 +18,7 @@ func (r *DescribeGroupsResponse) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (r *DescribeGroupsResponse) decode(pd packetDecoder) (err error) {
+func (r *DescribeGroupsResponse) decode(pd packetDecoder, version int16) (err error) {
 	n, err := pd.getArrayLength()
 	if err != nil {
 		return err
@@ -35,6 +35,14 @@ func (r *DescribeGroupsResponse) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
+func (r *DescribeGroupsResponse) key() int16 {
+	return 15
+}
+
+func (r *DescribeGroupsResponse) version() int16 {
+	return 0
+}
+
 type GroupDescription struct {
 	Err          KError
 	GroupId      string

+ 2 - 2
describe_groups_response_test.go

@@ -38,13 +38,13 @@ func TestDescribeGroupsResponse(t *testing.T) {
 	var response *DescribeGroupsResponse
 
 	response = new(DescribeGroupsResponse)
-	testDecodable(t, "empty", response, describeGroupsResponseEmpty)
+	testVersionDecodable(t, "empty", response, describeGroupsResponseEmpty, 0)
 	if len(response.Groups) != 0 {
 		t.Error("Expected no groups")
 	}
 
 	response = new(DescribeGroupsResponse)
-	testDecodable(t, "populated", response, describeGroupsResponsePopulated)
+	testVersionDecodable(t, "populated", response, describeGroupsResponsePopulated, 0)
 	if len(response.Groups) != 2 {
 		t.Error("Expected two groups")
 	}

+ 22 - 0
encoder_decoder.go

@@ -41,6 +41,10 @@ type decoder interface {
 	decode(pd packetDecoder) error
 }
 
+type versionedDecoder interface {
+	decode(pd packetDecoder, version int16) error
+}
+
 // Decode takes bytes and a Decoder and fills the fields of the decoder from the bytes,
 // interpreted using Kafka's encoding rules.
 func decode(buf []byte, in decoder) error {
@@ -60,3 +64,21 @@ func decode(buf []byte, in decoder) error {
 
 	return nil
 }
+
+func versionedDecode(buf []byte, in versionedDecoder, version int16) error {
+	if buf == nil {
+		return nil
+	}
+
+	helper := realDecoder{raw: buf}
+	err := in.decode(&helper, version)
+	if err != nil {
+		return err
+	}
+
+	if helper.off != len(buf) {
+		return PacketDecodingError{"invalid length"}
+	}
+
+	return nil
+}

+ 1 - 1
fetch_request.go

@@ -55,7 +55,7 @@ func (f *FetchRequest) encode(pe packetEncoder) (err error) {
 	return nil
 }
 
-func (f *FetchRequest) decode(pd packetDecoder) (err error) {
+func (f *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
 	if _, err = pd.getInt32(); err != nil {
 		return err
 	}

+ 9 - 1
fetch_response.go

@@ -49,7 +49,7 @@ func (pr *FetchResponseBlock) encode(pe packetEncoder) (err error) {
 	return pe.pop()
 }
 
-func (fr *FetchResponse) decode(pd packetDecoder) (err error) {
+func (fr *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
 	numTopics, err := pd.getArrayLength()
 	if err != nil {
 		return err
@@ -116,6 +116,14 @@ func (fr *FetchResponse) encode(pe packetEncoder) (err error) {
 	return nil
 }
 
+func (r *FetchResponse) key() int16 {
+	return 1
+}
+
+func (r *FetchResponse) version() int16 {
+	return 0
+}
+
 func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
 	if fr.Blocks == nil {
 		return nil

+ 2 - 2
fetch_response_test.go

@@ -30,7 +30,7 @@ var (
 
 func TestEmptyFetchResponse(t *testing.T) {
 	response := FetchResponse{}
-	testDecodable(t, "empty", &response, emptyFetchResponse)
+	testVersionDecodable(t, "empty", &response, emptyFetchResponse, 0)
 
 	if len(response.Blocks) != 0 {
 		t.Error("Decoding produced topic blocks where there were none.")
@@ -40,7 +40,7 @@ func TestEmptyFetchResponse(t *testing.T) {
 
 func TestOneMessageFetchResponse(t *testing.T) {
 	response := FetchResponse{}
-	testDecodable(t, "one message", &response, oneMessageFetchResponse)
+	testVersionDecodable(t, "one message", &response, oneMessageFetchResponse, 0)
 
 	if len(response.Blocks) != 1 {
 		t.Fatal("Decoding produced incorrect number of topic blocks.")

+ 1 - 1
heartbeat_request.go

@@ -20,7 +20,7 @@ func (r *HeartbeatRequest) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (r *HeartbeatRequest) decode(pd packetDecoder) (err error) {
+func (r *HeartbeatRequest) decode(pd packetDecoder, version int16) (err error) {
 	if r.GroupId, err = pd.getString(); err != nil {
 		return
 	}

+ 9 - 1
heartbeat_response.go

@@ -9,7 +9,7 @@ func (r *HeartbeatResponse) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (r *HeartbeatResponse) decode(pd packetDecoder) error {
+func (r *HeartbeatResponse) decode(pd packetDecoder, version int16) error {
 	if kerr, err := pd.getInt16(); err != nil {
 		return err
 	} else {
@@ -18,3 +18,11 @@ func (r *HeartbeatResponse) decode(pd packetDecoder) error {
 
 	return nil
 }
+
+func (r *HeartbeatResponse) key() int16 {
+	return 12
+}
+
+func (r *HeartbeatResponse) version() int16 {
+	return 0
+}

+ 1 - 1
heartbeat_response_test.go

@@ -11,7 +11,7 @@ func TestHeartbeatResponse(t *testing.T) {
 	var response *HeartbeatResponse
 
 	response = new(HeartbeatResponse)
-	testDecodable(t, "no error", response, heartbeatResponseNoError)
+	testVersionDecodable(t, "no error", response, heartbeatResponseNoError, 0)
 	if response.Err != ErrNoError {
 		t.Error("Decoding error failed: no error expected but found", response.Err)
 	}

+ 1 - 1
join_group_request.go

@@ -35,7 +35,7 @@ func (r *JoinGroupRequest) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (r *JoinGroupRequest) decode(pd packetDecoder) (err error) {
+func (r *JoinGroupRequest) decode(pd packetDecoder, version int16) (err error) {
 	if r.GroupId, err = pd.getString(); err != nil {
 		return
 	}

+ 9 - 1
join_group_response.go

@@ -52,7 +52,7 @@ func (r *JoinGroupResponse) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (r *JoinGroupResponse) decode(pd packetDecoder) (err error) {
+func (r *JoinGroupResponse) decode(pd packetDecoder, version int16) (err error) {
 	if kerr, err := pd.getInt16(); err != nil {
 		return err
 	} else {
@@ -100,3 +100,11 @@ func (r *JoinGroupResponse) decode(pd packetDecoder) (err error) {
 
 	return nil
 }
+
+func (r *JoinGroupResponse) key() int16 {
+	return 11
+}
+
+func (r *JoinGroupResponse) version() int16 {
+	return 0
+}

+ 3 - 3
join_group_response_test.go

@@ -40,7 +40,7 @@ func TestJoinGroupResponse(t *testing.T) {
 	var response *JoinGroupResponse
 
 	response = new(JoinGroupResponse)
-	testDecodable(t, "no error", response, joinGroupResponseNoError)
+	testVersionDecodable(t, "no error", response, joinGroupResponseNoError, 0)
 	if response.Err != ErrNoError {
 		t.Error("Decoding Err failed: no error expected but found", response.Err)
 	}
@@ -58,7 +58,7 @@ func TestJoinGroupResponse(t *testing.T) {
 	}
 
 	response = new(JoinGroupResponse)
-	testDecodable(t, "with error", response, joinGroupResponseWithError)
+	testVersionDecodable(t, "with error", response, joinGroupResponseWithError, 0)
 	if response.Err != ErrInconsistentGroupProtocol {
 		t.Error("Decoding Err failed: ErrInconsistentGroupProtocol expected but found", response.Err)
 	}
@@ -76,7 +76,7 @@ func TestJoinGroupResponse(t *testing.T) {
 	}
 
 	response = new(JoinGroupResponse)
-	testDecodable(t, "with error", response, joinGroupResponseLeader)
+	testVersionDecodable(t, "with error", response, joinGroupResponseLeader, 0)
 	if response.Err != ErrNoError {
 		t.Error("Decoding Err failed: ErrNoError expected but found", response.Err)
 	}

+ 1 - 1
leave_group_request.go

@@ -16,7 +16,7 @@ func (r *LeaveGroupRequest) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (r *LeaveGroupRequest) decode(pd packetDecoder) (err error) {
+func (r *LeaveGroupRequest) decode(pd packetDecoder, version int16) (err error) {
 	if r.GroupId, err = pd.getString(); err != nil {
 		return
 	}

+ 9 - 1
leave_group_response.go

@@ -9,7 +9,7 @@ func (r *LeaveGroupResponse) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (r *LeaveGroupResponse) decode(pd packetDecoder) (err error) {
+func (r *LeaveGroupResponse) decode(pd packetDecoder, version int16) (err error) {
 	if kerr, err := pd.getInt16(); err != nil {
 		return err
 	} else {
@@ -18,3 +18,11 @@ func (r *LeaveGroupResponse) decode(pd packetDecoder) (err error) {
 
 	return nil
 }
+
+func (r *LeaveGroupResponse) key() int16 {
+	return 13
+}
+
+func (r *LeaveGroupResponse) version() int16 {
+	return 0
+}

+ 2 - 2
leave_group_response_test.go

@@ -11,13 +11,13 @@ func TestLeaveGroupResponse(t *testing.T) {
 	var response *LeaveGroupResponse
 
 	response = new(LeaveGroupResponse)
-	testDecodable(t, "no error", response, leaveGroupResponseNoError)
+	testVersionDecodable(t, "no error", response, leaveGroupResponseNoError, 0)
 	if response.Err != ErrNoError {
 		t.Error("Decoding error failed: no error expected but found", response.Err)
 	}
 
 	response = new(LeaveGroupResponse)
-	testDecodable(t, "with error", response, leaveGroupResponseWithError)
+	testVersionDecodable(t, "with error", response, leaveGroupResponseWithError, 0)
 	if response.Err != ErrUnknownMemberId {
 		t.Error("Decoding error failed: ErrUnknownMemberId expected but found", response.Err)
 	}

+ 1 - 1
list_groups_request.go

@@ -7,7 +7,7 @@ func (r *ListGroupsRequest) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (r *ListGroupsRequest) decode(pd packetDecoder) (err error) {
+func (r *ListGroupsRequest) decode(pd packetDecoder, version int16) (err error) {
 	return nil
 }
 

+ 9 - 1
list_groups_response.go

@@ -23,7 +23,7 @@ func (r *ListGroupsResponse) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (r *ListGroupsResponse) decode(pd packetDecoder) error {
+func (r *ListGroupsResponse) decode(pd packetDecoder, version int16) error {
 	if kerr, err := pd.getInt16(); err != nil {
 		return err
 	} else {
@@ -54,3 +54,11 @@ func (r *ListGroupsResponse) decode(pd packetDecoder) error {
 
 	return nil
 }
+
+func (r *ListGroupsResponse) key() int16 {
+	return 16
+}
+
+func (r *ListGroupsResponse) version() int16 {
+	return 0
+}

+ 3 - 3
list_groups_response_test.go

@@ -27,7 +27,7 @@ func TestListGroupsResponse(t *testing.T) {
 	var response *ListGroupsResponse
 
 	response = new(ListGroupsResponse)
-	testDecodable(t, "no error", response, listGroupsResponseEmpty)
+	testVersionDecodable(t, "no error", response, listGroupsResponseEmpty, 0)
 	if response.Err != ErrNoError {
 		t.Error("Expected no gerror, found:", response.Err)
 	}
@@ -36,7 +36,7 @@ func TestListGroupsResponse(t *testing.T) {
 	}
 
 	response = new(ListGroupsResponse)
-	testDecodable(t, "no error", response, listGroupsResponseError)
+	testVersionDecodable(t, "no error", response, listGroupsResponseError, 0)
 	if response.Err != ErrClusterAuthorizationFailed {
 		t.Error("Expected no gerror, found:", response.Err)
 	}
@@ -45,7 +45,7 @@ func TestListGroupsResponse(t *testing.T) {
 	}
 
 	response = new(ListGroupsResponse)
-	testDecodable(t, "no error", response, listGroupsResponseWithConsumer)
+	testVersionDecodable(t, "no error", response, listGroupsResponseWithConsumer, 0)
 	if response.Err != ErrNoError {
 		t.Error("Expected no gerror, found:", response.Err)
 	}

+ 1 - 1
metadata_request.go

@@ -19,7 +19,7 @@ func (mr *MetadataRequest) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (mr *MetadataRequest) decode(pd packetDecoder) error {
+func (mr *MetadataRequest) decode(pd packetDecoder, version int16) error {
 	topicCount, err := pd.getArrayLength()
 	if err != nil {
 		return err

+ 9 - 1
metadata_response.go

@@ -118,7 +118,7 @@ type MetadataResponse struct {
 	Topics  []*TopicMetadata
 }
 
-func (m *MetadataResponse) decode(pd packetDecoder) (err error) {
+func (m *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
 	n, err := pd.getArrayLength()
 	if err != nil {
 		return err
@@ -176,6 +176,14 @@ func (m *MetadataResponse) encode(pe packetEncoder) error {
 	return nil
 }
 
+func (r *MetadataResponse) key() int16 {
+	return 3
+}
+
+func (r *MetadataResponse) version() int16 {
+	return 0
+}
+
 // testing API
 
 func (m *MetadataResponse) AddBroker(addr string, id int32) {

+ 3 - 3
metadata_response_test.go

@@ -41,7 +41,7 @@ var (
 func TestEmptyMetadataResponse(t *testing.T) {
 	response := MetadataResponse{}
 
-	testDecodable(t, "empty", &response, emptyMetadataResponse)
+	testVersionDecodable(t, "empty", &response, emptyMetadataResponse, 0)
 	if len(response.Brokers) != 0 {
 		t.Error("Decoding produced", len(response.Brokers), "brokers where there were none!")
 	}
@@ -53,7 +53,7 @@ func TestEmptyMetadataResponse(t *testing.T) {
 func TestMetadataResponseWithBrokers(t *testing.T) {
 	response := MetadataResponse{}
 
-	testDecodable(t, "brokers, no topics", &response, brokersNoTopicsMetadataResponse)
+	testVersionDecodable(t, "brokers, no topics", &response, brokersNoTopicsMetadataResponse, 0)
 	if len(response.Brokers) != 2 {
 		t.Fatal("Decoding produced", len(response.Brokers), "brokers where there were two!")
 	}
@@ -79,7 +79,7 @@ func TestMetadataResponseWithBrokers(t *testing.T) {
 func TestMetadataResponseWithTopics(t *testing.T) {
 	response := MetadataResponse{}
 
-	testDecodable(t, "topics, no brokers", &response, topicsNoBrokersMetadataResponse)
+	testVersionDecodable(t, "topics, no brokers", &response, topicsNoBrokersMetadataResponse, 0)
 	if len(response.Brokers) != 0 {
 		t.Error("Decoding produced", len(response.Brokers), "brokers where there were none!")
 	}

+ 1 - 1
mockbroker.go

@@ -60,7 +60,7 @@ type MockBroker struct {
 
 // RequestResponse represents a Request/Response pair processed by MockBroker.
 type RequestResponse struct {
-	Request  requestBody
+	Request  protocolBody
 	Response encoder
 }
 

+ 10 - 10
mockresponses.go

@@ -17,7 +17,7 @@ type TestReporter interface {
 // allows generating a response based on a request body. MockResponses are used
 // to program behavior of MockBroker in tests.
 type MockResponse interface {
-	For(reqBody decoder) (res encoder)
+	For(reqBody versionedDecoder) (res encoder)
 }
 
 // MockWrapper is a mock response builder that returns a particular concrete
@@ -26,7 +26,7 @@ type MockWrapper struct {
 	res encoder
 }
 
-func (mw *MockWrapper) For(reqBody decoder) (res encoder) {
+func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoder) {
 	return mw.res
 }
 
@@ -58,7 +58,7 @@ func NewMockSequence(responses ...interface{}) *MockSequence {
 	return ms
 }
 
-func (mc *MockSequence) For(reqBody decoder) (res encoder) {
+func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) {
 	res = mc.responses[0].For(reqBody)
 	if len(mc.responses) > 1 {
 		mc.responses = mc.responses[1:]
@@ -96,7 +96,7 @@ func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMet
 	return mmr
 }
 
-func (mmr *MockMetadataResponse) For(reqBody decoder) encoder {
+func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder {
 	metadataRequest := reqBody.(*MetadataRequest)
 	metadataResponse := &MetadataResponse{}
 	for addr, brokerID := range mmr.brokers {
@@ -146,7 +146,7 @@ func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, of
 	return mor
 }
 
-func (mor *MockOffsetResponse) For(reqBody decoder) encoder {
+func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoder {
 	offsetRequest := reqBody.(*OffsetRequest)
 	offsetResponse := &OffsetResponse{}
 	for topic, partitions := range offsetRequest.blocks {
@@ -216,7 +216,7 @@ func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, of
 	return mfr
 }
 
-func (mfr *MockFetchResponse) For(reqBody decoder) encoder {
+func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoder {
 	fetchRequest := reqBody.(*FetchRequest)
 	res := &FetchResponse{}
 	for topic, partitions := range fetchRequest.blocks {
@@ -298,7 +298,7 @@ func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *M
 	return mr
 }
 
-func (mr *MockConsumerMetadataResponse) For(reqBody decoder) encoder {
+func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder {
 	req := reqBody.(*ConsumerMetadataRequest)
 	group := req.ConsumerGroup
 	res := &ConsumerMetadataResponse{}
@@ -340,7 +340,7 @@ func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int3
 	return mr
 }
 
-func (mr *MockOffsetCommitResponse) For(reqBody decoder) encoder {
+func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoder {
 	req := reqBody.(*OffsetCommitRequest)
 	group := req.ConsumerGroup
 	res := &OffsetCommitResponse{}
@@ -391,7 +391,7 @@ func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KE
 	return mr
 }
 
-func (mr *MockProduceResponse) For(reqBody decoder) encoder {
+func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder {
 	req := reqBody.(*ProduceRequest)
 	res := &ProduceResponse{}
 	for topic, partitions := range req.msgSets {
@@ -442,7 +442,7 @@ func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int3
 	return mr
 }
 
-func (mr *MockOffsetFetchResponse) For(reqBody decoder) encoder {
+func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder {
 	req := reqBody.(*OffsetFetchRequest)
 	group := req.ConsumerGroup
 	res := &OffsetFetchResponse{}

+ 3 - 1
offset_commit_request.go

@@ -103,7 +103,9 @@ func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (r *OffsetCommitRequest) decode(pd packetDecoder) (err error) {
+func (r *OffsetCommitRequest) decode(pd packetDecoder, version int16) (err error) {
+	r.Version = version
+
 	if r.ConsumerGroup, err = pd.getString(); err != nil {
 		return err
 	}

+ 9 - 1
offset_commit_response.go

@@ -35,7 +35,7 @@ func (r *OffsetCommitResponse) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (r *OffsetCommitResponse) decode(pd packetDecoder) (err error) {
+func (r *OffsetCommitResponse) decode(pd packetDecoder, version int16) (err error) {
 	numTopics, err := pd.getArrayLength()
 	if err != nil || numTopics == 0 {
 		return err
@@ -71,3 +71,11 @@ func (r *OffsetCommitResponse) decode(pd packetDecoder) (err error) {
 
 	return nil
 }
+
+func (r *OffsetCommitResponse) key() int16 {
+	return 8
+}
+
+func (r *OffsetCommitResponse) version() int16 {
+	return 0
+}

+ 2 - 1
offset_fetch_request.go

@@ -28,7 +28,8 @@ func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
 	return nil
 }
 
-func (r *OffsetFetchRequest) decode(pd packetDecoder) (err error) {
+func (r *OffsetFetchRequest) decode(pd packetDecoder, version int16) (err error) {
+	r.Version = version
 	if r.ConsumerGroup, err = pd.getString(); err != nil {
 		return err
 	}

+ 9 - 1
offset_fetch_response.go

@@ -64,7 +64,7 @@ func (r *OffsetFetchResponse) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (r *OffsetFetchResponse) decode(pd packetDecoder) (err error) {
+func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error) {
 	numTopics, err := pd.getArrayLength()
 	if err != nil || numTopics == 0 {
 		return err
@@ -106,6 +106,14 @@ func (r *OffsetFetchResponse) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
+func (r *OffsetFetchResponse) key() int16 {
+	return 9
+}
+
+func (r *OffsetFetchResponse) version() int16 {
+	return 0
+}
+
 func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock {
 	if r.Blocks == nil {
 		return nil

+ 1 - 1
offset_request.go

@@ -50,7 +50,7 @@ func (r *OffsetRequest) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (r *OffsetRequest) decode(pd packetDecoder) error {
+func (r *OffsetRequest) decode(pd packetDecoder, version int16) error {
 	// Ignore replica ID
 	if _, err := pd.getInt32(); err != nil {
 		return err

+ 9 - 1
offset_response.go

@@ -27,7 +27,7 @@ type OffsetResponse struct {
 	Blocks map[string]map[int32]*OffsetResponseBlock
 }
 
-func (r *OffsetResponse) decode(pd packetDecoder) (err error) {
+func (r *OffsetResponse) decode(pd packetDecoder, version int16) (err error) {
 	numTopics, err := pd.getArrayLength()
 	if err != nil {
 		return err
@@ -115,6 +115,14 @@ func (r *OffsetResponse) encode(pe packetEncoder) (err error) {
 	return nil
 }
 
+func (r *OffsetResponse) key() int16 {
+	return 2
+}
+
+func (r *OffsetResponse) version() int16 {
+	return 0
+}
+
 // testing API
 
 func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64) {

+ 2 - 2
offset_response_test.go

@@ -24,7 +24,7 @@ var (
 func TestEmptyOffsetResponse(t *testing.T) {
 	response := OffsetResponse{}
 
-	testDecodable(t, "empty", &response, emptyOffsetResponse)
+	testVersionDecodable(t, "empty", &response, emptyOffsetResponse, 0)
 	if len(response.Blocks) != 0 {
 		t.Error("Decoding produced", len(response.Blocks), "topics where there were none.")
 	}
@@ -33,7 +33,7 @@ func TestEmptyOffsetResponse(t *testing.T) {
 func TestNormalOffsetResponse(t *testing.T) {
 	response := OffsetResponse{}
 
-	testDecodable(t, "normal", &response, normalOffsetResponse)
+	testVersionDecodable(t, "normal", &response, normalOffsetResponse, 0)
 
 	if len(response.Blocks) != 2 {
 		t.Fatal("Decoding produced", len(response.Blocks), "topics where there were two.")

+ 1 - 1
produce_request.go

@@ -54,7 +54,7 @@ func (p *ProduceRequest) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (p *ProduceRequest) decode(pd packetDecoder) error {
+func (p *ProduceRequest) decode(pd packetDecoder, version int16) error {
 	requiredAcks, err := pd.getInt16()
 	if err != nil {
 		return err

+ 9 - 1
produce_response.go

@@ -24,7 +24,7 @@ type ProduceResponse struct {
 	Blocks map[string]map[int32]*ProduceResponseBlock
 }
 
-func (pr *ProduceResponse) decode(pd packetDecoder) (err error) {
+func (pr *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
 	numTopics, err := pd.getArrayLength()
 	if err != nil {
 		return err
@@ -85,6 +85,14 @@ func (pr *ProduceResponse) encode(pe packetEncoder) error {
 	return nil
 }
 
+func (r *ProduceResponse) key() int16 {
+	return 0
+}
+
+func (r *ProduceResponse) version() int16 {
+	return 0
+}
+
 func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {
 	if pr.Blocks == nil {
 		return nil

+ 2 - 2
produce_response_test.go

@@ -27,12 +27,12 @@ var (
 func TestProduceResponse(t *testing.T) {
 	response := ProduceResponse{}
 
-	testDecodable(t, "no blocks", &response, produceResponseNoBlocks)
+	testVersionDecodable(t, "no blocks", &response, produceResponseNoBlocks, 0)
 	if len(response.Blocks) != 0 {
 		t.Error("Decoding produced", len(response.Blocks), "topics where there were none")
 	}
 
-	testDecodable(t, "many blocks", &response, produceResponseManyBlocks)
+	testVersionDecodable(t, "many blocks", &response, produceResponseManyBlocks, 0)
 	if len(response.Blocks) != 2 {
 		t.Error("Decoding produced", len(response.Blocks), "topics where there were 2")
 	}

+ 5 - 5
request.go

@@ -6,9 +6,9 @@ import (
 	"io"
 )
 
-type requestBody interface {
+type protocolBody interface {
 	encoder
-	decoder
+	versionedDecoder
 	key() int16
 	version() int16
 }
@@ -16,7 +16,7 @@ type requestBody interface {
 type request struct {
 	correlationID int32
 	clientID      string
-	body          requestBody
+	body          protocolBody
 }
 
 func (r *request) encode(pe packetEncoder) (err error) {
@@ -53,7 +53,7 @@ func (r *request) decode(pd packetDecoder) (err error) {
 	if r.body == nil {
 		return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
 	}
-	return r.body.decode(pd)
+	return r.body.decode(pd, version)
 }
 
 func decodeRequest(r io.Reader) (req *request, err error) {
@@ -79,7 +79,7 @@ func decodeRequest(r io.Reader) (req *request, err error) {
 	return req, nil
 }
 
-func allocateBody(key, version int16) requestBody {
+func allocateBody(key, version int16) protocolBody {
 	switch key {
 	case 0:
 		return &ProduceRequest{}

+ 11 - 4
request_test.go

@@ -40,7 +40,14 @@ func testDecodable(t *testing.T, name string, out decoder, in []byte) {
 	}
 }
 
-func testRequest(t *testing.T, name string, rb requestBody, expected []byte) {
+func testVersionDecodable(t *testing.T, name string, out versionedDecoder, in []byte, version int16) {
+	err := versionedDecode(in, out, version)
+	if err != nil {
+		t.Error("Decoding", name, "version", version, "failed:", err)
+	}
+}
+
+func testRequest(t *testing.T, name string, rb protocolBody, expected []byte) {
 	// Encoder request
 	req := &request{correlationID: 123, clientID: "foo", body: rb}
 	packet, err := encode(req)
@@ -61,7 +68,7 @@ func testRequest(t *testing.T, name string, rb requestBody, expected []byte) {
 	}
 }
 
-func testResponse(t *testing.T, name string, res encoder, expected []byte) {
+func testResponse(t *testing.T, name string, res protocolBody, expected []byte) {
 	encoded, err := encode(res)
 	if err != nil {
 		t.Error(err)
@@ -69,8 +76,8 @@ func testResponse(t *testing.T, name string, res encoder, expected []byte) {
 		t.Error("Encoding", name, "failed\ngot ", encoded, "\nwant", expected)
 	}
 
-	decoded := reflect.New(reflect.TypeOf(res).Elem()).Interface().(decoder)
-	if err := decode(encoded, decoded); err != nil {
+	decoded := reflect.New(reflect.TypeOf(res).Elem()).Interface().(versionedDecoder)
+	if err := versionedDecode(encoded, decoded, res.version()); err != nil {
 		t.Error("Decoding", name, "failed:", err)
 	}
 

+ 1 - 1
sasl_handshake_request.go

@@ -12,7 +12,7 @@ func (r *SaslHandshakeRequest) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (r *SaslHandshakeRequest) decode(pd packetDecoder) (err error) {
+func (r *SaslHandshakeRequest) decode(pd packetDecoder, version int16) (err error) {
 	if r.Mechanism, err = pd.getString(); err != nil {
 		return err
 	}

+ 9 - 1
sasl_handshake_response.go

@@ -10,7 +10,7 @@ func (r *SaslHandshakeResponse) encode(pe packetEncoder) error {
 	return pe.putStringArray(r.EnabledMechanisms)
 }
 
-func (r *SaslHandshakeResponse) decode(pd packetDecoder) error {
+func (r *SaslHandshakeResponse) decode(pd packetDecoder, version int16) error {
 	if kerr, err := pd.getInt16(); err != nil {
 		return err
 	} else {
@@ -24,3 +24,11 @@ func (r *SaslHandshakeResponse) decode(pd packetDecoder) error {
 
 	return nil
 }
+
+func (r *SaslHandshakeResponse) key() int16 {
+	return 17
+}
+
+func (r *SaslHandshakeResponse) version() int16 {
+	return 0
+}

+ 1 - 1
sasl_handshake_response_test.go

@@ -14,7 +14,7 @@ func TestSaslHandshakeResponse(t *testing.T) {
 	var response *SaslHandshakeResponse
 
 	response = new(SaslHandshakeResponse)
-	testDecodable(t, "no error", response, saslHandshakeResponse)
+	testVersionDecodable(t, "no error", response, saslHandshakeResponse, 0)
 	if response.Err != ErrNoError {
 		t.Error("Decoding error failed: no error expected but found", response.Err)
 	}

+ 1 - 1
sync_group_request.go

@@ -33,7 +33,7 @@ func (r *SyncGroupRequest) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (r *SyncGroupRequest) decode(pd packetDecoder) (err error) {
+func (r *SyncGroupRequest) decode(pd packetDecoder, version int16) (err error) {
 	if r.GroupId, err = pd.getString(); err != nil {
 		return
 	}

+ 9 - 1
sync_group_response.go

@@ -16,7 +16,7 @@ func (r *SyncGroupResponse) encode(pe packetEncoder) error {
 	return pe.putBytes(r.MemberAssignment)
 }
 
-func (r *SyncGroupResponse) decode(pd packetDecoder) (err error) {
+func (r *SyncGroupResponse) decode(pd packetDecoder, version int16) (err error) {
 	if kerr, err := pd.getInt16(); err != nil {
 		return err
 	} else {
@@ -26,3 +26,11 @@ func (r *SyncGroupResponse) decode(pd packetDecoder) (err error) {
 	r.MemberAssignment, err = pd.getBytes()
 	return
 }
+
+func (r *SyncGroupResponse) key() int16 {
+	return 14
+}
+
+func (r *SyncGroupResponse) version() int16 {
+	return 0
+}

+ 2 - 2
sync_group_response_test.go

@@ -21,7 +21,7 @@ func TestSyncGroupResponse(t *testing.T) {
 	var response *SyncGroupResponse
 
 	response = new(SyncGroupResponse)
-	testDecodable(t, "no error", response, syncGroupResponseNoError)
+	testVersionDecodable(t, "no error", response, syncGroupResponseNoError, 0)
 	if response.Err != ErrNoError {
 		t.Error("Decoding Err failed: no error expected but found", response.Err)
 	}
@@ -30,7 +30,7 @@ func TestSyncGroupResponse(t *testing.T) {
 	}
 
 	response = new(SyncGroupResponse)
-	testDecodable(t, "no error", response, syncGroupResponseWithError)
+	testVersionDecodable(t, "no error", response, syncGroupResponseWithError, 0)
 	if response.Err != ErrRebalanceInProgress {
 		t.Error("Decoding Err failed: ErrRebalanceInProgress expected but found", response.Err)
 	}