standby.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package etcd
  2. import (
  3. "fmt"
  4. "log"
  5. "net/http"
  6. "strconv"
  7. "time"
  8. "github.com/coreos/etcd/config"
  9. "github.com/coreos/etcd/store"
  10. )
  11. type standby struct {
  12. id int64
  13. pubAddr string
  14. raftPubAddr string
  15. client *v2client
  16. peerHub *peerHub
  17. nodes map[string]bool
  18. leader int64
  19. leaderAddr string
  20. clusterConf *config.ClusterConfig
  21. stopc chan struct{}
  22. *http.ServeMux
  23. }
  24. func newStandby(id int64, pubAddr string, raftPubAddr string, nodes map[string]bool, client *v2client, peerHub *peerHub) *standby {
  25. s := &standby{
  26. id: id,
  27. pubAddr: pubAddr,
  28. raftPubAddr: raftPubAddr,
  29. client: client,
  30. peerHub: peerHub,
  31. nodes: nodes,
  32. leader: noneId,
  33. leaderAddr: "",
  34. clusterConf: config.NewClusterConfig(),
  35. stopc: make(chan struct{}),
  36. ServeMux: http.NewServeMux(),
  37. }
  38. s.Handle("/", handlerErr(s.serveRedirect))
  39. return s
  40. }
  41. func (s *standby) run() int64 {
  42. var syncDuration time.Duration
  43. for {
  44. select {
  45. case <-time.After(syncDuration):
  46. case <-s.stopc:
  47. log.Printf("Standby %d stopped\n", s.id)
  48. return stopMode
  49. }
  50. if err := s.syncCluster(); err != nil {
  51. log.Println("standby sync:", err)
  52. continue
  53. }
  54. syncDuration = time.Duration(s.clusterConf.SyncInterval * float64(time.Second))
  55. if s.clusterConf.ActiveSize <= len(s.nodes) {
  56. continue
  57. }
  58. if err := s.joinByAddr(s.leaderAddr); err != nil {
  59. log.Println("standby join:", err)
  60. continue
  61. }
  62. return participantMode
  63. }
  64. }
  65. func (s *standby) stop() {
  66. close(s.stopc)
  67. }
  68. func (s *standby) serveRedirect(w http.ResponseWriter, r *http.Request) error {
  69. if s.leader == noneId {
  70. return fmt.Errorf("no leader in the cluster")
  71. }
  72. redirectAddr, err := buildRedirectURL(s.leaderAddr, r.URL)
  73. if err != nil {
  74. return err
  75. }
  76. http.Redirect(w, r, redirectAddr, http.StatusTemporaryRedirect)
  77. return nil
  78. }
  79. func (s *standby) syncCluster() error {
  80. for node := range s.nodes {
  81. machines, err := s.client.GetMachines(node)
  82. if err != nil {
  83. continue
  84. }
  85. config, err := s.client.GetClusterConfig(node)
  86. if err != nil {
  87. continue
  88. }
  89. s.nodes = make(map[string]bool)
  90. for _, machine := range machines {
  91. s.nodes[machine.PeerURL] = true
  92. if machine.State == stateLeader {
  93. id, err := strconv.ParseInt(machine.Name, 0, 64)
  94. if err != nil {
  95. return err
  96. }
  97. s.leader = id
  98. s.leaderAddr = machine.PeerURL
  99. }
  100. }
  101. s.clusterConf = config
  102. return nil
  103. }
  104. return fmt.Errorf("unreachable cluster")
  105. }
  106. func (s *standby) joinByAddr(addr string) error {
  107. info := &context{
  108. MinVersion: store.MinVersion(),
  109. MaxVersion: store.MaxVersion(),
  110. ClientURL: s.pubAddr,
  111. PeerURL: s.raftPubAddr,
  112. }
  113. if err := s.client.AddMachine(s.leaderAddr, fmt.Sprint(s.id), info); err != nil {
  114. return err
  115. }
  116. return nil
  117. }