Evan Huus 12 anni fa
parent
commit
559f4d0b2e
11 file modificati con 358 aggiunte e 111 eliminazioni
  1. 26 0
      broker.go
  2. 7 2
      client.go
  3. 19 0
      errors.go
  4. 44 0
      fakeBuilder.go
  5. 41 0
      metadata.go
  6. 9 0
      packetBuilder.go
  7. 58 0
      packetDecoder.go
  8. 73 0
      partitionMetadata.go
  9. 0 109
      protocol.go
  10. 44 0
      realBuilder.go
  11. 37 0
      topicMetadata.go

+ 26 - 0
broker.go

@@ -0,0 +1,26 @@
+package kafka
+
// broker holds the identity and address of a single Kafka broker as
// carried in a metadata response.
type broker struct {
	nodeId int32   // broker id assigned by the cluster
	host   *string // hostname; pointer because Kafka wire strings are nullable
	port   int32
}
+
// build serializes the broker in Kafka wire order (nodeId, host, port)
// into pb. The field order must stay in sync with decode.
func (b *broker) build(pb packetBuilder) {
	pb.putInt32(b.nodeId)
	pb.putString(b.host)
	pb.putInt32(b.port)
}
+
+func (b *broker) decode(pd *packetDecoder) (err error) {
+	b.nodeId, err = pd.getInt32()
+	if err != nil { return err }
+
+	b.host, err = pd.getString()
+	if err != nil { return err }
+
+	b.port, err = pd.getInt32()
+	if err != nil { return err }
+
+	return nil
+}

+ 7 - 2
client.go

@@ -107,7 +107,7 @@ func (client *Client) rcvResponseLoop() {
 }
 
 func (client *Client) sendRequest(api API, body []byte) (chan []byte, error) {
-	idLen, err := stringLen(client.id)
+	idLen, err := stringLength(client.id)
 	if err != nil {
 		return nil, err
 	}
@@ -131,7 +131,7 @@ func (client *Client) sendRequest(api API, body []byte) (chan []byte, error) {
 func (client *Client) sendMetadataRequest(topics []string) (chan []byte, error) {
 	bufLen := 4
 	for i := range topics {
-		tmp, err := stringLen(&topics[i])
+		tmp, err := stringLength(&topics[i])
 		if err != nil {
 			return nil, err
 		}
@@ -145,3 +145,8 @@ func (client *Client) sendMetadataRequest(topics []string) (chan []byte, error)
 	}
 	return client.sendRequest(REQUEST_METADATA, buf)
 }
+
+func (client *Client) parseMetadataResponse(buf []byte) (m *metadata, err error) {
+	_, err = m.decode(buf, 0)
+	return
+}

+ 19 - 0
errors.go

@@ -0,0 +1,19 @@
+package kafka
+
// kafkaError is the numeric error code returned by Kafka in responses;
// it is an int16 on the wire. The values mirror the codes defined by the
// Kafka protocol.
type kafkaError int16

// Kafka protocol error codes. Each constant is explicitly typed as
// kafkaError; previously only NO_ERROR carried the type and the rest
// defaulted to untyped ints.
const (
	NO_ERROR                    kafkaError = 0
	UNKNOWN                     kafkaError = -1
	OFFSET_OUT_OF_RANGE         kafkaError = 1
	INVALID_MESSAGE             kafkaError = 2
	UNKNOWN_TOPIC_OR_PARTITION  kafkaError = 3
	INVALID_MESSAGE_SIZE        kafkaError = 4
	LEADER_NOT_AVAILABLE        kafkaError = 5
	NOT_LEADER_FOR_PARTITION    kafkaError = 6
	REQUEST_TIMED_OUT           kafkaError = 7
	BROKER_NOT_AVAILABLE        kafkaError = 8
	REPLICA_NOT_AVAILABLE       kafkaError = 9
	MESSAGE_SIZE_TOO_LARGE      kafkaError = 10
	STALE_CONTROLLER_EPOCH_CODE kafkaError = 11
)

+ 44 - 0
fakeBuilder.go

@@ -0,0 +1,44 @@
+package kafka
+
+import "math"
+
// fakeBuilder implements packetBuilder by accounting only for the encoded
// size of each value instead of writing any bytes. It is used to
// pre-compute the buffer length a realBuilder will need.
type fakeBuilder struct {
	length int  // running total of bytes the encoded packet would occupy
	err    bool // set when a value cannot legally be encoded
}

// putInt16 accounts for the two bytes of an int16; the value is irrelevant.
func (fb *fakeBuilder) putInt16(_ int16) {
	fb.length += 2
}

// putInt32 accounts for the four bytes of an int32; the value is irrelevant.
func (fb *fakeBuilder) putInt32(_ int32) {
	fb.length += 4
}
+
// putError accounts for the two bytes of a kafkaError (an int16 on the wire).
func (fb *fakeBuilder) putError(in kafkaError) {
	fb.length += 2
}
+
+func (fb *fakeBuilder) putString(in *string) {
+	fb.length += 2
+	if in == nil {
+		return
+	}
+	if len(*in) > math.MaxInt16 {
+		fb.err = true
+	} else {
+		fb.length += len(*in)
+	}
+}
+
+func (fb *fakeBuilder) putBytes(in *[]byte) {
+	fb.length += 4
+	if in == nil {
+		return
+	}
+	if len(*in) > math.MaxInt32 {
+		fb.err = true
+	} else {
+		fb.length += len(*in)
+	}
+}

+ 41 - 0
metadata.go

@@ -0,0 +1,41 @@
package kafka

import "errors"
+
// metadata is the decoded form of a Kafka metadata response: the list of
// live brokers plus the per-topic partition layout.
type metadata struct {
	brokers []broker
	topics  []topicMetadata
}
+
+func (m *metadata) length() (int, error) {
+	length := 4
+	for i := range m.brokers {
+		tmp, err := (&m.brokers[i]).length()
+		if err != nil {
+			return -1, err
+		}
+		length += tmp
+	}
+	length += 4
+	for i := range m.topics {
+		tmp, err := (&m.topics[i]).length()
+		if err != nil {
+			return -1, err
+		}
+		length += tmp
+	}
+	return length, nil
+}
+
+func (m *metadata) encode(buf []byte, off int) int {
+	off = encodeInt32(buf, off, int32(len(m.brokers)))
+	for i := range m.brokers {
+		off = (&m.brokers[i]).encode(buf, off)
+	}
+	off = encodeInt32(buf, off, int32(len(m.topics)))
+	for i := range m.topics {
+		off = (&m.topics[i]).encode(buf, off)
+	}
+	return off
+}
+
+func (m *metadata) decode(buf []byte, off int) (int, error) {
+}

+ 9 - 0
packetBuilder.go

@@ -0,0 +1,9 @@
+package kafka
+
+type packetBuilder interface {
+	putInt16(in int16)
+	putInt32(in int32)
+	putError(in kafkaError)
+	putString(in *string)
+	putBytes(in *[]byte)
+}

+ 58 - 0
packetDecoder.go

@@ -0,0 +1,58 @@
+package kafka
+
import (
	"encoding/binary"
	"errors"
)
+
// packetDecoder walks a raw Kafka packet, tracking the current read
// offset and bounds-checking every read.
type packetDecoder struct {
	raw []byte // the complete packet
	off int    // index of the next unread byte
}

// avail returns the number of unread bytes remaining.
func (pd *packetDecoder) avail() int {
	return len(pd.raw) - pd.off
}

// getInt16 reads a big-endian int16 and advances the offset.
// The original advanced an undeclared `off` instead of pd.off and
// returned a single value from a two-result function; both are fixed.
func (pd *packetDecoder) getInt16() (int16, error) {
	if pd.avail() < 2 {
		return -1, errors.New("kafka getInt16: not enough data")
	}
	tmp := int16(binary.BigEndian.Uint16(pd.raw[pd.off:]))
	pd.off += 2
	return tmp, nil
}

// getInt32 reads a big-endian int32 and advances the offset.
func (pd *packetDecoder) getInt32() (int32, error) {
	if pd.avail() < 4 {
		return -1, errors.New("kafka getInt32: not enough data")
	}
	tmp := int32(binary.BigEndian.Uint32(pd.raw[pd.off:]))
	pd.off += 4
	return tmp, nil
}
+
// getError decodes a kafkaError, carried on the wire as an int16.
// A short-read error from getInt16 is passed through unchanged.
func (pd *packetDecoder) getError() (kafkaError, error) {
	val, err := pd.getInt16()
	return kafkaError(val), err
}
+
+func (pd *packetDecoder) getString() (*string, error) {
+	tmp, err := pd.getInt16()
+
+	if err != nil {
+		return nil, err
+	}
+
+	n := int(tmp)
+
+	switch {
+	case n < -1:
+		return nil, errors.New("kafka getString: invalid negative length")
+	case n == -1:
+		return nil, nil
+	case n == 0:
+		return new(string), nil
+	case n > pd.avail():
+		return nil, errors.New("kafka getString: not enough data")
+	default:
+		return string(pd.raw[pd.off:pd.off+n]), nil
+	}
+}

+ 73 - 0
partitionMetadata.go

@@ -0,0 +1,73 @@
+package kafka
+
+import "errors"
+
// partitionMetadata describes one partition of a topic: its error state,
// partition id, the broker id of the current leader, and the replica and
// in-sync-replica broker id lists.
type partitionMetadata struct {
	err      kafkaError
	id       int32
	leader   int32
	replicas []int32
	isr      []int32
}
+
+func (pm *partitionMetadata) length() (int, error) {
+	length := 6
+
+	length += 4
+	length += len(pm.replicas) * 4
+
+	length += 4
+	length += len(pm.isr) * 4
+
+	return length, nil
+}
+
+func (pm *partitionMetadata) encode(buf []byte, off int) int {
+	off = encodeError(buf, off, pm.err)
+	off = encodeInt32(buf, off, pm.id)
+	off = encodeInt32(buf, off, pm.leader)
+
+	off = encodeInt32(buf, off, int32(len(pm.replicas)))
+	for _, val := range pm.replicas {
+		off = encodeInt32(buf, off, val)
+	}
+
+	off = encodeInt32(buf, off, int32(len(pm.isr)))
+	for _, val := range pm.isr {
+		off = encodeInt32(buf, off, val)
+	}
+
+	return off
+}
+
+func (pm *partitionMetadata) decode(buf []byte, off int) (int, error) {
+	if len(buf)-off < 14 {
+		return -1, errors.New("kafka decode: not enough data")
+	}
+
+	pm.err, off = decodeError(buf, off)
+	pm.id, off = decodeInt32(buf, off)
+	pm.leader, off = decodeInt32(buf, off)
+
+	tmp, off := decodeInt32(buf, off)
+	length := int(tmp)
+	if length > (len(buf)-off)/4 {
+		return -1, errors.New("kafka decode: not enough data")
+	}
+	pm.replicas = make([]int32, length)
+	for i := 0; i < length; i++ {
+		pm.replicas[i], off = decodeInt32(buf, off)
+	}
+
+	tmp, off = decodeInt32(buf, off)
+	length = int(tmp)
+	if length > (len(buf)-off)/4 {
+		return -1, errors.New("kafka decode: not enough data")
+	}
+	pm.isr = make([]int32, length)
+	for i := 0; i < length; i++ {
+		pm.isr[i], off = decodeInt32(buf, off)
+	}
+
+	return off, nil
+}

+ 0 - 109
protocol.go

@@ -1,109 +0,0 @@
-package kafka
-
-import (
-	"encoding/binary"
-	"errors"
-	"math"
-)
-
-func encodeInt16(buf []byte, off int, in int16) int {
-	binary.BigEndian.PutUint16(buf[off:], uint16(in))
-	return off + 2
-}
-
-func encodeInt32(buf []byte, off int, in int32) int {
-	binary.BigEndian.PutUint32(buf[off:], uint32(in))
-	return off + 4
-}
-
-func stringLen(in *string) (n int, err error) {
-	if in == nil {
-		return 2, nil
-	}
-	n = len(*in)
-	if n > math.MaxInt16 {
-		return -1, errors.New("kafka: string too long to encode")
-	}
-	return 2 + n, nil
-}
-
-func encodeString(buf []byte, off int, in *string) int {
-	n := -1
-	if in != nil {
-		n = len(*in)
-	}
-	binary.BigEndian.PutUint16(buf[off:], uint16(n))
-	off += 2
-	if n > 0 {
-		copy(buf[off:], *in)
-	}
-	off += n
-	return off
-}
-
-func decodeString(buf []byte) (out *string, err error) {
-	if len(buf) < 2 {
-		return nil, errors.New("kafka: buffer too short to contain string")
-	}
-	n := int16(binary.BigEndian.Uint16(buf))
-	switch {
-	case n < -1:
-		return nil, errors.New("kafka: invalid negative string length")
-	case n == -1:
-		return nil, nil
-	case n == 0:
-		emptyString := ""
-		return &emptyString, nil
-	case int(n) > len(buf)-2:
-		return nil, errors.New("kafka: buffer too short to decode string")
-	default:
-		result := string(buf[2:])
-		return &result, nil
-	}
-}
-
-func bytesLen(in *[]byte) (n int, err error) {
-	if in == nil {
-		return 4, nil
-	}
-	n = len(*in)
-	if n > math.MaxInt32 {
-		return -1, errors.New("kafka: bytes too long to encode")
-	}
-	return 4 + n, nil
-}
-
-func encodeBytes(buf []byte, off int, in *[]byte) int {
-	n := -1
-	if in != nil {
-		n = len(*in)
-	}
-	binary.BigEndian.PutUint32(buf[off:], uint32(n))
-	off += 4
-	if n > 0 {
-		copy(buf[off:], *in)
-	}
-	off += n
-	return off
-}
-
-func decodeBytes(buf []byte) (out *[]byte, err error) {
-	if len(buf) < 4 {
-		return nil, errors.New("kafka: buffer too short to contain bytes")
-	}
-	n := int32(binary.BigEndian.Uint32(buf))
-	switch {
-	case n < -1:
-		return nil, errors.New("kafka: invalid negative byte length")
-	case n == -1:
-		return nil, nil
-	case n == 0:
-		emptyBytes := make([]byte, 0)
-		return &emptyBytes, nil
-	case int(n) > len(buf)-4:
-		return nil, errors.New("kafka: buffer too short to decode bytes")
-	default:
-		result := buf[4:]
-		return &result, nil
-	}
-}

+ 44 - 0
realBuilder.go

@@ -0,0 +1,44 @@
+package kafka
+
+import "encoding/binary"
+
// realBuilder implements packetBuilder by writing big-endian bytes into
// raw at offset off. raw must be pre-sized to the full packet length
// (see fakeBuilder, which computes it).
type realBuilder struct {
	raw []byte // destination buffer
	off int    // index of the next byte to write
}

// putInt16 writes in as two big-endian bytes and advances the offset.
// The original bodies indexed through an undeclared name b; they now use
// the method receiver rb.
func (rb *realBuilder) putInt16(in int16) {
	binary.BigEndian.PutUint16(rb.raw[rb.off:], uint16(in))
	rb.off += 2
}

// putInt32 writes in as four big-endian bytes and advances the offset.
func (rb *realBuilder) putInt32(in int32) {
	binary.BigEndian.PutUint32(rb.raw[rb.off:], uint32(in))
	rb.off += 4
}
+
// putError writes a kafkaError, which is an int16 on the wire.
func (rb *realBuilder) putError(in kafkaError) {
	rb.putInt16(int16(in))
}
+
+func (rb *realBuilder) putString(in *string) {
+	if in == nil {
+		rb.putInt16(-1)
+		return
+	}
+	rb.putInt16(int16(len(*in)))
+	rb.off += 2
+	copy(rb.raw[off:], *in)
+	rb.off += len(*in)
+}
+
+func (rb *realBuilder) putBytes(in *[]byte) {
+	if in == nil {
+		rb.putInt32(-1)
+		return
+	}
+	rb.putInt32(int32(len(*in)))
+	rb.off += 4
+	copy(rb.raw[off:], *in)
+	rb.off += len(*in)
+}

+ 37 - 0
topicMetadata.go

@@ -0,0 +1,37 @@
package kafka

import "errors"
+
// topicMetadata is the per-topic portion of a metadata response: an error
// code, the (nullable) topic name, and metadata for each partition.
type topicMetadata struct {
	err        kafkaError
	name       *string
	partitions []partitionMetadata
}
+
+func (tm *topicMetadata) length() (int, error) {
+	length := 2
+	length, err := stringLength(tm.name)
+	if err != nil {
+		return -1, err
+	}
+	length += 4
+	for i := range tm.partitions {
+		tmp, err := (&tm.partitions[i]).length()
+		if err != nil {
+			return -1, err
+		}
+		length += tmp
+	}
+	return length, nil
+}
+
+func (tm *topicMetadata) encode(buf []byte, off int) int {
+	off = encodeError(buf, off, tm.err)
+	off = encodeString(buf, off, tm.name)
+	off = encodeInt32(buf, off, int32(len(tm.partitions)))
+	for i := range tm.partitions {
+		off = (&tm.partitions[i]).encode(buf, off)
+	}
+	return off
+}
+
+func (tm *topicMetadata) decode(buf []byte, off int) (int, error) {
+}