|
@@ -41,6 +41,7 @@ import (
|
|
|
"github.com/coreos/etcd/pkg/wait"
|
|
"github.com/coreos/etcd/pkg/wait"
|
|
|
"github.com/coreos/etcd/raft"
|
|
"github.com/coreos/etcd/raft"
|
|
|
"github.com/coreos/etcd/raft/raftpb"
|
|
"github.com/coreos/etcd/raft/raftpb"
|
|
|
|
|
+ "github.com/coreos/etcd/rafthttp"
|
|
|
"github.com/coreos/etcd/snap"
|
|
"github.com/coreos/etcd/snap"
|
|
|
"github.com/coreos/etcd/store"
|
|
"github.com/coreos/etcd/store"
|
|
|
"github.com/coreos/etcd/wal"
|
|
"github.com/coreos/etcd/wal"
|
|
@@ -85,7 +86,8 @@ type Response struct {
|
|
|
err error
|
|
err error
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-type Sender interface {
|
|
|
|
|
|
|
+type SendHub interface {
|
|
|
|
|
+ rafthttp.SenderFinder
|
|
|
Send(m []raftpb.Message)
|
|
Send(m []raftpb.Message)
|
|
|
Add(m *Member)
|
|
Add(m *Member)
|
|
|
Remove(id types.ID)
|
|
Remove(id types.ID)
|
|
@@ -172,7 +174,7 @@ type EtcdServer struct {
|
|
|
// MUST NOT block. It is okay to drop messages, since clients should
|
|
// MUST NOT block. It is okay to drop messages, since clients should
|
|
|
// timeout and reissue their messages. If send is nil, server will
|
|
// timeout and reissue their messages. If send is nil, server will
|
|
|
// panic.
|
|
// panic.
|
|
|
- sender Sender
|
|
|
|
|
|
|
+ sendhub SendHub
|
|
|
|
|
|
|
|
storage Storage
|
|
storage Storage
|
|
|
|
|
|
|
@@ -268,7 +270,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
|
|
}
|
|
}
|
|
|
lstats := stats.NewLeaderStats(id.String())
|
|
lstats := stats.NewLeaderStats(id.String())
|
|
|
|
|
|
|
|
- shub := newSendHub(cfg.Transport, cfg.Cluster, sstats, lstats)
|
|
|
|
|
s := &EtcdServer{
|
|
s := &EtcdServer{
|
|
|
store: st,
|
|
store: st,
|
|
|
node: n,
|
|
node: n,
|
|
@@ -281,11 +282,11 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
|
|
}{w, ss},
|
|
}{w, ss},
|
|
|
stats: sstats,
|
|
stats: sstats,
|
|
|
lstats: lstats,
|
|
lstats: lstats,
|
|
|
- sender: shub,
|
|
|
|
|
Ticker: time.Tick(100 * time.Millisecond),
|
|
Ticker: time.Tick(100 * time.Millisecond),
|
|
|
SyncTicker: time.Tick(500 * time.Millisecond),
|
|
SyncTicker: time.Tick(500 * time.Millisecond),
|
|
|
snapCount: cfg.SnapCount,
|
|
snapCount: cfg.SnapCount,
|
|
|
}
|
|
}
|
|
|
|
|
+ s.sendhub = newSendHub(cfg.Transport, cfg.Cluster, s, sstats, lstats)
|
|
|
return s, nil
|
|
return s, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -316,6 +317,8 @@ func (s *EtcdServer) start() {
|
|
|
|
|
|
|
|
func (s *EtcdServer) ID() types.ID { return s.id }
|
|
func (s *EtcdServer) ID() types.ID { return s.id }
|
|
|
|
|
|
|
|
|
|
+func (s *EtcdServer) SenderFinder() rafthttp.SenderFinder { return s.sendhub }
|
|
|
|
|
+
|
|
|
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
|
|
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
|
|
|
if s.Cluster.IsIDRemoved(types.ID(m.From)) {
|
|
if s.Cluster.IsIDRemoved(types.ID(m.From)) {
|
|
|
log.Printf("etcdserver: reject message from removed member %s", types.ID(m.From).String())
|
|
log.Printf("etcdserver: reject message from removed member %s", types.ID(m.From).String())
|
|
@@ -333,11 +336,11 @@ func (s *EtcdServer) run() {
|
|
|
var snapi, appliedi uint64
|
|
var snapi, appliedi uint64
|
|
|
var nodes []uint64
|
|
var nodes []uint64
|
|
|
var shouldstop bool
|
|
var shouldstop bool
|
|
|
- shouldstopC := s.sender.ShouldStopNotify()
|
|
|
|
|
|
|
+ shouldstopC := s.sendhub.ShouldStopNotify()
|
|
|
|
|
|
|
|
defer func() {
|
|
defer func() {
|
|
|
s.node.Stop()
|
|
s.node.Stop()
|
|
|
- s.sender.Stop()
|
|
|
|
|
|
|
+ s.sendhub.Stop()
|
|
|
close(s.done)
|
|
close(s.done)
|
|
|
}()
|
|
}()
|
|
|
for {
|
|
for {
|
|
@@ -361,7 +364,7 @@ func (s *EtcdServer) run() {
|
|
|
if err := s.storage.SaveSnap(rd.Snapshot); err != nil {
|
|
if err := s.storage.SaveSnap(rd.Snapshot); err != nil {
|
|
|
log.Fatalf("etcdserver: create snapshot error: %v", err)
|
|
log.Fatalf("etcdserver: create snapshot error: %v", err)
|
|
|
}
|
|
}
|
|
|
- s.sender.Send(rd.Messages)
|
|
|
|
|
|
|
+ s.sendhub.Send(rd.Messages)
|
|
|
|
|
|
|
|
// recover from snapshot if it is more updated than current applied
|
|
// recover from snapshot if it is more updated than current applied
|
|
|
if rd.Snapshot.Index > appliedi {
|
|
if rd.Snapshot.Index > appliedi {
|
|
@@ -726,7 +729,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) (bool, error) {
|
|
|
if m.ID == s.id {
|
|
if m.ID == s.id {
|
|
|
log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
|
|
log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
|
|
|
} else {
|
|
} else {
|
|
|
- s.sender.Add(m)
|
|
|
|
|
|
|
+ s.sendhub.Add(m)
|
|
|
log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
|
|
log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
|
|
|
}
|
|
}
|
|
|
case raftpb.ConfChangeRemoveNode:
|
|
case raftpb.ConfChangeRemoveNode:
|
|
@@ -737,7 +740,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) (bool, error) {
|
|
|
log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
|
|
log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
|
|
|
return true, nil
|
|
return true, nil
|
|
|
} else {
|
|
} else {
|
|
|
- s.sender.Remove(id)
|
|
|
|
|
|
|
+ s.sendhub.Remove(id)
|
|
|
log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID())
|
|
log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID())
|
|
|
}
|
|
}
|
|
|
case raftpb.ConfChangeUpdateNode:
|
|
case raftpb.ConfChangeUpdateNode:
|
|
@@ -752,7 +755,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) (bool, error) {
|
|
|
if m.ID == s.id {
|
|
if m.ID == s.id {
|
|
|
log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
|
|
log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
|
|
|
} else {
|
|
} else {
|
|
|
- s.sender.Update(m)
|
|
|
|
|
|
|
+ s.sendhub.Update(m)
|
|
|
log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
|
|
log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|