etcd.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package etcd
  2. import (
  3. "crypto/tls"
  4. "log"
  5. "net/http"
  6. "sync"
  7. "time"
  8. "github.com/coreos/etcd/config"
  9. )
  10. const (
  11. participantMode int64 = iota
  12. standbyMode
  13. stopMode
  14. )
  15. type Server struct {
  16. config *config.Config
  17. id int64
  18. pubAddr string
  19. raftPubAddr string
  20. tickDuration time.Duration
  21. mode atomicInt
  22. nodes map[string]bool
  23. p *participant
  24. s *standby
  25. client *v2client
  26. peerHub *peerHub
  27. stopped bool
  28. mu sync.Mutex
  29. stopc chan struct{}
  30. }
  31. func New(c *config.Config, id int64) *Server {
  32. if err := c.Sanitize(); err != nil {
  33. log.Fatalf("failed sanitizing configuration: %v", err)
  34. }
  35. tc := &tls.Config{
  36. InsecureSkipVerify: true,
  37. }
  38. var err error
  39. if c.PeerTLSInfo().Scheme() == "https" {
  40. tc, err = c.PeerTLSInfo().ClientConfig()
  41. if err != nil {
  42. log.Fatal("failed to create raft transporter tls:", err)
  43. }
  44. }
  45. tr := new(http.Transport)
  46. tr.TLSClientConfig = tc
  47. client := &http.Client{Transport: tr}
  48. s := &Server{
  49. config: c,
  50. id: id,
  51. pubAddr: c.Addr,
  52. raftPubAddr: c.Peer.Addr,
  53. tickDuration: defaultTickDuration,
  54. mode: atomicInt(stopMode),
  55. nodes: make(map[string]bool),
  56. client: newClient(tc),
  57. peerHub: newPeerHub(c.Peers, client),
  58. stopc: make(chan struct{}),
  59. }
  60. for _, seed := range c.Peers {
  61. s.nodes[seed] = true
  62. }
  63. return s
  64. }
  65. func (s *Server) SetTick(tick time.Duration) {
  66. s.tickDuration = tick
  67. }
  68. // Stop stops the server elegently.
  69. func (s *Server) Stop() {
  70. if s.mode.Get() == stopMode {
  71. return
  72. }
  73. s.mu.Lock()
  74. s.stopped = true
  75. switch s.mode.Get() {
  76. case participantMode:
  77. s.p.stop()
  78. case standbyMode:
  79. s.s.stop()
  80. }
  81. s.mu.Unlock()
  82. <-s.stopc
  83. s.client.CloseConnections()
  84. s.peerHub.stop()
  85. }
  86. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  87. switch s.mode.Get() {
  88. case participantMode, standbyMode:
  89. s.p.ServeHTTP(w, r)
  90. default:
  91. http.NotFound(w, r)
  92. }
  93. }
  94. func (s *Server) RaftHandler() http.Handler {
  95. return http.HandlerFunc(s.ServeRaftHTTP)
  96. }
  97. func (s *Server) ServeRaftHTTP(w http.ResponseWriter, r *http.Request) {
  98. switch s.mode.Get() {
  99. case participantMode:
  100. s.p.raftHandler().ServeHTTP(w, r)
  101. default:
  102. http.NotFound(w, r)
  103. }
  104. }
  105. func (s *Server) Run() {
  106. next := participantMode
  107. for {
  108. s.mu.Lock()
  109. if s.stopped {
  110. next = stopMode
  111. }
  112. switch next {
  113. case participantMode:
  114. s.p = newParticipant(s.id, s.pubAddr, s.raftPubAddr, s.client, s.peerHub, s.tickDuration)
  115. s.mode.Set(participantMode)
  116. s.mu.Unlock()
  117. next = s.p.run()
  118. case standbyMode:
  119. s.s = newStandby(s.id, s.pubAddr, s.raftPubAddr, s.nodes, s.client, s.peerHub)
  120. s.mode.Set(standbyMode)
  121. s.mu.Unlock()
  122. next = s.s.run()
  123. case stopMode:
  124. s.mode.Set(stopMode)
  125. s.mu.Unlock()
  126. s.stopc <- struct{}{}
  127. return
  128. default:
  129. panic("unsupport mode")
  130. }
  131. }
  132. }