Browse Source

etcdserver: publish self info when start

Yicheng Qin 11 years ago
parent
commit
a40a270e19
4 changed files with 28 additions and 15 deletions
  1. 3 3
      etcdserver/etcdhttp/http_test.go
  2. 13 2
      etcdserver/server.go
  3. 9 9
      etcdserver/server_test.go
  4. 3 1
      main.go

+ 3 - 3
etcdserver/etcdhttp/http_test.go

@@ -712,8 +712,8 @@ func (fs *errServer) Do(ctx context.Context, r etcdserverpb.Request) (etcdserver
 func (fs *errServer) Process(ctx context.Context, m raftpb.Message) error {
 func (fs *errServer) Process(ctx context.Context, m raftpb.Message) error {
 	return fs.err
 	return fs.err
 }
 }
-func (fs *errServer) Start() {}
-func (fs *errServer) Stop()  {}
+func (fs *errServer) Start(m etcdserver.Member) {}
+func (fs *errServer) Stop()                     {}
 
 
 // errReader implements io.Reader to facilitate a broken request.
 // errReader implements io.Reader to facilitate a broken request.
 type errReader struct{}
 type errReader struct{}
@@ -838,7 +838,7 @@ func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.R
 	return rs.res, nil
 	return rs.res, nil
 }
 }
 func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil }
 func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil }
-func (rs *resServer) Start()                                            {}
+func (rs *resServer) Start(m etcdserver.Member)                         {}
 func (rs *resServer) Stop()                                             {}
 func (rs *resServer) Stop()                                             {}
 
 
 func mustMarshalEvent(t *testing.T, ev *store.Event) string {
 func mustMarshalEvent(t *testing.T, ev *store.Event) string {

+ 13 - 2
etcdserver/server.go

@@ -19,6 +19,8 @@ import (
 const (
 const (
 	defaultSyncTimeout = time.Second
 	defaultSyncTimeout = time.Second
 	DefaultSnapCount   = 10000
 	DefaultSnapCount   = 10000
+	// TODO: calculated based on heartbeat interval
+	defaultPublishRetryInterval = 5 * time.Second
 )
 )
 
 
 var (
 var (
@@ -57,7 +59,7 @@ type Server interface {
 	// begin serving requests. It must be called before Do or Process.
 	// begin serving requests. It must be called before Do or Process.
 	// Start must be non-blocking; any long-running server functionality
 	// Start must be non-blocking; any long-running server functionality
 	// should be implemented in goroutines.
 	// should be implemented in goroutines.
-	Start()
+	Start(m Member)
 	// Stop terminates the Server and performs any necessary finalization.
 	// Stop terminates the Server and performs any necessary finalization.
 	// Do and Process cannot be called after Stop has been invoked.
 	// Do and Process cannot be called after Stop has been invoked.
 	Stop()
 	Stop()
@@ -102,7 +104,16 @@ type EtcdServer struct {
 
 
 // Start prepares and starts server in a new goroutine. It is no longer safe to
 // Start prepares and starts server in a new goroutine. It is no longer safe to
 // modify a server's fields after it has been sent to Start.
 // modify a server's fields after it has been sent to Start.
-func (s *EtcdServer) Start() {
+// It also starts a goroutine to publish its server information.
+func (s *EtcdServer) Start(m Member) {
+	s.start()
+	go s.publish(m, defaultPublishRetryInterval)
+}
+
+// start prepares and starts server in a new goroutine. It is no longer safe to
+// modify a server's fields after it has been sent to Start.
+// This function is just used for testing.
+func (s *EtcdServer) start() {
 	if s.SnapCount == 0 {
 	if s.SnapCount == 0 {
 		log.Printf("etcdserver: set snapshot count to default %d", DefaultSnapCount)
 		log.Printf("etcdserver: set snapshot count to default %d", DefaultSnapCount)
 		s.SnapCount = DefaultSnapCount
 		s.SnapCount = DefaultSnapCount

+ 9 - 9
etcdserver/server_test.go

@@ -400,7 +400,7 @@ func testServer(t *testing.T, ns int64) {
 			Storage: &storageRecorder{},
 			Storage: &storageRecorder{},
 			Ticker:  tk.C,
 			Ticker:  tk.C,
 		}
 		}
-		srv.Start()
+		srv.start()
 		// TODO(xiangli): randomize election timeout
 		// TODO(xiangli): randomize election timeout
 		// then remove this sleep.
 		// then remove this sleep.
 		time.Sleep(1 * time.Millisecond)
 		time.Sleep(1 * time.Millisecond)
@@ -469,7 +469,7 @@ func TestDoProposal(t *testing.T) {
 			Storage: &storageRecorder{},
 			Storage: &storageRecorder{},
 			Ticker:  tk,
 			Ticker:  tk,
 		}
 		}
-		srv.Start()
+		srv.start()
 		resp, err := srv.Do(ctx, tt)
 		resp, err := srv.Do(ctx, tt)
 		srv.Stop()
 		srv.Stop()
 
 
@@ -539,7 +539,7 @@ func TestDoProposalStopped(t *testing.T) {
 		Storage: &storageRecorder{},
 		Storage: &storageRecorder{},
 		Ticker:  tk,
 		Ticker:  tk,
 	}
 	}
-	srv.Start()
+	srv.start()
 
 
 	done := make(chan struct{})
 	done := make(chan struct{})
 	var err error
 	var err error
@@ -639,7 +639,7 @@ func TestSyncTrigger(t *testing.T) {
 		Storage:    &storageRecorder{},
 		Storage:    &storageRecorder{},
 		SyncTicker: st,
 		SyncTicker: st,
 	}
 	}
-	srv.Start()
+	srv.start()
 	// trigger the server to become a leader and accept sync requests
 	// trigger the server to become a leader and accept sync requests
 	n.readyc <- raft.Ready{
 	n.readyc <- raft.Ready{
 		SoftState: &raft.SoftState{
 		SoftState: &raft.SoftState{
@@ -710,7 +710,7 @@ func TestTriggerSnap(t *testing.T) {
 		SnapCount: 10,
 		SnapCount: 10,
 	}
 	}
 
 
-	s.Start()
+	s.start()
 	for i := 0; int64(i) < s.SnapCount; i++ {
 	for i := 0; int64(i) < s.SnapCount; i++ {
 		s.Do(ctx, pb.Request{Method: "PUT", ID: 1})
 		s.Do(ctx, pb.Request{Method: "PUT", ID: 1})
 	}
 	}
@@ -741,7 +741,7 @@ func TestRecvSnapshot(t *testing.T) {
 		Node:    n,
 		Node:    n,
 	}
 	}
 
 
-	s.Start()
+	s.start()
 	n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Index: 1}}
 	n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Index: 1}}
 	// make goroutines move forward to receive snapshot
 	// make goroutines move forward to receive snapshot
 	pkg.ForceGosched()
 	pkg.ForceGosched()
@@ -769,7 +769,7 @@ func TestRecvSlowSnapshot(t *testing.T) {
 		Node:    n,
 		Node:    n,
 	}
 	}
 
 
-	s.Start()
+	s.start()
 	n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Index: 1}}
 	n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Index: 1}}
 	// make goroutines move forward to receive snapshot
 	// make goroutines move forward to receive snapshot
 	pkg.ForceGosched()
 	pkg.ForceGosched()
@@ -794,7 +794,7 @@ func TestAddNode(t *testing.T) {
 		Send:    func(_ []raftpb.Message) {},
 		Send:    func(_ []raftpb.Message) {},
 		Storage: &storageRecorder{},
 		Storage: &storageRecorder{},
 	}
 	}
-	s.Start()
+	s.start()
 	s.AddNode(context.TODO(), 1, []byte("foo"))
 	s.AddNode(context.TODO(), 1, []byte("foo"))
 	gaction := n.Action()
 	gaction := n.Action()
 	s.Stop()
 	s.Stop()
@@ -814,7 +814,7 @@ func TestRemoveNode(t *testing.T) {
 		Send:    func(_ []raftpb.Message) {},
 		Send:    func(_ []raftpb.Message) {},
 		Storage: &storageRecorder{},
 		Storage: &storageRecorder{},
 	}
 	}
-	s.Start()
+	s.start()
 	s.RemoveNode(context.TODO(), 1)
 	s.RemoveNode(context.TODO(), 1)
 	gaction := n.Action()
 	gaction := n.Action()
 	s.Stop()
 	s.Stop()

+ 3 - 1
main.go

@@ -205,7 +205,9 @@ func startEtcd() {
 		SnapCount:    *snapCount,
 		SnapCount:    *snapCount,
 		ClusterStore: cls,
 		ClusterStore: cls,
 	}
 	}
-	s.Start()
+	member := *self
+	member.ClientURLs = *addrs
+	s.Start(member)
 
 
 	ch := &pkg.CORSHandler{
 	ch := &pkg.CORSHandler{
 		Handler: etcdhttp.NewClientHandler(s, cls, *timeout),
 		Handler: etcdhttp.NewClientHandler(s, cls, *timeout),