| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- package etcdhttp
- import (
- "encoding/json"
- "errors"
- "fmt"
- "net/http"
- "time"
- "code.google.com/p/go.net/context"
- etcdserver "github.com/coreos/etcd/etcdserver2"
- "github.com/coreos/etcd/store"
- )
- var errClosed = errors.New("etcdhttp: client closed connection")
- const DefaultTimeout = 500 * time.Millisecond
- type Handler struct {
- Timeout time.Duration
- Server etcdserver.Server
- }
- func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- // TODO: set read/write timeout?
- timeout := h.Timeout
- if timeout == 0 {
- timeout = DefaultTimeout
- }
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- rr, err := parseRequest(r)
- if err != nil {
- http.Error(w, err.Error(), 400)
- return
- }
- resp, err := h.Server.Do(ctx, rr)
- if err != nil {
- // TODO(bmizerany): switch on store errors and etcdserver.ErrUnknownMethod
- panic("TODO")
- }
- if err := encodeResponse(ctx, w, resp); err != nil {
- http.Error(w, "Timeout while waiting for response", 504)
- }
- }
- func parseRequest(r *http.Request) (etcdserver.Request, error) {
- return etcdserver.Request{}, nil
- }
- func encodeResponse(ctx context.Context, w http.ResponseWriter, resp etcdserver.Response) (err error) {
- var ev *store.Event
- switch {
- case resp.Event != nil:
- ev = resp.Event
- case resp.Watcher != nil:
- ev, err = waitForEvent(ctx, w, resp.Watcher)
- if err != nil {
- return err
- }
- default:
- panic("should not be rechable")
- }
- w.Header().Set("Content-Type", "application/json")
- w.Header().Add("X-Etcd-Index", fmt.Sprint(ev.Index()))
- if ev.IsCreated() {
- w.WriteHeader(http.StatusCreated)
- } else {
- w.WriteHeader(http.StatusOK)
- }
- if err := json.NewEncoder(w).Encode(ev); err != nil {
- panic(err) // should never be reached
- }
- return nil
- }
- func waitForEvent(ctx context.Context, w http.ResponseWriter, wa *store.Watcher) (*store.Event, error) {
- // TODO(bmizerany): support streaming?
- defer wa.Remove()
- var nch <-chan bool
- if x, ok := w.(http.CloseNotifier); ok {
- nch = x.CloseNotify()
- }
- select {
- case ev := <-wa.EventChan:
- return ev, nil
- case <-nch:
- // TODO: log something?
- return nil, errClosed
- case <-ctx.Done():
- return nil, ctx.Err()
- }
- }
|