v2_http_get.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. /*
  2. Copyright 2014 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcdserver
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "net/http"
  18. "strconv"
  19. etcdErr "github.com/coreos/etcd/error"
  20. )
  21. func (p *participant) GetHandler(w http.ResponseWriter, req *http.Request) error {
  22. key := req.URL.Path[len("/v2/keys"):]
  23. // TODO(xiangli): handle consistent get
  24. recursive := (req.FormValue("recursive") == "true")
  25. sort := (req.FormValue("sorted") == "true")
  26. waitIndex := req.FormValue("waitIndex")
  27. stream := (req.FormValue("stream") == "true")
  28. if req.FormValue("quorum") == "true" {
  29. return p.handleQuorumGet(key, recursive, sort, w, req)
  30. }
  31. if req.FormValue("wait") == "true" {
  32. return p.handleWatch(key, recursive, stream, waitIndex, w, req)
  33. }
  34. return p.handleGet(key, recursive, sort, w, req)
  35. }
  36. func (p *participant) handleWatch(key string, recursive, stream bool, waitIndex string, w http.ResponseWriter, req *http.Request) error {
  37. // Create a command to watch from a given index (default 0).
  38. var sinceIndex uint64 = 0
  39. var err error
  40. if waitIndex != "" {
  41. sinceIndex, err = strconv.ParseUint(waitIndex, 10, 64)
  42. if err != nil {
  43. return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", p.Store.Index())
  44. }
  45. }
  46. watcher, err := p.Store.Watch(key, recursive, stream, sinceIndex)
  47. if err != nil {
  48. return err
  49. }
  50. cn, _ := w.(http.CloseNotifier)
  51. closeChan := cn.CloseNotify()
  52. p.writeHeaders(w)
  53. w.(http.Flusher).Flush()
  54. if stream {
  55. // watcher hub will not help to remove stream watcher
  56. // so we need to remove here
  57. defer watcher.Remove()
  58. for {
  59. select {
  60. case <-closeChan:
  61. return nil
  62. case event, ok := <-watcher.EventChan:
  63. if !ok {
  64. // If the channel is closed this may be an indication of
  65. // that notifications are much more than we are able to
  66. // send to the client in time. Then we simply end streaming.
  67. return nil
  68. }
  69. if req.Method == "HEAD" {
  70. continue
  71. }
  72. b, _ := json.Marshal(event)
  73. _, err := w.Write(b)
  74. if err != nil {
  75. return nil
  76. }
  77. w.(http.Flusher).Flush()
  78. }
  79. }
  80. }
  81. select {
  82. case <-closeChan:
  83. watcher.Remove()
  84. case event := <-watcher.EventChan:
  85. if req.Method == "HEAD" {
  86. return nil
  87. }
  88. b, _ := json.Marshal(event)
  89. w.Write(b)
  90. }
  91. return nil
  92. }
  93. func (p *participant) handleGet(key string, recursive, sort bool, w http.ResponseWriter, req *http.Request) error {
  94. event, err := p.Store.Get(key, recursive, sort)
  95. if err != nil {
  96. return err
  97. }
  98. p.writeHeaders(w)
  99. if req.Method == "HEAD" {
  100. return nil
  101. }
  102. b, err := json.Marshal(event)
  103. if err != nil {
  104. panic(fmt.Sprintf("handleGet: ", err))
  105. }
  106. w.Write(b)
  107. return nil
  108. }
  109. func (p *participant) handleQuorumGet(key string, recursive, sort bool, w http.ResponseWriter, req *http.Request) error {
  110. if req.Method == "HEAD" {
  111. return fmt.Errorf("not support HEAD")
  112. }
  113. event, err := p.QuorumGet(key, recursive, sort)
  114. if err != nil {
  115. return err
  116. }
  117. p.handleRet(w, event)
  118. return nil
  119. }
  120. func (p *participant) writeHeaders(w http.ResponseWriter) {
  121. w.Header().Set("Content-Type", "application/json")
  122. w.Header().Add("X-Etcd-Index", fmt.Sprint(p.Store.Index()))
  123. // TODO(xiangli): raft-index and term
  124. w.WriteHeader(http.StatusOK)
  125. }