standby.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package etcd
  2. import (
  3. "fmt"
  4. "log"
  5. "net/http"
  6. "strconv"
  7. "time"
  8. "github.com/coreos/etcd/config"
  9. "github.com/coreos/etcd/store"
  10. )
  // noneId is the sentinel stored in standby.leader while no leader is known.
  var noneId = int64(-1)
  14. type standby struct {
  15. id int64
  16. pubAddr string
  17. raftPubAddr string
  18. client *v2client
  19. peerHub *peerHub
  20. nodes map[string]bool
  21. leader int64
  22. leaderAddr string
  23. clusterConf *config.ClusterConfig
  24. stopc chan struct{}
  25. *http.ServeMux
  26. }
  27. func newStandby(id int64, pubAddr string, raftPubAddr string, nodes map[string]bool, client *v2client, peerHub *peerHub) *standby {
  28. s := &standby{
  29. id: id,
  30. pubAddr: pubAddr,
  31. raftPubAddr: raftPubAddr,
  32. client: client,
  33. peerHub: peerHub,
  34. nodes: nodes,
  35. leader: noneId,
  36. leaderAddr: "",
  37. clusterConf: config.NewClusterConfig(),
  38. stopc: make(chan struct{}),
  39. ServeMux: http.NewServeMux(),
  40. }
  41. s.Handle("/", handlerErr(s.serveRedirect))
  42. return s
  43. }
  44. func (s *standby) run() int64 {
  45. var syncDuration time.Duration
  46. for {
  47. select {
  48. case <-time.After(syncDuration):
  49. case <-s.stopc:
  50. log.Printf("Standby %d stopped\n", s.id)
  51. return stopMode
  52. }
  53. if err := s.syncCluster(); err != nil {
  54. log.Println("standby sync:", err)
  55. continue
  56. }
  57. syncDuration = time.Duration(s.clusterConf.SyncInterval * float64(time.Second))
  58. if s.clusterConf.ActiveSize <= len(s.nodes) {
  59. continue
  60. }
  61. if err := s.joinByAddr(s.leaderAddr); err != nil {
  62. log.Println("standby join:", err)
  63. continue
  64. }
  65. return participantMode
  66. }
  67. }
  68. func (s *standby) stop() {
  69. close(s.stopc)
  70. }
  71. func (s *standby) serveRedirect(w http.ResponseWriter, r *http.Request) error {
  72. if s.leader == noneId {
  73. return fmt.Errorf("no leader in the cluster")
  74. }
  75. redirectAddr, err := buildRedirectURL(s.leaderAddr, r.URL)
  76. if err != nil {
  77. return err
  78. }
  79. http.Redirect(w, r, redirectAddr, http.StatusTemporaryRedirect)
  80. return nil
  81. }
  82. func (s *standby) syncCluster() error {
  83. for node := range s.nodes {
  84. machines, err := s.client.GetMachines(node)
  85. if err != nil {
  86. continue
  87. }
  88. config, err := s.client.GetClusterConfig(node)
  89. if err != nil {
  90. continue
  91. }
  92. s.nodes = make(map[string]bool)
  93. for _, machine := range machines {
  94. s.nodes[machine.PeerURL] = true
  95. if machine.State == stateLeader {
  96. id, err := strconv.ParseInt(machine.Name, 0, 64)
  97. if err != nil {
  98. return err
  99. }
  100. s.leader = id
  101. s.leaderAddr = machine.PeerURL
  102. }
  103. }
  104. s.clusterConf = config
  105. return nil
  106. }
  107. return fmt.Errorf("unreachable cluster")
  108. }
  109. func (s *standby) joinByAddr(addr string) error {
  110. info := &context{
  111. MinVersion: store.MinVersion(),
  112. MaxVersion: store.MaxVersion(),
  113. ClientURL: s.pubAddr,
  114. PeerURL: s.raftPubAddr,
  115. }
  116. if err := s.client.AddMachine(s.leaderAddr, fmt.Sprint(s.id), info); err != nil {
  117. return err
  118. }
  119. return nil
  120. }