test.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/coreos/go-etcd/etcd"
  5. "io/ioutil"
  6. "net"
  7. "net/http"
  8. "os"
  9. "strconv"
  10. "time"
  11. )
  12. var client = http.Client{
  13. Transport: &http.Transport{
  14. Dial: dialTimeoutFast,
  15. },
  16. }
  17. // Sending set commands
  18. func set(stop chan bool) {
  19. stopSet := false
  20. i := 0
  21. c := etcd.NewClient()
  22. for {
  23. key := fmt.Sprintf("%s_%v", "foo", i)
  24. result, err := c.Set(key, "bar", 0)
  25. if err != nil || result.Key != "/"+key || result.Value != "bar" {
  26. select {
  27. case <-stop:
  28. stopSet = true
  29. default:
  30. }
  31. }
  32. select {
  33. case <-stop:
  34. stopSet = true
  35. default:
  36. }
  37. if stopSet {
  38. break
  39. }
  40. i++
  41. }
  42. fmt.Println("set stop")
  43. stop <- true
  44. }
  45. // Create a cluster of etcd nodes
  46. func createCluster(size int, procAttr *os.ProcAttr) ([][]string, []*os.Process, error) {
  47. argGroup := make([][]string, size)
  48. for i := 0; i < size; i++ {
  49. if i == 0 {
  50. argGroup[i] = []string{"etcd", "-h=127.0.0.1", "-d=/tmp/node1"}
  51. } else {
  52. strI := strconv.Itoa(i + 1)
  53. argGroup[i] = []string{"etcd", "-h=127.0.0.1", "-c=400" + strI, "-s=700" + strI, "-d=/tmp/node" + strI, "-C=127.0.0.1:7001"}
  54. }
  55. }
  56. etcds := make([]*os.Process, size)
  57. for i, _ := range etcds {
  58. var err error
  59. etcds[i], err = os.StartProcess("etcd", append(argGroup[i], "-f"), procAttr)
  60. if err != nil {
  61. return nil, nil, err
  62. }
  63. // TODOBP: Change this sleep to wait until the master is up.
  64. // The problem is that if the master isn't up then the children
  65. // have to retry. This retry can take upwards of 15 seconds
  66. // which slows tests way down and some of them fail.
  67. if i == 0 {
  68. time.Sleep(time.Second)
  69. }
  70. }
  71. return argGroup, etcds, nil
  72. }
  73. // Destroy all the nodes in the cluster
  74. func destroyCluster(etcds []*os.Process) error {
  75. for i, etcd := range etcds {
  76. err := etcd.Kill()
  77. fmt.Println("kill ", i)
  78. if err != nil {
  79. panic(err.Error())
  80. }
  81. etcd.Release()
  82. }
  83. return nil
  84. }
  85. //
  86. func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
  87. leaderMap := make(map[int]string)
  88. baseAddrFormat := "http://0.0.0.0:400%d/leader"
  89. for {
  90. knownLeader := "unknown"
  91. dead := 0
  92. var i int
  93. for i = 0; i < size; i++ {
  94. leader, err := getLeader(fmt.Sprintf(baseAddrFormat, i+1))
  95. if err == nil {
  96. leaderMap[i] = leader
  97. if knownLeader == "unknown" {
  98. knownLeader = leader
  99. } else {
  100. if leader != knownLeader {
  101. break
  102. }
  103. }
  104. } else {
  105. dead++
  106. if dead > allowDeadNum {
  107. break
  108. }
  109. }
  110. }
  111. if i == size {
  112. select {
  113. case <-leaderChan:
  114. leaderChan <- knownLeader
  115. default:
  116. leaderChan <- knownLeader
  117. }
  118. }
  119. time.Sleep(time.Millisecond * 10)
  120. }
  121. }
  122. func getLeader(addr string) (string, error) {
  123. resp, err := client.Get(addr)
  124. if err != nil {
  125. return "", err
  126. }
  127. if resp.StatusCode != http.StatusOK {
  128. resp.Body.Close()
  129. return "", fmt.Errorf("no leader")
  130. }
  131. b, err := ioutil.ReadAll(resp.Body)
  132. resp.Body.Close()
  133. if err != nil {
  134. return "", err
  135. }
  136. return string(b), nil
  137. }
  138. func directSet() {
  139. c := make(chan bool, 1000)
  140. for i := 0; i < 1000; i++ {
  141. go send(c)
  142. }
  143. for i := 0; i < 1000; i++ {
  144. <-c
  145. }
  146. }
  147. func send(c chan bool) {
  148. for i := 0; i < 10; i++ {
  149. command := &SetCommand{}
  150. command.Key = "foo"
  151. command.Value = "bar"
  152. command.ExpireTime = time.Unix(0, 0)
  153. raftServer.Do(command)
  154. }
  155. c <- true
  156. }
  157. // Dial with timeout
  158. func dialTimeoutFast(network, addr string) (net.Conn, error) {
  159. return net.DialTimeout(network, addr, time.Millisecond*10)
  160. }