Evan Huus — 12 anni fa
commit c9508e324a
11 file modificati, con 228 aggiunte e 163 eliminazioni
  1. broker.go (+4 −4)
  2. client.go (+22 −17)
  3. fakeBuilder.go (+0 −44)
  4. metadata.go (+22 −24)
  5. metadataRequest.go (+25 −0)
  6. packetDecoder.go (+24 −6)
  7. packetEncoder.go (+3 −1)
  8. prepEncoder.go (+52 −0)
  9. realBuilder.go (+0 −44)
  10. realEncoder.go (+54 −0)
  11. topicMetadata.go (+22 −23)

+ 4 - 4
broker.go

@@ -6,10 +6,10 @@ type broker struct {
 	port   int32
 }
 
-func (b *broker) build(pb packetBuilder) {
-	pb.putInt32(b.nodeId)
-	pb.putString(b.host)
-	pb.putInt32(b.port)
+func (b *broker) encode(pe packetEncoder) {
+	pe.putInt32(b.nodeId)
+	pe.putString(b.host)
+	pe.putInt32(b.port)
 }
 
 func (b *broker) decode(pd *packetDecoder) (err error) {

+ 22 - 17
client.go

@@ -1,6 +1,7 @@
 package kafka
 
 import (
+	"errors"
 	"encoding/binary"
 	"math"
 	"net"
@@ -106,23 +107,32 @@ func (client *Client) rcvResponseLoop() {
 	}
 }
 
+func (client *Client) encode(api API, body []byte, pe packetEncoder) {
+	pe.putInt32(int32(len(body)))
+	pe.putInt16(api.key)
+	pe.putInt16(api.version)
+	pe.putInt32(client.correlation_id)
+	pe.putString(client.id)
+	pe.putRaw(body)
+}
+
 func (client *Client) sendRequest(api API, body []byte) (chan []byte, error) {
-	idLen, err := stringLength(client.id)
-	if err != nil {
-		return nil, err
+	var prepEnc prepEncoder
+	var realEnc realEncoder
+
+	client.encode(api, body, &prepEnc)
+	if prepEnc.err {
+		return nil, errors.New("kafka encoding error")
 	}
+
+	realEnc.raw = make([]byte, prepEnc.length)
+	client.encode(api, body, &realEnc)
+
 	// we buffer one packet so that we can send our packet to the request queue without
 	// blocking, and so that the responses can be sent to us async if we want them
 	request := reqResPair{client.correlation_id, make(chan []byte, 1)}
-	buf := make([]byte, 8+idLen+len(body))
-	off := 0
-	off = encodeInt32(buf, off, int32(len(buf)))
-	off = encodeInt16(buf, off, api.key)
-	off = encodeInt16(buf, off, api.version)
-	off = encodeInt32(buf, off, client.correlation_id)
-	off = encodeString(buf, off, client.id)
-	copy(buf[off:], body)
-	request.packets <- buf
+
+	request.packets <- realEnc.raw
 	client.requests <- request
 	client.correlation_id++
 	return request.packets, nil
@@ -145,8 +155,3 @@ func (client *Client) sendMetadataRequest(topics []string) (chan []byte, error)
 	}
 	return client.sendRequest(REQUEST_METADATA, buf)
 }
-
-func (client *Client) parseMetadataResponse(buf []byte) (m *metadata, err error) {
-	_, err = m.decode(buf, 0)
-	return
-}

+ 0 - 44
fakeBuilder.go

@@ -1,44 +0,0 @@
-package kafka
-
-import "math"
-
-type fakeBuilder struct {
-	length int
-	err    bool
-}
-
-func (fb *fakeBuilder) putInt16(in int16) {
-	fb.length += 2
-}
-
-func (fb *fakeBuilder) putInt32(in int32) {
-	fb.length += 4
-}
-
-func (fb *fakeBuilder) putError(in kafkaError) {
-	fb.length += 2
-}
-
-func (fb *fakeBuilder) putString(in *string) {
-	fb.length += 2
-	if in == nil {
-		return
-	}
-	if len(*in) > math.MaxInt16 {
-		fb.err = true
-	} else {
-		fb.length += len(*in)
-	}
-}
-
-func (fb *fakeBuilder) putBytes(in *[]byte) {
-	fb.length += 4
-	if in == nil {
-		return
-	}
-	if len(*in) > math.MaxInt32 {
-		fb.err = true
-	} else {
-		fb.length += len(*in)
-	}
-}

+ 22 - 24
metadata.go

@@ -5,37 +5,35 @@ type metadata struct {
 	topics  []topicMetadata
 }
 
-func (m *metadata) length() (int, error) {
-	length := 4
+func (m *metadata) encode(pe packetEncoder) {
+	pe.putInt32(int32(len(m.brokers)))
 	for i := range m.brokers {
-		tmp, err := (&m.brokers[i]).length()
-		if err != nil {
-			return -1, err
-		}
-		length += tmp
+		(&m.brokers[i]).encode(pe)
 	}
-	length += 4
+	pe.putInt32(int32(len(m.topics)))
 	for i := range m.topics {
-		tmp, err := (&m.topics[i]).length()
-		if err != nil {
-			return -1, err
-		}
-		length += tmp
+		(&m.topics[i]).encode(pe)
 	}
-	return length, nil
 }
 
-func (m *metadata) encode(buf []byte, off int) int {
-	off = encodeInt32(buf, off, int32(len(m.brokers)))
-	for i := range m.brokers {
-		off = (&m.brokers[i]).encode(buf, off)
+func (m *metadata) decode(pd *packetDecoder) (err error) {
+	n, err := pd.getArrayCount()
+	if err != nil { return err }
+
+	m.brokers = make([]broker, n)
+	for i := 0; i<n; i++ {
+		err = (&m.brokers[i]).decode(pd)
+		if err != nil { return err }
 	}
-	off = encodeInt32(buf, off, int32(len(m.topics)))
-	for i := range m.topics {
-		off = (&m.topics[i]).encode(buf, off)
+
+	n, err = pd.getArrayCount()
+	if err != nil { return err }
+
+	m.topics = make([]topicMetadata, n)
+	for i := 0; i<n; i++ {
+		err = (&m.topics[i]).decode(pd)
+		if err != nil { return err }
 	}
-	return off
-}
 
-func (m *metadata) decode(buf []byte, off int) (int, error) {
+	return nil
 }

+ 25 - 0
metadataRequest.go

@@ -0,0 +1,25 @@
+package kafka
+
+type metadataRequest struct {
+	topics []*string
+}
+
+func (mr *metadataRequest) encode(pe packetEncoder) {
+	pe.putArrayCount(len(mr.topics))
+	for i := range mr.topics {
+		pe.putString(mr.topics[i])
+	}
+}
+
+func (mr *metadataRequest) decode(pd *packetDecoder) (err error) {
+	n, err := pd.getArrayCount()
+	if err != nil { return err }
+
+	mr.topics = make([]*string, n)
+	for i := 0; i<n; i++ {
+		mr.topics[i], err = pd.getString()
+		if err != nil { return err }
+	}
+
+	return nil
+}

+ 24 - 6
packetDecoder.go

@@ -1,6 +1,10 @@
 package kafka
 
-import "errors"
+import (
+	"encoding/binary"
+	"errors"
+	"math"
+)
 
 type packetDecoder struct {
 	raw []byte
@@ -16,8 +20,8 @@ func (pd *packetDecoder) getInt16() (int16, error) {
 		return -1, errors.New("kafka getInt16: not enough data")
 	}
 	tmp := int16(binary.BigEndian.Uint16(pd.raw[pd.off:]))
-	off += 2
-	return tmp
+	pd.off += 2
+	return tmp, nil
 }
 
 func (pd *packetDecoder) getInt32() (int32, error) {
@@ -25,8 +29,20 @@ func (pd *packetDecoder) getInt32() (int32, error) {
 		return -1, errors.New("kafka getInt32: not enough data")
 	}
 	tmp := int32(binary.BigEndian.Uint32(pd.raw[pd.off:]))
-	off += 4
-	return tmp
+	pd.off += 4
+	return tmp, nil
+}
+
+func (pd *packetDecoder) getArrayCount() (int, error) {
+	if pd.avail() < 4 {
+		return -1, errors.New("kafka getArrayCount: not enough data")
+	}
+	tmp := int(binary.BigEndian.Uint32(pd.raw[pd.off:]))
+	pd.off += 4
+	if tmp > pd.avail() || tmp > 2*math.MaxUint16 {
+		return -1, errors.New("kafka getArrayCount: unreasonably long array")
+	}
+	return tmp, nil
 }
 
 func (pd *packetDecoder) getError() (kafkaError, error) {
@@ -53,6 +69,8 @@ func (pd *packetDecoder) getString() (*string, error) {
 	case n > pd.avail():
 		return nil, errors.New("kafka getString: not enough data")
 	default:
-		return string(pd.raw[pd.off:pd.off+n]), nil
+		tmp := new(string)
+		*tmp = string(pd.raw[pd.off:pd.off+n])
+		return tmp, nil
 	}
 }

+ 3 - 1
packetBuilder.go → packetEncoder.go

@@ -1,9 +1,11 @@
 package kafka
 
-type packetBuilder interface {
+type packetEncoder interface {
 	putInt16(in int16)
 	putInt32(in int32)
 	putError(in kafkaError)
 	putString(in *string)
 	putBytes(in *[]byte)
+	putRaw(in []byte)
+	putArrayCount(in int)
 }

+ 52 - 0
prepEncoder.go

@@ -0,0 +1,52 @@
+package kafka
+
+import "math"
+
+type prepEncoder struct {
+	length int
+	err    bool
+}
+
+func (pe *prepEncoder) putInt16(in int16) {
+	pe.length += 2
+}
+
+func (pe *prepEncoder) putInt32(in int32) {
+	pe.length += 4
+}
+
+func (pe *prepEncoder) putError(in kafkaError) {
+	pe.length += 2
+}
+
+func (pe *prepEncoder) putString(in *string) {
+	pe.length += 2
+	if in == nil {
+		return
+	}
+	if len(*in) > math.MaxInt16 {
+		pe.err = true
+	} else {
+		pe.length += len(*in)
+	}
+}
+
+func (pe *prepEncoder) putBytes(in *[]byte) {
+	pe.length += 4
+	if in == nil {
+		return
+	}
+	if len(*in) > math.MaxInt32 {
+		pe.err = true
+	} else {
+		pe.length += len(*in)
+	}
+}
+
+func (pe *prepEncoder) putRaw(in []byte) {
+	pe.length += len(in)
+}
+
+func (pe *prepEncoder) putArrayCount(in int) {
+	pe.length += 4
+}

+ 0 - 44
realBuilder.go

@@ -1,44 +0,0 @@
-package kafka
-
-import "encoding/binary"
-
-type realBuilder struct {
-	raw []byte
-	off int
-}
-
-func (rb *realBuilder) putInt16(in int16) {
-	binary.BigEndian.PutUint16(b.raw[b.off:], uint16(in))
-	rb.off += 2
-}
-
-func (rb *realBuilder) putInt32(in int32) {
-	binary.BigEndian.PutUint32(b.raw[b.off:], uint32(in))
-	rb.off += 4
-}
-
-func (rb *realBuilder) putError(in kafkaError) {
-	rb.putInt16(int16(in))
-}
-
-func (rb *realBuilder) putString(in *string) {
-	if in == nil {
-		rb.putInt16(-1)
-		return
-	}
-	rb.putInt16(int16(len(*in)))
-	rb.off += 2
-	copy(rb.raw[off:], *in)
-	rb.off += len(*in)
-}
-
-func (rb *realBuilder) putBytes(in *[]byte) {
-	if in == nil {
-		rb.putInt32(-1)
-		return
-	}
-	rb.putInt32(int32(len(*in)))
-	rb.off += 4
-	copy(rb.raw[off:], *in)
-	rb.off += len(*in)
-}

+ 54 - 0
realEncoder.go

@@ -0,0 +1,54 @@
+package kafka
+
+import "encoding/binary"
+
+type realEncoder struct {
+	raw []byte
+	off int
+}
+
+func (re *realEncoder) putInt16(in int16) {
+	binary.BigEndian.PutUint16(b.raw[b.off:], uint16(in))
+	re.off += 2
+}
+
+func (re *realEncoder) putInt32(in int32) {
+	binary.BigEndian.PutUint32(b.raw[b.off:], uint32(in))
+	re.off += 4
+}
+
+func (re *realEncoder) putError(in kafkaError) {
+	re.putInt16(int16(in))
+}
+
+func (re *realEncoder) putString(in *string) {
+	if in == nil {
+		re.putInt16(-1)
+		return
+	}
+	re.putInt16(int16(len(*in)))
+	re.off += 2
+	copy(re.raw[off:], *in)
+	re.off += len(*in)
+}
+
+func (re *realEncoder) putBytes(in *[]byte) {
+	if in == nil {
+		re.putInt32(-1)
+		return
+	}
+	re.putInt32(int32(len(*in)))
+	re.off += 4
+	copy(re.raw[off:], *in)
+	re.off += len(*in)
+}
+
+func (re *realEncoder) putRaw(in []byte) {
+	copy(re.raw[off:], in)
+	re.off += len(in)
+}
+
+func (re *realEncoder) putArrayCount(in int) {
+	binary.BigEndian.PutUint32(b.raw[b.off:], uint32(in))
+	re.off += 4
+}

+ 22 - 23
topicMetadata.go

@@ -6,32 +6,31 @@ type topicMetadata struct {
 	partitions []partitionMetadata
 }
 
-func (tm *topicMetadata) length() (int, error) {
-	length := 2
-	length, err := stringLength(tm.name)
-	if err != nil {
-		return -1, err
+func (tm *topicMetadata) encode(pe packetEncoder) {
+        pe.putError(tm.err)
+	pe.putString(tm.name)
+	pe.putInt32(int32(len(m.partitions)))
+	for i := range m.partitions {
+		(&m.partitions[i]).encode(pe)
 	}
-	length += 4
-	for i := range tm.partitions {
-		tmp, err := (&tm.partitions[i]).length()
-		if err != nil {
-			return -1, err
-		}
-		length += tmp
-	}
-	return length, nil
 }
 
-func (tm *topicMetadata) encode(buf []byte, off int) int {
-	off = encodeError(buf, off, tm.err)
-	off = encodeString(buf, off, tm.name)
-	off = encodeInt32(buf, off, int32(len(tm.partitions)))
-	for i := range tm.partitions {
-		off = (&tm.partitions[i]).encode(buf, off)
+func (tm *topicMetadata) decode(pd *packetDecoder) (err error) {
+	n, err := pd.getArrayCount()
+	if err != nil { return err }
+
+	m.brokers = make([]broker, n)
+	for i := 0; i<n; i++ {
+		err = (&m.brokers[i]).decode(pd)
+		if err != nil { return err }
 	}
-	return off
-}
 
-func (tm *topicMetadata) decode(buf []byte, off int) (int, error) {
+	n, err = pd.getArrayCount()
+	if err != nil { return err }
+
+	m.topics = make([]topic, n)
+	for i := 0; i<n; i++ {
+		err = (&m.topics[i]).decode(pd)
+		if err != nil { return err }
+	}
 }