|
|
@@ -2,49 +2,47 @@ package server
|
|
|
|
|
|
import (
|
|
|
"bytes"
|
|
|
- "crypto/tls"
|
|
|
"encoding/binary"
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
"io/ioutil"
|
|
|
- "net"
|
|
|
"net/http"
|
|
|
"net/url"
|
|
|
"strconv"
|
|
|
"time"
|
|
|
|
|
|
+ "github.com/coreos/raft"
|
|
|
+ "github.com/gorilla/mux"
|
|
|
+
|
|
|
etcdErr "github.com/coreos/etcd/error"
|
|
|
"github.com/coreos/etcd/log"
|
|
|
"github.com/coreos/etcd/metrics"
|
|
|
"github.com/coreos/etcd/store"
|
|
|
- "github.com/coreos/raft"
|
|
|
- "github.com/gorilla/mux"
|
|
|
)
|
|
|
|
|
|
const retryInterval = 10
|
|
|
|
|
|
const ThresholdMonitorTimeout = 5 * time.Second
|
|
|
|
|
|
-type PeerServer struct {
|
|
|
- raftServer raft.Server
|
|
|
- server *Server
|
|
|
- httpServer *http.Server
|
|
|
- listener net.Listener
|
|
|
- joinIndex uint64
|
|
|
- name string
|
|
|
- url string
|
|
|
- bindAddr string
|
|
|
- tlsConf *TLSConfig
|
|
|
- tlsInfo *TLSInfo
|
|
|
- followersStats *raftFollowersStats
|
|
|
- serverStats *raftServerStats
|
|
|
- registry *Registry
|
|
|
- store store.Store
|
|
|
- snapConf *snapshotConf
|
|
|
+type PeerServerConfig struct {
|
|
|
+ Name string
|
|
|
+ Scheme string
|
|
|
+ URL string
|
|
|
+ SnapshotCount int
|
|
|
MaxClusterSize int
|
|
|
RetryTimes int
|
|
|
- HeartbeatTimeout time.Duration
|
|
|
- ElectionTimeout time.Duration
|
|
|
+}
|
|
|
+
|
|
|
+type PeerServer struct {
|
|
|
+ Config PeerServerConfig
|
|
|
+ raftServer raft.Server
|
|
|
+ server *Server
|
|
|
+ joinIndex uint64
|
|
|
+ followersStats *raftFollowersStats
|
|
|
+ serverStats *raftServerStats
|
|
|
+ registry *Registry
|
|
|
+ store store.Store
|
|
|
+ snapConf *snapshotConf
|
|
|
|
|
|
closeChan chan bool
|
|
|
timeoutThresholdChan chan interface{}
|
|
|
@@ -65,84 +63,56 @@ type snapshotConf struct {
|
|
|
snapshotThr uint64
|
|
|
}
|
|
|
|
|
|
-func NewPeerServer(name string, path string, url string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapshotCount int, heartbeatTimeout, electionTimeout time.Duration, mb *metrics.Bucket) *PeerServer {
|
|
|
-
|
|
|
+func NewPeerServer(psConfig PeerServerConfig, registry *Registry, store store.Store, mb *metrics.Bucket, followersStats *raftFollowersStats, serverStats *raftServerStats) *PeerServer {
|
|
|
s := &PeerServer{
|
|
|
- name: name,
|
|
|
- url: url,
|
|
|
- bindAddr: bindAddr,
|
|
|
- tlsConf: tlsConf,
|
|
|
- tlsInfo: tlsInfo,
|
|
|
+ Config: psConfig,
|
|
|
registry: registry,
|
|
|
store: store,
|
|
|
- followersStats: &raftFollowersStats{
|
|
|
- Leader: name,
|
|
|
- Followers: make(map[string]*raftFollowerStats),
|
|
|
- },
|
|
|
- serverStats: &raftServerStats{
|
|
|
- Name: name,
|
|
|
- StartTime: time.Now(),
|
|
|
- sendRateQueue: &statsQueue{
|
|
|
- back: -1,
|
|
|
- },
|
|
|
- recvRateQueue: &statsQueue{
|
|
|
- back: -1,
|
|
|
- },
|
|
|
- },
|
|
|
- HeartbeatTimeout: heartbeatTimeout,
|
|
|
- ElectionTimeout: electionTimeout,
|
|
|
+ followersStats: followersStats,
|
|
|
+ serverStats: serverStats,
|
|
|
|
|
|
timeoutThresholdChan: make(chan interface{}, 1),
|
|
|
|
|
|
metrics: mb,
|
|
|
}
|
|
|
|
|
|
- // Create transporter for raft
|
|
|
- raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, s)
|
|
|
-
|
|
|
- // Create raft server
|
|
|
- raftServer, err := raft.NewServer(name, path, raftTransporter, s.store, s, "")
|
|
|
- if err != nil {
|
|
|
- log.Fatal(err)
|
|
|
- }
|
|
|
+ return s
|
|
|
+}
|
|
|
|
|
|
+func (s *PeerServer) SetRaftServer(raftServer raft.Server) {
|
|
|
s.snapConf = &snapshotConf{
|
|
|
checkingInterval: time.Second * 3,
|
|
|
// this is not accurate, we will update raft to provide an api
|
|
|
lastIndex: raftServer.CommitIndex(),
|
|
|
- snapshotThr: uint64(snapshotCount),
|
|
|
+ snapshotThr: uint64(s.Config.SnapshotCount),
|
|
|
}
|
|
|
|
|
|
- s.raftServer = raftServer
|
|
|
- s.raftServer.AddEventListener(raft.StateChangeEventType, s.raftEventLogger)
|
|
|
- s.raftServer.AddEventListener(raft.LeaderChangeEventType, s.raftEventLogger)
|
|
|
- s.raftServer.AddEventListener(raft.TermChangeEventType, s.raftEventLogger)
|
|
|
- s.raftServer.AddEventListener(raft.AddPeerEventType, s.raftEventLogger)
|
|
|
- s.raftServer.AddEventListener(raft.RemovePeerEventType, s.raftEventLogger)
|
|
|
- s.raftServer.AddEventListener(raft.HeartbeatTimeoutEventType, s.raftEventLogger)
|
|
|
- s.raftServer.AddEventListener(raft.ElectionTimeoutThresholdEventType, s.raftEventLogger)
|
|
|
+ raftServer.AddEventListener(raft.StateChangeEventType, s.raftEventLogger)
|
|
|
+ raftServer.AddEventListener(raft.LeaderChangeEventType, s.raftEventLogger)
|
|
|
+ raftServer.AddEventListener(raft.TermChangeEventType, s.raftEventLogger)
|
|
|
+ raftServer.AddEventListener(raft.AddPeerEventType, s.raftEventLogger)
|
|
|
+ raftServer.AddEventListener(raft.RemovePeerEventType, s.raftEventLogger)
|
|
|
+ raftServer.AddEventListener(raft.HeartbeatTimeoutEventType, s.raftEventLogger)
|
|
|
+ raftServer.AddEventListener(raft.ElectionTimeoutThresholdEventType, s.raftEventLogger)
|
|
|
|
|
|
- s.raftServer.AddEventListener(raft.HeartbeatEventType, s.recordMetricEvent)
|
|
|
+ raftServer.AddEventListener(raft.HeartbeatEventType, s.recordMetricEvent)
|
|
|
|
|
|
- return s
|
|
|
+ s.raftServer = raftServer
|
|
|
}
|
|
|
|
|
|
// Start the raft server
|
|
|
-func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error {
|
|
|
+func (s *PeerServer) Start(snapshot bool, cluster []string) error {
|
|
|
// LoadSnapshot
|
|
|
if snapshot {
|
|
|
err := s.raftServer.LoadSnapshot()
|
|
|
|
|
|
if err == nil {
|
|
|
- log.Debugf("%s finished load snapshot", s.name)
|
|
|
+ log.Debugf("%s finished load snapshot", s.Config.Name)
|
|
|
} else {
|
|
|
log.Debug(err)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- s.raftServer.SetElectionTimeout(s.ElectionTimeout)
|
|
|
- s.raftServer.SetHeartbeatTimeout(s.HeartbeatTimeout)
|
|
|
-
|
|
|
s.raftServer.Start()
|
|
|
|
|
|
if s.raftServer.IsLogEmpty() {
|
|
|
@@ -155,7 +125,7 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error {
|
|
|
|
|
|
} else {
|
|
|
// Rejoin the previous cluster
|
|
|
- cluster = s.registry.PeerURLs(s.raftServer.Leader(), s.name)
|
|
|
+ cluster = s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name)
|
|
|
for i := 0; i < len(cluster); i++ {
|
|
|
u, err := url.Parse(cluster[i])
|
|
|
if err != nil {
|
|
|
@@ -168,7 +138,7 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error {
|
|
|
log.Warn("the entire cluster is down! this peer will restart the cluster.")
|
|
|
}
|
|
|
|
|
|
- log.Debugf("%s restart as a follower", s.name)
|
|
|
+ log.Debugf("%s restart as a follower", s.Config.Name)
|
|
|
}
|
|
|
|
|
|
s.closeChan = make(chan bool)
|
|
|
@@ -181,65 +151,34 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error {
|
|
|
go s.monitorSnapshot()
|
|
|
}
|
|
|
|
|
|
- // start to response to raft requests
|
|
|
- return s.startTransport(s.tlsConf.Scheme, s.tlsConf.Server)
|
|
|
+ return nil
|
|
|
}
|
|
|
|
|
|
-// Overridden version of net/http added so we can manage the listener.
|
|
|
-func (s *PeerServer) listenAndServe() error {
|
|
|
- addr := s.httpServer.Addr
|
|
|
- if addr == "" {
|
|
|
- addr = ":http"
|
|
|
- }
|
|
|
- l, e := net.Listen("tcp", addr)
|
|
|
- if e != nil {
|
|
|
- return e
|
|
|
+func (s *PeerServer) Stop() {
|
|
|
+ if s.closeChan != nil {
|
|
|
+ close(s.closeChan)
|
|
|
+ s.closeChan = nil
|
|
|
}
|
|
|
- s.listener = l
|
|
|
- return s.httpServer.Serve(l)
|
|
|
}
|
|
|
|
|
|
-// Overridden version of net/http added so we can manage the listener.
|
|
|
-func (s *PeerServer) listenAndServeTLS(certFile, keyFile string) error {
|
|
|
- addr := s.httpServer.Addr
|
|
|
- if addr == "" {
|
|
|
- addr = ":https"
|
|
|
- }
|
|
|
- config := &tls.Config{}
|
|
|
- if s.httpServer.TLSConfig != nil {
|
|
|
- *config = *s.httpServer.TLSConfig
|
|
|
- }
|
|
|
- if config.NextProtos == nil {
|
|
|
- config.NextProtos = []string{"http/1.1"}
|
|
|
- }
|
|
|
-
|
|
|
- var err error
|
|
|
- config.Certificates = make([]tls.Certificate, 1)
|
|
|
- config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
-
|
|
|
- conn, err := net.Listen("tcp", addr)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
+func (s *PeerServer) HTTPHandler() http.Handler {
|
|
|
+ router := mux.NewRouter()
|
|
|
|
|
|
- tlsListener := tls.NewListener(conn, config)
|
|
|
- s.listener = tlsListener
|
|
|
- return s.httpServer.Serve(tlsListener)
|
|
|
-}
|
|
|
+ // internal commands
|
|
|
+ router.HandleFunc("/name", s.NameHttpHandler)
|
|
|
+ router.HandleFunc("/version", s.VersionHttpHandler)
|
|
|
+ router.HandleFunc("/version/{version:[0-9]+}/check", s.VersionCheckHttpHandler)
|
|
|
+ router.HandleFunc("/upgrade", s.UpgradeHttpHandler)
|
|
|
+ router.HandleFunc("/join", s.JoinHttpHandler)
|
|
|
+ router.HandleFunc("/remove/{name:.+}", s.RemoveHttpHandler)
|
|
|
+ router.HandleFunc("/vote", s.VoteHttpHandler)
|
|
|
+ router.HandleFunc("/log", s.GetLogHttpHandler)
|
|
|
+ router.HandleFunc("/log/append", s.AppendEntriesHttpHandler)
|
|
|
+ router.HandleFunc("/snapshot", s.SnapshotHttpHandler)
|
|
|
+ router.HandleFunc("/snapshotRecovery", s.SnapshotRecoveryHttpHandler)
|
|
|
+ router.HandleFunc("/etcdURL", s.EtcdURLHttpHandler)
|
|
|
|
|
|
-// Stops the server.
|
|
|
-func (s *PeerServer) Close() {
|
|
|
- if s.closeChan != nil {
|
|
|
- close(s.closeChan)
|
|
|
- s.closeChan = nil
|
|
|
- }
|
|
|
- if s.listener != nil {
|
|
|
- s.listener.Close()
|
|
|
- s.listener = nil
|
|
|
- }
|
|
|
+ return router
|
|
|
}
|
|
|
|
|
|
// Retrieves the underlying Raft server.
|
|
|
@@ -255,17 +194,17 @@ func (s *PeerServer) SetServer(server *Server) {
|
|
|
func (s *PeerServer) startAsLeader() {
|
|
|
// leader need to join self as a peer
|
|
|
for {
|
|
|
- _, err := s.raftServer.Do(NewJoinCommand(store.MinVersion(), store.MaxVersion(), s.raftServer.Name(), s.url, s.server.URL()))
|
|
|
+ _, err := s.raftServer.Do(NewJoinCommand(store.MinVersion(), store.MaxVersion(), s.raftServer.Name(), s.Config.URL, s.server.URL()))
|
|
|
if err == nil {
|
|
|
break
|
|
|
}
|
|
|
}
|
|
|
- log.Debugf("%s start as a leader", s.name)
|
|
|
+ log.Debugf("%s start as a leader", s.Config.Name)
|
|
|
}
|
|
|
|
|
|
func (s *PeerServer) startAsFollower(cluster []string) {
|
|
|
// start as a follower in a existing cluster
|
|
|
- for i := 0; i < s.RetryTimes; i++ {
|
|
|
+ for i := 0; i < s.Config.RetryTimes; i++ {
|
|
|
ok := s.joinCluster(cluster)
|
|
|
if ok {
|
|
|
return
|
|
|
@@ -274,41 +213,7 @@ func (s *PeerServer) startAsFollower(cluster []string) {
|
|
|
time.Sleep(time.Second * retryInterval)
|
|
|
}
|
|
|
|
|
|
- log.Fatalf("Cannot join the cluster via given peers after %x retries", s.RetryTimes)
|
|
|
-}
|
|
|
-
|
|
|
-// Start to listen and response raft command
|
|
|
-func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) error {
|
|
|
- log.Infof("raft server [name %s, listen on %s, advertised url %s]", s.name, s.bindAddr, s.url)
|
|
|
-
|
|
|
- router := mux.NewRouter()
|
|
|
-
|
|
|
- s.httpServer = &http.Server{
|
|
|
- Handler: router,
|
|
|
- TLSConfig: &tlsConf,
|
|
|
- Addr: s.bindAddr,
|
|
|
- }
|
|
|
-
|
|
|
- // internal commands
|
|
|
- router.HandleFunc("/name", s.NameHttpHandler)
|
|
|
- router.HandleFunc("/version", s.VersionHttpHandler)
|
|
|
- router.HandleFunc("/version/{version:[0-9]+}/check", s.VersionCheckHttpHandler)
|
|
|
- router.HandleFunc("/upgrade", s.UpgradeHttpHandler)
|
|
|
- router.HandleFunc("/join", s.JoinHttpHandler)
|
|
|
- router.HandleFunc("/remove/{name:.+}", s.RemoveHttpHandler)
|
|
|
- router.HandleFunc("/vote", s.VoteHttpHandler)
|
|
|
- router.HandleFunc("/log", s.GetLogHttpHandler)
|
|
|
- router.HandleFunc("/log/append", s.AppendEntriesHttpHandler)
|
|
|
- router.HandleFunc("/snapshot", s.SnapshotHttpHandler)
|
|
|
- router.HandleFunc("/snapshotRecovery", s.SnapshotRecoveryHttpHandler)
|
|
|
- router.HandleFunc("/etcdURL", s.EtcdURLHttpHandler)
|
|
|
-
|
|
|
- if scheme == "http" {
|
|
|
- return s.listenAndServe()
|
|
|
- } else {
|
|
|
- return s.listenAndServeTLS(s.tlsInfo.CertFile, s.tlsInfo.KeyFile)
|
|
|
- }
|
|
|
-
|
|
|
+ log.Fatalf("Cannot join the cluster via given peers after %x retries", s.Config.RetryTimes)
|
|
|
}
|
|
|
|
|
|
// getVersion fetches the peer version of a cluster.
|
|
|
@@ -333,14 +238,14 @@ func getVersion(t *transporter, versionURL url.URL) (int, error) {
|
|
|
// Upgradable checks whether all peers in a cluster support an upgrade to the next store version.
|
|
|
func (s *PeerServer) Upgradable() error {
|
|
|
nextVersion := s.store.Version() + 1
|
|
|
- for _, peerURL := range s.registry.PeerURLs(s.raftServer.Leader(), s.name) {
|
|
|
+ for _, peerURL := range s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name) {
|
|
|
u, err := url.Parse(peerURL)
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("PeerServer: Cannot parse URL: '%s' (%s)", peerURL, err)
|
|
|
}
|
|
|
|
|
|
t, _ := s.raftServer.Transporter().(*transporter)
|
|
|
- checkURL := (&url.URL{Host: u.Host, Scheme: s.tlsConf.Scheme, Path: fmt.Sprintf("/version/%d/check", nextVersion)}).String()
|
|
|
+ checkURL := (&url.URL{Host: u.Host, Scheme: s.Config.Scheme, Path: fmt.Sprintf("/version/%d/check", nextVersion)}).String()
|
|
|
resp, _, err := t.Get(checkURL)
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("PeerServer: Cannot check version compatibility: %s", u.Host)
|
|
|
@@ -359,9 +264,9 @@ func (s *PeerServer) joinCluster(cluster []string) bool {
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
- err := s.joinByPeer(s.raftServer, peer, s.tlsConf.Scheme)
|
|
|
+ err := s.joinByPeer(s.raftServer, peer, s.Config.Scheme)
|
|
|
if err == nil {
|
|
|
- log.Debugf("%s success join to the cluster via peer %s", s.name, peer)
|
|
|
+ log.Debugf("%s success join to the cluster via peer %s", s.Config.Name, peer)
|
|
|
return true
|
|
|
|
|
|
} else {
|
|
|
@@ -392,7 +297,7 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
|
|
|
return fmt.Errorf("Unable to join: cluster version is %d; version compatibility is %d - %d", version, store.MinVersion(), store.MaxVersion())
|
|
|
}
|
|
|
|
|
|
- json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.url, s.server.URL()))
|
|
|
+ json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.Config.URL, s.server.URL()))
|
|
|
|
|
|
joinURL := url.URL{Host: peer, Scheme: scheme, Path: "/join"}
|
|
|
|
|
|
@@ -417,7 +322,7 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
|
|
|
if resp.StatusCode == http.StatusTemporaryRedirect {
|
|
|
address := resp.Header.Get("Location")
|
|
|
log.Debugf("Send Join Request to %s", address)
|
|
|
- json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.url, s.server.URL()))
|
|
|
+ json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.Config.URL, s.server.URL()))
|
|
|
resp, req, err = t.Post(address, &b)
|
|
|
|
|
|
} else if resp.StatusCode == http.StatusBadRequest {
|
|
|
@@ -477,21 +382,21 @@ func (s *PeerServer) raftEventLogger(event raft.Event) {
|
|
|
|
|
|
switch event.Type() {
|
|
|
case raft.StateChangeEventType:
|
|
|
- log.Infof("%s: state changed from '%v' to '%v'.", s.name, prevValue, value)
|
|
|
+ log.Infof("%s: state changed from '%v' to '%v'.", s.Config.Name, prevValue, value)
|
|
|
case raft.TermChangeEventType:
|
|
|
- log.Infof("%s: term #%v started.", s.name, value)
|
|
|
+ log.Infof("%s: term #%v started.", s.Config.Name, value)
|
|
|
case raft.LeaderChangeEventType:
|
|
|
- log.Infof("%s: leader changed from '%v' to '%v'.", s.name, prevValue, value)
|
|
|
+ log.Infof("%s: leader changed from '%v' to '%v'.", s.Config.Name, prevValue, value)
|
|
|
case raft.AddPeerEventType:
|
|
|
- log.Infof("%s: peer added: '%v'", s.name, value)
|
|
|
+ log.Infof("%s: peer added: '%v'", s.Config.Name, value)
|
|
|
case raft.RemovePeerEventType:
|
|
|
- log.Infof("%s: peer removed: '%v'", s.name, value)
|
|
|
+ log.Infof("%s: peer removed: '%v'", s.Config.Name, value)
|
|
|
case raft.HeartbeatTimeoutEventType:
|
|
|
var name = "<unknown>"
|
|
|
if peer, ok := value.(*raft.Peer); ok {
|
|
|
name = peer.Name
|
|
|
}
|
|
|
- log.Infof("%s: warning: heartbeat timed out: '%v'", s.name, name)
|
|
|
+ log.Infof("%s: warning: heartbeat timed out: '%v'", s.Config.Name, name)
|
|
|
case raft.ElectionTimeoutThresholdEventType:
|
|
|
select {
|
|
|
case s.timeoutThresholdChan <- value:
|
|
|
@@ -538,7 +443,7 @@ func (s *PeerServer) monitorTimeoutThreshold(closeChan chan bool) {
|
|
|
for {
|
|
|
select {
|
|
|
case value := <-s.timeoutThresholdChan:
|
|
|
- log.Infof("%s: warning: heartbeat near election timeout: %v", s.name, value)
|
|
|
+ log.Infof("%s: warning: heartbeat near election timeout: %v", s.Config.Name, value)
|
|
|
case <-closeChan:
|
|
|
return
|
|
|
}
|