12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722 |
- package raft
- import (
- "bytes"
- "encoding/json"
- "fmt"
- "strconv"
- "sync"
- "testing"
- "time"
- )
- //--------------------------------------
- // Request Vote
- //--------------------------------------
- // Ensure that we can request a vote from a server that has not voted.
- func TestServerRequestVote(t *testing.T) {
- server := newTestServer("1", &testTransporter{})
- server.Start()
- if _, err := server.Do(&DefaultJoinCommand{Name: server.Name()}); err != nil {
- t.Fatalf("Server %s unable to join: %v", server.Name(), err)
- }
- defer server.Stop()
- resp := server.RequestVote(newRequestVoteRequest(1, "foo", 1, 0))
- if resp.Term != 1 || !resp.VoteGranted {
- t.Fatalf("Invalid request vote response: %v/%v", resp.Term, resp.VoteGranted)
- }
- }
- // // Ensure that a vote request is denied if it comes from an old term.
- func TestServerRequestVoteDeniedForStaleTerm(t *testing.T) {
- s := newTestServer("1", &testTransporter{})
- s.Start()
- if _, err := s.Do(&DefaultJoinCommand{Name: s.Name()}); err != nil {
- t.Fatalf("Server %s unable to join: %v", s.Name(), err)
- }
- s.(*server).mutex.Lock()
- s.(*server).currentTerm = 2
- s.(*server).mutex.Unlock()
- defer s.Stop()
- resp := s.RequestVote(newRequestVoteRequest(1, "foo", 1, 0))
- if resp.Term != 2 || resp.VoteGranted {
- t.Fatalf("Invalid request vote response: %v/%v", resp.Term, resp.VoteGranted)
- }
- if s.Term() != 2 && s.State() != Follower {
- t.Fatalf("Server did not update term and demote: %v / %v", s.Term(), s.State())
- }
- }
- // Ensure that a vote request is denied if we've already voted for a different candidate.
- func TestServerRequestVoteDeniedIfAlreadyVoted(t *testing.T) {
- s := newTestServer("1", &testTransporter{})
- s.Start()
- if _, err := s.Do(&DefaultJoinCommand{Name: s.Name()}); err != nil {
- t.Fatalf("Server %s unable to join: %v", s.Name(), err)
- }
- s.(*server).mutex.Lock()
- s.(*server).currentTerm = 2
- s.(*server).mutex.Unlock()
- defer s.Stop()
- resp := s.RequestVote(newRequestVoteRequest(2, "foo", 1, 0))
- if resp.Term != 2 || !resp.VoteGranted {
- t.Fatalf("First vote should not have been denied")
- }
- resp = s.RequestVote(newRequestVoteRequest(2, "bar", 1, 0))
- if resp.Term != 2 || resp.VoteGranted {
- t.Fatalf("Second vote should have been denied")
- }
- }
- // Ensure that a vote request is approved if vote occurs in a new term.
- func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) {
- s := newTestServer("1", &testTransporter{})
- s.Start()
- if _, err := s.Do(&DefaultJoinCommand{Name: s.Name()}); err != nil {
- t.Fatalf("Server %s unable to join: %v", s.Name(), err)
- }
- time.Sleep(time.Millisecond * 100)
- s.(*server).mutex.Lock()
- s.(*server).currentTerm = 2
- s.(*server).mutex.Unlock()
- defer s.Stop()
- resp := s.RequestVote(newRequestVoteRequest(2, "foo", 2, 1))
- if resp.Term != 2 || !resp.VoteGranted || s.VotedFor() != "foo" {
- t.Fatalf("First vote should not have been denied")
- }
- resp = s.RequestVote(newRequestVoteRequest(3, "bar", 2, 1))
- if resp.Term != 3 || !resp.VoteGranted || s.VotedFor() != "bar" {
- t.Fatalf("Second vote should have been approved")
- }
- }
- // Ensure that a vote request is denied if the log is out of date.
- func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
- tmpLog := newLog()
- e0, _ := newLogEntry(tmpLog, nil, 1, 1, &testCommand1{Val: "foo", I: 20})
- e1, _ := newLogEntry(tmpLog, nil, 2, 1, &testCommand2{X: 100})
- e2, _ := newLogEntry(tmpLog, nil, 3, 2, &testCommand1{Val: "bar", I: 0})
- s := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0, e1, e2})
- // start as a follower with term 2 and index 3
- s.Start()
- defer s.Stop()
- // request vote from term 3 with last log entry 2, 2
- resp := s.RequestVote(newRequestVoteRequest(3, "foo", 2, 2))
- if resp.Term != 3 || resp.VoteGranted {
- t.Fatalf("Stale index vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted)
- }
- // request vote from term 2 with last log entry 2, 3
- resp = s.RequestVote(newRequestVoteRequest(2, "foo", 3, 2))
- if resp.Term != 3 || resp.VoteGranted {
- t.Fatalf("Stale term vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted)
- }
- // request vote from term 3 with last log entry 2, 3
- resp = s.RequestVote(newRequestVoteRequest(3, "foo", 3, 2))
- if resp.Term != 3 || !resp.VoteGranted {
- t.Fatalf("Matching log vote should have been granted")
- }
- // request vote from term 3 with last log entry 2, 4
- resp = s.RequestVote(newRequestVoteRequest(3, "foo", 4, 2))
- if resp.Term != 3 || !resp.VoteGranted {
- t.Fatalf("Ahead-of-log vote should have been granted")
- }
- }
- func TestProcessVoteResponse(t *testing.T) {
- // server Term: 0, status: Leader
- // response Term : 1, granted
- // Expectation: not success
- // Server Term 1 status:Leader
- server := &server{}
- server.eventDispatcher = newEventDispatcher(server)
- server.currentTerm = 0
- server.state = Leader
- response := &RequestVoteResponse{
- VoteGranted: true,
- Term: 1,
- }
- if success := server.processVoteResponse(response); success {
- t.Fatal("Process should fail if the resp's term is larger than server's")
- }
- if server.state != Follower {
- t.Fatal("Server should stepdown")
- }
- // server Term: 1, status: Follower
- // response Term: 2, granted
- // Expectation: not success
- response.Term = 2
- if success := server.processVoteResponse(response); success {
- t.Fatal("Process should fail if the resp's term is larger than server's")
- }
- if server.state != Follower {
- t.Fatal("Server should still be Follower")
- }
- server.currentTerm = 2
- // server Term: 2, status: Follower
- // response Term: 2
- // Expectation: success
- if success := server.processVoteResponse(response); !success {
- t.Fatal("Process should success if the server's term is larger than resp's")
- }
- }
//--------------------------------------
// Promotion
//--------------------------------------
- // // Ensure that we can self-promote a server to candidate, obtain votes and become a fearless leader.
- func TestServerPromoteSelf(t *testing.T) {
- e0, _ := newLogEntry(newLog(), nil, 1, 1, &testCommand1{Val: "foo", I: 20})
- s := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0})
- // start as a follower
- s.Start()
- defer s.Stop()
- time.Sleep(2 * testElectionTimeout)
- if s.State() != Leader {
- t.Fatalf("Server self-promotion failed: %v", s.State())
- }
- }
- //Ensure that we can promote a server within a cluster to a leader.
- func TestServerPromote(t *testing.T) {
- lookup := map[string]Server{}
- transporter := &testTransporter{}
- transporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
- return lookup[peer.Name].RequestVote(req)
- }
- transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
- return lookup[peer.Name].AppendEntries(req)
- }
- servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
- servers[0].Start()
- servers[1].Start()
- servers[2].Start()
- time.Sleep(2 * testElectionTimeout)
- if servers[0].State() != Leader && servers[1].State() != Leader && servers[2].State() != Leader {
- t.Fatalf("No leader elected: (%s, %s, %s)", servers[0].State(), servers[1].State(), servers[2].State())
- }
- for _, s := range servers {
- s.Stop()
- }
- }
- //--------------------------------------
- // Append Entries
- //--------------------------------------
- // Ensure we can append entries to a server.
- func TestServerAppendEntries(t *testing.T) {
- s := newTestServer("1", &testTransporter{})
- s.SetHeartbeatInterval(time.Second * 10)
- s.Start()
- defer s.Stop()
- // Append single entry.
- e, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10})
- entries := []*LogEntry{e}
- resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries))
- if resp.Term() != 1 || !resp.Success() {
- t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
- }
- if index, term := s.(*server).log.commitInfo(); index != 0 || term != 0 {
- t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
- }
- // Append multiple entries + commit the last one.
- e1, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "bar", I: 20})
- e2, _ := newLogEntry(nil, nil, 3, 1, &testCommand1{Val: "baz", I: 30})
- entries = []*LogEntry{e1, e2}
- resp = s.AppendEntries(newAppendEntriesRequest(1, 1, 1, 1, "ldr", entries))
- if resp.Term() != 1 || !resp.Success() {
- t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
- }
- if index, term := s.(*server).log.commitInfo(); index != 1 || term != 1 {
- t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
- }
- // Send zero entries and commit everything.
- resp = s.AppendEntries(newAppendEntriesRequest(2, 3, 1, 3, "ldr", []*LogEntry{}))
- if resp.Term() != 2 || !resp.Success() {
- t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
- }
- if index, term := s.(*server).log.commitInfo(); index != 3 || term != 1 {
- t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
- }
- }
- //Ensure that entries with stale terms are rejected.
- func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) {
- s := newTestServer("1", &testTransporter{})
- s.Start()
- defer s.Stop()
- s.(*server).mutex.Lock()
- s.(*server).currentTerm = 2
- s.(*server).mutex.Unlock()
- // Append single entry.
- e, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10})
- entries := []*LogEntry{e}
- resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries))
- if resp.Term() != 2 || resp.Success() {
- t.Fatalf("AppendEntries should have failed: %v/%v", resp.Term, resp.Success)
- }
- if index, term := s.(*server).log.commitInfo(); index != 0 || term != 0 {
- t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
- }
- }
- // Ensure that we reject entries if the commit log is different.
- func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) {
- s := newTestServer("1", &testTransporter{})
- s.Start()
- defer s.Stop()
- // Append single entry + commit.
- e1, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10})
- e2, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "foo", I: 15})
- entries := []*LogEntry{e1, e2}
- resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 2, "ldr", entries))
- if resp.Term() != 1 || !resp.Success() {
- t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
- }
- // Append entry again (post-commit).
- e, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "bar", I: 20})
- entries = []*LogEntry{e}
- resp = s.AppendEntries(newAppendEntriesRequest(1, 2, 1, 1, "ldr", entries))
- if resp.Term() != 1 || resp.Success() {
- t.Fatalf("AppendEntries should have failed: %v/%v", resp.Term, resp.Success)
- }
- }
- // Ensure that we uncommitted entries are rolled back if new entries overwrite them.
- func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) {
- s := newTestServer("1", &testTransporter{})
- s.Start()
- defer s.Stop()
- entry1, _ := newLogEntry(s.(*server).log, nil, 1, 1, &testCommand1{Val: "foo", I: 10})
- entry2, _ := newLogEntry(s.(*server).log, nil, 2, 1, &testCommand1{Val: "foo", I: 15})
- entry3, _ := newLogEntry(s.(*server).log, nil, 2, 2, &testCommand1{Val: "bar", I: 20})
- // Append single entry + commit.
- entries := []*LogEntry{entry1, entry2}
- resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 1, "ldr", entries))
- if resp.Term() != 1 || !resp.Success() || s.(*server).log.commitIndex != 1 {
- t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
- }
- for i, entry := range s.(*server).log.entries {
- if entry.Term() != entries[i].Term() || entry.Index() != entries[i].Index() || !bytes.Equal(entry.Command(), entries[i].Command()) {
- t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
- }
- }
- // Append entry that overwrites the second (uncommitted) entry.
- entries = []*LogEntry{entry3}
- resp = s.AppendEntries(newAppendEntriesRequest(2, 1, 1, 2, "ldr", entries))
- if resp.Term() != 2 || !resp.Success() || s.(*server).log.commitIndex != 2 {
- t.Fatalf("AppendEntries should have succeeded: %v/%v", resp.Term, resp.Success)
- }
- entries = []*LogEntry{entry1, entry3}
- for i, entry := range s.(*server).log.entries {
- if entry.Term() != entries[i].Term() || entry.Index() != entries[i].Index() || !bytes.Equal(entry.Command(), entries[i].Command()) {
- t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
- }
- }
- }
- //--------------------------------------
- // Command Execution
- //--------------------------------------
- // Ensure that a follower cannot execute a command.
- func TestServerDenyCommandExecutionWhenFollower(t *testing.T) {
- s := newTestServer("1", &testTransporter{})
- s.Start()
- defer s.Stop()
- var err error
- if _, err = s.Do(&testCommand1{Val: "foo", I: 10}); err != NotLeaderError {
- t.Fatalf("Expected error: %v, got: %v", NotLeaderError, err)
- }
- }
- //--------------------------------------
- // Recovery
- //--------------------------------------
- // Ensure that a follower cannot execute a command.
- func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
- // Initialize the servers.
- var mutex sync.RWMutex
- servers := map[string]Server{}
- transporter := &testTransporter{}
- transporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
- mutex.RLock()
- target := servers[peer.Name]
- mutex.RUnlock()
- b, _ := json.Marshal(req)
- clonedReq := &RequestVoteRequest{}
- json.Unmarshal(b, clonedReq)
- return target.RequestVote(clonedReq)
- }
- transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
- mutex.RLock()
- target := servers[peer.Name]
- mutex.RUnlock()
- b, _ := json.Marshal(req)
- clonedReq := &AppendEntriesRequest{}
- json.Unmarshal(b, clonedReq)
- return target.AppendEntries(clonedReq)
- }
- disTransporter := &testTransporter{}
- disTransporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
- return nil
- }
- disTransporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
- return nil
- }
- var names []string
- var paths = make(map[string]string)
- n := 5
- // add n servers
- for i := 1; i <= n; i++ {
- names = append(names, strconv.Itoa(i))
- }
- var leader Server
- for _, name := range names {
- s := newTestServer(name, transporter)
- mutex.Lock()
- servers[name] = s
- mutex.Unlock()
- paths[name] = s.Path()
- if name == "1" {
- leader = s
- s.SetHeartbeatInterval(testHeartbeatInterval)
- s.Start()
- time.Sleep(testHeartbeatInterval)
- } else {
- s.SetElectionTimeout(testElectionTimeout)
- s.SetHeartbeatInterval(testHeartbeatInterval)
- s.Start()
- time.Sleep(testHeartbeatInterval)
- }
- if _, err := leader.Do(&DefaultJoinCommand{Name: name}); err != nil {
- t.Fatalf("Unable to join server[%s]: %v", name, err)
- }
- }
- // commit some commands
- for i := 0; i < 10; i++ {
- if _, err := leader.Do(&testCommand2{X: 1}); err != nil {
- t.Fatalf("cannot commit command: %s", err.Error())
- }
- }
- time.Sleep(2 * testHeartbeatInterval)
- for _, name := range names {
- s := servers[name]
- if s.CommitIndex() != 16 {
- t.Fatalf("%s commitIndex is invalid [%d/%d]", name, s.CommitIndex(), 16)
- }
- s.Stop()
- }
- for _, name := range names {
- // with old path and disable transportation
- s := newTestServerWithPath(name, disTransporter, paths[name])
- servers[name] = s
- s.Start()
- // should only commit to the last join command
- if s.CommitIndex() != 6 {
- t.Fatalf("%s recover phase 1 commitIndex is invalid [%d/%d]", name, s.CommitIndex(), 6)
- }
- // peer conf should be recovered
- if len(s.Peers()) != 4 {
- t.Fatalf("%s recover phase 1 peer failed! [%d/%d]", name, len(s.Peers()), 4)
- }
- }
- // let nodes talk to each other
- for _, name := range names {
- servers[name].SetTransporter(transporter)
- }
- time.Sleep(2 * testElectionTimeout)
- // should commit to the previous index + 1(nop command when new leader elected)
- for _, name := range names {
- s := servers[name]
- if s.CommitIndex() != 17 {
- t.Fatalf("%s commitIndex is invalid [%d/%d]", name, s.CommitIndex(), 17)
- }
- s.Stop()
- }
- }
- //--------------------------------------
- // Membership
- //--------------------------------------
- // Ensure that we can start a single server and append to its log.
- func TestServerSingleNode(t *testing.T) {
- s := newTestServer("1", &testTransporter{})
- if s.State() != Stopped {
- t.Fatalf("Unexpected server state: %v", s.State())
- }
- s.Start()
- time.Sleep(testHeartbeatInterval)
- // Join the server to itself.
- if _, err := s.Do(&DefaultJoinCommand{Name: "1"}); err != nil {
- t.Fatalf("Unable to join: %v", err)
- }
- debugln("finish command")
- if s.State() != Leader {
- t.Fatalf("Unexpected server state: %v", s.State())
- }
- s.Stop()
- if s.State() != Stopped {
- t.Fatalf("Unexpected server state: %v", s.State())
- }
- }
- // Ensure that we can start multiple servers and determine a leader.
- func TestServerMultiNode(t *testing.T) {
- // Initialize the servers.
- var mutex sync.RWMutex
- servers := map[string]Server{}
- transporter := &testTransporter{}
- transporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
- mutex.RLock()
- target := servers[peer.Name]
- mutex.RUnlock()
- b, _ := json.Marshal(req)
- clonedReq := &RequestVoteRequest{}
- json.Unmarshal(b, clonedReq)
- c := make(chan *RequestVoteResponse)
- go func() {
- c <- target.RequestVote(clonedReq)
- }()
- select {
- case resp := <-c:
- return resp
- case <-time.After(time.Millisecond * 200):
- return nil
- }
- }
- transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
- mutex.RLock()
- target := servers[peer.Name]
- mutex.RUnlock()
- b, _ := json.Marshal(req)
- clonedReq := &AppendEntriesRequest{}
- json.Unmarshal(b, clonedReq)
- c := make(chan *AppendEntriesResponse)
- go func() {
- c <- target.AppendEntries(clonedReq)
- }()
- select {
- case resp := <-c:
- return resp
- case <-time.After(time.Millisecond * 200):
- return nil
- }
- }
- disTransporter := &testTransporter{}
- disTransporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
- return nil
- }
- disTransporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
- return nil
- }
- var names []string
- n := 5
- // add n servers
- for i := 1; i <= n; i++ {
- names = append(names, strconv.Itoa(i))
- }
- var leader Server
- for _, name := range names {
- s := newTestServer(name, transporter)
- defer s.Stop()
- mutex.Lock()
- servers[name] = s
- mutex.Unlock()
- if name == "1" {
- leader = s
- s.SetHeartbeatInterval(testHeartbeatInterval)
- s.Start()
- time.Sleep(testHeartbeatInterval)
- } else {
- s.SetElectionTimeout(testElectionTimeout)
- s.SetHeartbeatInterval(testHeartbeatInterval)
- s.Start()
- time.Sleep(testHeartbeatInterval)
- }
- if _, err := leader.Do(&DefaultJoinCommand{Name: name}); err != nil {
- t.Fatalf("Unable to join server[%s]: %v", name, err)
- }
- }
- time.Sleep(2 * testElectionTimeout)
- // Check that two peers exist on leader.
- mutex.RLock()
- if leader.MemberCount() != n {
- t.Fatalf("Expected member count to be %v, got %v", n, leader.MemberCount())
- }
- if servers["2"].State() == Leader || servers["3"].State() == Leader {
- t.Fatalf("Expected leader should be 1: 2=%v, 3=%v\n", servers["2"].State(), servers["3"].State())
- }
- mutex.RUnlock()
- for i := 0; i < 20; i++ {
- retry := 0
- fmt.Println("Round ", i)
- num := strconv.Itoa(i%(len(servers)) + 1)
- num_1 := strconv.Itoa((i+3)%(len(servers)) + 1)
- toStop := servers[num]
- toStop_1 := servers[num_1]
- // Stop the first server and wait for a re-election.
- time.Sleep(2 * testElectionTimeout)
- debugln("Disconnect ", toStop.Name())
- debugln("disconnect ", num, " ", num_1)
- toStop.SetTransporter(disTransporter)
- toStop_1.SetTransporter(disTransporter)
- time.Sleep(2 * testElectionTimeout)
- // Check that either server 2 or 3 is the leader now.
- //mutex.Lock()
- leader := 0
- for key, value := range servers {
- debugln("Play begin")
- if key != num && key != num_1 {
- if value.State() == Leader {
- debugln("Found leader")
- for i := 0; i < 10; i++ {
- debugln("[Test] do ", value.Name())
- if _, err := value.Do(&testCommand2{X: 1}); err != nil {
- break
- }
- debugln("[Test] Done")
- }
- debugln("Leader is ", value.Name(), " Index ", value.(*server).log.commitIndex)
- }
- debugln("Not Found leader")
- }
- }
- for {
- for key, value := range servers {
- if key != num && key != num_1 {
- if value.State() == Leader {
- leader++
- }
- debugln(value.Name(), " ", value.(*server).Term(), " ", value.State())
- }
- }
- if leader > 1 {
- if retry < 300 {
- debugln("retry")
- retry++
- leader = 0
- time.Sleep(2 * testElectionTimeout)
- continue
- }
- t.Fatalf("wrong leader number %v", leader)
- }
- if leader == 0 {
- if retry < 300 {
- retry++
- fmt.Println("retry 0")
- leader = 0
- time.Sleep(2 * testElectionTimeout)
- continue
- }
- t.Fatalf("wrong leader number %v", leader)
- }
- if leader == 1 {
- break
- }
- }
- //mutex.Unlock()
- toStop.SetTransporter(transporter)
- toStop_1.SetTransporter(transporter)
- }
- }
|