|
|
@@ -27,7 +27,6 @@ import (
|
|
|
"os"
|
|
|
"path"
|
|
|
"regexp"
|
|
|
- "strconv"
|
|
|
"sync/atomic"
|
|
|
"time"
|
|
|
|
|
|
@@ -36,7 +35,7 @@ import (
|
|
|
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
|
|
"github.com/coreos/etcd/etcdserver/stats"
|
|
|
"github.com/coreos/etcd/pkg/pbutil"
|
|
|
- "github.com/coreos/etcd/pkg/strutil"
|
|
|
+ "github.com/coreos/etcd/pkg/types"
|
|
|
"github.com/coreos/etcd/raft"
|
|
|
"github.com/coreos/etcd/raft/raftpb"
|
|
|
"github.com/coreos/etcd/snap"
|
|
|
@@ -132,7 +131,7 @@ type Stats interface {
|
|
|
// StoreStats returns statistics of the store backing this EtcdServer
|
|
|
StoreStats() []byte
|
|
|
// UpdateRecvApp updates the underlying statistics in response to a receiving an Append request
|
|
|
- UpdateRecvApp(from uint64, length int64)
|
|
|
+ UpdateRecvApp(from types.ID, length int64)
|
|
|
}
|
|
|
|
|
|
type RaftTimer interface {
|
|
|
@@ -145,7 +144,7 @@ type EtcdServer struct {
|
|
|
w wait.Wait
|
|
|
done chan struct{}
|
|
|
stopped chan struct{}
|
|
|
- id uint64
|
|
|
+ id types.ID
|
|
|
attributes Attributes
|
|
|
|
|
|
Cluster *Cluster
|
|
|
@@ -184,7 +183,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
|
|
|
st := store.New()
|
|
|
var w *wal.WAL
|
|
|
var n raft.Node
|
|
|
- var id uint64
|
|
|
+ var id types.ID
|
|
|
haveWAL := wal.Exist(cfg.WALDir())
|
|
|
switch {
|
|
|
case !haveWAL && cfg.ClusterState == ClusterStateValueExisting:
|
|
|
@@ -240,9 +239,9 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
|
|
|
|
|
|
sstats := &stats.ServerStats{
|
|
|
Name: cfg.Name,
|
|
|
- ID: strutil.IDAsHex(id),
|
|
|
+ ID: id.String(),
|
|
|
}
|
|
|
- lstats := stats.NewLeaderStats(strutil.IDAsHex(id))
|
|
|
+ lstats := stats.NewLeaderStats(id.String())
|
|
|
|
|
|
s := &EtcdServer{
|
|
|
store: st,
|
|
|
@@ -290,7 +289,7 @@ func (s *EtcdServer) start() {
|
|
|
}
|
|
|
|
|
|
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
|
|
|
- if s.Cluster.IsIDRemoved(m.From) {
|
|
|
+ if s.Cluster.IsIDRemoved(types.ID(m.From)) {
|
|
|
return ErrRemoved
|
|
|
}
|
|
|
return s.node.Step(ctx, m)
|
|
|
@@ -423,8 +422,8 @@ func (s *EtcdServer) StoreStats() []byte {
|
|
|
return s.store.JsonStats()
|
|
|
}
|
|
|
|
|
|
-func (s *EtcdServer) UpdateRecvApp(from uint64, length int64) {
|
|
|
- s.stats.RecvAppendReq(strutil.IDAsHex(from), int(length))
|
|
|
+func (s *EtcdServer) UpdateRecvApp(from types.ID, length int64) {
|
|
|
+ s.stats.RecvAppendReq(from.String(), int(length))
|
|
|
}
|
|
|
|
|
|
func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error {
|
|
|
@@ -436,7 +435,7 @@ func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error {
|
|
|
cc := raftpb.ConfChange{
|
|
|
ID: GenID(),
|
|
|
Type: raftpb.ConfChangeAddNode,
|
|
|
- NodeID: memb.ID,
|
|
|
+ NodeID: uint64(memb.ID),
|
|
|
Context: b,
|
|
|
}
|
|
|
return s.configure(ctx, cc)
|
|
|
@@ -528,7 +527,7 @@ func (s *EtcdServer) publish(retryInterval time.Duration) {
|
|
|
cancel()
|
|
|
switch err {
|
|
|
case nil:
|
|
|
- log.Printf("etcdserver: published %+v to cluster %x", s.attributes, s.Cluster.ID())
|
|
|
+ log.Printf("etcdserver: published %+v to cluster %s", s.attributes, s.Cluster.ID())
|
|
|
return
|
|
|
case ErrStopped:
|
|
|
log.Printf("etcdserver: aborting publish because server is stopped")
|
|
|
@@ -595,7 +594,7 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
|
|
|
id := mustParseMemberIDFromKey(path.Dir(r.Path))
|
|
|
m := s.Cluster.Member(id)
|
|
|
if m == nil {
|
|
|
- log.Fatalf("fetch member %x should never fail", id)
|
|
|
+ log.Fatalf("fetch member %s should never fail", id)
|
|
|
}
|
|
|
if err := json.Unmarshal([]byte(r.Val), &m.Attributes); err != nil {
|
|
|
log.Fatalf("unmarshal %s should never fail: %v", r.Val, err)
|
|
|
@@ -634,18 +633,18 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes []uint64) error
|
|
|
if err := json.Unmarshal(cc.Context, m); err != nil {
|
|
|
panic("unexpected unmarshal error")
|
|
|
}
|
|
|
- if cc.NodeID != m.ID {
|
|
|
+ if cc.NodeID != uint64(m.ID) {
|
|
|
panic("unexpected nodeID mismatch")
|
|
|
}
|
|
|
s.Cluster.AddMember(m)
|
|
|
case raftpb.ConfChangeRemoveNode:
|
|
|
- s.Cluster.RemoveMember(cc.NodeID)
|
|
|
+ s.Cluster.RemoveMember(types.ID(cc.NodeID))
|
|
|
}
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
func (s *EtcdServer) checkConfChange(cc raftpb.ConfChange, nodes []uint64) error {
|
|
|
- if s.Cluster.IsIDRemoved(cc.NodeID) {
|
|
|
+ if s.Cluster.IsIDRemoved(types.ID(cc.NodeID)) {
|
|
|
return ErrIDRemoved
|
|
|
}
|
|
|
switch cc.Type {
|
|
|
@@ -692,7 +691,7 @@ func GetClusterFromPeers(urls []string) (*Cluster, error) {
|
|
|
log.Printf("etcdserver: unmarshal body error: %v", err)
|
|
|
continue
|
|
|
}
|
|
|
- id, err := strconv.ParseUint(resp.Header.Get("X-Etcd-Cluster-ID"), 16, 64)
|
|
|
+ id, err := types.IDFromString(resp.Header.Get("X-Etcd-Cluster-ID"))
|
|
|
if err != nil {
|
|
|
log.Printf("etcdserver: parse uint error: %v", err)
|
|
|
continue
|
|
|
@@ -702,12 +701,17 @@ func GetClusterFromPeers(urls []string) (*Cluster, error) {
|
|
|
return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls")
|
|
|
}
|
|
|
|
|
|
-func startNode(cfg *ServerConfig, ids []uint64) (id uint64, n raft.Node, w *wal.WAL) {
|
|
|
+func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, w *wal.WAL) {
|
|
|
var err error
|
|
|
// TODO: remove the discoveryURL when it becomes part of the source for
|
|
|
// generating nodeID.
|
|
|
member := cfg.Cluster.MemberByName(cfg.Name)
|
|
|
- metadata := pbutil.MustMarshal(&pb.Metadata{NodeID: member.ID, ClusterID: cfg.Cluster.ID()})
|
|
|
+ metadata := pbutil.MustMarshal(
|
|
|
+ &pb.Metadata{
|
|
|
+ NodeID: uint64(member.ID),
|
|
|
+ ClusterID: uint64(cfg.Cluster.ID()),
|
|
|
+ },
|
|
|
+ )
|
|
|
if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
|
|
|
log.Fatal(err)
|
|
|
}
|
|
|
@@ -717,15 +721,15 @@ func startNode(cfg *ServerConfig, ids []uint64) (id uint64, n raft.Node, w *wal.
|
|
|
if err != nil {
|
|
|
log.Fatal(err)
|
|
|
}
|
|
|
- peers[i] = raft.Peer{ID: id, Context: ctx}
|
|
|
+ peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
|
|
|
}
|
|
|
id = member.ID
|
|
|
- log.Printf("etcdserver: start node %x in cluster %x", id, cfg.Cluster.ID())
|
|
|
- n = raft.StartNode(id, peers, 10, 1)
|
|
|
+ log.Printf("etcdserver: start node %s in cluster %s", id.String(), cfg.Cluster.ID().String())
|
|
|
+ n = raft.StartNode(uint64(id), peers, 10, 1)
|
|
|
return
|
|
|
}
|
|
|
|
|
|
-func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id uint64, n raft.Node, w *wal.WAL) {
|
|
|
+func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id types.ID, n raft.Node, w *wal.WAL) {
|
|
|
var err error
|
|
|
// restart a node from previous wal
|
|
|
if w, err = wal.OpenAtIndex(cfg.WALDir(), index); err != nil {
|
|
|
@@ -738,10 +742,10 @@ func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id
|
|
|
|
|
|
var metadata pb.Metadata
|
|
|
pbutil.MustUnmarshal(&metadata, wmetadata)
|
|
|
- id = metadata.NodeID
|
|
|
- cfg.Cluster.SetID(metadata.ClusterID)
|
|
|
- log.Printf("etcdserver: restart member %x in cluster %x at commit index %d", id, cfg.Cluster.ID(), st.Commit)
|
|
|
- n = raft.RestartNode(id, 10, 1, snapshot, st, ents)
|
|
|
+ id = types.ID(metadata.NodeID)
|
|
|
+ cfg.Cluster.SetID(types.ID(metadata.ClusterID))
|
|
|
+ log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
|
|
|
+ n = raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents)
|
|
|
return
|
|
|
}
|
|
|
|