|
|
@@ -20,16 +20,9 @@ var (
|
|
|
type SendFunc func(m []raftpb.Message)
|
|
|
|
|
|
type Response struct {
|
|
|
- // The last seen term raft was at when this request was built.
|
|
|
- Term int64
|
|
|
-
|
|
|
- // The last seen index raft was at when this request was built.
|
|
|
- Commit int64
|
|
|
-
|
|
|
Event *store.Event
|
|
|
Watcher *store.Watcher
|
|
|
-
|
|
|
- err error
|
|
|
+ err error
|
|
|
}
|
|
|
|
|
|
type Server struct {
|
|
|
@@ -73,12 +66,7 @@ func (s *Server) run() {
|
|
|
if err := r.Unmarshal(e.Data); err != nil {
|
|
|
panic("TODO: this is bad, what do we do about it?")
|
|
|
}
|
|
|
-
|
|
|
- var resp Response
|
|
|
- resp.Event, resp.err = s.apply(context.TODO(), r)
|
|
|
- resp.Term = rd.Term
|
|
|
- resp.Commit = rd.Commit
|
|
|
- s.w.Trigger(r.Id, resp)
|
|
|
+ s.w.Trigger(r.Id, s.apply(r))
|
|
|
}
|
|
|
case <-s.done:
|
|
|
return
|
|
|
@@ -142,37 +130,40 @@ func (s *Server) Do(ctx context.Context, r pb.Request) (Response, error) {
|
|
|
}
|
|
|
|
|
|
// apply interprets r as a call to store.X and returns an Response interpreted from store.Event
|
|
|
-func (s *Server) apply(ctx context.Context, r pb.Request) (*store.Event, error) {
|
|
|
+func (s *Server) apply(r pb.Request) Response {
|
|
|
+ f := func(ev *store.Event, err error) Response {
|
|
|
+ return Response{Event: ev, err: err}
|
|
|
+ }
|
|
|
expr := time.Unix(0, r.Expiration)
|
|
|
switch r.Method {
|
|
|
case "POST":
|
|
|
- return s.Store.Create(r.Path, r.Dir, r.Val, true, expr)
|
|
|
+ return f(s.Store.Create(r.Path, r.Dir, r.Val, true, expr))
|
|
|
case "PUT":
|
|
|
exists, existsSet := getBool(r.PrevExists)
|
|
|
switch {
|
|
|
case existsSet:
|
|
|
if exists {
|
|
|
- return s.Store.Update(r.Path, r.Val, expr)
|
|
|
+ return f(s.Store.Update(r.Path, r.Val, expr))
|
|
|
} else {
|
|
|
- return s.Store.Create(r.Path, r.Dir, r.Val, false, expr)
|
|
|
+ return f(s.Store.Create(r.Path, r.Dir, r.Val, false, expr))
|
|
|
}
|
|
|
case r.PrevIndex > 0 || r.PrevValue != "":
|
|
|
- return s.Store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr)
|
|
|
+ return f(s.Store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr))
|
|
|
default:
|
|
|
- return s.Store.Set(r.Path, r.Dir, r.Val, expr)
|
|
|
+ return f(s.Store.Set(r.Path, r.Dir, r.Val, expr))
|
|
|
}
|
|
|
case "DELETE":
|
|
|
switch {
|
|
|
case r.PrevIndex > 0 || r.PrevValue != "":
|
|
|
- return s.Store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex)
|
|
|
+ return f(s.Store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
|
|
|
default:
|
|
|
- return s.Store.Delete(r.Path, r.Recursive, r.Dir)
|
|
|
+ return f(s.Store.Delete(r.Path, r.Recursive, r.Dir))
|
|
|
}
|
|
|
case "QGET":
|
|
|
- return s.Store.Get(r.Path, r.Recursive, r.Sorted)
|
|
|
+ return f(s.Store.Get(r.Path, r.Recursive, r.Sorted))
|
|
|
default:
|
|
|
// This should never be reached, but just in case:
|
|
|
- return nil, ErrUnknownMethod
|
|
|
+ return Response{err: ErrUnknownMethod}
|
|
|
}
|
|
|
}
|
|
|
|