test.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/coreos/go-etcd/etcd"
  5. "io/ioutil"
  6. "net"
  7. "net/http"
  8. "os"
  9. "strconv"
  10. "time"
  11. )
  12. var client = http.Client{
  13. Transport: &http.Transport{
  14. Dial: dialTimeoutFast,
  15. },
  16. }
  17. // Sending set commands
  18. func set(stop chan bool) {
  19. stopSet := false
  20. i := 0
  21. for {
  22. key := fmt.Sprintf("%s_%v", "foo", i)
  23. result, err := etcd.Set(key, "bar", 0)
  24. if err != nil || result.Key != "/"+key || result.Value != "bar" {
  25. select {
  26. case <-stop:
  27. stopSet = true
  28. default:
  29. }
  30. }
  31. select {
  32. case <-stop:
  33. stopSet = true
  34. default:
  35. }
  36. if stopSet {
  37. break
  38. }
  39. i++
  40. }
  41. fmt.Println("set stop")
  42. stop <- true
  43. }
  // createCluster starts size etcd processes and returns the argument list
  // built for each node together with the started processes.
  //
  // The first node gets the arguments "etcd -d=/tmp/node1". Node n (n >= 2)
  // gets "-c=400n", "-s=700n", "-d=/tmp/noden" and "-C=127.0.0.1:7001". Every
  // process is additionally started with "-i", which is not part of the
  // returned argument lists. procAttr is handed to os.StartProcess unchanged.
  //
  // A negative size is rejected with an error. If any node fails to start, the
  // nodes started so far are killed and released and (nil, nil, err) is
  // returned, so a failed call does not leave stray processes behind.
  //
  // NOTE(review): os.StartProcess is given the program name "etcd" as is; it
  // looks like this relies on an etcd binary in the working directory — confirm.
  func createCluster(size int, procAttr *os.ProcAttr) ([][]string, []*os.Process, error) {
  	if size < 0 {
  		return nil, nil, fmt.Errorf("invalid cluster size %d", size)
  	}

  	argGroup := make([][]string, size)
  	for i := range argGroup {
  		if i == 0 {
  			argGroup[i] = []string{"etcd", "-d=/tmp/node1"}
  			continue
  		}
  		strI := strconv.Itoa(i + 1)
  		argGroup[i] = []string{"etcd", "-c=400" + strI, "-s=700" + strI, "-d=/tmp/node" + strI, "-C=127.0.0.1:7001"}
  	}

  	etcds := make([]*os.Process, size)
  	for i := range etcds {
  		// Build argv in a fresh slice so "-i" can never end up in argGroup.
  		argv := make([]string, 0, len(argGroup[i])+1)
  		argv = append(argv, argGroup[i]...)
  		argv = append(argv, "-i")

  		proc, err := os.StartProcess("etcd", argv, procAttr)
  		if err != nil {
  			// Best-effort teardown of the nodes that did start. Their errors
  			// are deliberately dropped: the start failure is what the caller
  			// needs to see.
  			for _, started := range etcds[:i] {
  				_ = started.Kill()
  				_ = started.Release()
  			}
  			return nil, nil, err
  		}
  		etcds[i] = proc
  	}
  	return argGroup, etcds, nil
  }
  // destroyCluster tears down the given processes one after another. For each
  // process it calls Kill, prints "kill " followed by the process's index, and
  // then calls Release. A failed Kill panics with the error's text, so whenever
  // the function returns normally the result is nil.
  func destroyCluster(etcds []*os.Process) error {
  	for idx := 0; idx < len(etcds); idx++ {
  		proc := etcds[idx]

  		killErr := proc.Kill()
  		// The progress line is printed before the error is inspected.
  		fmt.Println("kill ", idx)
  		if killErr != nil {
  			panic(killErr.Error())
  		}

  		proc.Release()
  	}
  	return nil
  }
  77. //
  78. func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
  79. leaderMap := make(map[int]string)
  80. baseAddrFormat := "http://0.0.0.0:400%d/leader"
  81. for {
  82. knownLeader := "unknown"
  83. dead := 0
  84. var i int
  85. for i = 0; i < size; i++ {
  86. leader, err := getLeader(fmt.Sprintf(baseAddrFormat, i+1))
  87. if err == nil {
  88. leaderMap[i] = leader
  89. if knownLeader == "unknown" {
  90. knownLeader = leader
  91. } else {
  92. if leader != knownLeader {
  93. break
  94. }
  95. }
  96. } else {
  97. dead++
  98. if dead > allowDeadNum {
  99. break
  100. }
  101. }
  102. }
  103. if i == size {
  104. select {
  105. case <-leaderChan:
  106. leaderChan <- knownLeader
  107. default:
  108. leaderChan <- knownLeader
  109. }
  110. }
  111. time.Sleep(time.Millisecond * 10)
  112. }
  113. }
  114. func getLeader(addr string) (string, error) {
  115. resp, err := client.Get(addr)
  116. if err != nil {
  117. return "", err
  118. }
  119. if resp.StatusCode != http.StatusOK {
  120. resp.Body.Close()
  121. return "", fmt.Errorf("no leader")
  122. }
  123. b, err := ioutil.ReadAll(resp.Body)
  124. resp.Body.Close()
  125. if err != nil {
  126. return "", err
  127. }
  128. return string(b), nil
  129. }
  // dialTimeoutFast connects to addr on the given network, giving up when the
  // connection has not been established within 10 milliseconds.
  func dialTimeoutFast(network, addr string) (net.Conn, error) {
  	const limit = 10 * time.Millisecond

  	dialer := net.Dialer{Timeout: limit}
  	return dialer.Dial(network, addr)
  }