test.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/coreos/go-etcd/etcd"
  5. "io/ioutil"
  6. "net"
  7. "net/http"
  8. "os"
  9. "strconv"
  10. "time"
  11. //"net/url"
  12. )
  13. var client = http.Client{
  14. Transport: &http.Transport{
  15. Dial: dialTimeoutFast,
  16. },
  17. }
  18. // Sending set commands
  19. func set(stop chan bool) {
  20. stopSet := false
  21. i := 0
  22. c := etcd.NewClient()
  23. for {
  24. key := fmt.Sprintf("%s_%v", "foo", i)
  25. result, err := c.Set(key, "bar", 0)
  26. if err != nil || result.Key != "/"+key || result.Value != "bar" {
  27. select {
  28. case <-stop:
  29. stopSet = true
  30. default:
  31. }
  32. }
  33. select {
  34. case <-stop:
  35. stopSet = true
  36. default:
  37. }
  38. if stopSet {
  39. break
  40. }
  41. i++
  42. }
  43. fmt.Println("set stop")
  44. stop <- true
  45. }
  46. // Create a cluster of etcd nodes
  47. func createCluster(size int, procAttr *os.ProcAttr) ([][]string, []*os.Process, error) {
  48. argGroup := make([][]string, size)
  49. for i := 0; i < size; i++ {
  50. if i == 0 {
  51. argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1"}
  52. } else {
  53. strI := strconv.Itoa(i + 1)
  54. argGroup[i] = []string{"etcd", "-n=node" + strI, "-c=127.0.0.1:400" + strI, "-s=127.0.0.1:700" + strI, "-d=/tmp/node" + strI, "-C=http://127.0.0.1:7001"}
  55. }
  56. }
  57. etcds := make([]*os.Process, size)
  58. for i, _ := range etcds {
  59. var err error
  60. etcds[i], err = os.StartProcess("etcd", append(argGroup[i], "-f"), procAttr)
  61. if err != nil {
  62. return nil, nil, err
  63. }
  64. // TODOBP: Change this sleep to wait until the master is up.
  65. // The problem is that if the master isn't up then the children
  66. // have to retry. This retry can take upwards of 15 seconds
  67. // which slows tests way down and some of them fail.
  68. if i == 0 {
  69. time.Sleep(time.Second)
  70. }
  71. }
  72. return argGroup, etcds, nil
  73. }
  74. // Destroy all the nodes in the cluster
  75. func destroyCluster(etcds []*os.Process) error {
  76. for i, etcd := range etcds {
  77. err := etcd.Kill()
  78. fmt.Println("kill ", i)
  79. if err != nil {
  80. panic(err.Error())
  81. }
  82. etcd.Release()
  83. }
  84. return nil
  85. }
  86. //
  87. func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
  88. leaderMap := make(map[int]string)
  89. baseAddrFormat := "http://0.0.0.0:400%d"
  90. for {
  91. knownLeader := "unknown"
  92. dead := 0
  93. var i int
  94. for i = 0; i < size; i++ {
  95. leader, err := getLeader(fmt.Sprintf(baseAddrFormat, i+1))
  96. if err == nil {
  97. leaderMap[i] = leader
  98. if knownLeader == "unknown" {
  99. knownLeader = leader
  100. } else {
  101. if leader != knownLeader {
  102. break
  103. }
  104. }
  105. } else {
  106. dead++
  107. if dead > allowDeadNum {
  108. break
  109. }
  110. }
  111. }
  112. if i == size {
  113. select {
  114. case <-leaderChan:
  115. leaderChan <- knownLeader
  116. default:
  117. leaderChan <- knownLeader
  118. }
  119. }
  120. time.Sleep(time.Millisecond * 10)
  121. }
  122. }
  123. func getLeader(addr string) (string, error) {
  124. resp, err := client.Get(addr + "/leader")
  125. if err != nil {
  126. return "", err
  127. }
  128. if resp.StatusCode != http.StatusOK {
  129. resp.Body.Close()
  130. return "", fmt.Errorf("no leader")
  131. }
  132. b, err := ioutil.ReadAll(resp.Body)
  133. resp.Body.Close()
  134. if err != nil {
  135. return "", err
  136. }
  137. return string(b), nil
  138. }
  139. func directSet() {
  140. c := make(chan bool, 1000)
  141. for i := 0; i < 1000; i++ {
  142. go send(c)
  143. }
  144. for i := 0; i < 1000; i++ {
  145. <-c
  146. }
  147. }
  148. func send(c chan bool) {
  149. for i := 0; i < 10; i++ {
  150. command := &SetCommand{}
  151. command.Key = "foo"
  152. command.Value = "bar"
  153. command.ExpireTime = time.Unix(0, 0)
  154. raftServer.Do(command)
  155. }
  156. c <- true
  157. }
  158. // Dial with timeout
  159. func dialTimeoutFast(network, addr string) (net.Conn, error) {
  160. return net.DialTimeout(network, addr, time.Millisecond*10)
  161. }