decoder.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package wal
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "hash"
  6. "io"
  7. "github.com/coreos/etcd/crc"
  8. "github.com/coreos/etcd/raft/raftpb"
  9. "github.com/coreos/etcd/wal/walpb"
  10. )
  11. type decoder struct {
  12. br *bufio.Reader
  13. c io.Closer
  14. crc hash.Hash32
  15. }
  16. func newDecoder(rc io.ReadCloser) *decoder {
  17. return &decoder{
  18. br: bufio.NewReader(rc),
  19. c: rc,
  20. crc: crc.New(0, crcTable),
  21. }
  22. }
  23. func (d *decoder) decode(rec *walpb.Record) error {
  24. rec.Reset()
  25. l, err := readInt64(d.br)
  26. if err != nil {
  27. return err
  28. }
  29. data := make([]byte, l)
  30. if _, err = io.ReadFull(d.br, data); err != nil {
  31. return err
  32. }
  33. if err := rec.Unmarshal(data); err != nil {
  34. return err
  35. }
  36. // skip crc checking if the record type is crcType
  37. if rec.Type == crcType {
  38. return nil
  39. }
  40. d.crc.Write(rec.Data)
  41. return rec.Validate(d.crc.Sum32())
  42. }
  43. func (d *decoder) updateCRC(prevCrc uint32) {
  44. d.crc = crc.New(prevCrc, crcTable)
  45. }
  46. func (d *decoder) lastCRC() uint32 {
  47. return d.crc.Sum32()
  48. }
  49. func (d *decoder) close() error {
  50. return d.c.Close()
  51. }
  52. func mustUnmarshalInfo(d []byte) raftpb.Info {
  53. var i raftpb.Info
  54. if err := i.Unmarshal(d); err != nil {
  55. // crc matched, but we cannot unmarshal the struct?!
  56. // we must be the next winner of the $1B lottery.
  57. panic(err)
  58. }
  59. return i
  60. }
  61. func mustUnmarshalEntry(d []byte) raftpb.Entry {
  62. var e raftpb.Entry
  63. if err := e.Unmarshal(d); err != nil {
  64. panic(err)
  65. }
  66. return e
  67. }
  68. func mustUnmarshalState(d []byte) raftpb.HardState {
  69. var s raftpb.HardState
  70. if err := s.Unmarshal(d); err != nil {
  71. panic(err)
  72. }
  73. return s
  74. }
  75. func readInt64(r io.Reader) (int64, error) {
  76. var n int64
  77. err := binary.Read(r, binary.LittleEndian, &n)
  78. return n, err
  79. }