|
|
@@ -178,6 +178,8 @@ type EtcdServer struct {
|
|
|
|
|
|
store store.Store
|
|
|
|
|
|
+ applyV2 applierV2
|
|
|
+
|
|
|
applyV3 applierV3
|
|
|
kv dstorage.ConsistentWatchableKV
|
|
|
lessor lease.Lessor
|
|
|
@@ -382,6 +384,8 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
|
|
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
|
|
|
}
|
|
|
|
|
|
+ srv.applyV2 = &applierV2store{srv}
|
|
|
+
|
|
|
srv.be = be
|
|
|
srv.lessor = lease.NewLessor(srv.be)
|
|
|
srv.kv = dstorage.New(srv.be, srv.lessor, &srv.consistIndex)
|
|
|
@@ -731,70 +735,6 @@ func (s *EtcdServer) stopWithDelay(d time.Duration, err error) {
|
|
|
// when the server is stopped.
|
|
|
func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done }
|
|
|
|
|
|
-// Do interprets r and performs an operation on s.store according to r.Method
|
|
|
-// and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET" with
|
|
|
-// Quorum == true, r will be sent through consensus before performing its
|
|
|
-// respective operation. Do will block until an action is performed or there is
|
|
|
-// an error.
|
|
|
-func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
|
|
|
- r.ID = s.reqIDGen.Next()
|
|
|
- if r.Method == "GET" && r.Quorum {
|
|
|
- r.Method = "QGET"
|
|
|
- }
|
|
|
- switch r.Method {
|
|
|
- case "POST", "PUT", "DELETE", "QGET":
|
|
|
- data, err := r.Marshal()
|
|
|
- if err != nil {
|
|
|
- return Response{}, err
|
|
|
- }
|
|
|
- ch := s.w.Register(r.ID)
|
|
|
-
|
|
|
- // TODO: benchmark the cost of time.Now()
|
|
|
- // might be sampling?
|
|
|
- start := time.Now()
|
|
|
- s.r.Propose(ctx, data)
|
|
|
-
|
|
|
- proposePending.Inc()
|
|
|
- defer proposePending.Dec()
|
|
|
-
|
|
|
- select {
|
|
|
- case x := <-ch:
|
|
|
- proposeDurations.Observe(float64(time.Since(start)) / float64(time.Second))
|
|
|
- resp := x.(Response)
|
|
|
- return resp, resp.err
|
|
|
- case <-ctx.Done():
|
|
|
- proposeFailed.Inc()
|
|
|
- s.w.Trigger(r.ID, nil) // GC wait
|
|
|
- return Response{}, s.parseProposeCtxErr(ctx.Err(), start)
|
|
|
- case <-s.done:
|
|
|
- return Response{}, ErrStopped
|
|
|
- }
|
|
|
- case "GET":
|
|
|
- switch {
|
|
|
- case r.Wait:
|
|
|
- wc, err := s.store.Watch(r.Path, r.Recursive, r.Stream, r.Since)
|
|
|
- if err != nil {
|
|
|
- return Response{}, err
|
|
|
- }
|
|
|
- return Response{Watcher: wc}, nil
|
|
|
- default:
|
|
|
- ev, err := s.store.Get(r.Path, r.Recursive, r.Sorted)
|
|
|
- if err != nil {
|
|
|
- return Response{}, err
|
|
|
- }
|
|
|
- return Response{Event: ev}, nil
|
|
|
- }
|
|
|
- case "HEAD":
|
|
|
- ev, err := s.store.Get(r.Path, r.Recursive, r.Sorted)
|
|
|
- if err != nil {
|
|
|
- return Response{}, err
|
|
|
- }
|
|
|
- return Response{Event: ev}, nil
|
|
|
- default:
|
|
|
- return Response{}, ErrUnknownMethod
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() }
|
|
|
|
|
|
func (s *EtcdServer) LeaderStats() []byte {
|
|
|
@@ -1025,48 +965,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
|
|
|
e := es[i]
|
|
|
switch e.Type {
|
|
|
case raftpb.EntryNormal:
|
|
|
- // raft state machine may generate noop entry when leader confirmation.
|
|
|
- // skip it in advance to avoid some potential bug in the future
|
|
|
- if len(e.Data) == 0 {
|
|
|
- select {
|
|
|
- case s.forceVersionC <- struct{}{}:
|
|
|
- default:
|
|
|
- }
|
|
|
- break
|
|
|
- }
|
|
|
-
|
|
|
- var raftReq pb.InternalRaftRequest
|
|
|
- if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible
|
|
|
- var r pb.Request
|
|
|
- pbutil.MustUnmarshal(&r, e.Data)
|
|
|
- s.w.Trigger(r.ID, s.applyRequest(r))
|
|
|
- } else if raftReq.V2 != nil {
|
|
|
- req := raftReq.V2
|
|
|
- s.w.Trigger(req.ID, s.applyRequest(*req))
|
|
|
- } else {
|
|
|
- // do not re-apply applied entries.
|
|
|
- if e.Index <= s.consistIndex.ConsistentIndex() {
|
|
|
- break
|
|
|
- }
|
|
|
- // set the consistent index of current executing entry
|
|
|
- s.consistIndex.setConsistentIndex(e.Index)
|
|
|
- ar := s.applyV3Request(&raftReq)
|
|
|
- if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
|
|
|
- s.w.Trigger(raftReq.ID, ar)
|
|
|
- break
|
|
|
- }
|
|
|
- plog.Errorf("applying raft message exceeded backend quota")
|
|
|
- go func() {
|
|
|
- a := &pb.AlarmRequest{
|
|
|
- MemberID: uint64(s.ID()),
|
|
|
- Action: pb.AlarmRequest_ACTIVATE,
|
|
|
- Alarm: pb.AlarmType_NOSPACE,
|
|
|
- }
|
|
|
- r := pb.InternalRaftRequest{Alarm: a}
|
|
|
- s.processInternalRaftRequest(context.TODO(), r)
|
|
|
- s.w.Trigger(raftReq.ID, ar)
|
|
|
- }()
|
|
|
- }
|
|
|
+ s.applyEntryNormal(&e)
|
|
|
case raftpb.EntryConfChange:
|
|
|
var cc raftpb.ConfChange
|
|
|
pbutil.MustUnmarshal(&cc, e.Data)
|
|
|
@@ -1083,70 +982,54 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
|
|
|
return applied, shouldstop
|
|
|
}
|
|
|
|
|
|
-// applyRequest interprets r as a call to store.X and returns a Response interpreted
|
|
|
-// from store.Event
|
|
|
-func (s *EtcdServer) applyRequest(r pb.Request) Response {
|
|
|
- f := func(ev *store.Event, err error) Response {
|
|
|
- return Response{Event: ev, err: err}
|
|
|
- }
|
|
|
-
|
|
|
- refresh, _ := pbutil.GetBool(r.Refresh)
|
|
|
- ttlOptions := store.TTLOptionSet{Refresh: refresh}
|
|
|
- if r.Expiration != 0 {
|
|
|
- ttlOptions.ExpireTime = time.Unix(0, r.Expiration)
|
|
|
- }
|
|
|
-
|
|
|
- switch r.Method {
|
|
|
- case "POST":
|
|
|
- return f(s.store.Create(r.Path, r.Dir, r.Val, true, ttlOptions))
|
|
|
- case "PUT":
|
|
|
- exists, existsSet := pbutil.GetBool(r.PrevExist)
|
|
|
- switch {
|
|
|
- case existsSet:
|
|
|
- if exists {
|
|
|
- if r.PrevIndex == 0 && r.PrevValue == "" {
|
|
|
- return f(s.store.Update(r.Path, r.Val, ttlOptions))
|
|
|
- } else {
|
|
|
- return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
|
|
|
- }
|
|
|
- }
|
|
|
- return f(s.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions))
|
|
|
- case r.PrevIndex > 0 || r.PrevValue != "":
|
|
|
- return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
|
|
|
- default:
|
|
|
- if storeMemberAttributeRegexp.MatchString(r.Path) {
|
|
|
- id := membership.MustParseMemberIDFromKey(path.Dir(r.Path))
|
|
|
- var attr membership.Attributes
|
|
|
- if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
|
|
|
- plog.Panicf("unmarshal %s should never fail: %v", r.Val, err)
|
|
|
- }
|
|
|
- s.cluster.UpdateAttributes(id, attr)
|
|
|
- // return an empty response since there is no consumer.
|
|
|
- return Response{}
|
|
|
- }
|
|
|
- if r.Path == membership.StoreClusterVersionKey() {
|
|
|
- s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)))
|
|
|
- // return an empty response since there is no consumer.
|
|
|
- return Response{}
|
|
|
- }
|
|
|
- return f(s.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
|
|
|
- }
|
|
|
- case "DELETE":
|
|
|
- switch {
|
|
|
- case r.PrevIndex > 0 || r.PrevValue != "":
|
|
|
- return f(s.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
|
|
|
+// applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer
|
|
|
+func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
|
|
|
+ // raft state machine may generate noop entry when leader confirmation.
|
|
|
+ // skip it in advance to avoid some potential bug in the future
|
|
|
+ if len(e.Data) == 0 {
|
|
|
+ select {
|
|
|
+ case s.forceVersionC <- struct{}{}:
|
|
|
default:
|
|
|
- return f(s.store.Delete(r.Path, r.Dir, r.Recursive))
|
|
|
}
|
|
|
- case "QGET":
|
|
|
- return f(s.store.Get(r.Path, r.Recursive, r.Sorted))
|
|
|
- case "SYNC":
|
|
|
- s.store.DeleteExpiredKeys(time.Unix(0, r.Time))
|
|
|
- return Response{}
|
|
|
- default:
|
|
|
- // This should never be reached, but just in case:
|
|
|
- return Response{err: ErrUnknownMethod}
|
|
|
+ return
|
|
|
}
|
|
|
+
|
|
|
+ var raftReq pb.InternalRaftRequest
|
|
|
+ if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible
|
|
|
+ var r pb.Request
|
|
|
+ pbutil.MustUnmarshal(&r, e.Data)
|
|
|
+ s.w.Trigger(r.ID, s.applyV2Request(&r))
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if raftReq.V2 != nil {
|
|
|
+ req := raftReq.V2
|
|
|
+ s.w.Trigger(req.ID, s.applyV2Request(req))
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ // do not re-apply applied entries.
|
|
|
+ if e.Index <= s.consistIndex.ConsistentIndex() {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ // set the consistent index of current executing entry
|
|
|
+ s.consistIndex.setConsistentIndex(e.Index)
|
|
|
+ ar := s.applyV3Request(&raftReq)
|
|
|
+ if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
|
|
|
+ s.w.Trigger(raftReq.ID, ar)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ plog.Errorf("applying raft message exceeded backend quota")
|
|
|
+ go func() {
|
|
|
+ a := &pb.AlarmRequest{
|
|
|
+ MemberID: uint64(s.ID()),
|
|
|
+ Action: pb.AlarmRequest_ACTIVATE,
|
|
|
+ Alarm: pb.AlarmType_NOSPACE,
|
|
|
+ }
|
|
|
+ r := pb.InternalRaftRequest{Alarm: a}
|
|
|
+ s.processInternalRaftRequest(context.TODO(), r)
|
|
|
+ s.w.Trigger(raftReq.ID, ar)
|
|
|
+ }()
|
|
|
}
|
|
|
|
|
|
// applyConfChange applies a ConfChange to the server. It is only
|