Browse Source

Merge pull request #1061 from jonboulle/server_interface

etcdserver: introduce Server interface
Jonathan Boulle 11 years ago
parent
commit
763c276d27
5 changed files with 56 additions and 34 deletions
  1. 6 3
      etcdserver/etcdhttp/http.go
  2. 33 11
      etcdserver/server.go
  3. 10 10
      etcdserver/server_test.go
  4. 4 6
      functional/http_functional_test.go
  5. 3 4
      main.go

+ 6 - 3
etcdserver/etcdhttp/http.go

@@ -38,7 +38,7 @@ var errClosed = errors.New("etcdhttp: client closed connection")
 // raft communication.
 type Handler struct {
 	Timeout time.Duration
-	Server  *etcdserver.Server
+	Server  etcdserver.Server
 	// TODO: dynamic configuration may make this outdated. take care of it.
 	// TODO: dynamic configuration may introduce race also.
 	Peers Peers
@@ -127,9 +127,12 @@ func (h Handler) serveRaft(ctx context.Context, w http.ResponseWriter, r *http.R
 		log.Println("etcdhttp: error unmarshaling raft message:", err)
 	}
 	log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m)
-	if err := h.Server.Node.Step(ctx, m); err != nil {
-		log.Println("etcdhttp: error stepping raft messages:", err)
+	if err := h.Server.Process(ctx, m); err != nil {
+		log.Println("etcdhttp: error processing raft message:", err)
+		writeError(w, err)
+		return
 	}
+	w.WriteHeader(http.StatusOK)
 }
 
 // genID generates a random id that is: n < 0 < n.

+ 33 - 11
etcdserver/server.go

@@ -18,6 +18,7 @@ var (
 )
 
 type SendFunc func(m []raftpb.Message)
+type SaveFunc func(st raftpb.State, ents []raftpb.Entry)
 
 type Response struct {
 	Event   *store.Event
@@ -25,7 +26,24 @@ type Response struct {
 	err     error
 }
 
-type Server struct {
+type Server interface {
+	// Start performs any initialization of the Server necessary for it to
+	// begin serving requests. It must be called before Do or Process.
+	// Start must be non-blocking; any long-running server functionality
+	// should be implemented in goroutines.
+	Start()
+	// Stop terminates the Server and performs any necessary finalization.
+	// Do and Process cannot be called after Stop has been invoked.
+	Stop()
+	// Do takes a request and attempts to fulfil it, returning a Response.
+	Do(ctx context.Context, r pb.Request) (Response, error)
+	// Process takes a raft message and applies it to the server's raft state
+	// machine, respecting any timeout of the given context.
+	Process(ctx context.Context, m raftpb.Message) error
+}
+
+// EtcdServer is the production implementation of the Server interface
+type EtcdServer struct {
 	w    wait.Wait
 	done chan struct{}
 
@@ -34,27 +52,31 @@ type Server struct {
 
 	// Send specifies the send function for sending msgs to peers. Send
 	// MUST NOT block. It is okay to drop messages, since clients should
-	// timeout and reissue their messages.  If Send is nil, Server will
+	// timeout and reissue their messages.  If Send is nil, server will
 	// panic.
 	Send SendFunc
 
 	// Save specifies the save function for saving ents to stable storage.
 	// Save MUST block until st and ents are on stable storage.  If Send is
-	// nil, Server will panic.
+	// nil, server will panic.
 	Save func(st raftpb.State, ents []raftpb.Entry)
 
 	Ticker <-chan time.Time
 }
 
 // Start prepares and starts server in a new goroutine. It is no longer safe to
-// modify a Servers fields after it has been sent to Start.
-func Start(s *Server) {
+// modify a server's fields after it has been sent to Start.
+func (s *EtcdServer) Start() {
 	s.w = wait.New()
 	s.done = make(chan struct{})
 	go s.run()
 }
 
-func (s *Server) run() {
+func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
+	return s.Node.Step(ctx, m)
+}
+
+func (s *EtcdServer) run() {
 	for {
 		select {
 		case <-s.Ticker:
@@ -79,9 +101,9 @@ func (s *Server) run() {
 	}
 }
 
-// Stop stops the server, and shutsdown the running goroutine. Stop should be
-// called after a Start(s), otherwise it will panic.
-func (s *Server) Stop() {
+// Stop stops the server, and shuts down the running goroutine. Stop should be
+// called after a Start(s), otherwise it will block forever.
+func (s *EtcdServer) Stop() {
 	s.Node.Stop()
 	close(s.done)
 }
@@ -91,7 +113,7 @@ func (s *Server) Stop() {
 // Quorum == true, r will be sent through consensus before performing its
 // respective operation. Do will block until an action is performed or there is
 // an error.
-func (s *Server) Do(ctx context.Context, r pb.Request) (Response, error) {
+func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
 	if r.Id == 0 {
 		panic("r.Id cannot be 0")
 	}
@@ -137,7 +159,7 @@ func (s *Server) Do(ctx context.Context, r pb.Request) (Response, error) {
 }
 
 // apply interprets r as a call to store.X and returns an Response interpreted from store.Event
-func (s *Server) apply(r pb.Request) Response {
+func (s *EtcdServer) apply(r pb.Request) Response {
 	f := func(ev *store.Event, err error) Response {
 		return Response{Event: ev, err: err}
 	}

+ 10 - 10
etcdserver/server_test.go

@@ -39,7 +39,7 @@ func TestDoLocalAction(t *testing.T) {
 	}
 	for i, tt := range tests {
 		store := &storeRecorder{}
-		srv := &Server{Store: store}
+		srv := &EtcdServer{Store: store}
 		resp, err := srv.Do(context.TODO(), tt.req)
 
 		if err != tt.werr {
@@ -117,7 +117,7 @@ func TestApply(t *testing.T) {
 
 	for i, tt := range tests {
 		store := &storeRecorder{}
-		srv := &Server{Store: store}
+		srv := &EtcdServer{Store: store}
 		resp := srv.apply(tt.req)
 
 		if !reflect.DeepEqual(resp, tt.wresp) {
@@ -136,7 +136,7 @@ func testServer(t *testing.T, ns int64) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	ss := make([]*Server, ns)
+	ss := make([]*EtcdServer, ns)
 
 	send := func(msgs []raftpb.Message) {
 		for _, m := range msgs {
@@ -155,14 +155,14 @@ func testServer(t *testing.T, ns int64) {
 		n := raft.Start(id, peers, 10, 1)
 		tk := time.NewTicker(10 * time.Millisecond)
 		defer tk.Stop()
-		srv := &Server{
+		srv := &EtcdServer{
 			Node:   n,
 			Store:  store.New(),
 			Send:   send,
 			Save:   func(_ raftpb.State, _ []raftpb.Entry) {},
 			Ticker: tk.C,
 		}
-		Start(srv)
+		srv.Start()
 		// TODO(xiangli): randomize election timeout
 		// then remove this sleep.
 		time.Sleep(1 * time.Millisecond)
@@ -224,14 +224,14 @@ func TestDoProposal(t *testing.T) {
 		tk := make(chan time.Time)
 		// this makes <-tk always successful, which accelerates internal clock
 		close(tk)
-		srv := &Server{
+		srv := &EtcdServer{
 			Node:   n,
 			Store:  st,
 			Send:   func(_ []raftpb.Message) {},
 			Save:   func(_ raftpb.State, _ []raftpb.Entry) {},
 			Ticker: tk,
 		}
-		Start(srv)
+		srv.Start()
 		resp, err := srv.Do(ctx, tt)
 		srv.Stop()
 
@@ -254,7 +254,7 @@ func TestDoProposalCancelled(t *testing.T) {
 	n := raft.Start(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1)
 	st := &storeRecorder{}
 	wait := &waitRecorder{}
-	srv := &Server{
+	srv := &EtcdServer{
 		// TODO: use fake node for better testability
 		Node:  n,
 		Store: st,
@@ -291,7 +291,7 @@ func TestDoProposalStopped(t *testing.T) {
 	tk := make(chan time.Time)
 	// this makes <-tk always successful, which accelarates internal clock
 	close(tk)
-	srv := &Server{
+	srv := &EtcdServer{
 		// TODO: use fake node for better testability
 		Node:   n,
 		Store:  st,
@@ -299,7 +299,7 @@ func TestDoProposalStopped(t *testing.T) {
 		Save:   func(_ raftpb.State, _ []raftpb.Entry) {},
 		Ticker: tk,
 	}
-	Start(srv)
+	srv.Start()
 
 	done := make(chan struct{})
 	var err error

+ 4 - 6
functional/http_functional_test.go

@@ -24,18 +24,16 @@ func TestSet(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	st := store.New()
-
 	n := raft.Start(1, []int64{1}, 0, 0)
 	n.Campaign(ctx)
 
-	srv := &etcdserver.Server{
+	srv := &etcdserver.EtcdServer{
+		Store: store.New(),
 		Node:  n,
-		Store: st,
-		Send:  etcdserver.SendFunc(nopSend),
 		Save:  func(st raftpb.State, ents []raftpb.Entry) {},
+		Send:  etcdserver.SendFunc(nopSend),
 	}
-	etcdserver.Start(srv)
+	srv.Start()
 	defer srv.Stop()
 
 	h := etcdhttp.Handler{

+ 3 - 4
main.go

@@ -75,15 +75,14 @@ func startEtcd() http.Handler {
 
 	n, w := startRaft(id, peers.IDs(), path.Join(*dir, "wal"))
 
-	tk := time.NewTicker(100 * time.Millisecond)
-	s := &etcdserver.Server{
+	s := &etcdserver.EtcdServer{
 		Store:  store.New(),
 		Node:   n,
 		Save:   w.Save,
 		Send:   etcdhttp.Sender(*peers),
-		Ticker: tk.C,
+		Ticker: time.Tick(100 * time.Millisecond),
 	}
-	etcdserver.Start(s)
+	s.Start()
 
 	h := etcdhttp.Handler{
 		Timeout: *timeout,