node.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package rafttest
  2. import (
  3. "log"
  4. "time"
  5. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  6. "github.com/coreos/etcd/raft"
  7. "github.com/coreos/etcd/raft/raftpb"
  8. )
  9. type node struct {
  10. raft.Node
  11. id uint64
  12. paused bool
  13. iface iface
  14. stopc chan struct{}
  15. pausec chan bool
  16. // stable
  17. storage *raft.MemoryStorage
  18. state raftpb.HardState
  19. }
  20. func startNode(id uint64, peers []raft.Peer, iface iface) *node {
  21. st := raft.NewMemoryStorage()
  22. rn := raft.StartNode(id, peers, 10, 1, st)
  23. n := &node{
  24. Node: rn,
  25. id: id,
  26. storage: st,
  27. iface: iface,
  28. pausec: make(chan bool),
  29. }
  30. n.start()
  31. return n
  32. }
  33. func (n *node) start() {
  34. n.stopc = make(chan struct{})
  35. ticker := time.Tick(5 * time.Millisecond)
  36. go func() {
  37. for {
  38. select {
  39. case <-ticker:
  40. n.Tick()
  41. case rd := <-n.Ready():
  42. if !raft.IsEmptyHardState(rd.HardState) {
  43. n.state = rd.HardState
  44. n.storage.SetHardState(n.state)
  45. }
  46. n.storage.Append(rd.Entries)
  47. // TODO: make send async, more like real world...
  48. for _, m := range rd.Messages {
  49. n.iface.send(m)
  50. }
  51. n.Advance()
  52. case m := <-n.iface.recv():
  53. n.Step(context.TODO(), m)
  54. case <-n.stopc:
  55. n.Stop()
  56. log.Printf("raft.%d: stop", n.id)
  57. n.Node = nil
  58. close(n.stopc)
  59. return
  60. case p := <-n.pausec:
  61. recvms := make([]raftpb.Message, 0)
  62. for p {
  63. // TODO: locking around paused?
  64. n.paused = true
  65. select {
  66. case m := <-n.iface.recv():
  67. recvms = append(recvms, m)
  68. case p = <-n.pausec:
  69. }
  70. }
  71. n.paused = false
  72. // step all pending messages
  73. for _, m := range recvms {
  74. n.Step(context.TODO(), m)
  75. }
  76. }
  77. }
  78. }()
  79. }
  80. // stop stops the node. stop a stopped node might panic.
  81. // All in memory state of node is discarded.
  82. // All stable MUST be unchanged.
  83. func (n *node) stop() {
  84. n.iface.disconnect()
  85. n.stopc <- struct{}{}
  86. // wait for the shutdown
  87. <-n.stopc
  88. }
  89. // restart restarts the node. restart a started node
  90. // blocks and might affect the future stop operation.
  91. func (n *node) restart() {
  92. // wait for the shutdown
  93. <-n.stopc
  94. n.Node = raft.RestartNode(n.id, 10, 1, n.storage, 0)
  95. n.start()
  96. n.iface.connect()
  97. }
  98. // pause pauses the node.
  99. // The paused node buffers the received messages and replies
  100. // all of them when it resumes.
  101. func (n *node) pause() {
  102. n.pausec <- true
  103. }
  104. // resume resumes the paused node.
  105. func (n *node) resume() {
  106. n.pausec <- false
  107. }
  108. func (n *node) isPaused() bool {
  109. return n.paused
  110. }