get_handler.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package v2
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net/http"
  6. "net/http/httputil"
  7. "net/url"
  8. "strconv"
  9. etcdErr "github.com/coreos/etcd/error"
  10. "github.com/coreos/etcd/log"
  11. "github.com/coreos/raft"
  12. "github.com/gorilla/mux"
  13. )
  14. func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
  15. vars := mux.Vars(req)
  16. key := "/" + vars["key"]
  17. // Help client to redirect the request to the current leader
  18. if req.FormValue("consistent") == "true" && s.State() != raft.Leader {
  19. leader := s.Leader()
  20. hostname, _ := s.ClientURL(leader)
  21. url, err := url.Parse(hostname)
  22. if err != nil {
  23. log.Warn("Redirect cannot parse hostName ", hostname)
  24. return err
  25. }
  26. url.RawQuery = req.URL.RawQuery
  27. url.Path = req.URL.Path
  28. log.Debugf("Redirect consistent get to %s", url.String())
  29. http.Redirect(w, req, url.String(), http.StatusTemporaryRedirect)
  30. return nil
  31. }
  32. recursive := (req.FormValue("recursive") == "true")
  33. sort := (req.FormValue("sorted") == "true")
  34. waitIndex := req.FormValue("waitIndex")
  35. stream := (req.FormValue("stream") == "true")
  36. if req.FormValue("wait") == "true" {
  37. return handleWatch(key, recursive, stream, waitIndex, w, s)
  38. }
  39. return handleGet(key, recursive, sort, w, s)
  40. }
  41. func handleWatch(key string, recursive, stream bool, waitIndex string, w http.ResponseWriter, s Server) error {
  42. // Create a command to watch from a given index (default 0).
  43. var sinceIndex uint64 = 0
  44. var err error
  45. if waitIndex != "" {
  46. sinceIndex, err = strconv.ParseUint(waitIndex, 10, 64)
  47. if err != nil {
  48. return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", s.Store().Index())
  49. }
  50. }
  51. watcher, err := s.Store().Watch(key, recursive, stream, sinceIndex)
  52. if err != nil {
  53. return err
  54. }
  55. cn, _ := w.(http.CloseNotifier)
  56. closeChan := cn.CloseNotify()
  57. writeHeaders(w, s)
  58. if stream {
  59. // watcher hub will not help to remove stream watcher
  60. // so we need to remove here
  61. defer watcher.Remove()
  62. chunkWriter := httputil.NewChunkedWriter(w)
  63. for {
  64. select {
  65. case <-closeChan:
  66. chunkWriter.Close()
  67. return nil
  68. case event := <-watcher.EventChan:
  69. b, _ := json.Marshal(event)
  70. _, err := chunkWriter.Write(b)
  71. if err != nil {
  72. return nil
  73. }
  74. w.(http.Flusher).Flush()
  75. }
  76. }
  77. }
  78. select {
  79. case <-closeChan:
  80. watcher.Remove()
  81. case event := <-watcher.EventChan:
  82. b, _ := json.Marshal(event)
  83. w.Write(b)
  84. }
  85. return nil
  86. }
  87. func handleGet(key string, recursive, sort bool, w http.ResponseWriter, s Server) error {
  88. event, err := s.Store().Get(key, recursive, sort)
  89. if err != nil {
  90. return err
  91. }
  92. writeHeaders(w, s)
  93. b, _ := json.Marshal(event)
  94. w.Write(b)
  95. return nil
  96. }
  97. func writeHeaders(w http.ResponseWriter, s Server) {
  98. w.Header().Set("Content-Type", "application/json")
  99. w.Header().Add("X-Etcd-Index", fmt.Sprint(s.Store().Index()))
  100. w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex()))
  101. w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term()))
  102. w.WriteHeader(http.StatusOK)
  103. }