etcd.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package etcd
  2. import (
  3. "crypto/tls"
  4. "log"
  5. "net/http"
  6. "sync"
  7. "time"
  8. "github.com/coreos/etcd/config"
  9. )
  10. const (
  11. raftPrefix = "/raft"
  12. participantMode int64 = iota
  13. standbyMode
  14. stopMode
  15. )
  16. type Server struct {
  17. config *config.Config
  18. id int64
  19. pubAddr string
  20. raftPubAddr string
  21. tickDuration time.Duration
  22. mode atomicInt
  23. nodes map[string]bool
  24. p *participant
  25. s *standby
  26. client *v2client
  27. peerHub *peerHub
  28. stopped bool
  29. mu sync.Mutex
  30. stopc chan struct{}
  31. }
  32. func New(c *config.Config, id int64) *Server {
  33. if err := c.Sanitize(); err != nil {
  34. log.Fatalf("failed sanitizing configuration: %v", err)
  35. }
  36. tc := &tls.Config{
  37. InsecureSkipVerify: true,
  38. }
  39. var err error
  40. if c.PeerTLSInfo().Scheme() == "https" {
  41. tc, err = c.PeerTLSInfo().ClientConfig()
  42. if err != nil {
  43. log.Fatal("failed to create raft transporter tls:", err)
  44. }
  45. }
  46. tr := new(http.Transport)
  47. tr.TLSClientConfig = tc
  48. client := &http.Client{Transport: tr}
  49. s := &Server{
  50. config: c,
  51. id: id,
  52. pubAddr: c.Addr,
  53. raftPubAddr: c.Peer.Addr,
  54. tickDuration: defaultTickDuration,
  55. mode: atomicInt(stopMode),
  56. nodes: make(map[string]bool),
  57. client: newClient(tc),
  58. peerHub: newPeerHub(c.Peers, client),
  59. stopc: make(chan struct{}),
  60. }
  61. for _, seed := range c.Peers {
  62. s.nodes[seed] = true
  63. }
  64. return s
  65. }
  66. func (s *Server) SetTick(tick time.Duration) {
  67. s.tickDuration = tick
  68. }
  69. // Stop stops the server elegently.
  70. func (s *Server) Stop() {
  71. if s.mode.Get() == stopMode {
  72. return
  73. }
  74. s.mu.Lock()
  75. s.stopped = true
  76. switch s.mode.Get() {
  77. case participantMode:
  78. s.p.stop()
  79. case standbyMode:
  80. s.s.stop()
  81. }
  82. s.mu.Unlock()
  83. <-s.stopc
  84. s.client.CloseConnections()
  85. s.peerHub.stop()
  86. }
  87. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  88. switch s.mode.Get() {
  89. case participantMode, standbyMode:
  90. s.p.ServeHTTP(w, r)
  91. default:
  92. http.NotFound(w, r)
  93. }
  94. }
  95. func (s *Server) RaftHandler() http.Handler {
  96. return http.HandlerFunc(s.ServeRaftHTTP)
  97. }
  98. func (s *Server) ServeRaftHTTP(w http.ResponseWriter, r *http.Request) {
  99. switch s.mode.Get() {
  100. case participantMode:
  101. s.p.raftHandler().ServeHTTP(w, r)
  102. default:
  103. http.NotFound(w, r)
  104. }
  105. }
  106. func (s *Server) Run() {
  107. next := participantMode
  108. for {
  109. s.mu.Lock()
  110. if s.stopped {
  111. next = stopMode
  112. }
  113. switch next {
  114. case participantMode:
  115. s.p = newParticipant(s.id, s.pubAddr, s.raftPubAddr, s.nodes, s.client, s.peerHub, s.tickDuration)
  116. s.mode.Set(participantMode)
  117. s.mu.Unlock()
  118. next = s.p.run()
  119. case standbyMode:
  120. s.s = newStandby(s.id, s.pubAddr, s.raftPubAddr, s.nodes, s.client, s.peerHub)
  121. s.mode.Set(standbyMode)
  122. s.mu.Unlock()
  123. next = s.s.run()
  124. case stopMode:
  125. s.mode.Set(stopMode)
  126. s.mu.Unlock()
  127. s.stopc <- struct{}{}
  128. return
  129. default:
  130. panic("unsupport mode")
  131. }
  132. }
  133. }