http.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package etcdhttp
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "net/http"
  7. "time"
  8. "code.google.com/p/go.net/context"
  9. etcdserver "github.com/coreos/etcd/etcdserver2"
  10. "github.com/coreos/etcd/store"
  11. )
  12. var errClosed = errors.New("etcdhttp: client closed connection")
  13. const DefaultTimeout = 500 * time.Millisecond
  14. type Handler struct {
  15. Timeout time.Duration
  16. Server etcdserver.Server
  17. }
  18. func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  19. // TODO: set read/write timeout?
  20. timeout := h.Timeout
  21. if timeout == 0 {
  22. timeout = DefaultTimeout
  23. }
  24. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  25. defer cancel()
  26. rr, err := parseRequest(r)
  27. if err != nil {
  28. http.Error(w, err.Error(), 400)
  29. return
  30. }
  31. resp, err := h.Server.Do(ctx, rr)
  32. if err != nil {
  33. // TODO(bmizerany): switch on store errors and etcdserver.ErrUnknownMethod
  34. panic("TODO")
  35. }
  36. if err := encodeResponse(ctx, w, resp); err != nil {
  37. http.Error(w, "Timeout while waiting for response", 504)
  38. }
  39. }
  40. func parseRequest(r *http.Request) (etcdserver.Request, error) {
  41. return etcdserver.Request{}, nil
  42. }
  43. func encodeResponse(ctx context.Context, w http.ResponseWriter, resp etcdserver.Response) (err error) {
  44. var ev *store.Event
  45. switch {
  46. case resp.Event != nil:
  47. ev = resp.Event
  48. case resp.Watcher != nil:
  49. ev, err = waitForEvent(ctx, w, resp.Watcher)
  50. if err != nil {
  51. return err
  52. }
  53. default:
  54. panic("should not be rechable")
  55. }
  56. w.Header().Set("Content-Type", "application/json")
  57. w.Header().Add("X-Etcd-Index", fmt.Sprint(ev.Index()))
  58. if ev.IsCreated() {
  59. w.WriteHeader(http.StatusCreated)
  60. } else {
  61. w.WriteHeader(http.StatusOK)
  62. }
  63. if err := json.NewEncoder(w).Encode(ev); err != nil {
  64. panic(err) // should never be reached
  65. }
  66. return nil
  67. }
  68. func waitForEvent(ctx context.Context, w http.ResponseWriter, wa *store.Watcher) (*store.Event, error) {
  69. // TODO(bmizerany): support streaming?
  70. defer wa.Remove()
  71. var nch <-chan bool
  72. if x, ok := w.(http.CloseNotifier); ok {
  73. nch = x.CloseNotify()
  74. }
  75. select {
  76. case ev := <-wa.EventChan:
  77. return ev, nil
  78. case <-nch:
  79. // TODO: log something?
  80. return nil, errClosed
  81. case <-ctx.Done():
  82. return nil, ctx.Err()
  83. }
  84. }