util.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package test
  2. import (
  3. "fmt"
  4. "github.com/coreos/go-etcd/etcd"
  5. "io/ioutil"
  6. "net"
  7. "net/http"
  8. "os"
  9. "strconv"
  10. "time"
  11. )
  12. var client = http.Client{
  13. Transport: &http.Transport{
  14. Dial: dialTimeoutFast,
  15. },
  16. }
  17. // Sending set commands
  18. func Set(stop chan bool) {
  19. stopSet := false
  20. i := 0
  21. c := etcd.NewClient(nil)
  22. for {
  23. key := fmt.Sprintf("%s_%v", "foo", i)
  24. result, err := c.Set(key, "bar", 0)
  25. if err != nil || result.Key != "/"+key || result.Value != "bar" {
  26. select {
  27. case <-stop:
  28. stopSet = true
  29. default:
  30. }
  31. }
  32. select {
  33. case <-stop:
  34. stopSet = true
  35. default:
  36. }
  37. if stopSet {
  38. break
  39. }
  40. i++
  41. }
  42. stop <- true
  43. }
  44. // Create a cluster of etcd nodes
  45. func CreateCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os.Process, error) {
  46. argGroup := make([][]string, size)
  47. sslServer1 := []string{"-serverCAFile=../../fixtures/ca/ca.crt",
  48. "-serverCert=../../fixtures/ca/server.crt",
  49. "-serverKey=../../fixtures/ca/server.key.insecure",
  50. }
  51. sslServer2 := []string{"-serverCAFile=../../fixtures/ca/ca.crt",
  52. "-serverCert=../../fixtures/ca/server2.crt",
  53. "-serverKey=../../fixtures/ca/server2.key.insecure",
  54. }
  55. for i := 0; i < size; i++ {
  56. if i == 0 {
  57. argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1"}
  58. if ssl {
  59. argGroup[i] = append(argGroup[i], sslServer1...)
  60. }
  61. } else {
  62. strI := strconv.Itoa(i + 1)
  63. argGroup[i] = []string{"etcd", "-n=node" + strI, "-c=127.0.0.1:400" + strI, "-s=127.0.0.1:700" + strI, "-d=/tmp/node" + strI, "-C=127.0.0.1:7001"}
  64. if ssl {
  65. argGroup[i] = append(argGroup[i], sslServer2...)
  66. }
  67. }
  68. }
  69. etcds := make([]*os.Process, size)
  70. for i, _ := range etcds {
  71. var err error
  72. etcds[i], err = os.StartProcess(EtcdBinPath, append(argGroup[i], "-f"), procAttr)
  73. if err != nil {
  74. return nil, nil, err
  75. }
  76. // TODOBP: Change this sleep to wait until the master is up.
  77. // The problem is that if the master isn't up then the children
  78. // have to retry. This retry can take upwards of 15 seconds
  79. // which slows tests way down and some of them fail.
  80. if i == 0 {
  81. time.Sleep(time.Second * 2)
  82. }
  83. }
  84. return argGroup, etcds, nil
  85. }
  86. // Destroy all the nodes in the cluster
  87. func DestroyCluster(etcds []*os.Process) error {
  88. for _, etcd := range etcds {
  89. err := etcd.Kill()
  90. if err != nil {
  91. panic(err.Error())
  92. }
  93. etcd.Release()
  94. }
  95. return nil
  96. }
  97. //
  98. func Monitor(size int, allowDeadNum int, leaderChan chan string, all chan bool, stop chan bool) {
  99. leaderMap := make(map[int]string)
  100. baseAddrFormat := "http://0.0.0.0:400%d"
  101. for {
  102. knownLeader := "unknown"
  103. dead := 0
  104. var i int
  105. for i = 0; i < size; i++ {
  106. leader, err := getLeader(fmt.Sprintf(baseAddrFormat, i+1))
  107. if err == nil {
  108. leaderMap[i] = leader
  109. if knownLeader == "unknown" {
  110. knownLeader = leader
  111. } else {
  112. if leader != knownLeader {
  113. break
  114. }
  115. }
  116. } else {
  117. dead++
  118. if dead > allowDeadNum {
  119. break
  120. }
  121. }
  122. }
  123. if i == size {
  124. select {
  125. case <-stop:
  126. return
  127. case <-leaderChan:
  128. leaderChan <- knownLeader
  129. default:
  130. leaderChan <- knownLeader
  131. }
  132. }
  133. if dead == 0 {
  134. select {
  135. case <-all:
  136. all <- true
  137. default:
  138. all <- true
  139. }
  140. }
  141. time.Sleep(time.Millisecond * 10)
  142. }
  143. }
  144. func getLeader(addr string) (string, error) {
  145. resp, err := client.Get(addr + "/v1/leader")
  146. if err != nil {
  147. return "", err
  148. }
  149. if resp.StatusCode != http.StatusOK {
  150. resp.Body.Close()
  151. return "", fmt.Errorf("no leader")
  152. }
  153. b, err := ioutil.ReadAll(resp.Body)
  154. resp.Body.Close()
  155. if err != nil {
  156. return "", err
  157. }
  158. return string(b), nil
  159. }
  160. // Dial with timeout
  161. func dialTimeoutFast(network, addr string) (net.Conn, error) {
  162. return net.DialTimeout(network, addr, time.Millisecond*10)
  163. }