|
|
@@ -34,6 +34,7 @@ var (
|
|
|
ErrNoEndpoints = errors.New("client: no endpoints available")
|
|
|
ErrTooManyRedirects = errors.New("client: too many redirects")
|
|
|
ErrClusterUnavailable = errors.New("client: etcd cluster is unavailable or misconfigured")
|
|
|
+ ErrNoLeaderEndpoint = errors.New("client: no leader endpoint available")
|
|
|
errTooManyRedirectChecks = errors.New("client: too many redirect checks")
|
|
|
)
|
|
|
|
|
|
@@ -48,6 +49,19 @@ var DefaultTransport CancelableTransport = &http.Transport{
|
|
|
TLSHandshakeTimeout: 10 * time.Second,
|
|
|
}
|
|
|
|
|
|
+type EndpointSelectionMode int
|
|
|
+
|
|
|
+const (
|
|
|
+ // EndpointSelectionRandom is to pick an endpoint in a random manner.
|
|
|
+ EndpointSelectionRandom EndpointSelectionMode = iota
|
|
|
+
|
|
|
+ // EndpointSelectionPrioritizeLeader is to prioritize leader for reducing needless
|
|
|
+ // forward between follower and leader.
|
|
|
+ //
|
|
|
+ // This mode should be used with Client.AutoSync().
|
|
|
+ EndpointSelectionPrioritizeLeader
|
|
|
+)
|
|
|
+
|
|
|
type Config struct {
|
|
|
// Endpoints defines a set of URLs (schemes, hosts and ports only)
|
|
|
// that can be used to communicate with a logical etcd cluster. For
|
|
|
@@ -104,6 +118,9 @@ type Config struct {
|
|
|
//
|
|
|
// A HeaderTimeoutPerRequest of zero means no timeout.
|
|
|
HeaderTimeoutPerRequest time.Duration
|
|
|
+
|
|
|
+ // SelectionMode specifies a way of selecting destination endpoint.
|
|
|
+ SelectionMode EndpointSelectionMode
|
|
|
}
|
|
|
|
|
|
func (cfg *Config) transport() CancelableTransport {
|
|
|
@@ -169,6 +186,7 @@ func New(cfg Config) (Client, error) {
|
|
|
c := &httpClusterClient{
|
|
|
clientFactory: newHTTPClientFactory(cfg.transport(), cfg.checkRedirect(), cfg.HeaderTimeoutPerRequest),
|
|
|
rand: rand.New(rand.NewSource(int64(time.Now().Nanosecond()))),
|
|
|
+ selectionMode: cfg.SelectionMode,
|
|
|
}
|
|
|
if cfg.Username != "" {
|
|
|
c.credentials = &credentials{
|
|
|
@@ -216,7 +234,18 @@ type httpClusterClient struct {
|
|
|
pinned int
|
|
|
credentials *credentials
|
|
|
sync.RWMutex
|
|
|
- rand *rand.Rand
|
|
|
+ rand *rand.Rand
|
|
|
+ selectionMode EndpointSelectionMode
|
|
|
+}
|
|
|
+
|
|
|
+func (c *httpClusterClient) getLeaderEndpoint() (string, error) {
|
|
|
+ mAPI := NewMembersAPI(c)
|
|
|
+ leader, err := mAPI.Leader(context.Background())
|
|
|
+ if err != nil {
|
|
|
+ return "", err
|
|
|
+ }
|
|
|
+
|
|
|
+ return leader.ClientURLs[0], nil // TODO: how to handle multiple client URLs?
|
|
|
}
|
|
|
|
|
|
func (c *httpClusterClient) reset(eps []string) error {
|
|
|
@@ -233,9 +262,28 @@ func (c *httpClusterClient) reset(eps []string) error {
|
|
|
neps[i] = *u
|
|
|
}
|
|
|
|
|
|
- c.endpoints = shuffleEndpoints(c.rand, neps)
|
|
|
- // TODO: pin old endpoint if possible, and rebalance when new endpoint appears
|
|
|
- c.pinned = 0
|
|
|
+ switch c.selectionMode {
|
|
|
+ case EndpointSelectionRandom:
|
|
|
+ c.endpoints = shuffleEndpoints(c.rand, neps)
|
|
|
+ c.pinned = 0
|
|
|
+ case EndpointSelectionPrioritizeLeader:
|
|
|
+ c.endpoints = neps
|
|
|
+ lep, err := c.getLeaderEndpoint()
|
|
|
+ if err != nil {
|
|
|
+ return ErrNoLeaderEndpoint
|
|
|
+ }
|
|
|
+
|
|
|
+ for i := range c.endpoints {
|
|
|
+ if c.endpoints[i].String() == lep {
|
|
|
+ c.pinned = i
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // If endpoints doesn't have the lu, just keep c.pinned = 0.
|
|
|
+ // Forwarding between follower and leader would be required but it works.
|
|
|
+ default:
|
|
|
+ return errors.New(fmt.Sprintf("invalid endpoint selection mode: %d", c.selectionMode))
|
|
|
+ }
|
|
|
|
|
|
return nil
|
|
|
}
|