get_handler.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. // +build ignore
  2. package v2
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. "net/http"
  7. "net/url"
  8. "strconv"
  9. etcdErr "github.com/coreos/etcd/error"
  10. "github.com/coreos/etcd/log"
  11. "github.com/coreos/etcd/third_party/github.com/goraft/raft"
  12. "github.com/coreos/etcd/third_party/github.com/gorilla/mux"
  13. )
  14. func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
  15. vars := mux.Vars(req)
  16. key := "/" + vars["key"]
  17. recursive := (req.FormValue("recursive") == "true")
  18. sort := (req.FormValue("sorted") == "true")
  19. if req.FormValue("quorum") == "true" {
  20. c := s.Store().CommandFactory().CreateGetCommand(key, recursive, sort)
  21. return s.Dispatch(c, w, req)
  22. }
  23. // Help client to redirect the request to the current leader
  24. if req.FormValue("consistent") == "true" && s.State() != raft.Leader {
  25. leader := s.Leader()
  26. hostname, _ := s.ClientURL(leader)
  27. url, err := url.Parse(hostname)
  28. if err != nil {
  29. log.Warn("Redirect cannot parse hostName ", hostname)
  30. return err
  31. }
  32. url.RawQuery = req.URL.RawQuery
  33. url.Path = req.URL.Path
  34. log.Debugf("Redirect consistent get to %s", url.String())
  35. http.Redirect(w, req, url.String(), http.StatusTemporaryRedirect)
  36. return nil
  37. }
  38. waitIndex := req.FormValue("waitIndex")
  39. stream := (req.FormValue("stream") == "true")
  40. if req.FormValue("wait") == "true" {
  41. return handleWatch(key, recursive, stream, waitIndex, w, req, s)
  42. }
  43. return handleGet(key, recursive, sort, w, req, s)
  44. }
  45. func handleWatch(key string, recursive, stream bool, waitIndex string, w http.ResponseWriter, req *http.Request, s Server) error {
  46. // Create a command to watch from a given index (default 0).
  47. var sinceIndex uint64 = 0
  48. var err error
  49. if waitIndex != "" {
  50. sinceIndex, err = strconv.ParseUint(waitIndex, 10, 64)
  51. if err != nil {
  52. return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", s.Store().Index())
  53. }
  54. }
  55. watcher, err := s.Store().Watch(key, recursive, stream, sinceIndex)
  56. if err != nil {
  57. return err
  58. }
  59. cn, _ := w.(http.CloseNotifier)
  60. closeChan := cn.CloseNotify()
  61. writeHeaders(w, s)
  62. w.(http.Flusher).Flush()
  63. if stream {
  64. // watcher hub will not help to remove stream watcher
  65. // so we need to remove here
  66. defer watcher.Remove()
  67. for {
  68. select {
  69. case <-closeChan:
  70. return nil
  71. case event, ok := <-watcher.EventChan:
  72. if !ok {
  73. // If the channel is closed this may be an indication of
  74. // that notifications are much more than we are able to
  75. // send to the client in time. Then we simply end streaming.
  76. return nil
  77. }
  78. if req.Method == "HEAD" {
  79. continue
  80. }
  81. b, _ := json.Marshal(event)
  82. _, err := w.Write(b)
  83. if err != nil {
  84. return nil
  85. }
  86. w.(http.Flusher).Flush()
  87. }
  88. }
  89. }
  90. select {
  91. case <-closeChan:
  92. watcher.Remove()
  93. case event := <-watcher.EventChan:
  94. if req.Method == "HEAD" {
  95. return nil
  96. }
  97. b, _ := json.Marshal(event)
  98. w.Write(b)
  99. }
  100. return nil
  101. }
  102. func handleGet(key string, recursive, sort bool, w http.ResponseWriter, req *http.Request, s Server) error {
  103. event, err := s.Store().Get(key, recursive, sort)
  104. if err != nil {
  105. return err
  106. }
  107. if req.Method == "HEAD" {
  108. return nil
  109. }
  110. writeHeaders(w, s)
  111. b, _ := json.Marshal(event)
  112. w.Write(b)
  113. return nil
  114. }
  115. func writeHeaders(w http.ResponseWriter, s Server) {
  116. w.Header().Set("Content-Type", "application/json")
  117. w.Header().Add("X-Etcd-Index", fmt.Sprint(s.Store().Index()))
  118. w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex()))
  119. w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term()))
  120. w.WriteHeader(http.StatusOK)
  121. }