|
@@ -9,7 +9,7 @@ import (
|
|
|
"time"
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/coreos/etcd/log"
|
|
"github.com/coreos/etcd/log"
|
|
|
- "github.com/coreos/go-etcd/etcd"
|
|
|
|
|
|
|
+ "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
const (
|
|
const (
|
|
@@ -44,14 +44,21 @@ func (d *Discoverer) Do(discoveryURL string, name string, peer string) (peers []
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // prefix is appended to all keys
|
|
|
|
|
|
|
+ // prefix is prepended to all keys for this discovery
|
|
|
d.prefix = strings.TrimPrefix(u.Path, "/v2/keys/")
|
|
d.prefix = strings.TrimPrefix(u.Path, "/v2/keys/")
|
|
|
|
|
|
|
|
- // Connect to a scheme://host not a full URL with path
|
|
|
|
|
|
|
+ // keep the old path in case we need to set the KeyPrefix below
|
|
|
|
|
+ oldPath := u.Path
|
|
|
u.Path = ""
|
|
u.Path = ""
|
|
|
- log.Infof("Bootstrapping via %s using prefix %s.", u.String(), d.prefix)
|
|
|
|
|
|
|
+
|
|
|
|
|
+ // Connect to a scheme://host not a full URL with path
|
|
|
|
|
+ log.Infof("Discovery via %s using prefix %s.", u.String(), d.prefix)
|
|
|
d.client = etcd.NewClient([]string{u.String()})
|
|
d.client = etcd.NewClient([]string{u.String()})
|
|
|
|
|
|
|
|
|
|
+ if !strings.HasPrefix(oldPath, "/v2/keys") {
|
|
|
|
|
+ d.client.SetKeyPrefix("")
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
// Register this machine first and announce that we are a member of
|
|
// Register this machine first and announce that we are a member of
|
|
|
// this cluster
|
|
// this cluster
|
|
|
err = d.heartbeat()
|
|
err = d.heartbeat()
|
|
@@ -68,7 +75,7 @@ func (d *Discoverer) Do(discoveryURL string, name string, peer string) (peers []
|
|
|
|
|
|
|
|
// Bail out on unexpected errors
|
|
// Bail out on unexpected errors
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- if etcdErr, ok := err.(etcd.EtcdError); !ok || etcdErr.ErrorCode != 101 {
|
|
|
|
|
|
|
+ if etcdErr, ok := err.(*etcd.EtcdError); !ok || etcdErr.ErrorCode != 101 {
|
|
|
return nil, err
|
|
return nil, err
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -76,11 +83,11 @@ func (d *Discoverer) Do(discoveryURL string, name string, peer string) (peers []
|
|
|
// If we got a response then the CAS was successful, we are leader
|
|
// If we got a response then the CAS was successful, we are leader
|
|
|
if resp != nil && resp.Node.Value == startedState {
|
|
if resp != nil && resp.Node.Value == startedState {
|
|
|
// We are the leader, we have no peers
|
|
// We are the leader, we have no peers
|
|
|
- log.Infof("Bootstrapping was in 'init' state this machine is the initial leader.")
|
|
|
|
|
|
|
+ log.Infof("Discovery was in the 'init' state this machine is the initial leader.")
|
|
|
return nil, nil
|
|
return nil, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // Fall through to finding the other discoveryped peers
|
|
|
|
|
|
|
+ // Fall through to finding the other discovery peers
|
|
|
return d.findPeers()
|
|
return d.findPeers()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -93,7 +100,7 @@ func (d *Discoverer) findPeers() (peers []string, err error) {
|
|
|
node := resp.Node
|
|
node := resp.Node
|
|
|
|
|
|
|
|
if node == nil {
|
|
if node == nil {
|
|
|
- return nil, errors.New(fmt.Sprintf("%s key doesn't exist.", d.prefix))
|
|
|
|
|
|
|
+ return nil, fmt.Errorf("%s key doesn't exist.", d.prefix)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
for _, n := range node.Nodes {
|
|
for _, n := range node.Nodes {
|
|
@@ -105,10 +112,10 @@ func (d *Discoverer) findPeers() (peers []string, err error) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if len(peers) == 0 {
|
|
if len(peers) == 0 {
|
|
|
- return nil, errors.New("No peers found.")
|
|
|
|
|
|
|
+ return nil, errors.New("Discovery found an initialized cluster but no peers are registered.")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- log.Infof("Bootstrap found peers %v", peers)
|
|
|
|
|
|
|
+ log.Infof("Discovery found peers %v", peers)
|
|
|
|
|
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
@@ -122,7 +129,7 @@ func (d *Discoverer) startHeartbeat() {
|
|
|
case <-ticker:
|
|
case <-ticker:
|
|
|
err := d.heartbeat()
|
|
err := d.heartbeat()
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- log.Warnf("Bootstrapping heartbeat failed: %v", err)
|
|
|
|
|
|
|
+ log.Warnf("Discovery heartbeat failed: %v", err)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -130,7 +137,6 @@ func (d *Discoverer) startHeartbeat() {
|
|
|
|
|
|
|
|
func (d *Discoverer) heartbeat() error {
|
|
func (d *Discoverer) heartbeat() error {
|
|
|
_, err := d.client.Set(path.Join(d.prefix, d.name), d.peer, defaultTTL)
|
|
_, err := d.client.Set(path.Join(d.prefix, d.name), d.peer, defaultTTL)
|
|
|
-
|
|
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
|
|
|