decoder.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package wal
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "hash"
  6. "io"
  7. "github.com/coreos/etcd/crc"
  8. "github.com/coreos/etcd/raft/raftpb"
  9. )
  10. type decoder struct {
  11. br *bufio.Reader
  12. c io.Closer
  13. crc hash.Hash32
  14. }
  15. func newDecoder(rc io.ReadCloser) *decoder {
  16. return &decoder{
  17. br: bufio.NewReader(rc),
  18. c: rc,
  19. crc: crc.New(0, crcTable),
  20. }
  21. }
  22. func (d *decoder) decode(rec *Record) error {
  23. rec.Reset()
  24. l, err := readInt64(d.br)
  25. if err != nil {
  26. return err
  27. }
  28. data := make([]byte, l)
  29. if _, err = io.ReadFull(d.br, data); err != nil {
  30. return err
  31. }
  32. if err := rec.Unmarshal(data); err != nil {
  33. return err
  34. }
  35. // skip crc checking if the record type is crcType
  36. if rec.Type == crcType {
  37. return nil
  38. }
  39. d.crc.Write(rec.Data)
  40. return rec.validate(d.crc.Sum32())
  41. }
  42. func (d *decoder) updateCRC(prevCrc uint32) {
  43. d.crc = crc.New(prevCrc, crcTable)
  44. }
  45. func (d *decoder) lastCRC() uint32 {
  46. return d.crc.Sum32()
  47. }
  48. func (d *decoder) close() error {
  49. return d.c.Close()
  50. }
  51. func mustUnmarshalInfo(d []byte) raftpb.Info {
  52. var i raftpb.Info
  53. if err := i.Unmarshal(d); err != nil {
  54. // crc matched, but we cannot unmarshal the struct?!
  55. // we must be the next winner of the $1B lottery.
  56. panic(err)
  57. }
  58. return i
  59. }
  60. func mustUnmarshalEntry(d []byte) raftpb.Entry {
  61. var e raftpb.Entry
  62. if err := e.Unmarshal(d); err != nil {
  63. panic(err)
  64. }
  65. return e
  66. }
  67. func mustUnmarshalState(d []byte) raftpb.State {
  68. var s raftpb.State
  69. if err := s.Unmarshal(d); err != nil {
  70. panic(err)
  71. }
  72. return s
  73. }
  74. func readInt64(r io.Reader) (int64, error) {
  75. var n int64
  76. err := binary.Read(r, binary.LittleEndian, &n)
  77. return n, err
  78. }