Browse Source

etcd: snap and wal init

Xiang Li 11 years ago
parent
commit
63489b9ef5
2 changed files with 61 additions and 8 deletions
  1. 57 8
      etcd/participant.go
  2. 4 0
      raft/node.go

+ 57 - 8
etcd/participant.go

@@ -22,12 +22,14 @@ import (
 	"log"
 	"log"
 	"math/rand"
 	"math/rand"
 	"net/http"
 	"net/http"
+	"os"
 	"path"
 	"path"
 	"time"
 	"time"
 
 
 	"github.com/coreos/etcd/conf"
 	"github.com/coreos/etcd/conf"
 	etcdErr "github.com/coreos/etcd/error"
 	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft"
+	"github.com/coreos/etcd/snap"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/wal"
 	"github.com/coreos/etcd/wal"
 )
 )
@@ -79,6 +81,7 @@ type participant struct {
 	store.Store
 	store.Store
 	rh          *raftHandler
 	rh          *raftHandler
 	w           *wal.WAL
 	w           *wal.WAL
+	snapshotter *snap.Snapshotter
 	serverStats *raftServerStats
 	serverStats *raftServerStats
 
 
 	stopNotifyc chan struct{}
 	stopNotifyc chan struct{}
@@ -111,14 +114,30 @@ func newParticipant(c *conf.Config, client *v2client, peerHub *peerHub, tickDura
 	p.rh = newRaftHandler(peerHub, p.Store.Version(), p.serverStats)
 	p.rh = newRaftHandler(peerHub, p.Store.Version(), p.serverStats)
 	p.peerHub.setServerStats(p.serverStats)
 	p.peerHub.setServerStats(p.serverStats)
 
 
-	walPath := p.cfg.DataDir
+	snapDir := path.Join(p.cfg.DataDir, "snap")
+	if err := os.Mkdir(snapDir, 0700); err != nil {
+		if !os.IsExist(err) {
+			log.Printf("id=%x participant.new.mkdir err=%v", p.id, err)
+			return nil, err
+		}
+	}
+	p.snapshotter = snap.New(snapDir)
+
+	walDir := path.Join(p.cfg.DataDir, "wal")
+	if err := os.Mkdir(walDir, 0700); err != nil {
+		if !os.IsExist(err) {
+			log.Printf("id=%x participant.new.mkdir err=%v", p.id, err)
+			return nil, err
+		}
+	}
+
 	var w *wal.WAL
 	var w *wal.WAL
 	var err error
 	var err error
-	if !wal.Exist(walPath) {
+	if !wal.Exist(walDir) {
 		p.id = genId()
 		p.id = genId()
 		p.pubAddr = c.Addr
 		p.pubAddr = c.Addr
 		p.raftPubAddr = c.Peer.Addr
 		p.raftPubAddr = c.Peer.Addr
-		if w, err = wal.Create(walPath); err != nil {
+		if w, err = wal.Create(walDir); err != nil {
 			return nil, err
 			return nil, err
 		}
 		}
 		p.node.Node = raft.New(p.id, defaultHeartbeat, defaultElection)
 		p.node.Node = raft.New(p.id, defaultHeartbeat, defaultElection)
@@ -126,17 +145,30 @@ func newParticipant(c *conf.Config, client *v2client, peerHub *peerHub, tickDura
 		if err = w.SaveInfo(&info); err != nil {
 		if err = w.SaveInfo(&info); err != nil {
 			return nil, err
 			return nil, err
 		}
 		}
-		log.Printf("id=%x participant.new path=%s\n", p.id, walPath)
+		log.Printf("id=%x participant.new path=%s\n", p.id, walDir)
 	} else {
 	} else {
-		n, err := wal.Read(walPath, 0)
+		s, err := p.snapshotter.Load()
+		if err != nil && err != snap.ErrNoSnapshot {
+			log.Printf("id=%x participant.snapload err=%s\n", p.id, err)
+			return nil, err
+		}
+		if s != nil {
+			p.node.Restore(*s)
+			if err := p.Recovery(s.Data); err != nil {
+				panic(err)
+			}
+			log.Printf("id=%x recovered index=%d\n", p.id, s.Index)
+		}
+
+		n, err := wal.Read(walDir, 0)
 		if err != nil {
 		if err != nil {
 			return nil, err
 			return nil, err
 		}
 		}
 		p.id = n.Id
 		p.id = n.Id
 		p.node.Node = raft.Recover(n.Id, n.Ents, n.State, defaultHeartbeat, defaultElection)
 		p.node.Node = raft.Recover(n.Id, n.Ents, n.State, defaultHeartbeat, defaultElection)
 		p.apply(p.node.Next())
 		p.apply(p.node.Next())
-		log.Printf("id=%x participant.load path=%s state=\"%+v\" len(ents)=%d", p.id, walPath, n.State, len(n.Ents))
-		if w, err = wal.Open(walPath); err != nil {
+		log.Printf("id=%x participant.load path=%s state=\"%+v\" len(ents)=%d", p.id, walDir, n.State, len(n.Ents))
+		if w, err = wal.Open(walDir); err != nil {
 			return nil, err
 			return nil, err
 		}
 		}
 	}
 	}
@@ -236,7 +268,24 @@ func (p *participant) run(stop chan struct{}) {
 				panic(err)
 				panic(err)
 			}
 			}
 			p.node.Compact(d)
 			p.node.Compact(d)
-			log.Printf("id=%x compacted index=\n", p.id)
+			snap := p.node.GetSnap()
+			log.Printf("id=%x compacted index=%d", p.id, snap.Index)
+			if err := p.snapshotter.Save(&snap); err != nil {
+				log.Printf("id=%d snapshot err=%v", p.id, err)
+				// todo(xiangli): consume the error?
+				panic(err)
+			}
+			if err := p.w.Cut(p.node.Index()); err != nil {
+				log.Printf("id=%d wal.cut err=%v", p.id, err)
+				// todo(xiangli): consume the error?
+				panic(err)
+			}
+			info := p.node.Info()
+			if err = p.w.SaveInfo(&info); err != nil {
+				log.Printf("id=%d wal.saveInfo err=%v", p.id, err)
+				// todo(xiangli): consume the error?
+				panic(err)
+			}
 		}
 		}
 	}
 	}
 }
 }

+ 4 - 0
raft/node.go

@@ -231,6 +231,10 @@ func (n *Node) UpdateConf(t int64, c *Config) {
 	n.propose(t, data)
 	n.propose(t, data)
 }
 }
 
 
+func (n *Node) Restore(s Snapshot) bool {
+	return n.sm.restore(s)
+}
+
 // UnstableEnts retuens all the entries that need to be persistent.
 // UnstableEnts retuens all the entries that need to be persistent.
 // The first return value is offset, and the second one is unstable entries.
 // The first return value is offset, and the second one is unstable entries.
 func (n *Node) UnstableEnts() []Entry {
 func (n *Node) UnstableEnts() []Entry {