فهرست منبع

etcdserver: save cluster version into backend

Xiang Li 9 سال پیش
والد
کامیت
e9735b7bd0
3فایلهای تغییر یافته به همراه39 افزوده شده و 4 حذف شده
  1. 7 1
      etcdserver/membership/cluster.go
  2. 28 1
      etcdserver/membership/store.go
  3. 4 2
      etcdserver/server.go

+ 7 - 1
etcdserver/membership/cluster.go

@@ -197,7 +197,7 @@ func (c *RaftCluster) SetStore(st store.Store) { c.store = st }
 
 
 func (c *RaftCluster) SetBackend(be backend.Backend) {
 func (c *RaftCluster) SetBackend(be backend.Backend) {
 	c.be = be
 	c.be = be
-	mustCreateBackendMemberBucket(c.be)
+	mustCreateBackendBuckets(c.be)
 }
 }
 
 
 func (c *RaftCluster) Recover() {
 func (c *RaftCluster) Recover() {
@@ -360,6 +360,12 @@ func (c *RaftCluster) SetVersion(ver *semver.Version) {
 	}
 	}
 	c.version = ver
 	c.version = ver
 	mustDetectDowngrade(c.version)
 	mustDetectDowngrade(c.version)
+	if c.store != nil {
+		mustSaveClusterVersionToStore(c.store, ver)
+	}
+	if c.be != nil {
+		mustSaveClusterVersionToBackend(c.be, ver)
+	}
 }
 }
 
 
 func (c *RaftCluster) IsReadyToAddNewMember() bool {
 func (c *RaftCluster) IsReadyToAddNewMember() bool {

+ 28 - 1
etcdserver/membership/store.go

@@ -22,6 +22,8 @@ import (
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/storage/backend"
 	"github.com/coreos/etcd/storage/backend"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/store"
+
+	"github.com/coreos/go-semver/semver"
 )
 )
 
 
 const (
 const (
@@ -35,6 +37,7 @@ const (
 var (
 var (
 	membersBucketName        = []byte("members")
 	membersBucketName        = []byte("members")
 	membersRemovedBuckedName = []byte("members_removed")
 	membersRemovedBuckedName = []byte("members_removed")
+	clusterBucketName        = []byte("cluster")
 
 
 	StoreMembersPrefix        = path.Join(storePrefix, "members")
 	StoreMembersPrefix        = path.Join(storePrefix, "members")
 	storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members")
 	storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members")
@@ -63,6 +66,15 @@ func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) {
 	tx.Unlock()
 	tx.Unlock()
 }
 }
 
 
+func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
+	ckey := backendClusterVersionKey()
+
+	tx := be.BatchTx()
+	tx.Lock()
+	defer tx.Unlock()
+	tx.UnsafePut(clusterBucketName, ckey, []byte(ver.String()))
+}
+
 func mustSaveMemberToStore(s store.Store, m *Member) {
 func mustSaveMemberToStore(s store.Store, m *Member) {
 	b, err := json.Marshal(m.RaftAttributes)
 	b, err := json.Marshal(m.RaftAttributes)
 	if err != nil {
 	if err != nil {
@@ -105,6 +117,12 @@ func mustUpdateMemberAttrInStore(s store.Store, m *Member) {
 	}
 	}
 }
 }
 
 
+func mustSaveClusterVersionToStore(s store.Store, ver *semver.Version) {
+	if _, err := s.Set(StoreClusterVersionKey(), false, ver.String(), store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil {
+		plog.Panicf("save cluster version should never fail: %v", err)
+	}
+}
+
 // nodeToMember builds member from a key value node.
 // nodeToMember builds member from a key value node.
 // the child nodes of the given node MUST be sorted by key.
 // the child nodes of the given node MUST be sorted by key.
 func nodeToMember(n *store.NodeExtern) (*Member, error) {
 func nodeToMember(n *store.NodeExtern) (*Member, error) {
@@ -137,18 +155,27 @@ func backendMemberKey(id types.ID) []byte {
 	return []byte(id.String())
 	return []byte(id.String())
 }
 }
 
 
-func mustCreateBackendMemberBucket(be backend.Backend) {
+func backendClusterVersionKey() []byte {
+	return []byte("clusterVersion")
+}
+
+func mustCreateBackendBuckets(be backend.Backend) {
 	tx := be.BatchTx()
 	tx := be.BatchTx()
 	tx.Lock()
 	tx.Lock()
 	defer tx.Unlock()
 	defer tx.Unlock()
 	tx.UnsafeCreateBucket(membersBucketName)
 	tx.UnsafeCreateBucket(membersBucketName)
 	tx.UnsafeCreateBucket(membersRemovedBuckedName)
 	tx.UnsafeCreateBucket(membersRemovedBuckedName)
+	tx.UnsafeCreateBucket(clusterBucketName)
 }
 }
 
 
 func MemberStoreKey(id types.ID) string {
 func MemberStoreKey(id types.ID) string {
 	return path.Join(StoreMembersPrefix, id.String())
 	return path.Join(StoreMembersPrefix, id.String())
 }
 }
 
 
+func StoreClusterVersionKey() string {
+	return path.Join(storePrefix, "version")
+}
+
 func MemberAttributesStorePath(id types.ID) string {
 func MemberAttributesStorePath(id types.ID) string {
 	return path.Join(MemberStoreKey(id), attributesSuffix)
 	return path.Join(MemberStoreKey(id), attributesSuffix)
 }
 }

+ 4 - 2
etcdserver/server.go

@@ -1123,8 +1123,10 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
 				// return an empty response since there is no consumer.
 				// return an empty response since there is no consumer.
 				return Response{}
 				return Response{}
 			}
 			}
-			if r.Path == path.Join(StoreClusterPrefix, "version") {
+			if r.Path == membership.StoreClusterVersionKey() {
 				s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)))
 				s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)))
+				// return an empty response since there is no consumer.
+				return Response{}
 			}
 			}
 			return f(s.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
 			return f(s.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
 		}
 		}
@@ -1312,7 +1314,7 @@ func (s *EtcdServer) updateClusterVersion(ver string) {
 	}
 	}
 	req := pb.Request{
 	req := pb.Request{
 		Method: "PUT",
 		Method: "PUT",
-		Path:   path.Join(StoreClusterPrefix, "version"),
+		Path:   membership.StoreClusterVersionKey(),
 		Val:    ver,
 		Val:    ver,
 	}
 	}
 	ctx, cancel := context.WithTimeout(context.Background(), s.cfg.ReqTimeout())
 	ctx, cancel := context.WithTimeout(context.Background(), s.cfg.ReqTimeout())