Browse Source

Merge pull request #2718 from xiang90/version

support cluster-wide version sync
Xiang Li 10 năm trước cách đây
mục cha
commit
a4e35f4650

+ 4 - 0
Godeps/Godeps.json

@@ -24,6 +24,10 @@
 			"ImportPath": "github.com/coreos/pkg/capnslog",
 			"Rev": "9d5dd4632f9ece71bdf83d31253593a633e73df5"
 		},
+		{
+			"ImportPath": "github.com/coreos/go-semver/semver",
+			"Rev": "568e959cd89871e61434c1143528d9162da89ef2"
+		},
 		{
 			"ImportPath": "github.com/gogo/protobuf/proto",
 			"Rev": "bc946d07d1016848dfd2507f90f0859c9471681e"

+ 209 - 0
Godeps/_workspace/src/github.com/coreos/go-semver/semver/semver.go

@@ -0,0 +1,209 @@
+package semver
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"strconv"
+	"strings"
+)
+
+type Version struct {
+	Major      int64
+	Minor      int64
+	Patch      int64
+	PreRelease PreRelease
+	Metadata   string
+}
+
+type PreRelease string
+
+func splitOff(input *string, delim string) (val string) {
+	parts := strings.SplitN(*input, delim, 2)
+
+	if len(parts) == 2 {
+		*input = parts[0]
+		val = parts[1]
+	}
+
+	return val
+}
+
+func NewVersion(version string) (*Version, error) {
+	v := Version{}
+
+	dotParts := strings.SplitN(version, ".", 3)
+
+	if len(dotParts) != 3 {
+		return nil, errors.New(fmt.Sprintf("%s is not in dotted-tri format", version))
+	}
+
+	v.Metadata = splitOff(&dotParts[2], "+")
+	v.PreRelease = PreRelease(splitOff(&dotParts[2], "-"))
+
+	parsed := make([]int64, 3, 3)
+
+	for i, v := range dotParts[:3] {
+		val, err := strconv.ParseInt(v, 10, 64)
+		parsed[i] = val
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	v.Major = parsed[0]
+	v.Minor = parsed[1]
+	v.Patch = parsed[2]
+
+	return &v, nil
+}
+
+func Must(v *Version, err error) *Version {
+	if err != nil {
+		panic(err)
+	}
+	return v
+}
+
+func (v *Version) String() string {
+	var buffer bytes.Buffer
+
+	base := fmt.Sprintf("%d.%d.%d", v.Major, v.Minor, v.Patch)
+	buffer.WriteString(base)
+
+	if v.PreRelease != "" {
+		buffer.WriteString(fmt.Sprintf("-%s", v.PreRelease))
+	}
+
+	if v.Metadata != "" {
+		buffer.WriteString(fmt.Sprintf("+%s", v.Metadata))
+	}
+
+	return buffer.String()
+}
+
+func (v *Version) LessThan(versionB Version) bool {
+	versionA := *v
+	cmp := recursiveCompare(versionA.Slice(), versionB.Slice())
+
+	if cmp == 0 {
+		cmp = preReleaseCompare(versionA, versionB)
+	}
+
+	if cmp == -1 {
+		return true
+	}
+
+	return false
+}
+
+/* Slice converts the comparable parts of the semver into a slice of strings */
+func (v *Version) Slice() []int64 {
+	return []int64{v.Major, v.Minor, v.Patch}
+}
+
+func (p *PreRelease) Slice() []string {
+	preRelease := string(*p)
+	return strings.Split(preRelease, ".")
+}
+
+func preReleaseCompare(versionA Version, versionB Version) int {
+	a := versionA.PreRelease
+	b := versionB.PreRelease
+
+	/* Handle the case where if two versions are otherwise equal it is the
+	 * one without a PreRelease that is greater */
+	if len(a) == 0 && (len(b) > 0) {
+		return 1
+	} else if len(b) == 0 && (len(a) > 0) {
+		return -1
+	}
+
+	// If there is a prelease, check and compare each part.
+	return recursivePreReleaseCompare(a.Slice(), b.Slice())
+}
+
+func recursiveCompare(versionA []int64, versionB []int64) int {
+	if len(versionA) == 0 {
+		return 0
+	}
+
+	a := versionA[0]
+	b := versionB[0]
+
+	if a > b {
+		return 1
+	} else if a < b {
+		return -1
+	}
+
+	return recursiveCompare(versionA[1:], versionB[1:])
+}
+
+func recursivePreReleaseCompare(versionA []string, versionB []string) int {
+	// Handle slice length disparity.
+	if len(versionA) == 0 {
+		// Nothing to compare too, so we return 0
+		return 0
+	} else if len(versionB) == 0 {
+		// We're longer than versionB so return 1.
+		return 1
+	}
+
+	a := versionA[0]
+	b := versionB[0]
+
+	aInt := false; bInt := false
+
+	aI, err := strconv.Atoi(versionA[0])
+	if err == nil {
+		aInt = true
+	}
+
+	bI, err := strconv.Atoi(versionB[0])
+	if err == nil {
+		bInt = true
+	}
+
+	// Handle Integer Comparison
+	if aInt && bInt {
+		if aI > bI {
+			return 1
+		} else if aI < bI {
+			return -1
+		}
+	}
+
+	// Handle String Comparison
+	if a > b {
+		return 1
+	} else if a < b {
+		return -1
+	}
+
+	return recursivePreReleaseCompare(versionA[1:], versionB[1:])
+}
+
+// BumpMajor increments the Major field by 1 and resets all other fields to their default values
+func (v *Version) BumpMajor() {
+	v.Major += 1
+	v.Minor = 0
+	v.Patch = 0
+	v.PreRelease = PreRelease("")
+	v.Metadata = ""
+}
+
+// BumpMinor increments the Minor field by 1 and resets all other fields to their default values
+func (v *Version) BumpMinor() {
+	v.Minor += 1
+	v.Patch = 0
+	v.PreRelease = PreRelease("")
+	v.Metadata = ""
+}
+
+// BumpPatch increments the Patch field by 1 and resets all other fields to their default values
+func (v *Version) BumpPatch() {
+	v.Patch += 1
+	v.PreRelease = PreRelease("")
+	v.Metadata = ""
+}

+ 223 - 0
Godeps/_workspace/src/github.com/coreos/go-semver/semver/semver_test.go

@@ -0,0 +1,223 @@
+package semver
+
+import (
+	"errors"
+	"math/rand"
+	"reflect"
+	"testing"
+	"time"
+)
+
+type fixture struct {
+	greaterVersion string
+	lesserVersion  string
+}
+
+var fixtures = []fixture{
+	fixture{"0.0.0", "0.0.0-foo"},
+	fixture{"0.0.1", "0.0.0"},
+	fixture{"1.0.0", "0.9.9"},
+	fixture{"0.10.0", "0.9.0"},
+	fixture{"0.99.0", "0.10.0"},
+	fixture{"2.0.0", "1.2.3"},
+	fixture{"0.0.0", "0.0.0-foo"},
+	fixture{"0.0.1", "0.0.0"},
+	fixture{"1.0.0", "0.9.9"},
+	fixture{"0.10.0", "0.9.0"},
+	fixture{"0.99.0", "0.10.0"},
+	fixture{"2.0.0", "1.2.3"},
+	fixture{"0.0.0", "0.0.0-foo"},
+	fixture{"0.0.1", "0.0.0"},
+	fixture{"1.0.0", "0.9.9"},
+	fixture{"0.10.0", "0.9.0"},
+	fixture{"0.99.0", "0.10.0"},
+	fixture{"2.0.0", "1.2.3"},
+	fixture{"1.2.3", "1.2.3-asdf"},
+	fixture{"1.2.3", "1.2.3-4"},
+	fixture{"1.2.3", "1.2.3-4-foo"},
+	fixture{"1.2.3-5-foo", "1.2.3-5"},
+	fixture{"1.2.3-5", "1.2.3-4"},
+	fixture{"1.2.3-5-foo", "1.2.3-5-Foo"},
+	fixture{"3.0.0", "2.7.2+asdf"},
+	fixture{"3.0.0+foobar", "2.7.2"},
+	fixture{"1.2.3-a.10", "1.2.3-a.5"},
+	fixture{"1.2.3-a.b", "1.2.3-a.5"},
+	fixture{"1.2.3-a.b", "1.2.3-a"},
+	fixture{"1.2.3-a.b.c.10.d.5", "1.2.3-a.b.c.5.d.100"},
+	fixture{"1.0.0", "1.0.0-rc.1"},
+	fixture{"1.0.0-rc.2", "1.0.0-rc.1"},
+	fixture{"1.0.0-rc.1", "1.0.0-beta.11"},
+	fixture{"1.0.0-beta.11", "1.0.0-beta.2"},
+	fixture{"1.0.0-beta.2", "1.0.0-beta"},
+	fixture{"1.0.0-beta", "1.0.0-alpha.beta"},
+	fixture{"1.0.0-alpha.beta", "1.0.0-alpha.1"},
+	fixture{"1.0.0-alpha.1", "1.0.0-alpha"},
+}
+
+func TestCompare(t *testing.T) {
+	for _, v := range fixtures {
+		gt, err := NewVersion(v.greaterVersion)
+		if err != nil {
+			t.Error(err)
+		}
+
+		lt, err := NewVersion(v.lesserVersion)
+		if err != nil {
+			t.Error(err)
+		}
+
+		if gt.LessThan(*lt) == true {
+			t.Errorf("%s should not be less than %s", gt, lt)
+		}
+	}
+}
+
+func testString(t *testing.T, orig string, version *Version) {
+	if orig != version.String() {
+		t.Errorf("%s != %s", orig, version)
+	}
+}
+
+func TestString(t *testing.T) {
+	for _, v := range fixtures {
+		gt, err := NewVersion(v.greaterVersion)
+		if err != nil {
+			t.Error(err)
+		}
+		testString(t, v.greaterVersion, gt)
+
+		lt, err := NewVersion(v.lesserVersion)
+		if err != nil {
+			t.Error(err)
+		}
+		testString(t, v.lesserVersion, lt)
+	}
+}
+
+func shuffleStringSlice(src []string) []string {
+	dest := make([]string, len(src))
+	rand.Seed(time.Now().Unix())
+	perm := rand.Perm(len(src))
+	for i, v := range perm {
+		dest[v] = src[i]
+	}
+	return dest
+}
+
+func TestSort(t *testing.T) {
+	sortedVersions := []string{"1.0.0", "1.0.2", "1.2.0", "3.1.1"}
+	unsortedVersions := shuffleStringSlice(sortedVersions)
+
+	semvers := []*Version{}
+	for _, v := range unsortedVersions {
+		sv, err := NewVersion(v)
+		if err != nil {
+			t.Fatal(err)
+		}
+		semvers = append(semvers, sv)
+	}
+
+	Sort(semvers)
+
+	for idx, sv := range semvers {
+		if sv.String() != sortedVersions[idx] {
+			t.Fatalf("incorrect sort at index %v", idx)
+		}
+	}
+}
+
+func TestBumpMajor(t *testing.T) {
+	version, _ := NewVersion("1.0.0")
+	version.BumpMajor()
+	if version.Major != 2 {
+		t.Fatalf("bumping major on 1.0.0 resulted in %v", version)
+	}
+
+	version, _ = NewVersion("1.5.2")
+	version.BumpMajor()
+	if version.Minor != 0 && version.Patch != 0 {
+		t.Fatalf("bumping major on 1.5.2 resulted in %v", version)
+	}
+
+	version, _ = NewVersion("1.0.0+build.1-alpha.1")
+	version.BumpMajor()
+	if version.PreRelease != "" && version.PreRelease != "" {
+		t.Fatalf("bumping major on 1.0.0+build.1-alpha.1 resulted in %v", version)
+	}
+}
+
+func TestBumpMinor(t *testing.T) {
+	version, _ := NewVersion("1.0.0")
+	version.BumpMinor()
+
+	if version.Major != 1 {
+		t.Fatalf("bumping minor on 1.0.0 resulted in %v", version)
+	}
+
+	if version.Minor != 1 {
+		t.Fatalf("bumping major on 1.0.0 resulted in %v", version)
+	}
+
+	version, _ = NewVersion("1.0.0+build.1-alpha.1")
+	version.BumpMinor()
+	if version.PreRelease != "" && version.PreRelease != "" {
+		t.Fatalf("bumping major on 1.0.0+build.1-alpha.1 resulted in %v", version)
+	}
+}
+
+func TestBumpPatch(t *testing.T) {
+	version, _ := NewVersion("1.0.0")
+	version.BumpPatch()
+
+	if version.Major != 1 {
+		t.Fatalf("bumping minor on 1.0.0 resulted in %v", version)
+	}
+
+	if version.Minor != 0 {
+		t.Fatalf("bumping major on 1.0.0 resulted in %v", version)
+	}
+
+	if version.Patch != 1 {
+		t.Fatalf("bumping major on 1.0.0 resulted in %v", version)
+	}
+
+	version, _ = NewVersion("1.0.0+build.1-alpha.1")
+	version.BumpPatch()
+	if version.PreRelease != "" && version.PreRelease != "" {
+		t.Fatalf("bumping major on 1.0.0+build.1-alpha.1 resulted in %v", version)
+	}
+}
+
+func TestMust(t *testing.T) {
+	tests := []struct {
+		versionStr string
+
+		version *Version
+		recov   interface{}
+	}{
+		{
+			versionStr: "1.0.0",
+			version:    &Version{Major: 1},
+		},
+		{
+			versionStr: "version number",
+			recov:      errors.New("version number is not in dotted-tri format"),
+		},
+	}
+
+	for _, tt := range tests {
+		func() {
+			defer func() {
+				recov := recover()
+				if !reflect.DeepEqual(tt.recov, recov) {
+					t.Fatalf("incorrect panic for %q: want %v, got %v", tt.versionStr, tt.recov, recov)
+				}
+			}()
+
+			version := Must(NewVersion(tt.versionStr))
+			if !reflect.DeepEqual(tt.version, version) {
+				t.Fatalf("incorrect version for %q: want %+v, got %+v", tt.versionStr, tt.version, version)
+			}
+		}()
+	}
+}

+ 24 - 0
Godeps/_workspace/src/github.com/coreos/go-semver/semver/sort.go

@@ -0,0 +1,24 @@
+package semver
+
+import (
+	"sort"
+)
+
+type Versions []*Version
+
+func (s Versions) Len() int {
+	return len(s)
+}
+
+func (s Versions) Swap(i, j int) {
+	s[i], s[j] = s[j], s[i]
+}
+
+func (s Versions) Less(i, j int) bool {
+	return s[i].LessThan(*s[j])
+}
+
+// Sort sorts the given slice of Version
+func Sort(versions []*Version) {
+	sort.Sort(Versions(versions))
+}

+ 50 - 0
etcdserver/cluster_util.go

@@ -23,7 +23,9 @@ import (
 	"sort"
 	"time"
 
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
 	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/version"
 )
 
 // isMemberBootstrapped tries to check if the given member has been bootstrapped
@@ -106,3 +108,51 @@ func getRemotePeerURLs(cl ClusterInfo, local string) []string {
 	sort.Strings(us)
 	return us
 }
+
+// getVersions returns the versions of the members in the given cluster.
+// The key of the returned map is the member's ID. The value of the returned map
+// is the semver version string. If it fails to get the version of a member, the key
+// will be an empty string.
+func getVersions(cl ClusterInfo, tr *http.Transport) map[string]string {
+	members := cl.Members()
+	vers := make(map[string]string)
+	for _, m := range members {
+		ver, err := getVersion(m, tr)
+		if err != nil {
+			log.Printf("etcdserver: cannot get the version of member %s (%v)", m.ID, err)
+			vers[m.ID.String()] = ""
+		} else {
+			vers[m.ID.String()] = ver
+		}
+	}
+	return vers
+}
+
+// decideClusterVersion decides the cluster version based on the versions map.
+// The returned version is the min version in the map, or nil if the min
+// version in unknown.
+func decideClusterVersion(vers map[string]string) *semver.Version {
+	var cv *semver.Version
+	lv := semver.Must(semver.NewVersion(version.Version))
+
+	for mid, ver := range vers {
+		if len(ver) == 0 {
+			return nil
+		}
+		v, err := semver.NewVersion(ver)
+		if err != nil {
+			log.Printf("etcdserver: cannot understand the version of member %s (%v)", mid, err)
+			return nil
+		}
+		if lv.LessThan(*v) {
+			log.Printf("etcdserver: the etcd version %s is not up-to-date", lv.String())
+			log.Printf("etcdserver: member %s has a higher version %s", mid, ver)
+		}
+		if cv == nil {
+			cv = v
+		} else if v.LessThan(*cv) {
+			cv = v
+		}
+	}
+	return cv
+}

+ 58 - 0
etcdserver/cluster_util_test.go

@@ -0,0 +1,58 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcdserver
+
+import (
+	"reflect"
+	"testing"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
+)
+
+func TestDecideClusterVersion(t *testing.T) {
+	tests := []struct {
+		vers  map[string]string
+		wdver *semver.Version
+	}{
+		{
+			map[string]string{"a": "2.0.0"},
+			semver.Must(semver.NewVersion("2.0.0")),
+		},
+		// unknow
+		{
+			map[string]string{"a": ""},
+			nil,
+		},
+		{
+			map[string]string{"a": "2.0.0", "b": "2.1.0", "c": "2.1.0"},
+			semver.Must(semver.NewVersion("2.0.0")),
+		},
+		{
+			map[string]string{"a": "2.1.0", "b": "2.1.0", "c": "2.1.0"},
+			semver.Must(semver.NewVersion("2.1.0")),
+		},
+		{
+			map[string]string{"a": "", "b": "2.1.0", "c": "2.1.0"},
+			nil,
+		},
+	}
+
+	for i, tt := range tests {
+		dver := decideClusterVersion(tt.vers)
+		if !reflect.DeepEqual(dver, tt.wdver) {
+			t.Errorf("#%d: ver = %+v, want %+v", i, dver, tt.wdver)
+		}
+	}
+}

+ 4 - 0
etcdserver/etcdhttp/client_test.go

@@ -28,6 +28,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	etcdErr "github.com/coreos/etcd/error"
@@ -116,6 +117,8 @@ func (s *serverRecorder) UpdateMember(_ context.Context, m etcdserver.Member) er
 	return nil
 }
 
+func (s *serverRecorder) ClusterVersion() *semver.Version { return nil }
+
 type action struct {
 	name   string
 	params []interface{}
@@ -149,6 +152,7 @@ func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error
 func (rs *resServer) AddMember(_ context.Context, _ etcdserver.Member) error    { return nil }
 func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error            { return nil }
 func (rs *resServer) UpdateMember(_ context.Context, _ etcdserver.Member) error { return nil }
+func (rs *resServer) ClusterVersion() *semver.Version                           { return nil }
 
 func boolp(b bool) *bool { return &b }
 

+ 3 - 0
etcdserver/etcdhttp/http_test.go

@@ -21,6 +21,7 @@ import (
 	"sort"
 	"testing"
 
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/etcdserver"
@@ -74,6 +75,8 @@ func (fs *errServer) UpdateMember(ctx context.Context, m etcdserver.Member) erro
 	return fs.err
 }
 
+func (fs *errServer) ClusterVersion() *semver.Version { return nil }
+
 func TestWriteError(t *testing.T) {
 	// nil error should not panic
 	rec := httptest.NewRecorder()

+ 34 - 0
etcdserver/member.go

@@ -19,14 +19,17 @@ import (
 	"encoding/binary"
 	"encoding/json"
 	"fmt"
+	"io/ioutil"
 	"log"
 	"math/rand"
+	"net/http"
 	"path"
 	"sort"
 	"time"
 
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/store"
+	"github.com/coreos/etcd/version"
 )
 
 // RaftAttributes represents the raft related attributes of an etcd member.
@@ -149,6 +152,37 @@ func nodeToMember(n *store.NodeExtern) (*Member, error) {
 	return m, nil
 }
 
+// getVersion returns the version of the given member via its
+// peerURLs. Returns the last error if it fails to get the version.
+func getVersion(m *Member, tr *http.Transport) (string, error) {
+	cc := &http.Client{
+		Transport: tr,
+		Timeout:   time.Second,
+	}
+	var (
+		err  error
+		resp *http.Response
+	)
+
+	for _, u := range m.PeerURLs {
+		resp, err = cc.Get(u + "/version")
+		if err != nil {
+			continue
+		}
+		b, err := ioutil.ReadAll(resp.Body)
+		resp.Body.Close()
+		if err != nil {
+			continue
+		}
+		var vers version.Versions
+		if err := json.Unmarshal(b, &vers); err != nil {
+			continue
+		}
+		return vers.Server, nil
+	}
+	return "", err
+}
+
 // implement sort by ID interface
 type SortableMemberSlice []*Member
 

+ 63 - 8
etcdserver/server.go

@@ -23,9 +23,11 @@ import (
 	"net/http"
 	"path"
 	"regexp"
+	"sync"
 	"sync/atomic"
 	"time"
 
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/discovery"
 	"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
@@ -59,7 +61,8 @@ const (
 	StoreAdminPrefix = "/0"
 	StoreKeysPrefix  = "/1"
 
-	purgeFileInterval = 30 * time.Second
+	purgeFileInterval      = 30 * time.Second
+	monitorVersionInterval = 10 * time.Second
 )
 
 var (
@@ -119,6 +122,17 @@ type Server interface {
 	// UpdateMember attempts to update a existing member in the cluster. It will
 	// return ErrIDNotFound if the member ID does not exist.
 	UpdateMember(ctx context.Context, updateMemb Member) error
+
+	// ClusterVersion is the cluster-wide minimum major.minor version.
+	// Cluster version is set to the min version that a etcd member is
+	// compatible with when first bootstrap.
+	//
+	// During a rolling upgrades, the ClusterVersion will be updated
+	// automatically after a sync. (10 second by default)
+	//
+	// The API/raft component can utilize ClusterVersion to determine if
+	// it can accept a client request or a raft RPC.
+	ClusterVersion() *semver.Version
 }
 
 // EtcdServer is the production implementation of the Server interface
@@ -145,6 +159,9 @@ type EtcdServer struct {
 	SyncTicker <-chan time.Time
 
 	reqIDGen *idutil.Generator
+
+	verMu          sync.Mutex
+	clusterVersion *semver.Version
 }
 
 // NewServer creates a new EtcdServer from the supplied configuration. The
@@ -263,13 +280,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 			raftStorage: s,
 			storage:     NewStorage(w, ss),
 		},
-		id:         id,
-		attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
-		Cluster:    cfg.Cluster,
-		stats:      sstats,
-		lstats:     lstats,
-		SyncTicker: time.Tick(500 * time.Millisecond),
-		reqIDGen:   idutil.NewGenerator(uint8(id), time.Now()),
+		id:             id,
+		attributes:     Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
+		Cluster:        cfg.Cluster,
+		stats:          sstats,
+		lstats:         lstats,
+		SyncTicker:     time.Tick(500 * time.Millisecond),
+		reqIDGen:       idutil.NewGenerator(uint8(id), time.Now()),
+		clusterVersion: semver.Must(semver.NewVersion(version.MinClusterVersion)),
 	}
 
 	// TODO: move transport initialization near the definition of remote
@@ -297,6 +315,7 @@ func (s *EtcdServer) Start() {
 	go s.publish(defaultPublishRetryInterval)
 	go s.purgeFile()
 	go monitorFileDescriptor(s.done)
+	go s.monitorVersions()
 }
 
 // start prepares and starts server in a new goroutine. It is no longer safe to
@@ -862,3 +881,39 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 func (s *EtcdServer) PauseSending() { s.r.pauseSending() }
 
 func (s *EtcdServer) ResumeSending() { s.r.resumeSending() }
+
+func (s *EtcdServer) ClusterVersion() *semver.Version {
+	s.verMu.Lock()
+	defer s.verMu.Unlock()
+	// deep copy
+	return semver.Must(semver.NewVersion(s.clusterVersion.String()))
+}
+
+// monitorVersions checks the member's version every monitorVersion interval.
+// It updates the cluster version if all members agrees on a higher one.
+// It prints out log if there is a member with a higher version than the
+// local version.
+func (s *EtcdServer) monitorVersions() {
+	for {
+		select {
+		case <-time.After(monitorVersionInterval):
+			v := decideClusterVersion(getVersions(s.Cluster, s.cfg.Transport))
+			if v == nil {
+				continue
+			}
+
+			s.verMu.Lock()
+			// clear patch version
+			v.Patch = 0
+			if s.clusterVersion.LessThan(*v) {
+				log.Printf("etcdsever: updated the cluster version from %v to %v", s.clusterVersion, v.String())
+				// TODO: persist the version upgrade via raft. Then etcdserver will be able to use the
+				// upgraded version without syncing with others after a restart.
+				s.clusterVersion = v
+			}
+			s.verMu.Unlock()
+		case <-s.done:
+			return
+		}
+	}
+}

+ 3 - 1
version/version.go

@@ -25,7 +25,9 @@ import (
 )
 
 var (
-	Version = "2.1.0-alpha.0+git"
+	// MinClusterVersion is the min cluster version this etcd binary is compatible with.
+	MinClusterVersion = "2.0.0"
+	Version           = "2.1.0-alpha.0+git"
 )
 
 // WalVersion is an enum for versions of etcd logs.