Browse Source

...deadlocked...

Blake Mizerany 11 years ago
parent
commit
f8be54b416
4 changed files with 80 additions and 26 deletions
  1. 3 1
      etcdserver2/server.go
  2. 53 10
      etcdserver2/server_test.go
  3. 19 15
      raft/node.go
  4. 5 0
      store/store.go

+ 3 - 1
etcdserver2/server.go

@@ -88,7 +88,9 @@ func (s *Server) run() {
 	}
 }
 
-func (s *Server) Stop() { close(s.done) }
+func (s *Server) Stop() {
+	s.done <- struct{}{}
+}
 
 func (s *Server) Do(ctx context.Context, r pb.Request) (Response, error) {
 	if r.Id == 0 {

+ 53 - 10
etcdserver2/server_test.go

@@ -3,6 +3,7 @@ package etcdserver
 import (
 	"reflect"
 	"testing"
+	"time"
 	"code.google.com/p/go.net/context"
 
 	pb "github.com/coreos/etcd/etcdserver2/etcdserverpb"
@@ -11,21 +12,51 @@ import (
 	"github.com/coreos/etcd/store"
 )
 
-func TestServer(t *testing.T) {
+func TestClusterOf1(t *testing.T) { testServer(t, 1) }
+func TestClusterOf3(t *testing.T) { testServer(t, 3) }
+
+func testServer(t *testing.T, ns int64) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	n := raft.Start(ctx, 1, []int64{1})
-	n.Campaign(ctx)
+	ss := make([]*Server, ns)
+
+	send := func(msgs []raftpb.Message) {
+		var m raftpb.Message
+		for len(msgs) > 0 {
+			m, msgs = msgs[0], msgs[1:]
+			t.Logf("sending: %+v", m)
+			if err := ss[m.To].Node.Step(ctx, m); err != nil {
+				t.Fatal(err)
+			}
+			rd := raft.RecvReadyNow(ss[m.To].Node)
+			msgs = append(msgs, rd.Messages...)
+		}
+	}
+
+	peers := make([]int64, ns)
+	for i := int64(0); i < ns; i++ {
+		peers[i] = i
+	}
+
+	var srv *Server
+	for i := int64(0); i < ns; i++ {
+		n := raft.Start(ctx, i, peers)
+
+		srv = &Server{
+			Node:  n,
+			Store: store.New(),
+			Send:  send,
+			Save:  func(_ raftpb.State, _ []raftpb.Entry) {},
+		}
+		Start(srv)
 
-	srv := &Server{
-		Node:  n,
-		Store: store.New(),
-		Send:  func(_ []raftpb.Message) {},
-		Save:  func(_ raftpb.State, _ []raftpb.Entry) {},
+		ss[i] = srv
+	}
+
+	if err := srv.Node.Campaign(ctx); err != nil {
+		t.Fatal(err)
 	}
-	Start(srv)
-	defer srv.Stop()
 
 	r := pb.Request{
 		Method: "PUT",
@@ -49,6 +80,18 @@ func TestServer(t *testing.T) {
 		t.Error("value:", *g.Value)
 		t.Errorf("g = %+v, w %+v", g, w)
 	}
+
+	time.Sleep(10 * time.Millisecond)
+
+	var last interface{}
+	for i, sv := range ss {
+		sv.Stop()
+		g := store.Root(sv.Store)
+		if last != nil && !reflect.DeepEqual(last, g) {
+			t.Errorf("server %d: Root = %#v, want %#v", i, g, last)
+		}
+		last = g
+	}
 }
 
 func stringp(s string) *string { return &s }

+ 19 - 15
raft/node.go

@@ -33,20 +33,22 @@ func (rd Ready) containsUpdates(prev Ready) bool {
 }
 
 type Node struct {
-	ctx    context.Context
-	propc  chan pb.Message
-	recvc  chan pb.Message
-	readyc chan Ready
-	tickc  chan struct{}
+	ctx          context.Context
+	propc        chan pb.Message
+	recvc        chan pb.Message
+	readyc       chan Ready
+	tickc        chan struct{}
+	alwaysreadyc chan Ready
 }
 
 func Start(ctx context.Context, id int64, peers []int64) Node {
 	n := Node{
-		ctx:    ctx,
-		propc:  make(chan pb.Message),
-		recvc:  make(chan pb.Message),
-		readyc: make(chan Ready),
-		tickc:  make(chan struct{}),
+		ctx:          ctx,
+		propc:        make(chan pb.Message),
+		recvc:        make(chan pb.Message),
+		readyc:       make(chan Ready),
+		tickc:        make(chan struct{}),
+		alwaysreadyc: make(chan Ready),
 	}
 	r := newRaft(id, peers)
 	go n.run(r)
@@ -94,6 +96,8 @@ func (n *Node) run(r *raft) {
 			r.raftLog.resetNextEnts()
 			r.raftLog.resetUnstable()
 			r.msgs = nil
+		case n.alwaysreadyc <- rd:
+			// this is for testing only
 		case <-n.ctx.Done():
 			return
 		}
@@ -141,8 +145,8 @@ func (n *Node) Ready() <-chan Ready {
 	return n.readyc
 }
 
-type byMsgType []pb.Message
-
-func (msgs byMsgType) Len() int           { return len(msgs) }
-func (msgs byMsgType) Less(i, j int) bool { return msgs[i].Type == msgProp }
-func (msgs byMsgType) Swap(i, j int)      { msgs[i], msgs[j] = msgs[i], msgs[j] }
+// RecvReadyNow returns the state of n without blocking. It is primarly for
+// testing purposes only.
+func RecvReadyNow(n Node) Ready {
+	return <-n.alwaysreadyc
+}

+ 5 - 0
store/store.go

@@ -75,6 +75,11 @@ func New() Store {
 	return newStore()
 }
 
+// Root returns the root of a Store and is for testing only.
+func Root(st Store) interface{} {
+	return st.(*store).Root
+}
+
 func newStore() *store {
 	s := new(store)
 	s.CurrentVersion = defaultVersion