peer.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. package raft
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. //------------------------------------------------------------------------------
  7. //
  8. // Typedefs
  9. //
  10. //------------------------------------------------------------------------------
// Peer is a reference to another server involved in the consensus protocol,
// together with the replication state this server tracks for it.
type Peer struct {
	// server is the local server this peer belongs to; clone leaves it nil.
	server *server
	// Name and ConnectionString are the only exported fields and carry JSON
	// tags, so they are what gets serialized for a peer.
	Name             string `json:"name"`
	ConnectionString string `json:"connectionString"`
	// prevLogIndex is the index after which the next flush sends log
	// entries. It is advanced on successful AppendEntries replies and moved
	// back on failures. It is read and written under the embedded RWMutex
	// by the accessors and sendAppendEntriesRequest.
	prevLogIndex uint64
	// stopChan is created by startHeartbeat. A value sent on it stops the
	// heartbeat loop: true means flush once more before stopping.
	stopChan chan bool
	// heartbeatInterval is the period of the heartbeat loop's ticker.
	heartbeatInterval time.Duration
	// lastActivity is set when the heartbeat starts and whenever a response
	// arrives from the peer. It is reset to the zero time when the heartbeat
	// stops. Guarded by the embedded RWMutex.
	lastActivity time.Time
	// Embedding promotes Lock/Unlock/RLock/RUnlock into Peer's exported
	// method set.
	sync.RWMutex
}
  22. //------------------------------------------------------------------------------
  23. //
  24. // Constructor
  25. //
  26. //------------------------------------------------------------------------------
  27. // Creates a new peer.
  28. func newPeer(server *server, name string, connectionString string, heartbeatInterval time.Duration) *Peer {
  29. return &Peer{
  30. server: server,
  31. Name: name,
  32. ConnectionString: connectionString,
  33. heartbeatInterval: heartbeatInterval,
  34. }
  35. }
  36. //------------------------------------------------------------------------------
  37. //
  38. // Accessors
  39. //
  40. //------------------------------------------------------------------------------
  41. // Sets the heartbeat timeout.
  42. func (p *Peer) setHeartbeatInterval(duration time.Duration) {
  43. p.heartbeatInterval = duration
  44. }
  45. //--------------------------------------
  46. // Prev log index
  47. //--------------------------------------
  48. // Retrieves the previous log index.
  49. func (p *Peer) getPrevLogIndex() uint64 {
  50. p.RLock()
  51. defer p.RUnlock()
  52. return p.prevLogIndex
  53. }
  54. // Sets the previous log index.
  55. func (p *Peer) setPrevLogIndex(value uint64) {
  56. p.Lock()
  57. defer p.Unlock()
  58. p.prevLogIndex = value
  59. }
  60. func (p *Peer) setLastActivity(now time.Time) {
  61. p.Lock()
  62. defer p.Unlock()
  63. p.lastActivity = now
  64. }
  65. //------------------------------------------------------------------------------
  66. //
  67. // Methods
  68. //
  69. //------------------------------------------------------------------------------
  70. //--------------------------------------
  71. // Heartbeat
  72. //--------------------------------------
  73. // Starts the peer heartbeat.
  74. func (p *Peer) startHeartbeat() {
  75. p.stopChan = make(chan bool)
  76. c := make(chan bool)
  77. p.setLastActivity(time.Now())
  78. p.server.routineGroup.Add(1)
  79. go func() {
  80. defer p.server.routineGroup.Done()
  81. p.heartbeat(c)
  82. }()
  83. <-c
  84. }
  85. // Stops the peer heartbeat.
  86. func (p *Peer) stopHeartbeat(flush bool) {
  87. p.setLastActivity(time.Time{})
  88. p.stopChan <- flush
  89. }
  90. // LastActivity returns the last time any response was received from the peer.
  91. func (p *Peer) LastActivity() time.Time {
  92. p.RLock()
  93. defer p.RUnlock()
  94. return p.lastActivity
  95. }
  96. //--------------------------------------
  97. // Copying
  98. //--------------------------------------
  99. // Clones the state of the peer. The clone is not attached to a server and
  100. // the heartbeat timer will not exist.
  101. func (p *Peer) clone() *Peer {
  102. p.Lock()
  103. defer p.Unlock()
  104. return &Peer{
  105. Name: p.Name,
  106. ConnectionString: p.ConnectionString,
  107. prevLogIndex: p.prevLogIndex,
  108. lastActivity: p.lastActivity,
  109. }
  110. }
  111. //--------------------------------------
  112. // Heartbeat
  113. //--------------------------------------
  114. // Listens to the heartbeat timeout and flushes an AppendEntries RPC.
  115. func (p *Peer) heartbeat(c chan bool) {
  116. stopChan := p.stopChan
  117. c <- true
  118. ticker := time.Tick(p.heartbeatInterval)
  119. debugln("peer.heartbeat: ", p.Name, p.heartbeatInterval)
  120. for {
  121. select {
  122. case flush := <-stopChan:
  123. if flush {
  124. // before we can safely remove a node
  125. // we must flush the remove command to the node first
  126. p.flush()
  127. debugln("peer.heartbeat.stop.with.flush: ", p.Name)
  128. return
  129. } else {
  130. debugln("peer.heartbeat.stop: ", p.Name)
  131. return
  132. }
  133. case <-ticker:
  134. start := time.Now()
  135. p.flush()
  136. duration := time.Now().Sub(start)
  137. p.server.DispatchEvent(newEvent(HeartbeatEventType, duration, nil))
  138. }
  139. }
  140. }
  141. func (p *Peer) flush() {
  142. debugln("peer.heartbeat.flush: ", p.Name)
  143. prevLogIndex := p.getPrevLogIndex()
  144. term := p.server.currentTerm
  145. entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
  146. if entries != nil {
  147. p.sendAppendEntriesRequest(newAppendEntriesRequest(term, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
  148. } else {
  149. p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.snapshot))
  150. }
  151. }
  152. //--------------------------------------
  153. // Append Entries
  154. //--------------------------------------
  155. // Sends an AppendEntries request to the peer through the transport.
  156. func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
  157. tracef("peer.append.send: %s->%s [prevLog:%v length: %v]\n",
  158. p.server.Name(), p.Name, req.PrevLogIndex, len(req.Entries))
  159. resp := p.server.Transporter().SendAppendEntriesRequest(p.server, p, req)
  160. if resp == nil {
  161. p.server.DispatchEvent(newEvent(HeartbeatIntervalEventType, p, nil))
  162. debugln("peer.append.timeout: ", p.server.Name(), "->", p.Name)
  163. return
  164. }
  165. traceln("peer.append.resp: ", p.server.Name(), "<-", p.Name)
  166. p.setLastActivity(time.Now())
  167. // If successful then update the previous log index.
  168. p.Lock()
  169. if resp.Success() {
  170. if len(req.Entries) > 0 {
  171. p.prevLogIndex = req.Entries[len(req.Entries)-1].GetIndex()
  172. // if peer append a log entry from the current term
  173. // we set append to true
  174. if req.Entries[len(req.Entries)-1].GetTerm() == p.server.currentTerm {
  175. resp.append = true
  176. }
  177. }
  178. traceln("peer.append.resp.success: ", p.Name, "; idx =", p.prevLogIndex)
  179. // If it was unsuccessful then decrement the previous log index and
  180. // we'll try again next time.
  181. } else {
  182. if resp.Term() > p.server.Term() {
  183. // this happens when there is a new leader comes up that this *leader* has not
  184. // known yet.
  185. // this server can know until the new leader send a ae with higher term
  186. // or this server finish processing this response.
  187. debugln("peer.append.resp.not.update: new.leader.found")
  188. } else if resp.Term() == req.Term && resp.CommitIndex() >= p.prevLogIndex {
  189. // we may miss a response from peer
  190. // so maybe the peer has committed the logs we just sent
  191. // but we did not receive the successful reply and did not increase
  192. // the prevLogIndex
  193. // peer failed to truncate the log and sent a fail reply at this time
  194. // we just need to update peer's prevLog index to commitIndex
  195. p.prevLogIndex = resp.CommitIndex()
  196. debugln("peer.append.resp.update: ", p.Name, "; idx =", p.prevLogIndex)
  197. } else if p.prevLogIndex > 0 {
  198. // Decrement the previous log index down until we find a match. Don't
  199. // let it go below where the peer's commit index is though. That's a
  200. // problem.
  201. p.prevLogIndex--
  202. // if it not enough, we directly decrease to the index of the
  203. if p.prevLogIndex > resp.Index() {
  204. p.prevLogIndex = resp.Index()
  205. }
  206. debugln("peer.append.resp.decrement: ", p.Name, "; idx =", p.prevLogIndex)
  207. }
  208. }
  209. p.Unlock()
  210. // Attach the peer to resp, thus server can know where it comes from
  211. resp.peer = p.Name
  212. // Send response to server for processing.
  213. p.server.sendAsync(resp)
  214. }
  215. // Sends an Snapshot request to the peer through the transport.
  216. func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {
  217. debugln("peer.snap.send: ", p.Name)
  218. resp := p.server.Transporter().SendSnapshotRequest(p.server, p, req)
  219. if resp == nil {
  220. debugln("peer.snap.timeout: ", p.Name)
  221. return
  222. }
  223. debugln("peer.snap.recv: ", p.Name)
  224. // If successful, the peer should have been to snapshot state
  225. // Send it the snapshot!
  226. p.setLastActivity(time.Now())
  227. if resp.Success {
  228. p.sendSnapshotRecoveryRequest()
  229. } else {
  230. debugln("peer.snap.failed: ", p.Name)
  231. return
  232. }
  233. }
  234. // Sends an Snapshot Recovery request to the peer through the transport.
  235. func (p *Peer) sendSnapshotRecoveryRequest() {
  236. req := newSnapshotRecoveryRequest(p.server.name, p.server.snapshot)
  237. debugln("peer.snap.recovery.send: ", p.Name)
  238. resp := p.server.Transporter().SendSnapshotRecoveryRequest(p.server, p, req)
  239. if resp == nil {
  240. debugln("peer.snap.recovery.timeout: ", p.Name)
  241. return
  242. }
  243. p.setLastActivity(time.Now())
  244. if resp.Success {
  245. p.prevLogIndex = req.LastIndex
  246. } else {
  247. debugln("peer.snap.recovery.failed: ", p.Name)
  248. return
  249. }
  250. p.server.sendAsync(resp)
  251. }
  252. //--------------------------------------
  253. // Vote Requests
  254. //--------------------------------------
  255. // send VoteRequest Request
  256. func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteResponse) {
  257. debugln("peer.vote: ", p.server.Name(), "->", p.Name)
  258. req.peer = p
  259. if resp := p.server.Transporter().SendVoteRequest(p.server, p, req); resp != nil {
  260. debugln("peer.vote.recv: ", p.server.Name(), "<-", p.Name)
  261. p.setLastActivity(time.Now())
  262. resp.peer = p
  263. c <- resp
  264. } else {
  265. debugln("peer.vote.failed: ", p.server.Name(), "<-", p.Name)
  266. }
  267. }