peer_server.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. package server
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "encoding/binary"
  6. "encoding/json"
  7. "fmt"
  8. "io/ioutil"
  9. "net/http"
  10. "net/url"
  11. "time"
  12. etcdErr "github.com/coreos/etcd/error"
  13. "github.com/coreos/etcd/log"
  14. "github.com/coreos/etcd/store"
  15. "github.com/coreos/go-raft"
  16. )
  17. type PeerServer struct {
  18. *raft.Server
  19. server *Server
  20. joinIndex uint64
  21. name string
  22. url string
  23. listenHost string
  24. tlsConf *TLSConfig
  25. tlsInfo *TLSInfo
  26. followersStats *raftFollowersStats
  27. serverStats *raftServerStats
  28. registry *Registry
  29. store *store.Store
  30. snapConf *snapshotConf
  31. MaxClusterSize int
  32. RetryTimes int
  33. }
  34. // TODO: find a good policy to do snapshot
  35. type snapshotConf struct {
  36. // Etcd will check if snapshot is need every checkingInterval
  37. checkingInterval time.Duration
  38. // The number of writes when the last snapshot happened
  39. lastWrites uint64
  40. // If the incremental number of writes since the last snapshot
  41. // exceeds the write Threshold, etcd will do a snapshot
  42. writesThr uint64
  43. }
  44. func NewPeerServer(name string, path string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store *store.Store) *PeerServer {
  45. s := &PeerServer{
  46. name: name,
  47. url: url,
  48. listenHost: listenHost,
  49. tlsConf: tlsConf,
  50. tlsInfo: tlsInfo,
  51. registry: registry,
  52. store: store,
  53. snapConf: &snapshotConf{time.Second * 3, 0, 20 * 1000},
  54. followersStats: &raftFollowersStats{
  55. Leader: name,
  56. Followers: make(map[string]*raftFollowerStats),
  57. },
  58. serverStats: &raftServerStats{
  59. StartTime: time.Now(),
  60. sendRateQueue: &statsQueue{
  61. back: -1,
  62. },
  63. recvRateQueue: &statsQueue{
  64. back: -1,
  65. },
  66. },
  67. }
  68. // Create transporter for raft
  69. raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, s)
  70. // Create raft server
  71. server, err := raft.NewServer(name, path, raftTransporter, s.store, s, "")
  72. if err != nil {
  73. log.Fatal(err)
  74. }
  75. s.Server = server
  76. return s
  77. }
  78. // Start the raft server
  79. func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) {
  80. // LoadSnapshot
  81. if snapshot {
  82. err := s.LoadSnapshot()
  83. if err == nil {
  84. log.Debugf("%s finished load snapshot", s.name)
  85. } else {
  86. log.Debug(err)
  87. }
  88. }
  89. s.SetElectionTimeout(ElectionTimeout)
  90. s.SetHeartbeatTimeout(HeartbeatTimeout)
  91. s.Start()
  92. if s.IsLogEmpty() {
  93. // start as a leader in a new cluster
  94. if len(cluster) == 0 {
  95. s.startAsLeader()
  96. } else {
  97. s.startAsFollower(cluster)
  98. }
  99. } else {
  100. // Rejoin the previous cluster
  101. cluster = s.registry.PeerURLs(s.Leader(), s.name)
  102. for i := 0; i < len(cluster); i++ {
  103. u, err := url.Parse(cluster[i])
  104. if err != nil {
  105. log.Debug("rejoin cannot parse url: ", err)
  106. }
  107. cluster[i] = u.Host
  108. }
  109. ok := s.joinCluster(cluster)
  110. if !ok {
  111. log.Warn("the entire cluster is down! this machine will restart the cluster.")
  112. }
  113. log.Debugf("%s restart as a follower", s.name)
  114. }
  115. // open the snapshot
  116. if snapshot {
  117. go s.monitorSnapshot()
  118. }
  119. // start to response to raft requests
  120. go s.startTransport(s.tlsConf.Scheme, s.tlsConf.Server)
  121. }
  122. // Retrieves the underlying Raft server.
  123. func (s *PeerServer) RaftServer() *raft.Server {
  124. return s.Server
  125. }
  126. // Associates the client server with the peer server.
  127. func (s *PeerServer) SetServer(server *Server) {
  128. s.server = server
  129. }
  130. func (s *PeerServer) startAsLeader() {
  131. // leader need to join self as a peer
  132. for {
  133. _, err := s.Do(NewJoinCommand(PeerVersion, s.Name(), s.url, s.server.URL()))
  134. if err == nil {
  135. break
  136. }
  137. }
  138. log.Debugf("%s start as a leader", s.name)
  139. }
  140. func (s *PeerServer) startAsFollower(cluster []string) {
  141. // start as a follower in a existing cluster
  142. for i := 0; i < s.RetryTimes; i++ {
  143. ok := s.joinCluster(cluster)
  144. if ok {
  145. return
  146. }
  147. log.Warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval)
  148. time.Sleep(time.Second * RetryInterval)
  149. }
  150. log.Fatalf("Cannot join the cluster via given machines after %x retries", s.RetryTimes)
  151. }
  152. // Start to listen and response raft command
  153. func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) {
  154. log.Infof("raft server [name %s, listen on %s, advertised url %s]", s.name, s.listenHost, s.url)
  155. raftMux := http.NewServeMux()
  156. server := &http.Server{
  157. Handler: raftMux,
  158. TLSConfig: &tlsConf,
  159. Addr: s.listenHost,
  160. }
  161. // internal commands
  162. raftMux.HandleFunc("/name", s.NameHttpHandler)
  163. raftMux.HandleFunc("/version", s.RaftVersionHttpHandler)
  164. raftMux.HandleFunc("/join", s.JoinHttpHandler)
  165. raftMux.HandleFunc("/remove/", s.RemoveHttpHandler)
  166. raftMux.HandleFunc("/vote", s.VoteHttpHandler)
  167. raftMux.HandleFunc("/log", s.GetLogHttpHandler)
  168. raftMux.HandleFunc("/log/append", s.AppendEntriesHttpHandler)
  169. raftMux.HandleFunc("/snapshot", s.SnapshotHttpHandler)
  170. raftMux.HandleFunc("/snapshotRecovery", s.SnapshotRecoveryHttpHandler)
  171. raftMux.HandleFunc("/etcdURL", s.EtcdURLHttpHandler)
  172. if scheme == "http" {
  173. log.Fatal(server.ListenAndServe())
  174. } else {
  175. log.Fatal(server.ListenAndServeTLS(s.tlsInfo.CertFile, s.tlsInfo.KeyFile))
  176. }
  177. }
  178. // getVersion fetches the raft version of a peer. This works for now but we
  179. // will need to do something more sophisticated later when we allow mixed
  180. // version clusters.
  181. func getVersion(t *transporter, versionURL url.URL) (string, error) {
  182. resp, req, err := t.Get(versionURL.String())
  183. if err != nil {
  184. return "", err
  185. }
  186. defer resp.Body.Close()
  187. t.CancelWhenTimeout(req)
  188. body, err := ioutil.ReadAll(resp.Body)
  189. return string(body), nil
  190. }
  191. func (s *PeerServer) joinCluster(cluster []string) bool {
  192. for _, machine := range cluster {
  193. if len(machine) == 0 {
  194. continue
  195. }
  196. err := s.joinByMachine(s.Server, machine, s.tlsConf.Scheme)
  197. if err == nil {
  198. log.Debugf("%s success join to the cluster via machine %s", s.name, machine)
  199. return true
  200. } else {
  201. if _, ok := err.(etcdErr.Error); ok {
  202. log.Fatal(err)
  203. }
  204. log.Debugf("cannot join to cluster via machine %s %s", machine, err)
  205. }
  206. }
  207. return false
  208. }
  209. // Send join requests to machine.
  210. func (s *PeerServer) joinByMachine(server *raft.Server, machine string, scheme string) error {
  211. var b bytes.Buffer
  212. // t must be ok
  213. t, _ := server.Transporter().(*transporter)
  214. // Our version must match the leaders version
  215. versionURL := url.URL{Host: machine, Scheme: scheme, Path: "/version"}
  216. version, err := getVersion(t, versionURL)
  217. if err != nil {
  218. return fmt.Errorf("Error during join version check: %v", err)
  219. }
  220. // TODO: versioning of the internal protocol. See:
  221. // Documentation/internatl-protocol-versioning.md
  222. if version != PeerVersion {
  223. return fmt.Errorf("Unable to join: internal version mismatch, entire cluster must be running identical versions of etcd")
  224. }
  225. json.NewEncoder(&b).Encode(NewJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL()))
  226. joinURL := url.URL{Host: machine, Scheme: scheme, Path: "/join"}
  227. log.Debugf("Send Join Request to %s", joinURL.String())
  228. resp, req, err := t.Post(joinURL.String(), &b)
  229. for {
  230. if err != nil {
  231. return fmt.Errorf("Unable to join: %v", err)
  232. }
  233. if resp != nil {
  234. defer resp.Body.Close()
  235. t.CancelWhenTimeout(req)
  236. if resp.StatusCode == http.StatusOK {
  237. b, _ := ioutil.ReadAll(resp.Body)
  238. s.joinIndex, _ = binary.Uvarint(b)
  239. return nil
  240. }
  241. if resp.StatusCode == http.StatusTemporaryRedirect {
  242. address := resp.Header.Get("Location")
  243. log.Debugf("Send Join Request to %s", address)
  244. json.NewEncoder(&b).Encode(NewJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL()))
  245. resp, req, err = t.Post(address, &b)
  246. } else if resp.StatusCode == http.StatusBadRequest {
  247. log.Debug("Reach max number machines in the cluster")
  248. decoder := json.NewDecoder(resp.Body)
  249. err := &etcdErr.Error{}
  250. decoder.Decode(err)
  251. return *err
  252. } else {
  253. return fmt.Errorf("Unable to join")
  254. }
  255. }
  256. }
  257. }
  258. func (s *PeerServer) Stats() []byte {
  259. s.serverStats.LeaderInfo.Uptime = time.Now().Sub(s.serverStats.LeaderInfo.startTime).String()
  260. queue := s.serverStats.sendRateQueue
  261. s.serverStats.SendingPkgRate, s.serverStats.SendingBandwidthRate = queue.Rate()
  262. queue = s.serverStats.recvRateQueue
  263. s.serverStats.RecvingPkgRate, s.serverStats.RecvingBandwidthRate = queue.Rate()
  264. b, _ := json.Marshal(s.serverStats)
  265. return b
  266. }
  267. func (s *PeerServer) PeerStats() []byte {
  268. if s.State() == raft.Leader {
  269. b, _ := json.Marshal(s.followersStats)
  270. return b
  271. }
  272. return nil
  273. }
  274. func (s *PeerServer) monitorSnapshot() {
  275. for {
  276. time.Sleep(s.snapConf.checkingInterval)
  277. currentWrites := 0
  278. if uint64(currentWrites) > s.snapConf.writesThr {
  279. s.TakeSnapshot()
  280. s.snapConf.lastWrites = 0
  281. }
  282. }
  283. }
  284. func (s *PeerServer) dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error {
  285. if s.State() == raft.Leader {
  286. if response, err := s.Do(c); err != nil {
  287. return err
  288. } else {
  289. if response == nil {
  290. return etcdErr.NewError(300, "Empty response from raft", store.UndefIndex, store.UndefTerm)
  291. }
  292. event, ok := response.(*store.Event)
  293. if ok {
  294. bytes, err := json.Marshal(event)
  295. if err != nil {
  296. fmt.Println(err)
  297. }
  298. w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index))
  299. w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term))
  300. w.WriteHeader(http.StatusOK)
  301. w.Write(bytes)
  302. return nil
  303. }
  304. bytes, _ := response.([]byte)
  305. w.WriteHeader(http.StatusOK)
  306. w.Write(bytes)
  307. return nil
  308. }
  309. } else {
  310. leader := s.Leader()
  311. // current no leader
  312. if leader == "" {
  313. return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
  314. }
  315. url, _ := s.registry.PeerURL(leader)
  316. log.Debugf("Not leader; Current leader: %s; redirect: %s", leader, url)
  317. redirect(url, w, req)
  318. return nil
  319. }
  320. }