|
|
@@ -4,6 +4,7 @@ import (
|
|
|
"bytes"
|
|
|
"crypto/tls"
|
|
|
"encoding/json"
|
|
|
+ "io/ioutil"
|
|
|
"fmt"
|
|
|
etcdErr "github.com/coreos/etcd/error"
|
|
|
"github.com/coreos/go-raft"
|
|
|
@@ -14,6 +15,7 @@ import (
|
|
|
|
|
|
type raftServer struct {
|
|
|
*raft.Server
|
|
|
+ version string
|
|
|
name string
|
|
|
url string
|
|
|
tlsConf *TLSConfig
|
|
|
@@ -34,6 +36,7 @@ func newRaftServer(name string, url string, tlsConf *TLSConfig, tlsInfo *TLSInfo
|
|
|
|
|
|
return &raftServer{
|
|
|
Server: server,
|
|
|
+ version: raftVersion,
|
|
|
name: name,
|
|
|
url: url,
|
|
|
tlsConf: tlsConf,
|
|
|
@@ -144,6 +147,7 @@ func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) {
|
|
|
|
|
|
// internal commands
|
|
|
raftMux.HandleFunc("/name", NameHttpHandler)
|
|
|
+ raftMux.HandleFunc("/version", RaftVersionHttpHandler)
|
|
|
raftMux.Handle("/join", errorHandler(JoinHttpHandler))
|
|
|
raftMux.HandleFunc("/vote", VoteHttpHandler)
|
|
|
raftMux.HandleFunc("/log", GetLogHttpHandler)
|
|
|
@@ -160,15 +164,44 @@ func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) {
|
|
|
|
|
|
}
|
|
|
|
|
|
+// getVersion fetches the raft version of a peer. This works for now but we
|
|
|
+// will need to do something more sophisticated later when we allow mixed
|
|
|
+// version clusters.
|
|
|
+func getVersion(t transporter, versionURL url.URL) (string, error) {
|
|
|
+ resp, err := t.Get(versionURL.String())
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ return "", err
|
|
|
+ }
|
|
|
+
|
|
|
+ defer resp.Body.Close()
|
|
|
+ body, err := ioutil.ReadAll(resp.Body)
|
|
|
+
|
|
|
+ return string(body), nil
|
|
|
+}
|
|
|
+
|
|
|
// Send join requests to the leader.
|
|
|
func joinCluster(s *raft.Server, raftURL string, scheme string) error {
|
|
|
var b bytes.Buffer
|
|
|
|
|
|
- json.NewEncoder(&b).Encode(newJoinCommand())
|
|
|
-
|
|
|
// t must be ok
|
|
|
t, _ := r.Transporter().(transporter)
|
|
|
|
|
|
+ // Our version must match the leaders version
|
|
|
+ versionURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/version"}
|
|
|
+ version, err := getVersion(t, versionURL)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("Unable to join: %v", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ // TODO: versioning of the internal protocol. See:
|
|
|
+ // Documentation/internatl-protocol-versioning.md
|
|
|
+ if version != r.version {
|
|
|
+ return fmt.Errorf("Unable to join: internal version mismatch, entire cluster must be running identical versions of etcd")
|
|
|
+ }
|
|
|
+
|
|
|
+ json.NewEncoder(&b).Encode(newJoinCommand())
|
|
|
+
|
|
|
joinURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/join"}
|
|
|
|
|
|
debugf("Send Join Request to %s", raftURL)
|