浏览代码

Factor out common array encoding/decoding

Evan Huus 12 年之前
父节点
当前提交
fe73334510
共有 6 个文件被更改,包括 91 次插入45 次删除
  1. 7 1
      packet_decoder.go
  2. 7 1
      packet_encoder.go
  3. 4 26
      partition_metadata.go
  4. 16 3
      prep_encoder.go
  5. 39 11
      real_decoder.go
  6. 18 3
      real_encoder.go

+ 7 - 1
packet_decoder.go

@@ -3,16 +3,22 @@ package kafka
 type packetDecoder interface {
 	remaining() int
 
+	// primitives
 	getInt8() (int8, error)
 	getInt16() (int16, error)
 	getInt32() (int32, error)
 	getInt64() (int64, error)
 
+	// arrays
+	getInt32Array() ([]int32, error)
+	getArrayCount() (int, error)
+
+	// misc
 	getError() (KError, error)
 	getString() (*string, error)
 	getBytes() (*[]byte, error)
-	getArrayCount() (int, error)
 
+	// stackable
 	push(in pushDecoder) error
 	pushLength32() error
 	pushCRC32() error

+ 7 - 1
packet_encoder.go

@@ -1,16 +1,22 @@
 package kafka
 
 type packetEncoder interface {
+	// primitives
 	putInt8(in int8)
 	putInt16(in int16)
 	putInt32(in int32)
 	putInt64(in int64)
 
+	// arrays
+	putInt32Array(in []int32)
+	putArrayCount(in int)
+
+	// misc
 	putError(in KError)
 	putString(in *string)
 	putBytes(in *[]byte)
-	putArrayCount(in int)
 
+	// stackable
 	push(in pushEncoder)
 	pushLength32()
 	pushCRC32()

+ 4 - 26
partition_metadata.go

@@ -12,16 +12,8 @@ func (pm *partitionMetadata) encode(pe packetEncoder) {
 	pe.putError(pm.err)
 	pe.putInt32(pm.id)
 	pe.putInt32(pm.leader)
-
-	pe.putArrayCount(len(pm.replicas))
-	for _, val := range pm.replicas {
-		pe.putInt32(val)
-	}
-
-	pe.putArrayCount(len(pm.isr))
-	for _, val := range pm.isr {
-		pe.putInt32(val)
-	}
+	pe.putInt32Array(pm.replicas)
+	pe.putInt32Array(pm.isr)
 }
 
 func (pm *partitionMetadata) decode(pd packetDecoder) (err error) {
@@ -40,29 +32,15 @@ func (pm *partitionMetadata) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
-	n, err := pd.getArrayCount()
+	pm.replicas, err = pd.getInt32Array()
 	if err != nil {
 		return err
 	}
-	pm.replicas = make([]int32, n)
-	for i := 0; i < n; i++ {
-		pm.replicas[i], err = pd.getInt32()
-		if err != nil {
-			return err
-		}
-	}
 
-	n, err = pd.getArrayCount()
+	pm.isr, err = pd.getInt32Array()
 	if err != nil {
 		return err
 	}
-	pm.isr = make([]int32, n)
-	for i := 0; i < n; i++ {
-		pm.isr[i], err = pd.getInt32()
-		if err != nil {
-			return err
-		}
-	}
 
 	return nil
 }

+ 16 - 3
prep_encoder.go

@@ -7,6 +7,8 @@ type prepEncoder struct {
 	err    error
 }
 
+// primitives
+
 func (pe *prepEncoder) putInt8(in int8) {
 	pe.length += 1
 }
@@ -23,6 +25,19 @@ func (pe *prepEncoder) putInt64(in int64) {
 	pe.length += 8
 }
 
+// arrays
+
+func (pe *prepEncoder) putInt32Array(in []int32) {
+	pe.length += 4
+	pe.length += 4 * len(in)
+}
+
+func (pe *prepEncoder) putArrayCount(in int) {
+	pe.length += 4
+}
+
+// misc
+
 func (pe *prepEncoder) putError(in KError) {
 	pe.length += 2
 }
@@ -51,9 +66,7 @@ func (pe *prepEncoder) putBytes(in *[]byte) {
 	}
 }
 
-func (pe *prepEncoder) putArrayCount(in int) {
-	pe.length += 4
-}
+// stackable
 
 func (pe *prepEncoder) push(in pushEncoder) {
 	pe.length += in.reserveLength()

+ 39 - 11
real_decoder.go

@@ -15,6 +15,8 @@ func (rd *realDecoder) remaining() int {
 	return len(rd.raw) - rd.off
 }
 
+// primitives
+
 func (rd *realDecoder) getInt8() (int8, error) {
 	if rd.remaining() < 1 {
 		return -1, DecodingError{}
@@ -51,6 +53,42 @@ func (rd *realDecoder) getInt64() (int64, error) {
 	return tmp, nil
 }
 
+// arrays
+
+func (rd *realDecoder) getInt32Array() ([]int32, error) {
+	if rd.remaining() < 4 {
+		return nil, DecodingError{}
+	}
+	n := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
+	rd.off += 4
+
+	var ret []int32 = nil
+	if rd.remaining() < 4*n {
+		return nil, DecodingError{}
+	} else if n > 0 {
+		ret = make([]int32, n)
+		for i := range ret {
+			ret[i] = int32(binary.BigEndian.Uint32(rd.raw[rd.off:]))
+			rd.off += 4
+		}
+	}
+	return ret, nil
+}
+
+func (rd *realDecoder) getArrayCount() (int, error) {
+	if rd.remaining() < 4 {
+		return -1, DecodingError{}
+	}
+	tmp := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
+	rd.off += 4
+	if tmp > rd.remaining() || tmp > 2*math.MaxUint16 {
+		return -1, DecodingError{}
+	}
+	return tmp, nil
+}
+
+// misc
+
 func (rd *realDecoder) getError() (KError, error) {
 	val, err := rd.getInt16()
 	return KError(val), err
@@ -108,17 +146,7 @@ func (rd *realDecoder) getBytes() (*[]byte, error) {
 	}
 }
 
-func (rd *realDecoder) getArrayCount() (int, error) {
-	if rd.remaining() < 4 {
-		return -1, DecodingError{}
-	}
-	tmp := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
-	rd.off += 4
-	if tmp > rd.remaining() || tmp > 2*math.MaxUint16 {
-		return -1, DecodingError{}
-	}
-	return tmp, nil
-}
+// stackable
 
 func (rd *realDecoder) push(in pushDecoder) error {
 	in.saveOffset(rd.off)

+ 18 - 3
real_encoder.go

@@ -8,6 +8,8 @@ type realEncoder struct {
 	stack []pushEncoder
 }
 
+// primitives
+
 func (re *realEncoder) putInt8(in int8) {
 	re.raw[re.off] = byte(in)
 	re.off += 1
@@ -28,6 +30,21 @@ func (re *realEncoder) putInt64(in int64) {
 	re.off += 8
 }
 
+// arrays
+
+func (re *realEncoder) putInt32Array(in []int32) {
+	re.putArrayCount(len(in))
+	for _, val := range in {
+		re.putInt32(val)
+	}
+}
+
+func (re *realEncoder) putArrayCount(in int) {
+	re.putInt32(int32(in))
+}
+
+// misc
+
 func (re *realEncoder) putError(in KError) {
 	re.putInt16(int16(in))
 }
@@ -52,9 +69,7 @@ func (re *realEncoder) putBytes(in *[]byte) {
 	re.off += len(*in)
 }
 
-func (re *realEncoder) putArrayCount(in int) {
-	re.putInt32(int32(in))
-}
+// stackable
 
 func (re *realEncoder) push(in pushEncoder) {
 	in.saveOffset(re.off)