Browse Source

etcdserver, api, membership: don't race on setting version

Fixes #6029
Anthony Romano 9 years ago
parent
commit
de2c3ec3db

+ 16 - 31
etcdserver/api/capability.go

@@ -16,9 +16,7 @@ package api
 
 import (
 	"sync"
-	"time"
 
-	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/version"
 	"github.com/coreos/go-semver/semver"
 	"github.com/coreos/pkg/capnslog"
@@ -43,45 +41,32 @@ var (
 		"3.0.0": {AuthCapability: true, V3rpcCapability: true},
 	}
 
-	// capLoopOnce ensures we only create one capability monitor goroutine
-	capLoopOnce sync.Once
-
 	enableMapMu sync.RWMutex
 	// enabledMap points to a map in capabilityMaps
 	enabledMap map[Capability]bool
+
+	curVersion *semver.Version
 )
 
 func init() {
 	enabledMap = make(map[Capability]bool)
 }
 
-// RunCapabilityLoop checks the cluster version every 500ms and updates
-// the enabledMap when the cluster version increased.
-func RunCapabilityLoop(s *etcdserver.EtcdServer) {
-	go capLoopOnce.Do(func() { runCapabilityLoop(s) })
-}
-
-func runCapabilityLoop(s *etcdserver.EtcdServer) {
-	stopped := s.StopNotify()
-
-	var pv *semver.Version
-	for {
-		if v := s.ClusterVersion(); v != pv {
-			if pv == nil || (v != nil && pv.LessThan(*v)) {
-				pv = v
-				enableMapMu.Lock()
-				enabledMap = capabilityMaps[pv.String()]
-				enableMapMu.Unlock()
-				plog.Infof("enabled capabilities for version %s", version.Cluster(pv.String()))
-			}
-		}
-
-		select {
-		case <-stopped:
-			return
-		case <-time.After(500 * time.Millisecond):
-		}
+// UpdateCapability updates the enabledMap when the cluster version increases.
+func UpdateCapability(v *semver.Version) {
+	if v == nil {
+		// if recovered but version was never set by cluster
+		return
+	}
+	enableMapMu.Lock()
+	if curVersion != nil && !curVersion.LessThan(*v) {
+		enableMapMu.Unlock()
+		return
 	}
+	curVersion = v
+	enabledMap = capabilityMaps[curVersion.String()]
+	enableMapMu.Unlock()
+	plog.Infof("enabled capabilities for version %s", version.Cluster(v.String()))
 }
 
 func IsCapabilityEnabled(c Capability) bool {

+ 0 - 1
etcdserver/api/v2http/client.go

@@ -130,7 +130,6 @@ func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http
 		mux.Handle(pprofPrefix+"/block", pprof.Handler("block"))
 	}
 
-	api.RunCapabilityLoop(server)
 	return requestLogger(mux)
 }
 

+ 0 - 2
etcdserver/api/v3rpc/grpc.go

@@ -18,7 +18,6 @@ import (
 	"crypto/tls"
 
 	"github.com/coreos/etcd/etcdserver"
-	"github.com/coreos/etcd/etcdserver/api"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/pkg/capnslog"
 	"google.golang.org/grpc"
@@ -47,6 +46,5 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
 	pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
 	pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))
 
-	api.RunCapabilityLoop(s)
 	return grpcServer
 }

+ 2 - 1
etcdserver/apply_v2.go

@@ -19,6 +19,7 @@ import (
 	"path"
 	"time"
 
+	"github.com/coreos/etcd/etcdserver/api"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/etcdserver/membership"
 	"github.com/coreos/etcd/pkg/pbutil"
@@ -86,7 +87,7 @@ func (a *applierV2store) Put(r *pb.Request) Response {
 		}
 		if r.Path == membership.StoreClusterVersionKey() {
 			if a.cluster != nil {
-				a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)))
+				a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability)
 			}
 			// return an empty response since there is no consumer.
 			return Response{}

+ 4 - 2
etcdserver/membership/cluster.go

@@ -200,13 +200,14 @@ func (c *RaftCluster) SetBackend(be backend.Backend) {
 	mustCreateBackendBuckets(c.be)
 }
 
-func (c *RaftCluster) Recover() {
+func (c *RaftCluster) Recover(onSet func(*semver.Version)) {
 	c.Lock()
 	defer c.Unlock()
 
 	c.members, c.removed = membersFromStore(c.store)
 	c.version = clusterVersionFromStore(c.store)
 	mustDetectDowngrade(c.version)
+	onSet(c.version)
 
 	for _, m := range c.members {
 		plog.Infof("added member %s %v to cluster %s from store", m.ID, m.PeerURLs, c.id)
@@ -356,7 +357,7 @@ func (c *RaftCluster) Version() *semver.Version {
 	return semver.Must(semver.NewVersion(c.version.String()))
 }
 
-func (c *RaftCluster) SetVersion(ver *semver.Version) {
+func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*semver.Version)) {
 	c.Lock()
 	defer c.Unlock()
 	if c.version != nil {
@@ -372,6 +373,7 @@ func (c *RaftCluster) SetVersion(ver *semver.Version) {
 	if c.be != nil {
 		mustSaveClusterVersionToBackend(c.be, ver)
 	}
+	onSet(ver)
 }
 
 func (c *RaftCluster) IsReadyToAddNewMember() bool {

+ 3 - 2
etcdserver/server.go

@@ -31,6 +31,7 @@ import (
 	"github.com/coreos/etcd/auth"
 	"github.com/coreos/etcd/compactor"
 	"github.com/coreos/etcd/discovery"
+	"github.com/coreos/etcd/etcdserver/api"
 	"github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/etcdserver/membership"
@@ -342,7 +343,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 		}
 		cl.SetStore(st)
 		cl.SetBackend(be)
-		cl.Recover()
+		cl.Recover(api.UpdateCapability)
 		if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
 			os.RemoveAll(bepath)
 			return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
@@ -705,7 +706,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 
 	s.cluster.SetBackend(s.be)
 	plog.Info("recovering cluster configuration...")
-	s.cluster.Recover()
+	s.cluster.Recover(api.UpdateCapability)
 	plog.Info("finished recovering cluster configuration")
 
 	plog.Info("removing old peers from network...")