Browse Source

Merge pull request #3683 from yichengq/raft-block

etcdserver: fix raft state machine may block
Yicheng Qin 10 years ago
parent
commit
de669be6d6

+ 8 - 0
etcdserver/server.go

@@ -459,8 +459,13 @@ func (s *EtcdServer) IsIDRemoved(id uint64) bool { return s.cluster.IsIDRemoved(
 
 
 func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) }
 func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) }
 
 
+// ReportSnapshot reports snapshot sent status to the raft state machine,
+// and clears the used snapshot from the snapshot store.
 func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
 func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
 	s.r.ReportSnapshot(id, status)
 	s.r.ReportSnapshot(id, status)
+	if s.cfg.V3demo {
+		s.r.raftStorage.snapStore.clearUsedSnap()
+	}
 }
 }
 
 
 func (s *EtcdServer) run() {
 func (s *EtcdServer) run() {
@@ -1022,6 +1027,9 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 			plog.Panicf("unexpected compaction error %v", err)
 			plog.Panicf("unexpected compaction error %v", err)
 		}
 		}
 		plog.Infof("compacted raft log at %d", compacti)
 		plog.Infof("compacted raft log at %d", compacti)
+		if s.cfg.V3demo && s.r.raftStorage.snapStore.closeSnapBefore(compacti) {
+			plog.Infof("closed snapshot stored due to compaction at %d", compacti)
+		}
 	}()
 	}()
 }
 }
 
 

+ 1 - 0
etcdserver/server_test.go

@@ -761,6 +761,7 @@ func TestSnapshot(t *testing.T) {
 	st := &storeRecorder{}
 	st := &storeRecorder{}
 	p := &storageRecorder{}
 	p := &storageRecorder{}
 	srv := &EtcdServer{
 	srv := &EtcdServer{
+		cfg: &ServerConfig{},
 		r: raftNode{
 		r: raftNode{
 			Node:        &nodeRecorder{},
 			Node:        &nodeRecorder{},
 			raftStorage: s,
 			raftStorage: s,

+ 106 - 11
etcdserver/snapshot_store.go

@@ -20,7 +20,10 @@ import (
 	"io/ioutil"
 	"io/ioutil"
 	"os"
 	"os"
 	"path"
 	"path"
+	"sync"
+	"time"
 
 
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
 	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/raft/raftpb"
@@ -28,6 +31,14 @@ import (
 	dstorage "github.com/coreos/etcd/storage"
 	dstorage "github.com/coreos/etcd/storage"
 )
 )
 
 
+// clearUnusedSnapshotInterval specifies the time interval to wait
+// before clearing unused snapshot.
+// The newly created snapshot should be retrieved within one heartbeat
+// interval because raft state machine retries to send snapshot
+// to slow follower when receiving MsgHeartbeatResp from the follower.
+// Set it as 5s to match the upper limit of heartbeat interval.
+const clearUnusedSnapshotInterval = 5 * time.Second
+
 type snapshot struct {
 type snapshot struct {
 	r raftpb.Snapshot
 	r raftpb.Snapshot
 
 
@@ -78,7 +89,13 @@ type snapshotStore struct {
 	// snapshotStore will receive from the chan immediately after it sends empty to reqsnapc
 	// snapshotStore will receive from the chan immediately after it sends empty to reqsnapc
 	raftsnapc chan raftpb.Snapshot
 	raftsnapc chan raftpb.Snapshot
 
 
-	snap *snapshot
+	mu sync.Mutex // protect belowing vars
+	// snap is nil iff there is no snapshot stored
+	snap       *snapshot
+	inUse      bool
+	createOnce sync.Once // ensure at most one snapshot is created when no snapshot stored
+
+	clock clockwork.Clock
 }
 }
 
 
 func newSnapshotStore(dir string, kv dstorage.KV) *snapshotStore {
 func newSnapshotStore(dir string, kv dstorage.KV) *snapshotStore {
@@ -87,35 +104,113 @@ func newSnapshotStore(dir string, kv dstorage.KV) *snapshotStore {
 		kv:        kv,
 		kv:        kv,
 		reqsnapc:  make(chan struct{}),
 		reqsnapc:  make(chan struct{}),
 		raftsnapc: make(chan raftpb.Snapshot),
 		raftsnapc: make(chan raftpb.Snapshot),
+		clock:     clockwork.NewRealClock(),
 	}
 	}
 }
 }
 
 
 // getSnap returns a snapshot.
 // getSnap returns a snapshot.
 // If there is no available snapshot, ErrSnapshotTemporarilyUnavaliable will be returned.
 // If there is no available snapshot, ErrSnapshotTemporarilyUnavaliable will be returned.
 //
 //
-// Internally it creates new snapshot and returns the snapshot. Unless the
-// returned snapshot is closed, it rejects creating new one and returns
-// ErrSnapshotTemporarilyUnavailable.
+// If the snapshot stored is in use, it returns ErrSnapshotTemporarilyUnavailable.
+// If there is no snapshot stored, it creates new snapshot
+// asynchronously and returns ErrSnapshotTemporarilyUnavailable, so
+// caller could get snapshot later when the snapshot is created.
+// Otherwise, it returns the snapshot stored.
+//
+// The created snapshot is cleared from the snapshot store if it is
+// either unused after clearUnusedSnapshotInterval, or explicitly cleared
+// through clearUsedSnap after using.
+// closeSnapBefore is used to close outdated snapshot,
+// so the snapshot will be cleared faster when in use.
+//
+// snapshot store stores at most one snapshot at a time.
 // If raft state machine wants to send two snapshot messages to two followers,
 // If raft state machine wants to send two snapshot messages to two followers,
 // the second snapshot message will keep getting snapshot and succeed only after
 // the second snapshot message will keep getting snapshot and succeed only after
 // the first message is sent. This increases the time used to send messages,
 // the first message is sent. This increases the time used to send messages,
 // but it is acceptable because this should happen seldomly.
 // but it is acceptable because this should happen seldomly.
 func (ss *snapshotStore) getSnap() (*snapshot, error) {
 func (ss *snapshotStore) getSnap() (*snapshot, error) {
-	// If snapshotStore has some snapshot that has not been closed, it cannot
-	// request new snapshot. So it returns ErrSnapshotTemporarilyUnavailable.
-	if ss.snap != nil && !ss.snap.isClosed() {
+	ss.mu.Lock()
+	defer ss.mu.Unlock()
+
+	if ss.inUse {
+		return nil, raft.ErrSnapshotTemporarilyUnavailable
+	}
+
+	if ss.snap == nil {
+		// create snapshot asynchronously
+		ss.createOnce.Do(func() { go ss.createSnap() })
 		return nil, raft.ErrSnapshotTemporarilyUnavailable
 		return nil, raft.ErrSnapshotTemporarilyUnavailable
 	}
 	}
 
 
+	ss.inUse = true
+	// give transporter the generated snapshot that is ready to send out
+	ss.tr.SnapshotReady(ss.snap, ss.snap.raft().Metadata.Index)
+	return ss.snap, nil
+}
+
+// clearUsedSnap clears the snapshot from the snapshot store after it
+// is used.
+// After clear, snapshotStore could create new snapshot when getSnap.
+func (ss *snapshotStore) clearUsedSnap() {
+	ss.mu.Lock()
+	defer ss.mu.Unlock()
+	if !ss.inUse {
+		plog.Panicf("unexpected clearUsedSnap when snapshot is not in use")
+	}
+	ss.clear()
+}
+
+// closeSnapBefore closes the stored snapshot if its index is not greater
+// than the given compact index.
+// If it closes the snapshot, it returns true.
+func (ss *snapshotStore) closeSnapBefore(index uint64) bool {
+	ss.mu.Lock()
+	defer ss.mu.Unlock()
+	if ss.snap != nil && ss.snap.raft().Metadata.Index <= index {
+		if err := ss.snap.Close(); err != nil {
+			plog.Errorf("snapshot close error (%v)", err)
+		}
+		return true
+	}
+	return false
+}
+
+// createSnap creates a new snapshot and stores it into the snapshot store.
+// It also sets a timer to clear the snapshot if it is not in use after
+// some time interval.
+// It should only be called in snapshotStore functions.
+func (ss *snapshotStore) createSnap() {
 	// ask to generate v2 snapshot
 	// ask to generate v2 snapshot
 	ss.reqsnapc <- struct{}{}
 	ss.reqsnapc <- struct{}{}
 	// generate KV snapshot
 	// generate KV snapshot
 	kvsnap := ss.kv.Snapshot()
 	kvsnap := ss.kv.Snapshot()
 	raftsnap := <-ss.raftsnapc
 	raftsnap := <-ss.raftsnapc
-	ss.snap = newSnapshot(raftsnap, kvsnap)
-	// give transporter the generated snapshot that is ready to send out
-	ss.tr.SnapshotReady(ss.snap, raftsnap.Metadata.Index)
-	return ss.snap, nil
+	snap := newSnapshot(raftsnap, kvsnap)
+
+	ss.mu.Lock()
+	ss.snap = snap
+	ss.mu.Unlock()
+
+	go func() {
+		<-ss.clock.After(clearUnusedSnapshotInterval)
+		ss.mu.Lock()
+		defer ss.mu.Unlock()
+		if snap == ss.snap && !ss.inUse {
+			ss.clear()
+		}
+	}()
+}
+
+// clear clears snapshot related variables in snapshotStore. It closes
+// the snapshot stored and sets the variables to initial values.
+// It should only be called in snapshotStore functions.
+func (ss *snapshotStore) clear() {
+	if err := ss.snap.Close(); err != nil {
+		plog.Errorf("snapshot close error (%v)", err)
+	}
+	ss.snap = nil
+	ss.inUse = false
+	ss.createOnce = sync.Once{}
 }
 }
 
 
 // SaveFrom saves snapshot at the given index from the given reader.
 // SaveFrom saves snapshot at the given index from the given reader.

+ 204 - 0
etcdserver/snapshot_store_test.go

@@ -0,0 +1,204 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcdserver
+
+import (
+	"io"
+	"reflect"
+	"sync"
+	"testing"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
+	"github.com/coreos/etcd/pkg/testutil"
+	"github.com/coreos/etcd/raft"
+	"github.com/coreos/etcd/raft/raftpb"
+	dstorage "github.com/coreos/etcd/storage"
+	"github.com/coreos/etcd/storage/storagepb"
+)
+
+func TestSnapshotStoreCreateSnap(t *testing.T) {
+	snap := raftpb.Snapshot{
+		Metadata: raftpb.SnapshotMetadata{Index: 1},
+	}
+	ss := newSnapshotStore("", &nopKV{})
+	fakeClock := clockwork.NewFakeClock()
+	ss.clock = fakeClock
+	go func() {
+		<-ss.reqsnapc
+		ss.raftsnapc <- snap
+	}()
+
+	// create snapshot
+	ss.createSnap()
+	if !reflect.DeepEqual(ss.snap.raft(), snap) {
+		t.Errorf("raftsnap = %+v, want %+v", ss.snap.raft(), snap)
+	}
+
+	// unused snapshot is cleared after clearUnusedSnapshotInterval
+	fakeClock.BlockUntil(1)
+	fakeClock.Advance(clearUnusedSnapshotInterval)
+	testutil.WaitSchedule()
+	ss.mu.Lock()
+	if ss.snap != nil {
+		t.Errorf("snap = %+v, want %+v", ss.snap, nil)
+	}
+	ss.mu.Unlock()
+}
+
+func TestSnapshotStoreGetSnap(t *testing.T) {
+	snap := raftpb.Snapshot{
+		Metadata: raftpb.SnapshotMetadata{Index: 1},
+	}
+	ss := newSnapshotStore("", &nopKV{})
+	fakeClock := clockwork.NewFakeClock()
+	ss.clock = fakeClock
+	ss.tr = &nopTransporter{}
+	go func() {
+		<-ss.reqsnapc
+		ss.raftsnapc <- snap
+	}()
+
+	// get snap when no snapshot stored
+	_, err := ss.getSnap()
+	if err != raft.ErrSnapshotTemporarilyUnavailable {
+		t.Fatalf("getSnap error = %v, want %v", err, raft.ErrSnapshotTemporarilyUnavailable)
+	}
+
+	// wait for asynchronous snapshot creation to finish
+	testutil.WaitSchedule()
+	// get the created snapshot
+	s, err := ss.getSnap()
+	if err != nil {
+		t.Fatalf("getSnap error = %v, want nil", err)
+	}
+	if !reflect.DeepEqual(s.raft(), snap) {
+		t.Errorf("raftsnap = %+v, want %+v", s.raft(), snap)
+	}
+	if !ss.inUse {
+		t.Errorf("inUse = %v, want true", ss.inUse)
+	}
+
+	// get snap when snapshot stored has been in use
+	_, err = ss.getSnap()
+	if err != raft.ErrSnapshotTemporarilyUnavailable {
+		t.Fatalf("getSnap error = %v, want %v", err, raft.ErrSnapshotTemporarilyUnavailable)
+	}
+
+	// clean up
+	fakeClock.Advance(clearUnusedSnapshotInterval)
+}
+
+func TestSnapshotStoreClearUsedSnap(t *testing.T) {
+	s := &fakeSnapshot{}
+	var once sync.Once
+	once.Do(func() {})
+	ss := &snapshotStore{
+		snap:       newSnapshot(raftpb.Snapshot{}, s),
+		inUse:      true,
+		createOnce: once,
+	}
+
+	ss.clearUsedSnap()
+	// wait for underlying KV snapshot closed
+	testutil.WaitSchedule()
+	s.mu.Lock()
+	if !s.closed {
+		t.Errorf("snapshot closed = %v, want true", s.closed)
+	}
+	s.mu.Unlock()
+	if ss.snap != nil {
+		t.Errorf("snapshot = %v, want nil", ss.snap)
+	}
+	if ss.inUse {
+		t.Errorf("isUse = %v, want false", ss.inUse)
+	}
+	// test createOnce is reset
+	if ss.createOnce == once {
+		t.Errorf("createOnce fails to reset")
+	}
+}
+
+func TestSnapshotStoreCloseSnapBefore(t *testing.T) {
+	snapIndex := uint64(5)
+
+	tests := []struct {
+		index uint64
+		wok   bool
+	}{
+		{snapIndex - 2, false},
+		{snapIndex - 1, false},
+		{snapIndex, true},
+	}
+	for i, tt := range tests {
+		rs := raftpb.Snapshot{
+			Metadata: raftpb.SnapshotMetadata{Index: 5},
+		}
+		s := &fakeSnapshot{}
+		ss := &snapshotStore{
+			snap: newSnapshot(rs, s),
+		}
+
+		ok := ss.closeSnapBefore(tt.index)
+		if ok != tt.wok {
+			t.Errorf("#%d: closeSnapBefore = %v, want %v", i, ok, tt.wok)
+		}
+		if ok {
+			// wait for underlying KV snapshot closed
+			testutil.WaitSchedule()
+			s.mu.Lock()
+			if !s.closed {
+				t.Errorf("#%d: snapshot closed = %v, want true", i, s.closed)
+			}
+			s.mu.Unlock()
+		}
+	}
+}
+
+type nopKV struct{}
+
+func (kv *nopKV) Rev() int64 { return 0 }
+func (kv *nopKV) Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
+	return nil, 0, nil
+}
+func (kv *nopKV) Put(key, value []byte) (rev int64)          { return 0 }
+func (kv *nopKV) DeleteRange(key, end []byte) (n, rev int64) { return 0, 0 }
+func (kv *nopKV) TxnBegin() int64                            { return 0 }
+func (kv *nopKV) TxnEnd(txnID int64) error                   { return nil }
+func (kv *nopKV) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
+	return nil, 0, nil
+}
+func (kv *nopKV) TxnPut(txnID int64, key, value []byte) (rev int64, err error) { return 0, nil }
+func (kv *nopKV) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
+	return 0, 0, nil
+}
+func (kv *nopKV) Compact(rev int64) error     { return nil }
+func (kv *nopKV) Hash() (uint32, error)       { return 0, nil }
+func (kv *nopKV) Snapshot() dstorage.Snapshot { return &fakeSnapshot{} }
+func (kv *nopKV) Restore() error              { return nil }
+func (kv *nopKV) Close() error                { return nil }
+
+type fakeSnapshot struct {
+	mu     sync.Mutex
+	closed bool
+}
+
+func (s *fakeSnapshot) Size() int64                        { return 0 }
+func (s *fakeSnapshot) WriteTo(w io.Writer) (int64, error) { return 0, nil }
+func (s *fakeSnapshot) Close() error {
+	s.mu.Lock()
+	s.closed = true
+	s.mu.Unlock()
+	return nil
+}

+ 5 - 1
rafthttp/transport.go

@@ -86,7 +86,11 @@ type Transporter interface {
 	// If the connection is currently inactive, it returns zero time.
 	// If the connection is currently inactive, it returns zero time.
 	ActiveSince(id types.ID) time.Time
 	ActiveSince(id types.ID) time.Time
 	// SnapshotReady accepts a snapshot at the given index that is ready to send out.
 	// SnapshotReady accepts a snapshot at the given index that is ready to send out.
-	// SnapshotReady MUST not be called when the snapshot sent result of previous
+	// It is expected that caller sends a raft snapshot message with
+	// the given index soon, and the accepted snapshot will be sent out
+	// together. After sending, snapshot sent status is reported
+	// through Raft.SnapshotStatus.
+	// SnapshotReady MUST not be called when the snapshot sent status of previous
 	// accepted one has not been reported.
 	// accepted one has not been reported.
 	SnapshotReady(rc io.ReadCloser, index uint64)
 	SnapshotReady(rc io.ReadCloser, index uint64)
 	// Stop closes the connections and stops the transporter.
 	// Stop closes the connections and stops the transporter.