test.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package raft
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "os"
  6. "time"
  7. )
  8. const (
  9. testHeartbeatTimeout = 50 * time.Millisecond
  10. testElectionTimeout = 200 * time.Millisecond
  11. )
  12. func init() {
  13. RegisterCommand(&testCommand1{})
  14. RegisterCommand(&testCommand2{})
  15. }
  16. //------------------------------------------------------------------------------
  17. //
  18. // Helpers
  19. //
  20. //------------------------------------------------------------------------------
  21. //--------------------------------------
  22. // Logs
  23. //--------------------------------------
  24. func getLogPath() string {
  25. f, _ := ioutil.TempFile("", "raft-log-")
  26. f.Close()
  27. os.Remove(f.Name())
  28. return f.Name()
  29. }
  30. func setupLog(entries []*LogEntry) (*Log, string) {
  31. f, _ := ioutil.TempFile("", "raft-log-")
  32. for _, entry := range entries {
  33. entry.encode(f)
  34. }
  35. err := f.Close()
  36. if err != nil {
  37. panic(err)
  38. }
  39. log := newLog()
  40. log.ApplyFunc = func(c Command) (interface{}, error) {
  41. return nil, nil
  42. }
  43. if err := log.open(f.Name()); err != nil {
  44. panic(err)
  45. }
  46. return log, f.Name()
  47. }
  48. //--------------------------------------
  49. // Servers
  50. //--------------------------------------
  51. func newTestServer(name string, transporter Transporter) *Server {
  52. p, _ := ioutil.TempDir("", "raft-server-")
  53. if err := os.MkdirAll(p, 0644); err != nil {
  54. panic(err.Error())
  55. }
  56. server, _ := NewServer(name, p, transporter, nil, nil)
  57. return server
  58. }
  59. func newTestServerWithLog(name string, transporter Transporter, entries []*LogEntry) *Server {
  60. server := newTestServer(name, transporter)
  61. f, err := os.Create(server.LogPath())
  62. if err != nil {
  63. panic(err)
  64. }
  65. for _, entry := range entries {
  66. entry.encode(f)
  67. }
  68. f.Close()
  69. return server
  70. }
  71. func newTestCluster(names []string, transporter Transporter, lookup map[string]*Server) []*Server {
  72. servers := []*Server{}
  73. e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20})
  74. for _, name := range names {
  75. if lookup[name] != nil {
  76. panic(fmt.Sprintf("raft: Duplicate server in test cluster! %v", name))
  77. }
  78. server := newTestServerWithLog("1", transporter, []*LogEntry{e0})
  79. server.SetElectionTimeout(testElectionTimeout)
  80. servers = append(servers, server)
  81. lookup[name] = server
  82. }
  83. for _, server := range servers {
  84. server.SetHeartbeatTimeout(testHeartbeatTimeout)
  85. server.Start()
  86. for _, peer := range servers {
  87. server.AddPeer(peer.Name())
  88. }
  89. }
  90. return servers
  91. }
  92. //--------------------------------------
  93. // Transporter
  94. //--------------------------------------
  95. type testTransporter struct {
  96. sendVoteRequestFunc func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
  97. sendAppendEntriesRequestFunc func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
  98. sendSnapshotRequestFunc func(server *Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
  99. }
  100. func (t *testTransporter) SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
  101. return t.sendVoteRequestFunc(server, peer, req)
  102. }
  103. func (t *testTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
  104. return t.sendAppendEntriesRequestFunc(server, peer, req)
  105. }
  106. func (t *testTransporter) SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
  107. return t.sendSnapshotRequestFunc(server, peer, req)
  108. }
  109. func (t *testTransporter) SendSnapshotRecoveryRequest(server *Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
  110. return t.SendSnapshotRecoveryRequest(server, peer, req)
  111. }
  112. type testStateMachine struct {
  113. saveFunc func() ([]byte, error)
  114. recoveryFunc func([]byte) error
  115. }
  116. func (sm *testStateMachine) Save() ([]byte, error) {
  117. return sm.saveFunc()
  118. }
  119. func (sm *testStateMachine) Recovery(state []byte) error {
  120. return sm.recoveryFunc(state)
  121. }
  122. //--------------------------------------
  123. // Command1
  124. //--------------------------------------
  125. type testCommand1 struct {
  126. Val string `json:"val"`
  127. I int `json:"i"`
  128. }
  129. func (c *testCommand1) CommandName() string {
  130. return "cmd_1"
  131. }
  132. func (c *testCommand1) Apply(server *Server) (interface{}, error) {
  133. return nil, nil
  134. }
  135. //--------------------------------------
  136. // Command2
  137. //--------------------------------------
  138. type testCommand2 struct {
  139. X int `json:"x"`
  140. }
  141. func (c *testCommand2) CommandName() string {
  142. return "cmd_2"
  143. }
  144. func (c *testCommand2) Apply(server *Server) (interface{}, error) {
  145. return nil, nil
  146. }