raft_handlers.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. /*
  2. Copyright 2013 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package main
  14. import (
  15. "encoding/json"
  16. "net/http"
  17. "github.com/coreos/go-raft"
  18. )
  19. //-------------------------------------------------------------
  20. // Handlers to handle raft related request via raft server port
  21. //-------------------------------------------------------------
  22. // Get all the current logs
  23. func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
  24. debugf("[recv] GET %s/log", r.url)
  25. w.Header().Set("Content-Type", "application/json")
  26. w.WriteHeader(http.StatusOK)
  27. json.NewEncoder(w).Encode(r.LogEntries())
  28. }
  29. // Response to vote request
  30. func VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
  31. rvreq := &raft.RequestVoteRequest{}
  32. err := decodeJsonRequest(req, rvreq)
  33. if err == nil {
  34. debugf("[recv] POST %s/vote [%s]", r.url, rvreq.CandidateName)
  35. if resp := r.RequestVote(rvreq); resp != nil {
  36. w.WriteHeader(http.StatusOK)
  37. json.NewEncoder(w).Encode(resp)
  38. return
  39. }
  40. }
  41. warnf("[vote] ERROR: %v", err)
  42. w.WriteHeader(http.StatusInternalServerError)
  43. }
  44. // Response to append entries request
  45. func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
  46. aereq := &raft.AppendEntriesRequest{}
  47. err := decodeJsonRequest(req, aereq)
  48. if err == nil {
  49. debugf("[recv] POST %s/log/append [%d]", r.url, len(aereq.Entries))
  50. r.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength))
  51. if resp := r.AppendEntries(aereq); resp != nil {
  52. w.WriteHeader(http.StatusOK)
  53. json.NewEncoder(w).Encode(resp)
  54. if !resp.Success {
  55. debugf("[Append Entry] Step back")
  56. }
  57. return
  58. }
  59. }
  60. warnf("[Append Entry] ERROR: %v", err)
  61. w.WriteHeader(http.StatusInternalServerError)
  62. }
  63. // Response to recover from snapshot request
  64. func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
  65. aereq := &raft.SnapshotRequest{}
  66. err := decodeJsonRequest(req, aereq)
  67. if err == nil {
  68. debugf("[recv] POST %s/snapshot/ ", r.url)
  69. if resp := r.RequestSnapshot(aereq); resp != nil {
  70. w.WriteHeader(http.StatusOK)
  71. json.NewEncoder(w).Encode(resp)
  72. return
  73. }
  74. }
  75. warnf("[Snapshot] ERROR: %v", err)
  76. w.WriteHeader(http.StatusInternalServerError)
  77. }
  78. // Response to recover from snapshot request
  79. func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
  80. aereq := &raft.SnapshotRecoveryRequest{}
  81. err := decodeJsonRequest(req, aereq)
  82. if err == nil {
  83. debugf("[recv] POST %s/snapshotRecovery/ ", r.url)
  84. if resp := r.SnapshotRecoveryRequest(aereq); resp != nil {
  85. w.WriteHeader(http.StatusOK)
  86. json.NewEncoder(w).Encode(resp)
  87. return
  88. }
  89. }
  90. warnf("[Snapshot] ERROR: %v", err)
  91. w.WriteHeader(http.StatusInternalServerError)
  92. }
  93. // Get the port that listening for etcd connecting of the server
  94. func EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) {
  95. debugf("[recv] Get %s/etcdURL/ ", r.url)
  96. w.WriteHeader(http.StatusOK)
  97. w.Write([]byte(argInfo.EtcdURL))
  98. }
  99. // Response to the join request
  100. func JoinHttpHandler(w http.ResponseWriter, req *http.Request) error {
  101. command := &JoinCommand{}
  102. if err := decodeJsonRequest(req, command); err == nil {
  103. debugf("Receive Join Request from %s", command.Name)
  104. return dispatch(command, w, req, false)
  105. } else {
  106. w.WriteHeader(http.StatusInternalServerError)
  107. return nil
  108. }
  109. }
  110. // Response to remove request
  111. func RemoveHttpHandler(w http.ResponseWriter, req *http.Request) {
  112. if req.Method != "DELETE" {
  113. w.WriteHeader(http.StatusMethodNotAllowed)
  114. return
  115. }
  116. nodeName := req.URL.Path[len("/remove/"):]
  117. command := &RemoveCommand{
  118. Name: nodeName,
  119. }
  120. debugf("[recv] Remove Request [%s]", command.Name)
  121. dispatch(command, w, req, false)
  122. }
  123. // Response to the name request
  124. func NameHttpHandler(w http.ResponseWriter, req *http.Request) {
  125. debugf("[recv] Get %s/name/ ", r.url)
  126. w.WriteHeader(http.StatusOK)
  127. w.Write([]byte(r.name))
  128. }
  129. // Response to the name request
  130. func RaftVersionHttpHandler(w http.ResponseWriter, req *http.Request) {
  131. debugf("[recv] Get %s/version/ ", r.url)
  132. w.WriteHeader(http.StatusOK)
  133. w.Write([]byte(r.version))
  134. }