// cluster.go (6.8 KB)

  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "fmt"
  17. "math/rand"
  18. "net"
  19. "strings"
  20. "time"
  21. "golang.org/x/net/context"
  22. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  23. "github.com/coreos/etcd/tools/functional-tester/etcd-agent/client"
  24. "golang.org/x/time/rate"
  25. "google.golang.org/grpc"
  26. )
const (
	// peerURLPort is the port used when building each member's PeerURL
	// ("http://<host>:2380") in bootstrap.
	peerURLPort = 2380
	// failpointPort is the port used when building each member's
	// FailpointURL ("http://<host>:2381") in bootstrap.
	failpointPort = 2381
)
// cluster describes a set of etcd members that are driven through agents,
// together with the stressers that run against them.
type cluster struct {
	// v2Only selects the v2 stresser (stresserV2) and the v2 health check
	// (SetHealthKeyV2) instead of their v3 counterparts.
	v2Only bool // to be deprecated
	// datadir is passed to every member as its --data-dir flag.
	datadir string
	// stressQPS is used as both the rate and the burst size of the limiter
	// shared by the v3 stressers.
	stressQPS int
	// stressKeySize is copied into every stresser's keySize.
	stressKeySize int
	// stressKeySuffixRange is copied into every stresser's keySuffixRange.
	stressKeySuffixRange int
	// stressKeyRangeLimit is copied into every v3 stresser's keyRangeLimit;
	// the v2 stresser does not receive it.
	stressKeyRangeLimit int
	// Size is the number of agent endpoints the cluster was bootstrapped
	// with. It is set at the end of a successful bootstrap.
	Size int
	// Stressers holds one stresser per member, in member order.
	Stressers []Stresser
	// Members holds one entry per agent endpoint, in endpoint order. It is
	// set at the end of a successful bootstrap.
	Members []*member
}
// ClusterStatus is the result of (*cluster).Status.
type ClusterStatus struct {
	// AgentStatuses maps each member's agent endpoint to the status
	// obtained from that member's agent.
	AgentStatuses map[string]client.Status
}
  45. func (c *cluster) bootstrap(agentEndpoints []string) error {
  46. size := len(agentEndpoints)
  47. members := make([]*member, size)
  48. memberNameURLs := make([]string, size)
  49. for i, u := range agentEndpoints {
  50. agent, err := client.NewAgent(u)
  51. if err != nil {
  52. return err
  53. }
  54. host, _, err := net.SplitHostPort(u)
  55. if err != nil {
  56. return err
  57. }
  58. members[i] = &member{
  59. Agent: agent,
  60. Endpoint: u,
  61. Name: fmt.Sprintf("etcd-%d", i),
  62. ClientURL: fmt.Sprintf("http://%s:2379", host),
  63. PeerURL: fmt.Sprintf("http://%s:%d", host, peerURLPort),
  64. FailpointURL: fmt.Sprintf("http://%s:%d", host, failpointPort),
  65. }
  66. memberNameURLs[i] = members[i].ClusterEntry()
  67. }
  68. clusterStr := strings.Join(memberNameURLs, ",")
  69. token := fmt.Sprint(rand.Int())
  70. for i, m := range members {
  71. flags := append(
  72. m.Flags(),
  73. "--data-dir", c.datadir,
  74. "--initial-cluster-token", token,
  75. "--initial-cluster", clusterStr)
  76. if _, err := m.Agent.Start(flags...); err != nil {
  77. // cleanup
  78. for _, m := range members[:i] {
  79. m.Agent.Terminate()
  80. }
  81. return err
  82. }
  83. }
  84. // TODO: Too intensive stressers can panic etcd member with
  85. // 'out of memory' error. Put rate limits in server side.
  86. stressN := 100
  87. c.Stressers = make([]Stresser, len(members))
  88. limiter := rate.NewLimiter(rate.Limit(c.stressQPS), c.stressQPS)
  89. for i, m := range members {
  90. if c.v2Only {
  91. c.Stressers[i] = &stresserV2{
  92. Endpoint: m.ClientURL,
  93. keySize: c.stressKeySize,
  94. keySuffixRange: c.stressKeySuffixRange,
  95. N: stressN,
  96. }
  97. } else {
  98. c.Stressers[i] = &stresser{
  99. Endpoint: m.grpcAddr(),
  100. keySize: c.stressKeySize,
  101. keySuffixRange: c.stressKeySuffixRange,
  102. keyRangeLimit: c.stressKeyRangeLimit,
  103. N: stressN,
  104. rateLimiter: limiter,
  105. }
  106. }
  107. go c.Stressers[i].Stress()
  108. }
  109. c.Size = size
  110. c.Members = members
  111. return nil
  112. }
  113. func (c *cluster) Reset() error {
  114. eps := make([]string, len(c.Members))
  115. for i, m := range c.Members {
  116. eps[i] = m.Endpoint
  117. }
  118. return c.bootstrap(eps)
  119. }
  120. func (c *cluster) WaitHealth() error {
  121. var err error
  122. // wait 60s to check cluster health.
  123. // TODO: set it to a reasonable value. It is set that high because
  124. // follower may use long time to catch up the leader when reboot under
  125. // reasonable workload (https://github.com/coreos/etcd/issues/2698)
  126. healthFunc := func(m *member) error { return m.SetHealthKeyV3() }
  127. if c.v2Only {
  128. healthFunc = func(m *member) error { return m.SetHealthKeyV2() }
  129. }
  130. for i := 0; i < 60; i++ {
  131. for _, m := range c.Members {
  132. if err = healthFunc(m); err != nil {
  133. break
  134. }
  135. }
  136. if err == nil {
  137. return nil
  138. }
  139. plog.Warningf("#%d setHealthKey error (%v)", i, err)
  140. time.Sleep(time.Second)
  141. }
  142. return err
  143. }
  144. // GetLeader returns the index of leader and error if any.
  145. func (c *cluster) GetLeader() (int, error) {
  146. if c.v2Only {
  147. return 0, nil
  148. }
  149. for i, m := range c.Members {
  150. isLeader, err := m.IsLeader()
  151. if isLeader || err != nil {
  152. return i, err
  153. }
  154. }
  155. return 0, fmt.Errorf("no leader found")
  156. }
  157. func (c *cluster) Report() (success, failure int) {
  158. for _, stress := range c.Stressers {
  159. s, f := stress.Report()
  160. success += s
  161. failure += f
  162. }
  163. return
  164. }
  165. func (c *cluster) Cleanup() error {
  166. var lasterr error
  167. for _, m := range c.Members {
  168. if err := m.Agent.Cleanup(); err != nil {
  169. lasterr = err
  170. }
  171. }
  172. for _, s := range c.Stressers {
  173. s.Cancel()
  174. }
  175. return lasterr
  176. }
  177. func (c *cluster) Terminate() {
  178. for _, m := range c.Members {
  179. m.Agent.Terminate()
  180. }
  181. for _, s := range c.Stressers {
  182. s.Cancel()
  183. }
  184. }
  185. func (c *cluster) Status() ClusterStatus {
  186. cs := ClusterStatus{
  187. AgentStatuses: make(map[string]client.Status),
  188. }
  189. for _, m := range c.Members {
  190. s, err := m.Agent.Status()
  191. // TODO: add a.Desc() as a key of the map
  192. desc := m.Endpoint
  193. if err != nil {
  194. cs.AgentStatuses[desc] = client.Status{State: "unknown"}
  195. plog.Printf("failed to get the status of agent [%s]", desc)
  196. }
  197. cs.AgentStatuses[desc] = s
  198. }
  199. return cs
  200. }
  201. func (c *cluster) getRevisionHash() (map[string]int64, map[string]int64, error) {
  202. revs := make(map[string]int64)
  203. hashes := make(map[string]int64)
  204. for _, m := range c.Members {
  205. rev, hash, err := m.RevHash()
  206. if err != nil {
  207. return nil, nil, err
  208. }
  209. revs[m.ClientURL] = rev
  210. hashes[m.ClientURL] = hash
  211. }
  212. return revs, hashes, nil
  213. }
  214. func (c *cluster) compactKV(rev int64, timeout time.Duration) (err error) {
  215. if rev <= 0 {
  216. return nil
  217. }
  218. for i, m := range c.Members {
  219. u := m.ClientURL
  220. conn, derr := m.dialGRPC()
  221. if derr != nil {
  222. plog.Printf("[compact kv #%d] dial error %v (endpoint %s)", i, derr, u)
  223. err = derr
  224. continue
  225. }
  226. kvc := pb.NewKVClient(conn)
  227. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  228. plog.Printf("[compact kv #%d] starting (endpoint %s)", i, u)
  229. _, cerr := kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev, Physical: true}, grpc.FailFast(false))
  230. cancel()
  231. conn.Close()
  232. succeed := true
  233. if cerr != nil {
  234. if strings.Contains(cerr.Error(), "required revision has been compacted") && i > 0 {
  235. plog.Printf("[compact kv #%d] already compacted (endpoint %s)", i, u)
  236. } else {
  237. plog.Warningf("[compact kv #%d] error %v (endpoint %s)", i, cerr, u)
  238. err = cerr
  239. succeed = false
  240. }
  241. }
  242. if succeed {
  243. plog.Printf("[compact kv #%d] done (endpoint %s)", i, u)
  244. }
  245. }
  246. return err
  247. }
  248. func (c *cluster) checkCompact(rev int64) error {
  249. if rev == 0 {
  250. return nil
  251. }
  252. for _, m := range c.Members {
  253. if err := m.CheckCompact(rev); err != nil {
  254. return err
  255. }
  256. }
  257. return nil
  258. }
  259. func (c *cluster) defrag() error {
  260. for _, m := range c.Members {
  261. if err := m.Defrag(); err != nil {
  262. return err
  263. }
  264. }
  265. return nil
  266. }