v2_http_get.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. /*
  2. Copyright 2014 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcdserver
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "net/http"
  18. "strconv"
  19. etcdErr "github.com/coreos/etcd/error"
  20. )
  21. func (p *participant) GetHandler(w http.ResponseWriter, req *http.Request) error {
  22. if req.FormValue("consistent") == "true" && !p.node.IsLeader() {
  23. return p.redirect(w, req, p.node.Leader())
  24. }
  25. key := req.URL.Path[len("/v2/keys"):]
  26. recursive := (req.FormValue("recursive") == "true")
  27. sort := (req.FormValue("sorted") == "true")
  28. waitIndex := req.FormValue("waitIndex")
  29. stream := (req.FormValue("stream") == "true")
  30. if req.FormValue("quorum") == "true" {
  31. return p.handleQuorumGet(key, recursive, sort, w, req)
  32. }
  33. if req.FormValue("wait") == "true" {
  34. return p.handleWatch(key, recursive, stream, waitIndex, w, req)
  35. }
  36. return p.handleGet(key, recursive, sort, w, req)
  37. }
  38. func (p *participant) handleWatch(key string, recursive, stream bool, waitIndex string, w http.ResponseWriter, req *http.Request) error {
  39. // Create a command to watch from a given index (default 0).
  40. var sinceIndex uint64 = 0
  41. var err error
  42. if waitIndex != "" {
  43. sinceIndex, err = strconv.ParseUint(waitIndex, 10, 64)
  44. if err != nil {
  45. return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", p.Store.Index())
  46. }
  47. }
  48. watcher, err := p.Store.Watch(key, recursive, stream, sinceIndex)
  49. if err != nil {
  50. return err
  51. }
  52. cn, _ := w.(http.CloseNotifier)
  53. closeChan := cn.CloseNotify()
  54. p.writeHeaders(w)
  55. w.(http.Flusher).Flush()
  56. if stream {
  57. // watcher hub will not help to remove stream watcher
  58. // so we need to remove here
  59. defer watcher.Remove()
  60. for {
  61. select {
  62. case <-closeChan:
  63. return nil
  64. case event, ok := <-watcher.EventChan:
  65. if !ok {
  66. // If the channel is closed this may be an indication of
  67. // that notifications are much more than we are able to
  68. // send to the client in time. Then we simply end streaming.
  69. return nil
  70. }
  71. if req.Method == "HEAD" {
  72. continue
  73. }
  74. b, _ := json.Marshal(event)
  75. _, err := w.Write(b)
  76. if err != nil {
  77. return nil
  78. }
  79. w.(http.Flusher).Flush()
  80. }
  81. }
  82. }
  83. select {
  84. case <-closeChan:
  85. watcher.Remove()
  86. case event := <-watcher.EventChan:
  87. if req.Method == "HEAD" {
  88. return nil
  89. }
  90. b, _ := json.Marshal(event)
  91. w.Write(b)
  92. }
  93. return nil
  94. }
  95. func (p *participant) handleGet(key string, recursive, sort bool, w http.ResponseWriter, req *http.Request) error {
  96. event, err := p.Store.Get(key, recursive, sort)
  97. if err != nil {
  98. return err
  99. }
  100. p.writeHeaders(w)
  101. if req.Method == "HEAD" {
  102. return nil
  103. }
  104. b, err := json.Marshal(event)
  105. if err != nil {
  106. panic(fmt.Sprintf("handleGet: ", err))
  107. }
  108. w.Write(b)
  109. return nil
  110. }
  111. func (p *participant) handleQuorumGet(key string, recursive, sort bool, w http.ResponseWriter, req *http.Request) error {
  112. if req.Method == "HEAD" {
  113. return fmt.Errorf("not support HEAD")
  114. }
  115. event, err := p.QuorumGet(key, recursive, sort)
  116. if err != nil {
  117. return err
  118. }
  119. p.handleRet(w, event)
  120. return nil
  121. }
  122. func (p *participant) writeHeaders(w http.ResponseWriter) {
  123. w.Header().Set("Content-Type", "application/json")
  124. w.Header().Add("X-Etcd-Index", fmt.Sprint(p.Store.Index()))
  125. // TODO(xiangli): raft-index and term
  126. w.WriteHeader(http.StatusOK)
  127. }