|
@@ -27,7 +27,6 @@ import (
|
|
|
"github.com/coreos/etcd/mvcc"
|
|
"github.com/coreos/etcd/mvcc"
|
|
|
"github.com/coreos/etcd/raft"
|
|
"github.com/coreos/etcd/raft"
|
|
|
|
|
|
|
|
- "github.com/coreos/go-semver/semver"
|
|
|
|
|
"golang.org/x/net/context"
|
|
"golang.org/x/net/context"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
@@ -45,10 +44,6 @@ const (
|
|
|
maxGapBetweenApplyAndCommitIndex = 5000
|
|
maxGapBetweenApplyAndCommitIndex = 5000
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
-var (
|
|
|
|
|
- newRangeClusterVersion = *semver.Must(semver.NewVersion("3.1.0"))
|
|
|
|
|
-)
|
|
|
|
|
-
|
|
|
|
|
type RaftKV interface {
|
|
type RaftKV interface {
|
|
|
Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error)
|
|
Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error)
|
|
|
Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
|
|
Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
|
|
@@ -91,11 +86,6 @@ type Authenticator interface {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
|
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
|
|
- // TODO: remove this checking when we release etcd 3.2
|
|
|
|
|
- if s.ClusterVersion() == nil || s.ClusterVersion().LessThan(newRangeClusterVersion) {
|
|
|
|
|
- return s.legacyRange(ctx, r)
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
if !r.Serializable {
|
|
if !r.Serializable {
|
|
|
err := s.linearizableReadNotify(ctx)
|
|
err := s.linearizableReadNotify(ctx)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -114,30 +104,6 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
|
|
|
return resp, err
|
|
return resp, err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// TODO: remove this func when we release etcd 3.2
|
|
|
|
|
-func (s *EtcdServer) legacyRange(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
|
|
|
|
- if r.Serializable {
|
|
|
|
|
- var resp *pb.RangeResponse
|
|
|
|
|
- var err error
|
|
|
|
|
- chk := func(ai *auth.AuthInfo) error {
|
|
|
|
|
- return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
|
|
|
|
|
- }
|
|
|
|
|
- get := func() { resp, err = s.applyV3Base.Range(nil, r) }
|
|
|
|
|
- if serr := s.doSerialize(ctx, chk, get); serr != nil {
|
|
|
|
|
- return nil, serr
|
|
|
|
|
- }
|
|
|
|
|
- return resp, err
|
|
|
|
|
- }
|
|
|
|
|
- result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Range: r})
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return nil, err
|
|
|
|
|
- }
|
|
|
|
|
- if result.err != nil {
|
|
|
|
|
- return nil, result.err
|
|
|
|
|
- }
|
|
|
|
|
- return result.resp.(*pb.RangeResponse), nil
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
|
|
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
|
|
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Put: r})
|
|
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Put: r})
|
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -161,11 +127,6 @@ func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
|
|
func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
|
|
|
- // TODO: remove this checking when we release etcd 3.2
|
|
|
|
|
- if s.ClusterVersion() == nil || s.ClusterVersion().LessThan(newRangeClusterVersion) {
|
|
|
|
|
- return s.legacyTxn(ctx, r)
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
if isTxnReadonly(r) {
|
|
if isTxnReadonly(r) {
|
|
|
if !isTxnSerializable(r) {
|
|
if !isTxnSerializable(r) {
|
|
|
err := s.linearizableReadNotify(ctx)
|
|
err := s.linearizableReadNotify(ctx)
|
|
@@ -194,30 +155,6 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
|
|
|
return result.resp.(*pb.TxnResponse), nil
|
|
return result.resp.(*pb.TxnResponse), nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// TODO: remove this func when we release etcd 3.2
|
|
|
|
|
-func (s *EtcdServer) legacyTxn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
|
|
|
|
|
- if isTxnSerializable(r) {
|
|
|
|
|
- var resp *pb.TxnResponse
|
|
|
|
|
- var err error
|
|
|
|
|
- chk := func(ai *auth.AuthInfo) error {
|
|
|
|
|
- return checkTxnAuth(s.authStore, ai, r)
|
|
|
|
|
- }
|
|
|
|
|
- get := func() { resp, err = s.applyV3Base.Txn(r) }
|
|
|
|
|
- if serr := s.doSerialize(ctx, chk, get); serr != nil {
|
|
|
|
|
- return nil, serr
|
|
|
|
|
- }
|
|
|
|
|
- return resp, err
|
|
|
|
|
- }
|
|
|
|
|
- result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Txn: r})
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return nil, err
|
|
|
|
|
- }
|
|
|
|
|
- if result.err != nil {
|
|
|
|
|
- return nil, result.err
|
|
|
|
|
- }
|
|
|
|
|
- return result.resp.(*pb.TxnResponse), nil
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
func isTxnSerializable(r *pb.TxnRequest) bool {
|
|
func isTxnSerializable(r *pb.TxnRequest) bool {
|
|
|
for _, u := range r.Success {
|
|
for _, u := range r.Success {
|
|
|
if r := u.GetRequestRange(); r == nil || !r.Serializable {
|
|
if r := u.GetRequestRange(); r == nil || !r.Serializable {
|