interaction_env_handler_stabilize.go 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. // Copyright 2019 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package rafttest
  15. import (
  16. "bufio"
  17. "fmt"
  18. "strings"
  19. "testing"
  20. "github.com/cockroachdb/datadriven"
  21. "go.etcd.io/etcd/raft/raftpb"
  22. )
  23. func (env *InteractionEnv) handleStabilize(t *testing.T, d datadriven.TestData) error {
  24. var idxs []int
  25. for _, id := range ints(t, d) {
  26. idxs = append(idxs, id-1)
  27. }
  28. return env.Stabilize(idxs...)
  29. }
  30. // Stabilize repeatedly runs Ready handling on and message delivery to the set
  31. // of nodes specified via the idxs slice until reaching a fixed point.
  32. func (env *InteractionEnv) Stabilize(idxs ...int) error {
  33. var nodes []Node
  34. for _, idx := range idxs {
  35. nodes = append(nodes, env.Nodes[idx])
  36. }
  37. if len(nodes) == 0 {
  38. nodes = env.Nodes
  39. }
  40. withIndent := func(f func()) {
  41. orig := env.Output.Builder
  42. env.Output.Builder = &strings.Builder{}
  43. f()
  44. scanner := bufio.NewScanner(strings.NewReader(env.Output.Builder.String()))
  45. for scanner.Scan() {
  46. orig.WriteString(" " + scanner.Text() + "\n")
  47. }
  48. env.Output.Builder = orig
  49. }
  50. for {
  51. done := true
  52. for _, rn := range nodes {
  53. if rn.HasReady() {
  54. done = false
  55. idx := int(rn.Status().ID - 1)
  56. fmt.Fprintf(env.Output, "> %d handling Ready\n", idx+1)
  57. withIndent(func() { env.ProcessReady(idx) })
  58. }
  59. }
  60. for _, rn := range nodes {
  61. id := rn.Status().ID
  62. // NB: we grab the messages just to see whether to print the header.
  63. // DeliverMsgs will do it again.
  64. if msgs, _ := splitMsgs(env.Messages, id); len(msgs) > 0 {
  65. fmt.Fprintf(env.Output, "> %d receiving messages\n", id)
  66. withIndent(func() { env.DeliverMsgs(Recipient{ID: id}) })
  67. done = false
  68. }
  69. }
  70. if done {
  71. return nil
  72. }
  73. }
  74. }
  75. func splitMsgs(msgs []raftpb.Message, to uint64) (toMsgs []raftpb.Message, rmdr []raftpb.Message) {
  76. // NB: this method does not reorder messages.
  77. for _, msg := range msgs {
  78. if msg.To == to {
  79. toMsgs = append(toMsgs, msg)
  80. } else {
  81. rmdr = append(rmdr, msg)
  82. }
  83. }
  84. return toMsgs, rmdr
  85. }