Browse Source

Merge pull request #1256 from barakmich/retry

Add logging and backoff and simple retry logic
Barak Michener 11 years ago
parent
commit
74ab003e1f
3 changed files with 170 additions and 28 deletions
  1. 2 1
      client/http.go
  2. 65 12
      discovery/discovery.go
  3. 103 15
      discovery/discovery_test.go

+ 2 - 1
client/http.go

@@ -15,7 +15,8 @@ import (
 )
 
 var (
-	v2Prefix = "/v2/keys"
+	v2Prefix   = "/v2/keys"
+	ErrTimeout = context.DeadlineExceeded
 )
 
 // transport mimics http.Transport to provide an interface which can be

+ 65 - 12
discovery/discovery.go

@@ -3,6 +3,7 @@ package discovery
 import (
 	"errors"
 	"fmt"
+	"log"
 	"net/http"
 	"net/url"
 	"path"
@@ -15,12 +16,18 @@ import (
 )
 
 var (
-	ErrInvalidURL    = errors.New("discovery: invalid URL")
-	ErrBadSizeKey    = errors.New("discovery: size key is bad")
-	ErrSizeNotFound  = errors.New("discovery: size key not found")
-	ErrTokenNotFound = errors.New("discovery: token not found")
-	ErrDuplicateID   = errors.New("discovery: found duplicate id")
-	ErrFullCluster   = errors.New("discovery: cluster is full")
+	ErrInvalidURL     = errors.New("discovery: invalid URL")
+	ErrBadSizeKey     = errors.New("discovery: size key is bad")
+	ErrSizeNotFound   = errors.New("discovery: size key not found")
+	ErrTokenNotFound  = errors.New("discovery: token not found")
+	ErrDuplicateID    = errors.New("discovery: found duplicate id")
+	ErrFullCluster    = errors.New("discovery: cluster is full")
+	ErrTooManyRetries = errors.New("discovery: too many retries")
+)
+
+const (
+	// Number of retries discovery will attempt before giving up and erroring out.
+	nRetries = uint(3)
 )
 
 type Discoverer interface {
@@ -32,6 +39,11 @@ type discovery struct {
 	id      uint64
 	config  string
 	c       client.Client
+	retries uint
+	url     *url.URL
+
+	// Injectable for testing. New sets this to time.Second by default.
+	timeoutTimescale time.Duration
 }
 
 func New(durl string, id uint64, config string) (Discoverer, error) {
@@ -41,18 +53,20 @@ func New(durl string, id uint64, config string) (Discoverer, error) {
 	}
 	token := u.Path
 	u.Path = ""
-	client, err := client.NewHTTPClient(&http.Transport{}, u.String(), time.Second*5)
+	c, err := client.NewHTTPClient(&http.Transport{}, u.String(), time.Second*5)
 	if err != nil {
 		return nil, err
 	}
 	// discovery service redirects /[key] to /v2/keys/[key]
 	// set the prefix of client to "" to handle this
-	client.SetPrefix("")
+	c.SetPrefix("")
 	return &discovery{
-		cluster: token,
-		id:      id,
-		config:  config,
-		c:       client,
+		cluster:          token,
+		id:               id,
+		config:           config,
+		c:                c,
+		url:              u,
+		timeoutTimescale: time.Second,
 	}, nil
 }
 
@@ -65,6 +79,9 @@ func (d *discovery) Discover() (string, error) {
 	}
 
 	if err := d.createSelf(); err != nil {
+		// Fail immediately (without retrying) when createSelf errors, including on timeout.
+		// TODO(barakmich): Retrying the same node might want to succeed here
+		// (ie, createSelf should be idempotent for discovery).
 		return "", err
 	}
 
@@ -101,6 +118,9 @@ func (d *discovery) checkCluster() (client.Nodes, int, error) {
 		if err == client.ErrKeyNoExist {
 			return nil, 0, ErrSizeNotFound
 		}
+		if err == client.ErrTimeout {
+			return d.checkClusterRetry()
+		}
 		return nil, 0, err
 	}
 	size, err := strconv.Atoi(resp.Node.Value)
@@ -110,6 +130,9 @@ func (d *discovery) checkCluster() (client.Nodes, int, error) {
 
 	resp, err = d.c.Get(d.cluster)
 	if err != nil {
+		if err == client.ErrTimeout {
+			return d.checkClusterRetry()
+		}
 		return nil, 0, err
 	}
 	nodes := make(client.Nodes, 0)
@@ -135,6 +158,33 @@ func (d *discovery) checkCluster() (client.Nodes, int, error) {
 	return nodes, size, nil
 }
 
+func (d *discovery) logAndBackoffForRetry(step string) {
+	d.retries++
+	retryTime := d.timeoutTimescale * (0x1 << d.retries)
+	log.Println("discovery: during", step, "connection to", d.url, "timed out, retrying in", retryTime)
+	time.Sleep(retryTime)
+}
+
+func (d *discovery) checkClusterRetry() (client.Nodes, int, error) {
+	if d.retries < nRetries {
+		d.logAndBackoffForRetry("cluster status check")
+		return d.checkCluster()
+	}
+	return nil, 0, ErrTooManyRetries
+}
+
+func (d *discovery) waitNodesRetry() (client.Nodes, error) {
+	if d.retries < nRetries {
+		d.logAndBackoffForRetry("waiting for other nodes")
+		nodes, n, err := d.checkCluster()
+		if err != nil {
+			return nil, err
+		}
+		return d.waitNodes(nodes, n)
+	}
+	return nil, ErrTooManyRetries
+}
+
 func (d *discovery) waitNodes(nodes client.Nodes, size int) (client.Nodes, error) {
 	if len(nodes) > size {
 		nodes = nodes[:size]
@@ -146,6 +196,9 @@ func (d *discovery) waitNodes(nodes client.Nodes, size int) (client.Nodes, error
 	for len(all) < size {
 		resp, err := w.Next()
 		if err != nil {
+			if err == client.ErrTimeout {
+				return d.waitNodesRetry()
+			}
 			return nil, err
 		}
 		all = append(all, resp.Node)

+ 103 - 15
discovery/discovery_test.go

@@ -4,6 +4,7 @@ import (
 	"errors"
 	"math/rand"
 	"sort"
+	"strconv"
 
 	"reflect"
 	"testing"
@@ -107,15 +108,21 @@ func TestCheckCluster(t *testing.T) {
 		c := &clientWithResp{rs: rs}
 		d := discovery{cluster: cluster, id: 1, c: c}
 
-		ns, size, err := d.checkCluster()
-		if err != tt.werr {
-			t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
-		}
-		if reflect.DeepEqual(ns, tt.nodes) {
-			t.Errorf("#%d: nodes = %v, want %v", i, ns, tt.nodes)
-		}
-		if size != tt.wsize {
-			t.Errorf("#%d: size = %v, want %d", i, size, tt.wsize)
+		cRetry := &clientWithRetry{failTimes: 3}
+		cRetry.rs = rs
+		dRetry := discovery{cluster: cluster, id: 1, c: cRetry, timeoutTimescale: time.Millisecond * 2}
+
+		for _, d := range []discovery{d, dRetry} {
+			ns, size, err := d.checkCluster()
+			if err != tt.werr {
+				t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
+			}
+			if reflect.DeepEqual(ns, tt.nodes) {
+				t.Errorf("#%d: nodes = %v, want %v", i, ns, tt.nodes)
+			}
+			if size != tt.wsize {
+				t.Errorf("#%d: size = %v, want %d", i, size, tt.wsize)
+			}
 		}
 	}
 }
@@ -173,14 +180,43 @@ func TestWaitNodes(t *testing.T) {
 	}
 
 	for i, tt := range tests {
+		// Basic case
 		c := &clientWithResp{nil, &watcherWithResp{tt.rs}}
 		d := &discovery{cluster: "1000", c: c}
-		g, err := d.waitNodes(tt.nodes, tt.size)
-		if err != tt.werr {
-			t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
+
+		// Retry case
+		retryScanResp := make([]*client.Response, 0)
+		if len(tt.nodes) > 0 {
+			retryScanResp = append(retryScanResp, &client.Response{
+				Node: &client.Node{
+					Key:   "1000",
+					Value: strconv.Itoa(tt.size),
+				},
+			})
+			retryScanResp = append(retryScanResp, &client.Response{
+				Node: &client.Node{
+					Nodes: tt.nodes,
+				},
+			})
+		}
+		cRetry := &clientWithResp{
+			rs: retryScanResp,
+			w:  &watcherWithRetry{rs: tt.rs, failTimes: 2},
 		}
-		if !reflect.DeepEqual(g, tt.wall) {
-			t.Errorf("#%d: all = %v, want %v", i, g, tt.wall)
+		dRetry := &discovery{
+			cluster:          "1000",
+			c:                cRetry,
+			timeoutTimescale: time.Millisecond * 2,
+		}
+
+		for _, d := range []*discovery{d, dRetry} {
+			g, err := d.waitNodes(tt.nodes, tt.size)
+			if err != tt.werr {
+				t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
+			}
+			if !reflect.DeepEqual(g, tt.wall) {
+				t.Errorf("#%d: all = %v, want %v", i, g, tt.wall)
+			}
 		}
 	}
 }
@@ -258,6 +294,16 @@ func TestSortableNodes(t *testing.T) {
 	}
 }
 
+func TestRetryFailure(t *testing.T) {
+	cluster := "1000"
+	c := &clientWithRetry{failTimes: 4}
+	d := discovery{cluster: cluster, id: 1, c: c, timeoutTimescale: time.Millisecond * 2}
+	_, _, err := d.checkCluster()
+	if err != ErrTooManyRetries {
+		t.Errorf("err = %v, want %v", err, ErrTooManyRetries)
+	}
+}
+
 type clientWithResp struct {
 	rs []*client.Response
 	w  client.Watcher
@@ -277,7 +323,7 @@ func (c *clientWithResp) Get(key string) (*client.Response, error) {
 		return &client.Response{}, client.ErrKeyNoExist
 	}
 	r := c.rs[0]
-	c.rs = c.rs[1:]
+	c.rs = append(c.rs[1:], r)
 	return r, nil
 }
 
@@ -330,3 +376,45 @@ type watcherWithErr struct {
 func (w *watcherWithErr) Next() (*client.Response, error) {
 	return &client.Response{}, w.err
 }
+
+// Fails the first failTimes calls, then delegates to clientWithResp.
+type clientWithRetry struct {
+	clientWithResp
+	failCount int
+	failTimes int
+}
+
+func (c *clientWithRetry) Create(key string, value string, ttl time.Duration) (*client.Response, error) {
+	if c.failCount < c.failTimes {
+		c.failCount++
+		return nil, client.ErrTimeout
+	}
+	return c.clientWithResp.Create(key, value, ttl)
+}
+
+func (c *clientWithRetry) Get(key string) (*client.Response, error) {
+	if c.failCount < c.failTimes {
+		c.failCount++
+		return nil, client.ErrTimeout
+	}
+	return c.clientWithResp.Get(key)
+}
+
+type watcherWithRetry struct {
+	rs        []*client.Response
+	failCount int
+	failTimes int
+}
+
+func (w *watcherWithRetry) Next() (*client.Response, error) {
+	if w.failCount < w.failTimes {
+		w.failCount++
+		return nil, client.ErrTimeout
+	}
+	if len(w.rs) == 0 {
+		return &client.Response{}, nil
+	}
+	r := w.rs[0]
+	w.rs = w.rs[1:]
+	return r, nil
+}