peer_server_handlers.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package server
  2. import (
  3. "encoding/json"
  4. "net/http"
  5. etcdErr "github.com/coreos/etcd/error"
  6. "github.com/coreos/etcd/log"
  7. "github.com/coreos/go-raft"
  8. )
  9. // Get all the current logs
  10. func (s *PeerServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
  11. log.Debugf("[recv] GET %s/log", s.url)
  12. w.Header().Set("Content-Type", "application/json")
  13. w.WriteHeader(http.StatusOK)
  14. json.NewEncoder(w).Encode(s.LogEntries())
  15. }
  16. // Response to vote request
  17. func (s *PeerServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
  18. rvreq := &raft.RequestVoteRequest{}
  19. err := decodeJsonRequest(req, rvreq)
  20. if err == nil {
  21. log.Debugf("[recv] POST %s/vote [%s]", s.url, rvreq.CandidateName)
  22. if resp := s.RequestVote(rvreq); resp != nil {
  23. w.WriteHeader(http.StatusOK)
  24. json.NewEncoder(w).Encode(resp)
  25. return
  26. }
  27. }
  28. log.Warnf("[vote] ERROR: %v", err)
  29. w.WriteHeader(http.StatusInternalServerError)
  30. }
  31. // Response to append entries request
  32. func (s *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
  33. aereq := &raft.AppendEntriesRequest{}
  34. err := decodeJsonRequest(req, aereq)
  35. if err == nil {
  36. log.Debugf("[recv] POST %s/log/append [%d]", s.url, len(aereq.Entries))
  37. s.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength))
  38. if resp := s.AppendEntries(aereq); resp != nil {
  39. w.WriteHeader(http.StatusOK)
  40. json.NewEncoder(w).Encode(resp)
  41. if !resp.Success {
  42. log.Debugf("[Append Entry] Step back")
  43. }
  44. return
  45. }
  46. }
  47. log.Warnf("[Append Entry] ERROR: %v", err)
  48. w.WriteHeader(http.StatusInternalServerError)
  49. }
  50. // Response to recover from snapshot request
  51. func (s *PeerServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
  52. aereq := &raft.SnapshotRequest{}
  53. err := decodeJsonRequest(req, aereq)
  54. if err == nil {
  55. log.Debugf("[recv] POST %s/snapshot/ ", s.url)
  56. if resp := s.RequestSnapshot(aereq); resp != nil {
  57. w.WriteHeader(http.StatusOK)
  58. json.NewEncoder(w).Encode(resp)
  59. return
  60. }
  61. }
  62. log.Warnf("[Snapshot] ERROR: %v", err)
  63. w.WriteHeader(http.StatusInternalServerError)
  64. }
  65. // Response to recover from snapshot request
  66. func (s *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
  67. aereq := &raft.SnapshotRecoveryRequest{}
  68. err := decodeJsonRequest(req, aereq)
  69. if err == nil {
  70. log.Debugf("[recv] POST %s/snapshotRecovery/ ", s.url)
  71. if resp := s.SnapshotRecoveryRequest(aereq); resp != nil {
  72. w.WriteHeader(http.StatusOK)
  73. json.NewEncoder(w).Encode(resp)
  74. return
  75. }
  76. }
  77. log.Warnf("[Snapshot] ERROR: %v", err)
  78. w.WriteHeader(http.StatusInternalServerError)
  79. }
  80. // Get the port that listening for etcd connecting of the server
  81. func (s *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) {
  82. log.Debugf("[recv] Get %s/etcdURL/ ", s.url)
  83. w.WriteHeader(http.StatusOK)
  84. w.Write([]byte(s.server.URL()))
  85. }
  86. // Response to the join request
  87. func (s *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
  88. command := &JoinCommand{}
  89. // Write CORS header.
  90. if s.server.OriginAllowed("*") {
  91. w.Header().Add("Access-Control-Allow-Origin", "*")
  92. } else if s.server.OriginAllowed(req.Header.Get("Origin")) {
  93. w.Header().Add("Access-Control-Allow-Origin", req.Header.Get("Origin"))
  94. }
  95. err := decodeJsonRequest(req, command)
  96. if err != nil {
  97. w.WriteHeader(http.StatusInternalServerError)
  98. return
  99. }
  100. log.Debugf("Receive Join Request from %s", command.Name)
  101. err = s.dispatch(command, w, req)
  102. // Return status.
  103. if err != nil {
  104. if etcdErr, ok := err.(*etcdErr.Error); ok {
  105. log.Debug("Return error: ", (*etcdErr).Error())
  106. etcdErr.Write(w)
  107. } else {
  108. http.Error(w, err.Error(), http.StatusInternalServerError)
  109. }
  110. }
  111. }
  112. // Response to remove request
  113. func (s *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request) {
  114. if req.Method != "DELETE" {
  115. w.WriteHeader(http.StatusMethodNotAllowed)
  116. return
  117. }
  118. nodeName := req.URL.Path[len("/remove/"):]
  119. command := &RemoveCommand{
  120. Name: nodeName,
  121. }
  122. log.Debugf("[recv] Remove Request [%s]", command.Name)
  123. s.dispatch(command, w, req)
  124. }
  125. // Response to the name request
  126. func (s *PeerServer) NameHttpHandler(w http.ResponseWriter, req *http.Request) {
  127. log.Debugf("[recv] Get %s/name/ ", s.url)
  128. w.WriteHeader(http.StatusOK)
  129. w.Write([]byte(s.name))
  130. }
  131. // Response to the name request
  132. func (s *PeerServer) RaftVersionHttpHandler(w http.ResponseWriter, req *http.Request) {
  133. log.Debugf("[recv] Get %s/version/ ", s.url)
  134. w.WriteHeader(http.StatusOK)
  135. w.Write([]byte(PeerVersion))
  136. }