test.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package raft
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "os"
  6. "time"
  7. )
  8. const (
  9. testHeartbeatTimeout = 50 * time.Millisecond
  10. testElectionTimeout = 200 * time.Millisecond
  11. )
  12. func init() {
  13. RegisterCommand(&testCommand1{})
  14. RegisterCommand(&testCommand2{})
  15. }
  16. //------------------------------------------------------------------------------
  17. //
  18. // Helpers
  19. //
  20. //------------------------------------------------------------------------------
  21. //--------------------------------------
  22. // Logs
  23. //--------------------------------------
  24. func getLogPath() string {
  25. f, _ := ioutil.TempFile("", "raft-log-")
  26. f.Close()
  27. os.Remove(f.Name())
  28. return f.Name()
  29. }
  30. func setupLog(entries []*LogEntry) (*Log, string) {
  31. f, _ := ioutil.TempFile("", "raft-log-")
  32. for _, entry := range entries {
  33. entry.encode(f)
  34. }
  35. err := f.Close()
  36. if err != nil {
  37. panic(err)
  38. }
  39. log := newLog()
  40. log.ApplyFunc = func(c Command) (interface{}, error) {
  41. return nil, nil
  42. }
  43. if err := log.open(f.Name()); err != nil {
  44. panic(err)
  45. }
  46. return log, f.Name()
  47. }
  48. //--------------------------------------
  49. // Servers
  50. //--------------------------------------
  51. func newTestServer(name string, transporter Transporter) Server {
  52. p, _ := ioutil.TempDir("", "raft-server-")
  53. if err := os.MkdirAll(p, 0644); err != nil {
  54. panic(err.Error())
  55. }
  56. server, _ := NewServer(name, p, transporter, nil, nil, "")
  57. return server
  58. }
  59. func newTestServerWithPath(name string, transporter Transporter, p string) Server {
  60. server, _ := NewServer(name, p, transporter, nil, nil, "")
  61. return server
  62. }
  63. func newTestServerWithLog(name string, transporter Transporter, entries []*LogEntry) Server {
  64. server := newTestServer(name, transporter)
  65. f, err := os.Create(server.LogPath())
  66. if err != nil {
  67. panic(err)
  68. }
  69. for _, entry := range entries {
  70. entry.encode(f)
  71. }
  72. f.Close()
  73. return server
  74. }
  75. func newTestCluster(names []string, transporter Transporter, lookup map[string]Server) []Server {
  76. servers := []Server{}
  77. e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20})
  78. for _, name := range names {
  79. if lookup[name] != nil {
  80. panic(fmt.Sprintf("raft: Duplicate server in test cluster! %v", name))
  81. }
  82. server := newTestServerWithLog("1", transporter, []*LogEntry{e0})
  83. server.SetElectionTimeout(testElectionTimeout)
  84. servers = append(servers, server)
  85. lookup[name] = server
  86. }
  87. for _, server := range servers {
  88. server.SetHeartbeatTimeout(testHeartbeatTimeout)
  89. server.Start()
  90. for _, peer := range servers {
  91. server.AddPeer(peer.Name(), "")
  92. }
  93. }
  94. return servers
  95. }
  96. //--------------------------------------
  97. // Transporter
  98. //--------------------------------------
  99. type testTransporter struct {
  100. sendVoteRequestFunc func(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
  101. sendAppendEntriesRequestFunc func(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
  102. sendSnapshotRequestFunc func(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
  103. }
  104. func (t *testTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
  105. return t.sendVoteRequestFunc(server, peer, req)
  106. }
  107. func (t *testTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
  108. return t.sendAppendEntriesRequestFunc(server, peer, req)
  109. }
  110. func (t *testTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
  111. return t.sendSnapshotRequestFunc(server, peer, req)
  112. }
  113. func (t *testTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
  114. return t.SendSnapshotRecoveryRequest(server, peer, req)
  115. }
  116. type testStateMachine struct {
  117. saveFunc func() ([]byte, error)
  118. recoveryFunc func([]byte) error
  119. }
  120. func (sm *testStateMachine) Save() ([]byte, error) {
  121. return sm.saveFunc()
  122. }
  123. func (sm *testStateMachine) Recovery(state []byte) error {
  124. return sm.recoveryFunc(state)
  125. }
  126. //--------------------------------------
  127. // Command1
  128. //--------------------------------------
  129. type testCommand1 struct {
  130. Val string `json:"val"`
  131. I int `json:"i"`
  132. }
  133. func (c *testCommand1) CommandName() string {
  134. return "cmd_1"
  135. }
  136. func (c *testCommand1) Apply(server Server) (interface{}, error) {
  137. return nil, nil
  138. }
  139. //--------------------------------------
  140. // Command2
  141. //--------------------------------------
  142. type testCommand2 struct {
  143. X int `json:"x"`
  144. }
  145. func (c *testCommand2) CommandName() string {
  146. return "cmd_2"
  147. }
  148. func (c *testCommand2) Apply(server Server) (interface{}, error) {
  149. return nil, nil
  150. }