http_transporter_test.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package raft
  2. import (
  3. "fmt"
  4. "net"
  5. "net/http"
  6. "sync"
  7. "testing"
  8. "time"
  9. )
  10. // Ensure that we can start several servers and have them communicate.
  11. func TestHTTPTransporter(t *testing.T) {
  12. transporter := NewHTTPTransporter("/raft")
  13. transporter.DisableKeepAlives = true
  14. servers := []*Server{}
  15. f0 := func(server *Server, httpServer *http.Server) {
  16. // Stop the leader and wait for an election.
  17. server.Stop()
  18. time.Sleep(testElectionTimeout * 2)
  19. if servers[1].State() != Leader && servers[2].State() != Leader {
  20. t.Fatal("Expected re-election:", servers[1].State(), servers[2].State())
  21. }
  22. server.Start()
  23. }
  24. f1 := func(server *Server, httpServer *http.Server) {
  25. }
  26. f2 := func(server *Server, httpServer *http.Server) {
  27. }
  28. runTestHttpServers(t, &servers, transporter, f0, f1, f2)
  29. }
  30. // Starts multiple independent Raft servers wrapped with HTTP servers.
  31. func runTestHttpServers(t *testing.T, servers *[]*Server, transporter *HTTPTransporter, callbacks ...func(*Server, *http.Server)) {
  32. var wg sync.WaitGroup
  33. httpServers := []*http.Server{}
  34. listeners := []net.Listener{}
  35. for i := range callbacks {
  36. wg.Add(1)
  37. port := 9000 + i
  38. // Create raft server.
  39. server := newTestServer(fmt.Sprintf("localhost:%d", port), transporter)
  40. server.SetHeartbeatTimeout(testHeartbeatTimeout)
  41. server.SetElectionTimeout(testElectionTimeout)
  42. server.Start()
  43. defer server.Stop()
  44. *servers = append(*servers, server)
  45. // Create listener for HTTP server and start it.
  46. listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
  47. if err != nil {
  48. panic(err)
  49. }
  50. defer listener.Close()
  51. listeners = append(listeners, listener)
  52. // Create wrapping HTTP server.
  53. mux := http.NewServeMux()
  54. transporter.Install(server, mux)
  55. httpServer := &http.Server{Addr: fmt.Sprintf(":%d", port), Handler: mux}
  56. httpServers = append(httpServers, httpServer)
  57. go func() { httpServer.Serve(listener) }()
  58. }
  59. // Setup configuration.
  60. for _, server := range *servers {
  61. if _, err := (*servers)[0].Do(&DefaultJoinCommand{Name: server.Name()}); err != nil {
  62. t.Fatalf("Server %s unable to join: %v", server.Name(), err)
  63. }
  64. }
  65. // Wait for configuration to propagate.
  66. time.Sleep(testHeartbeatTimeout * 2)
  67. // Execute all the callbacks at the same time.
  68. for _i, _f := range callbacks {
  69. i, f := _i, _f
  70. go func() {
  71. defer wg.Done()
  72. f((*servers)[i], httpServers[i])
  73. }()
  74. }
  75. // Wait until everything is done.
  76. wg.Wait()
  77. }
  78. func BenchmarkSpeed(b *testing.B) {
  79. transporter := NewHTTPTransporter("/raft")
  80. transporter.DisableKeepAlives = true
  81. servers := []*Server{}
  82. for i := 0; i < 3; i++ {
  83. port := 9000 + i
  84. // Create raft server.
  85. server := newTestServer(fmt.Sprintf("localhost:%d", port), transporter)
  86. server.SetHeartbeatTimeout(testHeartbeatTimeout)
  87. server.SetElectionTimeout(testElectionTimeout)
  88. server.Start()
  89. defer server.Stop()
  90. servers = append(servers, server)
  91. // Create listener for HTTP server and start it.
  92. listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
  93. if err != nil {
  94. panic(err)
  95. }
  96. defer listener.Close()
  97. // Create wrapping HTTP server.
  98. mux := http.NewServeMux()
  99. transporter.Install(server, mux)
  100. httpServer := &http.Server{Addr: fmt.Sprintf(":%d", port), Handler: mux}
  101. go func() { httpServer.Serve(listener) }()
  102. }
  103. // Setup configuration.
  104. for _, server := range servers {
  105. (servers)[0].Do(&DefaultJoinCommand{Name: server.Name()})
  106. }
  107. c := make(chan bool)
  108. // Wait for configuration to propagate.
  109. time.Sleep(testHeartbeatTimeout * 2)
  110. b.ResetTimer()
  111. for n := 0; n < b.N; n++ {
  112. for i := 0; i < 1000; i++ {
  113. go send(c, servers[0])
  114. }
  115. for i := 0; i < 1000; i++ {
  116. <-c
  117. }
  118. }
  119. }
  120. func send(c chan bool, s *Server) {
  121. for i := 0; i < 20; i++ {
  122. s.Do(&NOPCommand{})
  123. }
  124. c <- true
  125. }