standby_server.go 7.4 KB


  1. package server
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "net/http"
  7. "net/url"
  8. "os"
  9. "path/filepath"
  10. "sync"
  11. "time"
  12. "github.com/coreos/etcd/third_party/github.com/goraft/raft"
  13. etcdErr "github.com/coreos/etcd/error"
  14. "github.com/coreos/etcd/log"
  15. uhttp "github.com/coreos/etcd/pkg/http"
  16. "github.com/coreos/etcd/store"
  17. )
  18. const standbyInfoName = "standby_info"
  19. type StandbyServerConfig struct {
  20. Name string
  21. PeerScheme string
  22. PeerURL string
  23. ClientURL string
  24. DataDir string
  25. }
  26. type standbyInfo struct {
  27. // stay running in standby mode
  28. Running bool
  29. Cluster []*machineMessage
  30. SyncInterval float64
  31. }
  32. type StandbyServer struct {
  33. Config StandbyServerConfig
  34. client *Client
  35. raftServer raft.Server
  36. standbyInfo
  37. joinIndex uint64
  38. removeNotify chan bool
  39. started bool
  40. closeChan chan bool
  41. routineGroup sync.WaitGroup
  42. sync.Mutex
  43. }
  44. func NewStandbyServer(config StandbyServerConfig, client *Client) *StandbyServer {
  45. s := &StandbyServer{
  46. Config: config,
  47. client: client,
  48. standbyInfo: standbyInfo{SyncInterval: DefaultSyncInterval},
  49. }
  50. if err := s.loadInfo(); err != nil {
  51. log.Warnf("error load standby info file: %v", err)
  52. }
  53. return s
  54. }
  55. func (s *StandbyServer) SetRaftServer(raftServer raft.Server) {
  56. s.raftServer = raftServer
  57. }
  58. func (s *StandbyServer) Start() {
  59. s.Lock()
  60. defer s.Unlock()
  61. if s.started {
  62. return
  63. }
  64. s.started = true
  65. s.removeNotify = make(chan bool)
  66. s.closeChan = make(chan bool)
  67. s.Running = true
  68. if err := s.saveInfo(); err != nil {
  69. log.Warnf("error saving cluster info for standby")
  70. }
  71. s.routineGroup.Add(1)
  72. go func() {
  73. defer s.routineGroup.Done()
  74. s.monitorCluster()
  75. }()
  76. }
  77. // Stop stops the server gracefully.
  78. func (s *StandbyServer) Stop() {
  79. s.Lock()
  80. defer s.Unlock()
  81. if !s.started {
  82. return
  83. }
  84. s.started = false
  85. close(s.closeChan)
  86. s.routineGroup.Wait()
  87. }
  88. // RemoveNotify notifies the server is removed from standby mode and ready
  89. // for peer mode. It should have joined the cluster successfully.
  90. func (s *StandbyServer) RemoveNotify() <-chan bool {
  91. return s.removeNotify
  92. }
  93. func (s *StandbyServer) ClientHTTPHandler() http.Handler {
  94. return http.HandlerFunc(s.redirectRequests)
  95. }
  96. func (s *StandbyServer) IsRunning() bool {
  97. return s.Running
  98. }
  99. func (s *StandbyServer) ClusterURLs() []string {
  100. peerURLs := make([]string, 0)
  101. for _, peer := range s.Cluster {
  102. peerURLs = append(peerURLs, peer.PeerURL)
  103. }
  104. return peerURLs
  105. }
  106. func (s *StandbyServer) ClusterSize() int {
  107. return len(s.Cluster)
  108. }
  109. func (s *StandbyServer) setCluster(cluster []*machineMessage) {
  110. s.Cluster = cluster
  111. }
  112. func (s *StandbyServer) SyncCluster(peers []string) error {
  113. for i, url := range peers {
  114. peers[i] = s.fullPeerURL(url)
  115. }
  116. if err := s.syncCluster(peers); err != nil {
  117. log.Infof("fail syncing cluster(%v): %v", s.ClusterURLs(), err)
  118. return err
  119. }
  120. log.Infof("set cluster(%v) for standby server", s.ClusterURLs())
  121. return nil
  122. }
  123. func (s *StandbyServer) SetSyncInterval(second float64) {
  124. s.SyncInterval = second
  125. }
  126. func (s *StandbyServer) ClusterLeader() *machineMessage {
  127. for _, machine := range s.Cluster {
  128. if machine.State == raft.Leader {
  129. return machine
  130. }
  131. }
  132. return nil
  133. }
  134. func (s *StandbyServer) JoinIndex() uint64 {
  135. return s.joinIndex
  136. }
  137. func (s *StandbyServer) redirectRequests(w http.ResponseWriter, r *http.Request) {
  138. leader := s.ClusterLeader()
  139. if leader == nil {
  140. w.Header().Set("Content-Type", "application/json")
  141. etcdErr.NewError(etcdErr.EcodeStandbyInternal, "", 0).Write(w)
  142. return
  143. }
  144. uhttp.Redirect(leader.ClientURL, w, r)
  145. }
  146. // monitorCluster assumes that the machine has tried to join the cluster and
  147. // failed, so it waits for the interval at the beginning.
  148. func (s *StandbyServer) monitorCluster() {
  149. ticker := time.NewTicker(time.Duration(int64(s.SyncInterval * float64(time.Second))))
  150. defer ticker.Stop()
  151. for {
  152. select {
  153. case <-s.closeChan:
  154. return
  155. case <-ticker.C:
  156. }
  157. if err := s.syncCluster(nil); err != nil {
  158. log.Warnf("fail syncing cluster(%v): %v", s.ClusterURLs(), err)
  159. continue
  160. }
  161. leader := s.ClusterLeader()
  162. if leader == nil {
  163. log.Warnf("fail getting leader from cluster(%v)", s.ClusterURLs())
  164. continue
  165. }
  166. if err := s.join(leader.PeerURL); err != nil {
  167. log.Debugf("fail joining through leader %v: %v", leader, err)
  168. continue
  169. }
  170. log.Infof("join through leader %v", leader.PeerURL)
  171. s.Running = false
  172. if err := s.saveInfo(); err != nil {
  173. log.Warnf("error saving cluster info for standby")
  174. }
  175. go func() {
  176. s.Stop()
  177. close(s.removeNotify)
  178. }()
  179. return
  180. }
  181. }
  182. func (s *StandbyServer) syncCluster(peerURLs []string) error {
  183. peerURLs = append(s.ClusterURLs(), peerURLs...)
  184. for _, peerURL := range peerURLs {
  185. // Fetch current peer list
  186. machines, err := s.client.GetMachines(peerURL)
  187. if err != nil {
  188. log.Debugf("fail getting machine messages from %v", peerURL)
  189. continue
  190. }
  191. config, err := s.client.GetClusterConfig(peerURL)
  192. if err != nil {
  193. log.Debugf("fail getting cluster config from %v", peerURL)
  194. continue
  195. }
  196. s.setCluster(machines)
  197. s.SetSyncInterval(config.SyncInterval)
  198. if err := s.saveInfo(); err != nil {
  199. log.Warnf("fail saving cluster info into disk: %v", err)
  200. }
  201. return nil
  202. }
  203. return fmt.Errorf("unreachable cluster")
  204. }
  205. func (s *StandbyServer) join(peer string) error {
  206. for _, url := range s.ClusterURLs() {
  207. if s.Config.PeerURL == url {
  208. s.joinIndex = s.raftServer.CommitIndex()
  209. return nil
  210. }
  211. }
  212. // Our version must match the leaders version
  213. version, err := s.client.GetVersion(peer)
  214. if err != nil {
  215. log.Debugf("error getting peer version")
  216. return err
  217. }
  218. if version < store.MinVersion() || version > store.MaxVersion() {
  219. log.Debugf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version)
  220. return fmt.Errorf("incompatible version")
  221. }
  222. // Fetch cluster config to see whether exists some place.
  223. clusterConfig, err := s.client.GetClusterConfig(peer)
  224. if err != nil {
  225. log.Debugf("error getting cluster config")
  226. return err
  227. }
  228. if clusterConfig.ActiveSize <= len(s.Cluster) {
  229. log.Debugf("stop joining because the cluster is full with %d nodes", len(s.Cluster))
  230. return fmt.Errorf("out of quota")
  231. }
  232. commitIndex, err := s.client.AddMachine(peer,
  233. &JoinCommand{
  234. MinVersion: store.MinVersion(),
  235. MaxVersion: store.MaxVersion(),
  236. Name: s.Config.Name,
  237. RaftURL: s.Config.PeerURL,
  238. EtcdURL: s.Config.ClientURL,
  239. })
  240. if err != nil {
  241. log.Debugf("error on join request")
  242. return err
  243. }
  244. s.joinIndex = commitIndex
  245. return nil
  246. }
  247. func (s *StandbyServer) fullPeerURL(urlStr string) string {
  248. u, err := url.Parse(urlStr)
  249. if err != nil {
  250. log.Warnf("fail parsing url %v", u)
  251. return urlStr
  252. }
  253. u.Scheme = s.Config.PeerScheme
  254. return u.String()
  255. }
  256. func (s *StandbyServer) loadInfo() error {
  257. var info standbyInfo
  258. path := filepath.Join(s.Config.DataDir, standbyInfoName)
  259. file, err := os.OpenFile(path, os.O_RDONLY, 0600)
  260. if err != nil {
  261. if os.IsNotExist(err) {
  262. return nil
  263. }
  264. return err
  265. }
  266. defer file.Close()
  267. if err = json.NewDecoder(file).Decode(&info); err != nil {
  268. return err
  269. }
  270. s.standbyInfo = info
  271. return nil
  272. }
  273. func (s *StandbyServer) saveInfo() error {
  274. tmpFile, err := ioutil.TempFile(s.Config.DataDir, standbyInfoName)
  275. if err != nil {
  276. return err
  277. }
  278. if err = json.NewEncoder(tmpFile).Encode(s.standbyInfo); err != nil {
  279. tmpFile.Close()
  280. os.Remove(tmpFile.Name())
  281. return err
  282. }
  283. tmpFile.Close()
  284. path := filepath.Join(s.Config.DataDir, standbyInfoName)
  285. if err = os.Rename(tmpFile.Name(), path); err != nil {
  286. return err
  287. }
  288. return nil
  289. }