decoder.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package wal
import (
	"bufio"
	"encoding/binary"
	"fmt"
	"hash"
	"io"

	"github.com/coreos/etcd/pkg/crc"
	"github.com/coreos/etcd/pkg/pbutil"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/coreos/etcd/wal/walpb"
)
  12. type decoder struct {
  13. br *bufio.Reader
  14. c io.Closer
  15. crc hash.Hash32
  16. }
  17. func newDecoder(rc io.ReadCloser) *decoder {
  18. return &decoder{
  19. br: bufio.NewReader(rc),
  20. c: rc,
  21. crc: crc.New(0, crcTable),
  22. }
  23. }
  24. func (d *decoder) decode(rec *walpb.Record) error {
  25. rec.Reset()
  26. l, err := readInt64(d.br)
  27. if err != nil {
  28. return err
  29. }
  30. data := make([]byte, l)
  31. if _, err = io.ReadFull(d.br, data); err != nil {
  32. return err
  33. }
  34. if err := rec.Unmarshal(data); err != nil {
  35. return err
  36. }
  37. // skip crc checking if the record type is crcType
  38. if rec.Type == crcType {
  39. return nil
  40. }
  41. d.crc.Write(rec.Data)
  42. return rec.Validate(d.crc.Sum32())
  43. }
  44. func (d *decoder) updateCRC(prevCrc uint32) {
  45. d.crc = crc.New(prevCrc, crcTable)
  46. }
  47. func (d *decoder) lastCRC() uint32 {
  48. return d.crc.Sum32()
  49. }
  50. func (d *decoder) close() error {
  51. return d.c.Close()
  52. }
  53. func mustUnmarshalEntry(d []byte) raftpb.Entry {
  54. var e raftpb.Entry
  55. pbutil.MustUnmarshal(&e, d)
  56. return e
  57. }
  58. func mustUnmarshalState(d []byte) raftpb.HardState {
  59. var s raftpb.HardState
  60. pbutil.MustUnmarshal(&s, d)
  61. return s
  62. }
  63. func readInt64(r io.Reader) (int64, error) {
  64. var n int64
  65. err := binary.Read(r, binary.LittleEndian, &n)
  66. return n, err
  67. }