|
@@ -829,16 +829,19 @@ func TestRemoveNode(t *testing.T) {
|
|
|
|
|
|
|
|
func TestPublish(t *testing.T) {
|
|
func TestPublish(t *testing.T) {
|
|
|
n := &nodeProposeDataRecorder{}
|
|
n := &nodeProposeDataRecorder{}
|
|
|
|
|
+ cs := mustClusterStore(t, []Member{{ID: 1, Name: "node1"}})
|
|
|
ch := make(chan interface{}, 1)
|
|
ch := make(chan interface{}, 1)
|
|
|
// simulate that request has gone through consensus
|
|
// simulate that request has gone through consensus
|
|
|
ch <- Response{}
|
|
ch <- Response{}
|
|
|
w := &waitWithResponse{ch: ch}
|
|
w := &waitWithResponse{ch: ch}
|
|
|
srv := &EtcdServer{
|
|
srv := &EtcdServer{
|
|
|
- Node: n,
|
|
|
|
|
- w: w,
|
|
|
|
|
|
|
+ Name: "node1",
|
|
|
|
|
+ ClientURLs: []string{"a", "b"},
|
|
|
|
|
+ Node: n,
|
|
|
|
|
+ ClusterStore: cs,
|
|
|
|
|
+ w: w,
|
|
|
}
|
|
}
|
|
|
- m := Member{ID: 1, Name: "node1"}
|
|
|
|
|
- srv.publish(m, time.Hour)
|
|
|
|
|
|
|
+ srv.publish(time.Hour)
|
|
|
|
|
|
|
|
data := n.data()
|
|
data := n.data()
|
|
|
if len(data) != 1 {
|
|
if len(data) != 1 {
|
|
@@ -851,39 +854,46 @@ func TestPublish(t *testing.T) {
|
|
|
if r.Method != "PUT" {
|
|
if r.Method != "PUT" {
|
|
|
t.Errorf("method = %s, want PUT", r.Method)
|
|
t.Errorf("method = %s, want PUT", r.Method)
|
|
|
}
|
|
}
|
|
|
- if r.Path != m.storeKey() {
|
|
|
|
|
- t.Errorf("path = %s, want %s", r.Path, m.storeKey())
|
|
|
|
|
|
|
+ wm := Member{ID: 1, Name: "node1", ClientURLs: []string{"a", "b"}}
|
|
|
|
|
+ if r.Path != wm.storeKey() {
|
|
|
|
|
+ t.Errorf("path = %s, want %s", r.Path, wm.storeKey())
|
|
|
}
|
|
}
|
|
|
var gm Member
|
|
var gm Member
|
|
|
if err := json.Unmarshal([]byte(r.Val), &gm); err != nil {
|
|
if err := json.Unmarshal([]byte(r.Val), &gm); err != nil {
|
|
|
t.Fatalf("unmarshal val error: %v", err)
|
|
t.Fatalf("unmarshal val error: %v", err)
|
|
|
}
|
|
}
|
|
|
- if !reflect.DeepEqual(gm, m) {
|
|
|
|
|
- t.Errorf("member = %v, want %v", gm, m)
|
|
|
|
|
|
|
+ if !reflect.DeepEqual(gm, wm) {
|
|
|
|
|
+ t.Errorf("member = %v, want %v", gm, wm)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// TestPublishStopped tests that publish will be stopped if server is stopped.
|
|
// TestPublishStopped tests that publish will be stopped if server is stopped.
|
|
|
func TestPublishStopped(t *testing.T) {
|
|
func TestPublishStopped(t *testing.T) {
|
|
|
|
|
+ cs := mustClusterStore(t, []Member{{ID: 1, Name: "node1"}})
|
|
|
srv := &EtcdServer{
|
|
srv := &EtcdServer{
|
|
|
- Node: &nodeRecorder{},
|
|
|
|
|
- w: &waitRecorder{},
|
|
|
|
|
- done: make(chan struct{}),
|
|
|
|
|
|
|
+ Name: "node1",
|
|
|
|
|
+ Node: &nodeRecorder{},
|
|
|
|
|
+ ClusterStore: cs,
|
|
|
|
|
+ w: &waitRecorder{},
|
|
|
|
|
+ done: make(chan struct{}),
|
|
|
}
|
|
}
|
|
|
srv.Stop()
|
|
srv.Stop()
|
|
|
- srv.publish(Member{ID: 1, Name: "node1"}, time.Hour)
|
|
|
|
|
|
|
+ srv.publish(time.Hour)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// TestPublishRetry tests that publish will keep retry until success.
|
|
// TestPublishRetry tests that publish will keep retry until success.
|
|
|
func TestPublishRetry(t *testing.T) {
|
|
func TestPublishRetry(t *testing.T) {
|
|
|
n := &nodeRecorder{}
|
|
n := &nodeRecorder{}
|
|
|
|
|
+ cs := mustClusterStore(t, []Member{{ID: 1, Name: "node1"}})
|
|
|
srv := &EtcdServer{
|
|
srv := &EtcdServer{
|
|
|
- Node: n,
|
|
|
|
|
- w: &waitRecorder{},
|
|
|
|
|
- done: make(chan struct{}),
|
|
|
|
|
|
|
+ Name: "node1",
|
|
|
|
|
+ Node: n,
|
|
|
|
|
+ ClusterStore: cs,
|
|
|
|
|
+ w: &waitRecorder{},
|
|
|
|
|
+ done: make(chan struct{}),
|
|
|
}
|
|
}
|
|
|
time.AfterFunc(500*time.Microsecond, srv.Stop)
|
|
time.AfterFunc(500*time.Microsecond, srv.Stop)
|
|
|
- srv.publish(Member{ID: 1, Name: "node1"}, 10*time.Nanosecond)
|
|
|
|
|
|
|
+ srv.publish(10 * time.Nanosecond)
|
|
|
|
|
|
|
|
action := n.Action()
|
|
action := n.Action()
|
|
|
// multiple Propose + Stop
|
|
// multiple Propose + Stop
|
|
@@ -1197,3 +1207,11 @@ func (w *waitWithResponse) Register(id int64) <-chan interface{} {
|
|
|
return w.ch
|
|
return w.ch
|
|
|
}
|
|
}
|
|
|
func (w *waitWithResponse) Trigger(id int64, x interface{}) {}
|
|
func (w *waitWithResponse) Trigger(id int64, x interface{}) {}
|
|
|
|
|
+
|
|
|
|
|
+func mustClusterStore(t *testing.T, membs []Member) ClusterStore {
|
|
|
|
|
+ c := Cluster{}
|
|
|
|
|
+ if err := c.AddSlice(membs); err != nil {
|
|
|
|
|
+ t.Fatalf("error creating cluster from %v: %v", membs, err)
|
|
|
|
|
+ }
|
|
|
|
|
+ return NewClusterStore(&getAllStore{}, c)
|
|
|
|
|
+}
|