raft_server.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package main
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "encoding/json"
  6. "fmt"
  7. "net/http"
  8. "net/url"
  9. "time"
  10. "github.com/coreos/go-raft"
  11. )
  // raftServer is the package-level raft server instance; it is assigned by
  // startRaft when the node boots.
  12. var raftServer *raft.Server
  13. // Start the raft server
  14. func startRaft(tlsConfig TLSConfig) {
  15. raftName := info.Name
  16. // Setup commands.
  17. registerCommands()
  18. // Create transporter for raft
  19. raftTransporter := newTransporter(tlsConfig.Scheme, tlsConfig.Client)
  20. // Create raft server
  21. var err error
  22. raftServer, err = raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil)
  23. if err != nil {
  24. fatal(err)
  25. }
  26. // LoadSnapshot
  27. if snapshot {
  28. err = raftServer.LoadSnapshot()
  29. if err == nil {
  30. debugf("%s finished load snapshot", raftServer.Name())
  31. } else {
  32. debug(err)
  33. }
  34. }
  35. raftServer.SetElectionTimeout(ElectionTimeout)
  36. raftServer.SetHeartbeatTimeout(HeartbeatTimeout)
  37. raftServer.Start()
  38. if raftServer.IsLogEmpty() {
  39. // start as a leader in a new cluster
  40. if len(cluster) == 0 {
  41. time.Sleep(time.Millisecond * 20)
  42. // leader need to join self as a peer
  43. for {
  44. command := &JoinCommand{
  45. Name: raftServer.Name(),
  46. RaftURL: argInfo.RaftURL,
  47. EtcdURL: argInfo.EtcdURL,
  48. }
  49. _, err := raftServer.Do(command)
  50. if err == nil {
  51. break
  52. }
  53. }
  54. debugf("%s start as a leader", raftServer.Name())
  55. // start as a follower in a existing cluster
  56. } else {
  57. time.Sleep(time.Millisecond * 20)
  58. for i := 0; i < retryTimes; i++ {
  59. success := false
  60. for _, machine := range cluster {
  61. if len(machine) == 0 {
  62. continue
  63. }
  64. err = joinCluster(raftServer, machine, tlsConfig.Scheme)
  65. if err != nil {
  66. if err.Error() == errors[103] {
  67. fatal(err)
  68. }
  69. debugf("cannot join to cluster via machine %s %s", machine, err)
  70. } else {
  71. success = true
  72. break
  73. }
  74. }
  75. if success {
  76. break
  77. }
  78. warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval)
  79. time.Sleep(time.Second * RetryInterval)
  80. }
  81. if err != nil {
  82. fatalf("Cannot join the cluster via given machines after %x retries", retryTimes)
  83. }
  84. debugf("%s success join to the cluster", raftServer.Name())
  85. }
  86. } else {
  87. // rejoin the previous cluster
  88. debugf("%s restart as a follower", raftServer.Name())
  89. }
  90. // open the snapshot
  91. if snapshot {
  92. go monitorSnapshot()
  93. }
  94. // start to response to raft requests
  95. go startRaftTransport(*info, tlsConfig.Scheme, tlsConfig.Server)
  96. }
  97. // Start to listen and response raft command
  98. func startRaftTransport(info Info, scheme string, tlsConf tls.Config) {
  99. u, _ := url.Parse(info.RaftURL)
  100. infof("raft server [%s:%s]", info.Name, u)
  101. raftMux := http.NewServeMux()
  102. server := &http.Server{
  103. Handler: raftMux,
  104. TLSConfig: &tlsConf,
  105. Addr: u.Host,
  106. }
  107. // internal commands
  108. raftMux.HandleFunc("/name", NameHttpHandler)
  109. raftMux.HandleFunc("/join", JoinHttpHandler)
  110. raftMux.HandleFunc("/vote", VoteHttpHandler)
  111. raftMux.HandleFunc("/log", GetLogHttpHandler)
  112. raftMux.HandleFunc("/log/append", AppendEntriesHttpHandler)
  113. raftMux.HandleFunc("/snapshot", SnapshotHttpHandler)
  114. raftMux.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler)
  115. raftMux.HandleFunc("/etcdURL", EtcdURLHttpHandler)
  116. if scheme == "http" {
  117. fatal(server.ListenAndServe())
  118. } else {
  119. fatal(server.ListenAndServeTLS(info.RaftTLS.CertFile, info.RaftTLS.KeyFile))
  120. }
  121. }
  122. // Send join requests to the leader.
  123. func joinCluster(s *raft.Server, raftURL string, scheme string) error {
  124. var b bytes.Buffer
  125. command := &JoinCommand{
  126. Name: s.Name(),
  127. RaftURL: info.RaftURL,
  128. EtcdURL: info.EtcdURL,
  129. }
  130. json.NewEncoder(&b).Encode(command)
  131. // t must be ok
  132. t, ok := raftServer.Transporter().(transporter)
  133. if !ok {
  134. panic("wrong type")
  135. }
  136. joinURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/join"}
  137. debugf("Send Join Request to %s", raftURL)
  138. resp, err := t.Post(joinURL.String(), &b)
  139. for {
  140. if err != nil {
  141. return fmt.Errorf("Unable to join: %v", err)
  142. }
  143. if resp != nil {
  144. defer resp.Body.Close()
  145. if resp.StatusCode == http.StatusOK {
  146. return nil
  147. }
  148. if resp.StatusCode == http.StatusTemporaryRedirect {
  149. address := resp.Header.Get("Location")
  150. debugf("Send Join Request to %s", address)
  151. json.NewEncoder(&b).Encode(command)
  152. resp, err = t.Post(address, &b)
  153. } else if resp.StatusCode == http.StatusBadRequest {
  154. debug("Reach max number machines in the cluster")
  155. return fmt.Errorf(errors[103])
  156. } else {
  157. return fmt.Errorf("Unable to join")
  158. }
  159. }
  160. }
  161. return fmt.Errorf("Unable to join: %v", err)
  162. }
  163. // Register commands to raft server
  164. func registerCommands() {
  165. raft.RegisterCommand(&JoinCommand{})
  166. raft.RegisterCommand(&SetCommand{})
  167. raft.RegisterCommand(&GetCommand{})
  168. raft.RegisterCommand(&DeleteCommand{})
  169. raft.RegisterCommand(&WatchCommand{})
  170. raft.RegisterCommand(&TestAndSetCommand{})
  171. }