standby_server.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. package server
  2. import (
  3. "fmt"
  4. "net/http"
  5. "net/url"
  6. "sync"
  7. "time"
  8. "github.com/coreos/etcd/third_party/github.com/goraft/raft"
  9. etcdErr "github.com/coreos/etcd/error"
  10. "github.com/coreos/etcd/log"
  11. uhttp "github.com/coreos/etcd/pkg/http"
  12. "github.com/coreos/etcd/store"
  13. )
  // UninitedSyncInterval is the sync interval a standby server starts with;
  // it stays in effect until a cluster sync supplies the configured value.
  const UninitedSyncInterval = 5 * time.Second
  // StandbyServerConfig carries the identity and addresses a standby server
  // uses when it talks to, and later joins, the cluster.
  type StandbyServerConfig struct {
  	// Name is sent as the machine name in the join command.
  	Name string
  	// PeerScheme is the URL scheme forced onto peer URLs passed to SyncCluster.
  	PeerScheme string
  	// PeerURL is sent as the raft URL in the join command.
  	PeerURL string
  	// ClientURL is sent as the etcd (client) URL in the join command.
  	ClientURL string
  }
  21. type StandbyServer struct {
  22. Config StandbyServerConfig
  23. client *Client
  24. cluster []*machineMessage
  25. syncInterval time.Duration
  26. joinIndex uint64
  27. removeNotify chan bool
  28. started bool
  29. closeChan chan bool
  30. routineGroup sync.WaitGroup
  31. sync.Mutex
  32. }
  33. func NewStandbyServer(config StandbyServerConfig, client *Client) *StandbyServer {
  34. return &StandbyServer{
  35. Config: config,
  36. client: client,
  37. syncInterval: UninitedSyncInterval,
  38. }
  39. }
  40. func (s *StandbyServer) Start() {
  41. s.Lock()
  42. defer s.Unlock()
  43. if s.started {
  44. return
  45. }
  46. s.started = true
  47. s.removeNotify = make(chan bool)
  48. s.closeChan = make(chan bool)
  49. s.routineGroup.Add(1)
  50. go func() {
  51. defer s.routineGroup.Done()
  52. s.monitorCluster()
  53. }()
  54. }
  55. // Stop stops the server gracefully.
  56. func (s *StandbyServer) Stop() {
  57. s.Lock()
  58. defer s.Unlock()
  59. if !s.started {
  60. return
  61. }
  62. s.started = false
  63. close(s.closeChan)
  64. s.routineGroup.Wait()
  65. }
  66. // RemoveNotify notifies the server is removed from standby mode and ready
  67. // for peer mode. It should have joined the cluster successfully.
  68. func (s *StandbyServer) RemoveNotify() <-chan bool {
  69. return s.removeNotify
  70. }
  71. func (s *StandbyServer) ClientHTTPHandler() http.Handler {
  72. return http.HandlerFunc(s.redirectRequests)
  73. }
  74. func (s *StandbyServer) Cluster() []string {
  75. peerURLs := make([]string, 0)
  76. for _, peer := range s.cluster {
  77. peerURLs = append(peerURLs, peer.PeerURL)
  78. }
  79. return peerURLs
  80. }
  81. func (s *StandbyServer) ClusterSize() int {
  82. return len(s.cluster)
  83. }
  84. func (s *StandbyServer) setCluster(cluster []*machineMessage) {
  85. s.cluster = cluster
  86. }
  87. func (s *StandbyServer) SyncCluster(peers []string) error {
  88. for i, url := range peers {
  89. peers[i] = s.fullPeerURL(url)
  90. }
  91. if err := s.syncCluster(peers); err != nil {
  92. log.Infof("fail syncing cluster(%v): %v", s.Cluster(), err)
  93. return err
  94. }
  95. log.Infof("set cluster(%v) for standby server", s.Cluster())
  96. return nil
  97. }
  98. func (s *StandbyServer) SetSyncInterval(second float64) {
  99. s.syncInterval = time.Duration(int64(second * float64(time.Second)))
  100. }
  101. func (s *StandbyServer) ClusterLeader() *machineMessage {
  102. for _, machine := range s.cluster {
  103. if machine.State == raft.Leader {
  104. return machine
  105. }
  106. }
  107. return nil
  108. }
  109. func (s *StandbyServer) JoinIndex() uint64 {
  110. return s.joinIndex
  111. }
  112. func (s *StandbyServer) redirectRequests(w http.ResponseWriter, r *http.Request) {
  113. leader := s.ClusterLeader()
  114. if leader == nil {
  115. w.Header().Set("Content-Type", "application/json")
  116. etcdErr.NewError(etcdErr.EcodeStandbyInternal, "", 0).Write(w)
  117. return
  118. }
  119. uhttp.Redirect(leader.ClientURL, w, r)
  120. }
  121. func (s *StandbyServer) monitorCluster() {
  122. for {
  123. timer := time.NewTimer(s.syncInterval)
  124. defer timer.Stop()
  125. select {
  126. case <-s.closeChan:
  127. return
  128. case <-timer.C:
  129. }
  130. if err := s.syncCluster(nil); err != nil {
  131. log.Warnf("fail syncing cluster(%v): %v", s.Cluster(), err)
  132. continue
  133. }
  134. leader := s.ClusterLeader()
  135. if leader == nil {
  136. log.Warnf("fail getting leader from cluster(%v)", s.Cluster())
  137. continue
  138. }
  139. if err := s.join(leader.PeerURL); err != nil {
  140. log.Debugf("fail joining through leader %v: %v", leader, err)
  141. continue
  142. }
  143. log.Infof("join through leader %v", leader.PeerURL)
  144. go func() {
  145. s.Stop()
  146. close(s.removeNotify)
  147. }()
  148. return
  149. }
  150. }
  151. func (s *StandbyServer) syncCluster(peerURLs []string) error {
  152. peerURLs = append(s.Cluster(), peerURLs...)
  153. for _, peerURL := range peerURLs {
  154. // Fetch current peer list
  155. machines, err := s.client.GetMachines(peerURL)
  156. if err != nil {
  157. log.Debugf("fail getting machine messages from %v", peerURL)
  158. continue
  159. }
  160. config, err := s.client.GetClusterConfig(peerURL)
  161. if err != nil {
  162. log.Debugf("fail getting cluster config from %v", peerURL)
  163. continue
  164. }
  165. s.setCluster(machines)
  166. s.SetSyncInterval(config.SyncInterval)
  167. return nil
  168. }
  169. return fmt.Errorf("unreachable cluster")
  170. }
  171. func (s *StandbyServer) join(peer string) error {
  172. // Our version must match the leaders version
  173. version, err := s.client.GetVersion(peer)
  174. if err != nil {
  175. log.Debugf("fail checking join version")
  176. return err
  177. }
  178. if version < store.MinVersion() || version > store.MaxVersion() {
  179. log.Debugf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version)
  180. return fmt.Errorf("incompatible version")
  181. }
  182. // Fetch cluster config to see whether exists some place.
  183. clusterConfig, err := s.client.GetClusterConfig(peer)
  184. if err != nil {
  185. log.Debugf("fail getting cluster config")
  186. return err
  187. }
  188. if clusterConfig.ActiveSize <= len(s.Cluster()) {
  189. log.Debugf("stop joining because the cluster is full with %d nodes", len(s.Cluster()))
  190. return fmt.Errorf("out of quota")
  191. }
  192. commitIndex, err := s.client.AddMachine(peer,
  193. &JoinCommand{
  194. MinVersion: store.MinVersion(),
  195. MaxVersion: store.MaxVersion(),
  196. Name: s.Config.Name,
  197. RaftURL: s.Config.PeerURL,
  198. EtcdURL: s.Config.ClientURL,
  199. })
  200. if err != nil {
  201. log.Debugf("fail on join request")
  202. return err
  203. }
  204. s.joinIndex = commitIndex
  205. return nil
  206. }
  207. func (s *StandbyServer) fullPeerURL(urlStr string) string {
  208. u, err := url.Parse(urlStr)
  209. if err != nil {
  210. log.Warnf("fail parsing url %v", u)
  211. return urlStr
  212. }
  213. u.Scheme = s.Config.PeerScheme
  214. return u.String()
  215. }