pool_test.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. package pool_test
  2. import (
  3. "context"
  4. "sync"
  5. "testing"
  6. "time"
  7. "github.com/go-redis/redis/v7/internal/pool"
  8. . "github.com/onsi/ginkgo"
  9. . "github.com/onsi/gomega"
  10. )
  11. var _ = Describe("ConnPool", func() {
  12. c := context.Background()
  13. var connPool *pool.ConnPool
  14. BeforeEach(func() {
  15. connPool = pool.NewConnPool(&pool.Options{
  16. Dialer: dummyDialer,
  17. PoolSize: 10,
  18. PoolTimeout: time.Hour,
  19. IdleTimeout: time.Millisecond,
  20. IdleCheckFrequency: time.Millisecond,
  21. })
  22. })
  23. AfterEach(func() {
  24. connPool.Close()
  25. })
  26. It("should unblock client when conn is removed", func() {
  27. // Reserve one connection.
  28. cn, err := connPool.Get(c)
  29. Expect(err).NotTo(HaveOccurred())
  30. // Reserve all other connections.
  31. var cns []*pool.Conn
  32. for i := 0; i < 9; i++ {
  33. cn, err := connPool.Get(c)
  34. Expect(err).NotTo(HaveOccurred())
  35. cns = append(cns, cn)
  36. }
  37. started := make(chan bool, 1)
  38. done := make(chan bool, 1)
  39. go func() {
  40. defer GinkgoRecover()
  41. started <- true
  42. _, err := connPool.Get(c)
  43. Expect(err).NotTo(HaveOccurred())
  44. done <- true
  45. connPool.Put(cn)
  46. }()
  47. <-started
  48. // Check that Get is blocked.
  49. select {
  50. case <-done:
  51. Fail("Get is not blocked")
  52. case <-time.After(time.Millisecond):
  53. // ok
  54. }
  55. connPool.Remove(cn, nil)
  56. // Check that Get is unblocked.
  57. select {
  58. case <-done:
  59. // ok
  60. case <-time.After(time.Second):
  61. Fail("Get is not unblocked")
  62. }
  63. for _, cn := range cns {
  64. connPool.Put(cn)
  65. }
  66. })
  67. })
  68. var _ = Describe("MinIdleConns", func() {
  69. c := context.Background()
  70. const poolSize = 100
  71. var minIdleConns int
  72. var connPool *pool.ConnPool
  73. newConnPool := func() *pool.ConnPool {
  74. connPool := pool.NewConnPool(&pool.Options{
  75. Dialer: dummyDialer,
  76. PoolSize: poolSize,
  77. MinIdleConns: minIdleConns,
  78. PoolTimeout: 100 * time.Millisecond,
  79. IdleTimeout: -1,
  80. IdleCheckFrequency: -1,
  81. })
  82. Eventually(func() int {
  83. return connPool.Len()
  84. }).Should(Equal(minIdleConns))
  85. return connPool
  86. }
  87. assert := func() {
  88. It("has idle connections when created", func() {
  89. Expect(connPool.Len()).To(Equal(minIdleConns))
  90. Expect(connPool.IdleLen()).To(Equal(minIdleConns))
  91. })
  92. Context("after Get", func() {
  93. var cn *pool.Conn
  94. BeforeEach(func() {
  95. var err error
  96. cn, err = connPool.Get(c)
  97. Expect(err).NotTo(HaveOccurred())
  98. Eventually(func() int {
  99. return connPool.Len()
  100. }).Should(Equal(minIdleConns + 1))
  101. })
  102. It("has idle connections", func() {
  103. Expect(connPool.Len()).To(Equal(minIdleConns + 1))
  104. Expect(connPool.IdleLen()).To(Equal(minIdleConns))
  105. })
  106. Context("after Remove", func() {
  107. BeforeEach(func() {
  108. connPool.Remove(cn, nil)
  109. })
  110. It("has idle connections", func() {
  111. Expect(connPool.Len()).To(Equal(minIdleConns))
  112. Expect(connPool.IdleLen()).To(Equal(minIdleConns))
  113. })
  114. })
  115. })
  116. Describe("Get does not exceed pool size", func() {
  117. var mu sync.RWMutex
  118. var cns []*pool.Conn
  119. BeforeEach(func() {
  120. cns = make([]*pool.Conn, 0)
  121. perform(poolSize, func(_ int) {
  122. defer GinkgoRecover()
  123. cn, err := connPool.Get(c)
  124. Expect(err).NotTo(HaveOccurred())
  125. mu.Lock()
  126. cns = append(cns, cn)
  127. mu.Unlock()
  128. })
  129. Eventually(func() int {
  130. return connPool.Len()
  131. }).Should(BeNumerically(">=", poolSize))
  132. })
  133. It("Get is blocked", func() {
  134. done := make(chan struct{})
  135. go func() {
  136. connPool.Get(c)
  137. close(done)
  138. }()
  139. select {
  140. case <-done:
  141. Fail("Get is not blocked")
  142. case <-time.After(time.Millisecond):
  143. // ok
  144. }
  145. select {
  146. case <-done:
  147. // ok
  148. case <-time.After(time.Second):
  149. Fail("Get is not unblocked")
  150. }
  151. })
  152. Context("after Put", func() {
  153. BeforeEach(func() {
  154. perform(len(cns), func(i int) {
  155. mu.RLock()
  156. connPool.Put(cns[i])
  157. mu.RUnlock()
  158. })
  159. Eventually(func() int {
  160. return connPool.Len()
  161. }).Should(Equal(poolSize))
  162. })
  163. It("pool.Len is back to normal", func() {
  164. Expect(connPool.Len()).To(Equal(poolSize))
  165. Expect(connPool.IdleLen()).To(Equal(poolSize))
  166. })
  167. })
  168. Context("after Remove", func() {
  169. BeforeEach(func() {
  170. perform(len(cns), func(i int) {
  171. mu.RLock()
  172. connPool.Remove(cns[i], nil)
  173. mu.RUnlock()
  174. })
  175. Eventually(func() int {
  176. return connPool.Len()
  177. }).Should(Equal(minIdleConns))
  178. })
  179. It("has idle connections", func() {
  180. Expect(connPool.Len()).To(Equal(minIdleConns))
  181. Expect(connPool.IdleLen()).To(Equal(minIdleConns))
  182. })
  183. })
  184. })
  185. }
  186. Context("minIdleConns = 1", func() {
  187. BeforeEach(func() {
  188. minIdleConns = 1
  189. connPool = newConnPool()
  190. })
  191. AfterEach(func() {
  192. connPool.Close()
  193. })
  194. assert()
  195. })
  196. Context("minIdleConns = 32", func() {
  197. BeforeEach(func() {
  198. minIdleConns = 32
  199. connPool = newConnPool()
  200. })
  201. AfterEach(func() {
  202. connPool.Close()
  203. })
  204. assert()
  205. })
  206. })
  207. var _ = Describe("conns reaper", func() {
  208. c := context.Background()
  209. const idleTimeout = time.Minute
  210. const maxAge = time.Hour
  211. var connPool *pool.ConnPool
  212. var conns, staleConns, closedConns []*pool.Conn
  213. assert := func(typ string) {
  214. BeforeEach(func() {
  215. closedConns = nil
  216. connPool = pool.NewConnPool(&pool.Options{
  217. Dialer: dummyDialer,
  218. PoolSize: 10,
  219. IdleTimeout: idleTimeout,
  220. MaxConnAge: maxAge,
  221. PoolTimeout: time.Second,
  222. IdleCheckFrequency: time.Hour,
  223. OnClose: func(cn *pool.Conn) error {
  224. closedConns = append(closedConns, cn)
  225. return nil
  226. },
  227. })
  228. conns = nil
  229. // add stale connections
  230. staleConns = nil
  231. for i := 0; i < 3; i++ {
  232. cn, err := connPool.Get(c)
  233. Expect(err).NotTo(HaveOccurred())
  234. switch typ {
  235. case "idle":
  236. cn.SetUsedAt(time.Now().Add(-2 * idleTimeout))
  237. case "aged":
  238. cn.SetCreatedAt(time.Now().Add(-2 * maxAge))
  239. }
  240. conns = append(conns, cn)
  241. staleConns = append(staleConns, cn)
  242. }
  243. // add fresh connections
  244. for i := 0; i < 3; i++ {
  245. cn, err := connPool.Get(c)
  246. Expect(err).NotTo(HaveOccurred())
  247. conns = append(conns, cn)
  248. }
  249. for _, cn := range conns {
  250. connPool.Put(cn)
  251. }
  252. Expect(connPool.Len()).To(Equal(6))
  253. Expect(connPool.IdleLen()).To(Equal(6))
  254. n, err := connPool.ReapStaleConns()
  255. Expect(err).NotTo(HaveOccurred())
  256. Expect(n).To(Equal(3))
  257. })
  258. AfterEach(func() {
  259. _ = connPool.Close()
  260. Expect(connPool.Len()).To(Equal(0))
  261. Expect(connPool.IdleLen()).To(Equal(0))
  262. Expect(len(closedConns)).To(Equal(len(conns)))
  263. Expect(closedConns).To(ConsistOf(conns))
  264. })
  265. It("reaps stale connections", func() {
  266. Expect(connPool.Len()).To(Equal(3))
  267. Expect(connPool.IdleLen()).To(Equal(3))
  268. })
  269. It("does not reap fresh connections", func() {
  270. n, err := connPool.ReapStaleConns()
  271. Expect(err).NotTo(HaveOccurred())
  272. Expect(n).To(Equal(0))
  273. })
  274. It("stale connections are closed", func() {
  275. Expect(len(closedConns)).To(Equal(len(staleConns)))
  276. Expect(closedConns).To(ConsistOf(staleConns))
  277. })
  278. It("pool is functional", func() {
  279. for j := 0; j < 3; j++ {
  280. var freeCns []*pool.Conn
  281. for i := 0; i < 3; i++ {
  282. cn, err := connPool.Get(c)
  283. Expect(err).NotTo(HaveOccurred())
  284. Expect(cn).NotTo(BeNil())
  285. freeCns = append(freeCns, cn)
  286. }
  287. Expect(connPool.Len()).To(Equal(3))
  288. Expect(connPool.IdleLen()).To(Equal(0))
  289. cn, err := connPool.Get(c)
  290. Expect(err).NotTo(HaveOccurred())
  291. Expect(cn).NotTo(BeNil())
  292. conns = append(conns, cn)
  293. Expect(connPool.Len()).To(Equal(4))
  294. Expect(connPool.IdleLen()).To(Equal(0))
  295. connPool.Remove(cn, nil)
  296. Expect(connPool.Len()).To(Equal(3))
  297. Expect(connPool.IdleLen()).To(Equal(0))
  298. for _, cn := range freeCns {
  299. connPool.Put(cn)
  300. }
  301. Expect(connPool.Len()).To(Equal(3))
  302. Expect(connPool.IdleLen()).To(Equal(3))
  303. }
  304. })
  305. }
  306. assert("idle")
  307. assert("aged")
  308. })
  309. var _ = Describe("race", func() {
  310. c := context.Background()
  311. var connPool *pool.ConnPool
  312. var C, N int
  313. BeforeEach(func() {
  314. C, N = 10, 1000
  315. if testing.Short() {
  316. C = 4
  317. N = 100
  318. }
  319. })
  320. AfterEach(func() {
  321. connPool.Close()
  322. })
  323. It("does not happen on Get, Put, and Remove", func() {
  324. connPool = pool.NewConnPool(&pool.Options{
  325. Dialer: dummyDialer,
  326. PoolSize: 10,
  327. PoolTimeout: time.Minute,
  328. IdleTimeout: time.Millisecond,
  329. IdleCheckFrequency: time.Millisecond,
  330. })
  331. perform(C, func(id int) {
  332. for i := 0; i < N; i++ {
  333. cn, err := connPool.Get(c)
  334. Expect(err).NotTo(HaveOccurred())
  335. if err == nil {
  336. connPool.Put(cn)
  337. }
  338. }
  339. }, func(id int) {
  340. for i := 0; i < N; i++ {
  341. cn, err := connPool.Get(c)
  342. Expect(err).NotTo(HaveOccurred())
  343. if err == nil {
  344. connPool.Remove(cn, nil)
  345. }
  346. }
  347. })
  348. })
  349. })