raft_server.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. /*
  2. Copyright 2013 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package main
  14. import (
  15. "bytes"
  16. "crypto/tls"
  17. "encoding/binary"
  18. "encoding/json"
  19. "fmt"
  20. "io/ioutil"
  21. "net/http"
  22. "net/url"
  23. "time"
  24. etcdErr "github.com/coreos/etcd/error"
  25. "github.com/coreos/go-raft"
  26. )
// raftServer bundles a go-raft server with the addresses, TLS settings and
// statistics this file uses to run it as a cluster peer.
type raftServer struct {
	// Embedded go-raft server; its methods (Start, Do, State, ...) are promoted.
	*raft.Server
	// Internal protocol version, set from raftVersion by newRaftServer and
	// compared with a peer's /version reply before joining.
	version string
	// Index decoded from the body of a successful /join response.
	joinIndex uint64
	// Node name; passed to raft.NewServer and used in log messages.
	name string
	// Advertised URL of this node (logged as such by startTransport).
	url string
	// Address the raft HTTP server binds to (http.Server.Addr).
	listenHost string
	// Supplies the scheme plus the client and server TLS configurations.
	tlsConf *TLSConfig
	// Supplies the certificate and key file paths for ListenAndServeTLS.
	tlsInfo *TLSInfo
	// Per-follower statistics; serialized by PeerStats.
	followersStats *raftFollowersStats
	// Statistics of this server; refreshed and serialized by Stats.
	serverStats *raftServerStats
}
// r is the package-level raft server used by the free functions below
// (startAsLeader, joinCluster, joinByMachine). It is not assigned in the code
// visible here; presumably it is set during startup (confirm against caller).
var r *raftServer
  40. func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer {
  41. // Create transporter for raft
  42. raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client)
  43. // Create raft server
  44. server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil, "")
  45. check(err)
  46. return &raftServer{
  47. Server: server,
  48. version: raftVersion,
  49. name: name,
  50. url: url,
  51. listenHost: listenHost,
  52. tlsConf: tlsConf,
  53. tlsInfo: tlsInfo,
  54. followersStats: &raftFollowersStats{
  55. Leader: name,
  56. Followers: make(map[string]*raftFollowerStats),
  57. },
  58. serverStats: &raftServerStats{
  59. StartTime: time.Now(),
  60. sendRateQueue: &statsQueue{
  61. back: -1,
  62. },
  63. recvRateQueue: &statsQueue{
  64. back: -1,
  65. },
  66. },
  67. }
  68. }
  69. // Start the raft server
  70. func (r *raftServer) ListenAndServe() {
  71. // Setup commands.
  72. registerCommands()
  73. // LoadSnapshot
  74. if snapshot {
  75. err := r.LoadSnapshot()
  76. if err == nil {
  77. debugf("%s finished load snapshot", r.name)
  78. } else {
  79. debug(err)
  80. }
  81. }
  82. r.SetElectionTimeout(ElectionTimeout)
  83. r.SetHeartbeatTimeout(HeartbeatTimeout)
  84. r.Start()
  85. if r.IsLogEmpty() {
  86. // start as a leader in a new cluster
  87. if len(cluster) == 0 {
  88. startAsLeader()
  89. } else {
  90. startAsFollower()
  91. }
  92. } else {
  93. // rejoin the previous cluster
  94. cluster = getMachines(nameToRaftURL)
  95. for i := 0; i < len(cluster); i++ {
  96. u, err := url.Parse(cluster[i])
  97. if err != nil {
  98. debug("rejoin cannot parse url: ", err)
  99. }
  100. cluster[i] = u.Host
  101. }
  102. ok := joinCluster(cluster)
  103. if !ok {
  104. warn("the entire cluster is down! this machine will restart the cluster.")
  105. }
  106. debugf("%s restart as a follower", r.name)
  107. }
  108. // open the snapshot
  109. if snapshot {
  110. go monitorSnapshot()
  111. }
  112. // start to response to raft requests
  113. go r.startTransport(r.tlsConf.Scheme, r.tlsConf.Server)
  114. }
  115. func startAsLeader() {
  116. // leader need to join self as a peer
  117. for {
  118. _, err := r.Do(newJoinCommand())
  119. if err == nil {
  120. break
  121. }
  122. }
  123. debugf("%s start as a leader", r.name)
  124. }
  125. func startAsFollower() {
  126. // start as a follower in a existing cluster
  127. for i := 0; i < retryTimes; i++ {
  128. ok := joinCluster(cluster)
  129. if ok {
  130. return
  131. }
  132. warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval)
  133. time.Sleep(time.Second * RetryInterval)
  134. }
  135. fatalf("Cannot join the cluster via given machines after %x retries", retryTimes)
  136. }
  137. // Start to listen and response raft command
  138. func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) {
  139. infof("raft server [name %s, listen on %s, advertised url %s]", r.name, r.listenHost, r.url)
  140. raftMux := http.NewServeMux()
  141. server := &http.Server{
  142. Handler: raftMux,
  143. TLSConfig: &tlsConf,
  144. Addr: r.listenHost,
  145. }
  146. // internal commands
  147. raftMux.HandleFunc("/name", NameHttpHandler)
  148. raftMux.HandleFunc("/version", RaftVersionHttpHandler)
  149. raftMux.Handle("/join", errorHandler(JoinHttpHandler))
  150. raftMux.HandleFunc("/remove/", RemoveHttpHandler)
  151. raftMux.HandleFunc("/vote", VoteHttpHandler)
  152. raftMux.HandleFunc("/log", GetLogHttpHandler)
  153. raftMux.HandleFunc("/log/append", AppendEntriesHttpHandler)
  154. raftMux.HandleFunc("/snapshot", SnapshotHttpHandler)
  155. raftMux.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler)
  156. raftMux.HandleFunc("/etcdURL", EtcdURLHttpHandler)
  157. if scheme == "http" {
  158. fatal(server.ListenAndServe())
  159. } else {
  160. fatal(server.ListenAndServeTLS(r.tlsInfo.CertFile, r.tlsInfo.KeyFile))
  161. }
  162. }
  163. // getVersion fetches the raft version of a peer. This works for now but we
  164. // will need to do something more sophisticated later when we allow mixed
  165. // version clusters.
  166. func getVersion(t *transporter, versionURL url.URL) (string, error) {
  167. resp, req, err := t.Get(versionURL.String())
  168. if err != nil {
  169. return "", err
  170. }
  171. defer resp.Body.Close()
  172. t.CancelWhenTimeout(req)
  173. body, err := ioutil.ReadAll(resp.Body)
  174. return string(body), nil
  175. }
  176. func joinCluster(cluster []string) bool {
  177. for _, machine := range cluster {
  178. if len(machine) == 0 {
  179. continue
  180. }
  181. err := joinByMachine(r.Server, machine, r.tlsConf.Scheme)
  182. if err == nil {
  183. debugf("%s success join to the cluster via machine %s", r.name, machine)
  184. return true
  185. } else {
  186. if _, ok := err.(etcdErr.Error); ok {
  187. fatal(err)
  188. }
  189. debugf("cannot join to cluster via machine %s %s", machine, err)
  190. }
  191. }
  192. return false
  193. }
  194. // Send join requests to machine.
  195. func joinByMachine(s *raft.Server, machine string, scheme string) error {
  196. var b bytes.Buffer
  197. // t must be ok
  198. t, _ := r.Transporter().(*transporter)
  199. // Our version must match the leaders version
  200. versionURL := url.URL{Host: machine, Scheme: scheme, Path: "/version"}
  201. version, err := getVersion(t, versionURL)
  202. if err != nil {
  203. return fmt.Errorf("Unable to join: %v", err)
  204. }
  205. // TODO: versioning of the internal protocol. See:
  206. // Documentation/internatl-protocol-versioning.md
  207. if version != r.version {
  208. return fmt.Errorf("Unable to join: internal version mismatch, entire cluster must be running identical versions of etcd")
  209. }
  210. json.NewEncoder(&b).Encode(newJoinCommand())
  211. joinURL := url.URL{Host: machine, Scheme: scheme, Path: "/join"}
  212. debugf("Send Join Request to %s", joinURL.String())
  213. resp, req, err := t.Post(joinURL.String(), &b)
  214. for {
  215. if err != nil {
  216. return fmt.Errorf("Unable to join: %v", err)
  217. }
  218. if resp != nil {
  219. defer resp.Body.Close()
  220. t.CancelWhenTimeout(req)
  221. if resp.StatusCode == http.StatusOK {
  222. b, _ := ioutil.ReadAll(resp.Body)
  223. r.joinIndex, _ = binary.Uvarint(b)
  224. return nil
  225. }
  226. if resp.StatusCode == http.StatusTemporaryRedirect {
  227. address := resp.Header.Get("Location")
  228. debugf("Send Join Request to %s", address)
  229. json.NewEncoder(&b).Encode(newJoinCommand())
  230. resp, req, err = t.Post(address, &b)
  231. } else if resp.StatusCode == http.StatusBadRequest {
  232. debug("Reach max number machines in the cluster")
  233. decoder := json.NewDecoder(resp.Body)
  234. err := &etcdErr.Error{}
  235. decoder.Decode(err)
  236. return *err
  237. } else {
  238. return fmt.Errorf("Unable to join")
  239. }
  240. }
  241. }
  242. return fmt.Errorf("Unable to join: %v", err)
  243. }
  244. func (r *raftServer) Stats() []byte {
  245. r.serverStats.LeaderInfo.Uptime = time.Now().Sub(r.serverStats.LeaderInfo.startTime).String()
  246. queue := r.serverStats.sendRateQueue
  247. r.serverStats.SendingPkgRate, r.serverStats.SendingBandwidthRate = queue.Rate()
  248. queue = r.serverStats.recvRateQueue
  249. r.serverStats.RecvingPkgRate, r.serverStats.RecvingBandwidthRate = queue.Rate()
  250. b, _ := json.Marshal(r.serverStats)
  251. return b
  252. }
  253. func (r *raftServer) PeerStats() []byte {
  254. if r.State() == raft.Leader {
  255. b, _ := json.Marshal(r.followersStats)
  256. return b
  257. }
  258. return nil
  259. }
  260. // Register commands to raft server
  261. func registerCommands() {
  262. raft.RegisterCommand(&JoinCommand{})
  263. raft.RegisterCommand(&RemoveCommand{})
  264. raft.RegisterCommand(&SetCommand{})
  265. raft.RegisterCommand(&GetCommand{})
  266. raft.RegisterCommand(&DeleteCommand{})
  267. raft.RegisterCommand(&WatchCommand{})
  268. raft.RegisterCommand(&TestAndSetCommand{})
  269. }