v2_http_get.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package etcd
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net/http"
  6. "strconv"
  7. etcdErr "github.com/coreos/etcd/error"
  8. )
  9. func (s *Server) GetHandler(w http.ResponseWriter, req *http.Request) error {
  10. key := req.URL.Path[len("/v2/keys"):]
  11. // TODO(xiangli): handle consistent get
  12. recursive := (req.FormValue("recursive") == "true")
  13. sort := (req.FormValue("sorted") == "true")
  14. waitIndex := req.FormValue("waitIndex")
  15. stream := (req.FormValue("stream") == "true")
  16. if req.FormValue("wait") == "true" {
  17. return s.handleWatch(key, recursive, stream, waitIndex, w, req)
  18. }
  19. return s.handleGet(key, recursive, sort, w, req)
  20. }
  21. func (s *Server) handleWatch(key string, recursive, stream bool, waitIndex string, w http.ResponseWriter, req *http.Request) error {
  22. // Create a command to watch from a given index (default 0).
  23. var sinceIndex uint64 = 0
  24. var err error
  25. if waitIndex != "" {
  26. sinceIndex, err = strconv.ParseUint(waitIndex, 10, 64)
  27. if err != nil {
  28. return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", s.Store.Index())
  29. }
  30. }
  31. watcher, err := s.Store.Watch(key, recursive, stream, sinceIndex)
  32. if err != nil {
  33. return err
  34. }
  35. cn, _ := w.(http.CloseNotifier)
  36. closeChan := cn.CloseNotify()
  37. s.writeHeaders(w)
  38. if stream {
  39. // watcher hub will not help to remove stream watcher
  40. // so we need to remove here
  41. defer watcher.Remove()
  42. for {
  43. select {
  44. case <-closeChan:
  45. return nil
  46. case event, ok := <-watcher.EventChan:
  47. if !ok {
  48. // If the channel is closed this may be an indication of
  49. // that notifications are much more than we are able to
  50. // send to the client in time. Then we simply end streaming.
  51. return nil
  52. }
  53. if req.Method == "HEAD" {
  54. continue
  55. }
  56. b, _ := json.Marshal(event)
  57. _, err := w.Write(b)
  58. if err != nil {
  59. return nil
  60. }
  61. w.(http.Flusher).Flush()
  62. }
  63. }
  64. }
  65. select {
  66. case <-closeChan:
  67. watcher.Remove()
  68. case event := <-watcher.EventChan:
  69. if req.Method == "HEAD" {
  70. return nil
  71. }
  72. b, _ := json.Marshal(event)
  73. w.Write(b)
  74. }
  75. return nil
  76. }
  77. func (s *Server) handleGet(key string, recursive, sort bool, w http.ResponseWriter, req *http.Request) error {
  78. event, err := s.Store.Get(key, recursive, sort)
  79. if err != nil {
  80. return err
  81. }
  82. s.writeHeaders(w)
  83. if req.Method == "HEAD" {
  84. return nil
  85. }
  86. b, err := json.Marshal(event)
  87. if err != nil {
  88. panic(fmt.Sprintf("handleGet: ", err))
  89. }
  90. w.Write(b)
  91. return nil
  92. }
  93. func (s *Server) writeHeaders(w http.ResponseWriter) {
  94. w.Header().Set("Content-Type", "application/json")
  95. w.Header().Add("X-Etcd-Index", fmt.Sprint(s.Store.Index()))
  96. // TODO(xiangli): raft-index and term
  97. w.WriteHeader(http.StatusOK)
  98. }