standby_server.go 7.3 KB


  1. package server
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "net/http"
  7. "net/url"
  8. "os"
  9. "path/filepath"
  10. "sync"
  11. "time"
  12. "github.com/coreos/etcd/third_party/github.com/goraft/raft"
  13. etcdErr "github.com/coreos/etcd/error"
  14. "github.com/coreos/etcd/log"
  15. uhttp "github.com/coreos/etcd/pkg/http"
  16. "github.com/coreos/etcd/store"
  17. )
  18. const standbyInfoName = "standby_info"
  19. type StandbyServerConfig struct {
  20. Name string
  21. PeerScheme string
  22. PeerURL string
  23. ClientURL string
  24. DataDir string
  25. }
  26. type standbyInfo struct {
  27. Running bool
  28. Cluster []*machineMessage
  29. SyncInterval float64
  30. }
  31. type StandbyServer struct {
  32. Config StandbyServerConfig
  33. client *Client
  34. raftServer raft.Server
  35. standbyInfo
  36. joinIndex uint64
  37. removeNotify chan bool
  38. started bool
  39. closeChan chan bool
  40. routineGroup sync.WaitGroup
  41. sync.Mutex
  42. }
  43. func NewStandbyServer(config StandbyServerConfig, client *Client) *StandbyServer {
  44. s := &StandbyServer{
  45. Config: config,
  46. client: client,
  47. standbyInfo: standbyInfo{SyncInterval: DefaultSyncInterval},
  48. }
  49. if err := s.loadInfo(); err != nil {
  50. log.Warnf("error load standby info file: %v", err)
  51. }
  52. return s
  53. }
  54. func (s *StandbyServer) SetRaftServer(raftServer raft.Server) {
  55. s.raftServer = raftServer
  56. }
  57. func (s *StandbyServer) Start() {
  58. s.Lock()
  59. defer s.Unlock()
  60. if s.started {
  61. return
  62. }
  63. s.started = true
  64. s.removeNotify = make(chan bool)
  65. s.closeChan = make(chan bool)
  66. s.routineGroup.Add(1)
  67. go func() {
  68. defer s.routineGroup.Done()
  69. s.monitorCluster()
  70. }()
  71. s.Running = true
  72. }
  73. // Stop stops the server gracefully.
  74. func (s *StandbyServer) Stop() {
  75. s.Lock()
  76. defer s.Unlock()
  77. if !s.started {
  78. return
  79. }
  80. s.started = false
  81. close(s.closeChan)
  82. s.routineGroup.Wait()
  83. if err := s.saveInfo(); err != nil {
  84. log.Warnf("error saving cluster info for standby")
  85. }
  86. s.Running = false
  87. }
  88. // RemoveNotify notifies the server is removed from standby mode and ready
  89. // for peer mode. It should have joined the cluster successfully.
  90. func (s *StandbyServer) RemoveNotify() <-chan bool {
  91. return s.removeNotify
  92. }
  93. func (s *StandbyServer) ClientHTTPHandler() http.Handler {
  94. return http.HandlerFunc(s.redirectRequests)
  95. }
  96. func (s *StandbyServer) IsRunning() bool {
  97. return s.Running
  98. }
  99. func (s *StandbyServer) ClusterURLs() []string {
  100. peerURLs := make([]string, 0)
  101. for _, peer := range s.Cluster {
  102. peerURLs = append(peerURLs, peer.PeerURL)
  103. }
  104. return peerURLs
  105. }
  106. func (s *StandbyServer) ClusterSize() int {
  107. return len(s.Cluster)
  108. }
  109. func (s *StandbyServer) setCluster(cluster []*machineMessage) {
  110. s.Cluster = cluster
  111. }
  112. func (s *StandbyServer) SyncCluster(peers []string) error {
  113. for i, url := range peers {
  114. peers[i] = s.fullPeerURL(url)
  115. }
  116. if err := s.syncCluster(peers); err != nil {
  117. log.Infof("fail syncing cluster(%v): %v", s.ClusterURLs(), err)
  118. return err
  119. }
  120. log.Infof("set cluster(%v) for standby server", s.ClusterURLs())
  121. return nil
  122. }
  123. func (s *StandbyServer) SetSyncInterval(second float64) {
  124. s.SyncInterval = second
  125. }
  126. func (s *StandbyServer) ClusterLeader() *machineMessage {
  127. for _, machine := range s.Cluster {
  128. if machine.State == raft.Leader {
  129. return machine
  130. }
  131. }
  132. return nil
  133. }
  134. func (s *StandbyServer) JoinIndex() uint64 {
  135. return s.joinIndex
  136. }
  137. func (s *StandbyServer) redirectRequests(w http.ResponseWriter, r *http.Request) {
  138. leader := s.ClusterLeader()
  139. if leader == nil {
  140. w.Header().Set("Content-Type", "application/json")
  141. etcdErr.NewError(etcdErr.EcodeStandbyInternal, "", 0).Write(w)
  142. return
  143. }
  144. uhttp.Redirect(leader.ClientURL, w, r)
  145. }
  146. // monitorCluster assumes that the machine has tried to join the cluster and
  147. // failed, so it waits for the interval at the beginning.
  148. func (s *StandbyServer) monitorCluster() {
  149. ticker := time.NewTicker(time.Duration(int64(s.SyncInterval * float64(time.Second))))
  150. defer ticker.Stop()
  151. for {
  152. select {
  153. case <-s.closeChan:
  154. return
  155. case <-ticker.C:
  156. }
  157. if err := s.syncCluster(nil); err != nil {
  158. log.Warnf("fail syncing cluster(%v): %v", s.ClusterURLs(), err)
  159. continue
  160. }
  161. leader := s.ClusterLeader()
  162. if leader == nil {
  163. log.Warnf("fail getting leader from cluster(%v)", s.ClusterURLs())
  164. continue
  165. }
  166. if err := s.join(leader.PeerURL); err != nil {
  167. log.Debugf("fail joining through leader %v: %v", leader, err)
  168. continue
  169. }
  170. log.Infof("join through leader %v", leader.PeerURL)
  171. go func() {
  172. s.Stop()
  173. close(s.removeNotify)
  174. }()
  175. return
  176. }
  177. }
  178. func (s *StandbyServer) syncCluster(peerURLs []string) error {
  179. peerURLs = append(s.ClusterURLs(), peerURLs...)
  180. for _, peerURL := range peerURLs {
  181. // Fetch current peer list
  182. machines, err := s.client.GetMachines(peerURL)
  183. if err != nil {
  184. log.Debugf("fail getting machine messages from %v", peerURL)
  185. continue
  186. }
  187. config, err := s.client.GetClusterConfig(peerURL)
  188. if err != nil {
  189. log.Debugf("fail getting cluster config from %v", peerURL)
  190. continue
  191. }
  192. s.setCluster(machines)
  193. s.SetSyncInterval(config.SyncInterval)
  194. if err := s.saveInfo(); err != nil {
  195. log.Warnf("fail saving cluster info into disk: %v", err)
  196. }
  197. return nil
  198. }
  199. return fmt.Errorf("unreachable cluster")
  200. }
  201. func (s *StandbyServer) join(peer string) error {
  202. for _, url := range s.ClusterURLs() {
  203. if s.Config.PeerURL == url {
  204. s.joinIndex = s.raftServer.CommitIndex()
  205. return nil
  206. }
  207. }
  208. // Our version must match the leaders version
  209. version, err := s.client.GetVersion(peer)
  210. if err != nil {
  211. log.Debugf("error getting peer version")
  212. return err
  213. }
  214. if version < store.MinVersion() || version > store.MaxVersion() {
  215. log.Debugf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version)
  216. return fmt.Errorf("incompatible version")
  217. }
  218. // Fetch cluster config to see whether exists some place.
  219. clusterConfig, err := s.client.GetClusterConfig(peer)
  220. if err != nil {
  221. log.Debugf("error getting cluster config")
  222. return err
  223. }
  224. if clusterConfig.ActiveSize <= len(s.Cluster) {
  225. log.Debugf("stop joining because the cluster is full with %d nodes", len(s.Cluster))
  226. return fmt.Errorf("out of quota")
  227. }
  228. commitIndex, err := s.client.AddMachine(peer,
  229. &JoinCommand{
  230. MinVersion: store.MinVersion(),
  231. MaxVersion: store.MaxVersion(),
  232. Name: s.Config.Name,
  233. RaftURL: s.Config.PeerURL,
  234. EtcdURL: s.Config.ClientURL,
  235. })
  236. if err != nil {
  237. log.Debugf("error on join request")
  238. return err
  239. }
  240. s.joinIndex = commitIndex
  241. return nil
  242. }
  243. func (s *StandbyServer) fullPeerURL(urlStr string) string {
  244. u, err := url.Parse(urlStr)
  245. if err != nil {
  246. log.Warnf("fail parsing url %v", u)
  247. return urlStr
  248. }
  249. u.Scheme = s.Config.PeerScheme
  250. return u.String()
  251. }
  252. func (s *StandbyServer) loadInfo() error {
  253. var info standbyInfo
  254. path := filepath.Join(s.Config.DataDir, standbyInfoName)
  255. file, err := os.OpenFile(path, os.O_RDONLY, 0600)
  256. if err != nil {
  257. if os.IsNotExist(err) {
  258. return nil
  259. }
  260. return err
  261. }
  262. defer file.Close()
  263. if err = json.NewDecoder(file).Decode(&info); err != nil {
  264. return err
  265. }
  266. s.standbyInfo = info
  267. return nil
  268. }
  269. func (s *StandbyServer) saveInfo() error {
  270. tmpFile, err := ioutil.TempFile(s.Config.DataDir, standbyInfoName)
  271. if err != nil {
  272. return err
  273. }
  274. if err = json.NewEncoder(tmpFile).Encode(s.standbyInfo); err != nil {
  275. tmpFile.Close()
  276. os.Remove(tmpFile.Name())
  277. return err
  278. }
  279. tmpFile.Close()
  280. path := filepath.Join(s.Config.DataDir, standbyInfoName)
  281. if err = os.Rename(tmpFile.Name(), path); err != nil {
  282. return err
  283. }
  284. return nil
  285. }