Browse Source

Add support for latest protocol messages

Specifically `SaslHandshake` (17) and `ApiVersions` (18) as well as related
errors.
Evan Huus 8 years ago
parent
commit
c5f4248429

+ 20 - 0
api_versions_request.go

@@ -0,0 +1,20 @@
+package sarama
+
+type ApiVersionsRequest struct {
+}
+
+func (r *ApiVersionsRequest) encode(pe packetEncoder) error {
+	return nil
+}
+
+func (r *ApiVersionsRequest) decode(pd packetDecoder) (err error) {
+	return nil
+}
+
+func (r *ApiVersionsRequest) key() int16 {
+	return 18
+}
+
+func (r *ApiVersionsRequest) version() int16 {
+	return 0
+}

+ 14 - 0
api_versions_request_test.go

@@ -0,0 +1,14 @@
+package sarama
+
+import "testing"
+
+var (
+	apiVersionRequest = []byte{}
+)
+
+func TestApiVersionsRequest(t *testing.T) {
+	var request *ApiVersionsRequest
+
+	request = new(ApiVersionsRequest)
+	testRequest(t, "basic", request, apiVersionRequest)
+}

+ 74 - 0
api_versions_response.go

@@ -0,0 +1,74 @@
+package sarama
+
+type ApiVersionsResponseBlock struct {
+	ApiKey     int16
+	MinVersion int16
+	MaxVersion int16
+}
+
+func (r *ApiVersionsResponseBlock) encode(pe packetEncoder) error {
+	pe.putInt16(r.ApiKey)
+	pe.putInt16(r.MinVersion)
+	pe.putInt16(r.MaxVersion)
+	return nil
+}
+
+func (r *ApiVersionsResponseBlock) decode(pd packetDecoder) error {
+	var err error
+
+	if r.ApiKey, err = pd.getInt16(); err != nil {
+		return err
+	}
+
+	if r.MinVersion, err = pd.getInt16(); err != nil {
+		return err
+	}
+
+	if r.MaxVersion, err = pd.getInt16(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+type ApiVersionsResponse struct {
+	Err         KError
+	ApiVersions []*ApiVersionsResponseBlock
+}
+
+func (r *ApiVersionsResponse) encode(pe packetEncoder) error {
+	pe.putInt16(int16(r.Err))
+	if err := pe.putArrayLength(len(r.ApiVersions)); err != nil {
+		return err
+	}
+	for _, apiVersion := range r.ApiVersions {
+		if err := apiVersion.encode(pe); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (r *ApiVersionsResponse) decode(pd packetDecoder) error {
+	if kerr, err := pd.getInt16(); err != nil {
+		return err
+	} else {
+		r.Err = KError(kerr)
+	}
+
+	numBlocks, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	r.ApiVersions = make([]*ApiVersionsResponseBlock, numBlocks)
+	for i := 0; i < numBlocks; i++ {
+		block := new(ApiVersionsResponseBlock)
+		if err := block.decode(pd); err != nil {
+			return err
+		}
+		r.ApiVersions[i] = block
+	}
+
+	return nil
+}

+ 32 - 0
api_versions_response_test.go

@@ -0,0 +1,32 @@
+package sarama
+
+import "testing"
+
+var (
+	apiVersionResponse = []byte{
+		0x00, 0x00,
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x03,
+		0x00, 0x02,
+		0x00, 0x01,
+	}
+)
+
+func TestApiVersionsResponse(t *testing.T) {
+	var response *ApiVersionsResponse
+
+	response = new(ApiVersionsResponse)
+	testDecodable(t, "no error", response, apiVersionResponse)
+	if response.Err != ErrNoError {
+		t.Error("Decoding error failed: no error expected but found", response.Err)
+	}
+	if response.ApiVersions[0].ApiKey != 0x03 {
+		t.Error("Decoding error: expected 0x03 but got", response.ApiVersions[0].ApiKey)
+	}
+	if response.ApiVersions[0].MinVersion != 0x02 {
+		t.Error("Decoding error: expected 0x02 but got", response.ApiVersions[0].MinVersion)
+	}
+	if response.ApiVersions[0].MaxVersion != 0x01 {
+		t.Error("Decoding error: expected 0x01 but got", response.ApiVersions[0].MaxVersion)
+	}
+}

+ 15 - 0
errors.go

@@ -85,6 +85,7 @@ const (
 	ErrMessageSizeTooLarge             KError = 10
 	ErrStaleControllerEpochCode        KError = 11
 	ErrOffsetMetadataTooLarge          KError = 12
+	ErrNetworkException                KError = 13
 	ErrOffsetsLoadInProgress           KError = 14
 	ErrConsumerCoordinatorNotAvailable KError = 15
 	ErrNotCoordinatorForConsumer       KError = 16
@@ -103,6 +104,10 @@ const (
 	ErrTopicAuthorizationFailed        KError = 29
 	ErrGroupAuthorizationFailed        KError = 30
 	ErrClusterAuthorizationFailed      KError = 31
+	ErrInvalidTimestamp                KError = 32
+	ErrUnsupportedSASLMechanism        KError = 33
+	ErrIllegalSASLState                KError = 34
+	ErrUnsupportedVersion              KError = 35
 )
 
 func (err KError) Error() string {
@@ -137,6 +142,8 @@ func (err KError) Error() string {
 		return "kafka server: StaleControllerEpochCode (internal error code for broker-to-broker communication)."
 	case ErrOffsetMetadataTooLarge:
 		return "kafka server: Specified a string larger than the configured maximum for offset metadata."
+	case ErrNetworkException:
+		return "kafka server: The server disconnected before a response was received."
 	case ErrOffsetsLoadInProgress:
 		return "kafka server: The broker is still loading offsets after a leader change for that offset's topic partition."
 	case ErrConsumerCoordinatorNotAvailable:
@@ -173,6 +180,14 @@ func (err KError) Error() string {
 		return "kafka server: The client is not authorized to access this group."
 	case ErrClusterAuthorizationFailed:
 		return "kafka server: The client is not authorized to send this request type."
+	case ErrInvalidTimestamp:
+		return "kafka server: The timestamp of the message is out of acceptable range."
+	case ErrUnsupportedSASLMechanism:
+		return "kafka server: The broker does not support the requested SASL mechanism."
+	case ErrIllegalSASLState:
+		return "kafka server: Request is not valid given the current SASL state."
+	case ErrUnsupportedVersion:
+		return "kafka server: The version of API is not supported."
 	}
 
 	return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err)

+ 4 - 0
request.go

@@ -107,6 +107,10 @@ func allocateBody(key, version int16) requestBody {
 		return &DescribeGroupsRequest{}
 	case 16:
 		return &ListGroupsRequest{}
+	case 17:
+		return &SaslHandshakeRequest{}
+	case 18:
+		return &ApiVersionsRequest{}
 	}
 	return nil
 }

+ 29 - 0
sasl_handshake_request.go

@@ -0,0 +1,29 @@
+package sarama
+
+type SaslHandshakeRequest struct {
+	Mechanism string
+}
+
+func (r *SaslHandshakeRequest) encode(pe packetEncoder) error {
+	if err := pe.putString(r.Mechanism); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (r *SaslHandshakeRequest) decode(pd packetDecoder) (err error) {
+	if r.Mechanism, err = pd.getString(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (r *SaslHandshakeRequest) key() int16 {
+	return 17
+}
+
+func (r *SaslHandshakeRequest) version() int16 {
+	return 0
+}

+ 17 - 0
sasl_handshake_request_test.go

@@ -0,0 +1,17 @@
+package sarama
+
+import "testing"
+
+var (
+	baseSaslRequest = []byte{
+		0, 3, 'f', 'o', 'o', // Mechanism
+	}
+)
+
+func TestSaslHandshakeRequest(t *testing.T) {
+	var request *SaslHandshakeRequest
+
+	request = new(SaslHandshakeRequest)
+	request.Mechanism = "foo"
+	testRequest(t, "basic", request, baseSaslRequest)
+}

+ 26 - 0
sasl_handshake_response.go

@@ -0,0 +1,26 @@
+package sarama
+
+type SaslHandshakeResponse struct {
+	Err               KError
+	EnabledMechanisms []string
+}
+
+func (r *SaslHandshakeResponse) encode(pe packetEncoder) error {
+	pe.putInt16(int16(r.Err))
+	return pe.putStringArray(r.EnabledMechanisms)
+}
+
+func (r *SaslHandshakeResponse) decode(pd packetDecoder) error {
+	if kerr, err := pd.getInt16(); err != nil {
+		return err
+	} else {
+		r.Err = KError(kerr)
+	}
+
+	var err error
+	if r.EnabledMechanisms, err = pd.getStringArray(); err != nil {
+		return err
+	}
+
+	return nil
+}

+ 24 - 0
sasl_handshake_response_test.go

@@ -0,0 +1,24 @@
+package sarama
+
+import "testing"
+
+var (
+	saslHandshakeResponse = []byte{
+		0x00, 0x00,
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x03, 'f', 'o', 'o',
+	}
+)
+
+func TestSaslHandshakeResponse(t *testing.T) {
+	var response *SaslHandshakeResponse
+
+	response = new(SaslHandshakeResponse)
+	testDecodable(t, "no error", response, saslHandshakeResponse)
+	if response.Err != ErrNoError {
+		t.Error("Decoding error failed: no error expected but found", response.Err)
+	}
+	if response.EnabledMechanisms[0] != "foo" {
+		t.Error("Decoding error failed: expected 'foo' but found", response.EnabledMechanisms)
+	}
+}