Browse Source

contrib/raftexample: follow pipeline guidelines closer

close raft commit channel before issuing raft error since it's done
sending
chz 10 years ago
parent
commit
b73a11ff45

+ 15 - 18
contrib/raftexample/kvstore.go

@@ -58,25 +58,22 @@ func (s *kvstore) Propose(k string, v string) {
 }
 
 func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
-	for {
-		select {
-		case data := <-commitC:
-			if data == nil {
-				// done replaying log; new data incoming
-				return
-			}
-
-			var data_kv kv
-			dec := gob.NewDecoder(bytes.NewBufferString(*data))
-			if err := dec.Decode(&data_kv); err != nil {
-				log.Fatalf("raftexample: could not decode message (%v)", err)
-			}
-			s.mu.Lock()
-			s.kvStore[data_kv.Key] = data_kv.Val
-			s.mu.Unlock()
-		case err := <-errorC:
-			log.Println(err)
+	for data := range commitC {
+		if data == nil {
+			// done replaying log; new data incoming
 			return
 		}
+
+		var data_kv kv
+		dec := gob.NewDecoder(bytes.NewBufferString(*data))
+		if err := dec.Decode(&data_kv); err != nil {
+			log.Fatalf("raftexample: could not decode message (%v)", err)
+		}
+		s.mu.Lock()
+		s.kvStore[data_kv.Key] = data_kv.Val
+		s.mu.Unlock()
+	}
+	if err, ok := <-errorC; ok {
+		log.Fatal(err)
 	}
 }

+ 4 - 6
contrib/raftexample/raft.go

@@ -122,12 +122,8 @@ func (rc *raftNode) replayWAL() *wal.WAL {
 }
 
 func (rc *raftNode) writeError(err error) {
-	rc.errorC <- err
-	rc.stop()
-}
-
-func (rc *raftNode) stop() {
 	close(rc.commitC)
+	rc.errorC <- err
 	close(rc.errorC)
 	rc.node.Stop()
 }
@@ -214,7 +210,9 @@ func (rc *raftNode) serveChannels() {
 			return
 
 		case <-stopc:
-			rc.stop()
+			close(rc.commitC)
+			close(rc.errorC)
+			rc.node.Stop()
 			return
 		}
 	}

+ 7 - 3
contrib/raftexample/raftexample_test.go

@@ -87,11 +87,15 @@ func TestProposeOnCommit(t *testing.T) {
 		// feedback for "n" committed entries, then update donec
 		go func(pC chan<- string, cC <-chan *string, eC <-chan error) {
 			for n := 0; n < 100; n++ {
+				s, ok := <-cC
+				if !ok {
+					pC = nil
+				}
 				select {
-				case s := <-cC:
-					pC <- *s
+				case pC <- *s:
+					continue
 				case err, _ := <-eC:
-					t.Fatalf("eC closed (%v)", err)
+					t.Fatalf("eC message (%v)", err)
 				}
 			}
 			donec <- struct{}{}