standby_server.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. package server
  2. import (
  3. "fmt"
  4. "net/http"
  5. "net/url"
  6. "sync"
  7. "time"
  8. "github.com/coreos/etcd/third_party/github.com/goraft/raft"
  9. etcdErr "github.com/coreos/etcd/error"
  10. "github.com/coreos/etcd/log"
  11. uhttp "github.com/coreos/etcd/pkg/http"
  12. "github.com/coreos/etcd/store"
  13. )
  // StandbyServerConfig carries the identity and addresses a standby server
  // presents when it asks the cluster leader to let it join.
  type StandbyServerConfig struct {
  	// Name is sent as the Name field of the join command.
  	Name string
  	// PeerScheme is the URL scheme forced onto peer URLs given to SyncCluster.
  	PeerScheme string
  	// PeerURL is sent as the RaftURL field of the join command.
  	PeerURL string
  	// ClientURL is sent as the EtcdURL field of the join command.
  	ClientURL string
  }
  20. type StandbyServer struct {
  21. Config StandbyServerConfig
  22. client *Client
  23. cluster []*machineMessage
  24. syncInterval float64
  25. joinIndex uint64
  26. removeNotify chan bool
  27. started bool
  28. closeChan chan bool
  29. routineGroup sync.WaitGroup
  30. sync.Mutex
  31. }
  32. func NewStandbyServer(config StandbyServerConfig, client *Client) *StandbyServer {
  33. return &StandbyServer{
  34. Config: config,
  35. client: client,
  36. syncInterval: DefaultSyncInterval,
  37. }
  38. }
  39. func (s *StandbyServer) Start() {
  40. s.Lock()
  41. defer s.Unlock()
  42. if s.started {
  43. return
  44. }
  45. s.started = true
  46. s.removeNotify = make(chan bool)
  47. s.closeChan = make(chan bool)
  48. s.routineGroup.Add(1)
  49. go func() {
  50. defer s.routineGroup.Done()
  51. s.monitorCluster()
  52. }()
  53. }
  54. // Stop stops the server gracefully.
  55. func (s *StandbyServer) Stop() {
  56. s.Lock()
  57. defer s.Unlock()
  58. if !s.started {
  59. return
  60. }
  61. s.started = false
  62. close(s.closeChan)
  63. s.routineGroup.Wait()
  64. }
  65. // RemoveNotify notifies the server is removed from standby mode and ready
  66. // for peer mode. It should have joined the cluster successfully.
  67. func (s *StandbyServer) RemoveNotify() <-chan bool {
  68. return s.removeNotify
  69. }
  70. func (s *StandbyServer) ClientHTTPHandler() http.Handler {
  71. return http.HandlerFunc(s.redirectRequests)
  72. }
  73. func (s *StandbyServer) Cluster() []string {
  74. peerURLs := make([]string, 0)
  75. for _, peer := range s.cluster {
  76. peerURLs = append(peerURLs, peer.PeerURL)
  77. }
  78. return peerURLs
  79. }
  80. func (s *StandbyServer) ClusterSize() int {
  81. return len(s.cluster)
  82. }
  83. func (s *StandbyServer) setCluster(cluster []*machineMessage) {
  84. s.cluster = cluster
  85. }
  86. func (s *StandbyServer) SyncCluster(peers []string) error {
  87. for i, url := range peers {
  88. peers[i] = s.fullPeerURL(url)
  89. }
  90. if err := s.syncCluster(peers); err != nil {
  91. log.Infof("fail syncing cluster(%v): %v", s.Cluster(), err)
  92. return err
  93. }
  94. log.Infof("set cluster(%v) for standby server", s.Cluster())
  95. return nil
  96. }
  97. func (s *StandbyServer) SetSyncInterval(second float64) {
  98. s.syncInterval = second
  99. }
  100. func (s *StandbyServer) ClusterLeader() *machineMessage {
  101. for _, machine := range s.cluster {
  102. if machine.State == raft.Leader {
  103. return machine
  104. }
  105. }
  106. return nil
  107. }
  108. func (s *StandbyServer) JoinIndex() uint64 {
  109. return s.joinIndex
  110. }
  111. func (s *StandbyServer) redirectRequests(w http.ResponseWriter, r *http.Request) {
  112. leader := s.ClusterLeader()
  113. if leader == nil {
  114. w.Header().Set("Content-Type", "application/json")
  115. etcdErr.NewError(etcdErr.EcodeStandbyInternal, "", 0).Write(w)
  116. return
  117. }
  118. uhttp.Redirect(leader.ClientURL, w, r)
  119. }
  120. func (s *StandbyServer) monitorCluster() {
  121. for {
  122. timer := time.NewTimer(time.Duration(int64(s.syncInterval * float64(time.Second))))
  123. defer timer.Stop()
  124. select {
  125. case <-s.closeChan:
  126. return
  127. case <-timer.C:
  128. }
  129. if err := s.syncCluster(nil); err != nil {
  130. log.Warnf("fail syncing cluster(%v): %v", s.Cluster(), err)
  131. continue
  132. }
  133. leader := s.ClusterLeader()
  134. if leader == nil {
  135. log.Warnf("fail getting leader from cluster(%v)", s.Cluster())
  136. continue
  137. }
  138. if err := s.join(leader.PeerURL); err != nil {
  139. log.Debugf("fail joining through leader %v: %v", leader, err)
  140. continue
  141. }
  142. log.Infof("join through leader %v", leader.PeerURL)
  143. go func() {
  144. s.Stop()
  145. close(s.removeNotify)
  146. }()
  147. return
  148. }
  149. }
  150. func (s *StandbyServer) syncCluster(peerURLs []string) error {
  151. peerURLs = append(s.Cluster(), peerURLs...)
  152. for _, peerURL := range peerURLs {
  153. // Fetch current peer list
  154. machines, err := s.client.GetMachines(peerURL)
  155. if err != nil {
  156. log.Debugf("fail getting machine messages from %v", peerURL)
  157. continue
  158. }
  159. config, err := s.client.GetClusterConfig(peerURL)
  160. if err != nil {
  161. log.Debugf("fail getting cluster config from %v", peerURL)
  162. continue
  163. }
  164. s.setCluster(machines)
  165. s.SetSyncInterval(config.SyncInterval)
  166. return nil
  167. }
  168. return fmt.Errorf("unreachable cluster")
  169. }
  170. func (s *StandbyServer) join(peer string) error {
  171. // Our version must match the leaders version
  172. version, err := s.client.GetVersion(peer)
  173. if err != nil {
  174. log.Debugf("error getting peer version")
  175. return err
  176. }
  177. if version < store.MinVersion() || version > store.MaxVersion() {
  178. log.Debugf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version)
  179. return fmt.Errorf("incompatible version")
  180. }
  181. // Fetch cluster config to see whether exists some place.
  182. clusterConfig, err := s.client.GetClusterConfig(peer)
  183. if err != nil {
  184. log.Debugf("error getting cluster config")
  185. return err
  186. }
  187. if clusterConfig.ActiveSize <= len(s.Cluster()) {
  188. log.Debugf("stop joining because the cluster is full with %d nodes", len(s.Cluster()))
  189. return fmt.Errorf("out of quota")
  190. }
  191. commitIndex, err := s.client.AddMachine(peer,
  192. &JoinCommand{
  193. MinVersion: store.MinVersion(),
  194. MaxVersion: store.MaxVersion(),
  195. Name: s.Config.Name,
  196. RaftURL: s.Config.PeerURL,
  197. EtcdURL: s.Config.ClientURL,
  198. })
  199. if err != nil {
  200. log.Debugf("error on join request")
  201. return err
  202. }
  203. s.joinIndex = commitIndex
  204. return nil
  205. }
  206. func (s *StandbyServer) fullPeerURL(urlStr string) string {
  207. u, err := url.Parse(urlStr)
  208. if err != nil {
  209. log.Warnf("fail parsing url %v", u)
  210. return urlStr
  211. }
  212. u.Scheme = s.Config.PeerScheme
  213. return u.String()
  214. }