
Merge pull request #1096 from jonboulle/1096_embedded

simplify initialization of EtcdServer to support embedded etcd
Jonathan Boulle, 11 years ago
commit 1ca5991c8c
5 changed files with 220 additions and 195 deletions
  1. etcdserver/etcdhttp/http.go (+2 -2)
  2. etcdserver/etcdhttp/http_test.go (+1 -1)
  3. etcdserver/server.go (+141 -55)
  4. etcdserver/server_test.go (+67 -67)
  5. main.go (+9 -70)

etcdserver/etcdhttp/http.go (+2 -2)

@@ -35,10 +35,10 @@ const (
 var errClosed = errors.New("etcdhttp: client closed connection")
 
 // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
-func NewClientHandler(server *etcdserver.EtcdServer, clusterStore etcdserver.ClusterStore, timeout time.Duration) http.Handler {
+func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http.Handler {
 	sh := &serverHandler{
 		server:       server,
-		clusterStore: clusterStore,
+		clusterStore: server.ClusterStore,
 		timer:        server,
 		timeout:      timeout,
 	}

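The practical effect for callers of the client handler: the ClusterStore argument disappears and the handler reads it from the server. A tiny before/after fragment (the variable names and timeout are illustrative, not taken from this commit):

    // Before this commit a caller passed the cluster store explicitly:
    //     h := etcdhttp.NewClientHandler(srv, clusterStore, 10*time.Second)
    // After it, the handler takes the store from srv.ClusterStore:
    h := etcdhttp.NewClientHandler(srv, 10*time.Second)
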
etcdserver/etcdhttp/http_test.go (+1 -1)

@@ -591,7 +591,7 @@ func TestV2MachinesEndpoint(t *testing.T) {
 		{"POST", http.StatusMethodNotAllowed},
 	}
 
-	m := NewClientHandler(nil, &fakeCluster{}, time.Hour)
+	m := NewClientHandler(&etcdserver.EtcdServer{ClusterStore: &fakeCluster{}}, time.Hour)
 	s := httptest.NewServer(m)
 	defer s.Close()
 

etcdserver/server.go (+141 -55)

@@ -5,6 +5,9 @@ import (
 	"errors"
 	"log"
 	"math/rand"
+	"net/http"
+	"os"
+	"path"
 	"sync/atomic"
 	"time"
 
@@ -12,15 +15,20 @@ import (
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/snap"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
 	"github.com/coreos/etcd/wait"
+	"github.com/coreos/etcd/wal"
 )
 
 const (
+	// owner can make/remove files inside the directory
+	privateDirMode = 0700
+
 	defaultSyncTimeout = time.Second
 	DefaultSnapCount   = 10000
-	// TODO: calculated based on heartbeat interval
+	// TODO: calculate based on heartbeat interval
 	defaultPublishRetryInterval = 5 * time.Second
 )
 
@@ -33,8 +41,7 @@ func init() {
 	rand.Seed(time.Now().UnixNano())
 }
 
-type SendFunc func(m []raftpb.Message)
-type SaveFunc func(st raftpb.HardState, ents []raftpb.Entry)
+type sendFunc func(m []raftpb.Message)
 
 type Response struct {
 	Event   *store.Event
@@ -76,34 +83,113 @@ type RaftTimer interface {
 	Term() int64
 }
 
+type ServerConfig struct {
+	Name       string
+	ClientURLs types.URLs
+	DataDir    string
+	SnapCount  int64
+	Cluster    *Cluster
+	Transport  *http.Transport
+}
+
+// NewServer creates a new EtcdServer from the supplied configuration. The
+// configuration is considered static for the lifetime of the EtcdServer.
+func NewServer(cfg *ServerConfig) *EtcdServer {
+	m := cfg.Cluster.FindName(cfg.Name)
+	if m == nil {
+		// Should never happen
+		log.Fatalf("could not find name %v in cluster!", cfg.Name)
+	}
+	snapdir := path.Join(cfg.DataDir, "snap")
+	if err := os.MkdirAll(snapdir, privateDirMode); err != nil {
+		log.Fatalf("etcdserver: cannot create snapshot directory: %v", err)
+	}
+	ss := snap.New(snapdir)
+	st := store.New()
+	var w *wal.WAL
+	var n raft.Node
+	var err error
+	waldir := path.Join(cfg.DataDir, "wal")
+	if !wal.Exist(waldir) {
+		if w, err = wal.Create(waldir); err != nil {
+			log.Fatal(err)
+		}
+		n = raft.StartNode(m.ID, cfg.Cluster.IDs(), 10, 1)
+	} else {
+		var index int64
+		snapshot, err := ss.Load()
+		if err != nil && err != snap.ErrNoSnapshot {
+			log.Fatal(err)
+		}
+		if snapshot != nil {
+			log.Printf("etcdserver: restart from snapshot at index %d", snapshot.Index)
+			st.Recovery(snapshot.Data)
+			index = snapshot.Index
+		}
+
+		// restart a node from previous wal
+		if w, err = wal.OpenAtIndex(waldir, index); err != nil {
+			log.Fatal(err)
+		}
+		wid, st, ents, err := w.ReadAll()
+		if err != nil {
+			log.Fatal(err)
+		}
+		// TODO(xiangli): save/recovery nodeID?
+		if wid != 0 {
+			log.Fatalf("unexpected nodeid %d: nodeid should always be zero until we save nodeid into wal", wid)
+		}
+		n = raft.RestartNode(m.ID, cfg.Cluster.IDs(), 10, 1, snapshot, st, ents)
+	}
+
+	cls := NewClusterStore(st, *cfg.Cluster)
+
+	s := &EtcdServer{
+		store: st,
+		node:  n,
+		name:  cfg.Name,
+		storage: struct {
+			*wal.WAL
+			*snap.Snapshotter
+		}{w, ss},
+		send:         Sender(cfg.Transport, cls),
+		clientURLs:   cfg.ClientURLs,
+		ticker:       time.Tick(100 * time.Millisecond),
+		syncTicker:   time.Tick(500 * time.Millisecond),
+		snapCount:    cfg.SnapCount,
+		ClusterStore: cls,
+	}
+	return s
+}
+
 // EtcdServer is the production implementation of the Server interface
 type EtcdServer struct {
-	w    wait.Wait
-	done chan struct{}
+	w          wait.Wait
+	done       chan struct{}
+	name       string
+	clientURLs types.URLs
 
-	Name       string
-	ClientURLs types.URLs
+	ClusterStore ClusterStore
 
-	Node  raft.Node
-	Store store.Store
+	node  raft.Node
+	store store.Store
 
-	// Send specifies the send function for sending msgs to members. Send
+	// send specifies the send function for sending msgs to members. send
 	// MUST NOT block. It is okay to drop messages, since clients should
-	// timeout and reissue their messages.  If Send is nil, server will
+	// timeout and reissue their messages.  If send is nil, server will
 	// panic.
-	Send SendFunc
+	send sendFunc
 
-	Storage Storage
+	storage Storage
 
-	Ticker     <-chan time.Time
-	SyncTicker <-chan time.Time
+	ticker     <-chan time.Time
+	syncTicker <-chan time.Time
 
-	SnapCount int64 // number of entries to trigger a snapshot
+	snapCount int64 // number of entries to trigger a snapshot
 
 	// Cache of the latest raft index and raft term the server has seen
-	raftIndex    int64
-	raftTerm     int64
-	ClusterStore ClusterStore
+	raftIndex int64
+	raftTerm  int64
 }
 
 // Start prepares and starts server in a new goroutine. It is no longer safe to
@@ -118,9 +204,9 @@ func (s *EtcdServer) Start() {
 // modify a server's fields after it has been sent to Start.
 // This function is just used for testing.
 func (s *EtcdServer) start() {
-	if s.SnapCount == 0 {
+	if s.snapCount == 0 {
 		log.Printf("etcdserver: set snapshot count to default %d", DefaultSnapCount)
-		s.SnapCount = DefaultSnapCount
+		s.snapCount = DefaultSnapCount
 	}
 	s.w = wait.New()
 	s.done = make(chan struct{})
@@ -130,7 +216,7 @@ func (s *EtcdServer) start() {
 }
 
 func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
-	return s.Node.Step(ctx, m)
+	return s.node.Step(ctx, m)
 }
 
 func (s *EtcdServer) run() {
@@ -139,12 +225,12 @@ func (s *EtcdServer) run() {
 	var snapi, appliedi int64
 	for {
 		select {
-		case <-s.Ticker:
-			s.Node.Tick()
-		case rd := <-s.Node.Ready():
-			s.Storage.Save(rd.HardState, rd.Entries)
-			s.Storage.SaveSnap(rd.Snapshot)
-			s.Send(rd.Messages)
+		case <-s.ticker:
+			s.node.Tick()
+		case rd := <-s.node.Ready():
+			s.storage.Save(rd.HardState, rd.Entries)
+			s.storage.SaveSnap(rd.Snapshot)
+			s.send(rd.Messages)
 
 			// TODO(bmizerany): do this in the background, but take
 			// care to apply entries in a single goroutine, and not
@@ -163,7 +249,7 @@ func (s *EtcdServer) run() {
 					if err := cc.Unmarshal(e.Data); err != nil {
 						panic("TODO: this is bad, what do we do about it?")
 					}
-					s.Node.ApplyConfChange(cc)
+					s.node.ApplyConfChange(cc)
 					s.w.Trigger(cc.ID, nil)
 				default:
 					panic("unexpected entry type")
@@ -179,20 +265,20 @@ func (s *EtcdServer) run() {
 
 			// recover from snapshot if it is more updated than current applied
 			if rd.Snapshot.Index > appliedi {
-				if err := s.Store.Recovery(rd.Snapshot.Data); err != nil {
+				if err := s.store.Recovery(rd.Snapshot.Data); err != nil {
 					panic("TODO: this is bad, what do we do about it?")
 				}
 				appliedi = rd.Snapshot.Index
 			}
 
-			if appliedi-snapi > s.SnapCount {
+			if appliedi-snapi > s.snapCount {
 				s.snapshot()
 				snapi = appliedi
 			}
 
 			if rd.SoftState != nil {
 				if rd.RaftState == raft.StateLeader {
-					syncC = s.SyncTicker
+					syncC = s.syncTicker
 				} else {
 					syncC = nil
 				}
@@ -212,11 +298,11 @@ func (s *EtcdServer) run() {
 // Stop stops the server, and shuts down the running goroutine. Stop should be
 // called after a Start(s), otherwise it will block forever.
 func (s *EtcdServer) Stop() {
-	s.Node.Stop()
+	s.node.Stop()
 	close(s.done)
 }
 
-// Do interprets r and performs an operation on s.Store according to r.Method
+// Do interprets r and performs an operation on s.store according to r.Method
 // and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET" with
 // Quorum == true, r will be sent through consensus before performing its
 // respective operation. Do will block until an action is performed or there is
@@ -235,7 +321,7 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
 			return Response{}, err
 		}
 		ch := s.w.Register(r.ID)
-		s.Node.Propose(ctx, data)
+		s.node.Propose(ctx, data)
 		select {
 		case x := <-ch:
 			resp := x.(Response)
@@ -249,13 +335,13 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
 	case "GET":
 		switch {
 		case r.Wait:
-			wc, err := s.Store.Watch(r.Path, r.Recursive, r.Stream, r.Since)
+			wc, err := s.store.Watch(r.Path, r.Recursive, r.Stream, r.Since)
 			if err != nil {
 				return Response{}, err
 			}
 			return Response{Watcher: wc}, nil
 		default:
-			ev, err := s.Store.Get(r.Path, r.Recursive, r.Sorted)
+			ev, err := s.store.Get(r.Path, r.Recursive, r.Sorted)
 			if err != nil {
 				return Response{}, err
 			}
@@ -298,7 +384,7 @@ func (s *EtcdServer) Term() int64 {
 // It will block until the change is performed or there is an error.
 func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error {
 	ch := s.w.Register(cc.ID)
-	if err := s.Node.ProposeConfChange(ctx, cc); err != nil {
+	if err := s.node.ProposeConfChange(ctx, cc); err != nil {
 		log.Printf("configure error: %v", err)
 		s.w.Trigger(cc.ID, nil)
 		return err
@@ -332,20 +418,20 @@ func (s *EtcdServer) sync(timeout time.Duration) {
 	// There is no promise that node has leader when do SYNC request,
 	// so it uses goroutine to propose.
 	go func() {
-		s.Node.Propose(ctx, data)
+		s.node.Propose(ctx, data)
 		cancel()
 	}()
 }
 
 // publish registers server information into the cluster. The information
-// is the json format of its self member struct, whose ClientURLs may be
-// updated.
+// is the JSON representation of this server's member struct, updated with the
+// static clientURLs of the server.
 // The function keeps attempting to register until it succeeds,
 // or its server is stopped.
 // TODO: take care of info fetched from cluster store after having reconfig.
 func (s *EtcdServer) publish(retryInterval time.Duration) {
-	m := *s.ClusterStore.Get().FindName(s.Name)
-	m.ClientURLs = s.ClientURLs.StringSlice()
+	m := *s.ClusterStore.Get().FindName(s.name)
+	m.ClientURLs = s.clientURLs.StringSlice()
 	b, err := json.Marshal(m)
 	if err != nil {
 		log.Printf("etcdserver: json marshal error: %v", err)
@@ -392,31 +478,31 @@ func (s *EtcdServer) apply(r pb.Request) Response {
 	expr := getExpirationTime(&r)
 	switch r.Method {
 	case "POST":
-		return f(s.Store.Create(r.Path, r.Dir, r.Val, true, expr))
+		return f(s.store.Create(r.Path, r.Dir, r.Val, true, expr))
 	case "PUT":
 		exists, existsSet := getBool(r.PrevExist)
 		switch {
 		case existsSet:
 			if exists {
-				return f(s.Store.Update(r.Path, r.Val, expr))
+				return f(s.store.Update(r.Path, r.Val, expr))
 			}
-			return f(s.Store.Create(r.Path, r.Dir, r.Val, false, expr))
+			return f(s.store.Create(r.Path, r.Dir, r.Val, false, expr))
 		case r.PrevIndex > 0 || r.PrevValue != "":
-			return f(s.Store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr))
+			return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr))
 		default:
-			return f(s.Store.Set(r.Path, r.Dir, r.Val, expr))
+			return f(s.store.Set(r.Path, r.Dir, r.Val, expr))
 		}
 	case "DELETE":
 		switch {
 		case r.PrevIndex > 0 || r.PrevValue != "":
-			return f(s.Store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
+			return f(s.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
 		default:
-			return f(s.Store.Delete(r.Path, r.Dir, r.Recursive))
+			return f(s.store.Delete(r.Path, r.Dir, r.Recursive))
 		}
 	case "QGET":
-		return f(s.Store.Get(r.Path, r.Recursive, r.Sorted))
+		return f(s.store.Get(r.Path, r.Recursive, r.Sorted))
 	case "SYNC":
-		s.Store.DeleteExpiredKeys(time.Unix(0, r.Time))
+		s.store.DeleteExpiredKeys(time.Unix(0, r.Time))
 		return Response{}
 	default:
 		// This should never be reached, but just in case:
@@ -426,14 +512,14 @@ func (s *EtcdServer) apply(r pb.Request) Response {
 
 // TODO: non-blocking snapshot
 func (s *EtcdServer) snapshot() {
-	d, err := s.Store.Save()
+	d, err := s.store.Save()
 	// TODO: current store will never fail to do a snapshot
 	// what should we do if the store might fail?
 	if err != nil {
 		panic("TODO: this is bad, what do we do about it?")
 	}
-	s.Node.Compact(d)
-	s.Storage.Cut()
+	s.node.Compact(d)
+	s.storage.Cut()
 }
 
 // TODO: move the function to /id pkg maybe?

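The ServerConfig/NewServer pair is the core of the change: instead of hand-wiring the WAL, snapshotter, raft node and store (as the old main.go did, see below), an embedding program fills in the static configuration and lets NewServer build the rest. A minimal sketch of embedded use under the new API; the bootstrap string passed to cluster.Set, the URLs, the data directory and the bare http.Transport are illustrative assumptions, not values taken from this commit:

    package embed // hypothetical embedding package, for illustration only

    import (
        "log"
        "net/http"

        "github.com/coreos/etcd/etcdserver"
        "github.com/coreos/etcd/pkg/types"
    )

    func startEmbedded() *etcdserver.EtcdServer {
        // Bootstrap a one-member cluster. Assumes Cluster.Set accepts the
        // same "name=peer-url" form that the flag-based setup in main.go uses.
        cluster := &etcdserver.Cluster{}
        if err := cluster.Set("node1=http://127.0.0.1:7001"); err != nil {
            log.Fatal(err)
        }
        cfg := &etcdserver.ServerConfig{
            Name:       "node1",
            ClientURLs: types.URLs{{Scheme: "http", Host: "127.0.0.1:4001"}},
            DataDir:    "/tmp/node1.etcd",
            SnapCount:  etcdserver.DefaultSnapCount,
            Cluster:    cluster,
            Transport:  &http.Transport{}, // main.go builds this with transport.NewTransport(peerTLSInfo)
        }
        s := etcdserver.NewServer(cfg) // creates the wal/ and snap/ dirs and the raft node
        s.Start()
        return s
    }

Because node, store, storage and the tickers are now unexported, NewServer is the only way to assemble an EtcdServer from outside the package; the tests below keep setting those fields directly only because they live in package etcdserver.
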
etcdserver/server_test.go (+67 -67)

@@ -76,7 +76,7 @@ func TestDoLocalAction(t *testing.T) {
 	}
 	for i, tt := range tests {
 		st := &storeRecorder{}
-		srv := &EtcdServer{Store: st}
+		srv := &EtcdServer{store: st}
 		resp, err := srv.Do(context.TODO(), tt.req)
 
 		if err != tt.werr {
@@ -112,7 +112,7 @@ func TestDoBadLocalAction(t *testing.T) {
 	}
 	for i, tt := range tests {
 		st := &errStoreRecorder{err: storeErr}
-		srv := &EtcdServer{Store: st}
+		srv := &EtcdServer{store: st}
 		resp, err := srv.Do(context.Background(), tt.req)
 
 		if err != storeErr {
@@ -355,7 +355,7 @@ func TestApply(t *testing.T) {
 
 	for i, tt := range tests {
 		st := &storeRecorder{}
-		srv := &EtcdServer{Store: st}
+		srv := &EtcdServer{store: st}
 		resp := srv.apply(tt.req)
 
 		if !reflect.DeepEqual(resp, tt.wresp) {
@@ -380,7 +380,7 @@ func testServer(t *testing.T, ns int64) {
 	send := func(msgs []raftpb.Message) {
 		for _, m := range msgs {
 			t.Logf("m = %+v\n", m)
-			ss[m.To-1].Node.Step(ctx, m)
+			ss[m.To-1].node.Step(ctx, m)
 		}
 	}
 
@@ -395,11 +395,11 @@ func testServer(t *testing.T, ns int64) {
 		tk := time.NewTicker(10 * time.Millisecond)
 		defer tk.Stop()
 		srv := &EtcdServer{
-			Node:    n,
-			Store:   store.New(),
-			Send:    send,
-			Storage: &storageRecorder{},
-			Ticker:  tk.C,
+			node:    n,
+			store:   store.New(),
+			send:    send,
+			storage: &storageRecorder{},
+			ticker:  tk.C,
 		}
 		srv.start()
 		// TODO(xiangli): randomize election timeout
@@ -440,7 +440,7 @@ func testServer(t *testing.T, ns int64) {
 	var last interface{}
 	for i, sv := range ss {
 		sv.Stop()
-		g, _ := sv.Store.Get("/", true, true)
+		g, _ := sv.store.Get("/", true, true)
 		if last != nil && !reflect.DeepEqual(last, g) {
 			t.Errorf("server %d: Root = %#v, want %#v", i, g, last)
 		}
@@ -464,11 +464,11 @@ func TestDoProposal(t *testing.T) {
 		// this makes <-tk always successful, which accelerates internal clock
 		close(tk)
 		srv := &EtcdServer{
-			Node:    n,
-			Store:   st,
-			Send:    func(_ []raftpb.Message) {},
-			Storage: &storageRecorder{},
-			Ticker:  tk,
+			node:    n,
+			store:   st,
+			send:    func(_ []raftpb.Message) {},
+			storage: &storageRecorder{},
+			ticker:  tk,
 		}
 		srv.start()
 		resp, err := srv.Do(ctx, tt)
@@ -496,8 +496,8 @@ func TestDoProposalCancelled(t *testing.T) {
 	wait := &waitRecorder{}
 	srv := &EtcdServer{
 		// TODO: use fake node for better testability
-		Node:  n,
-		Store: st,
+		node:  n,
+		store: st,
 		w:     wait,
 	}
 
@@ -534,11 +534,11 @@ func TestDoProposalStopped(t *testing.T) {
 	close(tk)
 	srv := &EtcdServer{
 		// TODO: use fake node for better testability
-		Node:    n,
-		Store:   st,
-		Send:    func(_ []raftpb.Message) {},
-		Storage: &storageRecorder{},
-		Ticker:  tk,
+		node:    n,
+		store:   st,
+		send:    func(_ []raftpb.Message) {},
+		storage: &storageRecorder{},
+		ticker:  tk,
 	}
 	srv.start()
 
@@ -564,7 +564,7 @@ func TestDoProposalStopped(t *testing.T) {
 func TestSync(t *testing.T) {
 	n := &nodeProposeDataRecorder{}
 	srv := &EtcdServer{
-		Node: n,
+		node: n,
 	}
 	start := time.Now()
 	srv.sync(defaultSyncTimeout)
@@ -593,7 +593,7 @@ func TestSync(t *testing.T) {
 func TestSyncTimeout(t *testing.T) {
 	n := &nodeProposalBlockerRecorder{}
 	srv := &EtcdServer{
-		Node: n,
+		node: n,
 	}
 	start := time.Now()
 	srv.sync(0)
@@ -634,11 +634,11 @@ func TestSyncTrigger(t *testing.T) {
 	}
 	st := make(chan time.Time, 1)
 	srv := &EtcdServer{
-		Node:       n,
-		Store:      &storeRecorder{},
-		Send:       func(_ []raftpb.Message) {},
-		Storage:    &storageRecorder{},
-		SyncTicker: st,
+		node:       n,
+		store:      &storeRecorder{},
+		send:       func(_ []raftpb.Message) {},
+		storage:    &storageRecorder{},
+		syncTicker: st,
 	}
 	srv.start()
 	// trigger the server to become a leader and accept sync requests
@@ -673,9 +673,9 @@ func TestSnapshot(t *testing.T) {
 	st := &storeRecorder{}
 	p := &storageRecorder{}
 	s := &EtcdServer{
-		Store:   st,
-		Storage: p,
-		Node:    n,
+		store:   st,
+		storage: p,
+		node:    n,
 	}
 
 	s.snapshot()
@@ -704,15 +704,15 @@ func TestTriggerSnap(t *testing.T) {
 	st := &storeRecorder{}
 	p := &storageRecorder{}
 	s := &EtcdServer{
-		Store:     st,
-		Send:      func(_ []raftpb.Message) {},
-		Storage:   p,
-		Node:      n,
-		SnapCount: 10,
+		store:     st,
+		send:      func(_ []raftpb.Message) {},
+		storage:   p,
+		node:      n,
+		snapCount: 10,
 	}
 
 	s.start()
-	for i := 0; int64(i) < s.SnapCount; i++ {
+	for i := 0; int64(i) < s.snapCount; i++ {
 		s.Do(ctx, pb.Request{Method: "PUT", ID: 1})
 	}
 	time.Sleep(time.Millisecond)
@@ -721,8 +721,8 @@ func TestTriggerSnap(t *testing.T) {
 	gaction := p.Action()
 	// each operation is recorded as a Save
 	// Nop + SnapCount * Puts + Cut + SaveSnap = Save + SnapCount * Save + Cut + SaveSnap
-	if len(gaction) != 3+int(s.SnapCount) {
-		t.Fatalf("len(action) = %d, want %d", len(gaction), 3+int(s.SnapCount))
+	if len(gaction) != 3+int(s.snapCount) {
+		t.Fatalf("len(action) = %d, want %d", len(gaction), 3+int(s.snapCount))
 	}
 	if !reflect.DeepEqual(gaction[12], action{name: "SaveSnap"}) {
 		t.Errorf("action = %s, want SaveSnap", gaction[12])
@@ -736,10 +736,10 @@ func TestRecvSnapshot(t *testing.T) {
 	st := &storeRecorder{}
 	p := &storageRecorder{}
 	s := &EtcdServer{
-		Store:   st,
-		Send:    func(_ []raftpb.Message) {},
-		Storage: p,
-		Node:    n,
+		store:   st,
+		send:    func(_ []raftpb.Message) {},
+		storage: p,
+		node:    n,
 	}
 
 	s.start()
@@ -764,10 +764,10 @@ func TestRecvSlowSnapshot(t *testing.T) {
 	n := newReadyNode()
 	st := &storeRecorder{}
 	s := &EtcdServer{
-		Store:   st,
-		Send:    func(_ []raftpb.Message) {},
-		Storage: &storageRecorder{},
-		Node:    n,
+		store:   st,
+		send:    func(_ []raftpb.Message) {},
+		storage: &storageRecorder{},
+		node:    n,
 	}
 
 	s.start()
@@ -790,10 +790,10 @@ func TestRecvSlowSnapshot(t *testing.T) {
 func TestAddNode(t *testing.T) {
 	n := newNodeConfChangeCommitterRecorder()
 	s := &EtcdServer{
-		Node:    n,
-		Store:   &storeRecorder{},
-		Send:    func(_ []raftpb.Message) {},
-		Storage: &storageRecorder{},
+		node:    n,
+		store:   &storeRecorder{},
+		send:    func(_ []raftpb.Message) {},
+		storage: &storageRecorder{},
 	}
 	s.start()
 	s.AddNode(context.TODO(), 1, []byte("foo"))
@@ -810,10 +810,10 @@ func TestAddNode(t *testing.T) {
 func TestRemoveNode(t *testing.T) {
 	n := newNodeConfChangeCommitterRecorder()
 	s := &EtcdServer{
-		Node:    n,
-		Store:   &storeRecorder{},
-		Send:    func(_ []raftpb.Message) {},
-		Storage: &storageRecorder{},
+		node:    n,
+		store:   &storeRecorder{},
+		send:    func(_ []raftpb.Message) {},
+		storage: &storageRecorder{},
 	}
 	s.start()
 	s.RemoveNode(context.TODO(), 1)
@@ -831,10 +831,10 @@ func TestRemoveNode(t *testing.T) {
 func TestServerStopItself(t *testing.T) {
 	n := newReadyNode()
 	s := &EtcdServer{
-		Node:    n,
-		Store:   &storeRecorder{},
-		Send:    func(_ []raftpb.Message) {},
-		Storage: &storageRecorder{},
+		node:    n,
+		store:   &storeRecorder{},
+		send:    func(_ []raftpb.Message) {},
+		storage: &storageRecorder{},
 	}
 	s.start()
 	n.readyc <- raft.Ready{SoftState: &raft.SoftState{ShouldStop: true}}
@@ -856,9 +856,9 @@ func TestPublish(t *testing.T) {
 	ch <- Response{}
 	w := &waitWithResponse{ch: ch}
 	srv := &EtcdServer{
-		Name:         "node1",
-		ClientURLs:   []url.URL{{Scheme: "http", Host: "a"}, {Scheme: "http", Host: "b"}},
-		Node:         n,
+		name:         "node1",
+		clientURLs:   []url.URL{{Scheme: "http", Host: "a"}, {Scheme: "http", Host: "b"}},
+		node:         n,
 		ClusterStore: cs,
 		w:            w,
 	}
@@ -892,8 +892,8 @@ func TestPublish(t *testing.T) {
 func TestPublishStopped(t *testing.T) {
 	cs := mustClusterStore(t, []Member{{ID: 1, Name: "node1"}})
 	srv := &EtcdServer{
-		Name:         "node1",
-		Node:         &nodeRecorder{},
+		name:         "node1",
+		node:         &nodeRecorder{},
 		ClusterStore: cs,
 		w:            &waitRecorder{},
 		done:         make(chan struct{}),
@@ -907,8 +907,8 @@ func TestPublishRetry(t *testing.T) {
 	n := &nodeRecorder{}
 	cs := mustClusterStore(t, []Member{{ID: 1, Name: "node1"}})
 	srv := &EtcdServer{
-		Name:         "node1",
-		Node:         n,
+		name:         "node1",
+		node:         n,
 		ClusterStore: cs,
 		w:            &waitRecorder{},
 		done:         make(chan struct{}),

main.go (+9 -70)

@@ -6,7 +6,6 @@ import (
 	"log"
 	"net/http"
 	"os"
-	"path"
 	"strings"
 	"time"
 
@@ -17,9 +16,6 @@ import (
 	"github.com/coreos/etcd/pkg/transport"
 	"github.com/coreos/etcd/proxy"
 	"github.com/coreos/etcd/raft"
-	"github.com/coreos/etcd/snap"
-	"github.com/coreos/etcd/store"
-	"github.com/coreos/etcd/wal"
 )
 
 const (
@@ -33,7 +29,7 @@ var (
 	name         = flag.String("name", "default", "Unique human-readable name for this node")
 	timeout      = flag.Duration("timeout", 10*time.Second, "Request Timeout")
 	dir          = flag.String("data-dir", "", "Path to the data directory")
-	snapCount    = flag.Int64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
+	snapCount    = flag.Uint64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
 	printVersion = flag.Bool("version", false, "Print the version and exit")
 
 	cluster   = &etcdserver.Cluster{}
@@ -125,93 +121,36 @@ func startEtcd() {
 		log.Fatalf("etcd: cannot use None(%d) as member id", raft.None)
 	}
 
-	if *snapCount <= 0 {
-		log.Fatalf("etcd: snapshot-count must be greater than 0: snapshot-count=%d", *snapCount)
-	}
-
 	if *dir == "" {
 		*dir = fmt.Sprintf("%v_etcd_data", self.ID)
-		log.Printf("main: no data-dir is given, using default data-dir ./%s", *dir)
+		log.Printf("main: no data-dir provided, using default data-dir ./%s", *dir)
 	}
 	if err := os.MkdirAll(*dir, privateDirMode); err != nil {
 		log.Fatalf("main: cannot create data directory: %v", err)
 	}
-	snapdir := path.Join(*dir, "snap")
-	if err := os.MkdirAll(snapdir, privateDirMode); err != nil {
-		log.Fatalf("etcd: cannot create snapshot directory: %v", err)
-	}
-	snapshotter := snap.New(snapdir)
-
-	waldir := path.Join(*dir, "wal")
-	var w *wal.WAL
-	var n raft.Node
-	var err error
-	st := store.New()
-
-	if !wal.Exist(waldir) {
-		w, err = wal.Create(waldir)
-		if err != nil {
-			log.Fatal(err)
-		}
-		n = raft.StartNode(self.ID, cluster.IDs(), 10, 1)
-	} else {
-		var index int64
-		snapshot, err := snapshotter.Load()
-		if err != nil && err != snap.ErrNoSnapshot {
-			log.Fatal(err)
-		}
-		if snapshot != nil {
-			log.Printf("etcd: restart from snapshot at index %d", snapshot.Index)
-			st.Recovery(snapshot.Data)
-			index = snapshot.Index
-		}
-
-		// restart a node from previous wal
-		if w, err = wal.OpenAtIndex(waldir, index); err != nil {
-			log.Fatal(err)
-		}
-		wid, st, ents, err := w.ReadAll()
-		if err != nil {
-			log.Fatal(err)
-		}
-		// TODO(xiangli): save/recovery nodeID?
-		if wid != 0 {
-			log.Fatalf("unexpected nodeid %d: nodeid should always be zero until we save nodeid into wal", wid)
-		}
-		n = raft.RestartNode(self.ID, cluster.IDs(), 10, 1, snapshot, st, ents)
-	}
 
 	pt, err := transport.NewTransport(peerTLSInfo)
 	if err != nil {
 		log.Fatal(err)
 	}
 
-	cls := etcdserver.NewClusterStore(st, *cluster)
-
 	acurls, err := pkg.URLsFromFlags(flag.CommandLine, "advertise-client-urls", "addr", clientTLSInfo)
 	if err != nil {
 		log.Fatal(err.Error())
 	}
-
-	s := &etcdserver.EtcdServer{
+	cfg := &etcdserver.ServerConfig{
 		Name:       *name,
 		ClientURLs: acurls,
-		Store:      st,
-		Node:       n,
-		Storage: struct {
-			*wal.WAL
-			*snap.Snapshotter
-		}{w, snapshotter},
-		Send:         etcdserver.Sender(pt, cls),
-		Ticker:       time.Tick(100 * time.Millisecond),
-		SyncTicker:   time.Tick(500 * time.Millisecond),
-		SnapCount:    *snapCount,
-		ClusterStore: cls,
+		DataDir:    *dir,
+		SnapCount:  int64(*snapCount),
+		Cluster:    cluster,
+		Transport:  pt,
 	}
+	s := etcdserver.NewServer(cfg)
 	s.Start()
 
 	ch := &pkg.CORSHandler{
-		Handler: etcdhttp.NewClientHandler(s, cls, *timeout),
+		Handler: etcdhttp.NewClientHandler(s, *timeout),
 		Info:    cors,
 	}
 	ph := etcdhttp.NewPeerHandler(s)
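
For an embedded caller, the serving side now collapses to the same few calls the simplified main.go makes. A hedged sketch, reusing cfg from the ServerConfig example above (addresses and the timeout are example values; the TLS, CORS and error handling that main.go performs are omitted, and the snippet additionally needs the etcdserver/etcdhttp and time imports):

    s := etcdserver.NewServer(cfg)
    s.Start()
    // Peer (raft) traffic on one listener, the client API on another.
    go func() {
        log.Fatal(http.ListenAndServe("127.0.0.1:7001", etcdhttp.NewPeerHandler(s)))
    }()
    log.Fatal(http.ListenAndServe("127.0.0.1:4001", etcdhttp.NewClientHandler(s, 10*time.Second)))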