raft_handlers.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. /*
  2. Copyright 2013 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package main
  14. import (
  15. "encoding/json"
  16. "net/http"
  17. "github.com/coreos/go-raft"
  18. )
  19. //-------------------------------------------------------------
  20. // Handlers to handle raft related request via raft server port
  21. //-------------------------------------------------------------
  22. // Get all the current logs
  23. func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
  24. debugf("[recv] GET %s/log", r.url)
  25. w.Header().Set("Content-Type", "application/json")
  26. w.WriteHeader(http.StatusOK)
  27. json.NewEncoder(w).Encode(r.LogEntries())
  28. }
  29. // Response to vote request
  30. func VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
  31. rvreq := &raft.RequestVoteRequest{}
  32. if _, err := rvreq.Decode(req.Body); err != nil {
  33. http.Error(w, "", http.StatusBadRequest)
  34. warnf("[recv] BADREQUEST %s/vote [%v]", r.url, err)
  35. return
  36. }
  37. debugf("[recv] POST %s/vote [%s]", r.url, rvreq.CandidateName)
  38. resp := r.RequestVote(rvreq)
  39. if resp == nil {
  40. warn("[vote] Error: nil response")
  41. http.Error(w, "", http.StatusInternalServerError)
  42. return
  43. }
  44. if _, err := resp.Encode(w); err != nil {
  45. warn("[vote] Error: %v", err)
  46. http.Error(w, "", http.StatusInternalServerError)
  47. return
  48. }
  49. }
  50. // Response to append entries request
  51. func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
  52. aereq := &raft.AppendEntriesRequest{}
  53. if _, err := aereq.Decode(req.Body); err != nil {
  54. http.Error(w, "", http.StatusBadRequest)
  55. warnf("[recv] BADREQUEST %s/log/append [%v]", r.url, err)
  56. return
  57. }
  58. debugf("[recv] POST %s/log/append [%d]", r.url, len(aereq.Entries))
  59. r.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength))
  60. resp := r.AppendEntries(aereq)
  61. if resp == nil {
  62. warn("[ae] Error: nil response")
  63. http.Error(w, "", http.StatusInternalServerError)
  64. return
  65. }
  66. if !resp.Success {
  67. debugf("[Append Entry] Step back")
  68. }
  69. if _, err := resp.Encode(w); err != nil {
  70. warn("[ae] Error: %v", err)
  71. http.Error(w, "", http.StatusInternalServerError)
  72. return
  73. }
  74. }
  75. // Response to recover from snapshot request
  76. func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
  77. ssreq := &raft.SnapshotRequest{}
  78. if _, err := ssreq.Decode(req.Body); err != nil {
  79. http.Error(w, "", http.StatusBadRequest)
  80. warnf("[recv] BADREQUEST %s/snapshot [%v]", r.url, err)
  81. return
  82. }
  83. debugf("[recv] POST %s/snapshot", r.url)
  84. resp := r.RequestSnapshot(ssreq)
  85. if resp == nil {
  86. warn("[ss] Error: nil response")
  87. http.Error(w, "", http.StatusInternalServerError)
  88. return
  89. }
  90. if _, err := resp.Encode(w); err != nil {
  91. warn("[ss] Error: %v", err)
  92. http.Error(w, "", http.StatusInternalServerError)
  93. return
  94. }
  95. }
  96. // Response to recover from snapshot request
  97. func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
  98. ssrreq := &raft.SnapshotRecoveryRequest{}
  99. if _, err := ssrreq.Decode(req.Body); err != nil {
  100. http.Error(w, "", http.StatusBadRequest)
  101. warnf("[recv] BADREQUEST %s/snapshotRecovery [%v]", r.url, err)
  102. return
  103. }
  104. debugf("[recv] POST %s/snapshotRecovery", r.url)
  105. resp := r.SnapshotRecoveryRequest(ssrreq)
  106. if resp == nil {
  107. warn("[ssr] Error: nil response")
  108. http.Error(w, "", http.StatusInternalServerError)
  109. return
  110. }
  111. if _, err := resp.Encode(w); err != nil {
  112. warn("[ssr] Error: %v", err)
  113. http.Error(w, "", http.StatusInternalServerError)
  114. return
  115. }
  116. }
  117. // Get the port that listening for etcd connecting of the server
  118. func EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) {
  119. debugf("[recv] Get %s/etcdURL/ ", r.url)
  120. w.WriteHeader(http.StatusOK)
  121. w.Write([]byte(argInfo.EtcdURL))
  122. }
  123. // Response to the join request
  124. func JoinHttpHandler(w http.ResponseWriter, req *http.Request) error {
  125. command := &JoinCommand{}
  126. if err := decodeJsonRequest(req, command); err == nil {
  127. debugf("Receive Join Request from %s", command.Name)
  128. return dispatch(command, w, req, false)
  129. } else {
  130. w.WriteHeader(http.StatusInternalServerError)
  131. return nil
  132. }
  133. }
  134. // Response to remove request
  135. func RemoveHttpHandler(w http.ResponseWriter, req *http.Request) {
  136. if req.Method != "DELETE" {
  137. w.WriteHeader(http.StatusMethodNotAllowed)
  138. return
  139. }
  140. nodeName := req.URL.Path[len("/remove/"):]
  141. command := &RemoveCommand{
  142. Name: nodeName,
  143. }
  144. debugf("[recv] Remove Request [%s]", command.Name)
  145. dispatch(command, w, req, false)
  146. }
  147. // Response to the name request
  148. func NameHttpHandler(w http.ResponseWriter, req *http.Request) {
  149. debugf("[recv] Get %s/name/ ", r.url)
  150. w.WriteHeader(http.StatusOK)
  151. w.Write([]byte(r.name))
  152. }
  153. // Response to the name request
  154. func RaftVersionHttpHandler(w http.ResponseWriter, req *http.Request) {
  155. debugf("[recv] Get %s/version/ ", r.url)
  156. w.WriteHeader(http.StatusOK)
  157. w.Write([]byte(r.version))
  158. }