
client: prevent deadlock in Sync

Laurie Clark-Michalek, 9 years ago
commit de008c8a4a
2 changed files with 186 additions and 41 deletions
  1. client/client.go (+102 -41)
  2. client/client_test.go (+84 -0)
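
Why this deadlocked: the old Sync took the client's write lock (c.Lock()) and then
called SetEndpoints, which under EndpointSelectionPrioritizeLeader issued a members
request through the same client; that request path read-locks the same mutex, and
Go's sync.RWMutex is not reentrant, so the goroutine blocked waiting on its own
write lock. A minimal sketch of the pattern (illustrative names, not the etcd code):

package main

import (
	"fmt"
	"sync"
)

// clusterClient mimics the locking shape of httpClusterClient: a write
// lock held across a call that internally takes the read lock.
type clusterClient struct {
	sync.RWMutex
	endpoints []string
}

// do stands in for the HTTP request path, which read-locks the client
// to snapshot the endpoint list.
func (c *clusterClient) do() string {
	c.RLock() // blocks forever if this goroutine already holds Lock()
	defer c.RUnlock()
	return c.endpoints[0]
}

// sync mimics the old Sync: take the write lock, then call back into
// a method that read-locks the same client.
func (c *clusterClient) sync() {
	c.Lock()
	defer c.Unlock()
	fmt.Println(c.do()) // deadlock: RLock cannot proceed while Lock is held
}

func main() {
	c := &clusterClient{endpoints: []string{"http://127.0.0.1:2379"}}
	c.sync() // never returns; the runtime reports "all goroutines are asleep - deadlock!"
}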

+ 102 - 41
client/client.go

@@ -22,7 +22,6 @@ import (
 	"net"
 	"net/http"
 	"net/url"
-	"reflect"
 	"sort"
 	"strconv"
 	"sync"
@@ -261,53 +260,67 @@ type httpClusterClient struct {
 	selectionMode EndpointSelectionMode
 }
 
-func (c *httpClusterClient) getLeaderEndpoint() (string, error) {
-	mAPI := NewMembersAPI(c)
-	leader, err := mAPI.Leader(context.Background())
+func (c *httpClusterClient) getLeaderEndpoint(ctx context.Context, eps []url.URL) (string, error) {
+	ceps := make([]url.URL, len(eps))
+	copy(ceps, eps)
+
+	// To perform a lookup on the new endpoint list without using the current
+	// client, we'll copy it.
+	clientCopy := &httpClusterClient{
+		clientFactory: c.clientFactory,
+		credentials:   c.credentials,
+		rand:          c.rand,
+
+		pinned:    0,
+		endpoints: ceps,
+	}
+
+	mAPI := NewMembersAPI(clientCopy)
+	leader, err := mAPI.Leader(ctx)
 	if err != nil {
 		return "", err
 	}
+	if len(leader.ClientURLs) == 0 {
+		return "", ErrNoLeaderEndpoint
+	}
 
 	return leader.ClientURLs[0], nil // TODO: how to handle multiple client URLs?
 }
 
-func (c *httpClusterClient) SetEndpoints(eps []string) error {
+func (c *httpClusterClient) parseEndpoints(eps []string) ([]url.URL, error) {
 	if len(eps) == 0 {
-		return ErrNoEndpoints
+		return []url.URL{}, ErrNoEndpoints
 	}
 
 	neps := make([]url.URL, len(eps))
 	for i, ep := range eps {
		u, err := url.Parse(ep)
 		if err != nil {
-			return err
+			return []url.URL{}, err
 		}
 		neps[i] = *u
 	}
+	return neps, nil
+}
 
-	switch c.selectionMode {
-	case EndpointSelectionRandom:
-		c.endpoints = shuffleEndpoints(c.rand, neps)
-		c.pinned = 0
-	case EndpointSelectionPrioritizeLeader:
-		c.endpoints = neps
-		lep, err := c.getLeaderEndpoint()
-		if err != nil {
-			return ErrNoLeaderEndpoint
-		}
-
-		for i := range c.endpoints {
-			if c.endpoints[i].String() == lep {
-				c.pinned = i
-				break
-			}
-		}
-		// If endpoints doesn't have the lu, just keep c.pinned = 0.
-		// Forwarding between follower and leader would be required but it works.
-	default:
-		return fmt.Errorf("invalid endpoint selection mode: %d", c.selectionMode)
+func (c *httpClusterClient) SetEndpoints(eps []string) error {
+	neps, err := c.parseEndpoints(eps)
+	if err != nil {
+		return err
 	}
 
+	c.Lock()
+	defer c.Unlock()
+
+	c.endpoints = shuffleEndpoints(c.rand, neps)
+	// We do nothing for PrioritizeLeader here: without a context we
+	// cannot call getLeaderEndpoint. However, callers that use
+	// PrioritizeLeader are already expected to call Sync regularly,
+	// where a ctx is available and the leader can be determined.
+	// PrioritizeLeader is also quite a loose guarantee, so this is
+	// acceptable.
+	c.pinned = 0
+
 	return nil
 }
 
@@ -401,27 +414,51 @@ func (c *httpClusterClient) Sync(ctx context.Context) error {
 		return err
 	}
 
-	c.Lock()
-	defer c.Unlock()
-
 	var eps []string
 	for _, m := range ms {
 		eps = append(eps, m.ClientURLs...)
 	}
-	sort.Sort(sort.StringSlice(eps))
 
-	ceps := make([]string, len(c.endpoints))
-	for i, cep := range c.endpoints {
-		ceps[i] = cep.String()
+	neps, err := c.parseEndpoints(eps)
+	if err != nil {
+		return err
 	}
-	sort.Sort(sort.StringSlice(ceps))
-	// fast path if no change happens
-	// this helps client to pin the endpoint when no cluster change
-	if reflect.DeepEqual(eps, ceps) {
-		return nil
+
+	npin := 0
+
+	switch c.selectionMode {
+	case EndpointSelectionRandom:
+		c.RLock()
+		eq := endpointsEqual(c.endpoints, neps)
+		c.RUnlock()
+
+		if eq {
+			return nil
+		}
+		// When the endpoint list changes, we choose a new pin.
+		neps = shuffleEndpoints(c.rand, neps)
+	case EndpointSelectionPrioritizeLeader:
+		nle, err := c.getLeaderEndpoint(ctx, neps)
+		if err != nil {
+			return ErrNoLeaderEndpoint
+		}
+
+		for i, n := range neps {
+			if n.String() == nle {
+				npin = i
+				break
+			}
+		}
+	default:
+		return fmt.Errorf("invalid endpoint selection mode: %d", c.selectionMode)
 	}
 
-	return c.SetEndpoints(eps)
+	c.Lock()
+	defer c.Unlock()
+	c.endpoints = neps
+	c.pinned = npin
+
+	return nil
 }
 
 func (c *httpClusterClient) AutoSync(ctx context.Context, interval time.Duration) error {
@@ -607,3 +644,27 @@ func shuffleEndpoints(r *rand.Rand, eps []url.URL) []url.URL {
 	}
 	return neps
 }
+
+func endpointsEqual(left, right []url.URL) bool {
+	if len(left) != len(right) {
+		return false
+	}
+
+	sLeft := make([]string, len(left))
+	sRight := make([]string, len(right))
+	for i, l := range left {
+		sLeft[i] = l.String()
+	}
+	for i, r := range right {
+		sRight[i] = r.String()
+	}
+
+	sort.Strings(sLeft)
+	sort.Strings(sRight)
+	for i := range sLeft {
+		if sLeft[i] != sRight[i] {
+			return false
+		}
+	}
+	return true
+}
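
As the SetEndpoints comment above notes, EndpointSelectionPrioritizeLeader now only
re-pins the leader during Sync, so callers are expected to sync on a schedule. A
minimal usage sketch, assuming the etcd v2 client package (older vintages import
golang.org/x/net/context rather than the standard context package):

package main

import (
	"context"
	"log"
	"time"

	"github.com/coreos/etcd/client"
)

func main() {
	cfg := client.Config{
		Endpoints: []string{"http://127.0.0.1:2379", "http://127.0.0.1:4001"},
		// Pair PrioritizeLeader with AutoSync (or periodic Sync calls);
		// SetEndpoints alone no longer resolves the leader.
		SelectionMode: client.EndpointSelectionPrioritizeLeader,
	}
	c, err := client.New(cfg)
	if err != nil {
		log.Fatal(err)
	}

	// AutoSync re-runs Sync on the given interval, refreshing the member
	// list and re-pinning the leader as it moves.
	go func() {
		if err := c.AutoSync(context.Background(), 10*time.Second); err != nil {
			log.Println("AutoSync stopped:", err)
		}
	}()

	// ... use client.NewKeysAPI(c) and friends as usual ...
}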

+ 84 - 0
client/client_test.go

@@ -900,6 +900,90 @@ func TestHTTPClusterClientSyncPinEndpoint(t *testing.T) {
 	}
 }
 
+// TestHTTPClusterClientSyncUnpinEndpoint tests that Sync() unpins the endpoint when
+// it gets a different member list than before.
+func TestHTTPClusterClientSyncUnpinEndpoint(t *testing.T) {
+	cf := newStaticHTTPClientFactory([]staticHTTPResponse{
+		{
+			resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
+			body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
+		},
+		{
+			resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
+			body: []byte(`{"members":[{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
+		},
+		{
+			resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
+			body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
+		},
+	})
+
+	hc := &httpClusterClient{
+		clientFactory: cf,
+		rand:          rand.New(rand.NewSource(0)),
+	}
+	err := hc.SetEndpoints([]string{"http://127.0.0.1:4003", "http://127.0.0.1:2379", "http://127.0.0.1:4001", "http://127.0.0.1:4002"})
+	if err != nil {
+		t.Fatalf("unexpected error during setup: %#v", err)
+	}
+	wants := []string{"http://127.0.0.1:2379", "http://127.0.0.1:4001", "http://127.0.0.1:4002"}
+
+	for i := 0; i < 3; i++ {
+		err = hc.Sync(context.Background())
+		if err != nil {
+			t.Fatalf("#%d: unexpected error during Sync: %#v", i, err)
+		}
+
+		if g := hc.endpoints[hc.pinned]; g.String() != wants[i] {
+			t.Errorf("#%d: pinned endpoint = %v, want %v", i, g, wants[i])
+		}
+	}
+}
+
+// TestHTTPClusterClientSyncPinLeaderEndpoint tests that Sync() pins the leader
+// when the selection mode is EndpointSelectionPrioritizeLeader.
+func TestHTTPClusterClientSyncPinLeaderEndpoint(t *testing.T) {
+	cf := newStaticHTTPClientFactory([]staticHTTPResponse{
+		{
+			resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
+			body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
+		},
+		{
+			resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
+			body: []byte(`{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]}`),
+		},
+		{
+			resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
+			body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
+		},
+		{
+			resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
+			body: []byte(`{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}`),
+		},
+	})
+
+	hc := &httpClusterClient{
+		clientFactory: cf,
+		rand:          rand.New(rand.NewSource(0)),
+		selectionMode: EndpointSelectionPrioritizeLeader,
+		endpoints:     []url.URL{{}}, // Need somewhere to pretend to send to initially
+	}
+
+	wants := []string{"http://127.0.0.1:4003", "http://127.0.0.1:4002"}
+
+	for i, want := range wants {
+		err := hc.Sync(context.Background())
+		if err != nil {
+			t.Fatalf("#%d: unexpected error during Sync: %#v", i, err)
+		}
+
+		pinned := hc.endpoints[hc.pinned].String()
+		if pinned != want {
+			t.Errorf("#%d: pinned endpoint = %v, want %v", i, pinned, want)
+		}
+	}
+}
+
 func TestHTTPClusterClientResetFail(t *testing.T) {
 	tests := [][]string{
 		// need at least one endpoint