|
|
@@ -43,8 +43,9 @@ import (
|
|
|
type RaftCluster struct {
|
|
|
lg *zap.Logger
|
|
|
|
|
|
- id types.ID
|
|
|
- token string
|
|
|
+ localID types.ID
|
|
|
+ cid types.ID
|
|
|
+ token string
|
|
|
|
|
|
v2store v2store.Store
|
|
|
be backend.Backend
|
|
|
@@ -75,7 +76,7 @@ func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap)
|
|
|
|
|
|
func NewClusterFromMembers(lg *zap.Logger, token string, id types.ID, membs []*Member) *RaftCluster {
|
|
|
c := NewCluster(lg, token)
|
|
|
- c.id = id
|
|
|
+ c.cid = id
|
|
|
for _, m := range membs {
|
|
|
c.members[m.ID] = m
|
|
|
}
|
|
|
@@ -91,7 +92,7 @@ func NewCluster(lg *zap.Logger, token string) *RaftCluster {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (c *RaftCluster) ID() types.ID { return c.id }
|
|
|
+func (c *RaftCluster) ID() types.ID { return c.cid }
|
|
|
|
|
|
func (c *RaftCluster) Members() []*Member {
|
|
|
c.Lock()
|
|
|
@@ -178,7 +179,7 @@ func (c *RaftCluster) String() string {
|
|
|
c.Lock()
|
|
|
defer c.Unlock()
|
|
|
b := &bytes.Buffer{}
|
|
|
- fmt.Fprintf(b, "{ClusterID:%s ", c.id)
|
|
|
+ fmt.Fprintf(b, "{ClusterID:%s ", c.cid)
|
|
|
var ms []string
|
|
|
for _, m := range c.members {
|
|
|
ms = append(ms, fmt.Sprintf("%+v", m))
|
|
|
@@ -199,10 +200,13 @@ func (c *RaftCluster) genID() {
|
|
|
binary.BigEndian.PutUint64(b[8*i:], uint64(id))
|
|
|
}
|
|
|
hash := sha1.Sum(b)
|
|
|
- c.id = types.ID(binary.BigEndian.Uint64(hash[:8]))
|
|
|
+ c.cid = types.ID(binary.BigEndian.Uint64(hash[:8]))
|
|
|
}
|
|
|
|
|
|
-func (c *RaftCluster) SetID(id types.ID) { c.id = id }
|
|
|
+func (c *RaftCluster) SetID(localID, cid types.ID) {
|
|
|
+ c.localID = localID
|
|
|
+ c.cid = cid
|
|
|
+}
|
|
|
|
|
|
func (c *RaftCluster) SetStore(st v2store.Store) { c.v2store = st }
|
|
|
|
|
|
@@ -223,13 +227,14 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
|
|
|
for _, m := range c.members {
|
|
|
if c.lg != nil {
|
|
|
c.lg.Info(
|
|
|
- "added member from store",
|
|
|
- zap.String("cluster-id", c.id.String()),
|
|
|
- zap.String("member-id", m.ID.String()),
|
|
|
- zap.Strings("member-peer-urls", m.PeerURLs),
|
|
|
+ "recovered/added member from store",
|
|
|
+ zap.String("cluster-id", c.cid.String()),
|
|
|
+ zap.String("local-member-id", c.localID.String()),
|
|
|
+ zap.String("recovered-remote-peer-id", m.ID.String()),
|
|
|
+ zap.Strings("recovered-remote-peer-urls", m.PeerURLs),
|
|
|
)
|
|
|
} else {
|
|
|
- plog.Infof("added member %s %v to cluster %s from store", m.ID, m.PeerURLs, c.id)
|
|
|
+ plog.Infof("added member %s %v to cluster %s from store", m.ID, m.PeerURLs, c.cid)
|
|
|
}
|
|
|
}
|
|
|
if c.version != nil {
|
|
|
@@ -337,12 +342,13 @@ func (c *RaftCluster) AddMember(m *Member) {
|
|
|
if c.lg != nil {
|
|
|
c.lg.Info(
|
|
|
"added member",
|
|
|
- zap.String("member-id", m.ID.String()),
|
|
|
- zap.Strings("member-peer-urls", m.PeerURLs),
|
|
|
- zap.String("cluster-id", c.id.String()),
|
|
|
+ zap.String("cluster-id", c.cid.String()),
|
|
|
+ zap.String("local-member-id", c.localID.String()),
|
|
|
+ zap.String("removed-remote-peer-id", m.ID.String()),
|
|
|
+ zap.Strings("removed-remote-peer-peer-urls", m.PeerURLs),
|
|
|
)
|
|
|
} else {
|
|
|
- plog.Infof("added member %s %v to cluster %s", m.ID, m.PeerURLs, c.id)
|
|
|
+ plog.Infof("added member %s %v to cluster %s", m.ID, m.PeerURLs, c.cid)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -358,23 +364,36 @@ func (c *RaftCluster) RemoveMember(id types.ID) {
|
|
|
mustDeleteMemberFromBackend(c.be, id)
|
|
|
}
|
|
|
|
|
|
+ m, ok := c.members[id]
|
|
|
delete(c.members, id)
|
|
|
c.removed[id] = true
|
|
|
|
|
|
if c.lg != nil {
|
|
|
- c.lg.Info(
|
|
|
- "removed member",
|
|
|
- zap.String("member-id", id.String()),
|
|
|
- zap.String("cluster-id", c.id.String()),
|
|
|
- )
|
|
|
+ if ok {
|
|
|
+ c.lg.Info(
|
|
|
+ "removed member",
|
|
|
+ zap.String("cluster-id", c.cid.String()),
|
|
|
+ zap.String("local-member-id", c.localID.String()),
|
|
|
+ zap.String("removed-remote-peer-id", id.String()),
|
|
|
+ zap.Strings("removed-remote-peer-urls", m.PeerURLs),
|
|
|
+ )
|
|
|
+ } else {
|
|
|
+ c.lg.Warn(
|
|
|
+ "skipped removing already removed member",
|
|
|
+ zap.String("cluster-id", c.cid.String()),
|
|
|
+ zap.String("local-member-id", c.localID.String()),
|
|
|
+ zap.String("removed-remote-peer-id", id.String()),
|
|
|
+ )
|
|
|
+ }
|
|
|
} else {
|
|
|
- plog.Infof("removed member %s from cluster %s", id, c.id)
|
|
|
+ plog.Infof("removed member %s from cluster %s", id, c.cid)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) {
|
|
|
c.Lock()
|
|
|
defer c.Unlock()
|
|
|
+
|
|
|
if m, ok := c.members[id]; ok {
|
|
|
m.Attributes = attr
|
|
|
if c.v2store != nil {
|
|
|
@@ -385,17 +404,28 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) {
|
|
|
}
|
|
|
return
|
|
|
}
|
|
|
+
|
|
|
_, ok := c.removed[id]
|
|
|
if !ok {
|
|
|
if c.lg != nil {
|
|
|
- c.lg.Panic("failed to update; member unknown", zap.String("member-id", id.String()))
|
|
|
+ c.lg.Panic(
|
|
|
+ "failed to update; member unknown",
|
|
|
+ zap.String("cluster-id", c.cid.String()),
|
|
|
+ zap.String("local-member-id", c.localID.String()),
|
|
|
+ zap.String("unknown-remote-peer-id", id.String()),
|
|
|
+ )
|
|
|
} else {
|
|
|
plog.Panicf("error updating attributes of unknown member %s", id)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
if c.lg != nil {
|
|
|
- c.lg.Warn("skipped attributes update of removed member", zap.String("member-id", id.String()))
|
|
|
+ c.lg.Warn(
|
|
|
+ "skipped attributes update of removed member",
|
|
|
+ zap.String("cluster-id", c.cid.String()),
|
|
|
+ zap.String("local-member-id", c.localID.String()),
|
|
|
+ zap.String("removed-remote-peer-id", id.String()),
|
|
|
+ )
|
|
|
} else {
|
|
|
plog.Warningf("skipped updating attributes of removed member %s", id)
|
|
|
}
|
|
|
@@ -416,12 +446,13 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes)
|
|
|
if c.lg != nil {
|
|
|
c.lg.Info(
|
|
|
"updated member",
|
|
|
- zap.String("member-id", id.String()),
|
|
|
- zap.Strings("member-peer-urls", raftAttr.PeerURLs),
|
|
|
- zap.String("cluster-id", c.id.String()),
|
|
|
+ zap.String("cluster-id", c.cid.String()),
|
|
|
+ zap.String("local-member-id", c.localID.String()),
|
|
|
+ zap.String("updated-remote-peer-id", id.String()),
|
|
|
+ zap.Strings("updated-remote-peer-urls", raftAttr.PeerURLs),
|
|
|
)
|
|
|
} else {
|
|
|
- plog.Noticef("updated member %s %v in cluster %s", id, raftAttr.PeerURLs, c.id)
|
|
|
+ plog.Noticef("updated member %s %v in cluster %s", id, raftAttr.PeerURLs, c.cid)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -441,6 +472,8 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s
|
|
|
if c.lg != nil {
|
|
|
c.lg.Info(
|
|
|
"updated cluster version",
|
|
|
+ zap.String("cluster-id", c.cid.String()),
|
|
|
+ zap.String("local-member-id", c.localID.String()),
|
|
|
zap.String("from", version.Cluster(c.version.String())),
|
|
|
zap.String("from", version.Cluster(ver.String())),
|
|
|
)
|
|
|
@@ -451,6 +484,8 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s
|
|
|
if c.lg != nil {
|
|
|
c.lg.Info(
|
|
|
"set initial cluster version",
|
|
|
+ zap.String("cluster-id", c.cid.String()),
|
|
|
+ zap.String("local-member-id", c.localID.String()),
|
|
|
zap.String("cluster-version", version.Cluster(ver.String())),
|
|
|
)
|
|
|
} else {
|
|
|
@@ -497,6 +532,8 @@ func (c *RaftCluster) IsReadyToAddNewMember() bool {
|
|
|
"rejecting member add; started member will be less than quorum",
|
|
|
zap.Int("number-of-started-member", nstarted),
|
|
|
zap.Int("quorum", nquorum),
|
|
|
+ zap.String("cluster-id", c.cid.String()),
|
|
|
+ zap.String("local-member-id", c.localID.String()),
|
|
|
)
|
|
|
} else {
|
|
|
plog.Warningf("Reject add member request: the number of started member (%d) will be less than the quorum number of the cluster (%d)", nstarted, nquorum)
|
|
|
@@ -529,6 +566,8 @@ func (c *RaftCluster) IsReadyToRemoveMember(id uint64) bool {
|
|
|
"rejecting member remove; started member will be less than quorum",
|
|
|
zap.Int("number-of-started-member", nstarted),
|
|
|
zap.Int("quorum", nquorum),
|
|
|
+ zap.String("cluster-id", c.cid.String()),
|
|
|
+ zap.String("local-member-id", c.localID.String()),
|
|
|
)
|
|
|
} else {
|
|
|
plog.Warningf("Reject remove member request: the number of started member (%d) will be less than the quorum number of the cluster (%d)", nstarted, nquorum)
|