Browse Source

wal: allow at most one WAL function called at one time

SaveSnap and Save are called in separate goroutines now. Allow at most
one WAL function being called at one time to protect internal fields and
guarantee execution order.
Or one possible bug is that the new cut file is started with snapshot
entry instead of crc entry.
Yicheng Qin 10 years ago
parent
commit
44de670de7
1 changed files with 17 additions and 0 deletions
  1. 17 0
      wal/wal.go

+ 17 - 0
wal/wal.go

@@ -23,6 +23,7 @@ import (
 	"os"
 	"os"
 	"path"
 	"path"
 	"reflect"
 	"reflect"
+	"sync"
 	"time"
 	"time"
 
 
 	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/coreos/etcd/pkg/fileutil"
@@ -69,6 +70,7 @@ type WAL struct {
 	start   walpb.Snapshot // snapshot to start reading
 	start   walpb.Snapshot // snapshot to start reading
 	decoder *decoder       // decoder to decode records
 	decoder *decoder       // decoder to decode records
 
 
+	mu      sync.Mutex
 	f       *os.File // underlay file opened for appending, sync
 	f       *os.File // underlay file opened for appending, sync
 	seq     uint64   // sequence of the wal file currently used for writes
 	seq     uint64   // sequence of the wal file currently used for writes
 	enti    uint64   // index of the last entry saved to the wal
 	enti    uint64   // index of the last entry saved to the wal
@@ -213,6 +215,9 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, all bool) (*WAL, error) {
 // TODO: maybe loose the checking of match.
 // TODO: maybe loose the checking of match.
 // After ReadAll, the WAL will be ready for appending new records.
 // After ReadAll, the WAL will be ready for appending new records.
 func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) {
 func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+
 	rec := &walpb.Record{}
 	rec := &walpb.Record{}
 	decoder := w.decoder
 	decoder := w.decoder
 
 
@@ -335,6 +340,9 @@ func (w *WAL) sync() error {
 // For example, if WAL is holding lock 1,2,3,4,5,6, ReleaseLockTo(4) will release
 // For example, if WAL is holding lock 1,2,3,4,5,6, ReleaseLockTo(4) will release
 // lock 1,2 but keep 3. ReleaseLockTo(5) will release 1,2,3 but keep 4.
 // lock 1,2 but keep 3. ReleaseLockTo(5) will release 1,2,3 but keep 4.
 func (w *WAL) ReleaseLockTo(index uint64) error {
 func (w *WAL) ReleaseLockTo(index uint64) error {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+
 	var smaller int
 	var smaller int
 	found := false
 	found := false
 
 
@@ -370,6 +378,9 @@ func (w *WAL) ReleaseLockTo(index uint64) error {
 }
 }
 
 
 func (w *WAL) Close() error {
 func (w *WAL) Close() error {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+
 	if w.f != nil {
 	if w.f != nil {
 		if err := w.sync(); err != nil {
 		if err := w.sync(); err != nil {
 			return err
 			return err
@@ -409,6 +420,9 @@ func (w *WAL) saveState(s *raftpb.HardState) error {
 }
 }
 
 
 func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
 func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+
 	// short cut, do not call sync
 	// short cut, do not call sync
 	if raft.IsEmptyHardState(st) && len(ents) == 0 {
 	if raft.IsEmptyHardState(st) && len(ents) == 0 {
 		return nil
 		return nil
@@ -436,6 +450,9 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
 }
 }
 
 
 func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
 func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+
 	b := pbutil.MustMarshal(&e)
 	b := pbutil.MustMarshal(&e)
 	rec := &walpb.Record{Type: snapshotType, Data: b}
 	rec := &walpb.Record{Type: snapshotType, Data: b}
 	if err := w.encoder.encode(rec); err != nil {
 	if err := w.encoder.encode(rec); err != nil {