Evan Huus 12 роки тому
батько
коміт
07a8b70445
10 змінених файлів з 129 додано та 145 видалено
  1. 7 7
      crc32_field.go
  2. 10 18
      encoder_decoder.go
  3. 1 1
      errors.go
  4. 6 6
      length_field.go
  5. 19 19
      packet_decoder.go
  6. 16 16
      packet_encoder.go
  7. 13 13
      prep_encoder.go
  8. 34 34
      real_decoder.go
  9. 21 21
      real_encoder.go
  10. 2 10
      sarama.go

+ 7 - 7
encoding/crc32_field.go → crc32_field.go

@@ -1,30 +1,30 @@
-package encoding
+package kafka
 
 import (
 	"encoding/binary"
 	"hash/crc32"
 )
 
-// CRC32Field implements the PushEncoder and PushDecoder interfaces for calculating CRC32s.
-type CRC32Field struct {
+// crc32field implements the pushEncoder and pushDecoder interfaces for calculating CRC32s.
+type crc32field struct {
 	startOffset int
 }
 
-func (c *CRC32Field) SaveOffset(in int) {
+func (c *crc32field) saveOffset(in int) {
 	c.startOffset = in
 }
 
-func (c *CRC32Field) ReserveLength() int {
+func (c *crc32field) reserveLength() int {
 	return 4
 }
 
-func (c *CRC32Field) Run(curOffset int, buf []byte) error {
+func (c *crc32field) run(curOffset int, buf []byte) error {
 	crc := crc32.ChecksumIEEE(buf[c.startOffset+4 : curOffset])
 	binary.BigEndian.PutUint32(buf[c.startOffset:], crc)
 	return nil
 }
 
-func (c *CRC32Field) Check(curOffset int, buf []byte) error {
+func (c *crc32field) check(curOffset int, buf []byte) error {
 	crc := crc32.ChecksumIEEE(buf[c.startOffset+4 : curOffset])
 
 	if crc != binary.BigEndian.Uint32(buf[c.startOffset:]) {

+ 10 - 18
encoding/encoder_decoder.go → encoder_decoder.go

@@ -1,21 +1,13 @@
-/*
-Package encoding provides an API for dealing with data that is encoded using Kafka's
-encoding rules.
-
-Kafka uses a custom set of encoding rules for arrays, strings, and other non-trivial data structures.
-This package implements encoders and decoders for Go types in this format, as well as broader helper
-functions for encoding entire structs a field at a time.
-*/
-package encoding
+package kafka
 
 // Encoder is the interface that wraps the basic Encode method.
 // Anything implementing Encoder can be turned into bytes using Kafka's encoding rules.
-type Encoder interface {
-	Encode(pe PacketEncoder) error
+type encoder interface {
+	encode(pe packetEncoder) error
 }
 
 // Encode takes an Encoder and turns it into bytes.
-func Encode(in Encoder) ([]byte, error) {
+func encode(in encoder) ([]byte, error) {
 	if in == nil {
 		return nil, nil
 	}
@@ -23,13 +15,13 @@ func Encode(in Encoder) ([]byte, error) {
 	var prepEnc prepEncoder
 	var realEnc realEncoder
 
-	err := in.Encode(&prepEnc)
+	err := in.encode(&prepEnc)
 	if err != nil {
 		return nil, err
 	}
 
 	realEnc.raw = make([]byte, prepEnc.length)
-	err = in.Encode(&realEnc)
+	err = in.encode(&realEnc)
 	if err != nil {
 		return nil, err
 	}
@@ -39,19 +31,19 @@ func Encode(in Encoder) ([]byte, error) {
 
 // Decoder is the interface that wraps the basic Decode method.
 // Anything implementing Decoder can be extracted from bytes using Kafka's encoding rules.
-type Decoder interface {
-	Decode(pd PacketDecoder) error
+type decoder interface {
+	decode(pd packetDecoder) error
 }
 
 // Decode takes bytes and a Decoder and fills the fields of the decoder from the bytes,
 // interpreted using Kafka's encoding rules.
-func Decode(buf []byte, in Decoder) error {
+func decode(buf []byte, in decoder) error {
 	if buf == nil {
 		return nil
 	}
 
 	helper := realDecoder{raw: buf}
-	err := in.Decode(&helper)
+	err := in.decode(&helper)
 	if err != nil {
 		return err
 	}

+ 1 - 1
encoding/errors.go → errors.go

@@ -1,4 +1,4 @@
-package encoding
+package kafka
 
 import "errors"
 

+ 6 - 6
encoding/length_field.go → length_field.go

@@ -1,26 +1,26 @@
-package encoding
+package kafka
 
 import "encoding/binary"
 
 // LengthField implements the PushEncoder and PushDecoder interfaces for calculating 4-byte lengths.
-type LengthField struct {
+type lengthField struct {
 	startOffset int
 }
 
-func (l *LengthField) SaveOffset(in int) {
+func (l *lengthField) saveOffset(in int) {
 	l.startOffset = in
 }
 
-func (l *LengthField) ReserveLength() int {
+func (l *lengthField) reserveLength() int {
 	return 4
 }
 
-func (l *LengthField) Run(curOffset int, buf []byte) error {
+func (l *lengthField) run(curOffset int, buf []byte) error {
 	binary.BigEndian.PutUint32(buf[l.startOffset:], uint32(curOffset-l.startOffset-4))
 	return nil
 }
 
-func (l *LengthField) Check(curOffset int, buf []byte) error {
+func (l *lengthField) check(curOffset int, buf []byte) error {
 	if uint32(curOffset-l.startOffset-4) != binary.BigEndian.Uint32(buf[l.startOffset:]) {
 		return DecodingError
 	}

+ 19 - 19
encoding/packet_decoder.go → packet_decoder.go

@@ -1,44 +1,44 @@
-package encoding
+package kafka
 
 // PacketDecoder is the interface providing helpers for reading with Kafka's encoding rules.
 // Types implementing Decoder only need to worry about calling methods like GetString,
 // not about how a string is represented in Kafka.
-type PacketDecoder interface {
+type packetDecoder interface {
 	// Primitives
-	GetInt8() (int8, error)
-	GetInt16() (int16, error)
-	GetInt32() (int32, error)
-	GetInt64() (int64, error)
-	GetArrayLength() (int, error)
+	getInt8() (int8, error)
+	getInt16() (int16, error)
+	getInt32() (int32, error)
+	getInt64() (int64, error)
+	getArrayLength() (int, error)
 
 	// Collections
-	GetBytes() ([]byte, error)
-	GetString() (string, error)
-	GetInt32Array() ([]int32, error)
-	GetInt64Array() ([]int64, error)
+	getBytes() ([]byte, error)
+	getString() (string, error)
+	getInt32Array() ([]int32, error)
+	getInt64Array() ([]int64, error)
 
 	// Subsets
-	Remaining() int
-	GetSubset(length int) (PacketDecoder, error)
+	remaining() int
+	getSubset(length int) (packetDecoder, error)
 
 	// Stacks, see PushDecoder
-	Push(in PushDecoder) error
-	Pop() error
+	push(in pushDecoder) error
+	pop() error
 }
 
 // PushDecoder is the interface for decoding fields like CRCs and lengths where the validity
 // of the field depends on what is after it in the packet. Start them with PacketDecoder.Push() where
 // the actual value is located in the packet, then PacketDecoder.Pop() them when all the bytes they
 // depend upon have been decoded.
-type PushDecoder interface {
+type pushDecoder interface {
 	// Saves the offset into the input buffer as the location to actually read the calculated value when able.
-	SaveOffset(in int)
+	saveOffset(in int)
 
 	// Returns the length of data to reserve for the input of this encoder (eg 4 bytes for a CRC32).
-	ReserveLength() int
+	reserveLength() int
 
 	// Indicates that all required data is now available to calculate and check the field.
 	// SaveOffset is guaranteed to have been called first. The implementation should read ReserveLength() bytes
 	// of data from the saved offset, and verify it based on the data between the saved offset and curOffset.
-	Check(curOffset int, buf []byte) error
+	check(curOffset int, buf []byte) error
 }

+ 16 - 16
encoding/packet_encoder.go → packet_encoder.go

@@ -1,39 +1,39 @@
-package encoding
+package kafka
 
 // PacketEncoder is the interface providing helpers for writing with Kafka's encoding rules.
 // Types implementing Encoder only need to worry about calling methods like PutString,
 // not about how a string is represented in Kafka.
-type PacketEncoder interface {
+type packetEncoder interface {
 	// Primitives
-	PutInt8(in int8)
-	PutInt16(in int16)
-	PutInt32(in int32)
-	PutInt64(in int64)
-	PutArrayLength(in int) error
+	putInt8(in int8)
+	putInt16(in int16)
+	putInt32(in int32)
+	putInt64(in int64)
+	putArrayLength(in int) error
 
 	// Collections
-	PutBytes(in []byte) error
-	PutString(in string) error
-	PutInt32Array(in []int32) error
+	putBytes(in []byte) error
+	putString(in string) error
+	putInt32Array(in []int32) error
 
 	// Stacks, see PushEncoder
-	Push(in PushEncoder)
-	Pop() error
+	push(in pushEncoder)
+	pop() error
 }
 
 // PushEncoder is the interface for encoding fields like CRCs and lengths where the value
 // of the field depends on what is encoded after it in the packet. Start them with PacketEncoder.Push() where
 // the actual value is located in the packet, then PacketEncoder.Pop() them when all the bytes they
 // depend upon have been written.
-type PushEncoder interface {
+type pushEncoder interface {
 	// Saves the offset into the input buffer as the location to actually write the calculated value when able.
-	SaveOffset(in int)
+	saveOffset(in int)
 
 	// Returns the length of data to reserve for the output of this encoder (eg 4 bytes for a CRC32).
-	ReserveLength() int
+	reserveLength() int
 
 	// Indicates that all required data is now available to calculate and write the field.
 	// SaveOffset is guaranteed to have been called first. The implementation should write ReserveLength() bytes
 	// of data to the saved offset, based on the data between the saved offset and curOffset.
-	Run(curOffset int, buf []byte) error
+	run(curOffset int, buf []byte) error
 }

+ 13 - 13
encoding/prep_encoder.go → prep_encoder.go

@@ -1,4 +1,4 @@
-package encoding
+package kafka
 
 import "math"
 
@@ -8,23 +8,23 @@ type prepEncoder struct {
 
 // primitives
 
-func (pe *prepEncoder) PutInt8(in int8) {
+func (pe *prepEncoder) putInt8(in int8) {
 	pe.length += 1
 }
 
-func (pe *prepEncoder) PutInt16(in int16) {
+func (pe *prepEncoder) putInt16(in int16) {
 	pe.length += 2
 }
 
-func (pe *prepEncoder) PutInt32(in int32) {
+func (pe *prepEncoder) putInt32(in int32) {
 	pe.length += 4
 }
 
-func (pe *prepEncoder) PutInt64(in int64) {
+func (pe *prepEncoder) putInt64(in int64) {
 	pe.length += 8
 }
 
-func (pe *prepEncoder) PutArrayLength(in int) error {
+func (pe *prepEncoder) putArrayLength(in int) error {
 	if in > math.MaxInt32 {
 		return EncodingError
 	}
@@ -34,7 +34,7 @@ func (pe *prepEncoder) PutArrayLength(in int) error {
 
 // arrays
 
-func (pe *prepEncoder) PutBytes(in []byte) error {
+func (pe *prepEncoder) putBytes(in []byte) error {
 	pe.length += 4
 	if in == nil {
 		return nil
@@ -46,7 +46,7 @@ func (pe *prepEncoder) PutBytes(in []byte) error {
 	return nil
 }
 
-func (pe *prepEncoder) PutString(in string) error {
+func (pe *prepEncoder) putString(in string) error {
 	pe.length += 2
 	if len(in) > math.MaxInt16 {
 		return EncodingError
@@ -55,8 +55,8 @@ func (pe *prepEncoder) PutString(in string) error {
 	return nil
 }
 
-func (pe *prepEncoder) PutInt32Array(in []int32) error {
-	err := pe.PutArrayLength(len(in))
+func (pe *prepEncoder) putInt32Array(in []int32) error {
+	err := pe.putArrayLength(len(in))
 	if err != nil {
 		return err
 	}
@@ -66,10 +66,10 @@ func (pe *prepEncoder) PutInt32Array(in []int32) error {
 
 // stackable
 
-func (pe *prepEncoder) Push(in PushEncoder) {
-	pe.length += in.ReserveLength()
+func (pe *prepEncoder) push(in pushEncoder) {
+	pe.length += in.reserveLength()
 }
 
-func (pe *prepEncoder) Pop() error {
+func (pe *prepEncoder) pop() error {
 	return nil
 }

+ 34 - 34
encoding/real_decoder.go → real_decoder.go

@@ -1,4 +1,4 @@
-package encoding
+package kafka
 
 import (
 	"encoding/binary"
@@ -8,13 +8,13 @@ import (
 type realDecoder struct {
 	raw   []byte
 	off   int
-	stack []PushDecoder
+	stack []pushDecoder
 }
 
 // primitives
 
-func (rd *realDecoder) GetInt8() (int8, error) {
-	if rd.Remaining() < 1 {
+func (rd *realDecoder) getInt8() (int8, error) {
+	if rd.remaining() < 1 {
 		rd.off = len(rd.raw)
 		return -1, InsufficientData
 	}
@@ -23,8 +23,8 @@ func (rd *realDecoder) GetInt8() (int8, error) {
 	return tmp, nil
 }
 
-func (rd *realDecoder) GetInt16() (int16, error) {
-	if rd.Remaining() < 2 {
+func (rd *realDecoder) getInt16() (int16, error) {
+	if rd.remaining() < 2 {
 		rd.off = len(rd.raw)
 		return -1, InsufficientData
 	}
@@ -33,8 +33,8 @@ func (rd *realDecoder) GetInt16() (int16, error) {
 	return tmp, nil
 }
 
-func (rd *realDecoder) GetInt32() (int32, error) {
-	if rd.Remaining() < 4 {
+func (rd *realDecoder) getInt32() (int32, error) {
+	if rd.remaining() < 4 {
 		rd.off = len(rd.raw)
 		return -1, InsufficientData
 	}
@@ -43,8 +43,8 @@ func (rd *realDecoder) GetInt32() (int32, error) {
 	return tmp, nil
 }
 
-func (rd *realDecoder) GetInt64() (int64, error) {
-	if rd.Remaining() < 8 {
+func (rd *realDecoder) getInt64() (int64, error) {
+	if rd.remaining() < 8 {
 		rd.off = len(rd.raw)
 		return -1, InsufficientData
 	}
@@ -53,14 +53,14 @@ func (rd *realDecoder) GetInt64() (int64, error) {
 	return tmp, nil
 }
 
-func (rd *realDecoder) GetArrayLength() (int, error) {
-	if rd.Remaining() < 4 {
+func (rd *realDecoder) getArrayLength() (int, error) {
+	if rd.remaining() < 4 {
 		rd.off = len(rd.raw)
 		return -1, InsufficientData
 	}
 	tmp := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
 	rd.off += 4
-	if tmp > rd.Remaining() {
+	if tmp > rd.remaining() {
 		rd.off = len(rd.raw)
 		return -1, InsufficientData
 	} else if tmp > 2*math.MaxUint16 {
@@ -71,8 +71,8 @@ func (rd *realDecoder) GetArrayLength() (int, error) {
 
 // collections
 
-func (rd *realDecoder) GetBytes() ([]byte, error) {
-	tmp, err := rd.GetInt32()
+func (rd *realDecoder) getBytes() ([]byte, error) {
+	tmp, err := rd.getInt32()
 
 	if err != nil {
 		return nil, err
@@ -87,7 +87,7 @@ func (rd *realDecoder) GetBytes() ([]byte, error) {
 		return nil, nil
 	case n == 0:
 		return make([]byte, 0), nil
-	case n > rd.Remaining():
+	case n > rd.remaining():
 		rd.off = len(rd.raw)
 		return nil, InsufficientData
 	default:
@@ -97,8 +97,8 @@ func (rd *realDecoder) GetBytes() ([]byte, error) {
 	}
 }
 
-func (rd *realDecoder) GetString() (string, error) {
-	tmp, err := rd.GetInt16()
+func (rd *realDecoder) getString() (string, error) {
+	tmp, err := rd.getInt16()
 
 	if err != nil {
 		return "", err
@@ -113,7 +113,7 @@ func (rd *realDecoder) GetString() (string, error) {
 		return "", nil
 	case n == 0:
 		return "", nil
-	case n > rd.Remaining():
+	case n > rd.remaining():
 		rd.off = len(rd.raw)
 		return "", InsufficientData
 	default:
@@ -123,8 +123,8 @@ func (rd *realDecoder) GetString() (string, error) {
 	}
 }
 
-func (rd *realDecoder) GetInt32Array() ([]int32, error) {
-	if rd.Remaining() < 4 {
+func (rd *realDecoder) getInt32Array() ([]int32, error) {
+	if rd.remaining() < 4 {
 		rd.off = len(rd.raw)
 		return nil, InsufficientData
 	}
@@ -132,7 +132,7 @@ func (rd *realDecoder) GetInt32Array() ([]int32, error) {
 	rd.off += 4
 
 	var ret []int32 = nil
-	if rd.Remaining() < 4*n {
+	if rd.remaining() < 4*n {
 		rd.off = len(rd.raw)
 		return nil, InsufficientData
 	} else if n > 0 {
@@ -145,8 +145,8 @@ func (rd *realDecoder) GetInt32Array() ([]int32, error) {
 	return ret, nil
 }
 
-func (rd *realDecoder) GetInt64Array() ([]int64, error) {
-	if rd.Remaining() < 4 {
+func (rd *realDecoder) getInt64Array() ([]int64, error) {
+	if rd.remaining() < 4 {
 		rd.off = len(rd.raw)
 		return nil, InsufficientData
 	}
@@ -154,7 +154,7 @@ func (rd *realDecoder) GetInt64Array() ([]int64, error) {
 	rd.off += 4
 
 	var ret []int64 = nil
-	if rd.Remaining() < 8*n {
+	if rd.remaining() < 8*n {
 		rd.off = len(rd.raw)
 		return nil, InsufficientData
 	} else if n > 0 {
@@ -169,12 +169,12 @@ func (rd *realDecoder) GetInt64Array() ([]int64, error) {
 
 // subsets
 
-func (rd *realDecoder) Remaining() int {
+func (rd *realDecoder) remaining() int {
 	return len(rd.raw) - rd.off
 }
 
-func (rd *realDecoder) GetSubset(length int) (PacketDecoder, error) {
-	if length > rd.Remaining() {
+func (rd *realDecoder) getSubset(length int) (packetDecoder, error) {
+	if length > rd.remaining() {
 		rd.off = len(rd.raw)
 		return nil, InsufficientData
 	}
@@ -186,11 +186,11 @@ func (rd *realDecoder) GetSubset(length int) (PacketDecoder, error) {
 
 // stacks
 
-func (rd *realDecoder) Push(in PushDecoder) error {
-	in.SaveOffset(rd.off)
+func (rd *realDecoder) push(in pushDecoder) error {
+	in.saveOffset(rd.off)
 
-	reserve := in.ReserveLength()
-	if rd.Remaining() < reserve {
+	reserve := in.reserveLength()
+	if rd.remaining() < reserve {
 		rd.off = len(rd.raw)
 		return InsufficientData
 	}
@@ -202,10 +202,10 @@ func (rd *realDecoder) Push(in PushDecoder) error {
 	return nil
 }
 
-func (rd *realDecoder) Pop() error {
+func (rd *realDecoder) pop() error {
 	// this is go's ugly pop pattern (the inverse of append)
 	in := rd.stack[len(rd.stack)-1]
 	rd.stack = rd.stack[:len(rd.stack)-1]
 
-	return in.Check(rd.off, rd.raw)
+	return in.check(rd.off, rd.raw)
 }

+ 21 - 21
encoding/real_encoder.go → real_encoder.go

@@ -1,80 +1,80 @@
-package encoding
+package kafka
 
 import "encoding/binary"
 
 type realEncoder struct {
 	raw   []byte
 	off   int
-	stack []PushEncoder
+	stack []pushEncoder
 }
 
 // primitives
 
-func (re *realEncoder) PutInt8(in int8) {
+func (re *realEncoder) putInt8(in int8) {
 	re.raw[re.off] = byte(in)
 	re.off += 1
 }
 
-func (re *realEncoder) PutInt16(in int16) {
+func (re *realEncoder) putInt16(in int16) {
 	binary.BigEndian.PutUint16(re.raw[re.off:], uint16(in))
 	re.off += 2
 }
 
-func (re *realEncoder) PutInt32(in int32) {
+func (re *realEncoder) putInt32(in int32) {
 	binary.BigEndian.PutUint32(re.raw[re.off:], uint32(in))
 	re.off += 4
 }
 
-func (re *realEncoder) PutInt64(in int64) {
+func (re *realEncoder) putInt64(in int64) {
 	binary.BigEndian.PutUint64(re.raw[re.off:], uint64(in))
 	re.off += 8
 }
 
-func (re *realEncoder) PutArrayLength(in int) error {
-	re.PutInt32(int32(in))
+func (re *realEncoder) putArrayLength(in int) error {
+	re.putInt32(int32(in))
 	return nil
 }
 
 // collection
 
-func (re *realEncoder) PutBytes(in []byte) error {
+func (re *realEncoder) putBytes(in []byte) error {
 	if in == nil {
-		re.PutInt32(-1)
+		re.putInt32(-1)
 		return nil
 	}
-	re.PutInt32(int32(len(in)))
+	re.putInt32(int32(len(in)))
 	copy(re.raw[re.off:], in)
 	re.off += len(in)
 	return nil
 }
 
-func (re *realEncoder) PutString(in string) error {
-	re.PutInt16(int16(len(in)))
+func (re *realEncoder) putString(in string) error {
+	re.putInt16(int16(len(in)))
 	copy(re.raw[re.off:], in)
 	re.off += len(in)
 	return nil
 }
 
-func (re *realEncoder) PutInt32Array(in []int32) error {
-	re.PutArrayLength(len(in))
+func (re *realEncoder) putInt32Array(in []int32) error {
+	re.putArrayLength(len(in))
 	for _, val := range in {
-		re.PutInt32(val)
+		re.putInt32(val)
 	}
 	return nil
 }
 
 // stacks
 
-func (re *realEncoder) Push(in PushEncoder) {
-	in.SaveOffset(re.off)
-	re.off += in.ReserveLength()
+func (re *realEncoder) push(in pushEncoder) {
+	in.saveOffset(re.off)
+	re.off += in.reserveLength()
 	re.stack = append(re.stack, in)
 }
 
-func (re *realEncoder) Pop() error {
+func (re *realEncoder) pop() error {
 	// this is go's ugly pop pattern (the inverse of append)
 	in := re.stack[len(re.stack)-1]
 	re.stack = re.stack[:len(re.stack)-1]
 
-	return in.Run(re.off, re.raw)
+	return in.run(re.off, re.raw)
 }

+ 2 - 10
sarama.go

@@ -1,12 +1,4 @@
 /*
-Package sarama provides client libraries for the Kafka 0.8 protocol.
-
-Package sarama is a dummy package, you almost certainly want sarama/kafka instead, which contains the high-level `userspace` API.
-
-The sarama/kafka package is built on sarama/protocol, which contains the lower-level API that gives you control over which requests get sent to which brokers.
-
-The sarama/protocol package is build on sarama/encoding, which implements the Kafka encoding rules for strings and other data structures.
-
-The sarama/mock package exposes some very basic helper functions for mocking a Kafka broker for testing purposes.
+Package kafka provides client libraries for the Kafka 0.8 protocol.
 */
-package sarama
+package kafka