| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- package rafttest
- import (
- "sync"
- "time"
- "github.com/coreos/etcd/raft/raftpb"
- )
- type network interface {
- send(m raftpb.Message)
- recv() chan raftpb.Message
- // drop message at given rate (1.0 drops all messages)
- drop(from, to uint64, rate float64)
- // delay message for (0, d] randomly at given rate (1.0 delay all messages)
- // do we need rate here?
- delay(from, to uint64, d time.Duration, rate float64)
- disconnect(id uint64)
- connect(id uint64)
- }
- type raftNetwork struct {
- mu sync.Mutex
- disconnected map[uint64]bool
- recvQueues map[uint64]chan raftpb.Message
- }
- func newRaftNetwork(nodes ...uint64) *raftNetwork {
- pn := &raftNetwork{
- recvQueues: make(map[uint64]chan raftpb.Message),
- disconnected: make(map[uint64]bool),
- }
- for _, n := range nodes {
- pn.recvQueues[n] = make(chan raftpb.Message, 1024)
- }
- return pn
- }
- func (rn *raftNetwork) nodeNetwork(id uint64) *nodeNetwork {
- return &nodeNetwork{id: id, raftNetwork: rn}
- }
- func (rn *raftNetwork) send(m raftpb.Message) {
- rn.mu.Lock()
- to := rn.recvQueues[m.To]
- if rn.disconnected[m.To] {
- to = nil
- }
- rn.mu.Unlock()
- if to == nil {
- return
- }
- to <- m
- }
- func (rn *raftNetwork) recvFrom(from uint64) chan raftpb.Message {
- rn.mu.Lock()
- fromc := rn.recvQueues[from]
- if rn.disconnected[from] {
- fromc = nil
- }
- rn.mu.Unlock()
- return fromc
- }
- func (rn *raftNetwork) drop(from, to uint64, rate float64) {
- panic("unimplemented")
- }
- func (rn *raftNetwork) delay(from, to uint64, d time.Duration, rate float64) {
- panic("unimplemented")
- }
- func (rn *raftNetwork) disconnect(id uint64) {
- rn.mu.Lock()
- defer rn.mu.Unlock()
- rn.disconnected[id] = true
- }
- func (rn *raftNetwork) connect(id uint64) {
- rn.mu.Lock()
- defer rn.mu.Unlock()
- rn.disconnected[id] = false
- }
- type nodeNetwork struct {
- id uint64
- *raftNetwork
- }
- func (nt *nodeNetwork) send(m raftpb.Message) {
- nt.raftNetwork.send(m)
- }
- func (nt *nodeNetwork) recv() chan raftpb.Message {
- return nt.recvFrom(nt.id)
- }
|