| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421 |
- package pool_test
- import (
- "context"
- "sync"
- "testing"
- "time"
- "github.com/go-redis/redis/v7/internal/pool"
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
- )
- var _ = Describe("ConnPool", func() {
- c := context.Background()
- var connPool *pool.ConnPool
- BeforeEach(func() {
- connPool = pool.NewConnPool(&pool.Options{
- Dialer: dummyDialer,
- PoolSize: 10,
- PoolTimeout: time.Hour,
- IdleTimeout: time.Millisecond,
- IdleCheckFrequency: time.Millisecond,
- })
- })
- AfterEach(func() {
- connPool.Close()
- })
- It("should unblock client when conn is removed", func() {
- // Reserve one connection.
- cn, err := connPool.Get(c)
- Expect(err).NotTo(HaveOccurred())
- // Reserve all other connections.
- var cns []*pool.Conn
- for i := 0; i < 9; i++ {
- cn, err := connPool.Get(c)
- Expect(err).NotTo(HaveOccurred())
- cns = append(cns, cn)
- }
- started := make(chan bool, 1)
- done := make(chan bool, 1)
- go func() {
- defer GinkgoRecover()
- started <- true
- _, err := connPool.Get(c)
- Expect(err).NotTo(HaveOccurred())
- done <- true
- connPool.Put(cn)
- }()
- <-started
- // Check that Get is blocked.
- select {
- case <-done:
- Fail("Get is not blocked")
- case <-time.After(time.Millisecond):
- // ok
- }
- connPool.Remove(cn, nil)
- // Check that Get is unblocked.
- select {
- case <-done:
- // ok
- case <-time.After(time.Second):
- Fail("Get is not unblocked")
- }
- for _, cn := range cns {
- connPool.Put(cn)
- }
- })
- })
- var _ = Describe("MinIdleConns", func() {
- c := context.Background()
- const poolSize = 100
- var minIdleConns int
- var connPool *pool.ConnPool
- newConnPool := func() *pool.ConnPool {
- connPool := pool.NewConnPool(&pool.Options{
- Dialer: dummyDialer,
- PoolSize: poolSize,
- MinIdleConns: minIdleConns,
- PoolTimeout: 100 * time.Millisecond,
- IdleTimeout: -1,
- IdleCheckFrequency: -1,
- })
- Eventually(func() int {
- return connPool.Len()
- }).Should(Equal(minIdleConns))
- return connPool
- }
- assert := func() {
- It("has idle connections when created", func() {
- Expect(connPool.Len()).To(Equal(minIdleConns))
- Expect(connPool.IdleLen()).To(Equal(minIdleConns))
- })
- Context("after Get", func() {
- var cn *pool.Conn
- BeforeEach(func() {
- var err error
- cn, err = connPool.Get(c)
- Expect(err).NotTo(HaveOccurred())
- Eventually(func() int {
- return connPool.Len()
- }).Should(Equal(minIdleConns + 1))
- })
- It("has idle connections", func() {
- Expect(connPool.Len()).To(Equal(minIdleConns + 1))
- Expect(connPool.IdleLen()).To(Equal(minIdleConns))
- })
- Context("after Remove", func() {
- BeforeEach(func() {
- connPool.Remove(cn, nil)
- })
- It("has idle connections", func() {
- Expect(connPool.Len()).To(Equal(minIdleConns))
- Expect(connPool.IdleLen()).To(Equal(minIdleConns))
- })
- })
- })
- Describe("Get does not exceed pool size", func() {
- var mu sync.RWMutex
- var cns []*pool.Conn
- BeforeEach(func() {
- cns = make([]*pool.Conn, 0)
- perform(poolSize, func(_ int) {
- defer GinkgoRecover()
- cn, err := connPool.Get(c)
- Expect(err).NotTo(HaveOccurred())
- mu.Lock()
- cns = append(cns, cn)
- mu.Unlock()
- })
- Eventually(func() int {
- return connPool.Len()
- }).Should(BeNumerically(">=", poolSize))
- })
- It("Get is blocked", func() {
- done := make(chan struct{})
- go func() {
- connPool.Get(c)
- close(done)
- }()
- select {
- case <-done:
- Fail("Get is not blocked")
- case <-time.After(time.Millisecond):
- // ok
- }
- select {
- case <-done:
- // ok
- case <-time.After(time.Second):
- Fail("Get is not unblocked")
- }
- })
- Context("after Put", func() {
- BeforeEach(func() {
- perform(len(cns), func(i int) {
- mu.RLock()
- connPool.Put(cns[i])
- mu.RUnlock()
- })
- Eventually(func() int {
- return connPool.Len()
- }).Should(Equal(poolSize))
- })
- It("pool.Len is back to normal", func() {
- Expect(connPool.Len()).To(Equal(poolSize))
- Expect(connPool.IdleLen()).To(Equal(poolSize))
- })
- })
- Context("after Remove", func() {
- BeforeEach(func() {
- perform(len(cns), func(i int) {
- mu.RLock()
- connPool.Remove(cns[i], nil)
- mu.RUnlock()
- })
- Eventually(func() int {
- return connPool.Len()
- }).Should(Equal(minIdleConns))
- })
- It("has idle connections", func() {
- Expect(connPool.Len()).To(Equal(minIdleConns))
- Expect(connPool.IdleLen()).To(Equal(minIdleConns))
- })
- })
- })
- }
- Context("minIdleConns = 1", func() {
- BeforeEach(func() {
- minIdleConns = 1
- connPool = newConnPool()
- })
- AfterEach(func() {
- connPool.Close()
- })
- assert()
- })
- Context("minIdleConns = 32", func() {
- BeforeEach(func() {
- minIdleConns = 32
- connPool = newConnPool()
- })
- AfterEach(func() {
- connPool.Close()
- })
- assert()
- })
- })
- var _ = Describe("conns reaper", func() {
- c := context.Background()
- const idleTimeout = time.Minute
- const maxAge = time.Hour
- var connPool *pool.ConnPool
- var conns, staleConns, closedConns []*pool.Conn
- assert := func(typ string) {
- BeforeEach(func() {
- closedConns = nil
- connPool = pool.NewConnPool(&pool.Options{
- Dialer: dummyDialer,
- PoolSize: 10,
- IdleTimeout: idleTimeout,
- MaxConnAge: maxAge,
- PoolTimeout: time.Second,
- IdleCheckFrequency: time.Hour,
- OnClose: func(cn *pool.Conn) error {
- closedConns = append(closedConns, cn)
- return nil
- },
- })
- conns = nil
- // add stale connections
- staleConns = nil
- for i := 0; i < 3; i++ {
- cn, err := connPool.Get(c)
- Expect(err).NotTo(HaveOccurred())
- switch typ {
- case "idle":
- cn.SetUsedAt(time.Now().Add(-2 * idleTimeout))
- case "aged":
- cn.SetCreatedAt(time.Now().Add(-2 * maxAge))
- }
- conns = append(conns, cn)
- staleConns = append(staleConns, cn)
- }
- // add fresh connections
- for i := 0; i < 3; i++ {
- cn, err := connPool.Get(c)
- Expect(err).NotTo(HaveOccurred())
- conns = append(conns, cn)
- }
- for _, cn := range conns {
- connPool.Put(cn)
- }
- Expect(connPool.Len()).To(Equal(6))
- Expect(connPool.IdleLen()).To(Equal(6))
- n, err := connPool.ReapStaleConns()
- Expect(err).NotTo(HaveOccurred())
- Expect(n).To(Equal(3))
- })
- AfterEach(func() {
- _ = connPool.Close()
- Expect(connPool.Len()).To(Equal(0))
- Expect(connPool.IdleLen()).To(Equal(0))
- Expect(len(closedConns)).To(Equal(len(conns)))
- Expect(closedConns).To(ConsistOf(conns))
- })
- It("reaps stale connections", func() {
- Expect(connPool.Len()).To(Equal(3))
- Expect(connPool.IdleLen()).To(Equal(3))
- })
- It("does not reap fresh connections", func() {
- n, err := connPool.ReapStaleConns()
- Expect(err).NotTo(HaveOccurred())
- Expect(n).To(Equal(0))
- })
- It("stale connections are closed", func() {
- Expect(len(closedConns)).To(Equal(len(staleConns)))
- Expect(closedConns).To(ConsistOf(staleConns))
- })
- It("pool is functional", func() {
- for j := 0; j < 3; j++ {
- var freeCns []*pool.Conn
- for i := 0; i < 3; i++ {
- cn, err := connPool.Get(c)
- Expect(err).NotTo(HaveOccurred())
- Expect(cn).NotTo(BeNil())
- freeCns = append(freeCns, cn)
- }
- Expect(connPool.Len()).To(Equal(3))
- Expect(connPool.IdleLen()).To(Equal(0))
- cn, err := connPool.Get(c)
- Expect(err).NotTo(HaveOccurred())
- Expect(cn).NotTo(BeNil())
- conns = append(conns, cn)
- Expect(connPool.Len()).To(Equal(4))
- Expect(connPool.IdleLen()).To(Equal(0))
- connPool.Remove(cn, nil)
- Expect(connPool.Len()).To(Equal(3))
- Expect(connPool.IdleLen()).To(Equal(0))
- for _, cn := range freeCns {
- connPool.Put(cn)
- }
- Expect(connPool.Len()).To(Equal(3))
- Expect(connPool.IdleLen()).To(Equal(3))
- }
- })
- }
- assert("idle")
- assert("aged")
- })
- var _ = Describe("race", func() {
- c := context.Background()
- var connPool *pool.ConnPool
- var C, N int
- BeforeEach(func() {
- C, N = 10, 1000
- if testing.Short() {
- C = 4
- N = 100
- }
- })
- AfterEach(func() {
- connPool.Close()
- })
- It("does not happen on Get, Put, and Remove", func() {
- connPool = pool.NewConnPool(&pool.Options{
- Dialer: dummyDialer,
- PoolSize: 10,
- PoolTimeout: time.Minute,
- IdleTimeout: time.Millisecond,
- IdleCheckFrequency: time.Millisecond,
- })
- perform(C, func(id int) {
- for i := 0; i < N; i++ {
- cn, err := connPool.Get(c)
- Expect(err).NotTo(HaveOccurred())
- if err == nil {
- connPool.Put(cn)
- }
- }
- }, func(id int) {
- for i := 0; i < N; i++ {
- cn, err := connPool.Get(c)
- Expect(err).NotTo(HaveOccurred())
- if err == nil {
- connPool.Remove(cn, nil)
- }
- }
- })
- })
- })
|