test.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/coreos/go-etcd/etcd"
  5. "io/ioutil"
  6. "net"
  7. "net/http"
  8. "os"
  9. "strconv"
  10. "time"
  11. )
  12. var client = http.Client{
  13. Transport: &http.Transport{
  14. Dial: dialTimeoutFast,
  15. },
  16. }
  17. // Sending set commands
  18. func set(stop chan bool) {
  19. stopSet := false
  20. i := 0
  21. c := etcd.NewClient()
  22. for {
  23. key := fmt.Sprintf("%s_%v", "foo", i)
  24. result, err := c.Set(key, "bar", 0)
  25. if err != nil || result.Key != "/"+key || result.Value != "bar" {
  26. select {
  27. case <-stop:
  28. stopSet = true
  29. default:
  30. }
  31. }
  32. select {
  33. case <-stop:
  34. stopSet = true
  35. default:
  36. }
  37. if stopSet {
  38. break
  39. }
  40. i++
  41. }
  42. fmt.Println("set stop")
  43. stop <- true
  44. }
  45. // Create a cluster of etcd nodes
  46. func createCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os.Process, error) {
  47. argGroup := make([][]string, size)
  48. sslServer1 := []string{"-serverCAFile=./fixtures/ca/ca.crt",
  49. "-serverCert=./fixtures/ca/server.crt",
  50. "-serverKey=./fixtures/ca/server.key.insecure",
  51. }
  52. sslServer2 := []string{"-serverCAFile=./fixtures/ca/ca.crt",
  53. "-serverCert=./fixtures/ca/server2.crt",
  54. "-serverKey=./fixtures/ca/server2.key.insecure",
  55. }
  56. for i := 0; i < size; i++ {
  57. if i == 0 {
  58. argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1"}
  59. if ssl {
  60. argGroup[i] = append(argGroup[i], sslServer1...)
  61. }
  62. } else {
  63. strI := strconv.Itoa(i + 1)
  64. argGroup[i] = []string{"etcd", "-n=node" + strI, "-c=127.0.0.1:400" + strI, "-s=127.0.0.1:700" + strI, "-d=/tmp/node" + strI, "-C=127.0.0.1:7001"}
  65. if ssl {
  66. argGroup[i] = append(argGroup[i], sslServer2...)
  67. }
  68. }
  69. }
  70. etcds := make([]*os.Process, size)
  71. for i, _ := range etcds {
  72. var err error
  73. etcds[i], err = os.StartProcess("etcd", append(argGroup[i], "-f"), procAttr)
  74. if err != nil {
  75. return nil, nil, err
  76. }
  77. // TODOBP: Change this sleep to wait until the master is up.
  78. // The problem is that if the master isn't up then the children
  79. // have to retry. This retry can take upwards of 15 seconds
  80. // which slows tests way down and some of them fail.
  81. if i == 0 {
  82. time.Sleep(time.Second)
  83. }
  84. }
  85. return argGroup, etcds, nil
  86. }
  87. // Destroy all the nodes in the cluster
  88. func destroyCluster(etcds []*os.Process) error {
  89. for i, etcd := range etcds {
  90. err := etcd.Kill()
  91. fmt.Println("kill ", i)
  92. if err != nil {
  93. panic(err.Error())
  94. }
  95. etcd.Release()
  96. }
  97. return nil
  98. }
  99. //
  100. func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
  101. leaderMap := make(map[int]string)
  102. baseAddrFormat := "http://0.0.0.0:400%d"
  103. for {
  104. knownLeader := "unknown"
  105. dead := 0
  106. var i int
  107. for i = 0; i < size; i++ {
  108. leader, err := getLeader(fmt.Sprintf(baseAddrFormat, i+1))
  109. if err == nil {
  110. leaderMap[i] = leader
  111. if knownLeader == "unknown" {
  112. knownLeader = leader
  113. } else {
  114. if leader != knownLeader {
  115. break
  116. }
  117. }
  118. } else {
  119. dead++
  120. if dead > allowDeadNum {
  121. break
  122. }
  123. }
  124. }
  125. if i == size {
  126. select {
  127. case <-leaderChan:
  128. leaderChan <- knownLeader
  129. default:
  130. leaderChan <- knownLeader
  131. }
  132. }
  133. time.Sleep(time.Millisecond * 10)
  134. }
  135. }
  136. func getLeader(addr string) (string, error) {
  137. resp, err := client.Get(addr + "/leader")
  138. if err != nil {
  139. return "", err
  140. }
  141. if resp.StatusCode != http.StatusOK {
  142. resp.Body.Close()
  143. return "", fmt.Errorf("no leader")
  144. }
  145. b, err := ioutil.ReadAll(resp.Body)
  146. resp.Body.Close()
  147. if err != nil {
  148. return "", err
  149. }
  150. return string(b), nil
  151. }
  152. func directSet() {
  153. c := make(chan bool, 1000)
  154. for i := 0; i < 1000; i++ {
  155. go send(c)
  156. }
  157. for i := 0; i < 1000; i++ {
  158. <-c
  159. }
  160. }
  161. func send(c chan bool) {
  162. for i := 0; i < 10; i++ {
  163. command := &SetCommand{}
  164. command.Key = "foo"
  165. command.Value = "bar"
  166. command.ExpireTime = time.Unix(0, 0)
  167. raftServer.Do(command)
  168. }
  169. c <- true
  170. }
  171. // Dial with timeout
  172. func dialTimeoutFast(network, addr string) (net.Conn, error) {
  173. return net.DialTimeout(network, addr, time.Millisecond*10)
  174. }