Browse Source

Merge pull request #2451 from xiang90/fix_wal

wal: do not race reader and writer
Xiang Li 10 years ago
parent
commit
daea484a9f
2 changed files with 16 additions and 2 deletions
  1. 7 1
      wal/decoder.go
  2. 9 1
      wal/encoder.go

+ 7 - 1
wal/decoder.go

@@ -19,6 +19,7 @@ import (
 	"encoding/binary"
 	"hash"
 	"io"
+	"sync"
 
 	"github.com/coreos/etcd/pkg/crc"
 	"github.com/coreos/etcd/pkg/pbutil"
@@ -27,7 +28,9 @@ import (
 )
 
 type decoder struct {
-	br  *bufio.Reader
+	mu sync.Mutex
+	br *bufio.Reader
+
 	c   io.Closer
 	crc hash.Hash32
 }
@@ -41,6 +44,9 @@ func newDecoder(rc io.ReadCloser) *decoder {
 }
 
 func (d *decoder) decode(rec *walpb.Record) error {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+
 	rec.Reset()
 	l, err := readInt64(d.br)
 	if err != nil {

+ 9 - 1
wal/encoder.go

@@ -19,13 +19,16 @@ import (
 	"encoding/binary"
 	"hash"
 	"io"
+	"sync"
 
 	"github.com/coreos/etcd/pkg/crc"
 	"github.com/coreos/etcd/wal/walpb"
 )
 
 type encoder struct {
-	bw  *bufio.Writer
+	mu sync.Mutex
+	bw *bufio.Writer
+
 	crc hash.Hash32
 }
 
@@ -37,6 +40,9 @@ func newEncoder(w io.Writer, prevCrc uint32) *encoder {
 }
 
 func (e *encoder) encode(rec *walpb.Record) error {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+
 	e.crc.Write(rec.Data)
 	rec.Crc = e.crc.Sum32()
 	data, err := rec.Marshal()
@@ -51,6 +57,8 @@ func (e *encoder) encode(rec *walpb.Record) error {
 }
 
 func (e *encoder) flush() error {
+	e.mu.Lock()
+	defer e.mu.Unlock()
 	return e.bw.Flush()
 }