123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- package rafttest
- import (
- "math/rand"
- "sync"
- "time"
- "github.com/coreos/etcd/raft/raftpb"
- )
- // a network interface
- type iface interface {
- send(m raftpb.Message)
- recv() chan raftpb.Message
- disconnect()
- connect()
- }
// network is the set of fault-injection controls offered by a simulated
// network.
type network interface {
	// drop discards messages travelling from -> to at the given rate
	// (a rate of 1.0 drops every message).
	drop(from, to uint64, rate float64)

	// delay holds back messages travelling from -> to for a random duration
	// of up to d, at the given rate (a rate of 1.0 delays every message).
	// Open question carried over from the original author: do we need rate
	// here?
	delay(from, to uint64, d time.Duration, rate float64)

	// disconnect detaches the node with the given id from the network.
	disconnect(id uint64)
	// connect re-attaches the node with the given id to the network.
	connect(id uint64)

	// heal heals the network.
	heal()
}
- type raftNetwork struct {
- mu sync.Mutex
- disconnected map[uint64]bool
- dropmap map[conn]float64
- delaymap map[conn]delay
- recvQueues map[uint64]chan raftpb.Message
- }
// conn identifies a directed connection between two nodes. It is comparable
// and is used as a map key.
type conn struct {
	from uint64 // id of the sending node
	to   uint64 // id of the receiving node
}
// delay is the delay rule attached to a connection.
type delay struct {
	// d bounds the random pause applied to a delayed message.
	d time.Duration
	// rate is the probability threshold compared against a random value in
	// [0.0, 1.0) to decide whether a message is delayed.
	rate float64
}
- func newRaftNetwork(nodes ...uint64) *raftNetwork {
- pn := &raftNetwork{
- recvQueues: make(map[uint64]chan raftpb.Message),
- dropmap: make(map[conn]float64),
- delaymap: make(map[conn]delay),
- disconnected: make(map[uint64]bool),
- }
- for _, n := range nodes {
- pn.recvQueues[n] = make(chan raftpb.Message, 1024)
- }
- return pn
- }
- func (rn *raftNetwork) nodeNetwork(id uint64) iface {
- return &nodeNetwork{id: id, raftNetwork: rn}
- }
- func (rn *raftNetwork) send(m raftpb.Message) {
- rn.mu.Lock()
- to := rn.recvQueues[m.To]
- if rn.disconnected[m.To] {
- to = nil
- }
- drop := rn.dropmap[conn{m.From, m.To}]
- delay := rn.delaymap[conn{m.From, m.To}]
- rn.mu.Unlock()
- if to == nil {
- return
- }
- if drop != 0 && rand.Float64() < drop {
- return
- }
- // TODO: shall we delay without blocking the send call?
- if delay.d != 0 && rand.Float64() < delay.rate {
- rd := rand.Int63n(int64(delay.d))
- time.Sleep(time.Duration(rd))
- }
- select {
- case to <- m:
- default:
- // drop messages when the receiver queue is full.
- }
- }
- func (rn *raftNetwork) recvFrom(from uint64) chan raftpb.Message {
- rn.mu.Lock()
- fromc := rn.recvQueues[from]
- if rn.disconnected[from] {
- fromc = nil
- }
- rn.mu.Unlock()
- return fromc
- }
- func (rn *raftNetwork) drop(from, to uint64, rate float64) {
- rn.mu.Lock()
- defer rn.mu.Unlock()
- rn.dropmap[conn{from, to}] = rate
- }
- func (rn *raftNetwork) delay(from, to uint64, d time.Duration, rate float64) {
- rn.mu.Lock()
- defer rn.mu.Unlock()
- rn.delaymap[conn{from, to}] = delay{d, rate}
- }
- func (rn *raftNetwork) heal() {
- rn.mu.Lock()
- defer rn.mu.Unlock()
- rn.dropmap = make(map[conn]float64)
- rn.delaymap = make(map[conn]delay)
- }
- func (rn *raftNetwork) disconnect(id uint64) {
- rn.mu.Lock()
- defer rn.mu.Unlock()
- rn.disconnected[id] = true
- }
- func (rn *raftNetwork) connect(id uint64) {
- rn.mu.Lock()
- defer rn.mu.Unlock()
- rn.disconnected[id] = false
- }
- type nodeNetwork struct {
- id uint64
- *raftNetwork
- }
- func (nt *nodeNetwork) connect() {
- nt.raftNetwork.connect(nt.id)
- }
- func (nt *nodeNetwork) disconnect() {
- nt.raftNetwork.disconnect(nt.id)
- }
- func (nt *nodeNetwork) send(m raftpb.Message) {
- nt.raftNetwork.send(m)
- }
- func (nt *nodeNetwork) recv() chan raftpb.Message {
- return nt.recvFrom(nt.id)
- }
|