v2_http_get.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. /*
  2. Copyright 2014 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcd
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "net/http"
  18. "strconv"
  19. etcdErr "github.com/coreos/etcd/error"
  20. )
  21. func (p *participant) GetHandler(w http.ResponseWriter, req *http.Request) error {
  22. key := req.URL.Path[len("/v2/keys"):]
  23. // TODO(xiangli): handle consistent get
  24. recursive := (req.FormValue("recursive") == "true")
  25. sort := (req.FormValue("sorted") == "true")
  26. waitIndex := req.FormValue("waitIndex")
  27. stream := (req.FormValue("stream") == "true")
  28. if req.FormValue("wait") == "true" {
  29. return p.handleWatch(key, recursive, stream, waitIndex, w, req)
  30. }
  31. return p.handleGet(key, recursive, sort, w, req)
  32. }
  33. func (p *participant) handleWatch(key string, recursive, stream bool, waitIndex string, w http.ResponseWriter, req *http.Request) error {
  34. // Create a command to watch from a given index (default 0).
  35. var sinceIndex uint64 = 0
  36. var err error
  37. if waitIndex != "" {
  38. sinceIndex, err = strconv.ParseUint(waitIndex, 10, 64)
  39. if err != nil {
  40. return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", p.Store.Index())
  41. }
  42. }
  43. watcher, err := p.Store.Watch(key, recursive, stream, sinceIndex)
  44. if err != nil {
  45. return err
  46. }
  47. cn, _ := w.(http.CloseNotifier)
  48. closeChan := cn.CloseNotify()
  49. p.writeHeaders(w)
  50. if stream {
  51. // watcher hub will not help to remove stream watcher
  52. // so we need to remove here
  53. defer watcher.Remove()
  54. for {
  55. select {
  56. case <-closeChan:
  57. return nil
  58. case event, ok := <-watcher.EventChan:
  59. if !ok {
  60. // If the channel is closed this may be an indication of
  61. // that notifications are much more than we are able to
  62. // send to the client in time. Then we simply end streaming.
  63. return nil
  64. }
  65. if req.Method == "HEAD" {
  66. continue
  67. }
  68. b, _ := json.Marshal(event)
  69. _, err := w.Write(b)
  70. if err != nil {
  71. return nil
  72. }
  73. w.(http.Flusher).Flush()
  74. }
  75. }
  76. }
  77. select {
  78. case <-closeChan:
  79. watcher.Remove()
  80. case event := <-watcher.EventChan:
  81. if req.Method == "HEAD" {
  82. return nil
  83. }
  84. b, _ := json.Marshal(event)
  85. w.Write(b)
  86. }
  87. return nil
  88. }
  89. func (p *participant) handleGet(key string, recursive, sort bool, w http.ResponseWriter, req *http.Request) error {
  90. event, err := p.Store.Get(key, recursive, sort)
  91. if err != nil {
  92. return err
  93. }
  94. p.writeHeaders(w)
  95. if req.Method == "HEAD" {
  96. return nil
  97. }
  98. b, err := json.Marshal(event)
  99. if err != nil {
  100. panic(fmt.Sprintf("handleGet: ", err))
  101. }
  102. w.Write(b)
  103. return nil
  104. }
  105. func (p *participant) writeHeaders(w http.ResponseWriter) {
  106. w.Header().Set("Content-Type", "application/json")
  107. w.Header().Add("X-Etcd-Index", fmt.Sprint(p.Store.Index()))
  108. // TODO(xiangli): raft-index and term
  109. w.WriteHeader(http.StatusOK)
  110. }