raft_server.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. package main
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "encoding/json"
  6. "fmt"
  7. "net/http"
  8. "net/url"
  9. "time"
  10. "github.com/coreos/go-raft"
  11. )
  12. type raftServer struct {
  13. *raft.Server
  14. name string
  15. url string
  16. tlsConf *TLSConfig
  17. tlsInfo *TLSInfo
  18. }
  19. var r *raftServer
  20. func newRaftServer(name string, url string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer {
  21. // Create transporter for raft
  22. raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client)
  23. // Create raft server
  24. server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil)
  25. check(err)
  26. return &raftServer{
  27. Server: server,
  28. name: name,
  29. url: url,
  30. tlsConf: tlsConf,
  31. tlsInfo: tlsInfo,
  32. }
  33. }
  34. // Start the raft server
  35. func (r *raftServer) ListenAndServe() {
  36. // Setup commands.
  37. registerCommands()
  38. // LoadSnapshot
  39. if snapshot {
  40. err := r.LoadSnapshot()
  41. if err == nil {
  42. debugf("%s finished load snapshot", r.name)
  43. } else {
  44. debug(err)
  45. }
  46. }
  47. r.SetElectionTimeout(ElectionTimeout)
  48. r.SetHeartbeatTimeout(HeartbeatTimeout)
  49. r.Start()
  50. if r.IsLogEmpty() {
  51. // start as a leader in a new cluster
  52. if len(cluster) == 0 {
  53. time.Sleep(time.Millisecond * 20)
  54. // leader need to join self as a peer
  55. for {
  56. _, err := r.Do(newJoinCommand())
  57. if err == nil {
  58. break
  59. }
  60. }
  61. debugf("%s start as a leader", r.name)
  62. // start as a follower in a existing cluster
  63. } else {
  64. time.Sleep(time.Millisecond * 20)
  65. var err error
  66. for i := 0; i < retryTimes; i++ {
  67. success := false
  68. for _, machine := range cluster {
  69. if len(machine) == 0 {
  70. continue
  71. }
  72. err = joinCluster(r.Server, machine, r.tlsConf.Scheme)
  73. if err != nil {
  74. if err.Error() == errors[103] {
  75. fatal(err)
  76. }
  77. debugf("cannot join to cluster via machine %s %s", machine, err)
  78. } else {
  79. success = true
  80. break
  81. }
  82. }
  83. if success {
  84. break
  85. }
  86. warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval)
  87. time.Sleep(time.Second * RetryInterval)
  88. }
  89. if err != nil {
  90. fatalf("Cannot join the cluster via given machines after %x retries", retryTimes)
  91. }
  92. debugf("%s success join to the cluster", r.name)
  93. }
  94. } else {
  95. // rejoin the previous cluster
  96. debugf("%s restart as a follower", r.name)
  97. }
  98. // open the snapshot
  99. if snapshot {
  100. go monitorSnapshot()
  101. }
  102. // start to response to raft requests
  103. go r.startTransport(r.tlsConf.Scheme, r.tlsConf.Server)
  104. }
  105. // Start to listen and response raft command
  106. func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) {
  107. u, _ := url.Parse(r.url)
  108. infof("raft server [%s:%s]", r.name, u)
  109. raftMux := http.NewServeMux()
  110. server := &http.Server{
  111. Handler: raftMux,
  112. TLSConfig: &tlsConf,
  113. Addr: u.Host,
  114. }
  115. // internal commands
  116. raftMux.HandleFunc("/name", NameHttpHandler)
  117. raftMux.HandleFunc("/join", JoinHttpHandler)
  118. raftMux.HandleFunc("/vote", VoteHttpHandler)
  119. raftMux.HandleFunc("/log", GetLogHttpHandler)
  120. raftMux.HandleFunc("/log/append", AppendEntriesHttpHandler)
  121. raftMux.HandleFunc("/snapshot", SnapshotHttpHandler)
  122. raftMux.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler)
  123. raftMux.HandleFunc("/etcdURL", EtcdURLHttpHandler)
  124. if scheme == "http" {
  125. fatal(server.ListenAndServe())
  126. } else {
  127. fatal(server.ListenAndServeTLS(r.tlsInfo.CertFile, r.tlsInfo.KeyFile))
  128. }
  129. }
  130. // Send join requests to the leader.
  131. func joinCluster(s *raft.Server, raftURL string, scheme string) error {
  132. var b bytes.Buffer
  133. json.NewEncoder(&b).Encode(newJoinCommand())
  134. // t must be ok
  135. t, ok := r.Transporter().(transporter)
  136. if !ok {
  137. panic("wrong type")
  138. }
  139. joinURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/join"}
  140. debugf("Send Join Request to %s", raftURL)
  141. resp, err := t.Post(joinURL.String(), &b)
  142. for {
  143. if err != nil {
  144. return fmt.Errorf("Unable to join: %v", err)
  145. }
  146. if resp != nil {
  147. defer resp.Body.Close()
  148. if resp.StatusCode == http.StatusOK {
  149. return nil
  150. }
  151. if resp.StatusCode == http.StatusTemporaryRedirect {
  152. address := resp.Header.Get("Location")
  153. debugf("Send Join Request to %s", address)
  154. json.NewEncoder(&b).Encode(newJoinCommand())
  155. resp, err = t.Post(address, &b)
  156. } else if resp.StatusCode == http.StatusBadRequest {
  157. debug("Reach max number machines in the cluster")
  158. return fmt.Errorf(errors[103])
  159. } else {
  160. return fmt.Errorf("Unable to join")
  161. }
  162. }
  163. }
  164. return fmt.Errorf("Unable to join: %v", err)
  165. }
  166. // Register commands to raft server
  167. func registerCommands() {
  168. raft.RegisterCommand(&JoinCommand{})
  169. raft.RegisterCommand(&SetCommand{})
  170. raft.RegisterCommand(&GetCommand{})
  171. raft.RegisterCommand(&DeleteCommand{})
  172. raft.RegisterCommand(&WatchCommand{})
  173. raft.RegisterCommand(&TestAndSetCommand{})
  174. }