node.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package rafttest
  2. import (
  3. "log"
  4. "time"
  5. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  6. "github.com/coreos/etcd/raft"
  7. "github.com/coreos/etcd/raft/raftpb"
  8. )
  9. type node struct {
  10. raft.Node
  11. id uint64
  12. paused bool
  13. iface iface
  14. stopc chan struct{}
  15. // stable
  16. storage *raft.MemoryStorage
  17. state raftpb.HardState
  18. }
  19. func startNode(id uint64, peers []raft.Peer, iface iface) *node {
  20. st := raft.NewMemoryStorage()
  21. rn := raft.StartNode(id, peers, 10, 1, st)
  22. n := &node{
  23. Node: rn,
  24. id: id,
  25. storage: st,
  26. iface: iface,
  27. }
  28. n.start()
  29. return n
  30. }
  31. func (n *node) start() {
  32. n.stopc = make(chan struct{})
  33. ticker := time.Tick(5 * time.Millisecond)
  34. go func() {
  35. for {
  36. select {
  37. case <-ticker:
  38. n.Tick()
  39. case rd := <-n.Ready():
  40. if !raft.IsEmptyHardState(rd.HardState) {
  41. n.state = rd.HardState
  42. n.storage.SetHardState(n.state)
  43. }
  44. n.storage.Append(rd.Entries)
  45. // TODO: make send async, more like real world...
  46. for _, m := range rd.Messages {
  47. n.iface.send(m)
  48. }
  49. n.Advance()
  50. case m := <-n.iface.recv():
  51. n.Step(context.TODO(), m)
  52. case <-n.stopc:
  53. n.Stop()
  54. log.Printf("raft.%d: stop", n.id)
  55. n.Node = nil
  56. close(n.stopc)
  57. return
  58. }
  59. }
  60. }()
  61. }
  62. // stop stops the node. stop a stopped node might panic.
  63. // All in memory state of node is discarded.
  64. // All stable MUST be unchanged.
  65. func (n *node) stop() {
  66. n.iface.disconnect()
  67. n.stopc <- struct{}{}
  68. // wait for the shutdown
  69. <-n.stopc
  70. }
  71. // restart restarts the node. restart a started node
  72. // blocks and might affect the future stop operation.
  73. func (n *node) restart() {
  74. // wait for the shutdown
  75. <-n.stopc
  76. n.Node = raft.RestartNode(n.id, 10, 1, n.storage, 0)
  77. n.start()
  78. n.iface.connect()
  79. }
  80. // pause pauses the node.
  81. // The paused node buffers the received messages and replies
  82. // all of them when it resumes.
  83. func (n *node) pause() {
  84. panic("unimplemented")
  85. }
  86. // resume resumes the paused node.
  87. func (n *node) resume() {
  88. panic("unimplemented")
  89. }
  90. func (n *node) isPaused() bool {
  91. return n.paused
  92. }