Quellcode durchsuchen

Merge pull request #4096 from heyitsanthony/serialize-applier-snapmerge

etcdserver: serialize snapshot merger with applier
Anthony Romano vor 10 Jahren
Ursprung
Commit
942b5570bd
2 geänderte Dateien mit 147 neuen und 30 gelöschten Zeilen
  1. 21 28
      etcdserver/server.go
  2. 126 2
      etcdserver/server_test.go

+ 21 - 28
etcdserver/server.go

@@ -479,21 +479,26 @@ type etcdProgress struct {
 	appliedi  uint64
 }
 
-// newApplier buffers apply operations and streams their results over an
-// etcdProgress output channel. This is so raftNode won't block on sending
+// newApplier buffers apply operations so raftNode won't block on sending
 // new applies, timing out (since applies can be slow). The goroutine begins
-// shutdown on close(s.done) and closes the etcdProgress channel when finished.
-func (s *EtcdServer) newApplier(ep etcdProgress) <-chan etcdProgress {
-	etcdprogc := make(chan etcdProgress)
+// shutdown on close(s.done) and closes the returned channel when finished.
+func (s *EtcdServer) startApplier(ep etcdProgress) <-chan struct{} {
+	donec := make(chan struct{})
 	go func() {
-		defer close(etcdprogc)
+		defer close(donec)
 		pending := []apply{}
 		sdonec := s.done
 		apdonec := make(chan struct{})
 		// serialized function
 		f := func(ap apply) {
 			s.applyAll(&ep, &ap)
-			etcdprogc <- ep
+			select {
+			// snapshot requested via send()
+			case m := <-s.msgSnapC:
+				merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
+				s.sendMergedSnap(merged)
+			default:
+			}
 			apdonec <- struct{}{}
 		}
 		for sdonec != nil || len(pending) > 0 {
@@ -516,7 +521,7 @@ func (s *EtcdServer) newApplier(ep etcdProgress) <-chan etcdProgress {
 			}
 		}
 	}()
-	return etcdprogc
+	return donec
 }
 
 func (s *EtcdServer) run() {
@@ -527,36 +532,24 @@ func (s *EtcdServer) run() {
 	s.r.start(s)
 
 	// asynchronously accept apply packets, dispatch progress in-order
-	ep := etcdProgress{
+	appdonec := s.startApplier(etcdProgress{
 		confState: snap.Metadata.ConfState,
 		snapi:     snap.Metadata.Index,
 		appliedi:  snap.Metadata.Index,
-	}
-	etcdprogc := s.newApplier(ep)
+	})
 
 	defer func() {
 		s.r.stop()
 		close(s.done)
-		for range etcdprogc {
-			/* wait for outstanding applys */
-		}
+		<-appdonec
 	}()
 
-	for {
-		select {
-		case ep = <-etcdprogc:
-		case m := <-s.msgSnapC:
-			merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
-			s.sendMergedSnap(merged)
-		case err := <-s.errorc:
-			plog.Errorf("%s", err)
-			plog.Infof("the data-dir used by this member must be removed.")
-			return
-		case <-s.stop:
-			return
-		}
+	select {
+	case err := <-s.errorc:
+		plog.Errorf("%s", err)
+		plog.Infof("the data-dir used by this member must be removed.")
+	case <-s.stop:
 	}
-
 }
 
 func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {

+ 126 - 2
etcdserver/server_test.go

@@ -17,7 +17,9 @@ package etcdserver
 import (
 	"encoding/json"
 	"fmt"
+	"io/ioutil"
 	"net/http"
+	"os"
 	"path"
 	"reflect"
 	"strconv"
@@ -34,6 +36,7 @@ import (
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/snap"
+	dstorage "github.com/coreos/etcd/storage"
 	"github.com/coreos/etcd/store"
 )
 
@@ -823,6 +826,100 @@ func TestTriggerSnap(t *testing.T) {
 	}
 }
 
+// TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with
+// proposals.
+func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
+	const (
+		// snapshots that may queue up at once without dropping
+		maxInFlightMsgSnap = 16
+	)
+	n := newReadyNode()
+	cl := newCluster("abc")
+	cl.SetStore(store.New())
+
+	testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir")
+	if err != nil {
+		t.Fatalf("Couldn't open tempdir (%v)", err)
+	}
+	defer os.RemoveAll(testdir)
+	if err := os.MkdirAll(testdir+"/member/snap", 0755); err != nil {
+		t.Fatalf("Couldn't make snap dir (%v)", err)
+	}
+
+	rs := raft.NewMemoryStorage()
+	tr := newSnapTransporter(testdir)
+	s := &EtcdServer{
+		cfg: &ServerConfig{
+			V3demo:  true,
+			DataDir: testdir,
+		},
+		r: raftNode{
+			Node:        n,
+			transport:   tr,
+			storage:     &storageRecorder{dbPath: testdir},
+			raftStorage: rs,
+		},
+		store:    cl.store,
+		cluster:  cl,
+		msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
+	}
+
+	s.kv = dstorage.New(
+		path.Join(testdir, "testdb.db"),
+		&s.consistIndex)
+
+	s.start()
+	defer s.Stop()
+
+	// submit applied entries and snap entries
+	idx := uint64(0)
+	outdated := 0
+	accepted := 0
+	for k := 1; k <= 101; k++ {
+		idx++
+		ch := s.w.Register(uint64(idx))
+		req := &pb.Request{Method: "QGET", ID: uint64(idx)}
+		ent := raftpb.Entry{Index: uint64(idx), Data: pbutil.MustMarshal(req)}
+		ready := raft.Ready{Entries: []raftpb.Entry{ent}}
+		n.readyc <- ready
+
+		ready = raft.Ready{CommittedEntries: []raftpb.Entry{ent}}
+		n.readyc <- ready
+
+		// "idx" applied
+		<-ch
+
+		// one snapshot for every two messages
+		if k%2 != 0 {
+			continue
+		}
+
+		n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
+		// get the snapshot sent by the transport
+		snapMsg := <-tr.snapDoneC
+		// If the snapshot trails applied records, recovery will panic
+		// since there's no allocated snapshot at the place of the
+		// snapshot record. This only happens when the applier and the
+		// snapshot sender get out of sync.
+		if snapMsg.Snapshot.Metadata.Index == idx {
+			idx++
+			snapMsg.Snapshot.Metadata.Index = idx
+			ready = raft.Ready{Snapshot: snapMsg.Snapshot}
+			n.readyc <- ready
+			accepted++
+		} else {
+			outdated++
+		}
+		// don't wait for the snapshot to complete, move to next message
+	}
+	if accepted != 50 {
+		t.Errorf("accepted=%v, want 50", accepted)
+	}
+	if outdated != 0 {
+		t.Errorf("outdated=%v, want 0", outdated)
+	}
+}
+
 // TestRecvSnapshot tests when it receives a snapshot from raft leader,
 // it should trigger storage.SaveSnap and also store.Recover.
 func TestRecvSnapshot(t *testing.T) {
@@ -1345,7 +1442,10 @@ func (w *waitWithResponse) Register(id uint64) <-chan interface{} {
 }
 func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
 
-type storageRecorder struct{ testutil.Recorder }
+type storageRecorder struct {
+	testutil.Recorder
+	dbPath string // must have '/' suffix if set
+}
 
 func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) error {
 	p.Record(testutil.Action{Name: "Save"})
@@ -1361,7 +1461,11 @@ func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error {
 
 func (p *storageRecorder) DBFilePath(id uint64) (string, error) {
 	p.Record(testutil.Action{Name: "DBFilePath"})
-	return fmt.Sprintf("%016x.snap.db", id), nil
+	path := p.dbPath
+	if path != "" {
+		path = path + "/"
+	}
+	return fmt.Sprintf("%s%016x.snap.db", path, id), nil
 }
 
 func (p *storageRecorder) Close() error { return nil }
@@ -1493,3 +1597,23 @@ func (s *nopTransporter) ActiveSince(id types.ID) time.Time   { return time.Time
 func (s *nopTransporter) Stop()                               {}
 func (s *nopTransporter) Pause()                              {}
 func (s *nopTransporter) Resume()                             {}
+
+type snapTransporter struct {
+	nopTransporter
+	snapDoneC chan snap.Message
+	snapDir   string
+}
+
+func newSnapTransporter(snapDir string) *snapTransporter {
+	return &snapTransporter{
+		snapDoneC: make(chan snap.Message, 1),
+		snapDir:   snapDir,
+	}
+}
+
+func (s *snapTransporter) SendSnapshot(m snap.Message) {
+	ss := snap.New(s.snapDir)
+	ss.SaveDBFrom(m.ReadCloser, m.Snapshot.Metadata.Index+1)
+	m.CloseWithError(nil)
+	s.snapDoneC <- m
+}