peer_server_handlers.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. package server
  2. import (
  3. "encoding/json"
  4. "net/http"
  5. "strconv"
  6. "time"
  7. "github.com/coreos/etcd/third_party/github.com/coreos/raft"
  8. "github.com/coreos/etcd/third_party/github.com/gorilla/mux"
  9. etcdErr "github.com/coreos/etcd/error"
  10. uhttp "github.com/coreos/etcd/pkg/http"
  11. "github.com/coreos/etcd/log"
  12. "github.com/coreos/etcd/store"
  13. )
  14. // Get all the current logs
  15. func (ps *PeerServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
  16. log.Debugf("[recv] GET %s/log", ps.Config.URL)
  17. w.Header().Set("Content-Type", "application/json")
  18. w.WriteHeader(http.StatusOK)
  19. json.NewEncoder(w).Encode(ps.raftServer.LogEntries())
  20. }
  21. // Response to vote request
  22. func (ps *PeerServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
  23. rvreq := &raft.RequestVoteRequest{}
  24. if _, err := rvreq.Decode(req.Body); err != nil {
  25. http.Error(w, "", http.StatusBadRequest)
  26. log.Warnf("[recv] BADREQUEST %s/vote [%v]", ps.Config.URL, err)
  27. return
  28. }
  29. log.Debugf("[recv] POST %s/vote [%s]", ps.Config.URL, rvreq.CandidateName)
  30. resp := ps.raftServer.RequestVote(rvreq)
  31. if resp == nil {
  32. log.Warn("[vote] Error: nil response")
  33. http.Error(w, "", http.StatusInternalServerError)
  34. return
  35. }
  36. if _, err := resp.Encode(w); err != nil {
  37. log.Warn("[vote] Error: %v", err)
  38. http.Error(w, "", http.StatusInternalServerError)
  39. return
  40. }
  41. }
  42. // Response to append entries request
  43. func (ps *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
  44. start := time.Now()
  45. aereq := &raft.AppendEntriesRequest{}
  46. if _, err := aereq.Decode(req.Body); err != nil {
  47. http.Error(w, "", http.StatusBadRequest)
  48. log.Warnf("[recv] BADREQUEST %s/log/append [%v]", ps.Config.URL, err)
  49. return
  50. }
  51. log.Debugf("[recv] POST %s/log/append [%d]", ps.Config.URL, len(aereq.Entries))
  52. ps.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength))
  53. resp := ps.raftServer.AppendEntries(aereq)
  54. if resp == nil {
  55. log.Warn("[ae] Error: nil response")
  56. http.Error(w, "", http.StatusInternalServerError)
  57. return
  58. }
  59. if !resp.Success() {
  60. log.Debugf("[Append Entry] Step back")
  61. }
  62. if _, err := resp.Encode(w); err != nil {
  63. log.Warn("[ae] Error: %v", err)
  64. http.Error(w, "", http.StatusInternalServerError)
  65. return
  66. }
  67. (*ps.metrics).Timer("timer.appendentries.handle").UpdateSince(start)
  68. }
  69. // Response to recover from snapshot request
  70. func (ps *PeerServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
  71. ssreq := &raft.SnapshotRequest{}
  72. if _, err := ssreq.Decode(req.Body); err != nil {
  73. http.Error(w, "", http.StatusBadRequest)
  74. log.Warnf("[recv] BADREQUEST %s/snapshot [%v]", ps.Config.URL, err)
  75. return
  76. }
  77. log.Debugf("[recv] POST %s/snapshot", ps.Config.URL)
  78. resp := ps.raftServer.RequestSnapshot(ssreq)
  79. if resp == nil {
  80. log.Warn("[ss] Error: nil response")
  81. http.Error(w, "", http.StatusInternalServerError)
  82. return
  83. }
  84. if _, err := resp.Encode(w); err != nil {
  85. log.Warn("[ss] Error: %v", err)
  86. http.Error(w, "", http.StatusInternalServerError)
  87. return
  88. }
  89. }
  90. // Response to recover from snapshot request
  91. func (ps *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
  92. ssrreq := &raft.SnapshotRecoveryRequest{}
  93. if _, err := ssrreq.Decode(req.Body); err != nil {
  94. http.Error(w, "", http.StatusBadRequest)
  95. log.Warnf("[recv] BADREQUEST %s/snapshotRecovery [%v]", ps.Config.URL, err)
  96. return
  97. }
  98. log.Debugf("[recv] POST %s/snapshotRecovery", ps.Config.URL)
  99. resp := ps.raftServer.SnapshotRecoveryRequest(ssrreq)
  100. if resp == nil {
  101. log.Warn("[ssr] Error: nil response")
  102. http.Error(w, "", http.StatusInternalServerError)
  103. return
  104. }
  105. if _, err := resp.Encode(w); err != nil {
  106. log.Warn("[ssr] Error: %v", err)
  107. http.Error(w, "", http.StatusInternalServerError)
  108. return
  109. }
  110. }
  111. // Get the port that listening for etcd connecting of the server
  112. func (ps *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) {
  113. log.Debugf("[recv] Get %s/etcdURL/ ", ps.Config.URL)
  114. w.WriteHeader(http.StatusOK)
  115. w.Write([]byte(ps.server.URL()))
  116. }
  117. // Response to the join request
  118. func (ps *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
  119. command := &JoinCommand{}
  120. err := uhttp.DecodeJsonRequest(req, command)
  121. if err != nil {
  122. w.WriteHeader(http.StatusInternalServerError)
  123. return
  124. }
  125. log.Debugf("Receive Join Request from %s", command.Name)
  126. err = ps.server.Dispatch(command, w, req)
  127. // Return status.
  128. if err != nil {
  129. if etcdErr, ok := err.(*etcdErr.Error); ok {
  130. log.Debug("Return error: ", (*etcdErr).Error())
  131. etcdErr.Write(w)
  132. } else {
  133. http.Error(w, err.Error(), http.StatusInternalServerError)
  134. }
  135. }
  136. }
  137. // Response to remove request
  138. func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request) {
  139. if req.Method != "DELETE" {
  140. w.WriteHeader(http.StatusMethodNotAllowed)
  141. return
  142. }
  143. vars := mux.Vars(req)
  144. command := &RemoveCommand{
  145. Name: vars["name"],
  146. }
  147. log.Debugf("[recv] Remove Request [%s]", command.Name)
  148. ps.server.Dispatch(command, w, req)
  149. }
  150. // Returns a JSON-encoded cluster configuration.
  151. func (ps *PeerServer) getClusterConfigHttpHandler(w http.ResponseWriter, req *http.Request) {
  152. json.NewEncoder(w).Encode(&ps.clusterConfig)
  153. }
  154. // Updates the cluster configuration.
  155. func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *http.Request) {
  156. c := &SetClusterConfigCommand{Config:&ClusterConfig{}}
  157. if err := json.NewDecoder(req.Body).Decode(&c.Config); err != nil {
  158. http.Error(w, err.Error(), http.StatusInternalServerError)
  159. return
  160. }
  161. log.Debugf("[recv] Update Cluster Config Request")
  162. ps.server.Dispatch(c, w, req)
  163. }
  164. // Response to the name request
  165. func (ps *PeerServer) NameHttpHandler(w http.ResponseWriter, req *http.Request) {
  166. log.Debugf("[recv] Get %s/name/ ", ps.Config.URL)
  167. w.WriteHeader(http.StatusOK)
  168. w.Write([]byte(ps.Config.Name))
  169. }
  170. // Response to the name request
  171. func (ps *PeerServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request) {
  172. log.Debugf("[recv] Get %s/version/ ", ps.Config.URL)
  173. w.WriteHeader(http.StatusOK)
  174. w.Write([]byte(strconv.Itoa(ps.store.Version())))
  175. }
  176. // Checks whether a given version is supported.
  177. func (ps *PeerServer) VersionCheckHttpHandler(w http.ResponseWriter, req *http.Request) {
  178. log.Debugf("[recv] Get %s%s ", ps.Config.URL, req.URL.Path)
  179. vars := mux.Vars(req)
  180. version, _ := strconv.Atoi(vars["version"])
  181. if version >= store.MinVersion() && version <= store.MaxVersion() {
  182. w.WriteHeader(http.StatusOK)
  183. } else {
  184. w.WriteHeader(http.StatusForbidden)
  185. }
  186. }
  187. // Upgrades the current store version to the next version.
  188. func (ps *PeerServer) UpgradeHttpHandler(w http.ResponseWriter, req *http.Request) {
  189. log.Debugf("[recv] Get %s/version", ps.Config.URL)
  190. // Check if upgrade is possible for all nodes.
  191. if err := ps.Upgradable(); err != nil {
  192. http.Error(w, err.Error(), http.StatusInternalServerError)
  193. return
  194. }
  195. // Create an upgrade command from the current version.
  196. c := ps.store.CommandFactory().CreateUpgradeCommand()
  197. if err := ps.server.Dispatch(c, w, req); err != nil {
  198. http.Error(w, err.Error(), http.StatusInternalServerError)
  199. return
  200. }
  201. w.WriteHeader(http.StatusOK)
  202. }