123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339 |
- package server
- import (
- "encoding/json"
- "fmt"
- "io/ioutil"
- "net/http"
- "net/url"
- "os"
- "path/filepath"
- "sync"
- "time"
- "github.com/coreos/etcd/third_party/github.com/goraft/raft"
- etcdErr "github.com/coreos/etcd/error"
- "github.com/coreos/etcd/log"
- uhttp "github.com/coreos/etcd/pkg/http"
- "github.com/coreos/etcd/store"
- )
- const standbyInfoName = "standby_info"
- type StandbyServerConfig struct {
- Name string
- PeerScheme string
- PeerURL string
- ClientURL string
- DataDir string
- }
- type standbyInfo struct {
- // stay running in standby mode
- Running bool
- Cluster []*machineMessage
- SyncInterval float64
- }
- type StandbyServer struct {
- Config StandbyServerConfig
- client *Client
- raftServer raft.Server
- standbyInfo
- joinIndex uint64
- removeNotify chan bool
- started bool
- closeChan chan bool
- routineGroup sync.WaitGroup
- sync.Mutex
- }
- func NewStandbyServer(config StandbyServerConfig, client *Client) *StandbyServer {
- s := &StandbyServer{
- Config: config,
- client: client,
- standbyInfo: standbyInfo{SyncInterval: DefaultSyncInterval},
- }
- if err := s.loadInfo(); err != nil {
- log.Warnf("error load standby info file: %v", err)
- }
- return s
- }
- func (s *StandbyServer) SetRaftServer(raftServer raft.Server) {
- s.raftServer = raftServer
- }
- func (s *StandbyServer) Start() {
- s.Lock()
- defer s.Unlock()
- if s.started {
- return
- }
- s.started = true
- s.removeNotify = make(chan bool)
- s.closeChan = make(chan bool)
- s.Running = true
- if err := s.saveInfo(); err != nil {
- log.Warnf("error saving cluster info for standby")
- }
- s.routineGroup.Add(1)
- go func() {
- defer s.routineGroup.Done()
- s.monitorCluster()
- }()
- }
- // Stop stops the server gracefully.
- func (s *StandbyServer) Stop() {
- s.Lock()
- defer s.Unlock()
- if !s.started {
- return
- }
- s.started = false
- close(s.closeChan)
- s.routineGroup.Wait()
- }
- // RemoveNotify notifies the server is removed from standby mode and ready
- // for peer mode. It should have joined the cluster successfully.
- func (s *StandbyServer) RemoveNotify() <-chan bool {
- return s.removeNotify
- }
- func (s *StandbyServer) ClientHTTPHandler() http.Handler {
- return http.HandlerFunc(s.redirectRequests)
- }
- func (s *StandbyServer) IsRunning() bool {
- return s.Running
- }
- func (s *StandbyServer) ClusterURLs() []string {
- peerURLs := make([]string, 0)
- for _, peer := range s.Cluster {
- peerURLs = append(peerURLs, peer.PeerURL)
- }
- return peerURLs
- }
- func (s *StandbyServer) ClusterSize() int {
- return len(s.Cluster)
- }
- func (s *StandbyServer) setCluster(cluster []*machineMessage) {
- s.Cluster = cluster
- }
- func (s *StandbyServer) SyncCluster(peers []string) error {
- for i, url := range peers {
- peers[i] = s.fullPeerURL(url)
- }
- if err := s.syncCluster(peers); err != nil {
- log.Infof("fail syncing cluster(%v): %v", s.ClusterURLs(), err)
- return err
- }
- log.Infof("set cluster(%v) for standby server", s.ClusterURLs())
- return nil
- }
- func (s *StandbyServer) SetSyncInterval(second float64) {
- s.SyncInterval = second
- }
- func (s *StandbyServer) ClusterLeader() *machineMessage {
- for _, machine := range s.Cluster {
- if machine.State == raft.Leader {
- return machine
- }
- }
- return nil
- }
- func (s *StandbyServer) JoinIndex() uint64 {
- return s.joinIndex
- }
- func (s *StandbyServer) redirectRequests(w http.ResponseWriter, r *http.Request) {
- leader := s.ClusterLeader()
- if leader == nil {
- w.Header().Set("Content-Type", "application/json")
- etcdErr.NewError(etcdErr.EcodeStandbyInternal, "", 0).Write(w)
- return
- }
- uhttp.Redirect(leader.ClientURL, w, r)
- }
- // monitorCluster assumes that the machine has tried to join the cluster and
- // failed, so it waits for the interval at the beginning.
- func (s *StandbyServer) monitorCluster() {
- ticker := time.NewTicker(time.Duration(int64(s.SyncInterval * float64(time.Second))))
- defer ticker.Stop()
- for {
- select {
- case <-s.closeChan:
- return
- case <-ticker.C:
- }
- if err := s.syncCluster(nil); err != nil {
- log.Warnf("fail syncing cluster(%v): %v", s.ClusterURLs(), err)
- continue
- }
- leader := s.ClusterLeader()
- if leader == nil {
- log.Warnf("fail getting leader from cluster(%v)", s.ClusterURLs())
- continue
- }
- if err := s.join(leader.PeerURL); err != nil {
- log.Debugf("fail joining through leader %v: %v", leader, err)
- continue
- }
- log.Infof("join through leader %v", leader.PeerURL)
- s.Running = false
- if err := s.saveInfo(); err != nil {
- log.Warnf("error saving cluster info for standby")
- }
- go func() {
- s.Stop()
- close(s.removeNotify)
- }()
- return
- }
- }
- func (s *StandbyServer) syncCluster(peerURLs []string) error {
- peerURLs = append(s.ClusterURLs(), peerURLs...)
- for _, peerURL := range peerURLs {
- // Fetch current peer list
- machines, err := s.client.GetMachines(peerURL)
- if err != nil {
- log.Debugf("fail getting machine messages from %v", peerURL)
- continue
- }
- config, err := s.client.GetClusterConfig(peerURL)
- if err != nil {
- log.Debugf("fail getting cluster config from %v", peerURL)
- continue
- }
- s.setCluster(machines)
- s.SetSyncInterval(config.SyncInterval)
- if err := s.saveInfo(); err != nil {
- log.Warnf("fail saving cluster info into disk: %v", err)
- }
- return nil
- }
- return fmt.Errorf("unreachable cluster")
- }
- func (s *StandbyServer) join(peer string) error {
- for _, url := range s.ClusterURLs() {
- if s.Config.PeerURL == url {
- s.joinIndex = s.raftServer.CommitIndex()
- return nil
- }
- }
- // Our version must match the leaders version
- version, err := s.client.GetVersion(peer)
- if err != nil {
- log.Debugf("error getting peer version")
- return err
- }
- if version < store.MinVersion() || version > store.MaxVersion() {
- log.Debugf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version)
- return fmt.Errorf("incompatible version")
- }
- // Fetch cluster config to see whether exists some place.
- clusterConfig, err := s.client.GetClusterConfig(peer)
- if err != nil {
- log.Debugf("error getting cluster config")
- return err
- }
- if clusterConfig.ActiveSize <= len(s.Cluster) {
- log.Debugf("stop joining because the cluster is full with %d nodes", len(s.Cluster))
- return fmt.Errorf("out of quota")
- }
- commitIndex, err := s.client.AddMachine(peer,
- &JoinCommand{
- MinVersion: store.MinVersion(),
- MaxVersion: store.MaxVersion(),
- Name: s.Config.Name,
- RaftURL: s.Config.PeerURL,
- EtcdURL: s.Config.ClientURL,
- })
- if err != nil {
- log.Debugf("error on join request")
- return err
- }
- s.joinIndex = commitIndex
- return nil
- }
- func (s *StandbyServer) fullPeerURL(urlStr string) string {
- u, err := url.Parse(urlStr)
- if err != nil {
- log.Warnf("fail parsing url %v", u)
- return urlStr
- }
- u.Scheme = s.Config.PeerScheme
- return u.String()
- }
- func (s *StandbyServer) loadInfo() error {
- var info standbyInfo
- path := filepath.Join(s.Config.DataDir, standbyInfoName)
- file, err := os.OpenFile(path, os.O_RDONLY, 0600)
- if err != nil {
- if os.IsNotExist(err) {
- return nil
- }
- return err
- }
- defer file.Close()
- if err = json.NewDecoder(file).Decode(&info); err != nil {
- return err
- }
- s.standbyInfo = info
- return nil
- }
- func (s *StandbyServer) saveInfo() error {
- tmpFile, err := ioutil.TempFile(s.Config.DataDir, standbyInfoName)
- if err != nil {
- return err
- }
- if err = json.NewEncoder(tmpFile).Encode(s.standbyInfo); err != nil {
- tmpFile.Close()
- os.Remove(tmpFile.Name())
- return err
- }
- tmpFile.Close()
- path := filepath.Join(s.Config.DataDir, standbyInfoName)
- if err = os.Rename(tmpFile.Name(), path); err != nil {
- return err
- }
- return nil
- }
|