Browse Source

Merge pull request #1927 from xiang90/flock

*: lock the in-use files; do not purge locked wal files
Xiang Li 11 years ago
parent
commit
0ea8c0929e

+ 1 - 1
etcdctl/command/backup_command.go

@@ -67,7 +67,7 @@ func handleBackup(c *cli.Context) {
 		}
 	}
 
-	w, err := wal.OpenAtIndex(srcWAL, index)
+	w, err := wal.OpenNotInUse(srcWAL, index)
 	if err != nil {
 		log.Fatal(err)
 	}

+ 7 - 25
etcdserver/server.go

@@ -90,21 +90,6 @@ type Response struct {
 	err     error
 }
 
-type Storage interface {
-	// Save function saves ents and state to the underlying stable storage.
-	// Save MUST block until st and ents are on stable storage.
-	Save(st raftpb.HardState, ents []raftpb.Entry) error
-	// SaveSnap function saves snapshot to the underlying stable storage.
-	SaveSnap(snap raftpb.Snapshot) error
-
-	// TODO: WAL should be able to control cut itself. After implement self-controlled cut,
-	// remove it in this interface.
-	// Cut cuts out a new wal file for saving new state and entries.
-	Cut() error
-	// Close closes the Storage and performs finalization.
-	Close() error
-}
-
 type Server interface {
 	// Start performs any initialization of the Server necessary for it to
 	// begin serving requests. It must be called before Do or Process.
@@ -295,15 +280,12 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		id:          id,
 		attributes:  Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
 		Cluster:     cfg.Cluster,
-		storage: struct {
-			*wal.WAL
-			*snap.Snapshotter
-		}{w, ss},
-		stats:      sstats,
-		lstats:     lstats,
-		Ticker:     time.Tick(100 * time.Millisecond),
-		SyncTicker: time.Tick(500 * time.Millisecond),
-		snapCount:  cfg.SnapCount,
+		storage:     NewStorage(w, ss),
+		stats:       sstats,
+		lstats:      lstats,
+		Ticker:      time.Tick(100 * time.Millisecond),
+		SyncTicker:  time.Tick(500 * time.Millisecond),
+		snapCount:   cfg.SnapCount,
 	}
 	srv.sendhub = newSendHub(cfg.Transport, cfg.Cluster, srv, sstats, lstats)
 	for _, m := range getOtherMembers(cfg.Cluster, cfg.Name) {
@@ -1005,7 +987,7 @@ func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (ty
 
 func readWAL(waldir string, index uint64) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
 	var err error
-	if w, err = wal.OpenAtIndex(waldir, index); err != nil {
+	if w, err = wal.Open(waldir, index); err != nil {
 		log.Fatalf("etcdserver: open wal error: %v", err)
 	}
 	var wmetadata []byte

+ 45 - 0
etcdserver/storage.go

@@ -0,0 +1,45 @@
+package etcdserver
+
+import (
+	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/snap"
+	"github.com/coreos/etcd/wal"
+)
+
+// Storage is the stable-storage abstraction used by the server: it persists
+// raft hard state, log entries and snapshots.
+type Storage interface {
+	// Save function saves ents and state to the underlying stable storage.
+	// Save MUST block until st and ents are on stable storage.
+	Save(st raftpb.HardState, ents []raftpb.Entry) error
+	// SaveSnap function saves snapshot to the underlying stable storage.
+	SaveSnap(snap raftpb.Snapshot) error
+
+	// TODO: WAL should be able to control cut itself. Once self-controlled
+	// cut is implemented, remove Cut from this interface.
+	// Cut cuts out a new wal file for saving new state and entries.
+	Cut() error
+	// Close closes the Storage and performs finalization.
+	Close() error
+}
+
+// storage is the Storage implementation returned by NewStorage. It embeds a
+// WAL and a Snapshotter, so their methods are promoted onto storage; the
+// SaveSnap method declared on storage takes precedence over the promoted one.
+type storage struct {
+	*wal.WAL
+	*snap.Snapshotter
+}
+
+// NewStorage returns a Storage backed by the given WAL and Snapshotter.
+func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
+	st := &storage{WAL: w, Snapshotter: s}
+	return st
+}
+
+// SaveSnap writes the snapshot through the Snapshotter and then asks the WAL
+// to release its file locks up to the snapshot's index, since those wal files
+// will not be used anymore. The first error encountered is returned.
+func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
+	if err := st.Snapshotter.SaveSnap(snap); err != nil {
+		return err
+	}
+	return st.WAL.ReleaseLockTo(snap.Metadata.Index)
+}

+ 60 - 0
pkg/fileutil/lock.go

@@ -0,0 +1,60 @@
+package fileutil
+
+import (
+	"errors"
+	"os"
+	"syscall"
+)
+
+var (
+	// ErrLocked is returned by TryLock when flock reports EWOULDBLOCK, i.e.
+	// the file lock is already held.
+	ErrLocked = errors.New("file already locked")
+)
+
+// Lock is an exclusive lock tied to a file.
+type Lock interface {
+	// Name returns the name of the file the lock is tied to.
+	Name() string
+	// TryLock tries to acquire the lock without blocking; it returns
+	// ErrLocked if the lock is already held.
+	TryLock() error
+	// Lock acquires the lock, blocking until it is available.
+	Lock() error
+	// Unlock releases the lock.
+	Unlock() error
+	// Destroy closes the file backing the lock.
+	Destroy() error
+}
+
+// lock implements Lock by calling syscall.Flock on an opened file.
+type lock struct {
+	// fd is the descriptor of file, cached for the syscall.Flock calls.
+	fd   int
+	// file is the opened file; it is closed by Destroy.
+	file *os.File
+}
+
+// Name returns the name of the underlying file, as reported by os.File.Name.
+func (l *lock) Name() string {
+	return l.file.Name()
+}
+
+// TryLock attempts to take an exclusive flock on the file without blocking.
+// It returns ErrLocked when flock reports EWOULDBLOCK, nil on success, and
+// any other flock error unchanged.
+func (l *lock) TryLock() error {
+	if ferr := syscall.Flock(l.fd, syscall.LOCK_EX|syscall.LOCK_NB); ferr != syscall.EWOULDBLOCK {
+		return ferr
+	}
+	return ErrLocked
+}
+
+// Lock acquires exclusivity on the lock, blocking until it becomes available
+// (LOCK_NB is not set, unlike in TryLock).
+func (l *lock) Lock() error {
+	return syscall.Flock(l.fd, syscall.LOCK_EX)
+}
+
+// Unlock releases the flock held through this lock's file descriptor and
+// returns the error from flock, if any.
+func (l *lock) Unlock() error {
+	return syscall.Flock(l.fd, syscall.LOCK_UN)
+}
+
+// Destroy closes the underlying file and returns the error from Close.
+// It does not call Unlock itself.
+func (l *lock) Destroy() error {
+	return l.file.Close()
+}
+
+// NewLock opens the named file and wraps it in a Lock. The lock is not
+// acquired here; callers must call Lock or TryLock. If the file cannot be
+// opened, the error from os.Open is returned.
+func NewLock(file string) (Lock, error) {
+	fh, err := os.Open(file)
+	if err != nil {
+		return nil, err
+	}
+	return &lock{fd: int(fh.Fd()), file: fh}, nil
+}

+ 82 - 0
pkg/fileutil/lock_test.go

@@ -0,0 +1,82 @@
+package fileutil
+
+import (
+	"io/ioutil"
+	"os"
+	"testing"
+	"time"
+)
+
+// TestLockAndUnlock exercises two Locks (l and dupl) opened on the same
+// temporary file: TryLock must fail with ErrLocked while the other holds the
+// lock, succeed once it is released, and a blocking Lock must wait until the
+// holder unlocks.
+func TestLockAndUnlock(t *testing.T) {
+	f, err := ioutil.TempFile("", "lock")
+	if err != nil {
+		t.Fatal(err)
+	}
+	f.Close()
+	defer func() {
+		err := os.Remove(f.Name())
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	// lock the file through l
+	l, err := NewLock(f.Name())
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer l.Destroy()
+	err = l.Lock()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// TryLock through a second handle must report ErrLocked while l holds the lock
+	dupl, err := NewLock(f.Name())
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = dupl.TryLock()
+	if err != ErrLocked {
+		t.Errorf("err = %v, want %v", err, ErrLocked)
+	}
+
+	// unlock the file
+	err = l.Unlock()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// TryLock on the now unlocked file must succeed; dupl holds the lock from here
+	err = dupl.TryLock()
+	if err != nil {
+		t.Errorf("err = %v, want %v", err, nil)
+	}
+	defer dupl.Destroy()
+
+	// a blocking Lock through l must not return while dupl holds the lock;
+	// the channel is buffered so the goroutine can finish even if nobody receives
+	locked := make(chan struct{}, 1)
+	go func() {
+		l.Lock()
+		locked <- struct{}{}
+	}()
+
+	select {
+	case <-locked:
+		t.Error("unexpected unblocking")
+	case <-time.After(10 * time.Millisecond):
+	}
+
+	// unlock
+	err = dupl.Unlock()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// the previously blocked routine should be unblocked
+	select {
+	case <-locked:
+	case <-time.After(10 * time.Millisecond):
+		t.Error("unexpected blocking")
+	}
+}

+ 19 - 2
pkg/fileutil/purge.go

@@ -27,12 +27,29 @@ func PurgeFile(dirname string, suffix string, max uint, interval time.Duration,
 			sort.Strings(newfnames)
 			for len(newfnames) > int(max) {
 				f := path.Join(dirname, newfnames[0])
-				err := os.Remove(f)
+				l, err := NewLock(f)
 				if err != nil {
 					errC <- err
 					return
 				}
-				log.Printf("filePurge: successfully remvoed file %s", f)
+				err = l.TryLock()
+				if err != nil {
+					break
+				}
+				err = os.Remove(f)
+				if err != nil {
+					errC <- err
+					return
+				}
+				err = l.Unlock()
+				if err != nil {
+					log.Printf("filePurge: unlock %s error %v", l.Name(), err)
+				}
+				err = l.Destroy()
+				if err != nil {
+					log.Printf("filePurge: destroy lock %s error %v", l.Name(), err)
+				}
+				log.Printf("filePurge: successfully removed file %s", f)
 				newfnames = newfnames[1:]
 			}
 			select {

+ 69 - 1
pkg/fileutil/purge_test.go

@@ -31,7 +31,7 @@ func TestPurgeFile(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
-		time.Sleep(time.Millisecond)
+		time.Sleep(2 * time.Millisecond)
 	}
 	fnames, err := ReadDir(dir)
 	if err != nil {
@@ -48,3 +48,71 @@ func TestPurgeFile(t *testing.T) {
 	}
 	close(stop)
 }
+
+// TestPurgeFileHoldingLock checks that PurgeFile treats a locked file as a
+// purge barrier: with ten files and a limit of three, purging must stop at the
+// locked file 5.test, and must continue down to the limit once that lock is
+// released and destroyed.
+func TestPurgeFileHoldingLock(t *testing.T) {
+	dir, err := ioutil.TempDir("", "purgefile")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(dir)
+
+	for i := 0; i < 10; i++ {
+		f, err := os.Create(path.Join(dir, fmt.Sprintf("%d.test", i)))
+		if err != nil {
+			t.Fatal(err)
+		}
+		// only the file's existence matters; do not leak the descriptor
+		f.Close()
+	}
+
+	// create a purge barrier at 5
+	l, err := NewLock(path.Join(dir, fmt.Sprintf("%d.test", 5)))
+	if err != nil {
+		// without this check a failed NewLock would leave l nil and the
+		// Lock call below would panic instead of failing the test
+		t.Fatal(err)
+	}
+	err = l.Lock()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	stop := make(chan struct{})
+	errch := PurgeFile(dir, "test", 3, time.Millisecond, stop)
+	time.Sleep(5 * time.Millisecond)
+
+	// files 0-4 are purged; the locked file 5 and everything after it stay
+	fnames, err := ReadDir(dir)
+	if err != nil {
+		t.Fatal(err)
+	}
+	wnames := []string{"5.test", "6.test", "7.test", "8.test", "9.test"}
+	if !reflect.DeepEqual(fnames, wnames) {
+		t.Errorf("filenames = %v, want %v", fnames, wnames)
+	}
+	select {
+	case err := <-errch:
+		t.Errorf("unexpected purge error %v", err)
+	case <-time.After(time.Millisecond):
+	}
+
+	// remove the purge barrier
+	err = l.Unlock()
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = l.Destroy()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	time.Sleep(5 * time.Millisecond)
+
+	// with the barrier gone, purging continues down to the limit of 3 files
+	fnames, err = ReadDir(dir)
+	if err != nil {
+		t.Fatal(err)
+	}
+	wnames = []string{"7.test", "8.test", "9.test"}
+	if !reflect.DeepEqual(fnames, wnames) {
+		t.Errorf("filenames = %v, want %v", fnames, wnames)
+	}
+	select {
+	case err := <-errch:
+		t.Errorf("unexpected purge error %v", err)
+	case <-time.After(time.Millisecond):
+	}
+
+	close(stop)
+}

+ 1 - 1
test

@@ -15,7 +15,7 @@ COVER=${COVER:-"-cover"}
 source ./build
 
 # Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt.
-TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes etcdserver/etcdserverpb integration migrate pkg/flags pkg/ioutils pkg/types pkg/transport pkg/wait proxy raft rafthttp snap store wal"
+TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes etcdserver/etcdserverpb integration migrate pkg/fileutil pkg/flags pkg/ioutils pkg/types pkg/transport pkg/wait proxy raft rafthttp snap store wal"
 FORMATTABLE="$TESTABLE_AND_FORMATTABLE *.go etcdctl/"
 
 # user has not provided PKG override

+ 1 - 1
wal/doc.go

@@ -48,7 +48,7 @@ Cut issues 0x10 entries with incremental index later then the file will be calle
 
 At a later time a WAL can be opened at a particular raft index:
 
-	w, err := wal.OpenAtIndex("/var/lib/etcd", 0)
+	w, err := wal.Open("/var/lib/etcd", 0)
 	...
 
 The raft index must have been written to the WAL. When opening without a

+ 81 - 4
wal/wal.go

@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"hash/crc32"
 	"io"
+	"log"
 	"os"
 	"path"
 	"reflect"
@@ -67,6 +68,8 @@ type WAL struct {
 	seq     uint64   // sequence of the wal file currently used for writes
 	enti    uint64   // index of the last entry saved to the wal
 	encoder *encoder // encoder to encode records
+
+	locks []fileutil.Lock // the file locks the WAL is holding (the name is increasing)
 }
 
 // Create creates a WAL ready for appending records. The given metadata is
@@ -85,6 +88,15 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
 	if err != nil {
 		return nil, err
 	}
+	l, err := fileutil.NewLock(f.Name())
+	if err != nil {
+		return nil, err
+	}
+	err = l.Lock()
+	if err != nil {
+		return nil, err
+	}
+
 	w := &WAL{
 		dir:      dirpath,
 		metadata: metadata,
@@ -92,6 +104,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
 		f:        f,
 		encoder:  newEncoder(f, 0),
 	}
+	w.locks = append(w.locks, l)
 	if err := w.saveCrc(0); err != nil {
 		return nil, err
 	}
@@ -104,13 +117,23 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
 	return w, nil
 }
 
-// OpenAtIndex opens the WAL at the given index.
+// Open opens the WAL at the given index.
 // The index SHOULD have been previously committed to the WAL, or the following
 // ReadAll will fail.
 // The returned WAL is ready to read and the first record will be the given
 // index. The WAL cannot be appended to before reading out all of its
 // previous records.
-func OpenAtIndex(dirpath string, index uint64) (*WAL, error) {
+func Open(dirpath string, index uint64) (*WAL, error) {
+	return openAtIndex(dirpath, index, true)
+}
+
+// OpenNotInUse only opens the wal files that are not in use.
+// Other than that, it is similar to Open: where Open fails as soon as one
+// of the wal files cannot be locked, OpenNotInUse logs a message and stops
+// at that file, opening only the files before it.
+func OpenNotInUse(dirpath string, index uint64) (*WAL, error) {
+	return openAtIndex(dirpath, index, false)
+}
+
+func openAtIndex(dirpath string, index uint64, all bool) (*WAL, error) {
 	names, err := fileutil.ReadDir(dirpath)
 	if err != nil {
 		return nil, err
@@ -129,12 +152,27 @@ func OpenAtIndex(dirpath string, index uint64) (*WAL, error) {
 
 	// open the wal files for reading
 	rcs := make([]io.ReadCloser, 0)
+	ls := make([]fileutil.Lock, 0)
 	for _, name := range names[nameIndex:] {
 		f, err := os.Open(path.Join(dirpath, name))
 		if err != nil {
 			return nil, err
 		}
+		l, err := fileutil.NewLock(f.Name())
+		if err != nil {
+			return nil, err
+		}
+		err = l.TryLock()
+		if err != nil {
+			if all {
+				return nil, err
+			} else {
+				log.Printf("wal: opened all the files until %s, since it is still in use by an etcd server", name)
+				break
+			}
+		}
 		rcs = append(rcs, f)
+		ls = append(ls, l)
 	}
 	rc := MultiReadCloser(rcs...)
 
@@ -157,8 +195,9 @@ func OpenAtIndex(dirpath string, index uint64) (*WAL, error) {
 		ri:      index,
 		decoder: newDecoder(rc),
 
-		f:   f,
-		seq: seq,
+		f:     f,
+		seq:   seq,
+		locks: ls,
 	}
 	return w, nil
 }
@@ -224,6 +263,15 @@ func (w *WAL) Cut() error {
 	if err != nil {
 		return err
 	}
+	l, err := fileutil.NewLock(f.Name())
+	if err != nil {
+		return err
+	}
+	err = l.Lock()
+	if err != nil {
+		return err
+	}
+	w.locks = append(w.locks, l)
 	if err = w.sync(); err != nil {
 		return err
 	}
@@ -255,6 +303,30 @@ func (w *WAL) sync() error {
 	return w.f.Sync()
 }
 
+// ReleaseLockTo releases the locks w is holding on wal files that are no
+// longer needed once everything up to the given index has been snapshotted.
+//
+// w.locks is ordered by increasing file name. A lock is released only when
+// the NEXT held file also starts at or before index; the last file that
+// starts at or before index is kept locked, because it may still hold
+// entries after index (and may be the file currently being written).
+// Releasing it would let the purger delete a file that is still in use.
+// NOTE(review): this assumes the index in a wal file name is the index of
+// the first entry stored in that file — confirm against Cut.
+//
+// Locks are unlocked and destroyed oldest first. On the first error
+// (name parsing, Unlock or Destroy) the error is returned and the
+// remaining locks, including the failing one, stay in w.locks.
+func (w *WAL) ReleaseLockTo(index uint64) error {
+	for len(w.locks) > 1 {
+		_, next, err := parseWalName(path.Base(w.locks[1].Name()))
+		if err != nil {
+			return err
+		}
+		if next > index {
+			// the oldest held file is the last one starting at or before
+			// index (or starts after it); keep it and everything newer
+			return nil
+		}
+		oldest := w.locks[0]
+		if err := oldest.Unlock(); err != nil {
+			return err
+		}
+		if err := oldest.Destroy(); err != nil {
+			return err
+		}
+		w.locks = w.locks[1:]
+	}
+	return nil
+}
+
 func (w *WAL) Close() error {
 	if w.f != nil {
 		if err := w.sync(); err != nil {
@@ -264,6 +336,11 @@ func (w *WAL) Close() error {
 			return err
 		}
 	}
+	for _, l := range w.locks {
+		// TODO: log the error
+		l.Unlock()
+		l.Destroy()
+	}
 	return nil
 }
 

+ 8 - 6
wal/wal_test.go

@@ -90,7 +90,7 @@ func TestOpenAtIndex(t *testing.T) {
 	}
 	f.Close()
 
-	w, err := OpenAtIndex(dir, 0)
+	w, err := Open(dir, 0)
 	if err != nil {
 		t.Fatalf("err = %v, want nil", err)
 	}
@@ -109,7 +109,7 @@ func TestOpenAtIndex(t *testing.T) {
 	}
 	f.Close()
 
-	w, err = OpenAtIndex(dir, 5)
+	w, err = Open(dir, 5)
 	if err != nil {
 		t.Fatalf("err = %v, want nil", err)
 	}
@@ -126,7 +126,7 @@ func TestOpenAtIndex(t *testing.T) {
 		t.Fatal(err)
 	}
 	defer os.RemoveAll(emptydir)
-	if _, err = OpenAtIndex(emptydir, 0); err != ErrFileNotFound {
+	if _, err = Open(emptydir, 0); err != ErrFileNotFound {
 		t.Errorf("err = %v, want %v", err, ErrFileNotFound)
 	}
 }
@@ -219,7 +219,7 @@ func TestRecover(t *testing.T) {
 	}
 	w.Close()
 
-	if w, err = OpenAtIndex(p, 0); err != nil {
+	if w, err = Open(p, 0); err != nil {
 		t.Fatal(err)
 	}
 	metadata, state, entries, err := w.ReadAll()
@@ -238,6 +238,7 @@ func TestRecover(t *testing.T) {
 	if !reflect.DeepEqual(state, s) {
 		t.Errorf("state = %+v, want %+v", state, s)
 	}
+	w.Close()
 }
 
 func TestSearchIndex(t *testing.T) {
@@ -341,7 +342,7 @@ func TestRecoverAfterCut(t *testing.T) {
 	}
 
 	for i := 0; i < 10; i++ {
-		w, err := OpenAtIndex(p, uint64(i))
+		w, err := Open(p, uint64(i))
 		if err != nil {
 			if i <= 4 {
 				if err != ErrFileNotFound {
@@ -365,6 +366,7 @@ func TestRecoverAfterCut(t *testing.T) {
 				t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i)
 			}
 		}
+		w.Close()
 	}
 }
 
@@ -384,7 +386,7 @@ func TestOpenAtUncommittedIndex(t *testing.T) {
 	}
 	w.Close()
 
-	w, err = OpenAtIndex(p, 1)
+	w, err = Open(p, 1)
 	if err != nil {
 		t.Fatal(err)
 	}