|
@@ -21,6 +21,7 @@ import (
|
|
|
"net/http"
|
|
"net/http"
|
|
|
"net/url"
|
|
"net/url"
|
|
|
"path"
|
|
"path"
|
|
|
|
|
+ "sync"
|
|
|
|
|
|
|
|
"github.com/coreos/etcd/etcdserver/stats"
|
|
"github.com/coreos/etcd/etcdserver/stats"
|
|
|
"github.com/coreos/etcd/pkg/types"
|
|
"github.com/coreos/etcd/pkg/types"
|
|
@@ -48,6 +49,7 @@ type sendHub struct {
|
|
|
p rafthttp.Processor
|
|
p rafthttp.Processor
|
|
|
ss *stats.ServerStats
|
|
ss *stats.ServerStats
|
|
|
ls *stats.LeaderStats
|
|
ls *stats.LeaderStats
|
|
|
|
|
+ mu sync.RWMutex // protect the sender map
|
|
|
senders map[types.ID]rafthttp.Sender
|
|
senders map[types.ID]rafthttp.Sender
|
|
|
shouldstop chan struct{}
|
|
shouldstop chan struct{}
|
|
|
}
|
|
}
|
|
@@ -67,7 +69,11 @@ func newSendHub(t http.RoundTripper, cl ClusterInfo, p rafthttp.Processor, ss *s
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (h *sendHub) Sender(id types.ID) rafthttp.Sender { return h.senders[id] }
|
|
|
|
|
|
|
+func (h *sendHub) Sender(id types.ID) rafthttp.Sender {
|
|
|
|
|
+ h.mu.RLock()
|
|
|
|
|
+ defer h.mu.RUnlock()
|
|
|
|
|
+ return h.senders[id]
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
func (h *sendHub) Send(msgs []raftpb.Message) {
|
|
func (h *sendHub) Send(msgs []raftpb.Message) {
|
|
|
for _, m := range msgs {
|
|
for _, m := range msgs {
|
|
@@ -102,6 +108,8 @@ func (h *sendHub) ShouldStopNotify() <-chan struct{} {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (h *sendHub) Add(m *Member) {
|
|
func (h *sendHub) Add(m *Member) {
|
|
|
|
|
+ h.mu.Lock()
|
|
|
|
|
+ defer h.mu.Unlock()
|
|
|
if _, ok := h.senders[m.ID]; ok {
|
|
if _, ok := h.senders[m.ID]; ok {
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
@@ -118,11 +126,15 @@ func (h *sendHub) Add(m *Member) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (h *sendHub) Remove(id types.ID) {
|
|
func (h *sendHub) Remove(id types.ID) {
|
|
|
|
|
+ h.mu.Lock()
|
|
|
|
|
+ defer h.mu.Unlock()
|
|
|
h.senders[id].Stop()
|
|
h.senders[id].Stop()
|
|
|
delete(h.senders, id)
|
|
delete(h.senders, id)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (h *sendHub) Update(m *Member) {
|
|
func (h *sendHub) Update(m *Member) {
|
|
|
|
|
+ h.mu.Lock()
|
|
|
|
|
+ defer h.mu.Unlock()
|
|
|
// TODO: return error or just panic?
|
|
// TODO: return error or just panic?
|
|
|
if _, ok := h.senders[m.ID]; !ok {
|
|
if _, ok := h.senders[m.ID]; !ok {
|
|
|
return
|
|
return
|