|
|
@@ -4,7 +4,6 @@ import (
|
|
|
"fmt"
|
|
|
"math/rand"
|
|
|
"reflect"
|
|
|
- "runtime"
|
|
|
"sync"
|
|
|
"testing"
|
|
|
"time"
|
|
|
@@ -368,16 +367,8 @@ func TestDoProposalStopped(t *testing.T) {
|
|
|
|
|
|
// TestSync tests sync 1. is nonblocking 2. sends out SYNC request.
|
|
|
func TestSync(t *testing.T) {
|
|
|
- n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
|
|
|
- n.Campaign(context.TODO())
|
|
|
- select {
|
|
|
- case <-n.Ready():
|
|
|
- case <-time.After(time.Millisecond):
|
|
|
- t.Fatalf("expect to receive ready within 1ms, but fail")
|
|
|
- }
|
|
|
-
|
|
|
+ n := &nodeProposeDataRecorder{}
|
|
|
srv := &EtcdServer{
|
|
|
- // TODO: use fake node for better testability
|
|
|
Node: n,
|
|
|
}
|
|
|
start := time.Now()
|
|
|
@@ -388,46 +379,29 @@ func TestSync(t *testing.T) {
|
|
|
t.Errorf("CallSyncTime = %v, want < %v", d, time.Millisecond)
|
|
|
}
|
|
|
|
|
|
- // give time for goroutine in sync to run
|
|
|
- // TODO: use fake clock
|
|
|
- var ready raft.Ready
|
|
|
- select {
|
|
|
- case ready = <-n.Ready():
|
|
|
- case <-time.After(time.Millisecond):
|
|
|
- t.Fatalf("expect to receive ready within 1ms, but fail")
|
|
|
- }
|
|
|
-
|
|
|
- if len(ready.CommittedEntries) != 1 {
|
|
|
- t.Fatalf("len(CommittedEntries) = %d, want 1", len(ready.CommittedEntries))
|
|
|
+ testutil.ForceGosched()
|
|
|
+ data := n.data()
|
|
|
+ if len(data) != 1 {
|
|
|
+ t.Fatalf("len(proposeData) = %d, want 1", len(data))
|
|
|
}
|
|
|
- e := ready.CommittedEntries[0]
|
|
|
- var req pb.Request
|
|
|
- if err := req.Unmarshal(e.Data); err != nil {
|
|
|
- t.Fatalf("unmarshal error: %v", err)
|
|
|
+ var r pb.Request
|
|
|
+ if err := r.Unmarshal(data[0]); err != nil {
|
|
|
+ t.Fatalf("unmarshal request error: %v", err)
|
|
|
}
|
|
|
- if req.Method != "SYNC" {
|
|
|
- t.Errorf("method = %s, want SYNC", req.Method)
|
|
|
+ if r.Method != "SYNC" {
|
|
|
+ t.Errorf("method = %s, want SYNC", r.Method)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// TestSyncFail tests the case that sync 1. is non-blocking 2. fails to
|
|
|
-// propose SYNC request because there is no leader
|
|
|
-func TestSyncFail(t *testing.T) {
|
|
|
- // The node is run without Tick and Campaign, so it has no leader forever.
|
|
|
- n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
|
|
|
- select {
|
|
|
- case <-n.Ready():
|
|
|
- case <-time.After(time.Millisecond):
|
|
|
- t.Fatalf("expect to receive ready within 1ms, but fail")
|
|
|
- }
|
|
|
-
|
|
|
+// TestSyncTimeout tests the case that sync 1. is non-blocking 2. cancel request
|
|
|
+// after timeout
|
|
|
+func TestSyncTimeout(t *testing.T) {
|
|
|
+ n := &nodeProposalBlockerRecorder{}
|
|
|
srv := &EtcdServer{
|
|
|
- // TODO: use fake node for better testability
|
|
|
Node: n,
|
|
|
}
|
|
|
- routineN := runtime.NumGoroutine()
|
|
|
start := time.Now()
|
|
|
- srv.sync(time.Millisecond)
|
|
|
+ srv.sync(0)
|
|
|
|
|
|
// check that sync is non-blocking
|
|
|
if d := time.Since(start); d > time.Millisecond {
|
|
|
@@ -436,36 +410,31 @@ func TestSyncFail(t *testing.T) {
|
|
|
|
|
|
// give time for goroutine in sync to cancel
|
|
|
// TODO: use fake clock
|
|
|
- time.Sleep(2 * time.Millisecond)
|
|
|
- if g := runtime.NumGoroutine(); g != routineN {
|
|
|
- t.Errorf("NumGoroutine = %d, want %d", g, routineN)
|
|
|
- }
|
|
|
- select {
|
|
|
- case g := <-n.Ready():
|
|
|
- t.Errorf("ready = %+v, want no", g)
|
|
|
- default:
|
|
|
+ testutil.ForceGosched()
|
|
|
+ w := []string{"Propose blocked"}
|
|
|
+ if g := n.Action(); !reflect.DeepEqual(g, w) {
|
|
|
+ t.Errorf("action = %v, want %v", g, w)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// TODO: TestNoSyncWhenNoLeader
|
|
|
+
|
|
|
func TestSyncTriggerDeleteExpriedKeys(t *testing.T) {
|
|
|
n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
|
|
|
n.Campaign(context.TODO())
|
|
|
st := &storeRecorder{}
|
|
|
- syncInterval := 5 * time.Millisecond
|
|
|
- syncTicker := time.NewTicker(syncInterval)
|
|
|
- defer syncTicker.Stop()
|
|
|
srv := &EtcdServer{
|
|
|
// TODO: use fake node for better testability
|
|
|
Node: n,
|
|
|
Store: st,
|
|
|
Send: func(_ []raftpb.Message) {},
|
|
|
Storage: &storageRecorder{},
|
|
|
- SyncTicker: syncTicker.C,
|
|
|
+ SyncTicker: time.After(0),
|
|
|
}
|
|
|
srv.Start()
|
|
|
// give time for sync request to be proposed and performed
|
|
|
// TODO: use fake clock
|
|
|
- time.Sleep(syncInterval + time.Millisecond)
|
|
|
+ testutil.ForceGosched()
|
|
|
srv.Stop()
|
|
|
|
|
|
action := st.Action()
|
|
|
@@ -621,6 +590,19 @@ func TestGetBool(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func TestGenID(t *testing.T) {
|
|
|
+ // Sanity check that the GenID function has been seeded appropriately
|
|
|
+ // (math/rand is seeded with 1 by default)
|
|
|
+ r := rand.NewSource(int64(1))
|
|
|
+ var n int64
|
|
|
+ for n == 0 {
|
|
|
+ n = r.Int63()
|
|
|
+ }
|
|
|
+ if n == GenID() {
|
|
|
+ t.Fatalf("GenID's rand seeded with 1!")
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
type recorder struct {
|
|
|
sync.Mutex
|
|
|
action []string
|
|
|
@@ -764,15 +746,62 @@ func (n *readyNode) Compact(d []byte) {}
|
|
|
func (n *readyNode) AddNode(id int64) {}
|
|
|
func (n *readyNode) RemoveNode(id int64) {}
|
|
|
|
|
|
-func TestGenID(t *testing.T) {
|
|
|
- // Sanity check that the GenID function has been seeded appropriately
|
|
|
- // (math/rand is seeded with 1 by default)
|
|
|
- r := rand.NewSource(int64(1))
|
|
|
- var n int64
|
|
|
- for n == 0 {
|
|
|
- n = r.Int63()
|
|
|
- }
|
|
|
- if n == GenID() {
|
|
|
- t.Fatalf("GenID's rand seeded with 1!")
|
|
|
- }
|
|
|
+type nodeRecorder struct {
|
|
|
+ recorder
|
|
|
+}
|
|
|
+
|
|
|
+func (n *nodeRecorder) Tick() {
|
|
|
+ n.record("Tick")
|
|
|
+}
|
|
|
+func (n *nodeRecorder) Campaign(ctx context.Context) error {
|
|
|
+ n.record("Campaign")
|
|
|
+ return nil
|
|
|
+}
|
|
|
+func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error {
|
|
|
+ n.record("Propose")
|
|
|
+ return nil
|
|
|
+}
|
|
|
+func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
|
|
|
+ n.record("Step")
|
|
|
+ return nil
|
|
|
+}
|
|
|
+func (n *nodeRecorder) Ready() <-chan raft.Ready {
|
|
|
+ n.record("Ready")
|
|
|
+ return nil
|
|
|
+}
|
|
|
+func (n *nodeRecorder) Stop() {
|
|
|
+ n.record("Stop")
|
|
|
+}
|
|
|
+func (n *nodeRecorder) Compact(d []byte) {
|
|
|
+ n.record("Compact")
|
|
|
+}
|
|
|
+
|
|
|
+type nodeProposeDataRecorder struct {
|
|
|
+ nodeRecorder
|
|
|
+ sync.Mutex
|
|
|
+ d [][]byte
|
|
|
+}
|
|
|
+
|
|
|
+func (n *nodeProposeDataRecorder) data() [][]byte {
|
|
|
+ n.Lock()
|
|
|
+ d := n.d
|
|
|
+ n.Unlock()
|
|
|
+ return d
|
|
|
+}
|
|
|
+func (n *nodeProposeDataRecorder) Propose(ctx context.Context, data []byte) error {
|
|
|
+ n.nodeRecorder.Propose(ctx, data)
|
|
|
+ n.Lock()
|
|
|
+ n.d = append(n.d, data)
|
|
|
+ n.Unlock()
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+type nodeProposalBlockerRecorder struct {
|
|
|
+ nodeRecorder
|
|
|
+}
|
|
|
+
|
|
|
+func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte) error {
|
|
|
+ <-ctx.Done()
|
|
|
+ n.record("Propose blocked")
|
|
|
+ return nil
|
|
|
}
|