standby_server.go 7.1 KB


  1. package server
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "net/http"
  7. "net/url"
  8. "os"
  9. "path/filepath"
  10. "sync"
  11. "time"
  12. "github.com/coreos/etcd/third_party/github.com/goraft/raft"
  13. etcdErr "github.com/coreos/etcd/error"
  14. "github.com/coreos/etcd/log"
  15. uhttp "github.com/coreos/etcd/pkg/http"
  16. "github.com/coreos/etcd/store"
  17. )
  18. const standbyInfoName = "standby_info"
  19. type StandbyServerConfig struct {
  20. Name string
  21. PeerScheme string
  22. PeerURL string
  23. ClientURL string
  24. DataDir string
  25. }
  26. type standbyInfo struct {
  27. Running bool
  28. Cluster []*machineMessage
  29. SyncInterval float64
  30. }
  31. type StandbyServer struct {
  32. Config StandbyServerConfig
  33. client *Client
  34. standbyInfo
  35. joinIndex uint64
  36. removeNotify chan bool
  37. started bool
  38. closeChan chan bool
  39. routineGroup sync.WaitGroup
  40. sync.Mutex
  41. }
  42. func NewStandbyServer(config StandbyServerConfig, client *Client) *StandbyServer {
  43. s := &StandbyServer{
  44. Config: config,
  45. client: client,
  46. standbyInfo: standbyInfo{SyncInterval: DefaultSyncInterval},
  47. }
  48. if err := s.loadInfo(); err != nil {
  49. log.Warnf("error load standby info file: %v", err)
  50. }
  51. return s
  52. }
  53. func (s *StandbyServer) Start() {
  54. s.Lock()
  55. defer s.Unlock()
  56. if s.started {
  57. return
  58. }
  59. s.started = true
  60. s.removeNotify = make(chan bool)
  61. s.closeChan = make(chan bool)
  62. s.routineGroup.Add(1)
  63. go func() {
  64. defer s.routineGroup.Done()
  65. s.monitorCluster()
  66. }()
  67. s.Running = true
  68. }
  69. // Stop stops the server gracefully.
  70. func (s *StandbyServer) Stop() {
  71. s.Lock()
  72. defer s.Unlock()
  73. if !s.started {
  74. return
  75. }
  76. s.started = false
  77. close(s.closeChan)
  78. s.routineGroup.Wait()
  79. if err := s.saveInfo(); err != nil {
  80. log.Warnf("error saving cluster info for standby")
  81. }
  82. s.Running = false
  83. }
  84. // RemoveNotify notifies the server is removed from standby mode and ready
  85. // for peer mode. It should have joined the cluster successfully.
  86. func (s *StandbyServer) RemoveNotify() <-chan bool {
  87. return s.removeNotify
  88. }
  89. func (s *StandbyServer) ClientHTTPHandler() http.Handler {
  90. return http.HandlerFunc(s.redirectRequests)
  91. }
  92. func (s *StandbyServer) IsRunning() bool {
  93. return s.Running
  94. }
  95. func (s *StandbyServer) ClusterURLs() []string {
  96. peerURLs := make([]string, 0)
  97. for _, peer := range s.Cluster {
  98. peerURLs = append(peerURLs, peer.PeerURL)
  99. }
  100. return peerURLs
  101. }
  102. func (s *StandbyServer) ClusterSize() int {
  103. return len(s.Cluster)
  104. }
  105. func (s *StandbyServer) setCluster(cluster []*machineMessage) {
  106. s.Cluster = cluster
  107. }
  108. func (s *StandbyServer) SyncCluster(peers []string) error {
  109. for i, url := range peers {
  110. peers[i] = s.fullPeerURL(url)
  111. }
  112. if err := s.syncCluster(peers); err != nil {
  113. log.Infof("fail syncing cluster(%v): %v", s.ClusterURLs(), err)
  114. return err
  115. }
  116. log.Infof("set cluster(%v) for standby server", s.ClusterURLs())
  117. return nil
  118. }
  119. func (s *StandbyServer) SetSyncInterval(second float64) {
  120. s.SyncInterval = second
  121. }
  122. func (s *StandbyServer) ClusterLeader() *machineMessage {
  123. for _, machine := range s.Cluster {
  124. if machine.State == raft.Leader {
  125. return machine
  126. }
  127. }
  128. return nil
  129. }
  130. func (s *StandbyServer) JoinIndex() uint64 {
  131. return s.joinIndex
  132. }
  133. func (s *StandbyServer) redirectRequests(w http.ResponseWriter, r *http.Request) {
  134. leader := s.ClusterLeader()
  135. if leader == nil {
  136. w.Header().Set("Content-Type", "application/json")
  137. etcdErr.NewError(etcdErr.EcodeStandbyInternal, "", 0).Write(w)
  138. return
  139. }
  140. uhttp.Redirect(leader.ClientURL, w, r)
  141. }
  142. // monitorCluster assumes that the machine has tried to join the cluster and
  143. // failed, so it waits for the interval at the beginning.
  144. func (s *StandbyServer) monitorCluster() {
  145. for {
  146. timer := time.NewTimer(time.Duration(int64(s.SyncInterval * float64(time.Second))))
  147. defer timer.Stop()
  148. select {
  149. case <-s.closeChan:
  150. return
  151. case <-timer.C:
  152. }
  153. if err := s.syncCluster(nil); err != nil {
  154. log.Warnf("fail syncing cluster(%v): %v", s.ClusterURLs(), err)
  155. continue
  156. }
  157. leader := s.ClusterLeader()
  158. if leader == nil {
  159. log.Warnf("fail getting leader from cluster(%v)", s.ClusterURLs())
  160. continue
  161. }
  162. if err := s.join(leader.PeerURL); err != nil {
  163. log.Debugf("fail joining through leader %v: %v", leader, err)
  164. continue
  165. }
  166. log.Infof("join through leader %v", leader.PeerURL)
  167. go func() {
  168. s.Stop()
  169. close(s.removeNotify)
  170. }()
  171. return
  172. }
  173. }
  174. func (s *StandbyServer) syncCluster(peerURLs []string) error {
  175. peerURLs = append(s.ClusterURLs(), peerURLs...)
  176. for _, peerURL := range peerURLs {
  177. // Fetch current peer list
  178. machines, err := s.client.GetMachines(peerURL)
  179. if err != nil {
  180. log.Debugf("fail getting machine messages from %v", peerURL)
  181. continue
  182. }
  183. config, err := s.client.GetClusterConfig(peerURL)
  184. if err != nil {
  185. log.Debugf("fail getting cluster config from %v", peerURL)
  186. continue
  187. }
  188. s.setCluster(machines)
  189. s.SetSyncInterval(config.SyncInterval)
  190. if err := s.saveInfo(); err != nil {
  191. log.Warnf("fail saving cluster info into disk: %v", err)
  192. }
  193. return nil
  194. }
  195. return fmt.Errorf("unreachable cluster")
  196. }
  197. func (s *StandbyServer) join(peer string) error {
  198. // Our version must match the leaders version
  199. version, err := s.client.GetVersion(peer)
  200. if err != nil {
  201. log.Debugf("error getting peer version")
  202. return err
  203. }
  204. if version < store.MinVersion() || version > store.MaxVersion() {
  205. log.Debugf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version)
  206. return fmt.Errorf("incompatible version")
  207. }
  208. // Fetch cluster config to see whether exists some place.
  209. clusterConfig, err := s.client.GetClusterConfig(peer)
  210. if err != nil {
  211. log.Debugf("error getting cluster config")
  212. return err
  213. }
  214. if clusterConfig.ActiveSize <= len(s.Cluster) {
  215. log.Debugf("stop joining because the cluster is full with %d nodes", len(s.Cluster))
  216. return fmt.Errorf("out of quota")
  217. }
  218. commitIndex, err := s.client.AddMachine(peer,
  219. &JoinCommand{
  220. MinVersion: store.MinVersion(),
  221. MaxVersion: store.MaxVersion(),
  222. Name: s.Config.Name,
  223. RaftURL: s.Config.PeerURL,
  224. EtcdURL: s.Config.ClientURL,
  225. })
  226. if err != nil {
  227. log.Debugf("error on join request")
  228. return err
  229. }
  230. s.joinIndex = commitIndex
  231. return nil
  232. }
  233. func (s *StandbyServer) fullPeerURL(urlStr string) string {
  234. u, err := url.Parse(urlStr)
  235. if err != nil {
  236. log.Warnf("fail parsing url %v", u)
  237. return urlStr
  238. }
  239. u.Scheme = s.Config.PeerScheme
  240. return u.String()
  241. }
  242. func (s *StandbyServer) loadInfo() error {
  243. var info standbyInfo
  244. path := filepath.Join(s.Config.DataDir, standbyInfoName)
  245. file, err := os.OpenFile(path, os.O_RDONLY, 0600)
  246. if err != nil {
  247. if os.IsNotExist(err) {
  248. return nil
  249. }
  250. return err
  251. }
  252. defer file.Close()
  253. if err = json.NewDecoder(file).Decode(&info); err != nil {
  254. return err
  255. }
  256. s.standbyInfo = info
  257. return nil
  258. }
  259. func (s *StandbyServer) saveInfo() error {
  260. tmpFile, err := ioutil.TempFile(s.Config.DataDir, standbyInfoName)
  261. if err != nil {
  262. return err
  263. }
  264. if err = json.NewEncoder(tmpFile).Encode(s.standbyInfo); err != nil {
  265. tmpFile.Close()
  266. os.Remove(tmpFile.Name())
  267. return err
  268. }
  269. tmpFile.Close()
  270. path := filepath.Join(s.Config.DataDir, standbyInfoName)
  271. if err = os.Rename(tmpFile.Name(), path); err != nil {
  272. return err
  273. }
  274. return nil
  275. }