Browse Source

etcdserver: add publish func

Yicheng Qin 11 years ago
parent
commit
89077167c3
2 changed files with 110 additions and 0 deletions
  1. 35 0
      etcdserver/server.go
  2. 75 0
      etcdserver/server_test.go

+ 35 - 0
etcdserver/server.go

@@ -1,6 +1,7 @@
 package etcdserver
 package etcdserver
 
 
 import (
 import (
+	"encoding/json"
 	"errors"
 	"errors"
 	"log"
 	"log"
 	"math/rand"
 	"math/rand"
@@ -317,6 +318,40 @@ func (s *EtcdServer) sync(timeout time.Duration) {
 	}()
 	}()
 }
 }
 
 
+// publish registers server information into the cluster. The information
+// is the json format of the given member.
+// The function keeps attempting to register until it succeeds,
+// or its server is stopped.
+func (s *EtcdServer) publish(m Member, retryInterval time.Duration) {
+	b, err := json.Marshal(m)
+	if err != nil {
+		log.Printf("etcdserver: json marshal error: %v", err)
+		return
+	}
+	req := pb.Request{
+		Id:     GenID(),
+		Method: "PUT",
+		Path:   m.storeKey(),
+		Val:    string(b),
+	}
+
+	for {
+		ctx, cancel := context.WithTimeout(context.Background(), retryInterval)
+		_, err := s.Do(ctx, req)
+		cancel()
+		switch err {
+		case nil:
+			log.Printf("etcdserver: published %+v to the cluster", m)
+			return
+		case ErrStopped:
+			log.Printf("etcdserver: aborting publish because server is stopped")
+			return
+		default:
+			log.Printf("etcdserver: publish error: %v", err)
+		}
+	}
+}
+
 func getExpirationTime(r *pb.Request) time.Time {
 func getExpirationTime(r *pb.Request) time.Time {
 	var t time.Time
 	var t time.Time
 	if r.Expiration != 0 {
 	if r.Expiration != 0 {

+ 75 - 0
etcdserver/server_test.go

@@ -1,6 +1,7 @@
 package etcdserver
 package etcdserver
 
 
 import (
 import (
+	"encoding/json"
 	"fmt"
 	"fmt"
 	"math/rand"
 	"math/rand"
 	"reflect"
 	"reflect"
@@ -826,6 +827,71 @@ func TestRemoveNode(t *testing.T) {
 
 
 // TODO: test wait trigger correctness in multi-server case
 // TODO: test wait trigger correctness in multi-server case
 
 
+func TestPublish(t *testing.T) {
+	n := &nodeProposeDataRecorder{}
+	ch := make(chan interface{}, 1)
+	// simulate that request has gone through consensus
+	ch <- Response{}
+	w := &waitWithResponse{ch: ch}
+	srv := &EtcdServer{
+		Node: n,
+		w:    w,
+	}
+	m := Member{ID: 1, Name: "node1"}
+	srv.publish(m, time.Hour)
+
+	data := n.data()
+	if len(data) != 1 {
+		t.Fatalf("len(proposeData) = %d, want 1", len(data))
+	}
+	var r pb.Request
+	if err := r.Unmarshal(data[0]); err != nil {
+		t.Fatalf("unmarshal request error: %v", err)
+	}
+	if r.Method != "PUT" {
+		t.Errorf("method = %s, want PUT", r.Method)
+	}
+	if r.Path != m.storeKey() {
+		t.Errorf("path = %s, want %s", r.Path, m.storeKey())
+	}
+	var gm Member
+	if err := json.Unmarshal([]byte(r.Val), &gm); err != nil {
+		t.Fatalf("unmarshal val error: %v", err)
+	}
+	if !reflect.DeepEqual(gm, m) {
+		t.Errorf("member = %v, want %v", gm, m)
+	}
+}
+
+// TestPublishStopped tests that publish will be stopped if server is stopped.
+func TestPublishStopped(t *testing.T) {
+	srv := &EtcdServer{
+		Node: &nodeRecorder{},
+		w:    &waitRecorder{},
+		done: make(chan struct{}),
+	}
+	srv.Stop()
+	srv.publish(Member{ID: 1, Name: "node1"}, time.Hour)
+}
+
+// TestPublishRetry tests that publish will keep retry until success.
+func TestPublishRetry(t *testing.T) {
+	n := &nodeRecorder{}
+	srv := &EtcdServer{
+		Node: n,
+		w:    &waitRecorder{},
+		done: make(chan struct{}),
+	}
+	time.AfterFunc(500*time.Microsecond, srv.Stop)
+	srv.publish(Member{ID: 1, Name: "node1"}, 10*time.Nanosecond)
+
+	action := n.Action()
+	// multiple Propose + Stop
+	if len(action) < 3 {
+		t.Errorf("len(action) = %d, want >= 3", action)
+	}
+}
+
 func TestGetBool(t *testing.T) {
 func TestGetBool(t *testing.T) {
 	tests := []struct {
 	tests := []struct {
 		b    *bool
 		b    *bool
@@ -1122,3 +1188,12 @@ func (n *nodeConfChangeCommitterRecorder) Ready() <-chan raft.Ready {
 func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange) {
 func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange) {
 	n.record(action{name: "ApplyConfChange:" + conf.Type.String()})
 	n.record(action{name: "ApplyConfChange:" + conf.Type.String()})
 }
 }
+
+type waitWithResponse struct {
+	ch <-chan interface{}
+}
+
+func (w *waitWithResponse) Register(id int64) <-chan interface{} {
+	return w.ch
+}
+func (w *waitWithResponse) Trigger(id int64, x interface{}) {}