encoder.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package wal
  15. import (
  16. "bufio"
  17. "encoding/binary"
  18. "hash"
  19. "io"
  20. "sync"
  21. "github.com/coreos/etcd/pkg/crc"
  22. "github.com/coreos/etcd/wal/walpb"
  23. )
  24. type encoder struct {
  25. mu sync.Mutex
  26. bw *bufio.Writer
  27. crc hash.Hash32
  28. }
  29. func newEncoder(w io.Writer, prevCrc uint32) *encoder {
  30. return &encoder{
  31. bw: bufio.NewWriter(w),
  32. crc: crc.New(prevCrc, crcTable),
  33. }
  34. }
  35. func (e *encoder) encode(rec *walpb.Record) error {
  36. e.mu.Lock()
  37. defer e.mu.Unlock()
  38. e.crc.Write(rec.Data)
  39. rec.Crc = e.crc.Sum32()
  40. data, err := rec.Marshal()
  41. if err != nil {
  42. return err
  43. }
  44. if err := writeInt64(e.bw, int64(len(data))); err != nil {
  45. return err
  46. }
  47. _, err = e.bw.Write(data)
  48. return err
  49. }
  50. func (e *encoder) flush() error {
  51. e.mu.Lock()
  52. defer e.mu.Unlock()
  53. return e.bw.Flush()
  54. }
  55. func writeInt64(w io.Writer, n int64) error {
  56. return binary.Write(w, binary.LittleEndian, n)
  57. }