server_test.go 16 KB


  1. package raft
  2. import (
  3. "fmt"
  4. "reflect"
  5. "strconv"
  6. "sync"
  7. "testing"
  8. "time"
  9. )
  10. //------------------------------------------------------------------------------
  11. //
  12. // Tests
  13. //
  14. //------------------------------------------------------------------------------
  15. //--------------------------------------
  16. // Request Vote
  17. //--------------------------------------
  18. // Ensure that we can request a vote from a server that has not voted.
  19. func TestServerRequestVote(t *testing.T) {
  20. server := newTestServer("1", &testTransporter{})
  21. server.Start()
  22. if _, err := server.Do(&DefaultJoinCommand{Name: server.Name()}); err != nil {
  23. t.Fatalf("Server %s unable to join: %v", server.Name(), err)
  24. }
  25. defer server.Stop()
  26. resp := server.RequestVote(newRequestVoteRequest(1, "foo", 1, 0))
  27. if resp.Term != 1 || !resp.VoteGranted {
  28. t.Fatalf("Invalid request vote response: %v/%v", resp.Term, resp.VoteGranted)
  29. }
  30. }
  31. // // Ensure that a vote request is denied if it comes from an old term.
  32. func TestServerRequestVoteDeniedForStaleTerm(t *testing.T) {
  33. server := newTestServer("1", &testTransporter{})
  34. server.Start()
  35. if _, err := server.Do(&DefaultJoinCommand{Name: server.Name()}); err != nil {
  36. t.Fatalf("Server %s unable to join: %v", server.Name(), err)
  37. }
  38. server.currentTerm = 2
  39. defer server.Stop()
  40. resp := server.RequestVote(newRequestVoteRequest(1, "foo", 1, 0))
  41. if resp.Term != 2 || resp.VoteGranted {
  42. t.Fatalf("Invalid request vote response: %v/%v", resp.Term, resp.VoteGranted)
  43. }
  44. if server.currentTerm != 2 && server.State() != Follower {
  45. t.Fatalf("Server did not update term and demote: %v / %v", server.currentTerm, server.State())
  46. }
  47. }
  48. // Ensure that a vote request is denied if we've already voted for a different candidate.
  49. func TestServerRequestVoteDeniedIfAlreadyVoted(t *testing.T) {
  50. server := newTestServer("1", &testTransporter{})
  51. server.Start()
  52. if _, err := server.Do(&DefaultJoinCommand{Name: server.Name()}); err != nil {
  53. t.Fatalf("Server %s unable to join: %v", server.Name(), err)
  54. }
  55. server.currentTerm = 2
  56. defer server.Stop()
  57. resp := server.RequestVote(newRequestVoteRequest(2, "foo", 1, 0))
  58. if resp.Term != 2 || !resp.VoteGranted {
  59. t.Fatalf("First vote should not have been denied")
  60. }
  61. resp = server.RequestVote(newRequestVoteRequest(2, "bar", 1, 0))
  62. if resp.Term != 2 || resp.VoteGranted {
  63. t.Fatalf("Second vote should have been denied")
  64. }
  65. }
  66. // Ensure that a vote request is approved if vote occurs in a new term.
  67. func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) {
  68. server := newTestServer("1", &testTransporter{})
  69. server.Start()
  70. if _, err := server.Do(&DefaultJoinCommand{Name: server.Name()}); err != nil {
  71. t.Fatalf("Server %s unable to join: %v", server.Name(), err)
  72. }
  73. time.Sleep(time.Millisecond * 100)
  74. server.currentTerm = 2
  75. defer server.Stop()
  76. resp := server.RequestVote(newRequestVoteRequest(2, "foo", 2, 1))
  77. if resp.Term != 2 || !resp.VoteGranted || server.VotedFor() != "foo" {
  78. t.Fatalf("First vote should not have been denied")
  79. }
  80. resp = server.RequestVote(newRequestVoteRequest(3, "bar", 2, 1))
  81. if resp.Term != 3 || !resp.VoteGranted || server.VotedFor() != "bar" {
  82. t.Fatalf("Second vote should have been approved")
  83. }
  84. }
  85. // Ensure that a vote request is denied if the log is out of date.
  86. func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
  87. tmpLog := newLog()
  88. e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20})
  89. e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100})
  90. e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0})
  91. server := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0, e1, e2})
  92. // start as a follower with term 2 and index 3
  93. server.Start()
  94. defer server.Stop()
  95. // request vote from term 3 with last log entry 2, 2
  96. resp := server.RequestVote(newRequestVoteRequest(3, "foo", 2, 2))
  97. if resp.Term != 3 || resp.VoteGranted {
  98. t.Fatalf("Stale index vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted)
  99. }
  100. // request vote from term 2 with last log entry 2, 3
  101. resp = server.RequestVote(newRequestVoteRequest(2, "foo", 3, 2))
  102. if resp.Term != 3 || resp.VoteGranted {
  103. t.Fatalf("Stale term vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted)
  104. }
  105. // request vote from term 3 with last log entry 2, 3
  106. resp = server.RequestVote(newRequestVoteRequest(3, "foo", 3, 2))
  107. if resp.Term != 3 || !resp.VoteGranted {
  108. t.Fatalf("Matching log vote should have been granted")
  109. }
  110. // request vote from term 3 with last log entry 2, 4
  111. resp = server.RequestVote(newRequestVoteRequest(3, "foo", 4, 2))
  112. if resp.Term != 3 || !resp.VoteGranted {
  113. t.Fatalf("Ahead-of-log vote should have been granted")
  114. }
  115. }
  116. // //--------------------------------------
  117. // // Promotion
  118. // //--------------------------------------
  119. // // Ensure that we can self-promote a server to candidate, obtain votes and become a fearless leader.
  120. func TestServerPromoteSelf(t *testing.T) {
  121. e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20})
  122. server := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0})
  123. // start as a follower
  124. server.Start()
  125. defer server.Stop()
  126. time.Sleep(2 * testElectionTimeout)
  127. if server.State() != Leader {
  128. t.Fatalf("Server self-promotion failed: %v", server.State())
  129. }
  130. }
  131. //Ensure that we can promote a server within a cluster to a leader.
  132. func TestServerPromote(t *testing.T) {
  133. lookup := map[string]*Server{}
  134. transporter := &testTransporter{}
  135. transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
  136. return lookup[peer.Name()].RequestVote(req)
  137. }
  138. transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
  139. return lookup[peer.Name()].AppendEntries(req)
  140. }
  141. servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
  142. servers[0].Start()
  143. servers[1].Start()
  144. servers[2].Start()
  145. time.Sleep(2 * testElectionTimeout)
  146. if servers[0].State() != Leader && servers[1].State() != Leader && servers[2].State() != Leader {
  147. t.Fatalf("No leader elected: (%s, %s, %s)", servers[0].State(), servers[1].State(), servers[2].State())
  148. }
  149. for _, server := range servers {
  150. server.Stop()
  151. }
  152. }
  153. //--------------------------------------
  154. // Append Entries
  155. //--------------------------------------
  156. // Ensure we can append entries to a server.
  157. func TestServerAppendEntries(t *testing.T) {
  158. server := newTestServer("1", &testTransporter{})
  159. server.SetHeartbeatTimeout(time.Second * 10)
  160. server.Start()
  161. defer server.Stop()
  162. // Append single entry.
  163. e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
  164. entries := []*LogEntry{e}
  165. resp := server.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries))
  166. if resp.Term != 1 || !resp.Success {
  167. t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
  168. }
  169. if index, term := server.log.commitInfo(); index != 0 || term != 0 {
  170. t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
  171. }
  172. // Append multiple entries + commit the last one.
  173. e1, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "bar", I: 20})
  174. e2, _ := newLogEntry(nil, 3, 1, &testCommand1{Val: "baz", I: 30})
  175. entries = []*LogEntry{e1, e2}
  176. resp = server.AppendEntries(newAppendEntriesRequest(1, 1, 1, 1, "ldr", entries))
  177. if resp.Term != 1 || !resp.Success {
  178. t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
  179. }
  180. if index, term := server.log.commitInfo(); index != 1 || term != 1 {
  181. t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
  182. }
  183. // Send zero entries and commit everything.
  184. resp = server.AppendEntries(newAppendEntriesRequest(2, 3, 1, 3, "ldr", []*LogEntry{}))
  185. if resp.Term != 2 || !resp.Success {
  186. t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
  187. }
  188. if index, term := server.log.commitInfo(); index != 3 || term != 1 {
  189. t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
  190. }
  191. }
  192. //Ensure that entries with stale terms are rejected.
  193. func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) {
  194. server := newTestServer("1", &testTransporter{})
  195. server.Start()
  196. defer server.Stop()
  197. server.currentTerm = 2
  198. // Append single entry.
  199. e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
  200. entries := []*LogEntry{e}
  201. resp := server.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries))
  202. if resp.Term != 2 || resp.Success {
  203. t.Fatalf("AppendEntries should have failed: %v/%v", resp.Term, resp.Success)
  204. }
  205. if index, term := server.log.commitInfo(); index != 0 || term != 0 {
  206. t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
  207. }
  208. }
  209. // Ensure that we reject entries if the commit log is different.
  210. func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) {
  211. server := newTestServer("1", &testTransporter{})
  212. server.Start()
  213. defer server.Stop()
  214. // Append single entry + commit.
  215. e1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
  216. e2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "foo", I: 15})
  217. entries := []*LogEntry{e1, e2}
  218. resp := server.AppendEntries(newAppendEntriesRequest(1, 0, 0, 2, "ldr", entries))
  219. if resp.Term != 1 || !resp.Success {
  220. t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
  221. }
  222. // Append entry again (post-commit).
  223. e, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "bar", I: 20})
  224. entries = []*LogEntry{e}
  225. resp = server.AppendEntries(newAppendEntriesRequest(1, 2, 1, 1, "ldr", entries))
  226. if resp.Term != 1 || resp.Success {
  227. t.Fatalf("AppendEntries should have failed: %v/%v", resp.Term, resp.Success)
  228. }
  229. }
  230. // Ensure that we uncommitted entries are rolled back if new entries overwrite them.
  231. func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) {
  232. server := newTestServer("1", &testTransporter{})
  233. server.Start()
  234. defer server.Stop()
  235. entry1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
  236. entry2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "foo", I: 15})
  237. entry3, _ := newLogEntry(nil, 2, 2, &testCommand1{Val: "bar", I: 20})
  238. // Append single entry + commit.
  239. entries := []*LogEntry{entry1, entry2}
  240. resp := server.AppendEntries(newAppendEntriesRequest(1, 0, 0, 1, "ldr", entries))
  241. if resp.Term != 1 || !resp.Success || server.log.commitIndex != 1 || !reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry2}) {
  242. t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
  243. }
  244. // Append entry that overwrites the second (uncommitted) entry.
  245. entries = []*LogEntry{entry3}
  246. resp = server.AppendEntries(newAppendEntriesRequest(2, 1, 1, 2, "ldr", entries))
  247. if resp.Term != 2 || !resp.Success || server.log.commitIndex != 2 || !reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry3}) {
  248. t.Fatalf("AppendEntries should have succeeded: %v/%v", resp.Term, resp.Success)
  249. }
  250. }
  251. //--------------------------------------
  252. // Command Execution
  253. //--------------------------------------
  254. // Ensure that a follower cannot execute a command.
  255. func TestServerDenyCommandExecutionWhenFollower(t *testing.T) {
  256. server := newTestServer("1", &testTransporter{})
  257. server.Start()
  258. defer server.Stop()
  259. var err error
  260. if _, err = server.Do(&testCommand1{Val: "foo", I: 10}); err != NotLeaderError {
  261. t.Fatalf("Expected error: %v, got: %v", NotLeaderError, err)
  262. }
  263. }
  264. //--------------------------------------
  265. // Membership
  266. //--------------------------------------
  267. // Ensure that we can start a single server and append to its log.
  268. func TestServerSingleNode(t *testing.T) {
  269. server := newTestServer("1", &testTransporter{})
  270. if server.State() != Stopped {
  271. t.Fatalf("Unexpected server state: %v", server.State())
  272. }
  273. server.Start()
  274. time.Sleep(testHeartbeatTimeout)
  275. // Join the server to itself.
  276. if _, err := server.Do(&DefaultJoinCommand{Name: "1"}); err != nil {
  277. t.Fatalf("Unable to join: %v", err)
  278. }
  279. debugln("finish command")
  280. if server.State() != Leader {
  281. t.Fatalf("Unexpected server state: %v", server.State())
  282. }
  283. server.Stop()
  284. if server.State() != Stopped {
  285. t.Fatalf("Unexpected server state: %v", server.State())
  286. }
  287. }
  288. // Ensure that we can start multiple servers and determine a leader.
  289. func TestServerMultiNode(t *testing.T) {
  290. // Initialize the servers.
  291. var mutex sync.RWMutex
  292. servers := map[string]*Server{}
  293. transporter := &testTransporter{}
  294. transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
  295. mutex.RLock()
  296. s := servers[peer.name]
  297. mutex.RUnlock()
  298. return s.RequestVote(req)
  299. }
  300. transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
  301. mutex.RLock()
  302. s := servers[peer.name]
  303. mutex.RUnlock()
  304. return s.AppendEntries(req)
  305. }
  306. disTransporter := &testTransporter{}
  307. disTransporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
  308. return nil
  309. }
  310. disTransporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
  311. return nil
  312. }
  313. var names []string
  314. n := 5
  315. // add n servers
  316. for i := 1; i <= n; i++ {
  317. names = append(names, strconv.Itoa(i))
  318. }
  319. var leader *Server
  320. for _, name := range names {
  321. server := newTestServer(name, transporter)
  322. defer server.Stop()
  323. mutex.Lock()
  324. servers[name] = server
  325. mutex.Unlock()
  326. if name == "1" {
  327. leader = server
  328. server.SetHeartbeatTimeout(testHeartbeatTimeout)
  329. server.Start()
  330. time.Sleep(testHeartbeatTimeout)
  331. } else {
  332. server.SetElectionTimeout(testElectionTimeout)
  333. server.SetHeartbeatTimeout(testHeartbeatTimeout)
  334. server.Start()
  335. time.Sleep(testHeartbeatTimeout)
  336. }
  337. if _, err := leader.Do(&DefaultJoinCommand{Name: name}); err != nil {
  338. t.Fatalf("Unable to join server[%s]: %v", name, err)
  339. }
  340. }
  341. time.Sleep(2 * testElectionTimeout)
  342. // Check that two peers exist on leader.
  343. mutex.RLock()
  344. if leader.MemberCount() != n {
  345. t.Fatalf("Expected member count to be %v, got %v", n, leader.MemberCount())
  346. }
  347. if servers["2"].State() == Leader || servers["3"].State() == Leader {
  348. t.Fatalf("Expected leader should be 1: 2=%v, 3=%v\n", servers["2"].state, servers["3"].state)
  349. }
  350. mutex.RUnlock()
  351. for i := 0; i < 20; i++ {
  352. retry := 0
  353. fmt.Println("Round ", i)
  354. num := strconv.Itoa(i%(len(servers)) + 1)
  355. num_1 := strconv.Itoa((i+3)%(len(servers)) + 1)
  356. toStop := servers[num]
  357. toStop_1 := servers[num_1]
  358. // Stop the first server and wait for a re-election.
  359. time.Sleep(2 * testElectionTimeout)
  360. debugln("Disconnect ", toStop.Name())
  361. debugln("disconnect ", num, " ", num_1)
  362. toStop.SetTransporter(disTransporter)
  363. toStop_1.SetTransporter(disTransporter)
  364. time.Sleep(2 * testElectionTimeout)
  365. // Check that either server 2 or 3 is the leader now.
  366. //mutex.Lock()
  367. leader := 0
  368. for key, value := range servers {
  369. debugln("Play begin")
  370. if key != num && key != num_1 {
  371. if value.State() == Leader {
  372. debugln("Found leader")
  373. for i := 0; i < 10; i++ {
  374. debugln("[Test] do ", value.Name())
  375. if _, err := value.Do(&testCommand2{X: 1}); err != nil {
  376. break
  377. }
  378. debugln("[Test] Done")
  379. }
  380. debugln("Leader is ", value.Name(), " Index ", value.log.commitIndex)
  381. }
  382. debugln("Not Found leader")
  383. }
  384. }
  385. for {
  386. for key, value := range servers {
  387. if key != num && key != num_1 {
  388. if value.State() == Leader {
  389. leader++
  390. }
  391. debugln(value.Name(), " ", value.currentTerm, " ", value.state)
  392. }
  393. }
  394. if leader > 1 {
  395. if retry < 300 {
  396. debugln("retry")
  397. retry++
  398. leader = 0
  399. time.Sleep(2 * testElectionTimeout)
  400. continue
  401. }
  402. t.Fatalf("wrong leader number %v", leader)
  403. }
  404. if leader == 0 {
  405. if retry < 300 {
  406. retry++
  407. fmt.Println("retry 0")
  408. leader = 0
  409. time.Sleep(2 * testElectionTimeout)
  410. continue
  411. }
  412. t.Fatalf("wrong leader number %v", leader)
  413. }
  414. if leader == 1 {
  415. break
  416. }
  417. }
  418. //mutex.Unlock()
  419. toStop.SetTransporter(transporter)
  420. toStop_1.SetTransporter(transporter)
  421. }
  422. }