get_handler.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package v2
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net/http"
  6. "net/url"
  7. "strconv"
  8. etcdErr "github.com/coreos/etcd/error"
  9. "github.com/coreos/etcd/log"
  10. "github.com/coreos/etcd/third_party/github.com/goraft/raft"
  11. "github.com/coreos/etcd/third_party/github.com/gorilla/mux"
  12. )
  13. func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
  14. vars := mux.Vars(req)
  15. key := "/" + vars["key"]
  16. // Help client to redirect the request to the current leader
  17. if req.FormValue("consistent") == "true" && s.State() != raft.Leader {
  18. leader := s.Leader()
  19. hostname, _ := s.ClientURL(leader)
  20. url, err := url.Parse(hostname)
  21. if err != nil {
  22. log.Warn("Redirect cannot parse hostName ", hostname)
  23. return err
  24. }
  25. url.RawQuery = req.URL.RawQuery
  26. url.Path = req.URL.Path
  27. log.Debugf("Redirect consistent get to %s", url.String())
  28. http.Redirect(w, req, url.String(), http.StatusTemporaryRedirect)
  29. return nil
  30. }
  31. recursive := (req.FormValue("recursive") == "true")
  32. sort := (req.FormValue("sorted") == "true")
  33. waitIndex := req.FormValue("waitIndex")
  34. stream := (req.FormValue("stream") == "true")
  35. if req.FormValue("wait") == "true" {
  36. return handleWatch(key, recursive, stream, waitIndex, w, req, s)
  37. }
  38. return handleGet(key, recursive, sort, w, req, s)
  39. }
  40. func handleWatch(key string, recursive, stream bool, waitIndex string, w http.ResponseWriter, req *http.Request, s Server) error {
  41. // Create a command to watch from a given index (default 0).
  42. var sinceIndex uint64 = 0
  43. var err error
  44. if waitIndex != "" {
  45. sinceIndex, err = strconv.ParseUint(waitIndex, 10, 64)
  46. if err != nil {
  47. return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", s.Store().Index())
  48. }
  49. }
  50. watcher, err := s.Store().Watch(key, recursive, stream, sinceIndex)
  51. if err != nil {
  52. return err
  53. }
  54. cn, _ := w.(http.CloseNotifier)
  55. closeChan := cn.CloseNotify()
  56. writeHeaders(w, s)
  57. if stream {
  58. // watcher hub will not help to remove stream watcher
  59. // so we need to remove here
  60. defer watcher.Remove()
  61. for {
  62. select {
  63. case <-closeChan:
  64. return nil
  65. case event, ok := <-watcher.EventChan:
  66. if !ok {
  67. // If the channel is closed this may be an indication of
  68. // that notifications are much more than we are able to
  69. // send to the client in time. Then we simply end streaming.
  70. return nil
  71. }
  72. if req.Method == "HEAD" {
  73. continue
  74. }
  75. b, _ := json.Marshal(event)
  76. _, err := w.Write(b)
  77. if err != nil {
  78. return nil
  79. }
  80. w.(http.Flusher).Flush()
  81. }
  82. }
  83. }
  84. select {
  85. case <-closeChan:
  86. watcher.Remove()
  87. case event := <-watcher.EventChan:
  88. if req.Method == "HEAD" {
  89. return nil
  90. }
  91. b, _ := json.Marshal(event)
  92. w.Write(b)
  93. }
  94. return nil
  95. }
  96. func handleGet(key string, recursive, sort bool, w http.ResponseWriter, req *http.Request, s Server) error {
  97. event, err := s.Store().Get(key, recursive, sort)
  98. if err != nil {
  99. return err
  100. }
  101. if req.Method == "HEAD" {
  102. return nil
  103. }
  104. writeHeaders(w, s)
  105. b, _ := json.Marshal(event)
  106. w.Write(b)
  107. return nil
  108. }
  109. func writeHeaders(w http.ResponseWriter, s Server) {
  110. w.Header().Set("Content-Type", "application/json")
  111. w.Header().Add("X-Etcd-Index", fmt.Sprint(s.Store().Index()))
  112. w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex()))
  113. w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term()))
  114. if url, ok := s.ClientURL(s.Leader()); ok {
  115. w.Header().Set("X-Leader-Client-URL", url)
  116. }
  117. if url, ok := s.PeerURL(s.Leader()); ok {
  118. w.Header().Set("X-Leader-Peer-URL", url)
  119. }
  120. w.WriteHeader(http.StatusOK)
  121. }