Browse Source

Finish updating protocol package for encoding API

Evan Huus 12 years ago
parent
commit
6dd4dc2c85

+ 0 - 58
protocol/errors.go

@@ -2,64 +2,6 @@ package protocol
 
 import "errors"
 
-// KError is the type of error that can be returned directly by the Kafka broker.
-// See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
-type KError int16
-
-const (
-	NO_ERROR                    KError = 0
-	UNKNOWN                     KError = -1
-	OFFSET_OUT_OF_RANGE         KError = 1
-	INVALID_MESSAGE             KError = 2
-	UNKNOWN_TOPIC_OR_PARTITION  KError = 3
-	INVALID_MESSAGE_SIZE        KError = 4
-	LEADER_NOT_AVAILABLE        KError = 5
-	NOT_LEADER_FOR_PARTITION    KError = 6
-	REQUEST_TIMED_OUT           KError = 7
-	BROKER_NOT_AVAILABLE        KError = 8
-	REPLICA_NOT_AVAILABLE       KError = 9
-	MESSAGE_SIZE_TOO_LARGE      KError = 10
-	STALE_CONTROLLER_EPOCH_CODE KError = 11
-	OFFSET_METADATA_TOO_LARGE   KError = 12
-)
-
-func (err KError) Error() string {
-	// Error messages stolen/adapted from
-	// https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
-	switch err {
-	case NO_ERROR:
-		return "kafka server: Not an error, why are you printing me?"
-	case UNKNOWN:
-		return "kafka server: Unexpected (unknown?) server error."
-	case OFFSET_OUT_OF_RANGE:
-		return "kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition."
-	case INVALID_MESSAGE:
-		return "kafka server: Message contents does not match its CRC."
-	case UNKNOWN_TOPIC_OR_PARTITION:
-		return "kafka server: Request was for a topic or partition that does not exist on this broker."
-	case INVALID_MESSAGE_SIZE:
-		return "kafka server: The message has a negative size."
-	case LEADER_NOT_AVAILABLE:
-		return "kafka server: In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writes."
-	case NOT_LEADER_FOR_PARTITION:
-		return "kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date."
-	case REQUEST_TIMED_OUT:
-		return "kafka server: Request exceeded the user-specified time limit in the request."
-	case BROKER_NOT_AVAILABLE:
-		return "kafka server: Broker not available. Not a client facing error, we should never receive this!!!"
-	case REPLICA_NOT_AVAILABLE:
-		return "kafka server: Replica not available. What is the difference between this and LeaderNotAvailable?"
-	case MESSAGE_SIZE_TOO_LARGE:
-		return "kafka server: Message was too large, server rejected it to avoid allocation error."
-	case STALE_CONTROLLER_EPOCH_CODE:
-		return "kafka server: Stale controller epoch code. ???"
-	case OFFSET_METADATA_TOO_LARGE:
-		return "kafka server: Specified a string larger than the configured maximum for offset metadata."
-	default:
-		return "Unknown error, how did this happen?"
-	}
-}
-
 // AlreadyConnected is the error returned when calling Connect() on a Broker that is already connected.
 var AlreadyConnected = errors.New("kafka: broker: already connected")
 

+ 5 - 3
protocol/fetch_response.go

@@ -1,18 +1,20 @@
 package protocol
 
 import enc "sarama/encoding"
+import "sarama/types"
 
 type FetchResponseBlock struct {
-	Err                 KError
+	Err                 types.KError
 	HighWaterMarkOffset int64
 	MsgSet              MessageSet
 }
 
 func (pr *FetchResponseBlock) Decode(pd enc.PacketDecoder) (err error) {
-	pr.Err, err = pd.GetError()
+	tmp, err := pd.GetInt16()
 	if err != nil {
 		return err
 	}
+	pr.Err = types.KError(tmp)
 
 	pr.HighWaterMarkOffset, err = pd.GetInt64()
 	if err != nil {
@@ -64,7 +66,7 @@ func (fr *FetchResponse) Decode(pd enc.PacketDecoder) (err error) {
 			}
 
 			block := new(FetchResponseBlock)
-			err = block.decode(pd)
+			err = block.Decode(pd)
 			if err != nil {
 				return err
 			}

+ 3 - 3
protocol/message.go

@@ -24,7 +24,7 @@ func (m *Message) Encode(pe enc.PacketEncoder) error {
 	pe.PutInt8(message_format)
 
 	var attributes int8 = 0
-	attributes |= m.Codec & 0x07
+	attributes |= int8(m.Codec) & 0x07
 	pe.PutInt8(attributes)
 
 	err := pe.PutBytes(m.Key)
@@ -56,7 +56,7 @@ func (m *Message) Encode(pe enc.PacketEncoder) error {
 }
 
 func (m *Message) Decode(pd enc.PacketDecoder) (err error) {
-	err = pd.Push(&CRC32Field{})
+	err = pd.Push(&enc.CRC32Field{})
 	if err != nil {
 		return err
 	}
@@ -73,7 +73,7 @@ func (m *Message) Decode(pd enc.PacketDecoder) (err error) {
 	if err != nil {
 		return err
 	}
-	m.Codec = attribute & 0x07
+	m.Codec = types.CompressionCodec(attribute & 0x07)
 
 	m.Key, err = pd.GetBytes()
 	if err != nil {

+ 6 - 5
protocol/message_set.go

@@ -14,7 +14,7 @@ func (msb *MessageBlock) Encode(pe enc.PacketEncoder) error {
 	if err != nil {
 		return err
 	}
-	pe.Pop()
+	return pe.Pop()
 }
 
 func (msb *MessageBlock) Decode(pd enc.PacketDecoder) (err error) {
@@ -23,7 +23,7 @@ func (msb *MessageBlock) Decode(pd enc.PacketDecoder) (err error) {
 		return err
 	}
 
-	err = pd.Push(&enc.LengthField{})
+	pd.Push(&enc.LengthField{})
 	if err != nil {
 		return err
 	}
@@ -54,6 +54,7 @@ func (ms *MessageSet) Encode(pe enc.PacketEncoder) error {
 			return err
 		}
 	}
+	return nil
 }
 
 func (ms *MessageSet) Decode(pd enc.PacketDecoder) (err error) {
@@ -62,10 +63,10 @@ func (ms *MessageSet) Decode(pd enc.PacketDecoder) (err error) {
 	for pd.Remaining() > 0 {
 		msb := new(MessageBlock)
 		err = msb.Decode(pd)
-		switch err.(type) {
-		case nil:
+		switch {
+		case err == nil:
 			ms.Messages = append(ms.Messages, msb)
-		case enc.InsufficientData:
+		case err == enc.InsufficientData:
 			// As an optimization the server is allowed to return a partial message at the
 			// end of the message set. Clients should handle this case. So we just ignore such things.
 			ms.PartialTrailingMessage = true

+ 1 - 0
protocol/metadata_request.go

@@ -18,6 +18,7 @@ func (mr *MetadataRequest) Encode(pe enc.PacketEncoder) error {
 			return err
 		}
 	}
+	return nil
 }
 
 func (mr *MetadataRequest) key() int16 {

+ 8 - 5
protocol/metadata_response.go

@@ -1,20 +1,22 @@
 package protocol
 
 import enc "sarama/encoding"
+import "sarama/types"
 
 type PartitionMetadata struct {
-	Err      KError
+	Err      types.KError
 	Id       int32
 	Leader   int32
 	Replicas []int32
 	Isr      []int32
 }
 
-func (pm *PartitionMetadata) decode(pd enc.PacketDecoder) (err error) {
-	pm.Err, err = pd.GetError()
+func (pm *PartitionMetadata) Decode(pd enc.PacketDecoder) (err error) {
+	tmp, err := pd.GetInt16()
 	if err != nil {
 		return err
 	}
+	pm.Err = types.KError(tmp)
 
 	pm.Id, err = pd.GetInt32()
 	if err != nil {
@@ -40,16 +42,17 @@ func (pm *PartitionMetadata) decode(pd enc.PacketDecoder) (err error) {
 }
 
 type TopicMetadata struct {
-	Err        KError
+	Err        types.KError
 	Name       string
 	Partitions []*PartitionMetadata
 }
 
 func (tm *TopicMetadata) Decode(pd enc.PacketDecoder) (err error) {
-	tm.Err, err = pd.GetError()
+	tmp, err := pd.GetInt16()
 	if err != nil {
 		return err
 	}
+	tm.Err = types.KError(tmp)
 
 	tm.Name, err = pd.GetString()
 	if err != nil {

+ 1 - 0
protocol/offset_commit_request.go

@@ -43,6 +43,7 @@ func (r *OffsetCommitRequest) Encode(pe enc.PacketEncoder) error {
 			}
 		}
 	}
+	return nil
 }
 
 func (r *OffsetCommitRequest) key() int16 {

+ 6 - 5
protocol/offset_commit_response.go

@@ -1,10 +1,11 @@
 package protocol
 
 import enc "sarama/encoding"
+import "sarama/types"
 
 type OffsetCommitResponse struct {
 	ClientID string
-	Errors   map[string]map[int32]KError
+	Errors   map[string]map[int32]types.KError
 }
 
 func (r *OffsetCommitResponse) Decode(pd enc.PacketDecoder) (err error) {
@@ -18,7 +19,7 @@ func (r *OffsetCommitResponse) Decode(pd enc.PacketDecoder) (err error) {
 		return err
 	}
 
-	r.Errors = make(map[string]map[int32]KError, numTopics)
+	r.Errors = make(map[string]map[int32]types.KError, numTopics)
 	for i := 0; i < numTopics; i++ {
 		name, err := pd.GetString()
 		if err != nil {
@@ -30,7 +31,7 @@ func (r *OffsetCommitResponse) Decode(pd enc.PacketDecoder) (err error) {
 			return err
 		}
 
-		r.Errors[name] = make(map[int32]KError, numErrors)
+		r.Errors[name] = make(map[int32]types.KError, numErrors)
 
 		for j := 0; j < numErrors; j++ {
 			id, err := pd.GetInt32()
@@ -38,11 +39,11 @@ func (r *OffsetCommitResponse) Decode(pd enc.PacketDecoder) (err error) {
 				return err
 			}
 
-			tmp, err := pd.GetError()
+			tmp, err := pd.GetInt16()
 			if err != nil {
 				return err
 			}
-			r.Errors[name][id] = tmp
+			r.Errors[name][id] = types.KError(tmp)
 		}
 	}
 

+ 1 - 0
protocol/offset_fetch_request.go

@@ -23,6 +23,7 @@ func (r *OffsetFetchRequest) Encode(pe enc.PacketEncoder) error {
 		}
 		pe.PutInt32Array(partitions)
 	}
+	return nil
 }
 
 func (r *OffsetFetchRequest) key() int16 {

+ 8 - 3
protocol/offset_fetch_response.go

@@ -1,11 +1,12 @@
 package protocol
 
 import enc "sarama/encoding"
+import "sarama/types"
 
 type OffsetFetchResponseBlock struct {
 	Offset   int64
 	Metadata string
-	Err      KError
+	Err      types.KError
 }
 
 func (r *OffsetFetchResponseBlock) Decode(pd enc.PacketDecoder) (err error) {
@@ -19,9 +20,13 @@ func (r *OffsetFetchResponseBlock) Decode(pd enc.PacketDecoder) (err error) {
 		return err
 	}
 
-	r.Err, err = pd.GetError()
+	tmp, err := pd.GetInt16()
+	if err != nil {
+		return err
+	}
+	r.Err = types.KError(tmp)
 
-	return err
+	return nil
 }
 
 type OffsetFetchResponse struct {

+ 3 - 2
protocol/offset_request.go

@@ -8,8 +8,8 @@ type offsetRequestBlock struct {
 }
 
 func (r *offsetRequestBlock) Encode(pe enc.PacketEncoder) error {
-	pe.putInt64(r.time)
-	pe.putInt32(r.maxOffsets)
+	pe.PutInt64(r.time)
+	pe.PutInt32(r.maxOffsets)
 	return nil
 }
 
@@ -40,6 +40,7 @@ func (r *OffsetRequest) Encode(pe enc.PacketEncoder) error {
 			}
 		}
 	}
+	return nil
 }
 
 func (r *OffsetRequest) key() int16 {

+ 4 - 2
protocol/offset_response.go

@@ -1,17 +1,19 @@
 package protocol
 
 import enc "sarama/encoding"
+import "sarama/types"
 
 type OffsetResponseBlock struct {
-	Err     KError
+	Err     types.KError
 	Offsets []int64
 }
 
 func (r *OffsetResponseBlock) Decode(pd enc.PacketDecoder) (err error) {
-	r.Err, err = pd.GetError()
+	tmp, err := pd.GetInt16()
 	if err != nil {
 		return err
 	}
+	r.Err = types.KError(tmp)
 
 	r.Offsets, err = pd.GetInt64Array()
 

+ 3 - 2
protocol/produce_request.go

@@ -10,7 +10,7 @@ type ProduceRequest struct {
 }
 
 func (p *ProduceRequest) Encode(pe enc.PacketEncoder) error {
-	pe.PutInt16(p.RequiredAcks)
+	pe.PutInt16(int16(p.RequiredAcks))
 	pe.PutInt32(p.Timeout)
 	err := pe.PutArrayLength(len(p.msgSets))
 	if err != nil {
@@ -27,7 +27,7 @@ func (p *ProduceRequest) Encode(pe enc.PacketEncoder) error {
 		}
 		for id, msgSet := range partitions {
 			pe.PutInt32(id)
-			pe.PushLength32()
+			pe.Push(&enc.LengthField{})
 			err = msgSet.Encode(pe)
 			if err != nil {
 				return err
@@ -38,6 +38,7 @@ func (p *ProduceRequest) Encode(pe enc.PacketEncoder) error {
 			}
 		}
 	}
+	return nil
 }
 
 func (p *ProduceRequest) key() int16 {

+ 4 - 2
protocol/produce_response.go

@@ -1,17 +1,19 @@
 package protocol
 
 import enc "sarama/encoding"
+import "sarama/types"
 
 type ProduceResponseBlock struct {
-	Err    KError
+	Err    types.KError
 	Offset int64
 }
 
 func (pr *ProduceResponseBlock) Decode(pd enc.PacketDecoder) (err error) {
-	pr.Err, err = pd.GetError()
+	tmp, err := pd.GetInt16()
 	if err != nil {
 		return err
 	}
+	pr.Err = types.KError(tmp)
 
 	pr.Offset, err = pd.GetInt64()
 	if err != nil {

+ 2 - 2
protocol/request.go

@@ -14,8 +14,8 @@ type request struct {
 	body           requestEncoder
 }
 
-func (r *request) Encode(pe enc.PacketEncoder) error {
-	pe.Push(&LengthField{})
+func (r *request) Encode(pe enc.PacketEncoder) (err error) {
+	pe.Push(&enc.LengthField{})
 	pe.PutInt16(r.body.key())
 	pe.PutInt16(r.body.version())
 	pe.PutInt32(r.correlation_id)

+ 1 - 1
protocol/response_header.go

@@ -14,7 +14,7 @@ func (r *responseHeader) Decode(pd enc.PacketDecoder) (err error) {
 		return err
 	}
 	if r.length <= 4 || r.length > 2*math.MaxUint16 {
-		return enc.DecodingError("Malformed length field.")
+		return enc.DecodingError
 	}
 
 	r.correlation_id, err = pd.GetInt32()