Browse Source

Merge pull request #4850 from xiang90/rm_demo

*: enable v3 by default
Xiang Li 9 years ago
parent
commit
d0d3b32210

+ 0 - 7
Documentation/configuration.md

@@ -250,13 +250,6 @@ Follow the instructions when using these flags.
 + default: false
 + env variable: ETCD_FORCE_NEW_CLUSTER
 
-## Experimental Flags
-
-### --experimental-v3demo
-+ Enable experimental [v3 demo API][rfc-v3].
-+ default: false
-+ env variable: ETCD_EXPERIMENTAL_V3DEMO
-
 ## Miscellaneous Flags
 
 ### --version

+ 0 - 4
e2e/etcd_test.go

@@ -229,7 +229,6 @@ type etcdProcessClusterConfig struct {
 	isPeerTLS     bool
 	isPeerAutoTLS bool
 	initialToken  string
-	isV3          bool
 }
 
 // newEtcdProcessCluster launches a new cluster from etcd processes, returning
@@ -342,9 +341,6 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
 			"--initial-cluster-token", cfg.initialToken,
 			"--data-dir", dataDirPath,
 		}
-		if cfg.isV3 {
-			args = append(args, "--experimental-v3demo")
-		}
 
 		args = append(args, cfg.tlsArgs()...)
 

+ 0 - 1
e2e/etcdctlv3_test.go

@@ -141,7 +141,6 @@ func setupCtlV3Test(t *testing.T, cfg *etcdProcessClusterConfig, quorum bool) *e
 		cfg = configStandalone(*cfg)
 	}
 	copied := *cfg
-	copied.isV3 = true
 	epc, err := newEtcdProcessCluster(&copied)
 	if err != nil {
 		t.Fatalf("could not start etcd process cluster (%v)", err)

+ 0 - 2
etcdmain/config.go

@@ -122,7 +122,6 @@ type config struct {
 
 	printVersion bool
 
-	v3demo                  bool
 	autoCompactionRetention int
 
 	enablePprof bool
@@ -224,7 +223,6 @@ func NewConfig() *config {
 	fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit.")
 
 	// demo flag
-	fs.BoolVar(&cfg.v3demo, "experimental-v3demo", false, "Enable experimental v3 demo API.")
 	fs.IntVar(&cfg.autoCompactionRetention, "experimental-auto-compaction-retention", 0, "Auto compaction retention in hour. 0 means disable auto compaction.")
 
 	// backwards-compatibility with v0.4.6

+ 0 - 1
etcdmain/etcd.go

@@ -333,7 +333,6 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
 		PeerTLSInfo:             cfg.peerTLSInfo,
 		TickMs:                  cfg.TickMs,
 		ElectionTicks:           cfg.electionTicks(),
-		V3demo:                  cfg.v3demo,
 		AutoCompactionRetention: cfg.autoCompactionRetention,
 		StrictReconfigCheck:     cfg.strictReconfigCheck,
 		EnablePprof:             cfg.enablePprof,

+ 0 - 2
etcdmain/help.go

@@ -135,8 +135,6 @@ given by the consensus protocol.
 
 experimental flags:
 
-	--experimental-v3demo 'false'
-		enable experimental v3 demo API.
 	--experimental-auto-compaction-retention '0'
 		auto compaction retention in hour. 0 means disable auto compaction.
 

+ 0 - 1
etcdserver/config.go

@@ -50,7 +50,6 @@ type ServerConfig struct {
 	ElectionTicks    int
 	BootstrapTimeout time.Duration
 
-	V3demo                  bool
 	AutoCompactionRetention int
 
 	StrictReconfigCheck bool

+ 2 - 8
etcdserver/raft.go

@@ -312,10 +312,7 @@ func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n r
 		Storage:         s,
 		MaxSizePerMsg:   maxSizePerMsg,
 		MaxInflightMsgs: maxInflightMsgs,
-	}
-
-	if cfg.V3demo {
-		c.CheckQuorum = true
+		CheckQuorum:     true,
 	}
 
 	n = raft.StartNode(c, peers)
@@ -349,10 +346,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *clust
 		Storage:         s,
 		MaxSizePerMsg:   maxSizePerMsg,
 		MaxInflightMsgs: maxInflightMsgs,
-	}
-
-	if cfg.V3demo {
-		c.CheckQuorum = true
+		CheckQuorum:     true,
 	}
 
 	n := raft.RestartNode(c)

+ 50 - 64
etcdserver/server.go

@@ -16,7 +16,6 @@ package etcdserver
 
 import (
 	"encoding/json"
-	"errors"
 	"expvar"
 	"fmt"
 	"math/rand"
@@ -221,10 +220,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		return nil, fmt.Errorf("cannot access data directory: %v", terr)
 	}
 
-	if !cfg.V3demo && fileutil.Exist(path.Join(cfg.SnapDir(), databaseFilename)) {
-		return nil, errors.New("experimental-v3demo cannot be disabled once it is enabled")
-	}
-
 	// Run the migrations.
 	dataVer, err := version.DetectDataDir(cfg.DataDir)
 	if err != nil {
@@ -370,15 +365,13 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		msgSnapC:      make(chan raftpb.Message, maxInFlightMsgSnap),
 	}
 
-	if cfg.V3demo {
-		srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename))
-		srv.lessor = lease.NewLessor(srv.be)
-		srv.kv = dstorage.New(srv.be, srv.lessor, &srv.consistIndex)
-		srv.authStore = auth.NewAuthStore(srv.be)
-		if h := cfg.AutoCompactionRetention; h != 0 {
-			srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
-			srv.compactor.Run()
-		}
+	srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename))
+	srv.lessor = lease.NewLessor(srv.be)
+	srv.kv = dstorage.New(srv.be, srv.lessor, &srv.consistIndex)
+	srv.authStore = auth.NewAuthStore(srv.be)
+	if h := cfg.AutoCompactionRetention; h != 0 {
+		srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
+		srv.compactor.Run()
 	}
 
 	// TODO: move transport initialization near the definition of remote
@@ -393,7 +386,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		ServerStats: sstats,
 		LeaderStats: lstats,
 		ErrorC:      srv.errorc,
-		V3demo:      cfg.V3demo,
 	}
 	if err := tr.Start(); err != nil {
 		return nil, err
@@ -588,44 +580,43 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 			apply.snapshot.Metadata.Index, ep.appliedi)
 	}
 
-	if s.cfg.V3demo {
-		snapfn, err := s.r.storage.DBFilePath(apply.snapshot.Metadata.Index)
-		if err != nil {
-			plog.Panicf("get database snapshot file path error: %v", err)
-		}
+	snapfn, err := s.r.storage.DBFilePath(apply.snapshot.Metadata.Index)
+	if err != nil {
+		plog.Panicf("get database snapshot file path error: %v", err)
+	}
 
-		fn := path.Join(s.cfg.SnapDir(), databaseFilename)
-		if err := os.Rename(snapfn, fn); err != nil {
-			plog.Panicf("rename snapshot file error: %v", err)
-		}
+	fn := path.Join(s.cfg.SnapDir(), databaseFilename)
+	if err := os.Rename(snapfn, fn); err != nil {
+		plog.Panicf("rename snapshot file error: %v", err)
+	}
 
-		newbe := backend.NewDefaultBackend(fn)
-		if err := s.kv.Restore(newbe); err != nil {
-			plog.Panicf("restore KV error: %v", err)
-		}
+	newbe := backend.NewDefaultBackend(fn)
+	if err := s.kv.Restore(newbe); err != nil {
+		plog.Panicf("restore KV error: %v", err)
+	}
 
-		// Closing old backend might block until all the txns
-		// on the backend are finished.
-		// We do not want to wait on closing the old backend.
-		s.bemu.Lock()
-		oldbe := s.be
-		go func() {
-			if err := oldbe.Close(); err != nil {
-				plog.Panicf("close backend error: %v", err)
-			}
-		}()
+	// Closing old backend might block until all the txns
+	// on the backend are finished.
+	// We do not want to wait on closing the old backend.
+	s.bemu.Lock()
+	oldbe := s.be
+	go func() {
+		if err := oldbe.Close(); err != nil {
+			plog.Panicf("close backend error: %v", err)
+		}
+	}()
 
-		s.be = newbe
-		s.bemu.Unlock()
+	s.be = newbe
+	s.bemu.Unlock()
 
-		if s.lessor != nil {
-			s.lessor.Recover(newbe, s.kv)
-		}
+	if s.lessor != nil {
+		s.lessor.Recover(newbe, s.kv)
+	}
 
-		if s.authStore != nil {
-			s.authStore.Recover(newbe)
-		}
+	if s.authStore != nil {
+		s.authStore.Recover(newbe)
 	}
+
 	if err := s.store.Recovery(apply.snapshot.Data); err != nil {
 		plog.Panicf("recovery store error: %v", err)
 	}
@@ -938,20 +929,17 @@ func (s *EtcdServer) send(ms []raftpb.Message) {
 			ms[i].To = 0
 		}
 
-		if s.cfg.V3demo {
-			if ms[i].Type == raftpb.MsgSnap {
-				// There are two separate data store when v3 demo is enabled: the store for v2,
-				// and the KV for v3.
-				// The msgSnap only contains the most recent snapshot of store without KV.
-				// So we need to redirect the msgSnap to etcd server main loop for merging in the
-				// current store snapshot and KV snapshot.
-				select {
-				case s.msgSnapC <- ms[i]:
-				default:
-					// drop msgSnap if the inflight chan if full.
-				}
-				ms[i].To = 0
+		if ms[i].Type == raftpb.MsgSnap {
+			// There are two separate data store: the store for v2, and the KV for v3.
+			// The msgSnap only contains the most recent snapshot of store without KV.
+			// So we need to redirect the msgSnap to etcd server main loop for merging in the
+			// current store snapshot and KV snapshot.
+			select {
+			case s.msgSnapC <- ms[i]:
+			default:
+				// drop msgSnap if the inflight chan if full.
 			}
+			ms[i].To = 0
 		}
 		if ms[i].Type == raftpb.MsgHeartbeat {
 			ok, exceed := s.r.td.Observe(ms[i].To)
@@ -1182,11 +1170,9 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 			}
 			plog.Panicf("unexpected create snapshot error %v", err)
 		}
-		if s.cfg.V3demo {
-			// commit v3 storage because WAL file before snapshot index
-			// could be removed after SaveSnap.
-			s.getKV().Commit()
-		}
+		// commit v3 storage because WAL file before snapshot index
+		// could be removed after SaveSnap.
+		s.getKV().Commit()
 		// SaveSnap saves the snapshot and releases the locked wal files
 		// to the snapshot index.
 		if err = s.r.storage.SaveSnap(snap); err != nil {

+ 16 - 84
etcdserver/server_test.go

@@ -822,6 +822,11 @@ func TestSyncTrigger(t *testing.T) {
 
 // snapshot should snapshot the store and cut the persistent
 func TestSnapshot(t *testing.T) {
+	be, tmpPath := backend.NewDefaultTmpBackend()
+	defer func() {
+		os.RemoveAll(tmpPath)
+	}()
+
 	s := raft.NewMemoryStorage()
 	s.Append([]raftpb.Entry{{Index: 1}})
 	st := mockstore.NewRecorder()
@@ -835,6 +840,9 @@ func TestSnapshot(t *testing.T) {
 		},
 		store: st,
 	}
+	srv.kv = dstorage.New(be, &lease.FakeLessor{}, &srv.consistIndex)
+	srv.be = be
+
 	srv.snapshot(1, raftpb.ConfState{Nodes: []uint64{1}})
 	gaction, _ := st.Wait(2)
 	if len(gaction) != 2 {
@@ -857,6 +865,11 @@ func TestSnapshot(t *testing.T) {
 
 // Applied > SnapCount should trigger a SaveSnap event
 func TestTriggerSnap(t *testing.T) {
+	be, tmpPath := backend.NewDefaultTmpBackend()
+	defer func() {
+		os.RemoveAll(tmpPath)
+	}()
+
 	snapc := 10
 	st := mockstore.NewRecorder()
 	p := mockstorage.NewStorageRecorderStream("")
@@ -872,6 +885,9 @@ func TestTriggerSnap(t *testing.T) {
 		store:    st,
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
+	srv.kv = dstorage.New(be, &lease.FakeLessor{}, &srv.consistIndex)
+	srv.be = be
+
 	srv.start()
 
 	donec := make(chan struct{})
@@ -922,7 +938,6 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
 	tr, snapDoneC := rafthttp.NewSnapTransporter(testdir)
 	s := &EtcdServer{
 		cfg: &ServerConfig{
-			V3demo:  true,
 			DataDir: testdir,
 		},
 		r: raftNode{
@@ -995,89 +1010,6 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
 	}
 }
 
-// TestRecvSnapshot tests when it receives a snapshot from raft leader,
-// it should trigger storage.SaveSnap and also store.Recover.
-func TestRecvSnapshot(t *testing.T) {
-	n := newNopReadyNode()
-	st := mockstore.NewRecorder()
-	p := mockstorage.NewStorageRecorder("")
-	cl := newCluster("abc")
-	cl.SetStore(store.New())
-	s := &EtcdServer{
-		cfg: &ServerConfig{},
-		r: raftNode{
-			Node:        n,
-			transport:   rafthttp.NewNopTransporter(),
-			storage:     p,
-			raftStorage: raft.NewMemoryStorage(),
-		},
-		store:   st,
-		cluster: cl,
-	}
-
-	s.start()
-	n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}}}
-
-	// wait for actions happened on the storage
-	for len(p.Action()) == 0 {
-		time.Sleep(10 * time.Millisecond)
-	}
-
-	s.Stop()
-
-	wactions := []testutil.Action{{Name: "Recovery"}}
-	if g := st.Action(); !reflect.DeepEqual(g, wactions) {
-		t.Errorf("store action = %v, want %v", g, wactions)
-	}
-	wactions = []testutil.Action{{Name: "SaveSnap"}, {Name: "Save"}}
-	if g := p.Action(); !reflect.DeepEqual(g, wactions) {
-		t.Errorf("storage action = %v, want %v", g, wactions)
-	}
-}
-
-// TestApplySnapshotAndCommittedEntries tests that server applies snapshot
-// first and then committed entries.
-func TestApplySnapshotAndCommittedEntries(t *testing.T) {
-	n := newNopReadyNode()
-	st := mockstore.NewRecorderStream()
-	cl := newCluster("abc")
-	cl.SetStore(store.New())
-	storage := raft.NewMemoryStorage()
-	s := &EtcdServer{
-		cfg: &ServerConfig{},
-		r: raftNode{
-			Node:        n,
-			storage:     mockstorage.NewStorageRecorder(""),
-			raftStorage: storage,
-			transport:   rafthttp.NewNopTransporter(),
-		},
-		store:   st,
-		cluster: cl,
-	}
-
-	s.start()
-	req := &pb.Request{Method: "QGET"}
-	n.readyc <- raft.Ready{
-		Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}},
-		CommittedEntries: []raftpb.Entry{
-			{Index: 2, Data: pbutil.MustMarshal(req)},
-		},
-	}
-	// make goroutines move forward to receive snapshot
-	actions, _ := st.Wait(2)
-	s.Stop()
-
-	if len(actions) != 2 {
-		t.Fatalf("len(action) = %d, want 2", len(actions))
-	}
-	if actions[0].Name != "Recovery" {
-		t.Errorf("actions[0] = %s, want %s", actions[0].Name, "Recovery")
-	}
-	if actions[1].Name != "Get" {
-		t.Errorf("actions[1] = %s, want %s", actions[1].Name, "Get")
-	}
-}
-
 // TestAddMember tests AddMember can propose and perform node addition.
 func TestAddMember(t *testing.T) {
 	n := newNodeConfChangeCommitterRecorder()

+ 0 - 6
integration/cluster.go

@@ -72,7 +72,6 @@ type ClusterConfig struct {
 	PeerTLS      *transport.TLSInfo
 	ClientTLS    *transport.TLSInfo
 	DiscoveryURL string
-	UseV3        bool
 	UseGRPC      bool
 }
 
@@ -199,7 +198,6 @@ func (c *cluster) mustNewMember(t *testing.T) *member {
 	name := c.name(rand.Int())
 	m := mustNewMember(t, name, c.cfg.PeerTLS, c.cfg.ClientTLS)
 	m.DiscoveryURL = c.cfg.DiscoveryURL
-	m.V3demo = c.cfg.UseV3
 	if c.cfg.UseGRPC {
 		if err := m.listenGRPC(); err != nil {
 			t.Fatal(err)
@@ -471,9 +469,6 @@ func mustNewMember(t *testing.T, name string, peerTLS *transport.TLSInfo, client
 
 // listenGRPC starts a grpc server over a unix domain socket on the member
 func (m *member) listenGRPC() error {
-	if m.V3demo == false {
-		return fmt.Errorf("starting grpc server without v3 configured")
-	}
 	// prefix with localhost so cert has right domain
 	m.grpcAddr = "localhost:" + m.Name + ".sock"
 	if err := os.RemoveAll(m.grpcAddr); err != nil {
@@ -723,7 +718,6 @@ type ClusterV3 struct {
 // NewClusterV3 returns a launched cluster with a grpc client connection
 // for each cluster member.
 func NewClusterV3(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
-	cfg.UseV3 = true
 	cfg.UseGRPC = true
 	clus := &ClusterV3{cluster: NewClusterByConfig(t, cfg)}
 	for _, m := range clus.Members {

+ 0 - 1
integration/v3_grpc_test.go

@@ -629,7 +629,6 @@ func TestV3RangeRequest(t *testing.T) {
 }
 
 func newClusterV3NoClients(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
-	cfg.UseV3 = true
 	cfg.UseGRPC = true
 	clus := &ClusterV3{cluster: NewClusterByConfig(t, cfg)}
 	clus.Launch(t)

+ 3 - 5
rafthttp/peer.go

@@ -92,9 +92,8 @@ type Peer interface {
 // It is only used when the stream has not been established.
 type peer struct {
 	// id of the remote raft peer node
-	id     types.ID
-	r      Raft
-	v3demo bool
+	id types.ID
+	r  Raft
 
 	status *peerStatus
 
@@ -118,13 +117,12 @@ type peer struct {
 	stopc  chan struct{}
 }
 
-func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error, v3demo bool) *peer {
+func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer {
 	status := newPeerStatus(to)
 	picker := newURLPicker(urls)
 	p := &peer{
 		id:             to,
 		r:              r,
-		v3demo:         v3demo,
 		status:         status,
 		picker:         picker,
 		msgAppV2Writer: startStreamWriter(to, status, fs, r),

+ 1 - 2
rafthttp/transport.go

@@ -111,7 +111,6 @@ type Transport struct {
 	// When an error is received from ErrorC, user should stop raft state
 	// machine and thus stop the Transport.
 	ErrorC chan error
-	V3demo bool
 
 	streamRt   http.RoundTripper // roundTripper used by streams
 	pipelineRt http.RoundTripper // roundTripper used by pipelines
@@ -232,7 +231,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
 		plog.Panicf("newURLs %+v should never fail: %+v", us, err)
 	}
 	fs := t.LeaderStats.Follower(id.String())
-	t.peers[id] = startPeer(t, urls, t.ID, id, t.ClusterID, t.Raft, fs, t.ErrorC, t.V3demo)
+	t.peers[id] = startPeer(t, urls, t.ID, id, t.ClusterID, t.Raft, fs, t.ErrorC)
 	addPeerToProber(t.prober, id.String(), us)
 }
 

+ 0 - 4
tools/functional-tester/etcd-tester/cluster.go

@@ -113,10 +113,6 @@ func (c *cluster) Bootstrap() error {
 			"--initial-cluster", clusterStr,
 			"--initial-cluster-state", "new",
 		}
-		if !c.v2Only {
-			flags = append(flags,
-				"--experimental-v3demo")
-		}
 
 		if _, err := a.Start(flags...); err != nil {
 			// cleanup