Ver Fonte

client: Sync() pin the endpoint when member list doesn't change

This helps client to pin the same endpoint as long as cluster doesn't change.
Yicheng Qin há 10 anos atrás
pai
commit
0ab16db728
2 ficheiros alterados com 54 adições e 0 exclusões
  1. 14 0
      client/client.go
  2. 40 0
      client/client_test.go

+ 14 - 0
client/client.go

@@ -22,6 +22,8 @@ import (
 	"net"
 	"net/http"
 	"net/url"
+	"reflect"
+	"sort"
 	"sync"
 	"time"
 
@@ -313,6 +315,18 @@ func (c *httpClusterClient) Sync(ctx context.Context) error {
 	for _, m := range ms {
 		eps = append(eps, m.ClientURLs...)
 	}
+	sort.Sort(sort.StringSlice(eps))
+
+	ceps := make([]string, len(c.endpoints))
+	for i, cep := range c.endpoints {
+		ceps[i] = cep.String()
+	}
+	sort.Sort(sort.StringSlice(ceps))
+	// fast path if no change happens
+	// this helps client to pin the endpoint when no cluster change
+	if reflect.DeepEqual(eps, ceps) {
+		return nil
+	}
 
 	return c.reset(eps)
 }

+ 40 - 0
client/client_test.go

@@ -802,6 +802,46 @@ func TestHTTPClusterClientAutoSyncFail(t *testing.T) {
 	}
 }
 
+// TestHTTPClusterClientSyncPinEndpoint tests that Sync() pins the endpoint when
+// it gets the exactly same member list as before.
+func TestHTTPClusterClientSyncPinEndpoint(t *testing.T) {
+	cf := newStaticHTTPClientFactory([]staticHTTPResponse{
+		staticHTTPResponse{
+			resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
+			body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
+		},
+		staticHTTPResponse{
+			resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
+			body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
+		},
+		staticHTTPResponse{
+			resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
+			body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
+		},
+	})
+
+	hc := &httpClusterClient{
+		clientFactory: cf,
+		rand:          rand.New(rand.NewSource(0)),
+	}
+	err := hc.reset([]string{"http://127.0.0.1:4003", "http://127.0.0.1:2379", "http://127.0.0.1:4001", "http://127.0.0.1:4002"})
+	if err != nil {
+		t.Fatalf("unexpected error during setup: %#v", err)
+	}
+	pinnedEndpoint := hc.endpoints[hc.pinned]
+
+	for i := 0; i < 3; i++ {
+		err = hc.Sync(context.Background())
+		if err != nil {
+			t.Fatalf("#%d: unexpected error during Sync: %#v", i, err)
+		}
+
+		if g := hc.endpoints[hc.pinned]; g != pinnedEndpoint {
+			t.Errorf("#%d: pinned endpoint = %s, want %s", i, g, pinnedEndpoint)
+		}
+	}
+}
+
 func TestHTTPClusterClientResetFail(t *testing.T) {
 	tests := [][]string{
 		// need at least one endpoint