Browse Source

wip: wal glue

Yicheng Qin 11 years ago
parent
commit
88e5bce63d
5 changed files with 91 additions and 24 deletions
  1. 14 4
      etcd/etcd.go
  2. 8 1
      etcd/etcd_test.go
  3. 61 16
      etcd/participant.go
  4. 7 2
      raft/node.go
  5. 1 1
      raft/raft.go

+ 14 - 4
etcd/etcd.go

@@ -22,6 +22,7 @@ import (
 	"log"
 	"net/http"
 	"net/url"
+	"os"
 	"sync"
 	"time"
 
@@ -55,7 +56,7 @@ type Server struct {
 	http.Handler
 }
 
-func New(c *config.Config) *Server {
+func New(c *config.Config) (*Server, error) {
 	if err := c.Sanitize(); err != nil {
 		log.Fatalf("server.new sanitizeErr=\"%v\"\n", err)
 	}
@@ -95,8 +96,12 @@ func New(c *config.Config) *Server {
 	s.Handler = m
 
 	log.Printf("id=%x server.new raftPubAddr=%s\n", s.id, s.raftPubAddr)
-
-	return s
+	if err = os.MkdirAll(s.config.DataDir, 0700); err != nil {
+		if !os.IsExist(err) {
+			return nil, err
+		}
+	}
+	return s, nil
 }
 
 func (s *Server) SetTick(tick time.Duration) {
@@ -176,7 +181,12 @@ func (s *Server) Run() error {
 		}
 		switch next {
 		case participantMode:
-			s.p = newParticipant(s.id, s.pubAddr, s.raftPubAddr, s.client, s.peerHub, s.tickDuration)
+			p, err := newParticipant(s.id, s.pubAddr, s.raftPubAddr, s.config.DataDir, s.client, s.peerHub, s.tickDuration)
+			if err != nil {
+				log.Printf("id=%x server.run newParicipanteErr=\"%v\"\n", s.id, err)
+				return err
+			}
+			s.p = p
 			dStopc := make(chan struct{})
 			if d != nil {
 				go d.heartbeat(dStopc)

+ 8 - 1
etcd/etcd_test.go

@@ -23,6 +23,7 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"net/url"
+	"os"
 	"testing"
 	"time"
 
@@ -371,7 +372,13 @@ func buildCluster(number int, tls bool) ([]*Server, []*httptest.Server) {
 }
 
 func initTestServer(c *config.Config, id int64, tls bool) (e *Server, h *httptest.Server) {
-	e = New(c)
+	c.DataDir = fmt.Sprintf("tests/etcd_%d", id)
+	os.RemoveAll(c.DataDir)
+
+	e, err := New(c)
+	if err != nil {
+		panic(err)
+	}
 	e.setId(id)
 	e.SetTick(time.Millisecond * 5)
 	m := http.NewServeMux()

+ 61 - 16
etcd/participant.go

@@ -22,6 +22,7 @@ import (
 	"log"
 	"math/rand"
 	"net/http"
+	"os"
 	"path"
 	"sync"
 	"time"
@@ -29,6 +30,7 @@ import (
 	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/store"
+	"github.com/coreos/etcd/wal"
 )
 
 const (
@@ -74,6 +76,7 @@ type participant struct {
 	node        *v2Raft
 	store.Store
 	rh *raftHandler
+	w  *wal.WAL
 
 	stopped bool
 	mu      sync.Mutex
@@ -82,12 +85,9 @@ type participant struct {
 	*http.ServeMux
 }
 
-func newParticipant(id int64, pubAddr string, raftPubAddr string, client *v2client, peerHub *peerHub, tickDuration time.Duration) *participant {
+func newParticipant(id int64, pubAddr string, raftPubAddr string, dir string, client *v2client, peerHub *peerHub, tickDuration time.Duration) (*participant, error) {
 	p := &participant{
-		id:           id,
 		clusterId:    -1,
-		pubAddr:      pubAddr,
-		raftPubAddr:  raftPubAddr,
 		tickDuration: tickDuration,
 
 		client:  client,
@@ -97,7 +97,6 @@ func newParticipant(id int64, pubAddr string, raftPubAddr string, client *v2clie
 		addNodeC:    make(chan raft.Config, 1),
 		removeNodeC: make(chan raft.Config, 1),
 		node: &v2Raft{
-			Node:   raft.New(id, defaultHeartbeat, defaultElection),
 			result: make(map[wait]chan interface{}),
 		},
 		Store: store.New(),
@@ -107,6 +106,31 @@ func newParticipant(id int64, pubAddr string, raftPubAddr string, client *v2clie
 		ServeMux: http.NewServeMux(),
 	}
 	p.rh = newRaftHandler(peerHub, p.Store.Version())
+
+	walPath := path.Join(dir, "wal")
+	w, err := wal.Open(walPath)
+	if err != nil {
+		if !os.IsNotExist(err) {
+			return nil, err
+		}
+		if w, err = wal.New(walPath); err != nil {
+			return nil, err
+		}
+		w.SaveInfo(p.id)
+		p.id = id
+		p.pubAddr = pubAddr
+		p.raftPubAddr = raftPubAddr
+		p.node.Node = raft.New(p.id, defaultHeartbeat, defaultElection)
+	} else {
+		n, err := w.LoadNode()
+		if err != nil {
+			return nil, err
+		}
+		p.id = n.Id
+		p.node.Node = raft.Recover(n.Id, n.Ents, n.State, defaultHeartbeat, defaultElection)
+	}
+	p.w = w
+
 	p.Handle(v2Prefix+"/", handlerErr(p.serveValue))
 	p.Handle(v2machinePrefix, handlerErr(p.serveMachines))
 	p.Handle(v2peersPrefix, handlerErr(p.serveMachines))
@@ -114,20 +138,23 @@ func newParticipant(id int64, pubAddr string, raftPubAddr string, client *v2clie
 	p.Handle(v2StoreStatsPrefix, handlerErr(p.serveStoreStats))
 	p.Handle(v2adminConfigPrefix, handlerErr(p.serveAdminConfig))
 	p.Handle(v2adminMachinesPrefix, handlerErr(p.serveAdminMachines))
-	return p
+
+	return p, nil
 }
 
 func (p *participant) run() int64 {
-	seeds := p.peerHub.getSeeds()
-	if len(seeds) == 0 {
-		log.Printf("id=%x participant.run action=bootstrap\n", p.id)
-		p.node.Campaign()
-		p.node.InitCluster(genId())
-		p.node.Add(p.id, p.raftPubAddr, []byte(p.pubAddr))
-		p.apply(p.node.Next())
-	} else {
-		log.Printf("id=%x participant.run action=join seeds=\"%v\"\n", p.id, seeds)
-		p.join()
+	if p.node.IsEmpty() {
+		seeds := p.peerHub.getSeeds()
+		if len(seeds) == 0 {
+			log.Printf("id=%x participant.run action=bootstrap\n", p.id)
+			p.node.Campaign()
+			p.node.InitCluster(genId())
+			p.node.Add(p.id, p.raftPubAddr, []byte(p.pubAddr))
+			p.apply(p.node.Next())
+		} else {
+			log.Printf("id=%x participant.run action=join seeds=\"%v\"\n", p.id, seeds)
+			p.join()
+		}
 	}
 
 	p.rh.start()
@@ -170,6 +197,8 @@ func (p *participant) run() int64 {
 			return stopMode
 		}
 		p.apply(node.Next())
+		_, ents := node.UnstableEnts()
+		p.save(ents, node.UnstableState())
 		p.send(node.Msgs())
 		if node.IsRemoved() {
 			p.stop()
@@ -187,6 +216,7 @@ func (p *participant) stop() {
 	}
 	p.stopped = true
 	close(p.stopc)
+	p.w.Close()
 }
 
 func (p *participant) raftHandler() http.Handler {
@@ -303,6 +333,10 @@ func (p *participant) apply(ents []raft.Entry) {
 			peer.participate()
 			pp := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
 			p.Store.Set(pp, false, fmt.Sprintf("raft=%v&etcd=%v", cfg.Addr, string(cfg.Context)), store.Permanent)
+			if p.id == cfg.NodeId {
+				p.raftPubAddr = cfg.Addr
+				p.pubAddr = string(cfg.Context)
+			}
 			log.Printf("id=%x participant.cluster.addNode nodeId=%x addr=%s context=%s\n", p.id, cfg.NodeId, cfg.Addr, cfg.Context)
 		case raft.RemoveNode:
 			cfg := new(raft.Config)
@@ -324,6 +358,17 @@ func (p *participant) apply(ents []raft.Entry) {
 	}
 }
 
+func (p *participant) save(ents []raft.Entry, state raft.State) { // save hands each entry in ents, and state when it is non-empty, to p.w, then calls Flush
+	for _, ent := range ents {
+		p.w.SaveEntry(&ent) // &ent is the address of the loop's copy, not of the slice element; any result is discarded
+	}
+	if state != raft.EmptyState { // skip SaveState when state equals the zero-value sentinel
+		p.w.SaveState(&state) // any result is discarded
+	}
+	p.w.Flush() // any result is discarded; NOTE(review): confirm whether these wal calls return errors that should be handled
+
+}
+
 func (p *participant) send(msgs []raft.Message) {
 	for i := range msgs {
 		if err := p.peerHub.send(msgs[i]); err != nil {

+ 7 - 2
raft/node.go

@@ -204,6 +204,11 @@ func (n *Node) Tick() {
 	}
 }
 
+// IsEmpty returns true if the log of the node is empty, as reported by raftLog.isEmpty.
+func (n *Node) IsEmpty() bool {
+	return n.sm.raftLog.isEmpty()
+}
+
 func (n *Node) UpdateConf(t int64, c *Config) {
 	data, err := json.Marshal(c)
 	if err != nil {
@@ -219,8 +224,8 @@ func (n *Node) UnstableEnts() (int64, []Entry) {
 }
 
 func (n *Node) UnstableState() State {
-	if n.sm.unstableState == emptyState {
-		return emptyState
+	if n.sm.unstableState == EmptyState {
+		return EmptyState
 	}
 	s := n.sm.unstableState
 	n.sm.clearState()

+ 1 - 1
raft/raft.go

@@ -72,7 +72,7 @@ type State struct {
 	Commit int64
 }
 
-var emptyState = State{}
+var EmptyState = State{} // EmptyState is the zero-value State; Node.UnstableState returns it when the unstable state equals it
 
 type Message struct {
 	Type      messageType