| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- package raft
- import (
- "fmt"
- "net"
- "net/http"
- "sync"
- "testing"
- "time"
- )
- // Ensure that we can start several servers and have them communicate.
- func TestHTTPTransporter(t *testing.T) {
- transporter := NewHTTPTransporter("/raft")
- transporter.DisableKeepAlives = true
- servers := []*Server{}
- f0 := func(server *Server, httpServer *http.Server) {
- // Stop the leader and wait for an election.
- server.Stop()
- time.Sleep(testElectionTimeout * 2)
- if servers[1].State() != Leader && servers[2].State() != Leader {
- t.Fatal("Expected re-election:", servers[1].State(), servers[2].State())
- }
- server.Start()
- }
- f1 := func(server *Server, httpServer *http.Server) {
- }
- f2 := func(server *Server, httpServer *http.Server) {
- }
- runTestHttpServers(t, &servers, transporter, f0, f1, f2)
- }
- // Starts multiple independent Raft servers wrapped with HTTP servers.
- func runTestHttpServers(t *testing.T, servers *[]*Server, transporter *HTTPTransporter, callbacks ...func(*Server, *http.Server)) {
- var wg sync.WaitGroup
- httpServers := []*http.Server{}
- listeners := []net.Listener{}
- for i := range callbacks {
- wg.Add(1)
- port := 9000 + i
- // Create raft server.
- server := newTestServer(fmt.Sprintf("localhost:%d", port), transporter)
- server.SetHeartbeatTimeout(testHeartbeatTimeout)
- server.SetElectionTimeout(testElectionTimeout)
- server.Start()
- defer server.Stop()
- *servers = append(*servers, server)
- // Create listener for HTTP server and start it.
- listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
- if err != nil {
- panic(err)
- }
- defer listener.Close()
- listeners = append(listeners, listener)
- // Create wrapping HTTP server.
- mux := http.NewServeMux()
- transporter.Install(server, mux)
- httpServer := &http.Server{Addr: fmt.Sprintf(":%d", port), Handler: mux}
- httpServers = append(httpServers, httpServer)
- go func() { httpServer.Serve(listener) }()
- }
- // Setup configuration.
- for _, server := range *servers {
- if _, err := (*servers)[0].Do(&DefaultJoinCommand{Name: server.Name()}); err != nil {
- t.Fatalf("Server %s unable to join: %v", server.Name(), err)
- }
- }
- // Wait for configuration to propagate.
- time.Sleep(testHeartbeatTimeout * 2)
- // Execute all the callbacks at the same time.
- for _i, _f := range callbacks {
- i, f := _i, _f
- go func() {
- defer wg.Done()
- f((*servers)[i], httpServers[i])
- }()
- }
- // Wait until everything is done.
- wg.Wait()
- }
- func BenchmarkSpeed(b *testing.B) {
- transporter := NewHTTPTransporter("/raft")
- transporter.DisableKeepAlives = true
- servers := []*Server{}
- for i := 0; i < 3; i++ {
- port := 9000 + i
- // Create raft server.
- server := newTestServer(fmt.Sprintf("localhost:%d", port), transporter)
- server.SetHeartbeatTimeout(testHeartbeatTimeout)
- server.SetElectionTimeout(testElectionTimeout)
- server.Start()
- defer server.Stop()
- servers = append(servers, server)
- // Create listener for HTTP server and start it.
- listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
- if err != nil {
- panic(err)
- }
- defer listener.Close()
- // Create wrapping HTTP server.
- mux := http.NewServeMux()
- transporter.Install(server, mux)
- httpServer := &http.Server{Addr: fmt.Sprintf(":%d", port), Handler: mux}
- go func() { httpServer.Serve(listener) }()
- }
- // Setup configuration.
- for _, server := range servers {
- (servers)[0].Do(&DefaultJoinCommand{Name: server.Name()})
- }
- c := make(chan bool)
- // Wait for configuration to propagate.
- time.Sleep(testHeartbeatTimeout * 2)
- b.ResetTimer()
- for n := 0; n < b.N; n++ {
- for i := 0; i < 1000; i++ {
- go send(c, servers[0])
- }
- for i := 0; i < 1000; i++ {
- <-c
- }
- }
- }
- func send(c chan bool, s *Server) {
- for i := 0; i < 20; i++ {
- s.Do(&NOPCommand{})
- }
- c <- true
- }
|