|
|
@@ -3,6 +3,7 @@ package discovery
|
|
|
import (
|
|
|
"errors"
|
|
|
"fmt"
|
|
|
+ "log"
|
|
|
"net/http"
|
|
|
"net/url"
|
|
|
"path"
|
|
|
@@ -15,12 +16,16 @@ import (
|
|
|
)
|
|
|
|
|
|
var (
|
|
|
- ErrInvalidURL = errors.New("discovery: invalid URL")
|
|
|
- ErrBadSizeKey = errors.New("discovery: size key is bad")
|
|
|
- ErrSizeNotFound = errors.New("discovery: size key not found")
|
|
|
- ErrTokenNotFound = errors.New("discovery: token not found")
|
|
|
- ErrDuplicateID = errors.New("discovery: found duplicate id")
|
|
|
- ErrFullCluster = errors.New("discovery: cluster is full")
|
|
|
+ ErrInvalidURL = errors.New("discovery: invalid URL")
|
|
|
+ ErrBadSizeKey = errors.New("discovery: size key is bad")
|
|
|
+ ErrSizeNotFound = errors.New("discovery: size key not found")
|
|
|
+ ErrTokenNotFound = errors.New("discovery: token not found")
|
|
|
+ ErrDuplicateID = errors.New("discovery: found duplicate id")
|
|
|
+ ErrFullCluster = errors.New("discovery: cluster is full")
|
|
|
+ ErrTooManyRetries = errors.New("discovery: too many retries")
|
|
|
+
|
|
|
+ // Number of retries discovery will attempt before giving up and erroring out.
|
|
|
+ nRetries = uint(3)
|
|
|
)
|
|
|
|
|
|
type Discoverer interface {
|
|
|
@@ -32,6 +37,8 @@ type discovery struct {
|
|
|
id int64
|
|
|
config string
|
|
|
c client.Client
|
|
|
+ retries uint
|
|
|
+ url *url.URL
|
|
|
}
|
|
|
|
|
|
func New(durl string, id int64, config string) (Discoverer, error) {
|
|
|
@@ -41,18 +48,19 @@ func New(durl string, id int64, config string) (Discoverer, error) {
|
|
|
}
|
|
|
token := u.Path
|
|
|
u.Path = ""
|
|
|
- client, err := client.NewHTTPClient(&http.Transport{}, u.String(), time.Second*5)
|
|
|
+ c, err := client.NewHTTPClient(&http.Transport{}, u.String(), time.Second*5)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
// discovery service redirects /[key] to /v2/keys/[key]
|
|
|
// set the prefix of client to "" to handle this
|
|
|
- client.SetPrefix("")
|
|
|
+ c.SetPrefix("")
|
|
|
return &discovery{
|
|
|
cluster: token,
|
|
|
id: id,
|
|
|
config: config,
|
|
|
- c: client,
|
|
|
+ c: c,
|
|
|
+ url: u,
|
|
|
}, nil
|
|
|
}
|
|
|
|
|
|
@@ -65,6 +73,12 @@ func (d *discovery) Discover() (string, error) {
|
|
|
}
|
|
|
|
|
|
if err := d.createSelf(); err != nil {
|
|
|
+ if err == client.ErrTimeout {
|
|
|
+ if d.retries < nRetries {
|
|
|
+ d.logAndBackoffForRetry("registering self")
|
|
|
+ return d.Discover()
|
|
|
+ }
|
|
|
+ }
|
|
|
return "", err
|
|
|
}
|
|
|
|
|
|
@@ -75,6 +89,15 @@ func (d *discovery) Discover() (string, error) {
|
|
|
|
|
|
all, err := d.waitNodes(nodes, size)
|
|
|
if err != nil {
|
|
|
+ if err == client.ErrTimeout {
|
|
|
+ // Our actual connection timed out (nodes can take awhile, but the discovery
|
|
|
+ // server stopped responding) increment our retry counter and we have to
|
|
|
+ // start from scratch. Calling createSelf() again should be idempotent.
|
|
|
+ if d.retries < nRetries {
|
|
|
+ d.logAndBackoffForRetry("waiting for other nodes")
|
|
|
+ return d.Discover()
|
|
|
+ }
|
|
|
+ }
|
|
|
return "", err
|
|
|
}
|
|
|
|
|
|
@@ -101,6 +124,9 @@ func (d *discovery) checkCluster() (client.Nodes, int, error) {
|
|
|
if err == client.ErrKeyNoExist {
|
|
|
return nil, 0, ErrSizeNotFound
|
|
|
}
|
|
|
+ if err == client.ErrTimeout {
|
|
|
+ return d.checkClusterRetry()
|
|
|
+ }
|
|
|
return nil, 0, err
|
|
|
}
|
|
|
size, err := strconv.Atoi(resp.Node.Value)
|
|
|
@@ -110,6 +136,9 @@ func (d *discovery) checkCluster() (client.Nodes, int, error) {
|
|
|
|
|
|
resp, err = d.c.Get(d.cluster)
|
|
|
if err != nil {
|
|
|
+ if err == client.ErrTimeout {
|
|
|
+ return d.checkClusterRetry()
|
|
|
+ }
|
|
|
return nil, 0, err
|
|
|
}
|
|
|
nodes := make(client.Nodes, 0)
|
|
|
@@ -135,6 +164,21 @@ func (d *discovery) checkCluster() (client.Nodes, int, error) {
|
|
|
return nodes, size, nil
|
|
|
}
|
|
|
|
|
|
+func (d *discovery) logAndBackoffForRetry(step string) {
|
|
|
+ d.retries++
|
|
|
+ retryTime := time.Second * (0x1 << d.retries)
|
|
|
+ log.Println("discovery: during", step, "connection to", d.url, "timed out, retrying in", retryTime)
|
|
|
+ time.Sleep(retryTime)
|
|
|
+}
|
|
|
+
|
|
|
+func (d *discovery) checkClusterRetry() (client.Nodes, int, error) {
|
|
|
+ if d.retries < nRetries {
|
|
|
+ d.logAndBackoffForRetry("cluster status check")
|
|
|
+ return d.checkCluster()
|
|
|
+ }
|
|
|
+ return nil, 0, ErrTooManyRetries
|
|
|
+}
|
|
|
+
|
|
|
func (d *discovery) waitNodes(nodes client.Nodes, size int) (client.Nodes, error) {
|
|
|
if len(nodes) > size {
|
|
|
nodes = nodes[:size]
|