peer_server_handlers.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. package server
  2. import (
  3. "encoding/json"
  4. "net/http"
  5. "strconv"
  6. "time"
  7. "github.com/coreos/etcd/third_party/github.com/goraft/raft"
  8. "github.com/coreos/etcd/third_party/github.com/gorilla/mux"
  9. etcdErr "github.com/coreos/etcd/error"
  10. "github.com/coreos/etcd/log"
  11. uhttp "github.com/coreos/etcd/pkg/http"
  12. "github.com/coreos/etcd/store"
  13. )
  14. // Get all the current logs
  15. func (ps *PeerServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
  16. log.Debugf("[recv] GET %s/log", ps.Config.URL)
  17. w.Header().Set("Content-Type", "application/json")
  18. w.WriteHeader(http.StatusOK)
  19. json.NewEncoder(w).Encode(ps.raftServer.LogEntries())
  20. }
  21. // Response to vote request
  22. func (ps *PeerServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
  23. rvreq := &raft.RequestVoteRequest{}
  24. if _, err := rvreq.Decode(req.Body); err != nil {
  25. http.Error(w, "", http.StatusBadRequest)
  26. log.Warnf("[recv] BADREQUEST %s/vote [%v]", ps.Config.URL, err)
  27. return
  28. }
  29. log.Debugf("[recv] POST %s/vote [%s]", ps.Config.URL, rvreq.CandidateName)
  30. resp := ps.raftServer.RequestVote(rvreq)
  31. if resp == nil {
  32. log.Warn("[vote] Error: nil response")
  33. http.Error(w, "", http.StatusInternalServerError)
  34. return
  35. }
  36. if _, err := resp.Encode(w); err != nil {
  37. log.Warn("[vote] Error: %v", err)
  38. http.Error(w, "", http.StatusInternalServerError)
  39. return
  40. }
  41. }
  42. // Response to append entries request
  43. func (ps *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
  44. start := time.Now()
  45. aereq := &raft.AppendEntriesRequest{}
  46. if _, err := aereq.Decode(req.Body); err != nil {
  47. http.Error(w, "", http.StatusBadRequest)
  48. log.Warnf("[recv] BADREQUEST %s/log/append [%v]", ps.Config.URL, err)
  49. return
  50. }
  51. log.Debugf("[recv] POST %s/log/append [%d]", ps.Config.URL, len(aereq.Entries))
  52. ps.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength))
  53. resp := ps.raftServer.AppendEntries(aereq)
  54. if resp == nil {
  55. log.Warn("[ae] Error: nil response")
  56. http.Error(w, "", http.StatusInternalServerError)
  57. return
  58. }
  59. if !resp.Success() {
  60. log.Debugf("[Append Entry] Step back")
  61. }
  62. if _, err := resp.Encode(w); err != nil {
  63. log.Warn("[ae] Error: %v", err)
  64. http.Error(w, "", http.StatusInternalServerError)
  65. return
  66. }
  67. (*ps.metrics).Timer("timer.appendentries.handle").UpdateSince(start)
  68. }
  69. // Response to recover from snapshot request
  70. func (ps *PeerServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
  71. ssreq := &raft.SnapshotRequest{}
  72. if _, err := ssreq.Decode(req.Body); err != nil {
  73. http.Error(w, "", http.StatusBadRequest)
  74. log.Warnf("[recv] BADREQUEST %s/snapshot [%v]", ps.Config.URL, err)
  75. return
  76. }
  77. log.Debugf("[recv] POST %s/snapshot", ps.Config.URL)
  78. resp := ps.raftServer.RequestSnapshot(ssreq)
  79. if resp == nil {
  80. log.Warn("[ss] Error: nil response")
  81. http.Error(w, "", http.StatusInternalServerError)
  82. return
  83. }
  84. if _, err := resp.Encode(w); err != nil {
  85. log.Warn("[ss] Error: %v", err)
  86. http.Error(w, "", http.StatusInternalServerError)
  87. return
  88. }
  89. }
  90. // Response to recover from snapshot request
  91. func (ps *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
  92. ssrreq := &raft.SnapshotRecoveryRequest{}
  93. if _, err := ssrreq.Decode(req.Body); err != nil {
  94. http.Error(w, "", http.StatusBadRequest)
  95. log.Warnf("[recv] BADREQUEST %s/snapshotRecovery [%v]", ps.Config.URL, err)
  96. return
  97. }
  98. log.Debugf("[recv] POST %s/snapshotRecovery", ps.Config.URL)
  99. resp := ps.raftServer.SnapshotRecoveryRequest(ssrreq)
  100. if resp == nil {
  101. log.Warn("[ssr] Error: nil response")
  102. http.Error(w, "", http.StatusInternalServerError)
  103. return
  104. }
  105. if _, err := resp.Encode(w); err != nil {
  106. log.Warn("[ssr] Error: %v", err)
  107. http.Error(w, "", http.StatusInternalServerError)
  108. return
  109. }
  110. }
  111. // Get the port that listening for etcd connecting of the server
  112. func (ps *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) {
  113. log.Debugf("[recv] Get %s/etcdURL/ ", ps.Config.URL)
  114. w.WriteHeader(http.StatusOK)
  115. w.Write([]byte(ps.server.URL()))
  116. }
  117. // Response to the join request
  118. func (ps *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
  119. command := &JoinCommand{}
  120. if err := uhttp.DecodeJsonRequest(req, command); err != nil {
  121. w.WriteHeader(http.StatusInternalServerError)
  122. return
  123. }
  124. log.Debugf("Receive Join Request from %s", command.Name)
  125. err := ps.server.Dispatch(command, w, req)
  126. // Return status.
  127. if err != nil {
  128. if etcdErr, ok := err.(*etcdErr.Error); ok {
  129. log.Debug("Return error: ", (*etcdErr).Error())
  130. etcdErr.Write(w)
  131. } else {
  132. http.Error(w, err.Error(), http.StatusInternalServerError)
  133. }
  134. }
  135. }
  136. // Response to remove request
  137. func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request) {
  138. if req.Method != "DELETE" {
  139. w.WriteHeader(http.StatusMethodNotAllowed)
  140. return
  141. }
  142. vars := mux.Vars(req)
  143. command := &RemoveCommand{
  144. Name: vars["name"],
  145. }
  146. log.Debugf("[recv] Remove Request [%s]", command.Name)
  147. ps.server.Dispatch(command, w, req)
  148. }
  149. // Returns a JSON-encoded cluster configuration.
  150. func (ps *PeerServer) getClusterConfigHttpHandler(w http.ResponseWriter, req *http.Request) {
  151. w.Header().Set("Content-Type", "application/json")
  152. json.NewEncoder(w).Encode(ps.ClusterConfig())
  153. }
  154. // Updates the cluster configuration.
  155. func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *http.Request) {
  156. // Decode map.
  157. m := make(map[string]interface{})
  158. if err := json.NewDecoder(req.Body).Decode(&m); err != nil {
  159. http.Error(w, err.Error(), http.StatusInternalServerError)
  160. return
  161. }
  162. // Copy config and update fields passed in.
  163. config := ps.ClusterConfig()
  164. if activeSize, ok := m["activeSize"].(float64); ok {
  165. config.ActiveSize = int(activeSize)
  166. }
  167. if removeDelay, ok := m["removeDelay"].(float64); ok {
  168. config.RemoveDelay = removeDelay
  169. }
  170. if syncInterval, ok := m["syncInterval"].(float64); ok {
  171. config.SyncInterval = syncInterval
  172. }
  173. // Issue command to update.
  174. c := &SetClusterConfigCommand{Config: config}
  175. log.Debugf("[recv] Update Cluster Config Request")
  176. ps.server.Dispatch(c, w, req)
  177. w.Header().Set("Content-Type", "application/json")
  178. json.NewEncoder(w).Encode(ps.ClusterConfig())
  179. }
  180. // Retrieves a list of peers and standbys.
  181. func (ps *PeerServer) getMachinesHttpHandler(w http.ResponseWriter, req *http.Request) {
  182. machines := make([]*machineMessage, 0)
  183. leader := ps.raftServer.Leader()
  184. for _, name := range ps.registry.Names() {
  185. if msg := ps.getMachineMessage(name, leader); msg != nil {
  186. machines = append(machines, msg)
  187. }
  188. }
  189. w.Header().Set("Content-Type", "application/json")
  190. json.NewEncoder(w).Encode(&machines)
  191. }
  192. // Retrieve single peer or standby.
  193. func (ps *PeerServer) getMachineHttpHandler(w http.ResponseWriter, req *http.Request) {
  194. vars := mux.Vars(req)
  195. m := ps.getMachineMessage(vars["name"], ps.raftServer.Leader())
  196. w.Header().Set("Content-Type", "application/json")
  197. json.NewEncoder(w).Encode(m)
  198. }
  199. func (ps *PeerServer) getMachineMessage(name string, leader string) *machineMessage {
  200. if !ps.registry.Exists(name) {
  201. return nil
  202. }
  203. clientURL, _ := ps.registry.ClientURL(name)
  204. peerURL, _ := ps.registry.PeerURL(name)
  205. msg := &machineMessage{
  206. Name: name,
  207. State: raft.Follower,
  208. ClientURL: clientURL,
  209. PeerURL: peerURL,
  210. }
  211. if name == leader {
  212. msg.State = raft.Leader
  213. }
  214. return msg
  215. }
  216. // Response to the name request
  217. func (ps *PeerServer) NameHttpHandler(w http.ResponseWriter, req *http.Request) {
  218. log.Debugf("[recv] Get %s/name/ ", ps.Config.URL)
  219. w.WriteHeader(http.StatusOK)
  220. w.Write([]byte(ps.Config.Name))
  221. }
  222. // Response to the name request
  223. func (ps *PeerServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request) {
  224. log.Debugf("[recv] Get %s/version/ ", ps.Config.URL)
  225. w.WriteHeader(http.StatusOK)
  226. w.Write([]byte(strconv.Itoa(ps.store.Version())))
  227. }
  228. // Checks whether a given version is supported.
  229. func (ps *PeerServer) VersionCheckHttpHandler(w http.ResponseWriter, req *http.Request) {
  230. log.Debugf("[recv] Get %s%s ", ps.Config.URL, req.URL.Path)
  231. vars := mux.Vars(req)
  232. version, _ := strconv.Atoi(vars["version"])
  233. if version >= store.MinVersion() && version <= store.MaxVersion() {
  234. w.WriteHeader(http.StatusOK)
  235. } else {
  236. w.WriteHeader(http.StatusForbidden)
  237. }
  238. }
  239. // Upgrades the current store version to the next version.
  240. func (ps *PeerServer) UpgradeHttpHandler(w http.ResponseWriter, req *http.Request) {
  241. log.Debugf("[recv] Get %s/version", ps.Config.URL)
  242. // Check if upgrade is possible for all nodes.
  243. if err := ps.Upgradable(); err != nil {
  244. http.Error(w, err.Error(), http.StatusInternalServerError)
  245. return
  246. }
  247. // Create an upgrade command from the current version.
  248. c := ps.store.CommandFactory().CreateUpgradeCommand()
  249. if err := ps.server.Dispatch(c, w, req); err != nil {
  250. http.Error(w, err.Error(), http.StatusInternalServerError)
  251. return
  252. }
  253. w.WriteHeader(http.StatusOK)
  254. }
  255. // machineMessage represents information about a peer or standby in the registry.
  256. type machineMessage struct {
  257. Name string `json:"name"`
  258. State string `json:"state"`
  259. ClientURL string `json:"clientURL"`
  260. PeerURL string `json:"peerURL"`
  261. }