node.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package rafttest
  2. import (
  3. "log"
  4. "time"
  5. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  6. "github.com/coreos/etcd/raft"
  7. "github.com/coreos/etcd/raft/raftpb"
  8. )
  9. type node struct {
  10. raft.Node
  11. id uint64
  12. paused bool
  13. nt network
  14. stopc chan struct{}
  15. // stable
  16. storage *raft.MemoryStorage
  17. state raftpb.HardState
  18. }
  19. func startNode(id uint64, peers []raft.Peer, nt network) *node {
  20. st := raft.NewMemoryStorage()
  21. rn := raft.StartNode(id, peers, 10, 1, st)
  22. n := &node{
  23. Node: rn,
  24. id: id,
  25. storage: st,
  26. nt: nt,
  27. }
  28. n.start()
  29. return n
  30. }
  31. func (n *node) start() {
  32. n.stopc = make(chan struct{})
  33. ticker := time.Tick(5 * time.Millisecond)
  34. go func() {
  35. for {
  36. select {
  37. case <-ticker:
  38. n.Tick()
  39. case rd := <-n.Ready():
  40. if !raft.IsEmptyHardState(rd.HardState) {
  41. n.state = rd.HardState
  42. n.storage.SetHardState(n.state)
  43. }
  44. n.storage.Append(rd.Entries)
  45. go func() {
  46. for _, m := range rd.Messages {
  47. n.nt.send(m)
  48. }
  49. }()
  50. n.Advance()
  51. case m := <-n.nt.recv():
  52. n.Step(context.TODO(), m)
  53. case <-n.stopc:
  54. n.Stop()
  55. log.Printf("raft.%d: stop", n.id)
  56. n.Node = nil
  57. close(n.stopc)
  58. return
  59. }
  60. }
  61. }()
  62. }
  63. // stop stops the node. stop a stopped node might panic.
  64. // All in memory state of node is discarded.
  65. // All stable MUST be unchanged.
  66. func (n *node) stop() {
  67. n.nt.disconnect(n.id)
  68. n.stopc <- struct{}{}
  69. // wait for the shutdown
  70. <-n.stopc
  71. }
  72. // restart restarts the node. restart a started node
  73. // blocks and might affect the future stop operation.
  74. func (n *node) restart() {
  75. // wait for the shutdown
  76. <-n.stopc
  77. n.Node = raft.RestartNode(n.id, 10, 1, n.storage, 0)
  78. n.start()
  79. n.nt.connect(n.id)
  80. }
  81. // pause pauses the node.
  82. // The paused node buffers the received messages and replies
  83. // all of them when it resumes.
  84. func (n *node) pause() {
  85. panic("unimplemented")
  86. }
  87. // resume resumes the paused node.
  88. func (n *node) resume() {
  89. panic("unimplemented")
  90. }
  91. func (n *node) isPaused() bool {
  92. return n.paused
  93. }