etcd.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package etcd
  2. import (
  3. "crypto/tls"
  4. "errors"
  5. "log"
  6. "net/http"
  7. "time"
  8. "github.com/coreos/etcd/config"
  9. )
  10. const (
  11. raftPrefix = "/raft"
  12. participantMode int64 = iota
  13. standbyMode
  14. stopMode
  15. )
  16. var (
  17. stopErr = errors.New("stopped")
  18. )
// Server is a node that Run switches between participant, standby and
// stopped modes. Create one with New, start it with Run and shut it down
// with Stop.
type Server struct {
	// config is the configuration handed to New.
	config *config.Config
	// id is the node id handed to New; it is passed to the participant and standby.
	id int64
	// pubAddr is copied from config.Addr in New.
	pubAddr string
	// raftPubAddr is copied from config.Peer.Addr in New.
	raftPubAddr string
	// tickDuration is passed to each new participant; see SetTick.
	tickDuration time.Duration

	// mode holds the current mode (participantMode, standbyMode or stopMode).
	mode atomicInt
	// nodes is seeded in New with every entry of config.Peers.
	nodes map[string]bool
	// p is the participant created by Run when entering participantMode.
	p *participant
	// s is the standby created by Run when entering standbyMode.
	s *standby

	// client is shared with every participant and standby created by Run.
	client *v2client
	// peerHub is shared with every participant and standby created by Run.
	peerHub *peerHub

	// modeC receives each mode Run is about to enter (buffered, capacity 10).
	modeC chan int64
	// stopc is signalled by Run once it has shut down; Stop waits on it.
	stopc chan struct{}
}
  34. func New(c *config.Config, id int64) *Server {
  35. if err := c.Sanitize(); err != nil {
  36. log.Fatalf("failed sanitizing configuration: %v", err)
  37. }
  38. tc := &tls.Config{
  39. InsecureSkipVerify: true,
  40. }
  41. var err error
  42. if c.PeerTLSInfo().Scheme() == "https" {
  43. tc, err = c.PeerTLSInfo().ClientConfig()
  44. if err != nil {
  45. log.Fatal("failed to create raft transporter tls:", err)
  46. }
  47. }
  48. tr := new(http.Transport)
  49. tr.TLSClientConfig = tc
  50. client := &http.Client{Transport: tr}
  51. s := &Server{
  52. config: c,
  53. id: id,
  54. pubAddr: c.Addr,
  55. raftPubAddr: c.Peer.Addr,
  56. tickDuration: defaultTickDuration,
  57. mode: atomicInt(stopMode),
  58. nodes: make(map[string]bool),
  59. client: newClient(tc),
  60. peerHub: newPeerHub(c.Peers, client),
  61. modeC: make(chan int64, 10),
  62. stopc: make(chan struct{}),
  63. }
  64. for _, seed := range c.Peers {
  65. s.nodes[seed] = true
  66. }
  67. return s
  68. }
// SetTick overrides the tick duration that Run passes to each participant it
// creates. It only stores the value; a participant that already exists is not
// touched here.
func (s *Server) SetTick(tick time.Duration) {
	s.tickDuration = tick
}
  72. // Stop stops the server elegently.
  73. func (s *Server) Stop() {
  74. if s.mode.Get() == stopMode {
  75. return
  76. }
  77. m := s.mode.Get()
  78. s.mode.Set(stopMode)
  79. switch m {
  80. case participantMode:
  81. s.p.stop()
  82. case standbyMode:
  83. s.s.stop()
  84. }
  85. <-s.stopc
  86. }
  87. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  88. switch s.mode.Get() {
  89. case participantMode, standbyMode:
  90. s.p.ServeHTTP(w, r)
  91. default:
  92. http.NotFound(w, r)
  93. }
  94. }
  95. func (s *Server) RaftHandler() http.Handler {
  96. return http.HandlerFunc(s.ServeRaftHTTP)
  97. }
  98. func (s *Server) ServeRaftHTTP(w http.ResponseWriter, r *http.Request) {
  99. switch s.mode.Get() {
  100. case participantMode:
  101. s.p.raftHandler().ServeHTTP(w, r)
  102. default:
  103. http.NotFound(w, r)
  104. }
  105. }
  106. func (s *Server) Run() {
  107. next := participantMode
  108. for {
  109. s.modeC <- next
  110. switch next {
  111. case participantMode:
  112. s.p = newParticipant(s.id, s.pubAddr, s.raftPubAddr, s.nodes, s.client, s.peerHub, s.tickDuration)
  113. s.mode.Set(participantMode)
  114. // TODO: it may block here. move modeC later.
  115. next = s.p.run()
  116. case standbyMode:
  117. s.s = newStandby(s.id, s.pubAddr, s.raftPubAddr, s.nodes, s.client, s.peerHub)
  118. s.mode.Set(standbyMode)
  119. next = s.s.run()
  120. case stopMode:
  121. s.client.CloseConnections()
  122. s.peerHub.stop()
  123. s.stopc <- struct{}{}
  124. return
  125. default:
  126. panic("unsupport mode")
  127. }
  128. }
  129. }