|
|
@@ -23,21 +23,23 @@ import (
|
|
|
"net"
|
|
|
"net/http"
|
|
|
"net/http/httptest"
|
|
|
- "net/url"
|
|
|
"os"
|
|
|
"strings"
|
|
|
"testing"
|
|
|
"time"
|
|
|
|
|
|
+ "github.com/coreos/etcd/client"
|
|
|
"github.com/coreos/etcd/etcdserver"
|
|
|
"github.com/coreos/etcd/etcdserver/etcdhttp"
|
|
|
- "github.com/coreos/etcd/pkg/transport"
|
|
|
"github.com/coreos/etcd/pkg/types"
|
|
|
+
|
|
|
+ "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
|
|
)
|
|
|
|
|
|
const (
|
|
|
- tickDuration = 10 * time.Millisecond
|
|
|
- clusterName = "etcd"
|
|
|
+ tickDuration = 10 * time.Millisecond
|
|
|
+ clusterName = "etcd"
|
|
|
+ requestTimeout = 2 * time.Second
|
|
|
)
|
|
|
|
|
|
func init() {
|
|
|
@@ -52,41 +54,27 @@ func testCluster(t *testing.T, size int) {
|
|
|
defer afterTest(t)
|
|
|
c := &cluster{Size: size}
|
|
|
c.Launch(t)
|
|
|
+ defer c.Terminate(t)
|
|
|
for i := 0; i < size; i++ {
|
|
|
- for _, u := range c.Members[i].ClientURLs {
|
|
|
+ for j, u := range c.Members[i].ClientURLs {
|
|
|
+ cc := mustNewHTTPClient(t, []string{u.String()})
|
|
|
+ kapi := client.NewKeysAPI(cc)
|
|
|
+ // TODO: we retry it here because MsgProp may be dropped due to
|
|
|
+ // sender reaches its max serving. make it reliable that we don't
|
|
|
+ // need to worry about it.
|
|
|
var err error
|
|
|
- for j := 0; j < 3; j++ {
|
|
|
- if err = setKey(u, "/foo", "bar"); err == nil {
|
|
|
+ for k := 0; k < 3; k++ {
|
|
|
+ ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
|
|
+ if _, err = kapi.Create(ctx, fmt.Sprintf("/%d%d%d", i, j, k), "bar", -1); err == nil {
|
|
|
break
|
|
|
}
|
|
|
+ cancel()
|
|
|
}
|
|
|
if err != nil {
|
|
|
- t.Errorf("setKey on %v error: %v", u.String(), err)
|
|
|
+ t.Errorf("create on %s error: %v", u.String(), err)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- c.Terminate(t)
|
|
|
-}
|
|
|
-
|
|
|
-// TODO: use etcd client
|
|
|
-func setKey(u url.URL, key string, value string) error {
|
|
|
- u.Path = "/v2/keys" + key
|
|
|
- v := url.Values{"value": []string{value}}
|
|
|
- req, err := http.NewRequest("PUT", u.String(), strings.NewReader(v.Encode()))
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
|
|
- resp, err := http.DefaultClient.Do(req)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- ioutil.ReadAll(resp.Body)
|
|
|
- resp.Body.Close()
|
|
|
- if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
|
|
|
- return fmt.Errorf("statusCode = %d, want %d or %d", resp.StatusCode, http.StatusOK, http.StatusCreated)
|
|
|
- }
|
|
|
- return nil
|
|
|
}
|
|
|
|
|
|
type cluster struct {
|
|
|
@@ -130,13 +118,7 @@ func (c *cluster) Launch(t *testing.T) {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
m.NewCluster = true
|
|
|
- m.Transport, err = transport.NewTransport(transport.TLSInfo{})
|
|
|
- if err != nil {
|
|
|
- t.Fatal(err)
|
|
|
- }
|
|
|
- // TODO: need the support of graceful stop in Sender to remove this
|
|
|
- m.Transport.DisableKeepAlives = true
|
|
|
- m.Transport.Dial = (&net.Dialer{Timeout: 100 * time.Millisecond}).Dial
|
|
|
+ m.Transport = newTransport()
|
|
|
|
|
|
m.Launch(t)
|
|
|
c.Members = append(c.Members, m)
|
|
|
@@ -223,3 +205,19 @@ func (m *member) Terminate(t *testing.T) {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+func mustNewHTTPClient(t *testing.T, eps []string) client.HTTPClient {
|
|
|
+ cc, err := client.NewHTTPClient(newTransport(), eps)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+ return cc
|
|
|
+}
|
|
|
+
|
|
|
+func newTransport() *http.Transport {
|
|
|
+ tr := &http.Transport{}
|
|
|
+ // TODO: need the support of graceful stop in Sender to remove this
|
|
|
+ tr.DisableKeepAlives = true
|
|
|
+ tr.Dial = (&net.Dialer{Timeout: 100 * time.Millisecond}).Dial
|
|
|
+ return tr
|
|
|
+}
|