v3_lock_test.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "math/rand"
  17. "sync"
  18. "testing"
  19. "time"
  20. "github.com/coreos/etcd/clientv3"
  21. "github.com/coreos/etcd/clientv3/concurrency"
  22. "github.com/coreos/etcd/contrib/recipes"
  23. "golang.org/x/net/context"
  24. )
  25. func TestMutexSingleNode(t *testing.T) {
  26. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  27. defer clus.Terminate(t)
  28. var clients []*clientv3.Client
  29. testMutex(t, 5, makeSingleNodeClients(t, clus.cluster, &clients))
  30. closeClients(t, clients)
  31. }
  32. func TestMutexMultiNode(t *testing.T) {
  33. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  34. defer clus.Terminate(t)
  35. var clients []*clientv3.Client
  36. testMutex(t, 5, makeMultiNodeClients(t, clus.cluster, &clients))
  37. closeClients(t, clients)
  38. }
  39. func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client) {
  40. // stream lock acquisitions
  41. lockedC := make(chan *concurrency.Mutex)
  42. for i := 0; i < waiters; i++ {
  43. go func() {
  44. session, err := concurrency.NewSession(chooseClient())
  45. if err != nil {
  46. t.Error(err)
  47. }
  48. m := concurrency.NewMutex(session, "test-mutex")
  49. if err := m.Lock(context.TODO()); err != nil {
  50. t.Fatalf("could not wait on lock (%v)", err)
  51. }
  52. lockedC <- m
  53. }()
  54. }
  55. // unlock locked mutexes
  56. timerC := time.After(time.Duration(waiters) * time.Second)
  57. for i := 0; i < waiters; i++ {
  58. select {
  59. case <-timerC:
  60. t.Fatalf("timed out waiting for lock %d", i)
  61. case m := <-lockedC:
  62. // lock acquired with m
  63. select {
  64. case <-lockedC:
  65. t.Fatalf("lock %d followers did not wait", i)
  66. default:
  67. }
  68. if err := m.Unlock(context.TODO()); err != nil {
  69. t.Fatalf("could not release lock (%v)", err)
  70. }
  71. }
  72. }
  73. }
  74. // TestMutexSessionRelock ensures that acquiring the same lock with the same
  75. // session will not result in deadlock.
  76. func TestMutexSessionRelock(t *testing.T) {
  77. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  78. defer clus.Terminate(t)
  79. session, err := concurrency.NewSession(clus.RandClient())
  80. if err != nil {
  81. t.Error(err)
  82. }
  83. m := concurrency.NewMutex(session, "test-mutex")
  84. if err := m.Lock(context.TODO()); err != nil {
  85. t.Fatal(err)
  86. }
  87. m2 := concurrency.NewMutex(session, "test-mutex")
  88. if err := m2.Lock(context.TODO()); err != nil {
  89. t.Fatal(err)
  90. }
  91. }
  92. func BenchmarkMutex4Waiters(b *testing.B) {
  93. // XXX switch tests to use TB interface
  94. clus := NewClusterV3(nil, &ClusterConfig{Size: 3})
  95. defer clus.Terminate(nil)
  96. for i := 0; i < b.N; i++ {
  97. testMutex(nil, 4, func() *clientv3.Client { return clus.RandClient() })
  98. }
  99. }
  100. func TestRWMutexSingleNode(t *testing.T) {
  101. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  102. defer clus.Terminate(t)
  103. testRWMutex(t, 5, func() *clientv3.Client { return clus.clients[0] })
  104. }
  105. func TestRWMutexMultiNode(t *testing.T) {
  106. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  107. defer clus.Terminate(t)
  108. testRWMutex(t, 5, func() *clientv3.Client { return clus.RandClient() })
  109. }
  110. func testRWMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client) {
  111. // stream rwlock acquistions
  112. rlockedC := make(chan *recipe.RWMutex, 1)
  113. wlockedC := make(chan *recipe.RWMutex, 1)
  114. for i := 0; i < waiters; i++ {
  115. go func() {
  116. session, err := concurrency.NewSession(chooseClient())
  117. if err != nil {
  118. t.Error(err)
  119. }
  120. rwm := recipe.NewRWMutex(session, "test-rwmutex")
  121. if rand.Intn(2) == 0 {
  122. if err := rwm.RLock(); err != nil {
  123. t.Fatalf("could not rlock (%v)", err)
  124. }
  125. rlockedC <- rwm
  126. } else {
  127. if err := rwm.Lock(); err != nil {
  128. t.Fatalf("could not lock (%v)", err)
  129. }
  130. wlockedC <- rwm
  131. }
  132. }()
  133. }
  134. // unlock locked rwmutexes
  135. timerC := time.After(time.Duration(waiters) * time.Second)
  136. for i := 0; i < waiters; i++ {
  137. select {
  138. case <-timerC:
  139. t.Fatalf("timed out waiting for lock %d", i)
  140. case wl := <-wlockedC:
  141. select {
  142. case <-rlockedC:
  143. t.Fatalf("rlock %d readers did not wait", i)
  144. default:
  145. }
  146. if err := wl.Unlock(); err != nil {
  147. t.Fatalf("could not release lock (%v)", err)
  148. }
  149. case rl := <-rlockedC:
  150. select {
  151. case <-wlockedC:
  152. t.Fatalf("rlock %d writers did not wait", i)
  153. default:
  154. }
  155. if err := rl.RUnlock(); err != nil {
  156. t.Fatalf("could not release rlock (%v)", err)
  157. }
  158. }
  159. }
  160. }
  161. func makeClients(t *testing.T, clients *[]*clientv3.Client, choose func() *member) func() *clientv3.Client {
  162. var mu sync.Mutex
  163. *clients = nil
  164. return func() *clientv3.Client {
  165. cli, err := NewClientV3(choose())
  166. if err != nil {
  167. t.Fatalf("cannot create client: %v", err)
  168. }
  169. mu.Lock()
  170. *clients = append(*clients, cli)
  171. mu.Unlock()
  172. return cli
  173. }
  174. }
  175. func makeSingleNodeClients(t *testing.T, clus *cluster, clients *[]*clientv3.Client) func() *clientv3.Client {
  176. return makeClients(t, clients, func() *member {
  177. return clus.Members[0]
  178. })
  179. }
  180. func makeMultiNodeClients(t *testing.T, clus *cluster, clients *[]*clientv3.Client) func() *clientv3.Client {
  181. return makeClients(t, clients, func() *member {
  182. return clus.Members[rand.Intn(len(clus.Members))]
  183. })
  184. }
  185. func closeClients(t *testing.T, clients []*clientv3.Client) {
  186. for _, cli := range clients {
  187. if err := cli.Close(); err != nil {
  188. t.Fatal(err)
  189. }
  190. }
  191. }