node.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. package rafttest
  2. import (
  3. "log"
  4. "time"
  5. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  6. "github.com/coreos/etcd/raft"
  7. "github.com/coreos/etcd/raft/raftpb"
  8. )
  9. type node struct {
  10. raft.Node
  11. id uint64
  12. iface iface
  13. stopc chan struct{}
  14. pausec chan bool
  15. // stable
  16. storage *raft.MemoryStorage
  17. state raftpb.HardState
  18. }
  19. func startNode(id uint64, peers []raft.Peer, iface iface) *node {
  20. st := raft.NewMemoryStorage()
  21. c := &raft.Config{
  22. ID: id,
  23. ElectionTick: 10,
  24. HeartbeatTick: 1,
  25. Storage: st,
  26. MaxSizePerMsg: 1024 * 1024,
  27. MaxInflightMsgs: 256,
  28. }
  29. rn := raft.StartNode(c, peers)
  30. n := &node{
  31. Node: rn,
  32. id: id,
  33. storage: st,
  34. iface: iface,
  35. pausec: make(chan bool),
  36. }
  37. n.start()
  38. return n
  39. }
  40. func (n *node) start() {
  41. n.stopc = make(chan struct{})
  42. ticker := time.Tick(5 * time.Millisecond)
  43. go func() {
  44. for {
  45. select {
  46. case <-ticker:
  47. n.Tick()
  48. case rd := <-n.Ready():
  49. if !raft.IsEmptyHardState(rd.HardState) {
  50. n.state = rd.HardState
  51. n.storage.SetHardState(n.state)
  52. }
  53. n.storage.Append(rd.Entries)
  54. time.Sleep(time.Millisecond)
  55. // TODO: make send async, more like real world...
  56. for _, m := range rd.Messages {
  57. n.iface.send(m)
  58. }
  59. n.Advance()
  60. case m := <-n.iface.recv():
  61. n.Step(context.TODO(), m)
  62. case <-n.stopc:
  63. n.Stop()
  64. log.Printf("raft.%d: stop", n.id)
  65. n.Node = nil
  66. close(n.stopc)
  67. return
  68. case p := <-n.pausec:
  69. recvms := make([]raftpb.Message, 0)
  70. for p {
  71. select {
  72. case m := <-n.iface.recv():
  73. recvms = append(recvms, m)
  74. case p = <-n.pausec:
  75. }
  76. }
  77. // step all pending messages
  78. for _, m := range recvms {
  79. n.Step(context.TODO(), m)
  80. }
  81. }
  82. }
  83. }()
  84. }
  85. // stop stops the node. stop a stopped node might panic.
  86. // All in memory state of node is discarded.
  87. // All stable MUST be unchanged.
  88. func (n *node) stop() {
  89. n.iface.disconnect()
  90. n.stopc <- struct{}{}
  91. // wait for the shutdown
  92. <-n.stopc
  93. }
  94. // restart restarts the node. restart a started node
  95. // blocks and might affect the future stop operation.
  96. func (n *node) restart() {
  97. // wait for the shutdown
  98. <-n.stopc
  99. c := &raft.Config{
  100. ID: n.id,
  101. ElectionTick: 10,
  102. HeartbeatTick: 1,
  103. Storage: n.storage,
  104. MaxSizePerMsg: 1024 * 1024,
  105. MaxInflightMsgs: 256,
  106. }
  107. n.Node = raft.RestartNode(c)
  108. n.start()
  109. n.iface.connect()
  110. }
  111. // pause pauses the node.
  112. // The paused node buffers the received messages and replies
  113. // all of them when it resumes.
  114. func (n *node) pause() {
  115. n.pausec <- true
  116. }
  117. // resume resumes the paused node.
  118. func (n *node) resume() {
  119. n.pausec <- false
  120. }