Browse Source

Merge pull request #2252 from xiang90/raftdelay

rafttest: add network delay
Xiang Li 11 years ago
parent
commit
af00536d71
2 changed files with 39 additions and 1 deletions
  1. 17 1
      raft/rafttest/network.go
  2. 22 0
      raft/rafttest/network_test.go

+ 17 - 1
raft/rafttest/network.go

@@ -33,6 +33,7 @@ type raftNetwork struct {
 	mu           sync.Mutex
 	disconnected map[uint64]bool
 	dropmap      map[conn]float64
+	delaymap     map[conn]delay
 	recvQueues   map[uint64]chan raftpb.Message
 }
 
@@ -40,10 +41,16 @@ type conn struct {
 	from, to uint64
 }
 
+type delay struct {
+	d    time.Duration
+	rate float64
+}
+
 func newRaftNetwork(nodes ...uint64) *raftNetwork {
 	pn := &raftNetwork{
 		recvQueues:   make(map[uint64]chan raftpb.Message),
 		dropmap:      make(map[conn]float64),
+		delaymap:     make(map[conn]delay),
 		disconnected: make(map[uint64]bool),
 	}
 
@@ -64,6 +71,7 @@ func (rn *raftNetwork) send(m raftpb.Message) {
 		to = nil
 	}
 	drop := rn.dropmap[conn{m.From, m.To}]
+	delay := rn.delaymap[conn{m.From, m.To}]
 	rn.mu.Unlock()
 
 	if to == nil {
@@ -72,6 +80,11 @@ func (rn *raftNetwork) send(m raftpb.Message) {
 	if drop != 0 && rand.Float64() < drop {
 		return
 	}
+	// TODO: shall we delay without blocking the send call?
+	if delay.d != 0 && rand.Float64() < delay.rate {
+		rd := rand.Int63n(int64(delay.d))
+		time.Sleep(time.Duration(rd))
+	}
 
 	to <- m
 }
@@ -94,13 +107,16 @@ func (rn *raftNetwork) drop(from, to uint64, rate float64) {
 }
 
 func (rn *raftNetwork) delay(from, to uint64, d time.Duration, rate float64) {
-	panic("unimplemented")
+	rn.mu.Lock()
+	defer rn.mu.Unlock()
+	rn.delaymap[conn{from, to}] = delay{d, rate}
 }
 
 func (rn *raftNetwork) heal() {
 	rn.mu.Lock()
 	defer rn.mu.Unlock()
 	rn.dropmap = make(map[conn]float64)
+	rn.delaymap = make(map[conn]delay)
 }
 
 func (rn *raftNetwork) disconnect(id uint64) {

+ 22 - 0
raft/rafttest/network_test.go

@@ -2,6 +2,7 @@ package rafttest
 
 import (
 	"testing"
+	"time"
 
 	"github.com/coreos/etcd/raft/raftpb"
 )
@@ -34,3 +35,24 @@ func TestNetworkDrop(t *testing.T) {
 		t.Errorf("drop = %d, want around %d", drop, droprate*float64(sent))
 	}
 }
+
+func TestNetworkDelay(t *testing.T) {
+	sent := 1000
+	delay := time.Millisecond
+	delayrate := 0.1
+	nt := newRaftNetwork(1, 2)
+
+	nt.delay(1, 2, delay, delayrate)
+	var total time.Duration
+	for i := 0; i < sent; i++ {
+		s := time.Now()
+		nt.send(raftpb.Message{From: 1, To: 2})
+		total += time.Since(s)
+	}
+
+	w := time.Duration(float64(sent)*delayrate/2) * delay
+	// there are pretty overhead in the send call, since it genarete random numbers.
+	if total < w+10*delay {
+		t.Errorf("total = %v, want > %v", total, w)
+	}
+}