v3_lock_test.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "context"
  17. "math/rand"
  18. "sync"
  19. "testing"
  20. "time"
  21. "github.com/coreos/etcd/clientv3"
  22. "github.com/coreos/etcd/clientv3/concurrency"
  23. "github.com/coreos/etcd/contrib/recipes"
  24. "github.com/coreos/etcd/mvcc/mvccpb"
  25. "github.com/coreos/etcd/pkg/testutil"
  26. )
  27. func TestMutexSingleNode(t *testing.T) {
  28. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  29. defer clus.Terminate(t)
  30. var clients []*clientv3.Client
  31. testMutex(t, 5, makeSingleNodeClients(t, clus.cluster, &clients))
  32. closeClients(t, clients)
  33. }
  34. func TestMutexMultiNode(t *testing.T) {
  35. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  36. defer clus.Terminate(t)
  37. var clients []*clientv3.Client
  38. testMutex(t, 5, makeMultiNodeClients(t, clus.cluster, &clients))
  39. closeClients(t, clients)
  40. }
  41. func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client) {
  42. // stream lock acquisitions
  43. lockedC := make(chan *concurrency.Mutex)
  44. for i := 0; i < waiters; i++ {
  45. go func() {
  46. session, err := concurrency.NewSession(chooseClient())
  47. if err != nil {
  48. t.Error(err)
  49. }
  50. m := concurrency.NewMutex(session, "test-mutex")
  51. if err := m.Lock(context.TODO()); err != nil {
  52. t.Fatalf("could not wait on lock (%v)", err)
  53. }
  54. lockedC <- m
  55. }()
  56. }
  57. // unlock locked mutexes
  58. timerC := time.After(time.Duration(waiters) * time.Second)
  59. for i := 0; i < waiters; i++ {
  60. select {
  61. case <-timerC:
  62. t.Fatalf("timed out waiting for lock %d", i)
  63. case m := <-lockedC:
  64. // lock acquired with m
  65. select {
  66. case <-lockedC:
  67. t.Fatalf("lock %d followers did not wait", i)
  68. default:
  69. }
  70. if err := m.Unlock(context.TODO()); err != nil {
  71. t.Fatalf("could not release lock (%v)", err)
  72. }
  73. }
  74. }
  75. }
  76. // TestMutexSessionRelock ensures that acquiring the same lock with the same
  77. // session will not result in deadlock.
  78. func TestMutexSessionRelock(t *testing.T) {
  79. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  80. defer clus.Terminate(t)
  81. session, err := concurrency.NewSession(clus.RandClient())
  82. if err != nil {
  83. t.Error(err)
  84. }
  85. m := concurrency.NewMutex(session, "test-mutex")
  86. if err := m.Lock(context.TODO()); err != nil {
  87. t.Fatal(err)
  88. }
  89. m2 := concurrency.NewMutex(session, "test-mutex")
  90. if err := m2.Lock(context.TODO()); err != nil {
  91. t.Fatal(err)
  92. }
  93. }
  94. // TestMutexWaitsOnCurrentHolder ensures a mutex is only acquired once all
  95. // waiters older than the new owner are gone by testing the case where
  96. // the waiter prior to the acquirer expires before the current holder.
  97. func TestMutexWaitsOnCurrentHolder(t *testing.T) {
  98. defer testutil.AfterTest(t)
  99. clus := NewClusterV3(t, &ClusterConfig{Size: 1})
  100. defer clus.Terminate(t)
  101. cctx := context.Background()
  102. cli := clus.Client(0)
  103. firstOwnerSession, err := concurrency.NewSession(cli)
  104. if err != nil {
  105. t.Error(err)
  106. }
  107. defer firstOwnerSession.Close()
  108. firstOwnerMutex := concurrency.NewMutex(firstOwnerSession, "test-mutex")
  109. if err = firstOwnerMutex.Lock(cctx); err != nil {
  110. t.Fatal(err)
  111. }
  112. victimSession, err := concurrency.NewSession(cli)
  113. if err != nil {
  114. t.Error(err)
  115. }
  116. defer victimSession.Close()
  117. victimDonec := make(chan struct{})
  118. go func() {
  119. defer close(victimDonec)
  120. concurrency.NewMutex(victimSession, "test-mutex").Lock(cctx)
  121. }()
  122. // ensure mutexes associated with firstOwnerSession and victimSession waits before new owner
  123. wch := cli.Watch(cctx, "test-mutex", clientv3.WithPrefix(), clientv3.WithRev(1))
  124. putCounts := 0
  125. for putCounts < 2 {
  126. select {
  127. case wrp := <-wch:
  128. putCounts += len(wrp.Events)
  129. case <-time.After(time.Second):
  130. t.Fatal("failed to receive watch response")
  131. }
  132. }
  133. if putCounts != 2 {
  134. t.Fatalf("expect 2 put events, but got %v", putCounts)
  135. }
  136. newOwnerSession, err := concurrency.NewSession(cli)
  137. if err != nil {
  138. t.Error(err)
  139. }
  140. defer newOwnerSession.Close()
  141. newOwnerDonec := make(chan struct{})
  142. go func() {
  143. defer close(newOwnerDonec)
  144. concurrency.NewMutex(newOwnerSession, "test-mutex").Lock(cctx)
  145. }()
  146. select {
  147. case wrp := <-wch:
  148. if len(wrp.Events) != 1 {
  149. t.Fatalf("expect a event, but got %v events", len(wrp.Events))
  150. }
  151. if e := wrp.Events[0]; e.Type != mvccpb.PUT {
  152. t.Fatalf("expect a put event on prefix test-mutex, but got event type %v", e.Type)
  153. }
  154. case <-time.After(time.Second):
  155. t.Fatalf("failed to receive a watch response")
  156. }
  157. // simulate losing the client that's next in line to acquire the lock
  158. victimSession.Close()
  159. // ensures the deletion of victim waiter from server side.
  160. select {
  161. case wrp := <-wch:
  162. if len(wrp.Events) != 1 {
  163. t.Fatalf("expect a event, but got %v events", len(wrp.Events))
  164. }
  165. if e := wrp.Events[0]; e.Type != mvccpb.DELETE {
  166. t.Fatalf("expect a delete event on prefix test-mutex, but got event type %v", e.Type)
  167. }
  168. case <-time.After(time.Second):
  169. t.Fatal("failed to receive a watch response")
  170. }
  171. select {
  172. case <-newOwnerDonec:
  173. t.Fatal("new owner obtained lock before first owner unlocked")
  174. default:
  175. }
  176. if err := firstOwnerMutex.Unlock(cctx); err != nil {
  177. t.Fatal(err)
  178. }
  179. select {
  180. case <-newOwnerDonec:
  181. case <-time.After(time.Second):
  182. t.Fatal("new owner failed to obtain lock")
  183. }
  184. select {
  185. case <-victimDonec:
  186. case <-time.After(time.Second):
  187. t.Fatal("victim mutex failed to exit after first owner releases lock")
  188. }
  189. }
  190. func BenchmarkMutex4Waiters(b *testing.B) {
  191. // XXX switch tests to use TB interface
  192. clus := NewClusterV3(nil, &ClusterConfig{Size: 3})
  193. defer clus.Terminate(nil)
  194. for i := 0; i < b.N; i++ {
  195. testMutex(nil, 4, func() *clientv3.Client { return clus.RandClient() })
  196. }
  197. }
  198. func TestRWMutexSingleNode(t *testing.T) {
  199. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  200. defer clus.Terminate(t)
  201. testRWMutex(t, 5, func() *clientv3.Client { return clus.clients[0] })
  202. }
  203. func TestRWMutexMultiNode(t *testing.T) {
  204. clus := NewClusterV3(t, &ClusterConfig{Size: 3})
  205. defer clus.Terminate(t)
  206. testRWMutex(t, 5, func() *clientv3.Client { return clus.RandClient() })
  207. }
  208. func testRWMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client) {
  209. // stream rwlock acquistions
  210. rlockedC := make(chan *recipe.RWMutex, 1)
  211. wlockedC := make(chan *recipe.RWMutex, 1)
  212. for i := 0; i < waiters; i++ {
  213. go func() {
  214. session, err := concurrency.NewSession(chooseClient())
  215. if err != nil {
  216. t.Error(err)
  217. }
  218. rwm := recipe.NewRWMutex(session, "test-rwmutex")
  219. if rand.Intn(2) == 0 {
  220. if err := rwm.RLock(); err != nil {
  221. t.Fatalf("could not rlock (%v)", err)
  222. }
  223. rlockedC <- rwm
  224. } else {
  225. if err := rwm.Lock(); err != nil {
  226. t.Fatalf("could not lock (%v)", err)
  227. }
  228. wlockedC <- rwm
  229. }
  230. }()
  231. }
  232. // unlock locked rwmutexes
  233. timerC := time.After(time.Duration(waiters) * time.Second)
  234. for i := 0; i < waiters; i++ {
  235. select {
  236. case <-timerC:
  237. t.Fatalf("timed out waiting for lock %d", i)
  238. case wl := <-wlockedC:
  239. select {
  240. case <-rlockedC:
  241. t.Fatalf("rlock %d readers did not wait", i)
  242. default:
  243. }
  244. if err := wl.Unlock(); err != nil {
  245. t.Fatalf("could not release lock (%v)", err)
  246. }
  247. case rl := <-rlockedC:
  248. select {
  249. case <-wlockedC:
  250. t.Fatalf("rlock %d writers did not wait", i)
  251. default:
  252. }
  253. if err := rl.RUnlock(); err != nil {
  254. t.Fatalf("could not release rlock (%v)", err)
  255. }
  256. }
  257. }
  258. }
  259. func makeClients(t *testing.T, clients *[]*clientv3.Client, choose func() *member) func() *clientv3.Client {
  260. var mu sync.Mutex
  261. *clients = nil
  262. return func() *clientv3.Client {
  263. cli, err := NewClientV3(choose())
  264. if err != nil {
  265. t.Fatalf("cannot create client: %v", err)
  266. }
  267. mu.Lock()
  268. *clients = append(*clients, cli)
  269. mu.Unlock()
  270. return cli
  271. }
  272. }
  273. func makeSingleNodeClients(t *testing.T, clus *cluster, clients *[]*clientv3.Client) func() *clientv3.Client {
  274. return makeClients(t, clients, func() *member {
  275. return clus.Members[0]
  276. })
  277. }
  278. func makeMultiNodeClients(t *testing.T, clus *cluster, clients *[]*clientv3.Client) func() *clientv3.Client {
  279. return makeClients(t, clients, func() *member {
  280. return clus.Members[rand.Intn(len(clus.Members))]
  281. })
  282. }
  283. func closeClients(t *testing.T, clients []*clientv3.Client) {
  284. for _, cli := range clients {
  285. if err := cli.Close(); err != nil {
  286. t.Fatal(err)
  287. }
  288. }
  289. }