Browse Source

Merge pull request #1553 from bcwaldon/client-sync

Support syncing and `--no-sync` flag in `etcdctl member` commands
Brian Waldon 11 years ago
parent
commit
75104c10d4

+ 0 - 53
client/cluster.go

@@ -1,53 +0,0 @@
-/*
-   Copyright 2014 CoreOS, Inc.
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-*/
-
-package client
-
-import (
-	"net/http"
-	"net/url"
-
-	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
-)
-
-func newHTTPClusterClient(tr *http.Transport, eps []string) (*httpClusterClient, error) {
-	c := httpClusterClient{
-		endpoints: make([]*httpClient, len(eps)),
-	}
-
-	for i, ep := range eps {
-		u, err := url.Parse(ep)
-		if err != nil {
-			return nil, err
-		}
-
-		c.endpoints[i] = &httpClient{
-			transport: tr,
-			endpoint:  *u,
-		}
-	}
-
-	return &c, nil
-}
-
-type httpClusterClient struct {
-	endpoints []*httpClient
-}
-
-func (c *httpClusterClient) do(ctx context.Context, act httpAction) (int, []byte, error) {
-	//TODO(bcwaldon): introduce retry logic so all endpoints are attempted
-	return c.endpoints[0].do(ctx, act)
-}

+ 71 - 12
client/http.go

@@ -30,20 +30,80 @@ var (
 	DefaultRequestTimeout = 5 * time.Second
 	DefaultRequestTimeout = 5 * time.Second
 )
 )
 
 
-// transport mimics http.Transport to provide an interface which can be
+type SyncableHTTPClient interface {
+	HTTPClient
+	Sync(context.Context) error
+}
+
+type HTTPClient interface {
+	Do(context.Context, HTTPAction) (*http.Response, []byte, error)
+}
+
+type HTTPAction interface {
+	HTTPRequest(url.URL) *http.Request
+}
+
+// CancelableTransport mimics http.Transport to provide an interface which can be
 // substituted for testing (since the RoundTripper interface alone does not
 // substituted for testing (since the RoundTripper interface alone does not
 // require the CancelRequest method)
 // require the CancelRequest method)
-type transport interface {
+type CancelableTransport interface {
 	http.RoundTripper
 	http.RoundTripper
 	CancelRequest(req *http.Request)
 	CancelRequest(req *http.Request)
 }
 }
 
 
-type httpAction interface {
-	httpRequest(url.URL) *http.Request
+func NewHTTPClient(tr CancelableTransport, eps []string) (SyncableHTTPClient, error) {
+	return newHTTPClusterClient(tr, eps)
+}
+
+func newHTTPClusterClient(tr CancelableTransport, eps []string) (*httpClusterClient, error) {
+	c := httpClusterClient{
+		transport: tr,
+		endpoints: make([]HTTPClient, len(eps)),
+	}
+
+	for i, ep := range eps {
+		u, err := url.Parse(ep)
+		if err != nil {
+			return nil, err
+		}
+
+		c.endpoints[i] = &httpClient{
+			transport: tr,
+			endpoint:  *u,
+		}
+	}
+
+	return &c, nil
+}
+
+type httpClusterClient struct {
+	transport CancelableTransport
+	endpoints []HTTPClient
 }
 }
 
 
-type httpActionDo interface {
-	do(context.Context, httpAction) (int, []byte, error)
+func (c *httpClusterClient) Do(ctx context.Context, act HTTPAction) (*http.Response, []byte, error) {
+	//TODO(bcwaldon): introduce retry logic so all endpoints are attempted
+	return c.endpoints[0].Do(ctx, act)
+}
+
+func (c *httpClusterClient) Sync(ctx context.Context) error {
+	mAPI := NewMembersAPI(c)
+	ms, err := mAPI.List(ctx)
+	if err != nil {
+		return err
+	}
+
+	eps := make([]string, 0)
+	for _, m := range ms {
+		eps = append(eps, m.ClientURLs...)
+	}
+	nc, err := newHTTPClusterClient(c.transport, eps)
+	if err != nil {
+		return err
+	}
+
+	*c = *nc
+	return nil
 }
 }
 
 
 type roundTripResponse struct {
 type roundTripResponse struct {
@@ -52,13 +112,12 @@ type roundTripResponse struct {
 }
 }
 
 
 type httpClient struct {
 type httpClient struct {
-	transport transport
+	transport CancelableTransport
 	endpoint  url.URL
 	endpoint  url.URL
-	timeout   time.Duration
 }
 }
 
 
-func (c *httpClient) do(ctx context.Context, act httpAction) (int, []byte, error) {
-	req := act.httpRequest(c.endpoint)
+func (c *httpClient) Do(ctx context.Context, act HTTPAction) (*http.Response, []byte, error) {
+	req := act.HTTPRequest(c.endpoint)
 
 
 	rtchan := make(chan roundTripResponse, 1)
 	rtchan := make(chan roundTripResponse, 1)
 	go func() {
 	go func() {
@@ -89,9 +148,9 @@ func (c *httpClient) do(ctx context.Context, act httpAction) (int, []byte, error
 	}()
 	}()
 
 
 	if err != nil {
 	if err != nil {
-		return 0, nil, err
+		return nil, nil, err
 	}
 	}
 
 
 	body, err := ioutil.ReadAll(resp.Body)
 	body, err := ioutil.ReadAll(resp.Body)
-	return resp.StatusCode, body, err
+	return resp, body, err
 }
 }

+ 7 - 7
client/http_test.go

@@ -65,7 +65,7 @@ func (t *fakeTransport) CancelRequest(*http.Request) {
 
 
 type fakeAction struct{}
 type fakeAction struct{}
 
 
-func (a *fakeAction) httpRequest(url.URL) *http.Request {
+func (a *fakeAction) HTTPRequest(url.URL) *http.Request {
 	return &http.Request{}
 	return &http.Request{}
 }
 }
 
 
@@ -78,14 +78,14 @@ func TestHTTPClientDoSuccess(t *testing.T) {
 		Body:       ioutil.NopCloser(strings.NewReader("foo")),
 		Body:       ioutil.NopCloser(strings.NewReader("foo")),
 	}
 	}
 
 
-	code, body, err := c.do(context.Background(), &fakeAction{})
+	resp, body, err := c.Do(context.Background(), &fakeAction{})
 	if err != nil {
 	if err != nil {
 		t.Fatalf("incorrect error value: want=nil got=%v", err)
 		t.Fatalf("incorrect error value: want=nil got=%v", err)
 	}
 	}
 
 
 	wantCode := http.StatusTeapot
 	wantCode := http.StatusTeapot
-	if wantCode != code {
-		t.Fatalf("invalid response code: want=%d got=%d", wantCode, code)
+	if wantCode != resp.StatusCode {
+		t.Fatalf("invalid response code: want=%d got=%d", wantCode, resp.StatusCode)
 	}
 	}
 
 
 	wantBody := []byte("foo")
 	wantBody := []byte("foo")
@@ -100,7 +100,7 @@ func TestHTTPClientDoError(t *testing.T) {
 
 
 	tr.errchan <- errors.New("fixture")
 	tr.errchan <- errors.New("fixture")
 
 
-	_, _, err := c.do(context.Background(), &fakeAction{})
+	_, _, err := c.Do(context.Background(), &fakeAction{})
 	if err == nil {
 	if err == nil {
 		t.Fatalf("expected non-nil error, got nil")
 		t.Fatalf("expected non-nil error, got nil")
 	}
 	}
@@ -113,7 +113,7 @@ func TestHTTPClientDoCancelContext(t *testing.T) {
 	tr.startCancel <- struct{}{}
 	tr.startCancel <- struct{}{}
 	tr.finishCancel <- struct{}{}
 	tr.finishCancel <- struct{}{}
 
 
-	_, _, err := c.do(context.Background(), &fakeAction{})
+	_, _, err := c.Do(context.Background(), &fakeAction{})
 	if err == nil {
 	if err == nil {
 		t.Fatalf("expected non-nil error, got nil")
 		t.Fatalf("expected non-nil error, got nil")
 	}
 	}
@@ -126,7 +126,7 @@ func TestHTTPClientDoCancelContextWaitForRoundTrip(t *testing.T) {
 	donechan := make(chan struct{})
 	donechan := make(chan struct{})
 	ctx, cancel := context.WithCancel(context.Background())
 	ctx, cancel := context.WithCancel(context.Background())
 	go func() {
 	go func() {
-		c.do(ctx, &fakeAction{})
+		c.Do(ctx, &fakeAction{})
 		close(donechan)
 		close(donechan)
 	}()
 	}()
 
 

+ 28 - 42
client/keys.go

@@ -41,38 +41,30 @@ var (
 	ErrKeyExists   = errors.New("client: key already exists")
 	ErrKeyExists   = errors.New("client: key already exists")
 )
 )
 
 
-func NewKeysAPI(tr *http.Transport, eps []string, to time.Duration) (KeysAPI, error) {
-	return newHTTPKeysAPIWithPrefix(tr, eps, to, DefaultV2KeysPrefix)
-}
-
-func NewDiscoveryKeysAPI(tr *http.Transport, eps []string, to time.Duration) (KeysAPI, error) {
-	return newHTTPKeysAPIWithPrefix(tr, eps, to, "")
-}
-
-func newHTTPKeysAPIWithPrefix(tr *http.Transport, eps []string, to time.Duration, prefix string) (*httpKeysAPI, error) {
-	c, err := newHTTPClusterClient(tr, eps)
-	if err != nil {
-		return nil, err
+func NewKeysAPI(c HTTPClient) KeysAPI {
+	return &httpKeysAPI{
+		client: c,
+		prefix: DefaultV2KeysPrefix,
 	}
 	}
+}
 
 
-	kAPI := httpKeysAPI{
-		client:  c,
-		prefix:  prefix,
-		timeout: to,
+func NewDiscoveryKeysAPI(c HTTPClient) KeysAPI {
+	return &httpKeysAPI{
+		client: c,
+		prefix: "",
 	}
 	}
-
-	return &kAPI, nil
 }
 }
 
 
 type KeysAPI interface {
 type KeysAPI interface {
-	Create(key, value string, ttl time.Duration) (*Response, error)
-	Get(key string) (*Response, error)
+	Create(ctx context.Context, key, value string, ttl time.Duration) (*Response, error)
+	Get(ctx context.Context, key string) (*Response, error)
+
 	Watch(key string, idx uint64) Watcher
 	Watch(key string, idx uint64) Watcher
 	RecursiveWatch(key string, idx uint64) Watcher
 	RecursiveWatch(key string, idx uint64) Watcher
 }
 }
 
 
 type Watcher interface {
 type Watcher interface {
-	Next() (*Response, error)
+	Next(context.Context) (*Response, error)
 }
 }
 
 
 type Response struct {
 type Response struct {
@@ -95,12 +87,11 @@ func (n *Node) String() string {
 }
 }
 
 
 type httpKeysAPI struct {
 type httpKeysAPI struct {
-	client  httpActionDo
-	prefix  string
-	timeout time.Duration
+	client HTTPClient
+	prefix string
 }
 }
 
 
-func (k *httpKeysAPI) Create(key, val string, ttl time.Duration) (*Response, error) {
+func (k *httpKeysAPI) Create(ctx context.Context, key, val string, ttl time.Duration) (*Response, error) {
 	create := &createAction{
 	create := &createAction{
 		Prefix: k.prefix,
 		Prefix: k.prefix,
 		Key:    key,
 		Key:    key,
@@ -111,31 +102,27 @@ func (k *httpKeysAPI) Create(key, val string, ttl time.Duration) (*Response, err
 		create.TTL = &uttl
 		create.TTL = &uttl
 	}
 	}
 
 
-	ctx, cancel := context.WithTimeout(context.Background(), k.timeout)
-	code, body, err := k.client.do(ctx, create)
-	cancel()
+	resp, body, err := k.client.Do(ctx, create)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	return unmarshalHTTPResponse(code, body)
+	return unmarshalHTTPResponse(resp.StatusCode, body)
 }
 }
 
 
-func (k *httpKeysAPI) Get(key string) (*Response, error) {
+func (k *httpKeysAPI) Get(ctx context.Context, key string) (*Response, error) {
 	get := &getAction{
 	get := &getAction{
 		Prefix:    k.prefix,
 		Prefix:    k.prefix,
 		Key:       key,
 		Key:       key,
 		Recursive: false,
 		Recursive: false,
 	}
 	}
 
 
-	ctx, cancel := context.WithTimeout(context.Background(), k.timeout)
-	code, body, err := k.client.do(ctx, get)
-	cancel()
+	resp, body, err := k.client.Do(ctx, get)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	return unmarshalHTTPResponse(code, body)
+	return unmarshalHTTPResponse(resp.StatusCode, body)
 }
 }
 
 
 func (k *httpKeysAPI) Watch(key string, idx uint64) Watcher {
 func (k *httpKeysAPI) Watch(key string, idx uint64) Watcher {
@@ -163,18 +150,17 @@ func (k *httpKeysAPI) RecursiveWatch(key string, idx uint64) Watcher {
 }
 }
 
 
 type httpWatcher struct {
 type httpWatcher struct {
-	client   httpActionDo
+	client   HTTPClient
 	nextWait waitAction
 	nextWait waitAction
 }
 }
 
 
-func (hw *httpWatcher) Next() (*Response, error) {
-	//TODO(bcwaldon): This needs to be cancellable by the calling user
-	code, body, err := hw.client.do(context.Background(), &hw.nextWait)
+func (hw *httpWatcher) Next(ctx context.Context) (*Response, error) {
+	httpresp, body, err := hw.client.Do(ctx, &hw.nextWait)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	resp, err := unmarshalHTTPResponse(code, body)
+	resp, err := unmarshalHTTPResponse(httpresp.StatusCode, body)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
@@ -199,7 +185,7 @@ type getAction struct {
 	Recursive bool
 	Recursive bool
 }
 }
 
 
-func (g *getAction) httpRequest(ep url.URL) *http.Request {
+func (g *getAction) HTTPRequest(ep url.URL) *http.Request {
 	u := v2KeysURL(ep, g.Prefix, g.Key)
 	u := v2KeysURL(ep, g.Prefix, g.Key)
 
 
 	params := u.Query()
 	params := u.Query()
@@ -217,7 +203,7 @@ type waitAction struct {
 	Recursive bool
 	Recursive bool
 }
 }
 
 
-func (w *waitAction) httpRequest(ep url.URL) *http.Request {
+func (w *waitAction) HTTPRequest(ep url.URL) *http.Request {
 	u := v2KeysURL(ep, w.Prefix, w.Key)
 	u := v2KeysURL(ep, w.Prefix, w.Key)
 
 
 	params := u.Query()
 	params := u.Query()
@@ -237,7 +223,7 @@ type createAction struct {
 	TTL    *uint64
 	TTL    *uint64
 }
 }
 
 
-func (c *createAction) httpRequest(ep url.URL) *http.Request {
+func (c *createAction) HTTPRequest(ep url.URL) *http.Request {
 	u := v2KeysURL(ep, c.Prefix, c.Key)
 	u := v2KeysURL(ep, c.Prefix, c.Key)
 
 
 	params := u.Query()
 	params := u.Query()

+ 3 - 3
client/keys_test.go

@@ -117,7 +117,7 @@ func TestGetAction(t *testing.T) {
 			Key:       "/foo/bar",
 			Key:       "/foo/bar",
 			Recursive: tt.recursive,
 			Recursive: tt.recursive,
 		}
 		}
-		got := *f.httpRequest(ep)
+		got := *f.HTTPRequest(ep)
 
 
 		wantURL := wantURL
 		wantURL := wantURL
 		wantURL.RawQuery = tt.wantQuery
 		wantURL.RawQuery = tt.wantQuery
@@ -166,7 +166,7 @@ func TestWaitAction(t *testing.T) {
 			WaitIndex: tt.waitIndex,
 			WaitIndex: tt.waitIndex,
 			Recursive: tt.recursive,
 			Recursive: tt.recursive,
 		}
 		}
-		got := *f.httpRequest(ep)
+		got := *f.HTTPRequest(ep)
 
 
 		wantURL := wantURL
 		wantURL := wantURL
 		wantURL.RawQuery = tt.wantQuery
 		wantURL.RawQuery = tt.wantQuery
@@ -213,7 +213,7 @@ func TestCreateAction(t *testing.T) {
 			Value: tt.value,
 			Value: tt.value,
 			TTL:   tt.ttl,
 			TTL:   tt.ttl,
 		}
 		}
-		got := *f.httpRequest(ep)
+		got := *f.HTTPRequest(ep)
 
 
 		err := assertResponse(got, wantURL, wantHeader, []byte(tt.wantBody))
 		err := assertResponse(got, wantURL, wantHeader, []byte(tt.wantBody))
 		if err != nil {
 		if err != nil {

+ 19 - 35
client/members.go

@@ -23,7 +23,6 @@ import (
 	"net/http"
 	"net/http"
 	"net/url"
 	"net/url"
 	"path"
 	"path"
-	"time"
 
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
 	"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
@@ -34,41 +33,30 @@ var (
 	DefaultV2MembersPrefix = "/v2/members"
 	DefaultV2MembersPrefix = "/v2/members"
 )
 )
 
 
-func NewMembersAPI(tr *http.Transport, eps []string, to time.Duration) (MembersAPI, error) {
-	c, err := newHTTPClusterClient(tr, eps)
-	if err != nil {
-		return nil, err
+func NewMembersAPI(c HTTPClient) MembersAPI {
+	return &httpMembersAPI{
+		client: c,
 	}
 	}
-
-	mAPI := httpMembersAPI{
-		client:  c,
-		timeout: to,
-	}
-
-	return &mAPI, nil
 }
 }
 
 
 type MembersAPI interface {
 type MembersAPI interface {
-	List() ([]httptypes.Member, error)
-	Add(peerURL string) (*httptypes.Member, error)
-	Remove(mID string) error
+	List(ctx context.Context) ([]httptypes.Member, error)
+	Add(ctx context.Context, peerURL string) (*httptypes.Member, error)
+	Remove(ctx context.Context, mID string) error
 }
 }
 
 
 type httpMembersAPI struct {
 type httpMembersAPI struct {
-	client  httpActionDo
-	timeout time.Duration
+	client HTTPClient
 }
 }
 
 
-func (m *httpMembersAPI) List() ([]httptypes.Member, error) {
+func (m *httpMembersAPI) List(ctx context.Context) ([]httptypes.Member, error) {
 	req := &membersAPIActionList{}
 	req := &membersAPIActionList{}
-	ctx, cancel := context.WithTimeout(context.Background(), m.timeout)
-	code, body, err := m.client.do(ctx, req)
-	cancel()
+	resp, body, err := m.client.Do(ctx, req)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	if err := assertStatusCode(http.StatusOK, code); err != nil {
+	if err := assertStatusCode(http.StatusOK, resp.StatusCode); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
@@ -80,21 +68,19 @@ func (m *httpMembersAPI) List() ([]httptypes.Member, error) {
 	return []httptypes.Member(mCollection), nil
 	return []httptypes.Member(mCollection), nil
 }
 }
 
 
-func (m *httpMembersAPI) Add(peerURL string) (*httptypes.Member, error) {
+func (m *httpMembersAPI) Add(ctx context.Context, peerURL string) (*httptypes.Member, error) {
 	urls, err := types.NewURLs([]string{peerURL})
 	urls, err := types.NewURLs([]string{peerURL})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
 	req := &membersAPIActionAdd{peerURLs: urls}
 	req := &membersAPIActionAdd{peerURLs: urls}
-	ctx, cancel := context.WithTimeout(context.Background(), m.timeout)
-	code, body, err := m.client.do(ctx, req)
-	cancel()
+	resp, body, err := m.client.Do(ctx, req)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	if err := assertStatusCode(http.StatusCreated, code); err != nil {
+	if err := assertStatusCode(http.StatusCreated, resp.StatusCode); err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
@@ -106,21 +92,19 @@ func (m *httpMembersAPI) Add(peerURL string) (*httptypes.Member, error) {
 	return &memb, nil
 	return &memb, nil
 }
 }
 
 
-func (m *httpMembersAPI) Remove(memberID string) error {
+func (m *httpMembersAPI) Remove(ctx context.Context, memberID string) error {
 	req := &membersAPIActionRemove{memberID: memberID}
 	req := &membersAPIActionRemove{memberID: memberID}
-	ctx, cancel := context.WithTimeout(context.Background(), m.timeout)
-	code, _, err := m.client.do(ctx, req)
-	cancel()
+	resp, _, err := m.client.Do(ctx, req)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
 
 
-	return assertStatusCode(http.StatusNoContent, code)
+	return assertStatusCode(http.StatusNoContent, resp.StatusCode)
 }
 }
 
 
 type membersAPIActionList struct{}
 type membersAPIActionList struct{}
 
 
-func (l *membersAPIActionList) httpRequest(ep url.URL) *http.Request {
+func (l *membersAPIActionList) HTTPRequest(ep url.URL) *http.Request {
 	u := v2MembersURL(ep)
 	u := v2MembersURL(ep)
 	req, _ := http.NewRequest("GET", u.String(), nil)
 	req, _ := http.NewRequest("GET", u.String(), nil)
 	return req
 	return req
@@ -130,7 +114,7 @@ type membersAPIActionRemove struct {
 	memberID string
 	memberID string
 }
 }
 
 
-func (d *membersAPIActionRemove) httpRequest(ep url.URL) *http.Request {
+func (d *membersAPIActionRemove) HTTPRequest(ep url.URL) *http.Request {
 	u := v2MembersURL(ep)
 	u := v2MembersURL(ep)
 	u.Path = path.Join(u.Path, d.memberID)
 	u.Path = path.Join(u.Path, d.memberID)
 	req, _ := http.NewRequest("DELETE", u.String(), nil)
 	req, _ := http.NewRequest("DELETE", u.String(), nil)
@@ -141,7 +125,7 @@ type membersAPIActionAdd struct {
 	peerURLs types.URLs
 	peerURLs types.URLs
 }
 }
 
 
-func (a *membersAPIActionAdd) httpRequest(ep url.URL) *http.Request {
+func (a *membersAPIActionAdd) HTTPRequest(ep url.URL) *http.Request {
 	u := v2MembersURL(ep)
 	u := v2MembersURL(ep)
 	m := httptypes.MemberCreateRequest{PeerURLs: a.peerURLs}
 	m := httptypes.MemberCreateRequest{PeerURLs: a.peerURLs}
 	b, _ := json.Marshal(&m)
 	b, _ := json.Marshal(&m)

+ 3 - 3
client/members_test.go

@@ -35,7 +35,7 @@ func TestMembersAPIActionList(t *testing.T) {
 		Path:   "/v2/members",
 		Path:   "/v2/members",
 	}
 	}
 
 
-	got := *act.httpRequest(ep)
+	got := *act.HTTPRequest(ep)
 	err := assertResponse(got, wantURL, http.Header{}, nil)
 	err := assertResponse(got, wantURL, http.Header{}, nil)
 	if err != nil {
 	if err != nil {
 		t.Error(err.Error())
 		t.Error(err.Error())
@@ -61,7 +61,7 @@ func TestMembersAPIActionAdd(t *testing.T) {
 	}
 	}
 	wantBody := []byte(`{"peerURLs":["https://127.0.0.1:8081","http://127.0.0.1:8080"]}`)
 	wantBody := []byte(`{"peerURLs":["https://127.0.0.1:8081","http://127.0.0.1:8080"]}`)
 
 
-	got := *act.httpRequest(ep)
+	got := *act.HTTPRequest(ep)
 	err := assertResponse(got, wantURL, wantHeader, wantBody)
 	err := assertResponse(got, wantURL, wantHeader, wantBody)
 	if err != nil {
 	if err != nil {
 		t.Error(err.Error())
 		t.Error(err.Error())
@@ -78,7 +78,7 @@ func TestMembersAPIActionRemove(t *testing.T) {
 		Path:   "/v2/members/XXX",
 		Path:   "/v2/members/XXX",
 	}
 	}
 
 
-	got := *act.httpRequest(ep)
+	got := *act.HTTPRequest(ep)
 	err := assertResponse(got, wantURL, http.Header{}, nil)
 	err := assertResponse(got, wantURL, http.Header{}, nil)
 	if err != nil {
 	if err != nil {
 		t.Error(err.Error())
 		t.Error(err.Error())

+ 15 - 7
discovery/discovery.go

@@ -29,6 +29,7 @@ import (
 	"strings"
 	"strings"
 	"time"
 	"time"
 
 
+	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
 	"github.com/coreos/etcd/client"
 	"github.com/coreos/etcd/client"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/types"
@@ -106,15 +107,16 @@ func New(durl string, id types.ID, config string) (Discoverer, error) {
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	c, err := client.NewDiscoveryKeysAPI(&http.Transport{Proxy: pf}, []string{u.String()}, client.DefaultRequestTimeout)
+	c, err := client.NewHTTPClient(&http.Transport{Proxy: pf}, []string{u.String()})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
+	dc := client.NewDiscoveryKeysAPI(c)
 	return &discovery{
 	return &discovery{
 		cluster: token,
 		cluster: token,
 		id:      id,
 		id:      id,
 		config:  config,
 		config:  config,
-		c:       c,
+		c:       dc,
 		url:     u,
 		url:     u,
 		clock:   clockwork.NewRealClock(),
 		clock:   clockwork.NewRealClock(),
 	}, nil
 	}, nil
@@ -149,21 +151,25 @@ func (d *discovery) Discover() (string, error) {
 }
 }
 
 
 func (d *discovery) createSelf() error {
 func (d *discovery) createSelf() error {
-	resp, err := d.c.Create(d.selfKey(), d.config, -1)
+	ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
+	resp, err := d.c.Create(ctx, d.selfKey(), d.config, -1)
+	cancel()
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
 
 
 	// ensure self appears on the server we connected to
 	// ensure self appears on the server we connected to
 	w := d.c.Watch(d.selfKey(), resp.Node.CreatedIndex)
 	w := d.c.Watch(d.selfKey(), resp.Node.CreatedIndex)
-	_, err = w.Next()
+	_, err = w.Next(context.Background())
 	return err
 	return err
 }
 }
 
 
 func (d *discovery) checkCluster() (client.Nodes, int, error) {
 func (d *discovery) checkCluster() (client.Nodes, int, error) {
 	configKey := path.Join("/", d.cluster, "_config")
 	configKey := path.Join("/", d.cluster, "_config")
+	ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
 	// find cluster size
 	// find cluster size
-	resp, err := d.c.Get(path.Join(configKey, "size"))
+	resp, err := d.c.Get(ctx, path.Join(configKey, "size"))
+	cancel()
 	if err != nil {
 	if err != nil {
 		if err == client.ErrKeyNoExist {
 		if err == client.ErrKeyNoExist {
 			return nil, 0, ErrSizeNotFound
 			return nil, 0, ErrSizeNotFound
@@ -178,7 +184,9 @@ func (d *discovery) checkCluster() (client.Nodes, int, error) {
 		return nil, 0, ErrBadSizeKey
 		return nil, 0, ErrBadSizeKey
 	}
 	}
 
 
-	resp, err = d.c.Get(d.cluster)
+	ctx, cancel = context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
+	resp, err = d.c.Get(ctx, d.cluster)
+	cancel()
 	if err != nil {
 	if err != nil {
 		if err == client.ErrTimeout {
 		if err == client.ErrTimeout {
 			return d.checkClusterRetry()
 			return d.checkClusterRetry()
@@ -253,7 +261,7 @@ func (d *discovery) waitNodes(nodes client.Nodes, size int) (client.Nodes, error
 	// wait for others
 	// wait for others
 	for len(all) < size {
 	for len(all) < size {
 		log.Printf("discovery: found %d peer(s), waiting for %d more", len(all), size-len(all))
 		log.Printf("discovery: found %d peer(s), waiting for %d more", len(all), size-len(all))
-		resp, err := w.Next()
+		resp, err := w.Next(context.Background())
 		if err != nil {
 		if err != nil {
 			if err == client.ErrTimeout {
 			if err == client.ErrTimeout {
 				return d.waitNodesRetry()
 				return d.waitNodesRetry()

+ 13 - 13
discovery/discovery_test.go

@@ -21,13 +21,13 @@ import (
 	"math/rand"
 	"math/rand"
 	"net/http"
 	"net/http"
 	"os"
 	"os"
+	"reflect"
 	"sort"
 	"sort"
 	"strconv"
 	"strconv"
-
-	"reflect"
 	"testing"
 	"testing"
 	"time"
 	"time"
 
 
+	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
 	"github.com/coreos/etcd/client"
 	"github.com/coreos/etcd/client"
 )
 )
@@ -397,7 +397,7 @@ type clientWithResp struct {
 	w  client.Watcher
 	w  client.Watcher
 }
 }
 
 
-func (c *clientWithResp) Create(key string, value string, ttl time.Duration) (*client.Response, error) {
+func (c *clientWithResp) Create(ctx context.Context, key string, value string, ttl time.Duration) (*client.Response, error) {
 	if len(c.rs) == 0 {
 	if len(c.rs) == 0 {
 		return &client.Response{}, nil
 		return &client.Response{}, nil
 	}
 	}
@@ -406,7 +406,7 @@ func (c *clientWithResp) Create(key string, value string, ttl time.Duration) (*c
 	return r, nil
 	return r, nil
 }
 }
 
 
-func (c *clientWithResp) Get(key string) (*client.Response, error) {
+func (c *clientWithResp) Get(ctx context.Context, key string) (*client.Response, error) {
 	if len(c.rs) == 0 {
 	if len(c.rs) == 0 {
 		return &client.Response{}, client.ErrKeyNoExist
 		return &client.Response{}, client.ErrKeyNoExist
 	}
 	}
@@ -428,11 +428,11 @@ type clientWithErr struct {
 	w   client.Watcher
 	w   client.Watcher
 }
 }
 
 
-func (c *clientWithErr) Create(key string, value string, ttl time.Duration) (*client.Response, error) {
+func (c *clientWithErr) Create(ctx context.Context, key string, value string, ttl time.Duration) (*client.Response, error) {
 	return &client.Response{}, c.err
 	return &client.Response{}, c.err
 }
 }
 
 
-func (c *clientWithErr) Get(key string) (*client.Response, error) {
+func (c *clientWithErr) Get(ctx context.Context, key string) (*client.Response, error) {
 	return &client.Response{}, c.err
 	return &client.Response{}, c.err
 }
 }
 
 
@@ -448,7 +448,7 @@ type watcherWithResp struct {
 	rs []*client.Response
 	rs []*client.Response
 }
 }
 
 
-func (w *watcherWithResp) Next() (*client.Response, error) {
+func (w *watcherWithResp) Next(context.Context) (*client.Response, error) {
 	if len(w.rs) == 0 {
 	if len(w.rs) == 0 {
 		return &client.Response{}, nil
 		return &client.Response{}, nil
 	}
 	}
@@ -461,7 +461,7 @@ type watcherWithErr struct {
 	err error
 	err error
 }
 }
 
 
-func (w *watcherWithErr) Next() (*client.Response, error) {
+func (w *watcherWithErr) Next(context.Context) (*client.Response, error) {
 	return &client.Response{}, w.err
 	return &client.Response{}, w.err
 }
 }
 
 
@@ -472,20 +472,20 @@ type clientWithRetry struct {
 	failTimes int
 	failTimes int
 }
 }
 
 
-func (c *clientWithRetry) Create(key string, value string, ttl time.Duration) (*client.Response, error) {
+func (c *clientWithRetry) Create(ctx context.Context, key string, value string, ttl time.Duration) (*client.Response, error) {
 	if c.failCount < c.failTimes {
 	if c.failCount < c.failTimes {
 		c.failCount++
 		c.failCount++
 		return nil, client.ErrTimeout
 		return nil, client.ErrTimeout
 	}
 	}
-	return c.clientWithResp.Create(key, value, ttl)
+	return c.clientWithResp.Create(ctx, key, value, ttl)
 }
 }
 
 
-func (c *clientWithRetry) Get(key string) (*client.Response, error) {
+func (c *clientWithRetry) Get(ctx context.Context, key string) (*client.Response, error) {
 	if c.failCount < c.failTimes {
 	if c.failCount < c.failTimes {
 		c.failCount++
 		c.failCount++
 		return nil, client.ErrTimeout
 		return nil, client.ErrTimeout
 	}
 	}
-	return c.clientWithResp.Get(key)
+	return c.clientWithResp.Get(ctx, key)
 }
 }
 
 
 // watcherWithRetry will timeout all requests up to failTimes
 // watcherWithRetry will timeout all requests up to failTimes
@@ -495,7 +495,7 @@ type watcherWithRetry struct {
 	failTimes int
 	failTimes int
 }
 }
 
 
-func (w *watcherWithRetry) Next() (*client.Response, error) {
+func (w *watcherWithRetry) Next(context.Context) (*client.Response, error) {
 	if w.failCount < w.failTimes {
 	if w.failCount < w.failTimes {
 		w.failCount++
 		w.failCount++
 		return nil, client.ErrTimeout
 		return nil, client.ErrTimeout

+ 26 - 6
etcdctl/command/member_commands.go

@@ -6,6 +6,7 @@ import (
 	"os"
 	"os"
 	"strings"
 	"strings"
 
 
+	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
 	"github.com/coreos/etcd/client"
 	"github.com/coreos/etcd/client"
 )
 )
@@ -42,13 +43,23 @@ func mustNewMembersAPI(c *cli.Context) client.MembersAPI {
 		}
 		}
 	}
 	}
 
 
-	mAPI, err := client.NewMembersAPI(&http.Transport{}, peers, client.DefaultRequestTimeout)
+	hc, err := client.NewHTTPClient(&http.Transport{}, peers)
 	if err != nil {
 	if err != nil {
 		fmt.Fprintln(os.Stderr, err.Error())
 		fmt.Fprintln(os.Stderr, err.Error())
 		os.Exit(1)
 		os.Exit(1)
 	}
 	}
 
 
-	return mAPI
+	if !c.GlobalBool("no-sync") {
+		ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
+		err := hc.Sync(ctx)
+		cancel()
+		if err != nil {
+			fmt.Fprintln(os.Stderr, err.Error())
+			os.Exit(1)
+		}
+	}
+
+	return client.NewMembersAPI(hc)
 }
 }
 
 
 func actionMemberList(c *cli.Context) {
 func actionMemberList(c *cli.Context) {
@@ -57,7 +68,9 @@ func actionMemberList(c *cli.Context) {
 		os.Exit(1)
 		os.Exit(1)
 	}
 	}
 	mAPI := mustNewMembersAPI(c)
 	mAPI := mustNewMembersAPI(c)
-	members, err := mAPI.List()
+	ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
+	members, err := mAPI.List(ctx)
+	cancel()
 	if err != nil {
 	if err != nil {
 		fmt.Fprintln(os.Stderr, err.Error())
 		fmt.Fprintln(os.Stderr, err.Error())
 		os.Exit(1)
 		os.Exit(1)
@@ -78,7 +91,9 @@ func actionMemberAdd(c *cli.Context) {
 	mAPI := mustNewMembersAPI(c)
 	mAPI := mustNewMembersAPI(c)
 
 
 	url := args[1]
 	url := args[1]
-	m, err := mAPI.Add(url)
+	ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
+	m, err := mAPI.Add(ctx, url)
+	cancel()
 	if err != nil {
 	if err != nil {
 		fmt.Fprintln(os.Stderr, err.Error())
 		fmt.Fprintln(os.Stderr, err.Error())
 		os.Exit(1)
 		os.Exit(1)
@@ -88,7 +103,9 @@ func actionMemberAdd(c *cli.Context) {
 	newName := args[0]
 	newName := args[0]
 	fmt.Printf("Added member named %s with ID %s to cluster\n", newName, newID)
 	fmt.Printf("Added member named %s with ID %s to cluster\n", newName, newID)
 
 
-	members, err := mAPI.List()
+	ctx, cancel = context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
+	members, err := mAPI.List(ctx)
+	cancel()
 	if err != nil {
 	if err != nil {
 		fmt.Fprintln(os.Stderr, err.Error())
 		fmt.Fprintln(os.Stderr, err.Error())
 		os.Exit(1)
 		os.Exit(1)
@@ -120,7 +137,10 @@ func actionMemberRemove(c *cli.Context) {
 
 
 	mAPI := mustNewMembersAPI(c)
 	mAPI := mustNewMembersAPI(c)
 	mID := args[0]
 	mID := args[0]
-	if err := mAPI.Remove(mID); err != nil {
+	ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
+	err := mAPI.Remove(ctx, mID)
+	cancel()
+	if err != nil {
 		fmt.Fprintln(os.Stderr, err.Error())
 		fmt.Fprintln(os.Stderr, err.Error())
 		os.Exit(1)
 		os.Exit(1)
 	}
 	}