Browse Source

Settle (for now) on how to encode/decode kafka values

Evan Huus 12 years ago
parent
commit
87ba3bf5c1
2 changed files with 126 additions and 33 deletions
  1. 13 33
      client.go
  2. 113 0
      protocol.go

+ 13 - 33
client.go

@@ -2,7 +2,6 @@ package kafka
 
 import (
 	"encoding/binary"
-	"math"
 	"net"
 )
 
@@ -26,7 +25,8 @@ var (
 )
 
 type Client struct {
-	addr, id       string
+	addr           string
+	id             *string
 	correlation_id int32
 	conn           net.Conn
 }
@@ -36,7 +36,7 @@ func NewClient(addr string) (client *Client, err error) {
 	if err != nil {
 		return nil, err
 	}
-	client = &Client{addr, "", 0, conn}
+	client = &Client{addr, nil, 0, conn}
 	return client, err
 }
 
@@ -75,40 +75,20 @@ func (client *Client) read() (buf []byte, err error) {
 	return buf, nil
 }
 
-func encodeString(in string) (buf []byte) {
-	size := len(in)
-	if size > math.MaxInt16 {
-		panic("string too long to encode") /* Or just return nil? */
-	}
-	buf = make([]byte, 2+size)
-	binary.BigEndian.PutUint16(buf, uint16(size))
-	if size > 0 {
-		copy(buf[2:], in)
-	}
-	return buf
-}
-
-func encodeBytes(in []byte) (buf []byte) {
-	size := len(in)
-	if size > math.MaxInt32 {
-		panic("bytes too long to encode") /* Or just return nil? */
-	}
-	buf = make([]byte, 4+size)
-	binary.BigEndian.PutUint32(buf, uint32(size))
-	if size > 0 {
-		copy(buf[4:], in)
-	}
-	return buf
-}
-
 func (client *Client) sendRequest(api *API, body []byte) (err error) {
-	id := encodeString(client.id)
-	buf := make([]byte, 4+len(id)+len(body))
+	idLen, err := stringLen(client.id)
+	if err != nil {
+		return err
+	}
+	buf := make([]byte, 4+idLen+len(body))
 	binary.BigEndian.PutUint16(buf[0:2], uint16(api.key))
 	binary.BigEndian.PutUint16(buf[2:4], uint16(api.version))
 	binary.BigEndian.PutUint32(buf[4:8], uint32(client.correlation_id))
 	client.correlation_id++
-	copy(buf[8:], id)
-	copy(buf[8+len(id):], body)
+	err = encodeString(client.id, buf[8:])
+	if err != nil {
+		return err
+	}
+	copy(buf[8+idLen:], body)
 	return client.write(buf)
 }

+ 113 - 0
protocol.go

@@ -0,0 +1,113 @@
+package kafka
+
+import (
+	"encoding/binary"
+	"errors"
+	"math"
+)
+
+func stringLen(in *string) (n int, err error) {
+	if in == nil {
+		return 2, nil
+	}
+	n = len(*in)
+	if n > math.MaxInt16 {
+		return -1, errors.New("kafka: string too long to encode")
+	}
+	return 2 + n, nil
+}
+
+func encodeString(in *string, buf []byte) (err error) {
+	if len(buf) < 2 {
+		return errors.New("kafka: buffer too short to encode any string")
+	}
+	n := -1
+	if in != nil {
+		n = len(*in)
+	}
+	if n > math.MaxInt16 {
+		return errors.New("kafka: string too long to encode")
+	}
+	if n > len(buf) {
+		return errors.New("kafka: buffer too short to encode string")
+	}
+	binary.BigEndian.PutUint16(buf, uint16(n))
+	if n > 0 {
+		copy(buf[2:], *in)
+	}
+	return nil
+}
+
+func decodeString(buf []byte) (out *string, err error) {
+	if len(buf) < 2 {
+		return nil, errors.New("kafka: buffer too short to contain string")
+	}
+	n := int16(binary.BigEndian.Uint16(buf))
+	switch {
+	case n < -1:
+		return nil, errors.New("kafka: invalid negative string length")
+	case n == -1:
+		return nil, nil
+	case n == 0:
+		emptyString := ""
+		return &emptyString, nil
+	case int(n) > len(buf)-2:
+		return nil, errors.New("kafka: buffer too short to decode string")
+	default:
+		result := string(buf[2:])
+		return &result, nil
+	}
+}
+
+func bytesLen(in *[]byte) (n int, err error) {
+	if in == nil {
+		return 4, nil
+	}
+	n = len(*in)
+	if n > math.MaxInt32 {
+		return -1, errors.New("kafka: bytes too long to encode")
+	}
+	return 4 + n, nil
+}
+
+func encodeBytes(in *[]byte, buf []byte) (err error) {
+	if len(buf) < 4 {
+		return errors.New("kafka: buffer too short to encode any bytes")
+	}
+	n := -1
+	if in != nil {
+		n = len(*in)
+	}
+	if n > math.MaxInt32 {
+		return errors.New("kafka: bytes too long to encode")
+	}
+	if n > len(buf) {
+		return errors.New("kafka: buffer too short to encode bytes")
+	}
+	binary.BigEndian.PutUint32(buf, uint32(n))
+	if n > 0 {
+		copy(buf[4:], *in)
+	}
+	return nil
+}
+
+func decodebyte(buf []byte) (out *[]byte, err error) {
+	if len(buf) < 4 {
+		return nil, errors.New("kafka: buffer too short to contain bytes")
+	}
+	n := int32(binary.BigEndian.Uint32(buf))
+	switch {
+	case n < -1:
+		return nil, errors.New("kafka: invalid negative byte length")
+	case n == -1:
+		return nil, nil
+	case n == 0:
+		emptyBytes := make([]byte, 0)
+		return &emptyBytes, nil
+	case int(n) > len(buf)-4:
+		return nil, errors.New("kafka: buffer too short to decode bytes")
+	default:
+		result := buf[4:]
+		return &result, nil
+	}
+}