package gocql import ( "fmt" "log" "strings" "sync" "time" ) //ConnectionPool is the interface gocql expects to be exposed for a connection pool. type ConnectionPool interface { Pick(*Query) *Conn Size() int HandleError(*Conn, error, bool) Close() } //NewPoolFunc is the type used by ClusterConfig to create a pool of a specific type. type NewPoolFunc func(*ClusterConfig) ConnectionPool //SimplePool is the current implementation of the connection pool inside gocql. This //pool is meant to be a simple default used by gocql so users can get up and running //quickly. type SimplePool struct { cfg *ClusterConfig hostPool *RoundRobin connPool map[string]*RoundRobin conns map[*Conn]struct{} keyspace string mu sync.Mutex cFillingPool chan int quit bool quitWait chan bool quitOnce sync.Once } //NewSimplePool is the function used by gocql to create the simple connection pool. //This is the default if no other pool type is specified. func NewSimplePool(cfg *ClusterConfig) ConnectionPool { pool := SimplePool{ cfg: cfg, hostPool: NewRoundRobin(), connPool: make(map[string]*RoundRobin), conns: make(map[*Conn]struct{}), quitWait: make(chan bool), cFillingPool: make(chan int, 1), keyspace: cfg.Keyspace, } //Walk through connecting to hosts. As soon as one host connects //defer the remaining connections to cluster.fillPool() for i := 0; i < len(pool.cfg.Hosts); i++ { addr := strings.TrimSpace(pool.cfg.Hosts[i]) if strings.Index(addr, ":") < 0 { addr = fmt.Sprintf("%s:%d", addr, pool.cfg.DefaultPort) } if pool.connect(addr) == nil { pool.cFillingPool <- 1 go pool.fillPool() break } } return &pool } func (c *SimplePool) connect(addr string) error { cfg := ConnConfig{ ProtoVersion: c.cfg.ProtoVersion, CQLVersion: c.cfg.CQLVersion, Timeout: c.cfg.Timeout, NumStreams: c.cfg.NumStreams, Compressor: c.cfg.Compressor, Authenticator: c.cfg.Authenticator, Keepalive: c.cfg.SocketKeepalive, } for { conn, err := Connect(addr, cfg, c) if err != nil { log.Printf("failed to connect to %q: %v", addr, err) return err } return c.addConn(conn) } } func (c *SimplePool) addConn(conn *Conn) error { c.mu.Lock() defer c.mu.Unlock() if c.quit { conn.Close() return nil } //Set the connection's keyspace if any before adding it to the pool if c.keyspace != "" { if err := conn.UseKeyspace(c.keyspace); err != nil { log.Printf("error setting connection keyspace. %v", err) conn.Close() return err } } connPool := c.connPool[conn.Address()] if connPool == nil { connPool = NewRoundRobin() c.connPool[conn.Address()] = connPool c.hostPool.AddNode(connPool) } connPool.AddNode(conn) c.conns[conn] = struct{}{} return nil } //fillPool manages the pool of connections making sure that each host has the correct //amount of connections defined. Also the method will test a host with one connection //instead of flooding the host with number of connections defined in the cluster config func (c *SimplePool) fillPool() { //Debounce large amounts of requests to fill pool select { case <-time.After(1 * time.Millisecond): return case <-c.cFillingPool: defer func() { c.cFillingPool <- 1 }() } c.mu.Lock() isClosed := c.quit c.mu.Unlock() //Exit if cluster(session) is closed if isClosed { return } //Walk through list of defined hosts for i := 0; i < len(c.cfg.Hosts); i++ { addr := strings.TrimSpace(c.cfg.Hosts[i]) if strings.Index(addr, ":") < 0 { addr = fmt.Sprintf("%s:%d", addr, c.cfg.DefaultPort) } var numConns int = 1 //See if the host already has connections in the pool c.mu.Lock() conns, ok := c.connPool[addr] c.mu.Unlock() if ok { //if the host has enough connections just exit numConns = conns.Size() if numConns >= c.cfg.NumConns { continue } } else { //See if the host is reachable if err := c.connect(addr); err != nil { continue } } //This is reached if the host is responsive and needs more connections //Create connections for host synchronously to mitigate flooding the host. go func(a string, conns int) { for ; conns < c.cfg.NumConns; conns++ { c.connect(addr) } }(addr, numConns) } } // Should only be called if c.mu is locked func (c *SimplePool) removeConnLocked(conn *Conn) { conn.Close() connPool := c.connPool[conn.addr] if connPool == nil { return } connPool.RemoveNode(conn) if connPool.Size() == 0 { c.hostPool.RemoveNode(connPool) delete(c.connPool, conn.addr) } delete(c.conns, conn) } func (c *SimplePool) removeConn(conn *Conn) { c.mu.Lock() defer c.mu.Unlock() c.removeConnLocked(conn) } //HandleError is called by a Connection object to report to the pool an error has occured. //Logic is then executed within the pool to clean up the erroroneous connection and try to //top off the pool. func (c *SimplePool) HandleError(conn *Conn, err error, closed bool) { if !closed { // ignore all non-fatal errors return } c.removeConn(conn) if !c.quit { go c.fillPool() // top off pool. } } //Pick selects a connection to be used by the query. func (c *SimplePool) Pick(qry *Query) *Conn { //Check if connections are available c.mu.Lock() conns := len(c.conns) c.mu.Unlock() if conns == 0 { //try to populate the pool before returning. c.fillPool() } return c.hostPool.Pick(qry) } //Size returns the number of connections currently active in the pool func (p *SimplePool) Size() int { p.mu.Lock() conns := len(p.conns) p.mu.Unlock() return conns } //Close kills the pool and all associated connections. func (c *SimplePool) Close() { c.quitOnce.Do(func() { c.mu.Lock() defer c.mu.Unlock() c.quit = true close(c.quitWait) for conn := range c.conns { c.removeConnLocked(conn) } }) }