node.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package rafttest
  2. import (
  3. "log"
  4. "time"
  5. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  6. "github.com/coreos/etcd/raft"
  7. "github.com/coreos/etcd/raft/raftpb"
  8. )
  9. type node struct {
  10. raft.Node
  11. id uint64
  12. iface iface
  13. stopc chan struct{}
  14. pausec chan bool
  15. // stable
  16. storage *raft.MemoryStorage
  17. state raftpb.HardState
  18. }
  19. func startNode(id uint64, peers []raft.Peer, iface iface) *node {
  20. st := raft.NewMemoryStorage()
  21. rn := raft.StartNode(id, peers, 10, 1, st)
  22. n := &node{
  23. Node: rn,
  24. id: id,
  25. storage: st,
  26. iface: iface,
  27. pausec: make(chan bool),
  28. }
  29. n.start()
  30. return n
  31. }
  32. func (n *node) start() {
  33. n.stopc = make(chan struct{})
  34. ticker := time.Tick(5 * time.Millisecond)
  35. go func() {
  36. for {
  37. select {
  38. case <-ticker:
  39. n.Tick()
  40. case rd := <-n.Ready():
  41. if !raft.IsEmptyHardState(rd.HardState) {
  42. n.state = rd.HardState
  43. n.storage.SetHardState(n.state)
  44. }
  45. n.storage.Append(rd.Entries)
  46. // TODO: make send async, more like real world...
  47. for _, m := range rd.Messages {
  48. n.iface.send(m)
  49. }
  50. n.Advance()
  51. case m := <-n.iface.recv():
  52. n.Step(context.TODO(), m)
  53. case <-n.stopc:
  54. n.Stop()
  55. log.Printf("raft.%d: stop", n.id)
  56. n.Node = nil
  57. close(n.stopc)
  58. return
  59. case p := <-n.pausec:
  60. recvms := make([]raftpb.Message, 0)
  61. for p {
  62. select {
  63. case m := <-n.iface.recv():
  64. recvms = append(recvms, m)
  65. case p = <-n.pausec:
  66. }
  67. }
  68. // step all pending messages
  69. for _, m := range recvms {
  70. n.Step(context.TODO(), m)
  71. }
  72. }
  73. }
  74. }()
  75. }
  76. // stop stops the node. stop a stopped node might panic.
  77. // All in memory state of node is discarded.
  78. // All stable MUST be unchanged.
  79. func (n *node) stop() {
  80. n.iface.disconnect()
  81. n.stopc <- struct{}{}
  82. // wait for the shutdown
  83. <-n.stopc
  84. }
  85. // restart restarts the node. restart a started node
  86. // blocks and might affect the future stop operation.
  87. func (n *node) restart() {
  88. // wait for the shutdown
  89. <-n.stopc
  90. n.Node = raft.RestartNode(n.id, 10, 1, n.storage, 0)
  91. n.start()
  92. n.iface.connect()
  93. }
  94. // pause pauses the node.
  95. // The paused node buffers the received messages and replies
  96. // all of them when it resumes.
  97. func (n *node) pause() {
  98. n.pausec <- true
  99. }
  100. // resume resumes the paused node.
  101. func (n *node) resume() {
  102. n.pausec <- false
  103. }