|
|
@@ -24,25 +24,48 @@ import (
|
|
|
)
|
|
|
|
|
|
type V3DemoServer interface {
|
|
|
- V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) proto.Message
|
|
|
+ V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error)
|
|
|
}
|
|
|
|
|
|
-func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) proto.Message {
|
|
|
+func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
|
|
|
+ r.ID = s.reqIDGen.Next()
|
|
|
+
|
|
|
+ data, err := r.Marshal()
|
|
|
+ if err != nil {
|
|
|
+ return &pb.EmptyResponse{}, err
|
|
|
+ }
|
|
|
+ ch := s.w.Register(r.ID)
|
|
|
+
|
|
|
+ s.r.Propose(ctx, data)
|
|
|
+
|
|
|
+ select {
|
|
|
+ case x := <-ch:
|
|
|
+ resp := x.(proto.Message)
|
|
|
+ return resp, nil
|
|
|
+ case <-ctx.Done():
|
|
|
+ s.w.Trigger(r.ID, nil) // GC wait
|
|
|
+ return &pb.EmptyResponse{}, ctx.Err()
|
|
|
+ case <-s.done:
|
|
|
+ return &pb.EmptyResponse{}, ErrStopped
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) interface{} {
|
|
|
switch {
|
|
|
case r.Range != nil:
|
|
|
- return doRange(s.kv, r.Range)
|
|
|
+ return applyRange(s.kv, r.Range)
|
|
|
case r.Put != nil:
|
|
|
- return doPut(s.kv, r.Put)
|
|
|
+ return applyPut(s.kv, r.Put)
|
|
|
case r.DeleteRange != nil:
|
|
|
- return doDeleteRange(s.kv, r.DeleteRange)
|
|
|
+ return applyDeleteRange(s.kv, r.DeleteRange)
|
|
|
case r.Txn != nil:
|
|
|
- return doTxn(s.kv, r.Txn)
|
|
|
+ return applyTxn(s.kv, r.Txn)
|
|
|
default:
|
|
|
panic("not implemented")
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func doPut(kv dstorage.KV, p *pb.PutRequest) *pb.PutResponse {
|
|
|
+func applyPut(kv dstorage.KV, p *pb.PutRequest) *pb.PutResponse {
|
|
|
resp := &pb.PutResponse{}
|
|
|
resp.Header = &pb.ResponseHeader{}
|
|
|
rev := kv.Put(p.Key, p.Value)
|
|
|
@@ -50,7 +73,7 @@ func doPut(kv dstorage.KV, p *pb.PutRequest) *pb.PutResponse {
|
|
|
return resp
|
|
|
}
|
|
|
|
|
|
-func doRange(kv dstorage.KV, r *pb.RangeRequest) *pb.RangeResponse {
|
|
|
+func applyRange(kv dstorage.KV, r *pb.RangeRequest) *pb.RangeResponse {
|
|
|
resp := &pb.RangeResponse{}
|
|
|
resp.Header = &pb.ResponseHeader{}
|
|
|
kvs, rev, err := kv.Range(r.Key, r.RangeEnd, r.Limit, 0)
|
|
|
@@ -65,7 +88,7 @@ func doRange(kv dstorage.KV, r *pb.RangeRequest) *pb.RangeResponse {
|
|
|
return resp
|
|
|
}
|
|
|
|
|
|
-func doDeleteRange(kv dstorage.KV, dr *pb.DeleteRangeRequest) *pb.DeleteRangeResponse {
|
|
|
+func applyDeleteRange(kv dstorage.KV, dr *pb.DeleteRangeRequest) *pb.DeleteRangeResponse {
|
|
|
resp := &pb.DeleteRangeResponse{}
|
|
|
resp.Header = &pb.ResponseHeader{}
|
|
|
_, rev := kv.DeleteRange(dr.Key, dr.RangeEnd)
|
|
|
@@ -73,12 +96,12 @@ func doDeleteRange(kv dstorage.KV, dr *pb.DeleteRangeRequest) *pb.DeleteRangeRes
|
|
|
return resp
|
|
|
}
|
|
|
|
|
|
-func doTxn(kv dstorage.KV, rt *pb.TxnRequest) *pb.TxnResponse {
|
|
|
+func applyTxn(kv dstorage.KV, rt *pb.TxnRequest) *pb.TxnResponse {
|
|
|
var revision int64
|
|
|
|
|
|
ok := true
|
|
|
for _, c := range rt.Compare {
|
|
|
- if revision, ok = doCompare(kv, c); !ok {
|
|
|
+ if revision, ok = applyCompare(kv, c); !ok {
|
|
|
break
|
|
|
}
|
|
|
}
|
|
|
@@ -91,7 +114,7 @@ func doTxn(kv dstorage.KV, rt *pb.TxnRequest) *pb.TxnResponse {
|
|
|
}
|
|
|
resps := make([]*pb.ResponseUnion, len(reqs))
|
|
|
for i := range reqs {
|
|
|
- resps[i] = doUnion(kv, reqs[i])
|
|
|
+ resps[i] = applyUnion(kv, reqs[i])
|
|
|
}
|
|
|
if len(resps) != 0 {
|
|
|
revision += 1
|
|
|
@@ -105,21 +128,21 @@ func doTxn(kv dstorage.KV, rt *pb.TxnRequest) *pb.TxnResponse {
|
|
|
return txnResp
|
|
|
}
|
|
|
|
|
|
-func doUnion(kv dstorage.KV, union *pb.RequestUnion) *pb.ResponseUnion {
|
|
|
+func applyUnion(kv dstorage.KV, union *pb.RequestUnion) *pb.ResponseUnion {
|
|
|
switch {
|
|
|
case union.RequestRange != nil:
|
|
|
- return &pb.ResponseUnion{ResponseRange: doRange(kv, union.RequestRange)}
|
|
|
+ return &pb.ResponseUnion{ResponseRange: applyRange(kv, union.RequestRange)}
|
|
|
case union.RequestPut != nil:
|
|
|
- return &pb.ResponseUnion{ResponsePut: doPut(kv, union.RequestPut)}
|
|
|
+ return &pb.ResponseUnion{ResponsePut: applyPut(kv, union.RequestPut)}
|
|
|
case union.RequestDeleteRange != nil:
|
|
|
- return &pb.ResponseUnion{ResponseDeleteRange: doDeleteRange(kv, union.RequestDeleteRange)}
|
|
|
+ return &pb.ResponseUnion{ResponseDeleteRange: applyDeleteRange(kv, union.RequestDeleteRange)}
|
|
|
default:
|
|
|
// empty union
|
|
|
return nil
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func doCompare(kv dstorage.KV, c *pb.Compare) (int64, bool) {
|
|
|
+func applyCompare(kv dstorage.KV, c *pb.Compare) (int64, bool) {
|
|
|
ckvs, rev, err := kv.Range(c.Key, nil, 1, 0)
|
|
|
if err != nil {
|
|
|
return rev, false
|