peer.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. package raft
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. //------------------------------------------------------------------------------
  7. //
  8. // Typedefs
  9. //
  10. //------------------------------------------------------------------------------
  11. // A peer is a reference to another server involved in the consensus protocol.
  12. type Peer struct {
  13. server *Server
  14. name string
  15. prevLogIndex uint64
  16. mutex sync.RWMutex
  17. stopChan chan bool
  18. heartbeatTimeout time.Duration
  19. }
  20. //------------------------------------------------------------------------------
  21. //
  22. // Constructor
  23. //
  24. //------------------------------------------------------------------------------
  25. // Creates a new peer.
  26. func newPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer {
  27. return &Peer{
  28. server: server,
  29. name: name,
  30. heartbeatTimeout: heartbeatTimeout,
  31. }
  32. }
  33. //------------------------------------------------------------------------------
  34. //
  35. // Accessors
  36. //
  37. //------------------------------------------------------------------------------
  38. // Retrieves the name of the peer.
  39. func (p *Peer) Name() string {
  40. return p.name
  41. }
  42. // Sets the heartbeat timeout.
  43. func (p *Peer) setHeartbeatTimeout(duration time.Duration) {
  44. p.heartbeatTimeout = duration
  45. }
  46. //--------------------------------------
  47. // Prev log index
  48. //--------------------------------------
  49. // Retrieves the previous log index.
  50. func (p *Peer) getPrevLogIndex() uint64 {
  51. p.mutex.RLock()
  52. defer p.mutex.RUnlock()
  53. return p.prevLogIndex
  54. }
  55. // Sets the previous log index.
  56. func (p *Peer) setPrevLogIndex(value uint64) {
  57. p.mutex.Lock()
  58. defer p.mutex.Unlock()
  59. p.prevLogIndex = value
  60. }
  61. //------------------------------------------------------------------------------
  62. //
  63. // Methods
  64. //
  65. //------------------------------------------------------------------------------
  66. //--------------------------------------
  67. // Heartbeat
  68. //--------------------------------------
  69. // Starts the peer heartbeat.
  70. func (p *Peer) startHeartbeat() {
  71. p.stopChan = make(chan bool, 1)
  72. c := make(chan bool)
  73. go p.heartbeat(c)
  74. <-c
  75. }
  76. // Stops the peer heartbeat.
  77. func (p *Peer) stopHeartbeat() {
  78. // here is a problem
  79. // the previous stop is no buffer leader may get blocked
  80. // when heartbeat returns at line 132
  81. // I make the channel with 1 buffer
  82. // and try to panic here
  83. select {
  84. case p.stopChan <- true:
  85. default:
  86. panic("[" + p.server.Name() + "] cannot stop [" + p.Name() + "] heartbeat")
  87. }
  88. }
  89. //--------------------------------------
  90. // Copying
  91. //--------------------------------------
  92. // Clones the state of the peer. The clone is not attached to a server and
  93. // the heartbeat timer will not exist.
  94. func (p *Peer) clone() *Peer {
  95. p.mutex.Lock()
  96. defer p.mutex.Unlock()
  97. return &Peer{
  98. name: p.name,
  99. prevLogIndex: p.prevLogIndex,
  100. }
  101. }
  102. //--------------------------------------
  103. // Heartbeat
  104. //--------------------------------------
  105. // Listens to the heartbeat timeout and flushes an AppendEntries RPC.
  106. func (p *Peer) heartbeat(c chan bool) {
  107. stopChan := p.stopChan
  108. c <- true
  109. debugln("peer.heartbeat: ", p.Name(), p.heartbeatTimeout)
  110. for {
  111. select {
  112. case <-stopChan:
  113. debugln("peer.heartbeat.stop: ", p.Name())
  114. return
  115. case <-time.After(p.heartbeatTimeout):
  116. debugln("peer.heartbeat.run: ", p.Name())
  117. prevLogIndex := p.getPrevLogIndex()
  118. entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
  119. if p.server.State() != Leader {
  120. return
  121. }
  122. if entries != nil {
  123. p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
  124. } else {
  125. p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot))
  126. }
  127. }
  128. }
  129. }
  130. //--------------------------------------
  131. // Append Entries
  132. //--------------------------------------
  133. // Sends an AppendEntries request to the peer through the transport.
  134. func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
  135. traceln("peer.flush.send: ", p.server.Name(), "->", p.Name(), " ", len(req.Entries))
  136. resp := p.server.Transporter().SendAppendEntriesRequest(p.server, p, req)
  137. if resp == nil {
  138. debugln("peer.flush.timeout: ", p.server.Name(), "->", p.Name())
  139. return
  140. }
  141. traceln("peer.flush.recv: ", p.Name())
  142. // If successful then update the previous log index.
  143. p.mutex.Lock()
  144. if resp.Success {
  145. if len(req.Entries) > 0 {
  146. p.prevLogIndex = req.Entries[len(req.Entries)-1].Index
  147. // if peer append a log entry from the current term
  148. // we set append to true
  149. if req.Entries[len(req.Entries)-1].Term == p.server.currentTerm {
  150. resp.append = true
  151. }
  152. }
  153. traceln("peer.flush.success: ", p.server.Name(), "->", p.Name(), "; idx =", p.prevLogIndex)
  154. // If it was unsuccessful then decrement the previous log index and
  155. // we'll try again next time.
  156. } else {
  157. if resp.CommitIndex >= p.prevLogIndex {
  158. // we may miss a response from peer
  159. // so maybe the peer has commited the logs we sent
  160. // but we did not receive the success reply and did not increase
  161. // the prevLogIndex
  162. p.prevLogIndex = resp.CommitIndex
  163. debugln("peer.flush.commitIndex: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex)
  164. } else if p.prevLogIndex > 0 {
  165. // Decrement the previous log index down until we find a match. Don't
  166. // let it go below where the peer's commit index is though. That's a
  167. // problem.
  168. p.prevLogIndex--
  169. // if it not enough, we directly decrease to the index of the
  170. if p.prevLogIndex > resp.Index {
  171. p.prevLogIndex = resp.Index
  172. }
  173. debugln("peer.flush.decrement: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex)
  174. }
  175. }
  176. p.mutex.Unlock()
  177. // Attach the peer to resp, thus server can know where it comes from
  178. resp.peer = p.Name()
  179. // Send response to server for processing.
  180. p.server.send(resp)
  181. }
  182. // Sends an Snapshot request to the peer through the transport.
  183. func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {
  184. debugln("peer.snap.send: ", p.name)
  185. resp := p.server.Transporter().SendSnapshotRequest(p.server, p, req)
  186. if resp == nil {
  187. debugln("peer.snap.timeout: ", p.name)
  188. return
  189. }
  190. debugln("peer.snap.recv: ", p.name)
  191. // If successful, the peer should have been to snapshot state
  192. // Send it the snapshot!
  193. if resp.Success {
  194. p.sendSnapshotRecoveryRequest()
  195. } else {
  196. debugln("peer.snap.failed: ", p.name)
  197. return
  198. }
  199. }
  200. // Sends an Snapshot Recovery request to the peer through the transport.
  201. func (p *Peer) sendSnapshotRecoveryRequest() {
  202. req := newSnapshotRecoveryRequest(p.server.name, p.server.lastSnapshot)
  203. debugln("peer.snap.recovery.send: ", p.name)
  204. resp := p.server.Transporter().SendSnapshotRecoveryRequest(p.server, p, req)
  205. if resp.Success {
  206. p.prevLogIndex = req.LastIndex
  207. } else {
  208. debugln("peer.snap.recovery.failed: ", p.name)
  209. return
  210. }
  211. // Send response to server for processing.
  212. p.server.send(&AppendEntriesResponse{Term: resp.Term, Success: resp.Success, append: (resp.Term == p.server.currentTerm)})
  213. }
  214. //--------------------------------------
  215. // Vote Requests
  216. //--------------------------------------
  217. // send VoteRequest Request
  218. func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteResponse) {
  219. debugln("peer.vote: ", p.server.Name(), "->", p.Name())
  220. req.peer = p
  221. if resp := p.server.Transporter().SendVoteRequest(p.server, p, req); resp != nil {
  222. debugln("peer.vote: recv", p.server.Name(), "<-", p.Name())
  223. resp.peer = p
  224. c <- resp
  225. }
  226. }