
etcdserver: separate out raft related stuff

Xiang Li · 11 years ago
commit 973f79e1c9
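This commit pulls the raft-specific state out of EtcdServer into a new raftNode type in etcdserver/raft.go: the raft.Node handle, the raft MemoryStorage, the WAL-backed Storage, the rafthttp transport, the tick channel, the snapCount trigger, and the cached index/term/lead values. Because raftNode embeds raft.Node, the node's methods are promoted onto it, so call sites change mechanically from s.node.Propose(...) to s.r.Propose(...) with no wrapper methods. A minimal standalone sketch of that embedding pattern (illustrative names only, not the etcd code itself):

package main

import "fmt"

// Node stands in for the raft.Node interface: the method set that
// the embedding struct wants to expose directly on itself.
type Node interface {
	Tick()
	Propose(data []byte) error
}

type fakeNode struct{}

func (fakeNode) Tick() { fmt.Println("tick") }

func (fakeNode) Propose(data []byte) error {
	fmt.Printf("propose %q\n", data)
	return nil
}

// raftNode embeds Node, so Tick and Propose are promoted:
// callers write r.Tick() rather than r.node.Tick().
type raftNode struct {
	Node
	snapCount uint64 // raft-related config rides along with the handle
}

func main() {
	r := raftNode{Node: fakeNode{}, snapCount: 10000}
	r.Tick()                     // promoted from the embedded Node
	_ = r.Propose([]byte("foo")) // ditto
}

The same promotion is what lets the EtcdServer methods below call s.r.Step, s.r.Advance, and s.r.ApplyConfChange directly on the field r raftNode.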

+ 92 - 1
etcdserver/force_cluster.go → etcdserver/raft.go

@@ -1,5 +1,5 @@
 /*
-   Copyright 2014 CoreOS, Inc.
+   Copyright 2015 CoreOS, Inc.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
@@ -19,16 +19,107 @@ package etcdserver
 import (
 	"encoding/json"
 	"log"
+	"os"
 	"sort"
+	"time"

+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/rafthttp"
 	"github.com/coreos/etcd/wal"
 	"github.com/coreos/etcd/wal/walpb"
 )
 
+type RaftTimer interface {
+	Index() uint64
+	Term() uint64
+}
+
+type raftNode struct {
+	raft.Node
+
+	// config
+	snapCount uint64 // number of entries to trigger a snapshot
+
+	// utility
+	ticker      <-chan time.Time
+	raftStorage *raft.MemoryStorage
+	storage     Storage
+	// transport specifies the transport to send and receive msgs to members.
+	// Sending messages MUST NOT block. It is okay to drop messages, since
+	// clients should timeout and reissue their messages.
+	// If transport is nil, server will panic.
+	transport rafthttp.Transporter
+
+	// Cache of the latest raft index and raft term the server has seen
+	index uint64
+	term  uint64
+	lead  uint64
+}
+
+// for testing
+func (r *raftNode) pauseSending() {
+	p := r.transport.(rafthttp.Pausable)
+	p.Pause()
+}
+
+func (r *raftNode) resumeSending() {
+	p := r.transport.(rafthttp.Pausable)
+	p.Resume()
+}
+
+func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
+	var err error
+	member := cfg.Cluster.MemberByName(cfg.Name)
+	metadata := pbutil.MustMarshal(
+		&pb.Metadata{
+			NodeID:    uint64(member.ID),
+			ClusterID: uint64(cfg.Cluster.ID()),
+		},
+	)
+	if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil {
+		log.Fatalf("etcdserver create snapshot directory error: %v", err)
+	}
+	if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
+		log.Fatalf("etcdserver: create wal error: %v", err)
+	}
+	peers := make([]raft.Peer, len(ids))
+	for i, id := range ids {
+		ctx, err := json.Marshal((*cfg.Cluster).Member(id))
+		if err != nil {
+			log.Panicf("marshal member should never fail: %v", err)
+		}
+		peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
+	}
+	id = member.ID
+	log.Printf("etcdserver: start member %s in cluster %s", id, cfg.Cluster.ID())
+	s = raft.NewMemoryStorage()
+	n = raft.StartNode(uint64(id), peers, cfg.ElectionTicks, 1, s)
+	return
+}
+
+func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
+	var walsnap walpb.Snapshot
+	if snapshot != nil {
+		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
+	}
+	w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
+	cfg.Cluster.SetID(cid)
+
+	log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
+	s := raft.NewMemoryStorage()
+	if snapshot != nil {
+		s.ApplySnapshot(*snapshot)
+	}
+	s.SetHardState(st)
+	s.Append(ents)
+	n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s)
+	return id, n, s, w
+}
+
 func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
 	var walsnap walpb.Snapshot
 	if snapshot != nil {

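startNode and restartNode move into raft.go from server.go unchanged, and the RaftTimer interface now lives next to the raftNode fields that back it. index, term, and lead are plain uint64s written by the raft loop and read from other goroutines, so every access in the server.go hunks below goes through sync/atomic. A small sketch of that load/store pattern, with hypothetical names:

package main

import (
	"fmt"
	"sync/atomic"
)

// timer mirrors the RaftTimer shape: a cached raft index and term
// updated by one goroutine and read by any other without a lock.
type timer struct {
	index uint64
	term  uint64
}

// The apply loop stores the latest values it has seen...
func (t *timer) observe(index, term uint64) {
	atomic.StoreUint64(&t.index, index)
	atomic.StoreUint64(&t.term, term)
}

// ...and readers load them atomically, satisfying RaftTimer.
func (t *timer) Index() uint64 { return atomic.LoadUint64(&t.index) }
func (t *timer) Term() uint64  { return atomic.LoadUint64(&t.term) }

func main() {
	var t timer
	t.observe(42, 3)
	fmt.Println(t.Index(), t.Term()) // 42 3
}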
+ 0 - 0
etcdserver/force_cluster_test.go → etcdserver/raft_test.go

+ 62 - 138
etcdserver/server.go

@@ -23,7 +23,6 @@ import (
 	"log"
 	"log"
 	"math/rand"
 	"math/rand"
 	"net/http"
 	"net/http"
-	"os"
 	"path"
 	"path"
 	"regexp"
 	"regexp"
 	"sort"
 	"sort"
@@ -46,7 +45,6 @@ import (
 	"github.com/coreos/etcd/snap"
 	"github.com/coreos/etcd/snap"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/wal"
 	"github.com/coreos/etcd/wal"
-	"github.com/coreos/etcd/wal/walpb"
 
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 )
 )
@@ -115,14 +113,12 @@ type Server interface {
 	UpdateMember(ctx context.Context, updateMemb Member) error
 }

-type RaftTimer interface {
-	Index() uint64
-	Term() uint64
-}
-
 // EtcdServer is the production implementation of the Server interface
 type EtcdServer struct {
-	cfg        *ServerConfig
+	cfg *ServerConfig
+
+	r raftNode
+
 	w          wait.Wait
 	stop       chan struct{}
 	done       chan struct{}
@@ -132,32 +128,13 @@ type EtcdServer struct {

 	Cluster *Cluster

-	node        raft.Node
-	raftStorage *raft.MemoryStorage
-	storage     Storage
-
 	store store.Store

 	stats  *stats.ServerStats
 	lstats *stats.LeaderStats

-	// transport specifies the transport to send and receive msgs to members.
-	// Sending messages MUST NOT block. It is okay to drop messages, since
-	// clients should timeout and reissue their messages.
-	// If transport is nil, server will panic.
-	transport rafthttp.Transporter
-
-	Ticker     <-chan time.Time
 	SyncTicker <-chan time.Time

-	snapCount uint64 // number of entries to trigger a snapshot
-
-	// Cache of the latest raft index and raft term the server has seen
-	raftIndex uint64
-	raftTerm  uint64
-
-	raftLead uint64
-
 	reqIDGen *idutil.Generator
 }
 
@@ -254,21 +231,23 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 	lstats := stats.NewLeaderStats(id.String())

 	srv := &EtcdServer{
-		cfg:         cfg,
-		errorc:      make(chan error, 1),
-		store:       st,
-		node:        n,
-		raftStorage: s,
-		id:          id,
-		attributes:  Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
-		Cluster:     cfg.Cluster,
-		storage:     NewStorage(w, ss),
-		stats:       sstats,
-		lstats:      lstats,
-		Ticker:      time.Tick(time.Duration(cfg.TickMs) * time.Millisecond),
-		SyncTicker:  time.Tick(500 * time.Millisecond),
-		snapCount:   cfg.SnapCount,
-		reqIDGen:    idutil.NewGenerator(uint8(id), time.Now()),
+		cfg:    cfg,
+		errorc: make(chan error, 1),
+		store:  st,
+		r: raftNode{
+			Node:        n,
+			snapCount:   cfg.SnapCount,
+			ticker:      time.Tick(time.Duration(cfg.TickMs) * time.Millisecond),
+			raftStorage: s,
+			storage:     NewStorage(w, ss),
+		},
+		id:         id,
+		attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
+		Cluster:    cfg.Cluster,
+		stats:      sstats,
+		lstats:     lstats,
+		SyncTicker: time.Tick(500 * time.Millisecond),
+		reqIDGen:   idutil.NewGenerator(uint8(id), time.Now()),
 	}

 	tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, srv.errorc, sstats, lstats)
@@ -278,7 +257,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 			tr.AddPeer(m.ID, m.PeerURLs)
 		}
 	}
-	srv.transport = tr
+	srv.r.transport = tr
 	return srv, nil
 }
 
@@ -295,9 +274,9 @@ func (s *EtcdServer) Start() {
 // modify a server's fields after it has been sent to Start.
 // This function is just used for testing.
 func (s *EtcdServer) start() {
-	if s.snapCount == 0 {
+	if s.r.snapCount == 0 {
 		log.Printf("etcdserver: set snapshot count to default %d", DefaultSnapCount)
-		s.snapCount = DefaultSnapCount
+		s.r.snapCount = DefaultSnapCount
 	}
 	s.w = wait.New()
 	s.done = make(chan struct{})
@@ -328,7 +307,7 @@ func (s *EtcdServer) purgeFile() {
 
 func (s *EtcdServer) ID() types.ID { return s.id }

-func (s *EtcdServer) RaftHandler() http.Handler { return s.transport.Handler() }
+func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }

 func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
 	if s.Cluster.IsIDRemoved(types.ID(m.From)) {
@@ -338,7 +317,7 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
 	if m.Type == raftpb.MsgApp {
 		s.stats.RecvAppendReq(types.ID(m.From).String(), m.Size())
 	}
-	return s.node.Step(ctx, m)
+	return s.r.Step(ctx, m)
 }

 func (s *EtcdServer) run() {
@@ -346,7 +325,7 @@ func (s *EtcdServer) run() {
 	var shouldstop bool

 	// load initial state from raft storage
-	snap, err := s.raftStorage.Snapshot()
+	snap, err := s.r.raftStorage.Snapshot()
 	if err != nil {
 		log.Panicf("etcdserver: get snapshot from raft storage error: %v", err)
 	}
@@ -356,20 +335,21 @@ func (s *EtcdServer) run() {
 	confState := snap.Metadata.ConfState

 	defer func() {
-		s.node.Stop()
-		s.transport.Stop()
-		if err := s.storage.Close(); err != nil {
+		s.r.Stop()
+		s.r.transport.Stop()
+		if err := s.r.storage.Close(); err != nil {
 			log.Panicf("etcdserver: close storage error: %v", err)
 		}
 		close(s.done)
 	}()
+	// TODO: make raft loop a method on raftNode
 	for {
 		select {
-		case <-s.Ticker:
-			s.node.Tick()
-		case rd := <-s.node.Ready():
+		case <-s.r.ticker:
+			s.r.Tick()
+		case rd := <-s.r.Ready():
 			if rd.SoftState != nil {
-				atomic.StoreUint64(&s.raftLead, rd.SoftState.Lead)
+				atomic.StoreUint64(&s.r.lead, rd.SoftState.Lead)
 				if rd.RaftState == raft.StateLeader {
 					syncC = s.SyncTicker
 					// TODO: remove the nil checking
@@ -384,18 +364,18 @@ func (s *EtcdServer) run() {
 
 			// apply snapshot to storage if it is more updated than current snapi
 			if !raft.IsEmptySnap(rd.Snapshot) && rd.Snapshot.Metadata.Index > snapi {
-				if err := s.storage.SaveSnap(rd.Snapshot); err != nil {
+				if err := s.r.storage.SaveSnap(rd.Snapshot); err != nil {
 					log.Fatalf("etcdserver: save snapshot error: %v", err)
 				}
-				s.raftStorage.ApplySnapshot(rd.Snapshot)
+				s.r.raftStorage.ApplySnapshot(rd.Snapshot)
 				snapi = rd.Snapshot.Metadata.Index
 				log.Printf("etcdserver: saved incoming snapshot at index %d", snapi)
 			}

-			if err := s.storage.Save(rd.HardState, rd.Entries); err != nil {
+			if err := s.r.storage.Save(rd.HardState, rd.Entries); err != nil {
 				log.Fatalf("etcdserver: save state and entries error: %v", err)
 			}
-			s.raftStorage.Append(rd.Entries)
+			s.r.raftStorage.Append(rd.Entries)

 			s.send(rd.Messages)
 
@@ -427,9 +407,9 @@ func (s *EtcdServer) run() {
 				}
 			}

-			s.node.Advance()
+			s.r.Advance()

-			if appliedi-snapi > s.snapCount {
+			if appliedi-snapi > s.r.snapCount {
 				log.Printf("etcdserver: start to snapshot (applied: %d, lastsnap: %d)", appliedi, snapi)
 				s.snapshot(appliedi, &confState)
 				snapi = appliedi
@@ -486,7 +466,7 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
 			return Response{}, err
 		}
 		ch := s.w.Register(r.ID)
-		s.node.Propose(ctx, data)
+		s.r.Propose(ctx, data)
 		select {
 		case x := <-ch:
 			resp := x.(Response)
@@ -526,7 +506,7 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
 func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() }

 func (s *EtcdServer) LeaderStats() []byte {
-	lead := atomic.LoadUint64(&s.raftLead)
+	lead := atomic.LoadUint64(&s.r.lead)
 	if lead != uint64(s.id) {
 		return nil
 	}
@@ -571,14 +551,14 @@ func (s *EtcdServer) UpdateMember(ctx context.Context, memb Member) error {
 }

 // Implement the RaftTimer interface
-func (s *EtcdServer) Index() uint64 { return atomic.LoadUint64(&s.raftIndex) }
+func (s *EtcdServer) Index() uint64 { return atomic.LoadUint64(&s.r.index) }

-func (s *EtcdServer) Term() uint64 { return atomic.LoadUint64(&s.raftTerm) }
+func (s *EtcdServer) Term() uint64 { return atomic.LoadUint64(&s.r.term) }

 // Only for testing purpose
 // TODO: add Raft server interface to expose raft related info:
 // Index, Term, Lead, Committed, Applied, LastIndex, etc.
-func (s *EtcdServer) Lead() uint64 { return atomic.LoadUint64(&s.raftLead) }
+func (s *EtcdServer) Lead() uint64 { return atomic.LoadUint64(&s.r.lead) }

 func (s *EtcdServer) Leader() types.ID { return types.ID(s.Lead()) }
 
@@ -588,7 +568,7 @@ func (s *EtcdServer) Leader() types.ID { return types.ID(s.Lead()) }
 func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error {
 	cc.ID = s.reqIDGen.Next()
 	ch := s.w.Register(cc.ID)
-	if err := s.node.ProposeConfChange(ctx, cc); err != nil {
+	if err := s.r.ProposeConfChange(ctx, cc); err != nil {
 		s.w.Trigger(cc.ID, nil)
 		return err
 	}
@@ -623,7 +603,7 @@ func (s *EtcdServer) sync(timeout time.Duration) {
 	// There is no promise that node has leader when do SYNC request,
 	// so it uses goroutine to propose.
 	go func() {
-		s.node.Propose(ctx, data)
+		s.r.Propose(ctx, data)
 		cancel()
 	}()
 }
@@ -668,7 +648,7 @@ func (s *EtcdServer) send(ms []raftpb.Message) {
 			m.To = 0
 		}
 	}
-	s.transport.Send(ms)
+	s.r.transport.Send(ms)
 }

 // apply takes entries received from Raft (after it has been committed) and
@@ -693,8 +673,8 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
 		default:
 			log.Panicf("entry type should be either EntryNormal or EntryConfChange")
 		}
-		atomic.StoreUint64(&s.raftIndex, e.Index)
-		atomic.StoreUint64(&s.raftTerm, e.Term)
+		atomic.StoreUint64(&s.r.index, e.Index)
+		atomic.StoreUint64(&s.r.term, e.Term)
 		applied = e.Index
 	}
 	return applied, shouldstop
@@ -754,10 +734,10 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
 func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) {
 	if err := s.Cluster.ValidateConfigurationChange(cc); err != nil {
 		cc.NodeID = raft.None
-		s.node.ApplyConfChange(cc)
+		s.r.ApplyConfChange(cc)
 		return false, err
 	}
-	*confState = *s.node.ApplyConfChange(cc)
+	*confState = *s.r.ApplyConfChange(cc)
 	switch cc.Type {
 	case raftpb.ConfChangeAddNode:
 		m := new(Member)
@@ -771,7 +751,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 		if m.ID == s.id {
 			log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		} else {
-			s.transport.AddPeer(m.ID, m.PeerURLs)
+			s.r.transport.AddPeer(m.ID, m.PeerURLs)
 			log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		}
 	case raftpb.ConfChangeRemoveNode:
@@ -780,7 +760,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 		if id == s.id {
 			return true, nil
 		} else {
-			s.transport.RemovePeer(id)
+			s.r.transport.RemovePeer(id)
 			log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID())
 		}
 	case raftpb.ConfChangeUpdateNode:
@@ -795,7 +775,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 		if m.ID == s.id {
 			log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		} else {
-			s.transport.UpdatePeer(m.ID, m.PeerURLs)
+			s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
 			log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		}
 	}
@@ -810,7 +790,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState *raftpb.ConfState) {
 	if err != nil {
 		log.Panicf("etcdserver: store save should never fail: %v", err)
 	}
-	err = s.raftStorage.Compact(snapi, confState, d)
+	err = s.r.raftStorage.Compact(snapi, confState, d)
 	if err != nil {
 		// the snapshot was done asynchronously with the progress of raft.
 		// raft might have already got a newer snapshot and called compact.
@@ -821,78 +801,22 @@ func (s *EtcdServer) snapshot(snapi uint64, confState *raftpb.ConfState) {
 	}
 	log.Printf("etcdserver: compacted log at index %d", snapi)

-	if err := s.storage.Cut(); err != nil {
+	if err := s.r.storage.Cut(); err != nil {
 		log.Panicf("etcdserver: rotate wal file should never fail: %v", err)
 	}
-	snap, err := s.raftStorage.Snapshot()
+	snap, err := s.r.raftStorage.Snapshot()
 	if err != nil {
 		log.Panicf("etcdserver: snapshot error: %v", err)
 	}
-	if err := s.storage.SaveSnap(snap); err != nil {
+	if err := s.r.storage.SaveSnap(snap); err != nil {
 		log.Fatalf("etcdserver: save snapshot error: %v", err)
 	}
 	log.Printf("etcdserver: saved snapshot at index %d", snap.Metadata.Index)
 }

-// for testing
-func (s *EtcdServer) PauseSending() {
-	p := s.transport.(rafthttp.Pausable)
-	p.Pause()
-}
-
-func (s *EtcdServer) ResumeSending() {
-	p := s.transport.(rafthttp.Pausable)
-	p.Resume()
-}
-
-func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
-	var err error
-	member := cfg.Cluster.MemberByName(cfg.Name)
-	metadata := pbutil.MustMarshal(
-		&pb.Metadata{
-			NodeID:    uint64(member.ID),
-			ClusterID: uint64(cfg.Cluster.ID()),
-		},
-	)
-	if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil {
-		log.Fatalf("etcdserver create snapshot directory error: %v", err)
-	}
-	if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
-		log.Fatalf("etcdserver: create wal error: %v", err)
-	}
-	peers := make([]raft.Peer, len(ids))
-	for i, id := range ids {
-		ctx, err := json.Marshal((*cfg.Cluster).Member(id))
-		if err != nil {
-			log.Panicf("marshal member should never fail: %v", err)
-		}
-		peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
-	}
-	id = member.ID
-	log.Printf("etcdserver: start member %s in cluster %s", id, cfg.Cluster.ID())
-	s = raft.NewMemoryStorage()
-	n = raft.StartNode(uint64(id), peers, cfg.ElectionTicks, 1, s)
-	return
-}
-
-func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
-	var walsnap walpb.Snapshot
-	if snapshot != nil {
-		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
-	}
-	w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
-	cfg.Cluster.SetID(cid)
+func (s *EtcdServer) PauseSending() { s.r.pauseSending() }

-	log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
-	s := raft.NewMemoryStorage()
-	if snapshot != nil {
-		s.ApplySnapshot(*snapshot)
-	}
-	s.SetHardState(st)
-	s.Append(ents)
-	n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s)
-	return id, n, s, w
-}
+func (s *EtcdServer) ResumeSending() { s.r.resumeSending() }

 // isBootstrapped tries to check if the given member has been bootstrapped
 // in the given cluster.

+ 106 - 82
etcdserver/server_test.go

@@ -456,7 +456,7 @@ func TestApplyConfChangeError(t *testing.T) {
 	for i, tt := range tests {
 		n := &nodeRecorder{}
 		srv := &EtcdServer{
-			node:    n,
+			r:       raftNode{Node: n},
 			Cluster: cl,
 		}
 		_, err := srv.applyConfChange(tt.cc, nil)
@@ -483,10 +483,12 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
 		cl.AddMember(&Member{ID: types.ID(i)})
 	}
 	srv := &EtcdServer{
-		id:        1,
-		node:      &nodeRecorder{},
-		Cluster:   cl,
-		transport: &nopTransporter{},
+		id: 1,
+		r: raftNode{
+			Node:      &nodeRecorder{},
+			transport: &nopTransporter{},
+		},
+		Cluster: cl,
 	}
 	cc := raftpb.ConfChange{
 		Type:   raftpb.ConfChangeRemoveNode,
@@ -522,12 +524,14 @@ func TestDoProposal(t *testing.T) {
 	for i, tt := range tests {
 		st := &storeRecorder{}
 		srv := &EtcdServer{
-			node:        newNodeCommitter(),
-			raftStorage: raft.NewMemoryStorage(),
-			store:       st,
-			transport:   &nopTransporter{},
-			storage:     &storageRecorder{},
-			reqIDGen:    idutil.NewGenerator(0, time.Time{}),
+			r: raftNode{
+				Node:        newNodeCommitter(),
+				storage:     &storageRecorder{},
+				raftStorage: raft.NewMemoryStorage(),
+				transport:   &nopTransporter{},
+			},
+			store:    st,
+			reqIDGen: idutil.NewGenerator(0, time.Time{}),
 		}
 		srv.start()
 		resp, err := srv.Do(context.Background(), tt)
@@ -550,7 +554,7 @@ func TestDoProposal(t *testing.T) {
 func TestDoProposalCancelled(t *testing.T) {
 	wait := &waitRecorder{}
 	srv := &EtcdServer{
-		node:     &nodeRecorder{},
+		r:        raftNode{Node: &nodeRecorder{}},
 		w:        wait,
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
@@ -569,7 +573,7 @@ func TestDoProposalCancelled(t *testing.T) {
 
 func TestDoProposalTimeout(t *testing.T) {
 	srv := &EtcdServer{
-		node:     &nodeRecorder{},
+		r:        raftNode{Node: &nodeRecorder{}},
 		w:        &waitRecorder{},
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
@@ -582,7 +586,7 @@ func TestDoProposalTimeout(t *testing.T) {
 
 func TestDoProposalStopped(t *testing.T) {
 	srv := &EtcdServer{
-		node:     &nodeRecorder{},
+		r:        raftNode{Node: &nodeRecorder{}},
 		w:        &waitRecorder{},
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
@@ -598,7 +602,7 @@ func TestDoProposalStopped(t *testing.T) {
 func TestSync(t *testing.T) {
 	n := &nodeRecorder{}
 	srv := &EtcdServer{
-		node:     n,
+		r:        raftNode{Node: n},
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
 	// check that sync is non-blocking
@@ -631,7 +635,7 @@ func TestSync(t *testing.T) {
 func TestSyncTimeout(t *testing.T) {
 	n := &nodeProposalBlockerRecorder{}
 	srv := &EtcdServer{
-		node:     n,
+		r:        raftNode{Node: n},
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
 	// check that sync is non-blocking
@@ -656,13 +660,15 @@ func TestSyncTrigger(t *testing.T) {
 	n := newReadyNode()
 	st := make(chan time.Time, 1)
 	srv := &EtcdServer{
-		node:        n,
-		raftStorage: raft.NewMemoryStorage(),
-		store:       &storeRecorder{},
-		transport:   &nopTransporter{},
-		storage:     &storageRecorder{},
-		SyncTicker:  st,
-		reqIDGen:    idutil.NewGenerator(0, time.Time{}),
+		r: raftNode{
+			Node:        n,
+			raftStorage: raft.NewMemoryStorage(),
+			transport:   &nopTransporter{},
+			storage:     &storageRecorder{},
+		},
+		store:      &storeRecorder{},
+		SyncTicker: st,
+		reqIDGen:   idutil.NewGenerator(0, time.Time{}),
 	}
 	srv.start()
 	defer srv.Stop()
@@ -700,10 +706,12 @@ func TestSnapshot(t *testing.T) {
 	st := &storeRecorder{}
 	p := &storageRecorder{}
 	srv := &EtcdServer{
-		node:        &nodeRecorder{},
-		raftStorage: s,
-		store:       st,
-		storage:     p,
+		r: raftNode{
+			Node:        &nodeRecorder{},
+			raftStorage: s,
+			storage:     p,
+		},
+		store: st,
 	}
 	srv.snapshot(1, &raftpb.ConfState{Nodes: []uint64{1}})
 	gaction := st.Action()
@@ -731,13 +739,15 @@ func TestTriggerSnap(t *testing.T) {
 	st := &storeRecorder{}
 	p := &storageRecorder{}
 	srv := &EtcdServer{
-		node:        newNodeCommitter(),
-		raftStorage: raft.NewMemoryStorage(),
-		store:       st,
-		transport:   &nopTransporter{},
-		storage:     p,
-		snapCount:   uint64(snapc),
-		reqIDGen:    idutil.NewGenerator(0, time.Time{}),
+		r: raftNode{
+			Node:        newNodeCommitter(),
+			snapCount:   uint64(snapc),
+			raftStorage: raft.NewMemoryStorage(),
+			storage:     p,
+			transport:   &nopTransporter{},
+		},
+		store:    st,
+		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
 	srv.start()
 	for i := 0; i < snapc+1; i++ {
@@ -766,12 +776,14 @@ func TestRecvSnapshot(t *testing.T) {
 	cl := newCluster("abc")
 	cl.SetStore(store.New())
 	s := &EtcdServer{
-		store:       st,
-		transport:   &nopTransporter{},
-		storage:     p,
-		node:        n,
-		raftStorage: raft.NewMemoryStorage(),
-		Cluster:     cl,
+		r: raftNode{
+			Node:        n,
+			transport:   &nopTransporter{},
+			storage:     p,
+			raftStorage: raft.NewMemoryStorage(),
+		},
+		store:   st,
+		Cluster: cl,
 	}

 	s.start()
@@ -799,12 +811,14 @@ func TestRecvSlowSnapshot(t *testing.T) {
 	cl := newCluster("abc")
 	cl.SetStore(store.New())
 	s := &EtcdServer{
-		store:       st,
-		transport:   &nopTransporter{},
-		storage:     &storageRecorder{},
-		node:        n,
-		raftStorage: raft.NewMemoryStorage(),
-		Cluster:     cl,
+		r: raftNode{
+			Node:        n,
+			storage:     &storageRecorder{},
+			raftStorage: raft.NewMemoryStorage(),
+			transport:   &nopTransporter{},
+		},
+		store:   st,
+		Cluster: cl,
 	}

 	s.start()
@@ -832,12 +846,14 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) {
 	cl.SetStore(store.New())
 	storage := raft.NewMemoryStorage()
 	s := &EtcdServer{
-		store:       st,
-		transport:   &nopTransporter{},
-		storage:     &storageRecorder{},
-		node:        n,
-		raftStorage: storage,
-		Cluster:     cl,
+		r: raftNode{
+			Node:        n,
+			storage:     &storageRecorder{},
+			raftStorage: storage,
+			transport:   &nopTransporter{},
+		},
+		store:   st,
+		Cluster: cl,
 	}

 	s.start()
@@ -874,13 +890,15 @@ func TestAddMember(t *testing.T) {
 	st := store.New()
 	cl.SetStore(st)
 	s := &EtcdServer{
-		node:        n,
-		raftStorage: raft.NewMemoryStorage(),
-		store:       st,
-		transport:   &nopTransporter{},
-		storage:     &storageRecorder{},
-		Cluster:     cl,
-		reqIDGen:    idutil.NewGenerator(0, time.Time{}),
+		r: raftNode{
+			Node:        n,
+			raftStorage: raft.NewMemoryStorage(),
+			storage:     &storageRecorder{},
+			transport:   &nopTransporter{},
+		},
+		store:    st,
+		Cluster:  cl,
+		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
 	s.start()
 	m := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"foo"}}}
@@ -911,13 +929,15 @@ func TestRemoveMember(t *testing.T) {
 	cl.SetStore(store.New())
 	cl.AddMember(&Member{ID: 1234})
 	s := &EtcdServer{
-		node:        n,
-		raftStorage: raft.NewMemoryStorage(),
-		store:       st,
-		transport:   &nopTransporter{},
-		storage:     &storageRecorder{},
-		Cluster:     cl,
-		reqIDGen:    idutil.NewGenerator(0, time.Time{}),
+		r: raftNode{
+			Node:        n,
+			raftStorage: raft.NewMemoryStorage(),
+			storage:     &storageRecorder{},
+			transport:   &nopTransporter{},
+		},
+		store:    st,
+		Cluster:  cl,
+		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
 	s.start()
 	err := s.RemoveMember(context.TODO(), 1234)
@@ -947,13 +967,15 @@ func TestUpdateMember(t *testing.T) {
 	cl.SetStore(st)
 	cl.AddMember(&Member{ID: 1234})
 	s := &EtcdServer{
-		node:        n,
-		raftStorage: raft.NewMemoryStorage(),
-		store:       st,
-		transport:   &nopTransporter{},
-		storage:     &storageRecorder{},
-		Cluster:     cl,
-		reqIDGen:    idutil.NewGenerator(0, time.Time{}),
+		r: raftNode{
+			Node:        n,
+			raftStorage: raft.NewMemoryStorage(),
+			storage:     &storageRecorder{},
+			transport:   &nopTransporter{},
+		},
+		store:    st,
+		Cluster:  cl,
+		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
 	s.start()
 	wm := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}
@@ -983,9 +1005,9 @@ func TestPublish(t *testing.T) {
 	w := &waitWithResponse{ch: ch}
 	srv := &EtcdServer{
 		id:         1,
+		r:          raftNode{Node: n},
 		attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
 		Cluster:    &Cluster{},
-		node:       n,
 		w:          w,
 		reqIDGen:   idutil.NewGenerator(0, time.Time{}),
 	}
@@ -1022,13 +1044,15 @@ func TestPublish(t *testing.T) {
 // TestPublishStopped tests that publish will be stopped if server is stopped.
 func TestPublishStopped(t *testing.T) {
 	srv := &EtcdServer{
-		node:      &nodeRecorder{},
-		transport: &nopTransporter{},
-		Cluster:   &Cluster{},
-		w:         &waitRecorder{},
-		done:      make(chan struct{}),
-		stop:      make(chan struct{}),
-		reqIDGen:  idutil.NewGenerator(0, time.Time{}),
+		r: raftNode{
+			Node:      &nodeRecorder{},
+			transport: &nopTransporter{},
+		},
+		Cluster:  &Cluster{},
+		w:        &waitRecorder{},
+		done:     make(chan struct{}),
+		stop:     make(chan struct{}),
+		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
 	close(srv.done)
 	srv.publish(time.Hour)
@@ -1040,7 +1064,7 @@ func TestPublishRetry(t *testing.T) {
 	defer log.SetOutput(os.Stderr)
 	n := &nodeRecorder{}
 	srv := &EtcdServer{
-		node:     n,
+		r:        raftNode{Node: n},
 		w:        &waitRecorder{},
 		done:     make(chan struct{}),
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),

+ 1 - 1
integration/cluster_test.go

@@ -484,6 +484,7 @@ func mustNewMember(t *testing.T, name string) *member {
 	m.NewCluster = true
 	m.Transport = mustNewTransport(t)
 	m.ElectionTicks = electionTicks
+	m.TickMs = uint(tickDuration / time.Millisecond)
 	return m
 }
 
@@ -524,7 +525,6 @@ func (m *member) Launch() error {
 	if m.s, err = etcdserver.NewServer(&m.ServerConfig); err != nil {
 		return fmt.Errorf("failed to initialize the etcd server: %v", err)
 	}
-	m.s.Ticker = time.Tick(tickDuration)
 	m.s.SyncTicker = time.Tick(500 * time.Millisecond)
 	m.s.Start()