peer_server_handlers.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. package server
  2. import (
  3. "encoding/json"
  4. "net/http"
  5. "strconv"
  6. "time"
  7. "github.com/coreos/etcd/third_party/github.com/goraft/raft"
  8. "github.com/coreos/etcd/third_party/github.com/gorilla/mux"
  9. etcdErr "github.com/coreos/etcd/error"
  10. "github.com/coreos/etcd/log"
  11. uhttp "github.com/coreos/etcd/pkg/http"
  12. "github.com/coreos/etcd/store"
  13. )
  14. // Get all the current logs
  15. func (ps *PeerServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
  16. log.Debugf("[recv] GET %s/log", ps.Config.URL)
  17. w.Header().Set("Content-Type", "application/json")
  18. w.WriteHeader(http.StatusOK)
  19. json.NewEncoder(w).Encode(ps.raftServer.LogEntries())
  20. }
  21. // Response to vote request
  22. func (ps *PeerServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
  23. rvreq := &raft.RequestVoteRequest{}
  24. if _, err := rvreq.Decode(req.Body); err != nil {
  25. http.Error(w, "", http.StatusBadRequest)
  26. log.Warnf("[recv] BADREQUEST %s/vote [%v]", ps.Config.URL, err)
  27. return
  28. }
  29. log.Debugf("[recv] POST %s/vote [%s]", ps.Config.URL, rvreq.CandidateName)
  30. resp := ps.raftServer.RequestVote(rvreq)
  31. if resp == nil {
  32. log.Warn("[vote] Error: nil response")
  33. http.Error(w, "", http.StatusInternalServerError)
  34. return
  35. }
  36. if _, err := resp.Encode(w); err != nil {
  37. log.Warn("[vote] Error: %v", err)
  38. http.Error(w, "", http.StatusInternalServerError)
  39. return
  40. }
  41. }
  42. // Response to append entries request
  43. func (ps *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
  44. start := time.Now()
  45. aereq := &raft.AppendEntriesRequest{}
  46. if _, err := aereq.Decode(req.Body); err != nil {
  47. http.Error(w, "", http.StatusBadRequest)
  48. log.Warnf("[recv] BADREQUEST %s/log/append [%v]", ps.Config.URL, err)
  49. return
  50. }
  51. log.Debugf("[recv] POST %s/log/append [%d]", ps.Config.URL, len(aereq.Entries))
  52. ps.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength))
  53. resp := ps.raftServer.AppendEntries(aereq)
  54. if resp == nil {
  55. log.Warn("[ae] Error: nil response")
  56. http.Error(w, "", http.StatusInternalServerError)
  57. return
  58. }
  59. if !resp.Success() {
  60. log.Debugf("[Append Entry] Step back")
  61. }
  62. if _, err := resp.Encode(w); err != nil {
  63. log.Warn("[ae] Error: %v", err)
  64. http.Error(w, "", http.StatusInternalServerError)
  65. return
  66. }
  67. (*ps.metrics).Timer("timer.appendentries.handle").UpdateSince(start)
  68. }
  69. // Response to recover from snapshot request
  70. func (ps *PeerServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
  71. ssreq := &raft.SnapshotRequest{}
  72. if _, err := ssreq.Decode(req.Body); err != nil {
  73. http.Error(w, "", http.StatusBadRequest)
  74. log.Warnf("[recv] BADREQUEST %s/snapshot [%v]", ps.Config.URL, err)
  75. return
  76. }
  77. log.Debugf("[recv] POST %s/snapshot", ps.Config.URL)
  78. resp := ps.raftServer.RequestSnapshot(ssreq)
  79. if resp == nil {
  80. log.Warn("[ss] Error: nil response")
  81. http.Error(w, "", http.StatusInternalServerError)
  82. return
  83. }
  84. if _, err := resp.Encode(w); err != nil {
  85. log.Warn("[ss] Error: %v", err)
  86. http.Error(w, "", http.StatusInternalServerError)
  87. return
  88. }
  89. }
  90. // Response to recover from snapshot request
  91. func (ps *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
  92. ssrreq := &raft.SnapshotRecoveryRequest{}
  93. if _, err := ssrreq.Decode(req.Body); err != nil {
  94. http.Error(w, "", http.StatusBadRequest)
  95. log.Warnf("[recv] BADREQUEST %s/snapshotRecovery [%v]", ps.Config.URL, err)
  96. return
  97. }
  98. log.Debugf("[recv] POST %s/snapshotRecovery", ps.Config.URL)
  99. resp := ps.raftServer.SnapshotRecoveryRequest(ssrreq)
  100. if resp == nil {
  101. log.Warn("[ssr] Error: nil response")
  102. http.Error(w, "", http.StatusInternalServerError)
  103. return
  104. }
  105. if _, err := resp.Encode(w); err != nil {
  106. log.Warn("[ssr] Error: %v", err)
  107. http.Error(w, "", http.StatusInternalServerError)
  108. return
  109. }
  110. }
  111. // Get the port that listening for etcd connecting of the server
  112. func (ps *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) {
  113. log.Debugf("[recv] Get %s/etcdURL/ ", ps.Config.URL)
  114. w.WriteHeader(http.StatusOK)
  115. w.Write([]byte(ps.server.URL()))
  116. }
  117. // Response to the join request
  118. func (ps *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
  119. command := &JoinCommand{}
  120. if err := uhttp.DecodeJsonRequest(req, command); err != nil {
  121. w.WriteHeader(http.StatusInternalServerError)
  122. return
  123. }
  124. log.Debugf("Receive Join Request from %s", command.Name)
  125. err := ps.server.Dispatch(command, w, req)
  126. // Return status.
  127. if err != nil {
  128. if etcdErr, ok := err.(*etcdErr.Error); ok {
  129. log.Debug("Return error: ", (*etcdErr).Error())
  130. etcdErr.Write(w)
  131. } else {
  132. http.Error(w, err.Error(), http.StatusInternalServerError)
  133. }
  134. }
  135. }
  136. // Response to remove request
  137. func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request) {
  138. if req.Method != "DELETE" {
  139. w.WriteHeader(http.StatusMethodNotAllowed)
  140. return
  141. }
  142. vars := mux.Vars(req)
  143. command := &RemoveCommand{
  144. Name: vars["name"],
  145. }
  146. log.Debugf("[recv] Remove Request [%s]", command.Name)
  147. ps.server.Dispatch(command, w, req)
  148. }
  149. // Returns a JSON-encoded cluster configuration.
  150. func (ps *PeerServer) getClusterConfigHttpHandler(w http.ResponseWriter, req *http.Request) {
  151. json.NewEncoder(w).Encode(ps.ClusterConfig())
  152. }
  153. // Updates the cluster configuration.
  154. func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *http.Request) {
  155. // Decode map.
  156. m := make(map[string]interface{})
  157. if err := json.NewDecoder(req.Body).Decode(&m); err != nil {
  158. http.Error(w, err.Error(), http.StatusInternalServerError)
  159. return
  160. }
  161. // Copy config and update fields passed in.
  162. config := ps.ClusterConfig()
  163. if activeSize, ok := m["activeSize"].(float64); ok {
  164. config.ActiveSize = int(activeSize)
  165. }
  166. if removeDelay, ok := m["removeDelay"].(float64); ok {
  167. config.RemoveDelay = removeDelay
  168. }
  169. if syncInterval, ok := m["syncInterval"].(float64); ok {
  170. config.SyncInterval = syncInterval
  171. }
  172. // Issue command to update.
  173. c := &SetClusterConfigCommand{Config: config}
  174. log.Debugf("[recv] Update Cluster Config Request")
  175. ps.server.Dispatch(c, w, req)
  176. json.NewEncoder(w).Encode(ps.ClusterConfig())
  177. }
  178. // Retrieves a list of peers and standbys.
  179. func (ps *PeerServer) getMachinesHttpHandler(w http.ResponseWriter, req *http.Request) {
  180. machines := make([]*machineMessage, 0)
  181. leader := ps.raftServer.Leader()
  182. for _, name := range ps.registry.Names() {
  183. if msg := ps.getMachineMessage(name, leader); msg != nil {
  184. machines = append(machines, msg)
  185. }
  186. }
  187. json.NewEncoder(w).Encode(&machines)
  188. }
  189. // Retrieve single peer or standby.
  190. func (ps *PeerServer) getMachineHttpHandler(w http.ResponseWriter, req *http.Request) {
  191. vars := mux.Vars(req)
  192. m := ps.getMachineMessage(vars["name"], ps.raftServer.Leader())
  193. json.NewEncoder(w).Encode(m)
  194. }
  195. func (ps *PeerServer) getMachineMessage(name string, leader string) *machineMessage {
  196. if !ps.registry.Exists(name) {
  197. return nil
  198. }
  199. clientURL, _ := ps.registry.ClientURL(name)
  200. peerURL, _ := ps.registry.PeerURL(name)
  201. msg := &machineMessage{
  202. Name: name,
  203. State: raft.Follower,
  204. ClientURL: clientURL,
  205. PeerURL: peerURL,
  206. }
  207. if name == leader {
  208. msg.State = raft.Leader
  209. }
  210. return msg
  211. }
  212. // Response to the name request
  213. func (ps *PeerServer) NameHttpHandler(w http.ResponseWriter, req *http.Request) {
  214. log.Debugf("[recv] Get %s/name/ ", ps.Config.URL)
  215. w.WriteHeader(http.StatusOK)
  216. w.Write([]byte(ps.Config.Name))
  217. }
  218. // Response to the name request
  219. func (ps *PeerServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request) {
  220. log.Debugf("[recv] Get %s/version/ ", ps.Config.URL)
  221. w.WriteHeader(http.StatusOK)
  222. w.Write([]byte(strconv.Itoa(ps.store.Version())))
  223. }
  224. // Checks whether a given version is supported.
  225. func (ps *PeerServer) VersionCheckHttpHandler(w http.ResponseWriter, req *http.Request) {
  226. log.Debugf("[recv] Get %s%s ", ps.Config.URL, req.URL.Path)
  227. vars := mux.Vars(req)
  228. version, _ := strconv.Atoi(vars["version"])
  229. if version >= store.MinVersion() && version <= store.MaxVersion() {
  230. w.WriteHeader(http.StatusOK)
  231. } else {
  232. w.WriteHeader(http.StatusForbidden)
  233. }
  234. }
  235. // Upgrades the current store version to the next version.
  236. func (ps *PeerServer) UpgradeHttpHandler(w http.ResponseWriter, req *http.Request) {
  237. log.Debugf("[recv] Get %s/version", ps.Config.URL)
  238. // Check if upgrade is possible for all nodes.
  239. if err := ps.Upgradable(); err != nil {
  240. http.Error(w, err.Error(), http.StatusInternalServerError)
  241. return
  242. }
  243. // Create an upgrade command from the current version.
  244. c := ps.store.CommandFactory().CreateUpgradeCommand()
  245. if err := ps.server.Dispatch(c, w, req); err != nil {
  246. http.Error(w, err.Error(), http.StatusInternalServerError)
  247. return
  248. }
  249. w.WriteHeader(http.StatusOK)
  250. }
// machineMessage represents information about a peer or standby in the registry.
// It is the JSON shape produced by the machine listing handlers.
type machineMessage struct {
	// Name is the machine's name as looked up in the registry.
	Name string `json:"name"`
	// State is set to raft.Leader or raft.Follower by getMachineMessage.
	State string `json:"state"`
	// ClientURL is the value returned by the registry's ClientURL lookup.
	ClientURL string `json:"clientURL"`
	// PeerURL is the value returned by the registry's PeerURL lookup.
	PeerURL string `json:"peerURL"`
}