Browse Source

*: lock the in using files; do not purge locked the wal files

Xiang Li 11 years ago
parent
commit
ea94d19147
8 changed files with 346 additions and 29 deletions
  1. 6 24
      etcdserver/server.go
  2. 45 0
      etcdserver/storage.go
  3. 60 0
      pkg/fileutil/lock.go
  4. 82 0
      pkg/fileutil/lock_test.go
  5. 19 2
      pkg/fileutil/purge.go
  6. 69 1
      pkg/fileutil/purge_test.go
  7. 63 2
      wal/wal.go
  8. 2 0
      wal/wal_test.go

+ 6 - 24
etcdserver/server.go

@@ -90,21 +90,6 @@ type Response struct {
 	err     error
 }
 
-type Storage interface {
-	// Save function saves ents and state to the underlying stable storage.
-	// Save MUST block until st and ents are on stable storage.
-	Save(st raftpb.HardState, ents []raftpb.Entry) error
-	// SaveSnap function saves snapshot to the underlying stable storage.
-	SaveSnap(snap raftpb.Snapshot) error
-
-	// TODO: WAL should be able to control cut itself. After implement self-controlled cut,
-	// remove it in this interface.
-	// Cut cuts out a new wal file for saving new state and entries.
-	Cut() error
-	// Close closes the Storage and performs finalization.
-	Close() error
-}
-
 type Server interface {
 	// Start performs any initialization of the Server necessary for it to
 	// begin serving requests. It must be called before Do or Process.
@@ -295,15 +280,12 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		id:          id,
 		attributes:  Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
 		Cluster:     cfg.Cluster,
-		storage: struct {
-			*wal.WAL
-			*snap.Snapshotter
-		}{w, ss},
-		stats:      sstats,
-		lstats:     lstats,
-		Ticker:     time.Tick(100 * time.Millisecond),
-		SyncTicker: time.Tick(500 * time.Millisecond),
-		snapCount:  cfg.SnapCount,
+		storage:     NewStorage(w, ss),
+		stats:       sstats,
+		lstats:      lstats,
+		Ticker:      time.Tick(100 * time.Millisecond),
+		SyncTicker:  time.Tick(500 * time.Millisecond),
+		snapCount:   cfg.SnapCount,
 	}
 	srv.sendhub = newSendHub(cfg.Transport, cfg.Cluster, srv, sstats, lstats)
 	for _, m := range getOtherMembers(cfg.Cluster, cfg.Name) {

+ 45 - 0
etcdserver/storage.go

@@ -0,0 +1,45 @@
+package etcdserver
+
+import (
+	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/snap"
+	"github.com/coreos/etcd/wal"
+)
+
+type Storage interface {
+	// Save function saves ents and state to the underlying stable storage.
+	// Save MUST block until st and ents are on stable storage.
+	Save(st raftpb.HardState, ents []raftpb.Entry) error
+	// SaveSnap function saves snapshot to the underlying stable storage.
+	SaveSnap(snap raftpb.Snapshot) error
+
+	// TODO: WAL should be able to control cut itself. After implement self-controlled cut,
+	// remove it in this interface.
+	// Cut cuts out a new wal file for saving new state and entries.
+	Cut() error
+	// Close closes the Storage and performs finalization.
+	Close() error
+}
+
+type storage struct {
+	*wal.WAL
+	*snap.Snapshotter
+}
+
+func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
+	return &storage{w, s}
+}
+
+// SaveSnap saves the snapshot to disk and release the locked
+// wal files since they will not be used.
+func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
+	err := st.Snapshotter.SaveSnap(snap)
+	if err != nil {
+		return err
+	}
+	err = st.WAL.ReleaseLockTo(snap.Metadata.Index)
+	if err != nil {
+		return err
+	}
+	return nil
+}

+ 60 - 0
pkg/fileutil/lock.go

@@ -0,0 +1,60 @@
+package fileutil
+
+import (
+	"errors"
+	"os"
+	"syscall"
+)
+
+var (
+	ErrLocked = errors.New("file already locked")
+)
+
+type Lock interface {
+	Name() string
+	TryLock() error
+	Lock() error
+	Unlock() error
+	Destroy() error
+}
+
+type lock struct {
+	fd   int
+	file *os.File
+}
+
+func (l *lock) Name() string {
+	return l.file.Name()
+}
+
+// TryLock acquires exclusivity on the lock without blocking
+func (l *lock) TryLock() error {
+	err := syscall.Flock(l.fd, syscall.LOCK_EX|syscall.LOCK_NB)
+	if err != nil && err == syscall.EWOULDBLOCK {
+		return ErrLocked
+	}
+	return err
+}
+
+// Lock acquires exclusivity on the lock without blocking
+func (l *lock) Lock() error {
+	return syscall.Flock(l.fd, syscall.LOCK_EX)
+}
+
+// Unlock unlocks the lock
+func (l *lock) Unlock() error {
+	return syscall.Flock(l.fd, syscall.LOCK_UN)
+}
+
+func (l *lock) Destroy() error {
+	return l.file.Close()
+}
+
+func NewLock(file string) (Lock, error) {
+	f, err := os.Open(file)
+	if err != nil {
+		return nil, err
+	}
+	l := &lock{int(f.Fd()), f}
+	return l, nil
+}

+ 82 - 0
pkg/fileutil/lock_test.go

@@ -0,0 +1,82 @@
+package fileutil
+
+import (
+	"io/ioutil"
+	"os"
+	"testing"
+	"time"
+)
+
+func TestLockAndUnlock(t *testing.T) {
+	f, err := ioutil.TempFile("", "lock")
+	if err != nil {
+		t.Fatal(err)
+	}
+	f.Close()
+	defer func() {
+		err := os.Remove(f.Name())
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	// lock the file
+	l, err := NewLock(f.Name())
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer l.Destroy()
+	err = l.Lock()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// try lock a locked file
+	dupl, err := NewLock(f.Name())
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = dupl.TryLock()
+	if err != ErrLocked {
+		t.Errorf("err = %v, want %v", err, ErrLocked)
+	}
+
+	// unlock the file
+	err = l.Unlock()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// try lock the unlocked file
+	err = dupl.TryLock()
+	if err != nil {
+		t.Errorf("err = %v, want %v", err, nil)
+	}
+	defer dupl.Destroy()
+
+	// blocking on locked file
+	locked := make(chan struct{}, 1)
+	go func() {
+		l.Lock()
+		locked <- struct{}{}
+	}()
+
+	select {
+	case <-locked:
+		t.Error("unexpected unblocking")
+	case <-time.After(10 * time.Millisecond):
+	}
+
+	// unlock
+	err = dupl.Unlock()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// the previously blocked routine should be unblocked
+	select {
+	case <-locked:
+	case <-time.After(10 * time.Millisecond):
+		t.Error("unexpected blocking")
+	}
+}

+ 19 - 2
pkg/fileutil/purge.go

@@ -27,12 +27,29 @@ func PurgeFile(dirname string, suffix string, max uint, interval time.Duration,
 			sort.Strings(newfnames)
 			for len(newfnames) > int(max) {
 				f := path.Join(dirname, newfnames[0])
-				err := os.Remove(f)
+				l, err := NewLock(f)
 				if err != nil {
 					errC <- err
 					return
 				}
-				log.Printf("filePurge: successfully remvoed file %s", f)
+				err = l.TryLock()
+				if err != nil {
+					break
+				}
+				err = os.Remove(f)
+				if err != nil {
+					errC <- err
+					return
+				}
+				err = l.Unlock()
+				if err != nil {
+					log.Printf("filePurge: unlock %s error %v", l.Name(), err)
+				}
+				err = l.Destroy()
+				if err != nil {
+					log.Printf("filePurge: destroy lock %s error %v", l.Name(), err)
+				}
+				log.Printf("filePurge: successfully removed file %s", f)
 				newfnames = newfnames[1:]
 			}
 			select {

+ 69 - 1
pkg/fileutil/purge_test.go

@@ -31,7 +31,7 @@ func TestPurgeFile(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
-		time.Sleep(time.Millisecond)
+		time.Sleep(2 * time.Millisecond)
 	}
 	fnames, err := ReadDir(dir)
 	if err != nil {
@@ -48,3 +48,71 @@ func TestPurgeFile(t *testing.T) {
 	}
 	close(stop)
 }
+
+func TestPurgeFileHoldingLock(t *testing.T) {
+	dir, err := ioutil.TempDir("", "purgefile")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(dir)
+
+	for i := 0; i < 10; i++ {
+		_, err := os.Create(path.Join(dir, fmt.Sprintf("%d.test", i)))
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// create a purge barrier at 5
+	l, err := NewLock(path.Join(dir, fmt.Sprintf("%d.test", 5)))
+	err = l.Lock()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	stop := make(chan struct{})
+	errch := PurgeFile(dir, "test", 3, time.Millisecond, stop)
+	time.Sleep(5 * time.Millisecond)
+
+	fnames, err := ReadDir(dir)
+	if err != nil {
+		t.Fatal(err)
+	}
+	wnames := []string{"5.test", "6.test", "7.test", "8.test", "9.test"}
+	if !reflect.DeepEqual(fnames, wnames) {
+		t.Errorf("filenames = %v, want %v", fnames, wnames)
+	}
+	select {
+	case err := <-errch:
+		t.Errorf("unexpected purge error %v", err)
+	case <-time.After(time.Millisecond):
+	}
+
+	// remove the purge barrier
+	err = l.Unlock()
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = l.Destroy()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	time.Sleep(5 * time.Millisecond)
+
+	fnames, err = ReadDir(dir)
+	if err != nil {
+		t.Fatal(err)
+	}
+	wnames = []string{"7.test", "8.test", "9.test"}
+	if !reflect.DeepEqual(fnames, wnames) {
+		t.Errorf("filenames = %v, want %v", fnames, wnames)
+	}
+	select {
+	case err := <-errch:
+		t.Errorf("unexpected purge error %v", err)
+	case <-time.After(time.Millisecond):
+	}
+
+	close(stop)
+}

+ 63 - 2
wal/wal.go

@@ -67,6 +67,8 @@ type WAL struct {
 	seq     uint64   // sequence of the wal file currently used for writes
 	enti    uint64   // index of the last entry saved to the wal
 	encoder *encoder // encoder to encode records
+
+	locks []fileutil.Lock // the file locks the WAL is holding (the name is increasing)
 }
 
 // Create creates a WAL ready for appending records. The given metadata is
@@ -85,6 +87,15 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
 	if err != nil {
 		return nil, err
 	}
+	l, err := fileutil.NewLock(f.Name())
+	if err != nil {
+		return nil, err
+	}
+	err = l.Lock()
+	if err != nil {
+		return nil, err
+	}
+
 	w := &WAL{
 		dir:      dirpath,
 		metadata: metadata,
@@ -92,6 +103,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
 		f:        f,
 		encoder:  newEncoder(f, 0),
 	}
+	w.locks = append(w.locks, l)
 	if err := w.saveCrc(0); err != nil {
 		return nil, err
 	}
@@ -129,12 +141,22 @@ func OpenAtIndex(dirpath string, index uint64) (*WAL, error) {
 
 	// open the wal files for reading
 	rcs := make([]io.ReadCloser, 0)
+	ls := make([]fileutil.Lock, 0)
 	for _, name := range names[nameIndex:] {
 		f, err := os.Open(path.Join(dirpath, name))
 		if err != nil {
 			return nil, err
 		}
+		l, err := fileutil.NewLock(f.Name())
+		if err != nil {
+			return nil, err
+		}
+		err = l.TryLock()
+		if err != nil {
+			return nil, err
+		}
 		rcs = append(rcs, f)
+		ls = append(ls, l)
 	}
 	rc := MultiReadCloser(rcs...)
 
@@ -157,8 +179,9 @@ func OpenAtIndex(dirpath string, index uint64) (*WAL, error) {
 		ri:      index,
 		decoder: newDecoder(rc),
 
-		f:   f,
-		seq: seq,
+		f:     f,
+		seq:   seq,
+		locks: ls,
 	}
 	return w, nil
 }
@@ -224,6 +247,15 @@ func (w *WAL) Cut() error {
 	if err != nil {
 		return err
 	}
+	l, err := fileutil.NewLock(f.Name())
+	if err != nil {
+		return err
+	}
+	err = l.Lock()
+	if err != nil {
+		return err
+	}
+	w.locks = append(w.locks, l)
 	if err = w.sync(); err != nil {
 		return err
 	}
@@ -255,6 +287,30 @@ func (w *WAL) sync() error {
 	return w.f.Sync()
 }
 
+// ReleaseLockTo releases the locks w is holding, which
+// have index smaller or equal to the given index.
+func (w *WAL) ReleaseLockTo(index uint64) error {
+	for _, l := range w.locks {
+		_, i, err := parseWalName(path.Base(l.Name()))
+		if err != nil {
+			return err
+		}
+		if i > index {
+			return nil
+		}
+		err = l.Unlock()
+		if err != nil {
+			return err
+		}
+		err = l.Destroy()
+		if err != nil {
+			return err
+		}
+		w.locks = w.locks[1:]
+	}
+	return nil
+}
+
 func (w *WAL) Close() error {
 	if w.f != nil {
 		if err := w.sync(); err != nil {
@@ -264,6 +320,11 @@ func (w *WAL) Close() error {
 			return err
 		}
 	}
+	for _, l := range w.locks {
+		// TODO: log the error
+		l.Unlock()
+		l.Destroy()
+	}
 	return nil
 }
 

+ 2 - 0
wal/wal_test.go

@@ -238,6 +238,7 @@ func TestRecover(t *testing.T) {
 	if !reflect.DeepEqual(state, s) {
 		t.Errorf("state = %+v, want %+v", state, s)
 	}
+	w.Close()
 }
 
 func TestSearchIndex(t *testing.T) {
@@ -365,6 +366,7 @@ func TestRecoverAfterCut(t *testing.T) {
 				t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i)
 			}
 		}
+		w.Close()
 	}
 }