peer_server_handlers.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. package server
  2. import (
  3. "encoding/json"
  4. "net/http"
  5. "strconv"
  6. etcdErr "github.com/coreos/etcd/error"
  7. "github.com/coreos/etcd/log"
  8. "github.com/coreos/etcd/store"
  9. "github.com/coreos/go-raft"
  10. "github.com/gorilla/mux"
  11. )
  12. // Get all the current logs
  13. func (ps *PeerServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
  14. log.Debugf("[recv] GET %s/log", ps.url)
  15. w.Header().Set("Content-Type", "application/json")
  16. w.WriteHeader(http.StatusOK)
  17. json.NewEncoder(w).Encode(ps.raftServer.LogEntries())
  18. }
  19. // Response to vote request
  20. func (ps *PeerServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
  21. rvreq := &raft.RequestVoteRequest{}
  22. err := decodeJsonRequest(req, rvreq)
  23. if err == nil {
  24. log.Debugf("[recv] POST %s/vote [%s]", ps.url, rvreq.CandidateName)
  25. if resp := ps.raftServer.RequestVote(rvreq); resp != nil {
  26. w.WriteHeader(http.StatusOK)
  27. json.NewEncoder(w).Encode(resp)
  28. return
  29. }
  30. }
  31. log.Warnf("[vote] ERROR: %v", err)
  32. w.WriteHeader(http.StatusInternalServerError)
  33. }
  34. // Response to append entries request
  35. func (ps *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
  36. aereq := &raft.AppendEntriesRequest{}
  37. err := decodeJsonRequest(req, aereq)
  38. if err == nil {
  39. log.Debugf("[recv] POST %s/log/append [%d]", ps.url, len(aereq.Entries))
  40. ps.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength))
  41. if resp := ps.raftServer.AppendEntries(aereq); resp != nil {
  42. w.WriteHeader(http.StatusOK)
  43. json.NewEncoder(w).Encode(resp)
  44. if !resp.Success {
  45. log.Debugf("[Append Entry] Step back")
  46. }
  47. return
  48. }
  49. }
  50. log.Warnf("[Append Entry] ERROR: %v", err)
  51. w.WriteHeader(http.StatusInternalServerError)
  52. }
  53. // Response to recover from snapshot request
  54. func (ps *PeerServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
  55. aereq := &raft.SnapshotRequest{}
  56. err := decodeJsonRequest(req, aereq)
  57. if err == nil {
  58. log.Debugf("[recv] POST %s/snapshot/ ", ps.url)
  59. if resp := ps.raftServer.RequestSnapshot(aereq); resp != nil {
  60. w.WriteHeader(http.StatusOK)
  61. json.NewEncoder(w).Encode(resp)
  62. return
  63. }
  64. }
  65. log.Warnf("[Snapshot] ERROR: %v", err)
  66. w.WriteHeader(http.StatusInternalServerError)
  67. }
  68. // Response to recover from snapshot request
  69. func (ps *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
  70. aereq := &raft.SnapshotRecoveryRequest{}
  71. err := decodeJsonRequest(req, aereq)
  72. if err == nil {
  73. log.Debugf("[recv] POST %s/snapshotRecovery/ ", ps.url)
  74. if resp := ps.raftServer.SnapshotRecoveryRequest(aereq); resp != nil {
  75. w.WriteHeader(http.StatusOK)
  76. json.NewEncoder(w).Encode(resp)
  77. return
  78. }
  79. }
  80. log.Warnf("[Snapshot] ERROR: %v", err)
  81. w.WriteHeader(http.StatusInternalServerError)
  82. }
  83. // Get the port that listening for etcd connecting of the server
  84. func (ps *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) {
  85. log.Debugf("[recv] Get %s/etcdURL/ ", ps.url)
  86. w.WriteHeader(http.StatusOK)
  87. w.Write([]byte(ps.server.URL()))
  88. }
  89. // Response to the join request
  90. func (ps *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
  91. command := &JoinCommand{}
  92. // Write CORS header.
  93. if ps.server.OriginAllowed("*") {
  94. w.Header().Add("Access-Control-Allow-Origin", "*")
  95. } else if ps.server.OriginAllowed(req.Header.Get("Origin")) {
  96. w.Header().Add("Access-Control-Allow-Origin", req.Header.Get("Origin"))
  97. }
  98. err := decodeJsonRequest(req, command)
  99. if err != nil {
  100. w.WriteHeader(http.StatusInternalServerError)
  101. return
  102. }
  103. log.Debugf("Receive Join Request from %s", command.Name)
  104. err = ps.server.Dispatch(command, w, req)
  105. // Return status.
  106. if err != nil {
  107. if etcdErr, ok := err.(*etcdErr.Error); ok {
  108. log.Debug("Return error: ", (*etcdErr).Error())
  109. etcdErr.Write(w)
  110. } else {
  111. http.Error(w, err.Error(), http.StatusInternalServerError)
  112. }
  113. }
  114. }
  115. // Response to remove request
  116. func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request) {
  117. if req.Method != "DELETE" {
  118. w.WriteHeader(http.StatusMethodNotAllowed)
  119. return
  120. }
  121. vars := mux.Vars(req)
  122. command := &RemoveCommand{
  123. Name: vars["name"],
  124. }
  125. log.Debugf("[recv] Remove Request [%s]", command.Name)
  126. ps.server.Dispatch(command, w, req)
  127. }
  128. // Response to the name request
  129. func (ps *PeerServer) NameHttpHandler(w http.ResponseWriter, req *http.Request) {
  130. log.Debugf("[recv] Get %s/name/ ", ps.url)
  131. w.WriteHeader(http.StatusOK)
  132. w.Write([]byte(ps.name))
  133. }
  134. // Response to the name request
  135. func (ps *PeerServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request) {
  136. log.Debugf("[recv] Get %s/version/ ", ps.url)
  137. w.WriteHeader(http.StatusOK)
  138. w.Write([]byte(strconv.Itoa(ps.store.Version())))
  139. }
  140. // Checks whether a given version is supported.
  141. func (ps *PeerServer) VersionCheckHttpHandler(w http.ResponseWriter, req *http.Request) {
  142. log.Debugf("[recv] Get %s%s ", ps.url, req.URL.Path)
  143. vars := mux.Vars(req)
  144. version, _ := strconv.Atoi(vars["version"])
  145. if version >= store.MinVersion() && version <= store.MaxVersion() {
  146. w.WriteHeader(http.StatusOK)
  147. } else {
  148. w.WriteHeader(http.StatusForbidden)
  149. }
  150. }
  151. // Upgrades the current store version to the next version.
  152. func (ps *PeerServer) UpgradeHttpHandler(w http.ResponseWriter, req *http.Request) {
  153. log.Debugf("[recv] Get %s/version", ps.url)
  154. // Check if upgrade is possible for all nodes.
  155. if err := ps.Upgradable(); err != nil {
  156. http.Error(w, err.Error(), http.StatusInternalServerError)
  157. return
  158. }
  159. // Create an upgrade command from the current version.
  160. c := ps.store.CommandFactory().CreateUpgradeCommand()
  161. if err := ps.server.Dispatch(c, w, req); err != nil {
  162. http.Error(w, err.Error(), http.StatusInternalServerError)
  163. return
  164. }
  165. w.WriteHeader(http.StatusOK)
  166. }