decoder.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package wal
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "hash"
  6. "io"
  7. "github.com/coreos/etcd/pkg/crc"
  8. "github.com/coreos/etcd/raft/raftpb"
  9. "github.com/coreos/etcd/wal/walpb"
  10. )
  11. type decoder struct {
  12. br *bufio.Reader
  13. c io.Closer
  14. crc hash.Hash32
  15. }
  16. func newDecoder(rc io.ReadCloser) *decoder {
  17. return &decoder{
  18. br: bufio.NewReader(rc),
  19. c: rc,
  20. crc: crc.New(0, crcTable),
  21. }
  22. }
  23. func (d *decoder) decode(rec *walpb.Record) error {
  24. rec.Reset()
  25. l, err := readInt64(d.br)
  26. if err != nil {
  27. return err
  28. }
  29. data := make([]byte, l)
  30. if _, err = io.ReadFull(d.br, data); err != nil {
  31. return err
  32. }
  33. if err := rec.Unmarshal(data); err != nil {
  34. return err
  35. }
  36. // skip crc checking if the record type is crcType
  37. if rec.Type == crcType {
  38. return nil
  39. }
  40. d.crc.Write(rec.Data)
  41. return rec.Validate(d.crc.Sum32())
  42. }
  43. func (d *decoder) updateCRC(prevCrc uint32) {
  44. d.crc = crc.New(prevCrc, crcTable)
  45. }
  46. func (d *decoder) lastCRC() uint32 {
  47. return d.crc.Sum32()
  48. }
  49. func (d *decoder) close() error {
  50. return d.c.Close()
  51. }
  52. func mustUnmarshalEntry(d []byte) raftpb.Entry {
  53. var e raftpb.Entry
  54. if err := e.Unmarshal(d); err != nil {
  55. // crc matched, but we cannot unmarshal the struct?!
  56. // we must be the next winner of the $1B lottery.
  57. panic(err)
  58. }
  59. return e
  60. }
  61. func mustUnmarshalState(d []byte) raftpb.HardState {
  62. var s raftpb.HardState
  63. if err := s.Unmarshal(d); err != nil {
  64. panic(err)
  65. }
  66. return s
  67. }
  68. func readInt64(r io.Reader) (int64, error) {
  69. var n int64
  70. err := binary.Read(r, binary.LittleEndian, &n)
  71. return n, err
  72. }