Browse Source

wip checkpoint

proper packetEncoder/packetDecoder interfaces now
Evan Huus 11 years ago
parent
commit
5ab53c6457
11 changed files with 170 additions and 171 deletions
  1. broker.go (+1 −1)
  2. client.go (+1 −19)
  3. metadata.go (+1 −1)
  4. metadataRequest.go (+1 −1)
  5. packetDecoder.go (+7 −73)
  6. packetEncoder.go (+0 −1)
  7. partitionMetadata.go (+40 −45)
  8. prepEncoder.go (+0 −4)
  9. realDecoder.go (+101 −0)
  10. realEncoder.go (+5 −11)
  11. topicMetadata.go (+13 −15)

+ 1 - 1
broker.go

@@ -12,7 +12,7 @@ func (b *broker) encode(pe packetEncoder) {
 	pe.putInt32(b.port)
 }
 
-func (b *broker) decode(pd *packetDecoder) (err error) {
+func (b *broker) decode(pd packetDecoder) (err error) {
 	b.nodeId, err = pd.getInt32()
 	if err != nil {
 		return err

+ 1 - 19
client.go

@@ -113,7 +113,7 @@ func (client *Client) encode(api API, body []byte, pe packetEncoder) {
 	pe.putInt16(api.version)
 	pe.putInt32(client.correlation_id)
 	pe.putString(client.id)
-	pe.putRaw(body)
+	//pe.putRaw(body)
 }
 
 func (client *Client) sendRequest(api API, body []byte) (chan []byte, error) {
@@ -137,21 +137,3 @@ func (client *Client) sendRequest(api API, body []byte) (chan []byte, error) {
 	client.correlation_id++
 	return request.packets, nil
 }
-
-func (client *Client) sendMetadataRequest(topics []string) (chan []byte, error) {
-	bufLen := 4
-	for i := range topics {
-		tmp, err := stringLength(&topics[i])
-		if err != nil {
-			return nil, err
-		}
-		bufLen += tmp
-	}
-	buf := make([]byte, bufLen)
-	off := 0
-	off = encodeInt32(buf, off, int32(len(topics)))
-	for i := range topics {
-		off = encodeString(buf, off, &topics[i])
-	}
-	return client.sendRequest(REQUEST_METADATA, buf)
-}

+ 1 - 1
metadata.go

@@ -16,7 +16,7 @@ func (m *metadata) encode(pe packetEncoder) {
 	}
 }
 
-func (m *metadata) decode(pd *packetDecoder) (err error) {
+func (m *metadata) decode(pd packetDecoder) (err error) {
 	n, err := pd.getArrayCount()
 	if err != nil {
 		return err

+ 1 - 1
metadataRequest.go

@@ -11,7 +11,7 @@ func (mr *metadataRequest) encode(pe packetEncoder) {
 	}
 }
 
-func (mr *metadataRequest) decode(pd *packetDecoder) (err error) {
+func (mr *metadataRequest) decode(pd packetDecoder) (err error) {
 	n, err := pd.getArrayCount()
 	if err != nil {
 		return err

+ 7 - 73
packetDecoder.go

@@ -1,76 +1,10 @@
 package kafka
 
-import (
-	"encoding/binary"
-	"errors"
-	"math"
-)
-
-type packetDecoder struct {
-	raw []byte
-	off int
-}
-
-func (pd *packetDecoder) avail() int {
-	return len(pd.raw) - pd.off
-}
-
-func (pd *packetDecoder) getInt16() (int16, error) {
-	if pd.avail() < 2 {
-		return -1, errors.New("kafka getInt16: not enough data")
-	}
-	tmp := int16(binary.BigEndian.Uint16(pd.raw[pd.off:]))
-	pd.off += 2
-	return tmp, nil
-}
-
-func (pd *packetDecoder) getInt32() (int32, error) {
-	if pd.avail() < 4 {
-		return -1, errors.New("kafka getInt32: not enough data")
-	}
-	tmp := int32(binary.BigEndian.Uint32(pd.raw[pd.off:]))
-	pd.off += 4
-	return tmp, nil
-}
-
-func (pd *packetDecoder) getArrayCount() (int, error) {
-	if pd.avail() < 4 {
-		return -1, errors.New("kafka getArrayCount: not enough data")
-	}
-	tmp := int(binary.BigEndian.Uint32(pd.raw[pd.off:]))
-	pd.off += 4
-	if tmp > pd.avail() || tmp > 2*math.MaxUint16 {
-		return -1, errors.New("kafka getArrayCount: unreasonably long array")
-	}
-	return tmp, nil
-}
-
-func (pd *packetDecoder) getError() (kafkaError, error) {
-	val, err := pd.getInt16()
-	return kafkaError(val), err
-}
-
-func (pd *packetDecoder) getString() (*string, error) {
-	tmp, err := pd.getInt16()
-
-	if err != nil {
-		return nil, err
-	}
-
-	n := int(tmp)
-
-	switch {
-	case n < -1:
-		return nil, errors.New("kafka getString: invalid negative length")
-	case n == -1:
-		return nil, nil
-	case n == 0:
-		return new(string), nil
-	case n > pd.avail():
-		return nil, errors.New("kafka getString: not enough data")
-	default:
-		tmp := new(string)
-		*tmp = string(pd.raw[pd.off : pd.off+n])
-		return tmp, nil
-	}
+type packetDecoder interface {
+	getInt16() (int16, error)
+	getInt32() (int32, error)
+	getError() (kafkaError, error)
+	getString() (*string, error)
+	getBytes() (*[]byte, error)
+	getArrayCount() (int, error)
 }

+ 0 - 1
packetEncoder.go

@@ -6,6 +6,5 @@ type packetEncoder interface {
 	putError(in kafkaError)
 	putString(in *string)
 	putBytes(in *[]byte)
-	putRaw(in []byte)
 	putArrayCount(in int)
 }

+ 40 - 45
partitionMetadata.go

@@ -1,7 +1,5 @@
 package kafka
 
-import "errors"
-
 type partitionMetadata struct {
 	err      kafkaError
 	id       int32
@@ -10,64 +8,61 @@ type partitionMetadata struct {
 	isr      []int32
 }
 
-func (pm *partitionMetadata) length() (int, error) {
-	length := 6
-
-	length += 4
-	length += len(pm.replicas) * 4
-
-	length += 4
-	length += len(pm.isr) * 4
-
-	return length, nil
-}
-
-func (pm *partitionMetadata) encode(buf []byte, off int) int {
-	off = encodeError(buf, off, pm.err)
-	off = encodeInt32(buf, off, pm.id)
-	off = encodeInt32(buf, off, pm.leader)
+func (pm *partitionMetadata) encode(pe packetEncoder) {
+	pe.putError(pm.err)
+	pe.putInt32(pm.id)
+	pe.putInt32(pm.leader)
 
-	off = encodeInt32(buf, off, int32(len(pm.replicas)))
+	pe.putArrayCount(len(pm.replicas))
 	for _, val := range pm.replicas {
-		off = encodeInt32(buf, off, val)
+		pe.putInt32(val)
 	}
 
-	off = encodeInt32(buf, off, int32(len(pm.isr)))
+	pe.putArrayCount(len(pm.isr))
 	for _, val := range pm.isr {
-		off = encodeInt32(buf, off, val)
+		pe.putInt32(val)
 	}
-
-	return off
 }
 
-func (pm *partitionMetadata) decode(buf []byte, off int) (int, error) {
-	if len(buf)-off < 14 {
-		return -1, errors.New("kafka decode: not enough data")
+func (pm *partitionMetadata) decode(pd packetDecoder) (err error) {
+	pm.err, err = pd.getError()
+	if err != nil {
+		return err
 	}
 
-	pm.err, off = decodeError(buf, off)
-	pm.id, off = decodeInt32(buf, off)
-	pm.leader, off = decodeInt32(buf, off)
+	pm.id, err = pd.getInt32()
+	if err != nil {
+		return err
+	}
+
+	pm.leader, err = pd.getInt32()
+	if err != nil {
+		return err
+	}
 
-	tmp, off := decodeInt32(buf, off)
-	length := int(tmp)
-	if length > (len(buf)-off)/4 {
-		return -1, errors.New("kafka decode: not enough data")
+	n, err := pd.getArrayCount()
+	if err != nil {
+		return err
 	}
-	pm.replicas = make([]int32, length)
-	for i := 0; i < length; i++ {
-		pm.replicas[i], off = decodeInt32(buf, off)
+	pm.replicas = make([]int32, n)
+	for i := 0; i < n; i++ {
+		pm.replicas[i], err = pd.getInt32()
+		if err != nil {
+			return err
+		}
 	}
 
-	tmp, off = decodeInt32(buf, off)
-	length = int(tmp)
-	if length > (len(buf)-off)/4 {
-		return -1, errors.New("kafka decode: not enough data")
+	n, err = pd.getArrayCount()
+	if err != nil {
+		return err
 	}
-	pm.isr = make([]int32, length)
-	for i := 0; i < length; i++ {
-		pm.isr[i], off = decodeInt32(buf, off)
+	pm.isr = make([]int32, n)
+	for i := 0; i < n; i++ {
+		pm.isr[i], err = pd.getInt32()
+		if err != nil {
+			return err
+		}
 	}
 
-	return off, nil
+	return nil
 }

+ 0 - 4
prepEncoder.go

@@ -43,10 +43,6 @@ func (pe *prepEncoder) putBytes(in *[]byte) {
 	}
 }
 
-func (pe *prepEncoder) putRaw(in []byte) {
-	pe.length += len(in)
-}
-
 func (pe *prepEncoder) putArrayCount(in int) {
 	pe.length += 4
 }

+ 101 - 0
realDecoder.go

@@ -0,0 +1,101 @@
+package kafka
+
+import (
+	"encoding/binary"
+	"errors"
+	"math"
+)
+
+type realDecoder struct {
+	raw []byte
+	off int
+}
+
+func (rd *realDecoder) avail() int {
+	return len(rd.raw) - rd.off
+}
+
+func (rd *realDecoder) getInt16() (int16, error) {
+	if rd.avail() < 2 {
+		return -1, errors.New("kafka getInt16: not enough data")
+	}
+	tmp := int16(binary.BigEndian.Uint16(rd.raw[rd.off:]))
+	rd.off += 2
+	return tmp, nil
+}
+
+func (rd *realDecoder) getInt32() (int32, error) {
+	if rd.avail() < 4 {
+		return -1, errors.New("kafka getInt32: not enough data")
+	}
+	tmp := int32(binary.BigEndian.Uint32(rd.raw[rd.off:]))
+	rd.off += 4
+	return tmp, nil
+}
+
+func (rd *realDecoder) getError() (kafkaError, error) {
+	val, err := rd.getInt16()
+	return kafkaError(val), err
+}
+
+// getString reads a 2-byte big-endian length followed by that many bytes
+// of string data. A length of -1 yields nil (Kafka's null string), 0
+// yields a pointer to the empty string.
+func (rd *realDecoder) getString() (*string, error) {
+	tmp, err := rd.getInt16()
+
+	if err != nil {
+		return nil, err
+	}
+
+	n := int(tmp)
+
+	switch {
+	case n < -1:
+		return nil, errors.New("kafka getString: invalid negative length")
+	case n == -1:
+		return nil, nil
+	case n == 0:
+		return new(string), nil
+	case n > rd.avail():
+		return nil, errors.New("kafka getString: not enough data")
+	default:
+		tmp := new(string)
+		*tmp = string(rd.raw[rd.off : rd.off+n])
+		// Consume the string bytes; without this the next read would
+		// re-decode the same data.
+		rd.off += n
+		return tmp, nil
+	}
+}
+
+// getBytes reads a 4-byte big-endian length followed by that many raw
+// bytes. A length of -1 yields nil (Kafka's null bytes value), 0 yields
+// an empty slice. The returned slice aliases the decoder's buffer.
+func (rd *realDecoder) getBytes() (*[]byte, error) {
+	tmp, err := rd.getInt32()
+
+	if err != nil {
+		return nil, err
+	}
+
+	n := int(tmp)
+
+	switch {
+	case n < -1:
+		return nil, errors.New("kafka getBytes: invalid negative length")
+	case n == -1:
+		return nil, nil
+	case n == 0:
+		tmp := make([]byte, 0)
+		return &tmp, nil
+	case n > rd.avail():
+		// message previously said "getString" (copy-paste error)
+		return nil, errors.New("kafka getBytes: not enough data")
+	default:
+		tmp := rd.raw[rd.off : rd.off+n]
+		// Consume the bytes so subsequent reads start past them.
+		rd.off += n
+		return &tmp, nil
+	}
+}
+
+func (rd *realDecoder) getArrayCount() (int, error) {
+	if rd.avail() < 4 {
+		return -1, errors.New("kafka getArrayCount: not enough data")
+	}
+	tmp := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
+	rd.off += 4
+	if tmp > rd.avail() || tmp > 2*math.MaxUint16 {
+		return -1, errors.New("kafka getArrayCount: unreasonably long array")
+	}
+	return tmp, nil
+}

+ 5 - 11
realEncoder.go

@@ -8,12 +8,12 @@ type realEncoder struct {
 }
 
 func (re *realEncoder) putInt16(in int16) {
-	binary.BigEndian.PutUint16(b.raw[b.off:], uint16(in))
+	binary.BigEndian.PutUint16(re.raw[re.off:], uint16(in))
 	re.off += 2
 }
 
 func (re *realEncoder) putInt32(in int32) {
-	binary.BigEndian.PutUint32(b.raw[b.off:], uint32(in))
+	binary.BigEndian.PutUint32(re.raw[re.off:], uint32(in))
 	re.off += 4
 }
 
@@ -28,7 +28,7 @@ func (re *realEncoder) putString(in *string) {
 	}
 	re.putInt16(int16(len(*in)))
 	re.off += 2
-	copy(re.raw[off:], *in)
+	copy(re.raw[re.off:], *in)
 	re.off += len(*in)
 }
 
@@ -39,16 +39,10 @@ func (re *realEncoder) putBytes(in *[]byte) {
 	}
 	re.putInt32(int32(len(*in)))
 	re.off += 4
-	copy(re.raw[off:], *in)
+	copy(re.raw[re.off:], *in)
 	re.off += len(*in)
 }
 
-func (re *realEncoder) putRaw(in []byte) {
-	copy(re.raw[off:], in)
-	re.off += len(in)
-}
-
 func (re *realEncoder) putArrayCount(in int) {
-	binary.BigEndian.PutUint32(b.raw[b.off:], uint32(in))
-	re.off += 4
+	re.putInt32(int32(in))
 }

+ 13 - 15
topicMetadata.go

@@ -9,36 +9,34 @@ type topicMetadata struct {
 func (tm *topicMetadata) encode(pe packetEncoder) {
 	pe.putError(tm.err)
 	pe.putString(tm.name)
-	pe.putInt32(int32(len(m.partitions)))
-	for i := range m.partitions {
-		(&m.partitions[i]).encode(pe)
+	pe.putArrayCount(len(tm.partitions))
+	for i := range tm.partitions {
+		(&tm.partitions[i]).encode(pe)
 	}
 }
 
-func (tm *topicMetadata) decode(pd *packetDecoder) (err error) {
-	n, err := pd.getArrayCount()
+func (tm *topicMetadata) decode(pd packetDecoder) (err error) {
+	tm.err, err = pd.getError()
 	if err != nil {
 		return err
 	}
 
-	m.brokers = make([]broker, n)
-	for i := 0; i < n; i++ {
-		err = (&m.brokers[i]).decode(pd)
-		if err != nil {
-			return err
-		}
+	tm.name, err = pd.getString()
+	if err != nil {
+		return err
 	}
 
-	n, err = pd.getArrayCount()
+	n, err := pd.getArrayCount()
 	if err != nil {
 		return err
 	}
-
-	m.topics = make([]topic, n)
+	tm.partitions = make([]partitionMetadata, n)
 	for i := 0; i < n; i++ {
-		err = (&m.topics[i]).decode(pd)
+		err = (&tm.partitions[i]).decode(pd)
 		if err != nil {
 			return err
 		}
 	}
+
+	return nil
 }