test.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/coreos/go-etcd/etcd"
  5. "io/ioutil"
  6. "net"
  7. "net/http"
  8. "os"
  9. "strconv"
  10. "time"
  11. )
  12. var client = http.Client{
  13. Transport: &http.Transport{
  14. Dial: dialTimeoutFast,
  15. },
  16. }
  17. // Sending set commands
  18. func set(stop chan bool) {
  19. stopSet := false
  20. i := 0
  21. c := etcd.NewClient()
  22. for {
  23. key := fmt.Sprintf("%s_%v", "foo", i)
  24. result, err := c.Set(key, "bar", 0)
  25. if err != nil || result.Key != "/"+key || result.Value != "bar" {
  26. select {
  27. case <-stop:
  28. stopSet = true
  29. default:
  30. }
  31. }
  32. select {
  33. case <-stop:
  34. stopSet = true
  35. default:
  36. }
  37. if stopSet {
  38. break
  39. }
  40. i++
  41. }
  42. fmt.Println("set stop")
  43. stop <- true
  44. }
  45. // Create a cluster of etcd nodes
  46. func createCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os.Process, error) {
  47. argGroup := make([][]string, size)
  48. sslServer1 := []string{"-serverCAFile=./fixtures/ca/ca.crt",
  49. "-serverCert=./fixtures/ca/server.crt",
  50. "-serverKey=./fixtures/ca/server.key.insecure",
  51. }
  52. sslServer2 := []string{"-serverCAFile=./fixtures/ca/ca.crt",
  53. "-serverCert=./fixtures/ca/server2.crt",
  54. "-serverKey=./fixtures/ca/server2.key.insecure",
  55. }
  56. for i := 0; i < size; i++ {
  57. if i == 0 {
  58. argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1"}
  59. if ssl {
  60. argGroup[i] = append(argGroup[i], sslServer1...)
  61. }
  62. } else {
  63. strI := strconv.Itoa(i + 1)
  64. argGroup[i] = []string{"etcd", "-n=node" + strI, "-c=127.0.0.1:400" + strI, "-s=127.0.0.1:700" + strI, "-d=/tmp/node" + strI, "-C=127.0.0.1:7001"}
  65. if ssl {
  66. argGroup[i] = append(argGroup[i], sslServer2...)
  67. }
  68. }
  69. }
  70. etcds := make([]*os.Process, size)
  71. for i, _ := range etcds {
  72. var err error
  73. etcds[i], err = os.StartProcess("etcd", append(argGroup[i], "-f"), procAttr)
  74. if err != nil {
  75. return nil, nil, err
  76. }
  77. // TODOBP: Change this sleep to wait until the master is up.
  78. // The problem is that if the master isn't up then the children
  79. // have to retry. This retry can take upwards of 15 seconds
  80. // which slows tests way down and some of them fail.
  81. if i == 0 {
  82. time.Sleep(time.Second)
  83. }
  84. }
  85. return argGroup, etcds, nil
  86. }
  87. // Destroy all the nodes in the cluster
  88. func destroyCluster(etcds []*os.Process) error {
  89. for i, etcd := range etcds {
  90. err := etcd.Kill()
  91. fmt.Println("kill ", i)
  92. if err != nil {
  93. panic(err.Error())
  94. }
  95. etcd.Release()
  96. }
  97. return nil
  98. }
  99. //
  100. func monitor(size int, allowDeadNum int, leaderChan chan string, all chan bool, stop chan bool) {
  101. leaderMap := make(map[int]string)
  102. baseAddrFormat := "http://0.0.0.0:400%d"
  103. for {
  104. knownLeader := "unknown"
  105. dead := 0
  106. var i int
  107. for i = 0; i < size; i++ {
  108. leader, err := getLeader(fmt.Sprintf(baseAddrFormat, i+1))
  109. if err == nil {
  110. //fmt.Printf("leader:[%d]->%s\n", i, leader)
  111. leaderMap[i] = leader
  112. if knownLeader == "unknown" {
  113. knownLeader = leader
  114. } else {
  115. if leader != knownLeader {
  116. break
  117. }
  118. }
  119. } else {
  120. //fmt.Printf("dead: [%d]\n", i)
  121. dead++
  122. if dead > allowDeadNum {
  123. break
  124. }
  125. }
  126. }
  127. if i == size {
  128. //fmt.Println("leader found")
  129. select {
  130. case <- stop:
  131. return
  132. case <-leaderChan:
  133. leaderChan <- knownLeader
  134. default:
  135. leaderChan <- knownLeader
  136. }
  137. }
  138. if dead == 0 {
  139. select {
  140. case <-all:
  141. all <- true
  142. default:
  143. all <- true
  144. }
  145. }
  146. time.Sleep(time.Millisecond * 10)
  147. }
  148. }
  149. func getLeader(addr string) (string, error) {
  150. resp, err := client.Get(addr + "/leader")
  151. if err != nil {
  152. return "", err
  153. }
  154. if resp.StatusCode != http.StatusOK {
  155. resp.Body.Close()
  156. return "", fmt.Errorf("no leader")
  157. }
  158. b, err := ioutil.ReadAll(resp.Body)
  159. resp.Body.Close()
  160. if err != nil {
  161. return "", err
  162. }
  163. return string(b), nil
  164. }
  165. func directSet() {
  166. c := make(chan bool, 1000)
  167. for i := 0; i < 1000; i++ {
  168. go send(c)
  169. }
  170. for i := 0; i < 1000; i++ {
  171. <-c
  172. }
  173. }
  174. func send(c chan bool) {
  175. for i := 0; i < 10; i++ {
  176. command := &SetCommand{}
  177. command.Key = "foo"
  178. command.Value = "bar"
  179. command.ExpireTime = time.Unix(0, 0)
  180. raftServer.Do(command)
  181. }
  182. c <- true
  183. }
  184. // Dial with timeout
  185. func dialTimeoutFast(network, addr string) (net.Conn, error) {
  186. return net.DialTimeout(network, addr, time.Millisecond*10)
  187. }