Browse Source

Merge pull request #4033 from heyitsanthony/raftexample-tests

Raftexample tests
Anthony Romano 10 years ago
parent
commit
d639e4f7f6

+ 15 - 18
contrib/raftexample/kvstore.go

@@ -58,25 +58,22 @@ func (s *kvstore) Propose(k string, v string) {
 }
 }
 
 
 func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
 func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
-	for {
-		select {
-		case data := <-commitC:
-			if data == nil {
-				// done replaying log; new data incoming
-				return
-			}
-
-			var data_kv kv
-			dec := gob.NewDecoder(bytes.NewBufferString(*data))
-			if err := dec.Decode(&data_kv); err != nil {
-				log.Fatalf("raftexample: could not decode message (%v)", err)
-			}
-			s.mu.Lock()
-			s.kvStore[data_kv.Key] = data_kv.Val
-			s.mu.Unlock()
-		case err := <-errorC:
-			log.Println(err)
+	for data := range commitC {
+		if data == nil {
+			// done replaying log; new data incoming
 			return
 			return
 		}
 		}
+
+		var data_kv kv
+		dec := gob.NewDecoder(bytes.NewBufferString(*data))
+		if err := dec.Decode(&data_kv); err != nil {
+			log.Fatalf("raftexample: could not decode message (%v)", err)
+		}
+		s.mu.Lock()
+		s.kvStore[data_kv.Key] = data_kv.Val
+		s.mu.Unlock()
+	}
+	if err, ok := <-errorC; ok {
+		log.Fatal(err)
 	}
 	}
 }
 }

+ 59 - 0
contrib/raftexample/listener.go

@@ -0,0 +1,59 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"errors"
+	"net"
+	"time"
+)
+
+// stoppableListener sets TCP keep-alive timeouts on accepted
+// connections and waits on stopc message
+type stoppableListener struct {
+	*net.TCPListener
+	stopc <-chan struct{}
+}
+
+func newStoppableListener(addr string, stopc <-chan struct{}) (*stoppableListener, error) {
+	ln, err := net.Listen("tcp", addr)
+	if err != nil {
+		return nil, err
+	}
+	return &stoppableListener{ln.(*net.TCPListener), stopc}, nil
+}
+
+func (ln stoppableListener) Accept() (c net.Conn, err error) {
+	connc := make(chan *net.TCPConn, 1)
+	errc := make(chan error, 1)
+	go func() {
+		tc, err := ln.AcceptTCP()
+		if err != nil {
+			errc <- err
+		} else {
+			connc <- tc
+		}
+	}()
+	select {
+	case <-ln.stopc:
+		return nil, errors.New("server stopped")
+	case err := <-errc:
+		return nil, err
+	case tc := <-connc:
+		tc.SetKeepAlive(true)
+		tc.SetKeepAlivePeriod(3 * time.Minute)
+		return tc, nil
+	}
+}

+ 62 - 23
contrib/raftexample/raft.go

@@ -49,13 +49,16 @@ type raftNode struct {
 	raftStorage *raft.MemoryStorage
 	raftStorage *raft.MemoryStorage
 	wal         *wal.WAL
 	wal         *wal.WAL
 	transport   *rafthttp.Transport
 	transport   *rafthttp.Transport
+	stopc       chan struct{} // signals proposal channel closed
+	httpstopc   chan struct{} // signals http server to shutdown
+	httpdonec   chan struct{} // signals http server shutdown complete
 }
 }
 
 
 // newRaftNode initiates a raft instance and returns a committed log entry
 // newRaftNode initiates a raft instance and returns a committed log entry
 // channel and error channel. Proposals for log updates are sent over the
 // channel and error channel. Proposals for log updates are sent over the
 // provided the proposal channel. All log entries are replayed over the
 // provided the proposal channel. All log entries are replayed over the
 // commit channel, followed by a nil message (to indicate the channel is
 // commit channel, followed by a nil message (to indicate the channel is
-// current), then new log entries.
+// current), then new log entries. To shutdown, close proposeC and read errorC.
 func newRaftNode(id int, peers []string, proposeC <-chan string) (<-chan *string, <-chan error) {
 func newRaftNode(id int, peers []string, proposeC <-chan string) (<-chan *string, <-chan error) {
 	rc := &raftNode{
 	rc := &raftNode{
 		proposeC:    proposeC,
 		proposeC:    proposeC,
@@ -65,22 +68,31 @@ func newRaftNode(id int, peers []string, proposeC <-chan string) (<-chan *string
 		peers:       peers,
 		peers:       peers,
 		waldir:      fmt.Sprintf("raftexample-%d", id),
 		waldir:      fmt.Sprintf("raftexample-%d", id),
 		raftStorage: raft.NewMemoryStorage(),
 		raftStorage: raft.NewMemoryStorage(),
+		stopc:       make(chan struct{}),
+		httpstopc:   make(chan struct{}),
+		httpdonec:   make(chan struct{}),
 		// rest of structure populated after WAL replay
 		// rest of structure populated after WAL replay
 	}
 	}
 	go rc.startRaft()
 	go rc.startRaft()
 	return rc.commitC, rc.errorC
 	return rc.commitC, rc.errorC
 }
 }
 
 
-// publishEntries writes committed log entries to commit channel.
-func (rc *raftNode) publishEntries(ents []raftpb.Entry) {
+// publishEntries writes committed log entries to commit channel and returns
+// whether all entries could be published.
+func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
 	for i := range ents {
 	for i := range ents {
 		if ents[i].Type != raftpb.EntryNormal || len(ents[i].Data) == 0 {
 		if ents[i].Type != raftpb.EntryNormal || len(ents[i].Data) == 0 {
 			// ignore conf changes and empty messages
 			// ignore conf changes and empty messages
 			continue
 			continue
 		}
 		}
 		s := string(ents[i].Data)
 		s := string(ents[i].Data)
-		rc.commitC <- &s
+		select {
+		case rc.commitC <- &s:
+		case <-rc.stopc:
+			return false
+		}
 	}
 	}
+	return true
 }
 }
 
 
 // openWAL returns a WAL ready for reading.
 // openWAL returns a WAL ready for reading.
@@ -122,12 +134,9 @@ func (rc *raftNode) replayWAL() *wal.WAL {
 }
 }
 
 
 func (rc *raftNode) writeError(err error) {
 func (rc *raftNode) writeError(err error) {
-	rc.errorC <- err
-	rc.stop()
-}
-
-func (rc *raftNode) stop() {
+	rc.stopHTTP()
 	close(rc.commitC)
 	close(rc.commitC)
+	rc.errorC <- err
 	close(rc.errorC)
 	close(rc.errorC)
 	rc.node.Stop()
 	rc.node.Stop()
 }
 }
@@ -178,38 +187,60 @@ func (rc *raftNode) startRaft() {
 	go rc.serveChannels()
 	go rc.serveChannels()
 }
 }
 
 
+// stop closes http, closes all channels, and stops raft.
+func (rc *raftNode) stop() {
+	rc.stopHTTP()
+	close(rc.commitC)
+	close(rc.errorC)
+	rc.node.Stop()
+}
+
+func (rc *raftNode) stopHTTP() {
+	rc.transport.Stop()
+	close(rc.httpstopc)
+	<-rc.httpdonec
+}
+
 func (rc *raftNode) serveChannels() {
 func (rc *raftNode) serveChannels() {
 	defer rc.wal.Close()
 	defer rc.wal.Close()
 
 
 	ticker := time.NewTicker(100 * time.Millisecond)
 	ticker := time.NewTicker(100 * time.Millisecond)
 	defer ticker.Stop()
 	defer ticker.Stop()
 
 
-	// event loop on client proposals and raft updates
+	// send proposals over raft
+	go func() {
+		for prop := range rc.proposeC {
+			// blocks until accepted by raft state machine
+			rc.node.Propose(context.TODO(), []byte(prop))
+		}
+		// client closed channel; shutdown raft if not already
+		close(rc.stopc)
+	}()
+
+	// event loop on raft state machine updates
 	for {
 	for {
 		select {
 		select {
 		case <-ticker.C:
 		case <-ticker.C:
 			rc.node.Tick()
 			rc.node.Tick()
 
 
-		// send proposals over raft
-		case prop, ok := <-rc.proposeC:
-			if !ok {
-				// client closed channel; shut down
-				rc.stop()
-				return
-			}
-			rc.node.Propose(context.TODO(), []byte(prop))
-
 		// store raft entries to wal, then publish over commit channel
 		// store raft entries to wal, then publish over commit channel
 		case rd := <-rc.node.Ready():
 		case rd := <-rc.node.Ready():
 			rc.wal.Save(rd.HardState, rd.Entries)
 			rc.wal.Save(rd.HardState, rd.Entries)
 			rc.raftStorage.Append(rd.Entries)
 			rc.raftStorage.Append(rd.Entries)
 			rc.transport.Send(rd.Messages)
 			rc.transport.Send(rd.Messages)
-			rc.publishEntries(rd.Entries)
+			if ok := rc.publishEntries(rd.Entries); !ok {
+				rc.stop()
+				return
+			}
 			rc.node.Advance()
 			rc.node.Advance()
 
 
 		case err := <-rc.transport.ErrorC:
 		case err := <-rc.transport.ErrorC:
 			rc.writeError(err)
 			rc.writeError(err)
 			return
 			return
+
+		case <-rc.stopc:
+			rc.stop()
+			return
 		}
 		}
 	}
 	}
 }
 }
@@ -220,10 +251,18 @@ func (rc *raftNode) serveRaft() {
 		log.Fatalf("raftexample: Failed parsing URL (%v)", err)
 		log.Fatalf("raftexample: Failed parsing URL (%v)", err)
 	}
 	}
 
 
-	srv := http.Server{Addr: url.Host, Handler: rc.transport.Handler()}
-	if err := srv.ListenAndServe(); err != nil {
-		log.Fatalf("raftexample: Failed serving rafthttp (%v)", err)
+	ln, err := newStoppableListener(url.Host, rc.httpstopc)
+	if err != nil {
+		log.Fatalf("raftexample: Failed to listen rafthttp (%v)", err)
+	}
+
+	err = (&http.Server{Handler: rc.transport.Handler()}).Serve(ln)
+	select {
+	case <-rc.httpstopc:
+	default:
+		log.Fatalf("raftexample: Failed to serve rafthttp (%v)", err)
 	}
 	}
+	close(rc.httpdonec)
 }
 }
 
 
 func (rc *raftNode) Process(ctx context.Context, m raftpb.Message) error {
 func (rc *raftNode) Process(ctx context.Context, m raftpb.Message) error {

+ 147 - 0
contrib/raftexample/raftexample_test.go

@@ -0,0 +1,147 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"fmt"
+	"os"
+	"testing"
+)
+
+type cluster struct {
+	peers    []string
+	commitC  []<-chan *string
+	errorC   []<-chan error
+	proposeC []chan string
+}
+
+// newCluster creates a cluster of n nodes
+func newCluster(n int) *cluster {
+	peers := make([]string, n)
+	for i := range peers {
+		peers[i] = fmt.Sprintf("http://127.0.0.1:%d", 10000+i)
+	}
+
+	clus := &cluster{
+		peers:    peers,
+		commitC:  make([]<-chan *string, len(peers)),
+		errorC:   make([]<-chan error, len(peers)),
+		proposeC: make([]chan string, len(peers))}
+
+	for i := range clus.peers {
+		os.RemoveAll(fmt.Sprintf("raftexample-%d", i+1))
+		clus.proposeC[i] = make(chan string, 1)
+		clus.commitC[i], clus.errorC[i] = newRaftNode(i+1, clus.peers, clus.proposeC[i])
+	}
+
+	return clus
+}
+
+// sinkReplay reads all commits in each node's local log.
+func (clus *cluster) sinkReplay() {
+	for i := range clus.peers {
+		for s := range clus.commitC[i] {
+			if s == nil {
+				break
+			}
+		}
+	}
+}
+
+// Close closes all cluster nodes and returns an error if any failed.
+func (clus *cluster) Close() (err error) {
+	for i := range clus.peers {
+		close(clus.proposeC[i])
+		for range clus.commitC[i] {
+			// drain pending commits
+		}
+		// wait for channel to close
+		if erri, _ := <-clus.errorC[i]; erri != nil {
+			err = erri
+		}
+		// clean intermediates
+		os.RemoveAll(fmt.Sprintf("raftexample-%d", i+1))
+	}
+	return err
+}
+
+func (clus *cluster) closeNoErrors(t *testing.T) {
+	if err := clus.Close(); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// TestProposeOnCommit starts three nodes and feeds commits back into the proposal
+// channel. The intent is to ensure blocking on a proposal won't block raft progress.
+func TestProposeOnCommit(t *testing.T) {
+	clus := newCluster(3)
+	defer clus.closeNoErrors(t)
+
+	clus.sinkReplay()
+
+	donec := make(chan struct{})
+	for i := range clus.peers {
+		// feedback for "n" committed entries, then update donec
+		go func(pC chan<- string, cC <-chan *string, eC <-chan error) {
+			for n := 0; n < 100; n++ {
+				s, ok := <-cC
+				if !ok {
+					pC = nil
+				}
+				select {
+				case pC <- *s:
+					continue
+				case err, _ := <-eC:
+					t.Fatalf("eC message (%v)", err)
+				}
+			}
+			donec <- struct{}{}
+		}(clus.proposeC[i], clus.commitC[i], clus.errorC[i])
+
+		// one message feedback per node
+		go func() { clus.proposeC[i] <- "foo" }()
+	}
+
+	for range clus.peers {
+		<-donec
+	}
+}
+
+// TestCloseBeforeReplay tests closing the producer before raft starts.
+func TestCloseProposerBeforeReplay(t *testing.T) {
+	clus := newCluster(1)
+	// close before replay so raft never starts
+	defer clus.closeNoErrors(t)
+}
+
+// TestCloseProposerInflight tests closing the producer while
+// committed messages are being published to the client.
+func TestCloseProposerInflight(t *testing.T) {
+	clus := newCluster(1)
+	defer clus.closeNoErrors(t)
+
+	clus.sinkReplay()
+
+	// some inflight ops
+	go func() {
+		clus.proposeC[0] <- "foo"
+		clus.proposeC[0] <- "bar"
+	}()
+
+	// wait for one message
+	if c, ok := <-clus.commitC[0]; *c != "foo" || !ok {
+		t.Fatalf("Commit failed")
+	}
+}