standby_server.go 7.5 KB


  1. // +build ignore
  2. package server
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. "io/ioutil"
  7. "net/http"
  8. "net/url"
  9. "os"
  10. "path/filepath"
  11. "sync"
  12. "time"
  13. "github.com/coreos/etcd/third_party/github.com/goraft/raft"
  14. etcdErr "github.com/coreos/etcd/error"
  15. "github.com/coreos/etcd/log"
  16. uhttp "github.com/coreos/etcd/pkg/http"
  17. "github.com/coreos/etcd/store"
  18. )
  // standbyInfoName is the name of the file, located inside the configured
  // data directory, in which the standby state is persisted.
  const standbyInfoName = "standby_info"
  // StandbyServerConfig holds the static settings of a StandbyServer.
  type StandbyServerConfig struct {
  	// Name is the machine name sent in the join request.
  	Name string
  	// PeerScheme is the URL scheme forced onto the peer URLs handed to
  	// SyncCluster.
  	PeerScheme string
  	// PeerURL is this machine's peer address; it is sent in the join request
  	// and compared against the known cluster peer URLs.
  	PeerURL string
  	// ClientURL is this machine's client address sent in the join request.
  	ClientURL string
  	// DataDir is the directory in which the standby info file is stored.
  	DataDir string
  }
  27. type standbyInfo struct {
  28. // stay running in standby mode
  29. Running bool
  30. Cluster []*machineMessage
  31. SyncInterval float64
  32. }
  33. type StandbyServer struct {
  34. Config StandbyServerConfig
  35. client *Client
  36. raftServer raft.Server
  37. standbyInfo
  38. joinIndex uint64
  39. removeNotify chan bool
  40. started bool
  41. closeChan chan bool
  42. routineGroup sync.WaitGroup
  43. sync.Mutex
  44. }
  45. func NewStandbyServer(cfg StandbyServerConfig, client *Client) *StandbyServer {
  46. s := &StandbyServer{
  47. Config: cfg,
  48. client: client,
  49. standbyInfo: standbyInfo{SyncInterval: DefaultSyncInterval},
  50. }
  51. if err := s.loadInfo(); err != nil {
  52. log.Warnf("error load standby info file: %v", err)
  53. }
  54. return s
  55. }
  56. func (s *StandbyServer) SetRaftServer(raftServer raft.Server) {
  57. s.raftServer = raftServer
  58. }
  59. func (s *StandbyServer) Start() {
  60. s.Lock()
  61. defer s.Unlock()
  62. if s.started {
  63. return
  64. }
  65. s.started = true
  66. s.removeNotify = make(chan bool)
  67. s.closeChan = make(chan bool)
  68. s.Running = true
  69. if err := s.saveInfo(); err != nil {
  70. log.Warnf("error saving cluster info for standby")
  71. }
  72. s.routineGroup.Add(1)
  73. go func() {
  74. defer s.routineGroup.Done()
  75. s.monitorCluster()
  76. }()
  77. }
  78. // Stop stops the server gracefully.
  79. func (s *StandbyServer) Stop() {
  80. s.Lock()
  81. defer s.Unlock()
  82. if !s.started {
  83. return
  84. }
  85. s.started = false
  86. close(s.closeChan)
  87. s.routineGroup.Wait()
  88. }
  89. // RemoveNotify notifies the server is removed from standby mode and ready
  90. // for peer mode. It should have joined the cluster successfully.
  91. func (s *StandbyServer) RemoveNotify() <-chan bool {
  92. return s.removeNotify
  93. }
  94. func (s *StandbyServer) ClientHTTPHandler() http.Handler {
  95. return http.HandlerFunc(s.redirectRequests)
  96. }
  97. func (s *StandbyServer) IsRunning() bool {
  98. return s.Running
  99. }
  100. func (s *StandbyServer) ClusterURLs() []string {
  101. peerURLs := make([]string, 0)
  102. for _, peer := range s.Cluster {
  103. peerURLs = append(peerURLs, peer.PeerURL)
  104. }
  105. return peerURLs
  106. }
  107. func (s *StandbyServer) ClusterSize() int {
  108. return len(s.Cluster)
  109. }
  110. func (s *StandbyServer) setCluster(cluster []*machineMessage) {
  111. s.Cluster = cluster
  112. }
  113. func (s *StandbyServer) SyncCluster(peers []string) error {
  114. for i, url := range peers {
  115. peers[i] = s.fullPeerURL(url)
  116. }
  117. if err := s.syncCluster(peers); err != nil {
  118. log.Infof("fail syncing cluster(%v): %v", s.ClusterURLs(), err)
  119. return err
  120. }
  121. log.Infof("set cluster(%v) for standby server", s.ClusterURLs())
  122. return nil
  123. }
  124. func (s *StandbyServer) SetSyncInterval(second float64) {
  125. s.SyncInterval = second
  126. }
  127. func (s *StandbyServer) ClusterLeader() *machineMessage {
  128. for _, machine := range s.Cluster {
  129. if machine.State == raft.Leader {
  130. return machine
  131. }
  132. }
  133. return nil
  134. }
  135. func (s *StandbyServer) JoinIndex() uint64 {
  136. return s.joinIndex
  137. }
  138. func (s *StandbyServer) redirectRequests(w http.ResponseWriter, r *http.Request) {
  139. leader := s.ClusterLeader()
  140. if leader == nil {
  141. w.Header().Set("Content-Type", "application/json")
  142. etcdErr.NewError(etcdErr.EcodeStandbyInternal, "", 0).Write(w)
  143. return
  144. }
  145. uhttp.Redirect(leader.ClientURL, w, r)
  146. }
  147. // monitorCluster assumes that the machine has tried to join the cluster and
  148. // failed, so it waits for the interval at the beginning.
  149. func (s *StandbyServer) monitorCluster() {
  150. ticker := time.NewTicker(time.Duration(int64(s.SyncInterval * float64(time.Second))))
  151. defer ticker.Stop()
  152. for {
  153. select {
  154. case <-s.closeChan:
  155. return
  156. case <-ticker.C:
  157. }
  158. if err := s.syncCluster(nil); err != nil {
  159. log.Warnf("fail syncing cluster(%v): %v", s.ClusterURLs(), err)
  160. continue
  161. }
  162. leader := s.ClusterLeader()
  163. if leader == nil {
  164. log.Warnf("fail getting leader from cluster(%v)", s.ClusterURLs())
  165. continue
  166. }
  167. if err := s.join(leader.PeerURL); err != nil {
  168. log.Debugf("fail joining through leader %v: %v", leader, err)
  169. continue
  170. }
  171. log.Infof("join through leader %v", leader.PeerURL)
  172. s.Running = false
  173. if err := s.saveInfo(); err != nil {
  174. log.Warnf("error saving cluster info for standby")
  175. }
  176. go func() {
  177. s.Stop()
  178. close(s.removeNotify)
  179. }()
  180. return
  181. }
  182. }
  183. func (s *StandbyServer) syncCluster(peerURLs []string) error {
  184. peerURLs = append(s.ClusterURLs(), peerURLs...)
  185. for _, peerURL := range peerURLs {
  186. // Fetch current peer list
  187. machines, err := s.client.GetMachines(peerURL)
  188. if err != nil {
  189. log.Debugf("fail getting machine messages from %v", peerURL)
  190. continue
  191. }
  192. cfg, err := s.client.GetClusterConfig(peerURL)
  193. if err != nil {
  194. log.Debugf("fail getting cluster config from %v", peerURL)
  195. continue
  196. }
  197. s.setCluster(machines)
  198. s.SetSyncInterval(cfg.SyncInterval)
  199. if err := s.saveInfo(); err != nil {
  200. log.Warnf("fail saving cluster info into disk: %v", err)
  201. }
  202. return nil
  203. }
  204. return fmt.Errorf("unreachable cluster")
  205. }
  206. func (s *StandbyServer) join(peer string) error {
  207. for _, url := range s.ClusterURLs() {
  208. if s.Config.PeerURL == url {
  209. s.joinIndex = s.raftServer.CommitIndex()
  210. return nil
  211. }
  212. }
  213. // Our version must match the leaders version
  214. version, err := s.client.GetVersion(peer)
  215. if err != nil {
  216. log.Debugf("error getting peer version")
  217. return err
  218. }
  219. if version < store.MinVersion() || version > store.MaxVersion() {
  220. log.Debugf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version)
  221. return fmt.Errorf("incompatible version")
  222. }
  223. // Fetch cluster config to see whether exists some place.
  224. clusterConfig, err := s.client.GetClusterConfig(peer)
  225. if err != nil {
  226. log.Debugf("error getting cluster config")
  227. return err
  228. }
  229. if clusterConfig.ActiveSize <= len(s.Cluster) {
  230. log.Debugf("stop joining because the cluster is full with %d nodes", len(s.Cluster))
  231. return fmt.Errorf("out of quota")
  232. }
  233. commitIndex, err := s.client.AddMachine(peer,
  234. &JoinCommand{
  235. MinVersion: store.MinVersion(),
  236. MaxVersion: store.MaxVersion(),
  237. Name: s.Config.Name,
  238. RaftURL: s.Config.PeerURL,
  239. EtcdURL: s.Config.ClientURL,
  240. })
  241. if err != nil {
  242. log.Debugf("error on join request")
  243. return err
  244. }
  245. s.joinIndex = commitIndex
  246. return nil
  247. }
  248. func (s *StandbyServer) fullPeerURL(urlStr string) string {
  249. u, err := url.Parse(urlStr)
  250. if err != nil {
  251. log.Warnf("fail parsing url %v", u)
  252. return urlStr
  253. }
  254. u.Scheme = s.Config.PeerScheme
  255. return u.String()
  256. }
  257. func (s *StandbyServer) loadInfo() error {
  258. var info standbyInfo
  259. path := filepath.Join(s.Config.DataDir, standbyInfoName)
  260. file, err := os.OpenFile(path, os.O_RDONLY, 0600)
  261. if err != nil {
  262. if os.IsNotExist(err) {
  263. return nil
  264. }
  265. return err
  266. }
  267. defer file.Close()
  268. if err = json.NewDecoder(file).Decode(&info); err != nil {
  269. return err
  270. }
  271. s.standbyInfo = info
  272. return nil
  273. }
  274. func (s *StandbyServer) saveInfo() error {
  275. tmpFile, err := ioutil.TempFile(s.Config.DataDir, standbyInfoName)
  276. if err != nil {
  277. return err
  278. }
  279. if err = json.NewEncoder(tmpFile).Encode(s.standbyInfo); err != nil {
  280. tmpFile.Close()
  281. os.Remove(tmpFile.Name())
  282. return err
  283. }
  284. tmpFile.Close()
  285. path := filepath.Join(s.Config.DataDir, standbyInfoName)
  286. if err = os.Rename(tmpFile.Name(), path); err != nil {
  287. return err
  288. }
  289. return nil
  290. }