etcd.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package etcd
  2. import (
  3. "crypto/tls"
  4. "errors"
  5. "log"
  6. "net/http"
  7. "time"
  8. "github.com/coreos/etcd/config"
  9. )
  10. const (
  11. raftPrefix = "/raft"
  12. participantMode int64 = iota
  13. standbyMode
  14. stopMode
  15. )
  16. var (
  17. stopErr = errors.New("stopped")
  18. )
  19. type Server struct {
  20. config *config.Config
  21. id int64
  22. pubAddr string
  23. raftPubAddr string
  24. tickDuration time.Duration
  25. mode atomicInt
  26. nodes map[string]bool
  27. p *participant
  28. s *standby
  29. client *v2client
  30. peerHub *peerHub
  31. stopc chan struct{}
  32. }
  33. func New(c *config.Config, id int64) *Server {
  34. if err := c.Sanitize(); err != nil {
  35. log.Fatalf("failed sanitizing configuration: %v", err)
  36. }
  37. tc := &tls.Config{
  38. InsecureSkipVerify: true,
  39. }
  40. var err error
  41. if c.PeerTLSInfo().Scheme() == "https" {
  42. tc, err = c.PeerTLSInfo().ClientConfig()
  43. if err != nil {
  44. log.Fatal("failed to create raft transporter tls:", err)
  45. }
  46. }
  47. tr := new(http.Transport)
  48. tr.TLSClientConfig = tc
  49. client := &http.Client{Transport: tr}
  50. s := &Server{
  51. config: c,
  52. id: id,
  53. pubAddr: c.Addr,
  54. raftPubAddr: c.Peer.Addr,
  55. tickDuration: defaultTickDuration,
  56. mode: atomicInt(stopMode),
  57. nodes: make(map[string]bool),
  58. client: newClient(tc),
  59. peerHub: newPeerHub(c.Peers, client),
  60. stopc: make(chan struct{}),
  61. }
  62. for _, seed := range c.Peers {
  63. s.nodes[seed] = true
  64. }
  65. return s
  66. }
  67. func (s *Server) SetTick(tick time.Duration) {
  68. s.tickDuration = tick
  69. }
  70. // Stop stops the server elegently.
  71. func (s *Server) Stop() {
  72. if s.mode.Get() == stopMode {
  73. return
  74. }
  75. m := s.mode.Get()
  76. s.mode.Set(stopMode)
  77. switch m {
  78. case participantMode:
  79. s.p.stop()
  80. case standbyMode:
  81. s.s.stop()
  82. }
  83. <-s.stopc
  84. }
  85. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  86. switch s.mode.Get() {
  87. case participantMode, standbyMode:
  88. s.p.ServeHTTP(w, r)
  89. default:
  90. http.NotFound(w, r)
  91. }
  92. }
  93. func (s *Server) RaftHandler() http.Handler {
  94. return http.HandlerFunc(s.ServeRaftHTTP)
  95. }
  96. func (s *Server) ServeRaftHTTP(w http.ResponseWriter, r *http.Request) {
  97. switch s.mode.Get() {
  98. case participantMode:
  99. s.p.raftHandler().ServeHTTP(w, r)
  100. default:
  101. http.NotFound(w, r)
  102. }
  103. }
  104. func (s *Server) Run() {
  105. next := participantMode
  106. for {
  107. switch next {
  108. case participantMode:
  109. s.p = newParticipant(s.id, s.pubAddr, s.raftPubAddr, s.nodes, s.client, s.peerHub, s.tickDuration)
  110. s.mode.Set(participantMode)
  111. next = s.p.run()
  112. case standbyMode:
  113. s.s = newStandby(s.id, s.pubAddr, s.raftPubAddr, s.nodes, s.client, s.peerHub)
  114. s.mode.Set(standbyMode)
  115. next = s.s.run()
  116. case stopMode:
  117. s.client.CloseConnections()
  118. s.peerHub.stop()
  119. s.stopc <- struct{}{}
  120. return
  121. default:
  122. panic("unsupport mode")
  123. }
  124. }
  125. }