server_test.go 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722
  1. package raft
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "strconv"
  7. "sync"
  8. "testing"
  9. "time"
  10. )
  11. //--------------------------------------
  12. // Request Vote
  13. //--------------------------------------
  14. // Ensure that we can request a vote from a server that has not voted.
  15. func TestServerRequestVote(t *testing.T) {
  16. server := newTestServer("1", &testTransporter{})
  17. server.Start()
  18. if _, err := server.Do(&DefaultJoinCommand{Name: server.Name()}); err != nil {
  19. t.Fatalf("Server %s unable to join: %v", server.Name(), err)
  20. }
  21. defer server.Stop()
  22. resp := server.RequestVote(newRequestVoteRequest(1, "foo", 1, 0))
  23. if resp.Term != 1 || !resp.VoteGranted {
  24. t.Fatalf("Invalid request vote response: %v/%v", resp.Term, resp.VoteGranted)
  25. }
  26. }
  27. // // Ensure that a vote request is denied if it comes from an old term.
  28. func TestServerRequestVoteDeniedForStaleTerm(t *testing.T) {
  29. s := newTestServer("1", &testTransporter{})
  30. s.Start()
  31. if _, err := s.Do(&DefaultJoinCommand{Name: s.Name()}); err != nil {
  32. t.Fatalf("Server %s unable to join: %v", s.Name(), err)
  33. }
  34. s.(*server).mutex.Lock()
  35. s.(*server).currentTerm = 2
  36. s.(*server).mutex.Unlock()
  37. defer s.Stop()
  38. resp := s.RequestVote(newRequestVoteRequest(1, "foo", 1, 0))
  39. if resp.Term != 2 || resp.VoteGranted {
  40. t.Fatalf("Invalid request vote response: %v/%v", resp.Term, resp.VoteGranted)
  41. }
  42. if s.Term() != 2 && s.State() != Follower {
  43. t.Fatalf("Server did not update term and demote: %v / %v", s.Term(), s.State())
  44. }
  45. }
  46. // Ensure that a vote request is denied if we've already voted for a different candidate.
  47. func TestServerRequestVoteDeniedIfAlreadyVoted(t *testing.T) {
  48. s := newTestServer("1", &testTransporter{})
  49. s.Start()
  50. if _, err := s.Do(&DefaultJoinCommand{Name: s.Name()}); err != nil {
  51. t.Fatalf("Server %s unable to join: %v", s.Name(), err)
  52. }
  53. s.(*server).mutex.Lock()
  54. s.(*server).currentTerm = 2
  55. s.(*server).mutex.Unlock()
  56. defer s.Stop()
  57. resp := s.RequestVote(newRequestVoteRequest(2, "foo", 1, 0))
  58. if resp.Term != 2 || !resp.VoteGranted {
  59. t.Fatalf("First vote should not have been denied")
  60. }
  61. resp = s.RequestVote(newRequestVoteRequest(2, "bar", 1, 0))
  62. if resp.Term != 2 || resp.VoteGranted {
  63. t.Fatalf("Second vote should have been denied")
  64. }
  65. }
  66. // Ensure that a vote request is approved if vote occurs in a new term.
  67. func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) {
  68. s := newTestServer("1", &testTransporter{})
  69. s.Start()
  70. if _, err := s.Do(&DefaultJoinCommand{Name: s.Name()}); err != nil {
  71. t.Fatalf("Server %s unable to join: %v", s.Name(), err)
  72. }
  73. time.Sleep(time.Millisecond * 100)
  74. s.(*server).mutex.Lock()
  75. s.(*server).currentTerm = 2
  76. s.(*server).mutex.Unlock()
  77. defer s.Stop()
  78. resp := s.RequestVote(newRequestVoteRequest(2, "foo", 2, 1))
  79. if resp.Term != 2 || !resp.VoteGranted || s.VotedFor() != "foo" {
  80. t.Fatalf("First vote should not have been denied")
  81. }
  82. resp = s.RequestVote(newRequestVoteRequest(3, "bar", 2, 1))
  83. if resp.Term != 3 || !resp.VoteGranted || s.VotedFor() != "bar" {
  84. t.Fatalf("Second vote should have been approved")
  85. }
  86. }
  87. // Ensure that a vote request is denied if the log is out of date.
  88. func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
  89. tmpLog := newLog()
  90. e0, _ := newLogEntry(tmpLog, nil, 1, 1, &testCommand1{Val: "foo", I: 20})
  91. e1, _ := newLogEntry(tmpLog, nil, 2, 1, &testCommand2{X: 100})
  92. e2, _ := newLogEntry(tmpLog, nil, 3, 2, &testCommand1{Val: "bar", I: 0})
  93. s := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0, e1, e2})
  94. // start as a follower with term 2 and index 3
  95. s.Start()
  96. defer s.Stop()
  97. // request vote from term 3 with last log entry 2, 2
  98. resp := s.RequestVote(newRequestVoteRequest(3, "foo", 2, 2))
  99. if resp.Term != 3 || resp.VoteGranted {
  100. t.Fatalf("Stale index vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted)
  101. }
  102. // request vote from term 2 with last log entry 2, 3
  103. resp = s.RequestVote(newRequestVoteRequest(2, "foo", 3, 2))
  104. if resp.Term != 3 || resp.VoteGranted {
  105. t.Fatalf("Stale term vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted)
  106. }
  107. // request vote from term 3 with last log entry 2, 3
  108. resp = s.RequestVote(newRequestVoteRequest(3, "foo", 3, 2))
  109. if resp.Term != 3 || !resp.VoteGranted {
  110. t.Fatalf("Matching log vote should have been granted")
  111. }
  112. // request vote from term 3 with last log entry 2, 4
  113. resp = s.RequestVote(newRequestVoteRequest(3, "foo", 4, 2))
  114. if resp.Term != 3 || !resp.VoteGranted {
  115. t.Fatalf("Ahead-of-log vote should have been granted")
  116. }
  117. }
  118. func TestProcessVoteResponse(t *testing.T) {
  119. // server Term: 0, status: Leader
  120. // response Term : 1, granted
  121. // Expectation: not success
  122. // Server Term 1 status:Leader
  123. server := &server{}
  124. server.eventDispatcher = newEventDispatcher(server)
  125. server.currentTerm = 0
  126. server.state = Leader
  127. response := &RequestVoteResponse{
  128. VoteGranted: true,
  129. Term: 1,
  130. }
  131. if success := server.processVoteResponse(response); success {
  132. t.Fatal("Process should fail if the resp's term is larger than server's")
  133. }
  134. if server.state != Follower {
  135. t.Fatal("Server should stepdown")
  136. }
  137. // server Term: 1, status: Follower
  138. // response Term: 2, granted
  139. // Expectation: not success
  140. response.Term = 2
  141. if success := server.processVoteResponse(response); success {
  142. t.Fatal("Process should fail if the resp's term is larger than server's")
  143. }
  144. if server.state != Follower {
  145. t.Fatal("Server should still be Follower")
  146. }
  147. server.currentTerm = 2
  148. // server Term: 2, status: Follower
  149. // response Term: 2
  150. // Expectation: success
  151. if success := server.processVoteResponse(response); !success {
  152. t.Fatal("Process should success if the server's term is larger than resp's")
  153. }
  154. }
  //--------------------------------------
  // Promotion
  //--------------------------------------
  158. // // Ensure that we can self-promote a server to candidate, obtain votes and become a fearless leader.
  159. func TestServerPromoteSelf(t *testing.T) {
  160. e0, _ := newLogEntry(newLog(), nil, 1, 1, &testCommand1{Val: "foo", I: 20})
  161. s := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0})
  162. // start as a follower
  163. s.Start()
  164. defer s.Stop()
  165. time.Sleep(2 * testElectionTimeout)
  166. if s.State() != Leader {
  167. t.Fatalf("Server self-promotion failed: %v", s.State())
  168. }
  169. }
  170. //Ensure that we can promote a server within a cluster to a leader.
  171. func TestServerPromote(t *testing.T) {
  172. lookup := map[string]Server{}
  173. transporter := &testTransporter{}
  174. transporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
  175. return lookup[peer.Name].RequestVote(req)
  176. }
  177. transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
  178. return lookup[peer.Name].AppendEntries(req)
  179. }
  180. servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
  181. servers[0].Start()
  182. servers[1].Start()
  183. servers[2].Start()
  184. time.Sleep(2 * testElectionTimeout)
  185. if servers[0].State() != Leader && servers[1].State() != Leader && servers[2].State() != Leader {
  186. t.Fatalf("No leader elected: (%s, %s, %s)", servers[0].State(), servers[1].State(), servers[2].State())
  187. }
  188. for _, s := range servers {
  189. s.Stop()
  190. }
  191. }
  192. //--------------------------------------
  193. // Append Entries
  194. //--------------------------------------
  195. // Ensure we can append entries to a server.
  196. func TestServerAppendEntries(t *testing.T) {
  197. s := newTestServer("1", &testTransporter{})
  198. s.SetHeartbeatInterval(time.Second * 10)
  199. s.Start()
  200. defer s.Stop()
  201. // Append single entry.
  202. e, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10})
  203. entries := []*LogEntry{e}
  204. resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries))
  205. if resp.Term() != 1 || !resp.Success() {
  206. t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
  207. }
  208. if index, term := s.(*server).log.commitInfo(); index != 0 || term != 0 {
  209. t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
  210. }
  211. // Append multiple entries + commit the last one.
  212. e1, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "bar", I: 20})
  213. e2, _ := newLogEntry(nil, nil, 3, 1, &testCommand1{Val: "baz", I: 30})
  214. entries = []*LogEntry{e1, e2}
  215. resp = s.AppendEntries(newAppendEntriesRequest(1, 1, 1, 1, "ldr", entries))
  216. if resp.Term() != 1 || !resp.Success() {
  217. t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
  218. }
  219. if index, term := s.(*server).log.commitInfo(); index != 1 || term != 1 {
  220. t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
  221. }
  222. // Send zero entries and commit everything.
  223. resp = s.AppendEntries(newAppendEntriesRequest(2, 3, 1, 3, "ldr", []*LogEntry{}))
  224. if resp.Term() != 2 || !resp.Success() {
  225. t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
  226. }
  227. if index, term := s.(*server).log.commitInfo(); index != 3 || term != 1 {
  228. t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
  229. }
  230. }
  231. //Ensure that entries with stale terms are rejected.
  232. func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) {
  233. s := newTestServer("1", &testTransporter{})
  234. s.Start()
  235. defer s.Stop()
  236. s.(*server).mutex.Lock()
  237. s.(*server).currentTerm = 2
  238. s.(*server).mutex.Unlock()
  239. // Append single entry.
  240. e, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10})
  241. entries := []*LogEntry{e}
  242. resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries))
  243. if resp.Term() != 2 || resp.Success() {
  244. t.Fatalf("AppendEntries should have failed: %v/%v", resp.Term, resp.Success)
  245. }
  246. if index, term := s.(*server).log.commitInfo(); index != 0 || term != 0 {
  247. t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
  248. }
  249. }
  250. // Ensure that we reject entries if the commit log is different.
  251. func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) {
  252. s := newTestServer("1", &testTransporter{})
  253. s.Start()
  254. defer s.Stop()
  255. // Append single entry + commit.
  256. e1, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10})
  257. e2, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "foo", I: 15})
  258. entries := []*LogEntry{e1, e2}
  259. resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 2, "ldr", entries))
  260. if resp.Term() != 1 || !resp.Success() {
  261. t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
  262. }
  263. // Append entry again (post-commit).
  264. e, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "bar", I: 20})
  265. entries = []*LogEntry{e}
  266. resp = s.AppendEntries(newAppendEntriesRequest(1, 2, 1, 1, "ldr", entries))
  267. if resp.Term() != 1 || resp.Success() {
  268. t.Fatalf("AppendEntries should have failed: %v/%v", resp.Term, resp.Success)
  269. }
  270. }
  271. // Ensure that we uncommitted entries are rolled back if new entries overwrite them.
  272. func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) {
  273. s := newTestServer("1", &testTransporter{})
  274. s.Start()
  275. defer s.Stop()
  276. entry1, _ := newLogEntry(s.(*server).log, nil, 1, 1, &testCommand1{Val: "foo", I: 10})
  277. entry2, _ := newLogEntry(s.(*server).log, nil, 2, 1, &testCommand1{Val: "foo", I: 15})
  278. entry3, _ := newLogEntry(s.(*server).log, nil, 2, 2, &testCommand1{Val: "bar", I: 20})
  279. // Append single entry + commit.
  280. entries := []*LogEntry{entry1, entry2}
  281. resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 1, "ldr", entries))
  282. if resp.Term() != 1 || !resp.Success() || s.(*server).log.commitIndex != 1 {
  283. t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
  284. }
  285. for i, entry := range s.(*server).log.entries {
  286. if entry.Term() != entries[i].Term() || entry.Index() != entries[i].Index() || !bytes.Equal(entry.Command(), entries[i].Command()) {
  287. t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
  288. }
  289. }
  290. // Append entry that overwrites the second (uncommitted) entry.
  291. entries = []*LogEntry{entry3}
  292. resp = s.AppendEntries(newAppendEntriesRequest(2, 1, 1, 2, "ldr", entries))
  293. if resp.Term() != 2 || !resp.Success() || s.(*server).log.commitIndex != 2 {
  294. t.Fatalf("AppendEntries should have succeeded: %v/%v", resp.Term, resp.Success)
  295. }
  296. entries = []*LogEntry{entry1, entry3}
  297. for i, entry := range s.(*server).log.entries {
  298. if entry.Term() != entries[i].Term() || entry.Index() != entries[i].Index() || !bytes.Equal(entry.Command(), entries[i].Command()) {
  299. t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
  300. }
  301. }
  302. }
  303. //--------------------------------------
  304. // Command Execution
  305. //--------------------------------------
  306. // Ensure that a follower cannot execute a command.
  307. func TestServerDenyCommandExecutionWhenFollower(t *testing.T) {
  308. s := newTestServer("1", &testTransporter{})
  309. s.Start()
  310. defer s.Stop()
  311. var err error
  312. if _, err = s.Do(&testCommand1{Val: "foo", I: 10}); err != NotLeaderError {
  313. t.Fatalf("Expected error: %v, got: %v", NotLeaderError, err)
  314. }
  315. }
  316. //--------------------------------------
  317. // Recovery
  318. //--------------------------------------
  319. // Ensure that a follower cannot execute a command.
  320. func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
  321. // Initialize the servers.
  322. var mutex sync.RWMutex
  323. servers := map[string]Server{}
  324. transporter := &testTransporter{}
  325. transporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
  326. mutex.RLock()
  327. target := servers[peer.Name]
  328. mutex.RUnlock()
  329. b, _ := json.Marshal(req)
  330. clonedReq := &RequestVoteRequest{}
  331. json.Unmarshal(b, clonedReq)
  332. return target.RequestVote(clonedReq)
  333. }
  334. transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
  335. mutex.RLock()
  336. target := servers[peer.Name]
  337. mutex.RUnlock()
  338. b, _ := json.Marshal(req)
  339. clonedReq := &AppendEntriesRequest{}
  340. json.Unmarshal(b, clonedReq)
  341. return target.AppendEntries(clonedReq)
  342. }
  343. disTransporter := &testTransporter{}
  344. disTransporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
  345. return nil
  346. }
  347. disTransporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
  348. return nil
  349. }
  350. var names []string
  351. var paths = make(map[string]string)
  352. n := 5
  353. // add n servers
  354. for i := 1; i <= n; i++ {
  355. names = append(names, strconv.Itoa(i))
  356. }
  357. var leader Server
  358. for _, name := range names {
  359. s := newTestServer(name, transporter)
  360. mutex.Lock()
  361. servers[name] = s
  362. mutex.Unlock()
  363. paths[name] = s.Path()
  364. if name == "1" {
  365. leader = s
  366. s.SetHeartbeatInterval(testHeartbeatInterval)
  367. s.Start()
  368. time.Sleep(testHeartbeatInterval)
  369. } else {
  370. s.SetElectionTimeout(testElectionTimeout)
  371. s.SetHeartbeatInterval(testHeartbeatInterval)
  372. s.Start()
  373. time.Sleep(testHeartbeatInterval)
  374. }
  375. if _, err := leader.Do(&DefaultJoinCommand{Name: name}); err != nil {
  376. t.Fatalf("Unable to join server[%s]: %v", name, err)
  377. }
  378. }
  379. // commit some commands
  380. for i := 0; i < 10; i++ {
  381. if _, err := leader.Do(&testCommand2{X: 1}); err != nil {
  382. t.Fatalf("cannot commit command: %s", err.Error())
  383. }
  384. }
  385. time.Sleep(2 * testHeartbeatInterval)
  386. for _, name := range names {
  387. s := servers[name]
  388. if s.CommitIndex() != 16 {
  389. t.Fatalf("%s commitIndex is invalid [%d/%d]", name, s.CommitIndex(), 16)
  390. }
  391. s.Stop()
  392. }
  393. for _, name := range names {
  394. // with old path and disable transportation
  395. s := newTestServerWithPath(name, disTransporter, paths[name])
  396. servers[name] = s
  397. s.Start()
  398. // should only commit to the last join command
  399. if s.CommitIndex() != 6 {
  400. t.Fatalf("%s recover phase 1 commitIndex is invalid [%d/%d]", name, s.CommitIndex(), 6)
  401. }
  402. // peer conf should be recovered
  403. if len(s.Peers()) != 4 {
  404. t.Fatalf("%s recover phase 1 peer failed! [%d/%d]", name, len(s.Peers()), 4)
  405. }
  406. }
  407. // let nodes talk to each other
  408. for _, name := range names {
  409. servers[name].SetTransporter(transporter)
  410. }
  411. time.Sleep(2 * testElectionTimeout)
  412. // should commit to the previous index + 1(nop command when new leader elected)
  413. for _, name := range names {
  414. s := servers[name]
  415. if s.CommitIndex() != 17 {
  416. t.Fatalf("%s commitIndex is invalid [%d/%d]", name, s.CommitIndex(), 17)
  417. }
  418. s.Stop()
  419. }
  420. }
  421. //--------------------------------------
  422. // Membership
  423. //--------------------------------------
  424. // Ensure that we can start a single server and append to its log.
  425. func TestServerSingleNode(t *testing.T) {
  426. s := newTestServer("1", &testTransporter{})
  427. if s.State() != Stopped {
  428. t.Fatalf("Unexpected server state: %v", s.State())
  429. }
  430. s.Start()
  431. time.Sleep(testHeartbeatInterval)
  432. // Join the server to itself.
  433. if _, err := s.Do(&DefaultJoinCommand{Name: "1"}); err != nil {
  434. t.Fatalf("Unable to join: %v", err)
  435. }
  436. debugln("finish command")
  437. if s.State() != Leader {
  438. t.Fatalf("Unexpected server state: %v", s.State())
  439. }
  440. s.Stop()
  441. if s.State() != Stopped {
  442. t.Fatalf("Unexpected server state: %v", s.State())
  443. }
  444. }
  445. // Ensure that we can start multiple servers and determine a leader.
  446. func TestServerMultiNode(t *testing.T) {
  447. // Initialize the servers.
  448. var mutex sync.RWMutex
  449. servers := map[string]Server{}
  450. transporter := &testTransporter{}
  451. transporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
  452. mutex.RLock()
  453. target := servers[peer.Name]
  454. mutex.RUnlock()
  455. b, _ := json.Marshal(req)
  456. clonedReq := &RequestVoteRequest{}
  457. json.Unmarshal(b, clonedReq)
  458. c := make(chan *RequestVoteResponse)
  459. go func() {
  460. c <- target.RequestVote(clonedReq)
  461. }()
  462. select {
  463. case resp := <-c:
  464. return resp
  465. case <-time.After(time.Millisecond * 200):
  466. return nil
  467. }
  468. }
  469. transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
  470. mutex.RLock()
  471. target := servers[peer.Name]
  472. mutex.RUnlock()
  473. b, _ := json.Marshal(req)
  474. clonedReq := &AppendEntriesRequest{}
  475. json.Unmarshal(b, clonedReq)
  476. c := make(chan *AppendEntriesResponse)
  477. go func() {
  478. c <- target.AppendEntries(clonedReq)
  479. }()
  480. select {
  481. case resp := <-c:
  482. return resp
  483. case <-time.After(time.Millisecond * 200):
  484. return nil
  485. }
  486. }
  487. disTransporter := &testTransporter{}
  488. disTransporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
  489. return nil
  490. }
  491. disTransporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
  492. return nil
  493. }
  494. var names []string
  495. n := 5
  496. // add n servers
  497. for i := 1; i <= n; i++ {
  498. names = append(names, strconv.Itoa(i))
  499. }
  500. var leader Server
  501. for _, name := range names {
  502. s := newTestServer(name, transporter)
  503. defer s.Stop()
  504. mutex.Lock()
  505. servers[name] = s
  506. mutex.Unlock()
  507. if name == "1" {
  508. leader = s
  509. s.SetHeartbeatInterval(testHeartbeatInterval)
  510. s.Start()
  511. time.Sleep(testHeartbeatInterval)
  512. } else {
  513. s.SetElectionTimeout(testElectionTimeout)
  514. s.SetHeartbeatInterval(testHeartbeatInterval)
  515. s.Start()
  516. time.Sleep(testHeartbeatInterval)
  517. }
  518. if _, err := leader.Do(&DefaultJoinCommand{Name: name}); err != nil {
  519. t.Fatalf("Unable to join server[%s]: %v", name, err)
  520. }
  521. }
  522. time.Sleep(2 * testElectionTimeout)
  523. // Check that two peers exist on leader.
  524. mutex.RLock()
  525. if leader.MemberCount() != n {
  526. t.Fatalf("Expected member count to be %v, got %v", n, leader.MemberCount())
  527. }
  528. if servers["2"].State() == Leader || servers["3"].State() == Leader {
  529. t.Fatalf("Expected leader should be 1: 2=%v, 3=%v\n", servers["2"].State(), servers["3"].State())
  530. }
  531. mutex.RUnlock()
  532. for i := 0; i < 20; i++ {
  533. retry := 0
  534. fmt.Println("Round ", i)
  535. num := strconv.Itoa(i%(len(servers)) + 1)
  536. num_1 := strconv.Itoa((i+3)%(len(servers)) + 1)
  537. toStop := servers[num]
  538. toStop_1 := servers[num_1]
  539. // Stop the first server and wait for a re-election.
  540. time.Sleep(2 * testElectionTimeout)
  541. debugln("Disconnect ", toStop.Name())
  542. debugln("disconnect ", num, " ", num_1)
  543. toStop.SetTransporter(disTransporter)
  544. toStop_1.SetTransporter(disTransporter)
  545. time.Sleep(2 * testElectionTimeout)
  546. // Check that either server 2 or 3 is the leader now.
  547. //mutex.Lock()
  548. leader := 0
  549. for key, value := range servers {
  550. debugln("Play begin")
  551. if key != num && key != num_1 {
  552. if value.State() == Leader {
  553. debugln("Found leader")
  554. for i := 0; i < 10; i++ {
  555. debugln("[Test] do ", value.Name())
  556. if _, err := value.Do(&testCommand2{X: 1}); err != nil {
  557. break
  558. }
  559. debugln("[Test] Done")
  560. }
  561. debugln("Leader is ", value.Name(), " Index ", value.(*server).log.commitIndex)
  562. }
  563. debugln("Not Found leader")
  564. }
  565. }
  566. for {
  567. for key, value := range servers {
  568. if key != num && key != num_1 {
  569. if value.State() == Leader {
  570. leader++
  571. }
  572. debugln(value.Name(), " ", value.(*server).Term(), " ", value.State())
  573. }
  574. }
  575. if leader > 1 {
  576. if retry < 300 {
  577. debugln("retry")
  578. retry++
  579. leader = 0
  580. time.Sleep(2 * testElectionTimeout)
  581. continue
  582. }
  583. t.Fatalf("wrong leader number %v", leader)
  584. }
  585. if leader == 0 {
  586. if retry < 300 {
  587. retry++
  588. fmt.Println("retry 0")
  589. leader = 0
  590. time.Sleep(2 * testElectionTimeout)
  591. continue
  592. }
  593. t.Fatalf("wrong leader number %v", leader)
  594. }
  595. if leader == 1 {
  596. break
  597. }
  598. }
  599. //mutex.Unlock()
  600. toStop.SetTransporter(transporter)
  601. toStop_1.SetTransporter(transporter)
  602. }
  603. }