浏览代码

Add support for Castagnoli polynomial for CRC32 computation

This is needed for Kafka 0.11 support as the new records' checksum is
computed with Castagnoli polynomial (previous versions of Kafka use
IEEE polynomial).
Vlad Hanciuta 8 年之前
父节点
当前提交
0d4d83d02a
共有 2 个文件被更改,包括 34 次插入4 次删除
  1. 32 2
      crc32_field.go
  2. 2 2
      message.go

+ 32 - 2
crc32_field.go

@@ -6,9 +6,17 @@ import (
 	"hash/crc32"
 	"hash/crc32"
 )
 )
 
 
+const (
+	crcIEEE = iota
+	crcCastagnoli
+)
+
+var castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
+
 // crc32Field implements the pushEncoder and pushDecoder interfaces for calculating CRC32s.
 // crc32Field implements the pushEncoder and pushDecoder interfaces for calculating CRC32s.
 type crc32Field struct {
 type crc32Field struct {
 	startOffset int
 	startOffset int
+	polynomial  int
 }
 }
 
 
 func (c *crc32Field) saveOffset(in int) {
 func (c *crc32Field) saveOffset(in int) {
@@ -19,14 +27,24 @@ func (c *crc32Field) reserveLength() int {
 	return 4
 	return 4
 }
 }
 
 
+func newCRC32Field(polynomial int) *crc32Field {
+	return &crc32Field{polynomial: polynomial}
+}
+
 func (c *crc32Field) run(curOffset int, buf []byte) error {
 func (c *crc32Field) run(curOffset int, buf []byte) error {
-	crc := crc32.ChecksumIEEE(buf[c.startOffset+4 : curOffset])
+	crc, err := c.crc(curOffset, buf)
+	if err != nil {
+		return err
+	}
 	binary.BigEndian.PutUint32(buf[c.startOffset:], crc)
 	binary.BigEndian.PutUint32(buf[c.startOffset:], crc)
 	return nil
 	return nil
 }
 }
 
 
 func (c *crc32Field) check(curOffset int, buf []byte) error {
 func (c *crc32Field) check(curOffset int, buf []byte) error {
-	crc := crc32.ChecksumIEEE(buf[c.startOffset+4 : curOffset])
+	crc, err := c.crc(curOffset, buf)
+	if err != nil {
+		return err
+	}
 
 
 	expected := binary.BigEndian.Uint32(buf[c.startOffset:])
 	expected := binary.BigEndian.Uint32(buf[c.startOffset:])
 	if crc != expected {
 	if crc != expected {
@@ -35,3 +53,15 @@ func (c *crc32Field) check(curOffset int, buf []byte) error {
 
 
 	return nil
 	return nil
 }
 }
+func (c *crc32Field) crc(curOffset int, buf []byte) (uint32, error) {
+	var tab *crc32.Table
+	switch c.polynomial {
+	case crcIEEE:
+		tab = crc32.IEEETable
+	case crcCastagnoli:
+		tab = castagnoliTable
+	default:
+		return 0, PacketDecodingError{"invalid CRC type"}
+	}
+	return crc32.Checksum(buf[c.startOffset+4:curOffset], tab), nil
+}

+ 2 - 2
message.go

@@ -37,7 +37,7 @@ type Message struct {
 }
 }
 
 
 func (m *Message) encode(pe packetEncoder) error {
 func (m *Message) encode(pe packetEncoder) error {
-	pe.push(&crc32Field{})
+	pe.push(newCRC32Field(crcIEEE))
 
 
 	pe.putInt8(m.Version)
 	pe.putInt8(m.Version)
 
 
@@ -112,7 +112,7 @@ func (m *Message) encode(pe packetEncoder) error {
 }
 }
 
 
 func (m *Message) decode(pd packetDecoder) (err error) {
 func (m *Message) decode(pd packetDecoder) (err error) {
-	err = pd.push(&crc32Field{})
+	err = pd.push(newCRC32Field(crcIEEE))
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}