main.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "flag"
  17. "fmt"
  18. "log"
  19. "math/rand"
  20. "os"
  21. "strings"
  22. "sync"
  23. "time"
  24. "golang.org/x/net/context"
  25. "golang.org/x/time/rate"
  26. "google.golang.org/grpc"
  27. "google.golang.org/grpc/codes"
  28. "github.com/coreos/etcd/clientv3"
  29. "github.com/coreos/etcd/clientv3/concurrency"
  30. )
// clientTimeout holds the value of the -client-timeout flag, in seconds.
// newClient uses it as the dial timeout of every etcd client it creates.
var clientTimeout int64
  32. func init() {
  33. rand.Seed(time.Now().UTC().UnixNano())
  34. }
  35. func main() {
  36. log.SetFlags(log.Lmicroseconds)
  37. endpointStr := flag.String("endpoints", "localhost:2379", "endpoints of etcd cluster")
  38. mode := flag.String("mode", "watcher", "test mode (election, lock-racer, lease-renewer, watcher)")
  39. round := flag.Int("rounds", 100, "number of rounds to run")
  40. flag.Int64Var(&clientTimeout, "client-timeout", 60, "max timeout seconds for a client to get connection")
  41. flag.Parse()
  42. eps := strings.Split(*endpointStr, ",")
  43. switch *mode {
  44. case "election":
  45. runElection(eps, *round)
  46. case "lock-racer":
  47. runRacer(eps, *round)
  48. case "lease-renewer":
  49. runLeaseRenewer(eps)
  50. case "watcher":
  51. runWatcher(eps, *round)
  52. default:
  53. fmt.Fprintf(os.Stderr, "unsupported mode %v\n", *mode)
  54. }
  55. }
// runElection runs the "election" test mode: 15 clients repeatedly campaign
// in the "electors" election and check that they agree on the leader.
//
// eps is the list of etcd endpoints and rounds is the number of rounds each
// client performs; the per-round driver is doRounds.
//
// Two buffered channels, each sized len(rcs), coordinate the clients:
//   - releasec holds one token per client that is allowed to start a new
//     campaign; it is pre-filled here and refilled by the leader's release.
//   - validatec receives one token from every successful validate call; the
//     leader drains len(rcs) of them before it resigns.
func runElection(eps []string, rounds int) {
	rcs := make([]roundClient, 15)
	validatec, releasec := make(chan struct{}, len(rcs)), make(chan struct{}, len(rcs))
	// Let every client campaign in the first round.
	for range rcs {
		releasec <- struct{}{}
	}

	for i := range rcs {
		// v is this client's candidate value (its index as a decimal string).
		v := fmt.Sprintf("%d", i)
		// observedLeader is the leader value this client currently believes in;
		// "" means none has been observed yet.
		observedLeader := ""
		// validateWaiters is the number of validate tokens this client still
		// has to collect in release; it is only set when the client is leader.
		validateWaiters := 0

		rcs[i].c = newClient(eps)
		var (
			s   *concurrency.Session
			err error
		)
		// Retry until a session can be created.
		for {
			s, err = concurrency.NewSession(rcs[i].c)
			if err == nil {
				break
			}
		}
		e := concurrency.NewElection(s, "electors")

		// acquire waits for a campaign token, then campaigns while a helper
		// goroutine observes the election. It returns nil either when this
		// client wins or when another leader was observed (which cancels the
		// campaign); otherwise it returns the Campaign error.
		rcs[i].acquire = func() error {
			<-releasec
			ctx, cancel := context.WithCancel(context.Background())
			go func() {
				// Record the first observed leader; if it is someone else,
				// cancel ctx so the Campaign call below is aborted.
				// NOTE(review): observedLeader is written here and read by the
				// enclosing closure without synchronisation — confirm whether
				// this is acceptable for the test.
				if ol, ok := <-e.Observe(ctx); ok {
					observedLeader = string(ol.Kvs[0].Value)
					if observedLeader != v {
						cancel()
					}
				}
			}()
			err = e.Campaign(ctx, v)
			if err == nil {
				observedLeader = v
			}
			if observedLeader == v {
				// As leader, expect one validate token from every client.
				validateWaiters = len(rcs)
			}
			select {
			case <-ctx.Done():
				// ctx is only ever cancelled by the observer goroutine above,
				// i.e. another client was seen as leader.
				return nil
			default:
				cancel()
				return err
			}
		}
		// validate compares the election's current leader with the one this
		// client observed and, if they do not disagree, signals validatec.
		// A failed Leader lookup is not treated as a mismatch.
		rcs[i].validate = func() error {
			if l, err := e.Leader(context.TODO()); err == nil && l != observedLeader {
				return fmt.Errorf("expected leader %q, got %q", observedLeader, l)
			}
			validatec <- struct{}{}
			return nil
		}
		// release resigns from the election. A leader first collects the
		// outstanding validate tokens without blocking and reports an error
		// while some are missing, so the caller can retry; after resigning it
		// hands out a fresh campaign token to every client.
		rcs[i].release = func() error {
			for validateWaiters > 0 {
				select {
				case <-validatec:
					validateWaiters--
				default:
					return fmt.Errorf("waiting on followers")
				}
			}
			if err := e.Resign(context.TODO()); err != nil {
				return err
			}
			if observedLeader == v {
				for range rcs {
					releasec <- struct{}{}
				}
			}
			observedLeader = ""
			return nil
		}
	}
	doRounds(rcs, rounds)
}
  134. func runLeaseRenewer(eps []string) {
  135. c := newClient(eps)
  136. ctx := context.Background()
  137. for {
  138. var (
  139. l *clientv3.LeaseGrantResponse
  140. lk *clientv3.LeaseKeepAliveResponse
  141. err error
  142. )
  143. for {
  144. l, err = c.Lease.Grant(ctx, 5)
  145. if err == nil {
  146. break
  147. }
  148. }
  149. expire := time.Now().Add(time.Duration(l.TTL-1) * time.Second)
  150. for {
  151. lk, err = c.Lease.KeepAliveOnce(ctx, l.ID)
  152. if grpc.Code(err) == codes.NotFound {
  153. if time.Since(expire) < 0 {
  154. log.Printf("bad renew! exceeded: %v", time.Since(expire))
  155. for {
  156. lk, err = c.Lease.KeepAliveOnce(ctx, l.ID)
  157. fmt.Println(lk, err)
  158. time.Sleep(time.Second)
  159. }
  160. }
  161. log.Printf("lost lease %d, expire: %v\n", l.ID, expire)
  162. break
  163. }
  164. if err != nil {
  165. continue
  166. }
  167. expire = time.Now().Add(time.Duration(lk.TTL-1) * time.Second)
  168. log.Printf("renewed lease %d, expire: %v\n", lk.ID, expire)
  169. time.Sleep(time.Duration(lk.TTL-2) * time.Second)
  170. }
  171. }
  172. }
  173. func runRacer(eps []string, round int) {
  174. rcs := make([]roundClient, 15)
  175. ctx := context.Background()
  176. cnt := 0
  177. for i := range rcs {
  178. rcs[i].c = newClient(eps)
  179. var (
  180. s *concurrency.Session
  181. err error
  182. )
  183. for {
  184. s, err = concurrency.NewSession(rcs[i].c)
  185. if err == nil {
  186. break
  187. }
  188. }
  189. m := concurrency.NewMutex(s, "racers")
  190. rcs[i].acquire = func() error { return m.Lock(ctx) }
  191. rcs[i].validate = func() error {
  192. if cnt++; cnt != 1 {
  193. return fmt.Errorf("bad lock; count: %d", cnt)
  194. }
  195. return nil
  196. }
  197. rcs[i].release = func() error {
  198. if err := m.Unlock(ctx); err != nil {
  199. return err
  200. }
  201. cnt = 0
  202. return nil
  203. }
  204. }
  205. doRounds(rcs, round)
  206. }
  207. func runWatcher(eps []string, limit int) {
  208. ctx := context.Background()
  209. for round := 0; round < limit; round++ {
  210. performWatchOnPrefixes(ctx, eps, round)
  211. }
  212. }
  213. func performWatchOnPrefixes(ctx context.Context, eps []string, round int) {
  214. runningTime := 60 * time.Second // time for which operation should be performed
  215. noOfPrefixes := 36 // total number of prefixes which will be watched upon
  216. watchPerPrefix := 10 // number of watchers per prefix
  217. reqRate := 30 // put request per second
  218. keyPrePrefix := 30 // max number of keyPrePrefixs for put operation
  219. prefixes := generateUniqueKeys(5, noOfPrefixes)
  220. keys := generateRandomKeys(10, keyPrePrefix)
  221. roundPrefix := fmt.Sprint("%16x", round)
  222. var (
  223. revision int64
  224. wg sync.WaitGroup
  225. gr *clientv3.GetResponse
  226. err error
  227. )
  228. // create client for performing get and put operations
  229. client := newClient(eps)
  230. defer client.Close()
  231. // get revision using get request
  232. gr = getWithRetry(client, ctx, "non-existant")
  233. revision = gr.Header.Revision
  234. ctxt, cancel := context.WithDeadline(ctx, time.Now().Add(runningTime))
  235. defer cancel()
  236. // generate and put keys in cluster
  237. limiter := rate.NewLimiter(rate.Limit(reqRate), reqRate)
  238. go func() {
  239. var modrevision int64
  240. for _, key := range keys {
  241. for _, prefix := range prefixes {
  242. key := roundPrefix + "-" + prefix + "-" + key
  243. // limit key put as per reqRate
  244. if err = limiter.Wait(ctxt); err != nil {
  245. break
  246. }
  247. modrevision = 0
  248. gr = getWithRetry(client, ctxt, key)
  249. kvs := gr.Kvs
  250. if len(kvs) > 0 {
  251. modrevision = gr.Kvs[0].ModRevision
  252. }
  253. for {
  254. txn := client.Txn(ctxt)
  255. _, err = txn.If(clientv3.Compare(clientv3.ModRevision(key), "=", modrevision)).Then(clientv3.OpPut(key, key)).Commit()
  256. if err == nil {
  257. break
  258. }
  259. if err == context.DeadlineExceeded {
  260. return
  261. }
  262. }
  263. }
  264. }
  265. }()
  266. ctxc, cancelc := context.WithCancel(ctx)
  267. wcs := make([]clientv3.WatchChan, 0)
  268. rcs := make([]*clientv3.Client, 0)
  269. wg.Add(noOfPrefixes * watchPerPrefix)
  270. for _, prefix := range prefixes {
  271. for j := 0; j < watchPerPrefix; j++ {
  272. go func(prefix string) {
  273. defer wg.Done()
  274. rc := newClient(eps)
  275. rcs = append(rcs, rc)
  276. wc := rc.Watch(ctxc, prefix, clientv3.WithPrefix(), clientv3.WithRev(revision))
  277. wcs = append(wcs, wc)
  278. for n := 0; n < len(keys); {
  279. select {
  280. case watchChan := <-wc:
  281. for _, event := range watchChan.Events {
  282. expectedKey := prefix + "-" + keys[n]
  283. receivedKey := string(event.Kv.Key)
  284. if expectedKey != receivedKey {
  285. log.Fatalf("expected key %q, got %q for prefix : %q\n", expectedKey, receivedKey, prefix)
  286. }
  287. n++
  288. }
  289. case <-ctxt.Done():
  290. return
  291. }
  292. }
  293. }(roundPrefix + "-" + prefix)
  294. }
  295. }
  296. wg.Wait()
  297. // cancel all watch channels
  298. cancelc()
  299. // verify all watch channels are closed
  300. for e, wc := range wcs {
  301. if _, ok := <-wc; ok {
  302. log.Fatalf("expected wc to be closed, but received %v", e)
  303. }
  304. }
  305. for _, rc := range rcs {
  306. rc.Close()
  307. }
  308. deletePrefixWithRety(client, ctx, roundPrefix)
  309. }
  310. func deletePrefixWithRety(client *clientv3.Client, ctx context.Context, key string) {
  311. for {
  312. if _, err := client.Delete(ctx, key, clientv3.WithRange(key+"z")); err == nil {
  313. return
  314. }
  315. }
  316. }
  317. func getWithRetry(client *clientv3.Client, ctx context.Context, key string) *clientv3.GetResponse {
  318. for {
  319. if gr, err := client.Get(ctx, key); err == nil {
  320. return gr
  321. }
  322. }
  323. }
  324. func generateUniqueKeys(maxstrlen uint, keynos int) []string {
  325. keyMap := make(map[string]bool)
  326. keys := make([]string, 0)
  327. count := 0
  328. key := ""
  329. for {
  330. key = generateRandomKey(maxstrlen)
  331. _, ok := keyMap[key]
  332. if !ok {
  333. keyMap[key] = true
  334. keys = append(keys, key)
  335. count++
  336. if len(keys) == keynos {
  337. break
  338. }
  339. }
  340. }
  341. return keys
  342. }
  343. func generateRandomKeys(maxstrlen uint, keynos int) []string {
  344. keys := make([]string, 0)
  345. key := ""
  346. for i := 0; i < keynos; i++ {
  347. key = generateRandomKey(maxstrlen)
  348. keys = append(keys, key)
  349. }
  350. return keys
  351. }
  352. func generateRandomKey(strlen uint) string {
  353. chars := "abcdefghijklmnopqrstuvwxyz0123456789"
  354. result := make([]byte, strlen)
  355. for i := 0; i < int(strlen); i++ {
  356. result[i] = chars[rand.Intn(len(chars))]
  357. }
  358. key := string(result)
  359. return key
  360. }
  361. func newClient(eps []string) *clientv3.Client {
  362. c, err := clientv3.New(clientv3.Config{
  363. Endpoints: eps,
  364. DialTimeout: time.Duration(clientTimeout) * time.Second,
  365. })
  366. if err != nil {
  367. log.Fatal(err)
  368. }
  369. return c
  370. }
  371. type roundClient struct {
  372. c *clientv3.Client
  373. progress int
  374. acquire func() error
  375. validate func() error
  376. release func() error
  377. }
  378. func doRounds(rcs []roundClient, rounds int) {
  379. var mu sync.Mutex
  380. var wg sync.WaitGroup
  381. wg.Add(len(rcs))
  382. finished := make(chan struct{}, 0)
  383. for i := range rcs {
  384. go func(rc *roundClient) {
  385. defer wg.Done()
  386. for rc.progress < rounds {
  387. for rc.acquire() != nil { /* spin */
  388. }
  389. mu.Lock()
  390. if err := rc.validate(); err != nil {
  391. log.Fatal(err)
  392. }
  393. mu.Unlock()
  394. time.Sleep(10 * time.Millisecond)
  395. rc.progress++
  396. finished <- struct{}{}
  397. mu.Lock()
  398. for rc.release() != nil {
  399. mu.Unlock()
  400. mu.Lock()
  401. }
  402. mu.Unlock()
  403. }
  404. }(&rcs[i])
  405. }
  406. start := time.Now()
  407. for i := 1; i < len(rcs)*rounds+1; i++ {
  408. select {
  409. case <-finished:
  410. if i%100 == 0 {
  411. fmt.Printf("finished %d, took %v\n", i, time.Since(start))
  412. start = time.Now()
  413. }
  414. case <-time.After(time.Minute):
  415. log.Panic("no progress after 1 minute!")
  416. }
  417. }
  418. wg.Wait()
  419. for _, rc := range rcs {
  420. rc.c.Close()
  421. }
  422. }