|
|
@@ -403,7 +403,6 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// TODO: test ErrIDRemoved
|
|
|
func TestApplyConfChangeError(t *testing.T) {
|
|
|
cl := newCluster("")
|
|
|
cl.SetStore(store.New())
|
|
|
@@ -511,29 +510,18 @@ func TestDoProposal(t *testing.T) {
|
|
|
pb.Request{Method: "DELETE", ID: 1},
|
|
|
pb.Request{Method: "GET", ID: 1, Quorum: true},
|
|
|
}
|
|
|
-
|
|
|
for i, tt := range tests {
|
|
|
- ctx, _ := context.WithCancel(context.Background())
|
|
|
- s := raft.NewMemoryStorage()
|
|
|
- n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1, s)
|
|
|
st := &storeRecorder{}
|
|
|
- tk := make(chan time.Time)
|
|
|
- // this makes <-tk always successful, which accelerates internal clock
|
|
|
- close(tk)
|
|
|
- cl := newCluster("abc")
|
|
|
- cl.SetStore(store.New())
|
|
|
srv := &EtcdServer{
|
|
|
- node: n,
|
|
|
- raftStorage: s,
|
|
|
+ node: newNodeCommitter(),
|
|
|
+ raftStorage: raft.NewMemoryStorage(),
|
|
|
store: st,
|
|
|
transport: &nopTransporter{},
|
|
|
storage: &storageRecorder{},
|
|
|
- Ticker: tk,
|
|
|
- Cluster: cl,
|
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
|
}
|
|
|
srv.start()
|
|
|
- resp, err := srv.Do(ctx, tt)
|
|
|
+ resp, err := srv.Do(context.Background(), tt)
|
|
|
srv.Stop()
|
|
|
|
|
|
action := st.Action()
|
|
|
@@ -551,34 +539,16 @@ func TestDoProposal(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
func TestDoProposalCancelled(t *testing.T) {
|
|
|
- ctx, cancel := context.WithCancel(context.Background())
|
|
|
- // node cannot make any progress because there are two nodes
|
|
|
- s := raft.NewMemoryStorage()
|
|
|
- n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0, 0xBAD1), 10, 1, s)
|
|
|
- st := &storeRecorder{}
|
|
|
wait := &waitRecorder{}
|
|
|
srv := &EtcdServer{
|
|
|
- // TODO: use fake node for better testability
|
|
|
- node: n,
|
|
|
- raftStorage: s,
|
|
|
- store: st,
|
|
|
- w: wait,
|
|
|
- reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
|
+ node: &nodeRecorder{},
|
|
|
+ w: wait,
|
|
|
+ reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
|
}
|
|
|
-
|
|
|
- done := make(chan struct{})
|
|
|
- var err error
|
|
|
- go func() {
|
|
|
- _, err = srv.Do(ctx, pb.Request{Method: "PUT"})
|
|
|
- close(done)
|
|
|
- }()
|
|
|
+ ctx, cancel := context.WithCancel(context.Background())
|
|
|
cancel()
|
|
|
- <-done
|
|
|
+ _, err := srv.Do(ctx, pb.Request{Method: "PUT"})
|
|
|
|
|
|
- gaction := st.Action()
|
|
|
- if len(gaction) != 0 {
|
|
|
- t.Errorf("len(action) = %v, want 0", len(gaction))
|
|
|
- }
|
|
|
if err != ErrCanceled {
|
|
|
t.Fatalf("err = %v, want %v", err, ErrCanceled)
|
|
|
}
|
|
|
@@ -589,88 +559,57 @@ func TestDoProposalCancelled(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
func TestDoProposalTimeout(t *testing.T) {
|
|
|
- ctx, _ := context.WithTimeout(context.Background(), 0)
|
|
|
srv := &EtcdServer{
|
|
|
node: &nodeRecorder{},
|
|
|
w: &waitRecorder{},
|
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
|
}
|
|
|
- _, err := srv.Do(ctx, pb.Request{Method: "PUT", ID: 1})
|
|
|
+ ctx, _ := context.WithTimeout(context.Background(), 0)
|
|
|
+ _, err := srv.Do(ctx, pb.Request{Method: "PUT"})
|
|
|
if err != ErrTimeout {
|
|
|
t.Fatalf("err = %v, want %v", err, ErrTimeout)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func TestDoProposalStopped(t *testing.T) {
|
|
|
- ctx, cancel := context.WithCancel(context.Background())
|
|
|
- defer cancel()
|
|
|
- // node cannot make any progress because there are two nodes
|
|
|
- s := raft.NewMemoryStorage()
|
|
|
- n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0, 0xBAD1), 10, 1, s)
|
|
|
- st := &storeRecorder{}
|
|
|
- tk := make(chan time.Time)
|
|
|
- // this makes <-tk always successful, which accelarates internal clock
|
|
|
- close(tk)
|
|
|
- cl := newCluster("abc")
|
|
|
- cl.SetStore(store.New())
|
|
|
srv := &EtcdServer{
|
|
|
- // TODO: use fake node for better testability
|
|
|
- node: n,
|
|
|
- raftStorage: s,
|
|
|
- store: st,
|
|
|
- transport: &nopTransporter{},
|
|
|
- storage: &storageRecorder{},
|
|
|
- Ticker: tk,
|
|
|
- Cluster: cl,
|
|
|
- reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
|
- }
|
|
|
- srv.start()
|
|
|
-
|
|
|
- done := make(chan struct{})
|
|
|
- var err error
|
|
|
- go func() {
|
|
|
- _, err = srv.Do(ctx, pb.Request{Method: "PUT", ID: 1})
|
|
|
- close(done)
|
|
|
- }()
|
|
|
- srv.Stop()
|
|
|
- <-done
|
|
|
-
|
|
|
- action := st.Action()
|
|
|
- if len(action) != 0 {
|
|
|
- t.Errorf("len(action) = %v, want 0", len(action))
|
|
|
+ node: &nodeRecorder{},
|
|
|
+ w: &waitRecorder{},
|
|
|
+ reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
|
}
|
|
|
+ srv.done = make(chan struct{})
|
|
|
+ close(srv.done)
|
|
|
+ _, err := srv.Do(context.Background(), pb.Request{Method: "PUT", ID: 1})
|
|
|
if err != ErrStopped {
|
|
|
t.Errorf("err = %v, want %v", err, ErrStopped)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// TestSync tests sync 1. is nonblocking 2. sends out SYNC request.
|
|
|
+// TestSync tests sync 1. is nonblocking 2. proposes SYNC request.
|
|
|
func TestSync(t *testing.T) {
|
|
|
- n := &nodeProposeDataRecorder{}
|
|
|
+ n := &nodeRecorder{}
|
|
|
srv := &EtcdServer{
|
|
|
node: n,
|
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
|
}
|
|
|
- done := make(chan struct{})
|
|
|
- go func() {
|
|
|
- srv.sync(10 * time.Second)
|
|
|
- close(done)
|
|
|
- }()
|
|
|
-
|
|
|
// check that sync is non-blocking
|
|
|
- select {
|
|
|
- case <-done:
|
|
|
- case <-time.After(time.Second):
|
|
|
+ timer := time.AfterFunc(time.Second, func() {
|
|
|
t.Fatalf("sync should be non-blocking but did not return after 1s!")
|
|
|
- }
|
|
|
-
|
|
|
+ })
|
|
|
+ srv.sync(10 * time.Second)
|
|
|
+ timer.Stop()
|
|
|
testutil.ForceGosched()
|
|
|
- data := n.data()
|
|
|
- if len(data) != 1 {
|
|
|
- t.Fatalf("len(proposeData) = %d, want 1", len(data))
|
|
|
+
|
|
|
+ action := n.Action()
|
|
|
+ if len(action) != 1 {
|
|
|
+ t.Fatalf("len(action) = %d, want 1", len(action))
|
|
|
+ }
|
|
|
+ if action[0].name != "Propose" {
|
|
|
+ t.Fatalf("action = %s, want Propose", action[0].name)
|
|
|
}
|
|
|
+ data := action[0].params[0].([]byte)
|
|
|
var r pb.Request
|
|
|
- if err := r.Unmarshal(data[0]); err != nil {
|
|
|
+ if err := r.Unmarshal(data); err != nil {
|
|
|
t.Fatalf("unmarshal request error: %v", err)
|
|
|
}
|
|
|
if r.Method != "SYNC" {
|
|
|
@@ -686,21 +625,14 @@ func TestSyncTimeout(t *testing.T) {
|
|
|
node: n,
|
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
|
}
|
|
|
- done := make(chan struct{})
|
|
|
- go func() {
|
|
|
- srv.sync(0)
|
|
|
- close(done)
|
|
|
- }()
|
|
|
-
|
|
|
// check that sync is non-blocking
|
|
|
- select {
|
|
|
- case <-done:
|
|
|
- case <-time.After(time.Second):
|
|
|
+ timer := time.AfterFunc(time.Second, func() {
|
|
|
t.Fatalf("sync should be non-blocking but did not return after 1s!")
|
|
|
- }
|
|
|
+ })
|
|
|
+ srv.sync(0)
|
|
|
+ timer.Stop()
|
|
|
|
|
|
// give time for goroutine in sync to cancel
|
|
|
- // TODO: use fake clock
|
|
|
testutil.ForceGosched()
|
|
|
w := []action{action{name: "Propose blocked"}}
|
|
|
if g := n.Action(); !reflect.DeepEqual(g, w) {
|
|
|
@@ -710,24 +642,9 @@ func TestSyncTimeout(t *testing.T) {
|
|
|
|
|
|
// TODO: TestNoSyncWhenNoLeader
|
|
|
|
|
|
-// blockingNodeProposer implements the node interface to allow users to
|
|
|
-// block until Propose has been called and then verify the Proposed data
|
|
|
-type blockingNodeProposer struct {
|
|
|
- ch chan []byte
|
|
|
- readyNode
|
|
|
-}
|
|
|
-
|
|
|
-func (n *blockingNodeProposer) Propose(_ context.Context, data []byte) error {
|
|
|
- n.ch <- data
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
// TestSyncTrigger tests that the server proposes a SYNC request when its sync timer ticks
|
|
|
func TestSyncTrigger(t *testing.T) {
|
|
|
- n := &blockingNodeProposer{
|
|
|
- ch: make(chan []byte),
|
|
|
- readyNode: *newReadyNode(),
|
|
|
- }
|
|
|
+ n := newReadyNode()
|
|
|
st := make(chan time.Time, 1)
|
|
|
srv := &EtcdServer{
|
|
|
node: n,
|
|
|
@@ -739,6 +656,7 @@ func TestSyncTrigger(t *testing.T) {
|
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
|
}
|
|
|
srv.start()
|
|
|
+ defer srv.Stop()
|
|
|
// trigger the server to become a leader and accept sync requests
|
|
|
n.readyc <- raft.Ready{
|
|
|
SoftState: &raft.SoftState{
|
|
|
@@ -747,13 +665,16 @@ func TestSyncTrigger(t *testing.T) {
|
|
|
}
|
|
|
// trigger a sync request
|
|
|
st <- time.Time{}
|
|
|
- var data []byte
|
|
|
- select {
|
|
|
- case <-time.After(time.Second):
|
|
|
- t.Fatalf("did not receive proposed request as expected!")
|
|
|
- case data = <-n.ch:
|
|
|
+ testutil.ForceGosched()
|
|
|
+
|
|
|
+ action := n.Action()
|
|
|
+ if len(action) != 1 {
|
|
|
+ t.Fatalf("len(action) = %d, want 1", len(action))
|
|
|
}
|
|
|
- srv.Stop()
|
|
|
+ if action[0].name != "Propose" {
|
|
|
+ t.Fatalf("action = %s, want Propose", action[0].name)
|
|
|
+ }
|
|
|
+ data := action[0].params[0].([]byte)
|
|
|
var req pb.Request
|
|
|
if err := req.Unmarshal(data); err != nil {
|
|
|
t.Fatalf("error unmarshalling data: %v", err)
|
|
|
@@ -764,29 +685,17 @@ func TestSyncTrigger(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
// snapshot should snapshot the store and cut the persistent
|
|
|
-// TODO: node.Compact is called... we need to make the node an interface
|
|
|
func TestSnapshot(t *testing.T) {
|
|
|
s := raft.NewMemoryStorage()
|
|
|
- n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1, s)
|
|
|
- defer n.Stop()
|
|
|
-
|
|
|
- // Now we can have an election and persist the rest of the log.
|
|
|
- // This causes HardState.Commit to advance. HardState.Commit must
|
|
|
- // be > 0 to snapshot.
|
|
|
- n.Campaign(context.Background())
|
|
|
- rd := <-n.Ready()
|
|
|
- s.Append(rd.Entries)
|
|
|
- n.Advance()
|
|
|
-
|
|
|
+ s.Append([]raftpb.Entry{{Index: 1}})
|
|
|
st := &storeRecorder{}
|
|
|
p := &storageRecorder{}
|
|
|
srv := &EtcdServer{
|
|
|
+ node: &nodeRecorder{},
|
|
|
+ raftStorage: s,
|
|
|
store: st,
|
|
|
storage: p,
|
|
|
- node: n,
|
|
|
- raftStorage: s,
|
|
|
}
|
|
|
-
|
|
|
srv.snapshot(1, &raftpb.ConfState{Nodes: []uint64{1}})
|
|
|
gaction := st.Action()
|
|
|
if len(gaction) != 1 {
|
|
|
@@ -795,7 +704,6 @@ func TestSnapshot(t *testing.T) {
|
|
|
if !reflect.DeepEqual(gaction[0], action{name: "Save"}) {
|
|
|
t.Errorf("action = %s, want Save", gaction[0])
|
|
|
}
|
|
|
-
|
|
|
gaction = p.Action()
|
|
|
if len(gaction) != 2 {
|
|
|
t.Fatalf("len(action) = %d, want 2", len(gaction))
|
|
|
@@ -810,39 +718,28 @@ func TestSnapshot(t *testing.T) {
|
|
|
|
|
|
// Applied > SnapCount should trigger a SaveSnap event
|
|
|
func TestTriggerSnap(t *testing.T) {
|
|
|
- ctx := context.Background()
|
|
|
- s := raft.NewMemoryStorage()
|
|
|
- n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1, s)
|
|
|
- n.Campaign(ctx)
|
|
|
+ snapc := 10
|
|
|
st := &storeRecorder{}
|
|
|
p := &storageRecorder{}
|
|
|
- cl := newCluster("abc")
|
|
|
- cl.SetStore(store.New())
|
|
|
srv := &EtcdServer{
|
|
|
+ node: newNodeCommitter(),
|
|
|
+ raftStorage: raft.NewMemoryStorage(),
|
|
|
store: st,
|
|
|
transport: &nopTransporter{},
|
|
|
storage: p,
|
|
|
- node: n,
|
|
|
- raftStorage: s,
|
|
|
- snapCount: 10,
|
|
|
- Cluster: cl,
|
|
|
+ snapCount: uint64(snapc),
|
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
|
}
|
|
|
-
|
|
|
srv.start()
|
|
|
- // wait for saving nop
|
|
|
- time.Sleep(time.Millisecond)
|
|
|
- for i := 0; uint64(i) < srv.snapCount-1; i++ {
|
|
|
- srv.Do(ctx, pb.Request{Method: "PUT", ID: 1})
|
|
|
+ for i := 0; i < snapc+1; i++ {
|
|
|
+ srv.Do(context.Background(), pb.Request{Method: "PUT"})
|
|
|
}
|
|
|
- // wait for saving the last entry
|
|
|
- time.Sleep(time.Millisecond)
|
|
|
srv.Stop()
|
|
|
|
|
|
gaction := p.Action()
|
|
|
// each operation is recorded as a Save
|
|
|
- // BootstrapConfig/Nop + (SnapCount - 1) * Puts + Cut + SaveSnap = Save + (SnapCount - 1) * Save + Cut + SaveSnap
|
|
|
- wcnt := 2 + int(srv.snapCount)
|
|
|
+ // (SnapCount+1) * Puts + Cut + SaveSnap = (SnapCount+1) * Save + Cut + SaveSnap
|
|
|
+ wcnt := 3 + snapc
|
|
|
if len(gaction) != wcnt {
|
|
|
t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
|
|
|
}
|
|
|
@@ -1066,10 +963,8 @@ func TestUpdateMember(t *testing.T) {
|
|
|
|
|
|
// TODO: test server could stop itself when being removed
|
|
|
|
|
|
-// TODO: test wait trigger correctness in multi-server case
|
|
|
-
|
|
|
func TestPublish(t *testing.T) {
|
|
|
- n := &nodeProposeDataRecorder{}
|
|
|
+ n := &nodeRecorder{}
|
|
|
ch := make(chan interface{}, 1)
|
|
|
// simulate that request has gone through consensus
|
|
|
ch <- Response{}
|
|
|
@@ -1084,12 +979,16 @@ func TestPublish(t *testing.T) {
|
|
|
}
|
|
|
srv.publish(time.Hour)
|
|
|
|
|
|
- data := n.data()
|
|
|
- if len(data) != 1 {
|
|
|
- t.Fatalf("len(proposeData) = %d, want 1", len(data))
|
|
|
+ action := n.Action()
|
|
|
+ if len(action) != 1 {
|
|
|
+ t.Fatalf("len(action) = %d, want 1", len(action))
|
|
|
}
|
|
|
+ if action[0].name != "Propose" {
|
|
|
+ t.Fatalf("action = %s, want Propose", action[0].name)
|
|
|
+ }
|
|
|
+ data := action[0].params[0].([]byte)
|
|
|
var r pb.Request
|
|
|
- if err := r.Unmarshal(data[0]); err != nil {
|
|
|
+ if err := r.Unmarshal(data); err != nil {
|
|
|
t.Fatalf("unmarshal request error: %v", err)
|
|
|
}
|
|
|
if r.Method != "PUT" {
|
|
|
@@ -1230,6 +1129,10 @@ func TestGetBool(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func boolp(b bool) *bool { return &b }
|
|
|
+
|
|
|
+func stringp(s string) *string { return &s }
|
|
|
+
|
|
|
type action struct {
|
|
|
name string
|
|
|
params []interface{}
|
|
|
@@ -1361,9 +1264,14 @@ func (w *waitRecorder) Trigger(id uint64, x interface{}) {
|
|
|
w.action = append(w.action, action{name: "Trigger"})
|
|
|
}
|
|
|
|
|
|
-func boolp(b bool) *bool { return &b }
|
|
|
+type waitWithResponse struct {
|
|
|
+ ch <-chan interface{}
|
|
|
+}
|
|
|
|
|
|
-func stringp(s string) *string { return &s }
|
|
|
+func (w *waitWithResponse) Register(id uint64) <-chan interface{} {
|
|
|
+ return w.ch
|
|
|
+}
|
|
|
+func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
|
|
|
|
|
|
type storageRecorder struct {
|
|
|
recorder
|
|
|
@@ -1385,39 +1293,17 @@ func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error {
|
|
|
}
|
|
|
func (p *storageRecorder) Close() error { return nil }
|
|
|
|
|
|
-type readyNode struct {
|
|
|
- readyc chan raft.Ready
|
|
|
-}
|
|
|
-
|
|
|
-func newReadyNode() *readyNode {
|
|
|
- readyc := make(chan raft.Ready, 1)
|
|
|
- return &readyNode{readyc: readyc}
|
|
|
-}
|
|
|
-func (n *readyNode) Tick() {}
|
|
|
-func (n *readyNode) Campaign(ctx context.Context) error { return nil }
|
|
|
-func (n *readyNode) Propose(ctx context.Context, data []byte) error { return nil }
|
|
|
-func (n *readyNode) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
|
|
|
- return nil
|
|
|
-}
|
|
|
-func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return nil }
|
|
|
-func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
|
|
|
-func (n *readyNode) Advance() {}
|
|
|
-func (n *readyNode) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState { return nil }
|
|
|
-func (n *readyNode) Stop() {}
|
|
|
-func (n *readyNode) Compact(index uint64, nodes []uint64, d []byte) {}
|
|
|
-
|
|
|
type nodeRecorder struct {
|
|
|
recorder
|
|
|
}
|
|
|
|
|
|
func (n *nodeRecorder) Tick() { n.record(action{name: "Tick"}) }
|
|
|
-
|
|
|
func (n *nodeRecorder) Campaign(ctx context.Context) error {
|
|
|
n.record(action{name: "Campaign"})
|
|
|
return nil
|
|
|
}
|
|
|
func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error {
|
|
|
- n.record(action{name: "Propose"})
|
|
|
+ n.record(action{name: "Propose", params: []interface{}{data}})
|
|
|
return nil
|
|
|
}
|
|
|
func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
|
|
|
@@ -1441,26 +1327,6 @@ func (n *nodeRecorder) Compact(index uint64, nodes []uint64, d []byte) {
|
|
|
n.record(action{name: "Compact"})
|
|
|
}
|
|
|
|
|
|
-type nodeProposeDataRecorder struct {
|
|
|
- nodeRecorder
|
|
|
- sync.Mutex
|
|
|
- d [][]byte
|
|
|
-}
|
|
|
-
|
|
|
-func (n *nodeProposeDataRecorder) data() [][]byte {
|
|
|
- n.Lock()
|
|
|
- d := n.d
|
|
|
- n.Unlock()
|
|
|
- return d
|
|
|
-}
|
|
|
-func (n *nodeProposeDataRecorder) Propose(ctx context.Context, data []byte) error {
|
|
|
- n.nodeRecorder.Propose(ctx, data)
|
|
|
- n.Lock()
|
|
|
- n.d = append(n.d, data)
|
|
|
- n.Unlock()
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
type nodeProposalBlockerRecorder struct {
|
|
|
nodeRecorder
|
|
|
}
|
|
|
@@ -1499,14 +1365,40 @@ func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange
|
|
|
return &raftpb.ConfState{}
|
|
|
}
|
|
|
|
|
|
-type waitWithResponse struct {
|
|
|
- ch <-chan interface{}
|
|
|
+// nodeCommitter commits proposed data immediately.
|
|
|
+type nodeCommitter struct {
|
|
|
+ nodeRecorder
|
|
|
+ readyc chan raft.Ready
|
|
|
+ index uint64
|
|
|
}
|
|
|
|
|
|
-func (w *waitWithResponse) Register(id uint64) <-chan interface{} {
|
|
|
- return w.ch
|
|
|
+func newNodeCommitter() *nodeCommitter {
|
|
|
+ readyc := make(chan raft.Ready, 1)
|
|
|
+ return &nodeCommitter{readyc: readyc}
|
|
|
}
|
|
|
-func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
|
|
|
+func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error {
|
|
|
+ n.index++
|
|
|
+ ents := []raftpb.Entry{{Index: n.index, Data: data}}
|
|
|
+ n.readyc <- raft.Ready{
|
|
|
+ Entries: ents,
|
|
|
+ CommittedEntries: ents,
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+func (n *nodeCommitter) Ready() <-chan raft.Ready {
|
|
|
+ return n.readyc
|
|
|
+}
|
|
|
+
|
|
|
+type readyNode struct {
|
|
|
+ nodeRecorder
|
|
|
+ readyc chan raft.Ready
|
|
|
+}
|
|
|
+
|
|
|
+func newReadyNode() *readyNode {
|
|
|
+ readyc := make(chan raft.Ready, 1)
|
|
|
+ return &readyNode{readyc: readyc}
|
|
|
+}
|
|
|
+func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
|
|
|
|
|
|
type nopTransporter struct{}
|
|
|
|