http.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package etcdhttp
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "net/http"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "code.google.com/p/go.net/context"
  12. "github.com/coreos/etcd/elog"
  13. etcdserver "github.com/coreos/etcd/etcdserver2"
  14. "github.com/coreos/etcd/raft"
  15. "github.com/coreos/etcd/store"
  16. )
  17. var errClosed = errors.New("etcdhttp: client closed connection")
  18. const DefaultTimeout = 500 * time.Millisecond
  19. type Handler struct {
  20. Timeout time.Duration
  21. Server etcdserver.Server
  22. }
  23. func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  24. // TODO: set read/write timeout?
  25. timeout := h.Timeout
  26. if timeout == 0 {
  27. timeout = DefaultTimeout
  28. }
  29. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  30. defer cancel()
  31. switch {
  32. case strings.HasPrefix(r.URL.Path, "/raft"):
  33. h.serveRaft(ctx, w, r)
  34. case strings.HasPrefix(r.URL.Path, "/keys/"):
  35. h.serveKeys(ctx, w, r)
  36. default:
  37. http.NotFound(w, r)
  38. }
  39. }
  40. func (h Handler) serveKeys(ctx context.Context, w http.ResponseWriter, r *http.Request) {
  41. rr, err := parseRequest(r)
  42. if err != nil {
  43. http.Error(w, err.Error(), 400)
  44. return
  45. }
  46. resp, err := h.Server.Do(ctx, rr)
  47. if err != nil {
  48. // TODO(bmizerany): switch on store errors and etcdserver.ErrUnknownMethod
  49. panic("TODO")
  50. }
  51. if err := encodeResponse(ctx, w, resp); err != nil {
  52. http.Error(w, "Timeout while waiting for response", 504)
  53. }
  54. }
  55. func (h Handler) serveRaft(ctx context.Context, w http.ResponseWriter, r *http.Request) {
  56. b, err := ioutil.ReadAll(r.Body)
  57. if err != nil {
  58. elog.TODO()
  59. }
  60. var m raft.Message
  61. if err := m.Unmarshal(b); err != nil {
  62. elog.TODO()
  63. }
  64. if err := h.Server.Node.Step(ctx, m); err != nil {
  65. elog.TODO()
  66. }
  67. }
  68. func genId() int64 {
  69. panic("implement me")
  70. }
  71. func parseRequest(r *http.Request) (etcdserver.Request, error) {
  72. q := r.URL.Query()
  73. rr := etcdserver.Request{
  74. Id: genId(),
  75. Method: r.Method,
  76. Path: r.URL.Path[len("/keys/"):],
  77. Val: q.Get("value"),
  78. PrevValue: q.Get("prevValue"),
  79. PrevIndex: parseUint64(q.Get("prevIndex")),
  80. Recursive: parseBool(q.Get("recursive")),
  81. Since: parseUint64(q.Get("waitIndex")),
  82. Sorted: parseBool(q.Get("sorted")),
  83. Wait: parseBool(q.Get("wait")),
  84. }
  85. // PrevExists is nullable, so we leave it null if prevExist wasn't
  86. // specified.
  87. _, ok := q["wait"]
  88. if ok {
  89. bv := parseBool(q.Get("wait"))
  90. rr.PrevExists = &bv
  91. }
  92. ttl := parseUint64(q.Get("ttl"))
  93. if ttl > 0 {
  94. expr := time.Duration(ttl) * time.Second
  95. rr.Expiration = time.Now().Add(expr).UnixNano()
  96. }
  97. return rr, nil
  98. }
  99. func parseBool(s string) bool {
  100. v, _ := strconv.ParseBool(s)
  101. return v
  102. }
  103. func parseUint64(s string) uint64 {
  104. v, _ := strconv.ParseUint(s, 10, 64)
  105. return v
  106. }
  107. func encodeResponse(ctx context.Context, w http.ResponseWriter, resp etcdserver.Response) (err error) {
  108. var ev *store.Event
  109. switch {
  110. case resp.Event != nil:
  111. ev = resp.Event
  112. case resp.Watcher != nil:
  113. ev, err = waitForEvent(ctx, w, resp.Watcher)
  114. if err != nil {
  115. return err
  116. }
  117. default:
  118. panic("should not be reachable")
  119. }
  120. w.Header().Set("Content-Type", "application/json")
  121. w.Header().Add("X-Etcd-Index", fmt.Sprint(ev.Index()))
  122. if ev.IsCreated() {
  123. w.WriteHeader(http.StatusCreated)
  124. } else {
  125. w.WriteHeader(http.StatusOK)
  126. }
  127. if err := json.NewEncoder(w).Encode(ev); err != nil {
  128. panic(err) // should never be reached
  129. }
  130. return nil
  131. }
  132. func waitForEvent(ctx context.Context, w http.ResponseWriter, wa *store.Watcher) (*store.Event, error) {
  133. // TODO(bmizerany): support streaming?
  134. defer wa.Remove()
  135. var nch <-chan bool
  136. if x, ok := w.(http.CloseNotifier); ok {
  137. nch = x.CloseNotify()
  138. }
  139. select {
  140. case ev := <-wa.EventChan:
  141. return ev, nil
  142. case <-nch:
  143. elog.TODO()
  144. return nil, errClosed
  145. case <-ctx.Done():
  146. return nil, ctx.Err()
  147. }
  148. }