network.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package rafttest
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/coreos/etcd/raft/raftpb"
  6. )
  7. type network interface {
  8. send(m raftpb.Message)
  9. recv() chan raftpb.Message
  10. // drop message at given rate (1.0 drops all messages)
  11. drop(from, to uint64, rate float64)
  12. // delay message for (0, d] randomly at given rate (1.0 delay all messages)
  13. // do we need rate here?
  14. delay(from, to uint64, d time.Duration, rate float64)
  15. disconnect(id uint64)
  16. connect(id uint64)
  17. }
  18. type raftNetwork struct {
  19. mu sync.Mutex
  20. disconnected map[uint64]bool
  21. recvQueues map[uint64]chan raftpb.Message
  22. }
  23. func newRaftNetwork(nodes ...uint64) *raftNetwork {
  24. pn := &raftNetwork{
  25. recvQueues: make(map[uint64]chan raftpb.Message),
  26. disconnected: make(map[uint64]bool),
  27. }
  28. for _, n := range nodes {
  29. pn.recvQueues[n] = make(chan raftpb.Message, 1024)
  30. }
  31. return pn
  32. }
  33. func (rn *raftNetwork) nodeNetwork(id uint64) *nodeNetwork {
  34. return &nodeNetwork{id: id, raftNetwork: rn}
  35. }
  36. func (rn *raftNetwork) send(m raftpb.Message) {
  37. rn.mu.Lock()
  38. to := rn.recvQueues[m.To]
  39. if rn.disconnected[m.To] {
  40. to = nil
  41. }
  42. rn.mu.Unlock()
  43. if to == nil {
  44. return
  45. }
  46. to <- m
  47. }
  48. func (rn *raftNetwork) recvFrom(from uint64) chan raftpb.Message {
  49. rn.mu.Lock()
  50. fromc := rn.recvQueues[from]
  51. if rn.disconnected[from] {
  52. fromc = nil
  53. }
  54. rn.mu.Unlock()
  55. return fromc
  56. }
  57. func (rn *raftNetwork) drop(from, to uint64, rate float64) {
  58. panic("unimplemented")
  59. }
  60. func (rn *raftNetwork) delay(from, to uint64, d time.Duration, rate float64) {
  61. panic("unimplemented")
  62. }
  63. func (rn *raftNetwork) disconnect(id uint64) {
  64. rn.mu.Lock()
  65. defer rn.mu.Unlock()
  66. rn.disconnected[id] = true
  67. }
  68. func (rn *raftNetwork) connect(id uint64) {
  69. rn.mu.Lock()
  70. defer rn.mu.Unlock()
  71. rn.disconnected[id] = false
  72. }
  73. type nodeNetwork struct {
  74. id uint64
  75. *raftNetwork
  76. }
  77. func (nt *nodeNetwork) send(m raftpb.Message) {
  78. nt.raftNetwork.send(m)
  79. }
  80. func (nt *nodeNetwork) recv() chan raftpb.Message {
  81. return nt.recvFrom(nt.id)
  82. }