test.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. package raft
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "os"
  6. "time"
  7. )
  8. const (
  9. testHeartbeatInterval = 50 * time.Millisecond
  10. testElectionTimeout = 200 * time.Millisecond
  11. )
  12. const (
  13. testListenerLoggerEnabled = false
  14. )
  15. func init() {
  16. RegisterCommand(&testCommand1{})
  17. RegisterCommand(&testCommand2{})
  18. }
  19. //------------------------------------------------------------------------------
  20. //
  21. // Helpers
  22. //
  23. //------------------------------------------------------------------------------
  24. //--------------------------------------
  25. // Logs
  26. //--------------------------------------
  27. func getLogPath() string {
  28. f, _ := ioutil.TempFile("", "raft-log-")
  29. f.Close()
  30. os.Remove(f.Name())
  31. return f.Name()
  32. }
  33. func setupLog(entries []*LogEntry) (*Log, string) {
  34. f, _ := ioutil.TempFile("", "raft-log-")
  35. for _, entry := range entries {
  36. entry.Encode(f)
  37. }
  38. err := f.Close()
  39. if err != nil {
  40. panic(err)
  41. }
  42. log := newLog()
  43. log.ApplyFunc = func(e *LogEntry, c Command) (interface{}, error) {
  44. return nil, nil
  45. }
  46. if err := log.open(f.Name()); err != nil {
  47. panic(err)
  48. }
  49. return log, f.Name()
  50. }
  51. //--------------------------------------
  52. // Servers
  53. //--------------------------------------
  54. func newTestServer(name string, transporter Transporter) Server {
  55. p, _ := ioutil.TempDir("", "raft-server-")
  56. if err := os.MkdirAll(p, 0644); err != nil {
  57. panic(err.Error())
  58. }
  59. server, _ := NewServer(name, p, transporter, nil, nil, "")
  60. if testListenerLoggerEnabled {
  61. fn := func(e Event) {
  62. server := e.Source().(Server)
  63. warnf("[%s] %s %v -> %v\n", server.Name(), e.Type(), e.PrevValue(), e.Value())
  64. }
  65. server.AddEventListener(StateChangeEventType, fn)
  66. server.AddEventListener(LeaderChangeEventType, fn)
  67. server.AddEventListener(TermChangeEventType, fn)
  68. }
  69. return server
  70. }
  71. func newTestServerWithPath(name string, transporter Transporter, p string) Server {
  72. server, _ := NewServer(name, p, transporter, nil, nil, "")
  73. return server
  74. }
  75. func newTestServerWithLog(name string, transporter Transporter, entries []*LogEntry) Server {
  76. server := newTestServer(name, transporter)
  77. f, err := os.Create(server.LogPath())
  78. if err != nil {
  79. panic(err)
  80. }
  81. for _, entry := range entries {
  82. entry.Encode(f)
  83. }
  84. f.Close()
  85. return server
  86. }
  87. func newTestCluster(names []string, transporter Transporter, lookup map[string]Server) []Server {
  88. servers := []Server{}
  89. e0, _ := newLogEntry(newLog(), nil, 1, 1, &testCommand1{Val: "foo", I: 20})
  90. for _, name := range names {
  91. if lookup[name] != nil {
  92. panic(fmt.Sprintf("raft: Duplicate server in test cluster! %v", name))
  93. }
  94. server := newTestServerWithLog("1", transporter, []*LogEntry{e0})
  95. server.SetElectionTimeout(testElectionTimeout)
  96. servers = append(servers, server)
  97. lookup[name] = server
  98. }
  99. for _, server := range servers {
  100. server.SetHeartbeatInterval(testHeartbeatInterval)
  101. server.Start()
  102. for _, peer := range servers {
  103. server.AddPeer(peer.Name(), "")
  104. }
  105. }
  106. return servers
  107. }
  108. //--------------------------------------
  109. // Transporter
  110. //--------------------------------------
  111. type testTransporter struct {
  112. sendVoteRequestFunc func(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
  113. sendAppendEntriesRequestFunc func(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
  114. sendSnapshotRequestFunc func(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
  115. }
  116. func (t *testTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
  117. return t.sendVoteRequestFunc(server, peer, req)
  118. }
  119. func (t *testTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
  120. return t.sendAppendEntriesRequestFunc(server, peer, req)
  121. }
  122. func (t *testTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
  123. return t.sendSnapshotRequestFunc(server, peer, req)
  124. }
  125. func (t *testTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
  126. return t.SendSnapshotRecoveryRequest(server, peer, req)
  127. }
  128. type testStateMachine struct {
  129. saveFunc func() ([]byte, error)
  130. recoveryFunc func([]byte) error
  131. }
  132. func (sm *testStateMachine) Save() ([]byte, error) {
  133. return sm.saveFunc()
  134. }
  135. func (sm *testStateMachine) Recovery(state []byte) error {
  136. return sm.recoveryFunc(state)
  137. }
  138. //--------------------------------------
  139. // Command1
  140. //--------------------------------------
  141. type testCommand1 struct {
  142. Val string `json:"val"`
  143. I int `json:"i"`
  144. }
  145. func (c *testCommand1) CommandName() string {
  146. return "cmd_1"
  147. }
  148. func (c *testCommand1) Apply(server Server) (interface{}, error) {
  149. return nil, nil
  150. }
  151. //--------------------------------------
  152. // Command2
  153. //--------------------------------------
  154. type testCommand2 struct {
  155. X int `json:"x"`
  156. }
  157. func (c *testCommand2) CommandName() string {
  158. return "cmd_2"
  159. }
  160. func (c *testCommand2) Apply(server Server) (interface{}, error) {
  161. return nil, nil
  162. }