peer.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. package raft
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. //------------------------------------------------------------------------------
  7. //
  8. // Typedefs
  9. //
  10. //------------------------------------------------------------------------------
  11. // A peer is a reference to another server involved in the consensus protocol.
  12. type Peer struct {
  13. server *server
  14. Name string `json:"name"`
  15. ConnectionString string `json:"connectionString"`
  16. prevLogIndex uint64
  17. mutex sync.RWMutex
  18. stopChan chan bool
  19. heartbeatInterval time.Duration
  20. }
  21. //------------------------------------------------------------------------------
  22. //
  23. // Constructor
  24. //
  25. //------------------------------------------------------------------------------
  26. // Creates a new peer.
  27. func newPeer(server *server, name string, connectionString string, heartbeatInterval time.Duration) *Peer {
  28. return &Peer{
  29. server: server,
  30. Name: name,
  31. ConnectionString: connectionString,
  32. heartbeatInterval: heartbeatInterval,
  33. }
  34. }
  35. //------------------------------------------------------------------------------
  36. //
  37. // Accessors
  38. //
  39. //------------------------------------------------------------------------------
  40. // Sets the heartbeat timeout.
  41. func (p *Peer) setHeartbeatInterval(duration time.Duration) {
  42. p.heartbeatInterval = duration
  43. }
  44. //--------------------------------------
  45. // Prev log index
  46. //--------------------------------------
  47. // Retrieves the previous log index.
  48. func (p *Peer) getPrevLogIndex() uint64 {
  49. p.mutex.RLock()
  50. defer p.mutex.RUnlock()
  51. return p.prevLogIndex
  52. }
  53. // Sets the previous log index.
  54. func (p *Peer) setPrevLogIndex(value uint64) {
  55. p.mutex.Lock()
  56. defer p.mutex.Unlock()
  57. p.prevLogIndex = value
  58. }
  59. //------------------------------------------------------------------------------
  60. //
  61. // Methods
  62. //
  63. //------------------------------------------------------------------------------
  64. //--------------------------------------
  65. // Heartbeat
  66. //--------------------------------------
  67. // Starts the peer heartbeat.
  68. func (p *Peer) startHeartbeat() {
  69. p.stopChan = make(chan bool)
  70. c := make(chan bool)
  71. go p.heartbeat(c)
  72. <-c
  73. }
  74. // Stops the peer heartbeat.
  75. func (p *Peer) stopHeartbeat(flush bool) {
  76. p.stopChan <- flush
  77. }
  78. //--------------------------------------
  79. // Copying
  80. //--------------------------------------
  81. // Clones the state of the peer. The clone is not attached to a server and
  82. // the heartbeat timer will not exist.
  83. func (p *Peer) clone() *Peer {
  84. p.mutex.Lock()
  85. defer p.mutex.Unlock()
  86. return &Peer{
  87. Name: p.Name,
  88. ConnectionString: p.ConnectionString,
  89. prevLogIndex: p.prevLogIndex,
  90. }
  91. }
  92. //--------------------------------------
  93. // Heartbeat
  94. //--------------------------------------
  95. // Listens to the heartbeat timeout and flushes an AppendEntries RPC.
  96. func (p *Peer) heartbeat(c chan bool) {
  97. stopChan := p.stopChan
  98. c <- true
  99. ticker := time.Tick(p.heartbeatInterval)
  100. debugln("peer.heartbeat: ", p.Name, p.heartbeatInterval)
  101. for {
  102. select {
  103. case flush := <-stopChan:
  104. if flush {
  105. // before we can safely remove a node
  106. // we must flush the remove command to the node first
  107. p.flush()
  108. debugln("peer.heartbeat.stop.with.flush: ", p.Name)
  109. return
  110. } else {
  111. debugln("peer.heartbeat.stop: ", p.Name)
  112. return
  113. }
  114. case <-ticker:
  115. start := time.Now()
  116. p.flush()
  117. duration := time.Now().Sub(start)
  118. p.server.DispatchEvent(newEvent(HeartbeatEventType, duration, nil))
  119. }
  120. }
  121. }
  122. func (p *Peer) flush() {
  123. debugln("peer.heartbeat.flush: ", p.Name)
  124. prevLogIndex := p.getPrevLogIndex()
  125. term := p.server.currentTerm
  126. entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
  127. if entries != nil {
  128. p.sendAppendEntriesRequest(newAppendEntriesRequest(term, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
  129. } else {
  130. p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot))
  131. }
  132. }
  133. //--------------------------------------
  134. // Append Entries
  135. //--------------------------------------
  136. // Sends an AppendEntries request to the peer through the transport.
  137. func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
  138. tracef("peer.append.send: %s->%s [prevLog:%v length: %v]\n",
  139. p.server.Name(), p.Name, req.PrevLogIndex, len(req.Entries))
  140. resp := p.server.Transporter().SendAppendEntriesRequest(p.server, p, req)
  141. if resp == nil {
  142. p.server.DispatchEvent(newEvent(HeartbeatIntervalEventType, p, nil))
  143. debugln("peer.append.timeout: ", p.server.Name(), "->", p.Name)
  144. return
  145. }
  146. traceln("peer.append.resp: ", p.server.Name(), "<-", p.Name)
  147. // If successful then update the previous log index.
  148. p.mutex.Lock()
  149. if resp.Success() {
  150. if len(req.Entries) > 0 {
  151. p.prevLogIndex = req.Entries[len(req.Entries)-1].GetIndex()
  152. // if peer append a log entry from the current term
  153. // we set append to true
  154. if req.Entries[len(req.Entries)-1].GetTerm() == p.server.currentTerm {
  155. resp.append = true
  156. }
  157. }
  158. traceln("peer.append.resp.success: ", p.Name, "; idx =", p.prevLogIndex)
  159. // If it was unsuccessful then decrement the previous log index and
  160. // we'll try again next time.
  161. } else {
  162. if resp.Term() > p.server.Term() {
  163. // this happens when there is a new leader comes up that this *leader* has not
  164. // known yet.
  165. // this server can know until the new leader send a ae with higher term
  166. // or this server finish processing this response.
  167. debugln("peer.append.resp.not.update: new.leader.found")
  168. } else if resp.Term() == req.Term && resp.CommitIndex() >= p.prevLogIndex {
  169. // we may miss a response from peer
  170. // so maybe the peer has committed the logs we just sent
  171. // but we did not receive the successful reply and did not increase
  172. // the prevLogIndex
  173. // peer failed to truncate the log and sent a fail reply at this time
  174. // we just need to update peer's prevLog index to commitIndex
  175. p.prevLogIndex = resp.CommitIndex()
  176. debugln("peer.append.resp.update: ", p.Name, "; idx =", p.prevLogIndex)
  177. } else if p.prevLogIndex > 0 {
  178. // Decrement the previous log index down until we find a match. Don't
  179. // let it go below where the peer's commit index is though. That's a
  180. // problem.
  181. p.prevLogIndex--
  182. // if it not enough, we directly decrease to the index of the
  183. if p.prevLogIndex > resp.Index() {
  184. p.prevLogIndex = resp.Index()
  185. }
  186. debugln("peer.append.resp.decrement: ", p.Name, "; idx =", p.prevLogIndex)
  187. }
  188. }
  189. p.mutex.Unlock()
  190. // Attach the peer to resp, thus server can know where it comes from
  191. resp.peer = p.Name
  192. // Send response to server for processing.
  193. p.server.sendAsync(resp)
  194. }
  195. // Sends an Snapshot request to the peer through the transport.
  196. func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {
  197. debugln("peer.snap.send: ", p.Name)
  198. resp := p.server.Transporter().SendSnapshotRequest(p.server, p, req)
  199. if resp == nil {
  200. debugln("peer.snap.timeout: ", p.Name)
  201. return
  202. }
  203. debugln("peer.snap.recv: ", p.Name)
  204. // If successful, the peer should have been to snapshot state
  205. // Send it the snapshot!
  206. if resp.Success {
  207. p.sendSnapshotRecoveryRequest()
  208. } else {
  209. debugln("peer.snap.failed: ", p.Name)
  210. return
  211. }
  212. }
  213. // Sends an Snapshot Recovery request to the peer through the transport.
  214. func (p *Peer) sendSnapshotRecoveryRequest() {
  215. req := newSnapshotRecoveryRequest(p.server.name, p.server.lastSnapshot)
  216. debugln("peer.snap.recovery.send: ", p.Name)
  217. resp := p.server.Transporter().SendSnapshotRecoveryRequest(p.server, p, req)
  218. if resp == nil {
  219. debugln("peer.snap.recovery.timeout: ", p.Name)
  220. return
  221. }
  222. if resp.Success {
  223. p.prevLogIndex = req.LastIndex
  224. } else {
  225. debugln("peer.snap.recovery.failed: ", p.Name)
  226. return
  227. }
  228. p.server.sendAsync(resp)
  229. }
  230. //--------------------------------------
  231. // Vote Requests
  232. //--------------------------------------
  233. // send VoteRequest Request
  234. func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteResponse) {
  235. debugln("peer.vote: ", p.server.Name(), "->", p.Name)
  236. req.peer = p
  237. if resp := p.server.Transporter().SendVoteRequest(p.server, p, req); resp != nil {
  238. debugln("peer.vote.recv: ", p.server.Name(), "<-", p.Name)
  239. resp.peer = p
  240. c <- resp
  241. } else {
  242. debugln("peer.vote.failed: ", p.server.Name(), "<-", p.Name)
  243. }
  244. }