standby_server.go 7.3 KB


  1. package server
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net/http"
  6. "net/url"
  7. "os"
  8. "path/filepath"
  9. "sync"
  10. "time"
  11. "github.com/coreos/etcd/third_party/github.com/goraft/raft"
  12. etcdErr "github.com/coreos/etcd/error"
  13. "github.com/coreos/etcd/log"
  14. uhttp "github.com/coreos/etcd/pkg/http"
  15. "github.com/coreos/etcd/store"
  16. )
  // clusterInfoName is the base name of the file, located under
  // StandbyServerConfig.DataDir, in which the standby server persists the
  // cluster membership it last synced.
  const clusterInfoName = "cluster_info"
  // StandbyServerConfig carries the static settings a StandbyServer needs to
  // find its on-disk state and to describe itself when it asks to join a
  // cluster.
  type StandbyServerConfig struct {
  	// Name is sent as the machine name in the join request.
  	Name string
  	// PeerScheme is the URL scheme forced onto peer URLs given to SyncCluster.
  	PeerScheme string
  	// PeerURL is advertised as this machine's raft URL in the join request.
  	PeerURL string
  	// ClientURL is advertised as this machine's etcd URL in the join request.
  	ClientURL string
  	// DataDir is the directory that holds the persisted cluster info file.
  	DataDir string
  }
  25. type StandbyServer struct {
  26. Config StandbyServerConfig
  27. client *Client
  28. cluster []*machineMessage
  29. syncInterval float64
  30. joinIndex uint64
  31. file *os.File
  32. recorded bool
  33. removeNotify chan bool
  34. started bool
  35. closeChan chan bool
  36. routineGroup sync.WaitGroup
  37. sync.Mutex
  38. }
  39. func NewStandbyServer(config StandbyServerConfig, client *Client) (*StandbyServer, error) {
  40. s := &StandbyServer{
  41. Config: config,
  42. client: client,
  43. syncInterval: DefaultSyncInterval,
  44. }
  45. if err := s.openClusterInfo(); err != nil {
  46. return nil, fmt.Errorf("error open/create cluster info file: %v", err)
  47. }
  48. if clusterInfo, err := s.loadClusterInfo(); err == nil {
  49. s.setCluster(clusterInfo)
  50. }
  51. return s, nil
  52. }
  53. func (s *StandbyServer) Start() {
  54. s.Lock()
  55. defer s.Unlock()
  56. if s.started {
  57. return
  58. }
  59. s.started = true
  60. s.removeNotify = make(chan bool)
  61. s.closeChan = make(chan bool)
  62. s.routineGroup.Add(1)
  63. go func() {
  64. defer s.routineGroup.Done()
  65. s.monitorCluster()
  66. }()
  67. }
  68. // Stop stops the server gracefully.
  69. func (s *StandbyServer) Stop() {
  70. s.Lock()
  71. defer s.Unlock()
  72. if !s.started {
  73. return
  74. }
  75. s.started = false
  76. close(s.closeChan)
  77. s.routineGroup.Wait()
  78. if err := s.clearClusterInfo(); err != nil {
  79. log.Warnf("error clearing cluster info for standby")
  80. }
  81. }
  82. // RemoveNotify notifies the server is removed from standby mode and ready
  83. // for peer mode. It should have joined the cluster successfully.
  84. func (s *StandbyServer) RemoveNotify() <-chan bool {
  85. return s.removeNotify
  86. }
  87. func (s *StandbyServer) ClientHTTPHandler() http.Handler {
  88. return http.HandlerFunc(s.redirectRequests)
  89. }
  90. func (s *StandbyServer) ClusterRecorded() bool {
  91. return s.recorded
  92. }
  93. func (s *StandbyServer) Cluster() []string {
  94. peerURLs := make([]string, 0)
  95. for _, peer := range s.cluster {
  96. peerURLs = append(peerURLs, peer.PeerURL)
  97. }
  98. return peerURLs
  99. }
  100. func (s *StandbyServer) ClusterSize() int {
  101. return len(s.cluster)
  102. }
  103. func (s *StandbyServer) setCluster(cluster []*machineMessage) {
  104. s.cluster = cluster
  105. }
  106. func (s *StandbyServer) SyncCluster(peers []string) error {
  107. for i, url := range peers {
  108. peers[i] = s.fullPeerURL(url)
  109. }
  110. if err := s.syncCluster(peers); err != nil {
  111. log.Infof("fail syncing cluster(%v): %v", s.Cluster(), err)
  112. return err
  113. }
  114. log.Infof("set cluster(%v) for standby server", s.Cluster())
  115. return nil
  116. }
  117. func (s *StandbyServer) SetSyncInterval(second float64) {
  118. s.syncInterval = second
  119. }
  120. func (s *StandbyServer) ClusterLeader() *machineMessage {
  121. for _, machine := range s.cluster {
  122. if machine.State == raft.Leader {
  123. return machine
  124. }
  125. }
  126. return nil
  127. }
  128. func (s *StandbyServer) JoinIndex() uint64 {
  129. return s.joinIndex
  130. }
  131. func (s *StandbyServer) redirectRequests(w http.ResponseWriter, r *http.Request) {
  132. leader := s.ClusterLeader()
  133. if leader == nil {
  134. w.Header().Set("Content-Type", "application/json")
  135. etcdErr.NewError(etcdErr.EcodeStandbyInternal, "", 0).Write(w)
  136. return
  137. }
  138. uhttp.Redirect(leader.ClientURL, w, r)
  139. }
  140. func (s *StandbyServer) monitorCluster() {
  141. first := true
  142. for {
  143. if !first {
  144. timer := time.NewTimer(time.Duration(int64(s.syncInterval * float64(time.Second))))
  145. defer timer.Stop()
  146. select {
  147. case <-s.closeChan:
  148. return
  149. case <-timer.C:
  150. }
  151. }
  152. first = false
  153. if err := s.syncCluster(nil); err != nil {
  154. log.Warnf("fail syncing cluster(%v): %v", s.Cluster(), err)
  155. continue
  156. }
  157. leader := s.ClusterLeader()
  158. if leader == nil {
  159. log.Warnf("fail getting leader from cluster(%v)", s.Cluster())
  160. continue
  161. }
  162. if err := s.join(leader.PeerURL); err != nil {
  163. log.Debugf("fail joining through leader %v: %v", leader, err)
  164. continue
  165. }
  166. log.Infof("join through leader %v", leader.PeerURL)
  167. go func() {
  168. s.Stop()
  169. close(s.removeNotify)
  170. }()
  171. return
  172. }
  173. }
  174. func (s *StandbyServer) syncCluster(peerURLs []string) error {
  175. peerURLs = append(s.Cluster(), peerURLs...)
  176. for _, peerURL := range peerURLs {
  177. // Fetch current peer list
  178. machines, err := s.client.GetMachines(peerURL)
  179. if err != nil {
  180. log.Debugf("fail getting machine messages from %v", peerURL)
  181. continue
  182. }
  183. config, err := s.client.GetClusterConfig(peerURL)
  184. if err != nil {
  185. log.Debugf("fail getting cluster config from %v", peerURL)
  186. continue
  187. }
  188. s.setCluster(machines)
  189. s.SetSyncInterval(config.SyncInterval)
  190. if err := s.saveClusterInfo(); err != nil {
  191. log.Warnf("fail saving cluster info into disk: %v", err)
  192. }
  193. return nil
  194. }
  195. return fmt.Errorf("unreachable cluster")
  196. }
  197. func (s *StandbyServer) join(peer string) error {
  198. // Our version must match the leaders version
  199. version, err := s.client.GetVersion(peer)
  200. if err != nil {
  201. log.Debugf("error getting peer version")
  202. return err
  203. }
  204. if version < store.MinVersion() || version > store.MaxVersion() {
  205. log.Debugf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version)
  206. return fmt.Errorf("incompatible version")
  207. }
  208. // Fetch cluster config to see whether exists some place.
  209. clusterConfig, err := s.client.GetClusterConfig(peer)
  210. if err != nil {
  211. log.Debugf("error getting cluster config")
  212. return err
  213. }
  214. if clusterConfig.ActiveSize <= len(s.Cluster()) {
  215. log.Debugf("stop joining because the cluster is full with %d nodes", len(s.Cluster()))
  216. return fmt.Errorf("out of quota")
  217. }
  218. commitIndex, err := s.client.AddMachine(peer,
  219. &JoinCommand{
  220. MinVersion: store.MinVersion(),
  221. MaxVersion: store.MaxVersion(),
  222. Name: s.Config.Name,
  223. RaftURL: s.Config.PeerURL,
  224. EtcdURL: s.Config.ClientURL,
  225. })
  226. if err != nil {
  227. log.Debugf("error on join request")
  228. return err
  229. }
  230. s.joinIndex = commitIndex
  231. return nil
  232. }
  233. func (s *StandbyServer) fullPeerURL(urlStr string) string {
  234. u, err := url.Parse(urlStr)
  235. if err != nil {
  236. log.Warnf("fail parsing url %v", u)
  237. return urlStr
  238. }
  239. u.Scheme = s.Config.PeerScheme
  240. return u.String()
  241. }
  242. func (s *StandbyServer) openClusterInfo() error {
  243. var err error
  244. path := filepath.Join(s.Config.DataDir, clusterInfoName)
  245. s.file, err = os.OpenFile(path, os.O_RDWR, 0600)
  246. if err != nil {
  247. if os.IsNotExist(err) {
  248. s.file, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600)
  249. }
  250. return err
  251. }
  252. return nil
  253. }
  254. func (s *StandbyServer) loadClusterInfo() ([]*machineMessage, error) {
  255. clusterInfo := make([]*machineMessage, 0)
  256. if _, err := s.file.Seek(0, os.SEEK_SET); err != nil {
  257. return nil, err
  258. }
  259. if err := json.NewDecoder(s.file).Decode(&clusterInfo); err != nil {
  260. return nil, err
  261. }
  262. s.recorded = true
  263. return clusterInfo, nil
  264. }
  265. func (s *StandbyServer) saveClusterInfo() error {
  266. if err := s.clearClusterInfo(); err != nil {
  267. return nil
  268. }
  269. if err := json.NewEncoder(s.file).Encode(s.cluster); err != nil {
  270. return err
  271. }
  272. if err := s.file.Sync(); err != nil {
  273. return err
  274. }
  275. s.recorded = true
  276. return nil
  277. }
  278. func (s *StandbyServer) clearClusterInfo() error {
  279. if _, err := s.file.Seek(0, os.SEEK_SET); err != nil {
  280. return err
  281. }
  282. if err := s.file.Truncate(0); err != nil {
  283. return err
  284. }
  285. s.recorded = false
  286. return nil
  287. }