|
|
@@ -0,0 +1,315 @@
|
|
|
+package gocql
|
|
|
+
|
|
|
+import (
|
|
|
+ "fmt"
|
|
|
+ "log"
|
|
|
+ "strings"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
+)
|
|
|
+
|
|
|
+/*ConnectionPool represents the interface gocql will use to work with a collection of connections.
|
|
|
+
|
|
|
+Purpose
|
|
|
+
|
|
|
+The connection pool in gocql opens and closes connections as well as selects an available connection
|
|
|
+for gocql to execute a query against. The pool is also respnsible for handling connection errors that
|
|
|
+are caught by the connection experiencing the error.
|
|
|
+
|
|
|
+A connection pool should make a copy of the variables used from the ClusterConfig provided to the pool
|
|
|
+upon creation. ClusterConfig is a pointer and can be modified after the creation of the pool. This can
|
|
|
+lead to issues with variables being modified outside the expectations of the ConnectionPool type.
|
|
|
+
|
|
|
+Example of Single Connection Pool:
|
|
|
+
|
|
|
+ type SingleConnection struct {
|
|
|
+ conn *Conn
|
|
|
+ cfg *ClusterConfig
|
|
|
+ }
|
|
|
+
|
|
|
+ func NewSingleConnection(cfg *ClusterConfig) ConnectionPool {
|
|
|
+ addr := strings.TrimSpace(cfg.Hosts[0])
|
|
|
+ if strings.Index(addr, ":") < 0 {
|
|
|
+ addr = fmt.Sprintf("%s:%d", addr, cfg.DefaultPort)
|
|
|
+ }
|
|
|
+ connCfg := ConnConfig{
|
|
|
+ ProtoVersion: cfg.ProtoVersion,
|
|
|
+ CQLVersion: cfg.CQLVersion,
|
|
|
+ Timeout: cfg.Timeout,
|
|
|
+ NumStreams: cfg.NumStreams,
|
|
|
+ Compressor: cfg.Compressor,
|
|
|
+ Authenticator: cfg.Authenticator,
|
|
|
+ Keepalive: cfg.SocketKeepalive,
|
|
|
+ }
|
|
|
+ pool := SingleConnection{cfg:cfg}
|
|
|
+ pool.conn = Connect(addr,connCfg,pool)
|
|
|
+ return &pool
|
|
|
+ }
|
|
|
+
|
|
|
+ func (s *SingleConnection) HandleError(conn *Conn, err error, closed bool) {
|
|
|
+ if closed {
|
|
|
+ connCfg := ConnConfig{
|
|
|
+ ProtoVersion: cfg.ProtoVersion,
|
|
|
+ CQLVersion: cfg.CQLVersion,
|
|
|
+ Timeout: cfg.Timeout,
|
|
|
+ NumStreams: cfg.NumStreams,
|
|
|
+ Compressor: cfg.Compressor,
|
|
|
+ Authenticator: cfg.Authenticator,
|
|
|
+ Keepalive: cfg.SocketKeepalive,
|
|
|
+ }
|
|
|
+ s.conn = Connect(conn.Address(),connCfg,s)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ func (s *SingleConnection) Pick(qry *Query) *Conn {
|
|
|
+ if s.conn.isClosed {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ return s.conn
|
|
|
+ }
|
|
|
+
|
|
|
+ func (s *SingleConnection) Size() int {
|
|
|
+ return 1
|
|
|
+ }
|
|
|
+
|
|
|
+ func (s *SingleConnection) Close() {
|
|
|
+ s.conn.Close()
|
|
|
+ }
|
|
|
+
|
|
|
+This is a very simple example of a type that exposes the connection pool interface. To assign
|
|
|
+this type as the connection pool to use you would assign it to the ClusterConfig like so:
|
|
|
+
|
|
|
+ cluster := NewCluster("127.0.0.1")
|
|
|
+ cluster.ConnPoolType = NewSingleConnection
|
|
|
+ ...
|
|
|
+ session, err := cluster.CreateSession()
|
|
|
+
|
|
|
+To see a more complete example of a ConnectionPool implementation please see the SimplePool type.
|
|
|
+*/
|
|
|
+type ConnectionPool interface {
|
|
|
+ Pick(*Query) *Conn
|
|
|
+ Size() int
|
|
|
+ HandleError(*Conn, error, bool)
|
|
|
+ Close()
|
|
|
+}
|
|
|
+
|
|
|
+//NewPoolFunc is the type used by ClusterConfig to create a pool of a specific type.
|
|
|
+type NewPoolFunc func(*ClusterConfig) ConnectionPool
|
|
|
+
|
|
|
+//SimplePool is the current implementation of the connection pool inside gocql. This
|
|
|
+//pool is meant to be a simple default used by gocql so users can get up and running
|
|
|
+//quickly.
|
|
|
+type SimplePool struct {
|
|
|
+ cfg *ClusterConfig
|
|
|
+ hostPool *RoundRobin
|
|
|
+ connPool map[string]*RoundRobin
|
|
|
+ conns map[*Conn]struct{}
|
|
|
+ keyspace string
|
|
|
+ mu sync.Mutex
|
|
|
+
|
|
|
+ cFillingPool chan int
|
|
|
+
|
|
|
+ quit bool
|
|
|
+ quitWait chan bool
|
|
|
+ quitOnce sync.Once
|
|
|
+}
|
|
|
+
|
|
|
+//NewSimplePool is the function used by gocql to create the simple connection pool.
|
|
|
+//This is the default if no other pool type is specified.
|
|
|
+func NewSimplePool(cfg *ClusterConfig) ConnectionPool {
|
|
|
+ pool := SimplePool{
|
|
|
+ cfg: cfg,
|
|
|
+ hostPool: NewRoundRobin(),
|
|
|
+ connPool: make(map[string]*RoundRobin),
|
|
|
+ conns: make(map[*Conn]struct{}),
|
|
|
+ quitWait: make(chan bool),
|
|
|
+ cFillingPool: make(chan int, 1),
|
|
|
+ keyspace: cfg.Keyspace,
|
|
|
+ }
|
|
|
+ //Walk through connecting to hosts. As soon as one host connects
|
|
|
+ //defer the remaining connections to cluster.fillPool()
|
|
|
+ for i := 0; i < len(cfg.Hosts); i++ {
|
|
|
+ addr := strings.TrimSpace(cfg.Hosts[i])
|
|
|
+ if strings.Index(addr, ":") < 0 {
|
|
|
+ addr = fmt.Sprintf("%s:%d", addr, cfg.DefaultPort)
|
|
|
+ }
|
|
|
+ if pool.connect(addr) == nil {
|
|
|
+ pool.cFillingPool <- 1
|
|
|
+ go pool.fillPool()
|
|
|
+ break
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ return &pool
|
|
|
+}
|
|
|
+
|
|
|
+func (c *SimplePool) connect(addr string) error {
|
|
|
+ cfg := ConnConfig{
|
|
|
+ ProtoVersion: c.cfg.ProtoVersion,
|
|
|
+ CQLVersion: c.cfg.CQLVersion,
|
|
|
+ Timeout: c.cfg.Timeout,
|
|
|
+ NumStreams: c.cfg.NumStreams,
|
|
|
+ Compressor: c.cfg.Compressor,
|
|
|
+ Authenticator: c.cfg.Authenticator,
|
|
|
+ Keepalive: c.cfg.SocketKeepalive,
|
|
|
+ }
|
|
|
+
|
|
|
+ for {
|
|
|
+ conn, err := Connect(addr, cfg, c)
|
|
|
+ if err != nil {
|
|
|
+ log.Printf("failed to connect to %q: %v", addr, err)
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ return c.addConn(conn)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (c *SimplePool) addConn(conn *Conn) error {
|
|
|
+ c.mu.Lock()
|
|
|
+ defer c.mu.Unlock()
|
|
|
+ if c.quit {
|
|
|
+ conn.Close()
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ //Set the connection's keyspace if any before adding it to the pool
|
|
|
+ if c.keyspace != "" {
|
|
|
+ if err := conn.UseKeyspace(c.keyspace); err != nil {
|
|
|
+ log.Printf("error setting connection keyspace. %v", err)
|
|
|
+ conn.Close()
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ connPool := c.connPool[conn.Address()]
|
|
|
+ if connPool == nil {
|
|
|
+ connPool = NewRoundRobin()
|
|
|
+ c.connPool[conn.Address()] = connPool
|
|
|
+ c.hostPool.AddNode(connPool)
|
|
|
+ }
|
|
|
+ connPool.AddNode(conn)
|
|
|
+ c.conns[conn] = struct{}{}
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+//fillPool manages the pool of connections making sure that each host has the correct
|
|
|
+//amount of connections defined. Also the method will test a host with one connection
|
|
|
+//instead of flooding the host with number of connections defined in the cluster config
|
|
|
+func (c *SimplePool) fillPool() {
|
|
|
+ //Debounce large amounts of requests to fill pool
|
|
|
+ select {
|
|
|
+ case <-time.After(1 * time.Millisecond):
|
|
|
+ return
|
|
|
+ case <-c.cFillingPool:
|
|
|
+ defer func() { c.cFillingPool <- 1 }()
|
|
|
+ }
|
|
|
+
|
|
|
+ c.mu.Lock()
|
|
|
+ isClosed := c.quit
|
|
|
+ c.mu.Unlock()
|
|
|
+ //Exit if cluster(session) is closed
|
|
|
+ if isClosed {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ //Walk through list of defined hosts
|
|
|
+ for i := 0; i < len(c.cfg.Hosts); i++ {
|
|
|
+ addr := strings.TrimSpace(c.cfg.Hosts[i])
|
|
|
+ if strings.Index(addr, ":") < 0 {
|
|
|
+ addr = fmt.Sprintf("%s:%d", addr, c.cfg.DefaultPort)
|
|
|
+ }
|
|
|
+ var numConns int = 1
|
|
|
+ //See if the host already has connections in the pool
|
|
|
+ c.mu.Lock()
|
|
|
+ conns, ok := c.connPool[addr]
|
|
|
+ c.mu.Unlock()
|
|
|
+
|
|
|
+ if ok {
|
|
|
+ //if the host has enough connections just exit
|
|
|
+ numConns = conns.Size()
|
|
|
+ if numConns >= c.cfg.NumConns {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ //See if the host is reachable
|
|
|
+ if err := c.connect(addr); err != nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //This is reached if the host is responsive and needs more connections
|
|
|
+ //Create connections for host synchronously to mitigate flooding the host.
|
|
|
+ go func(a string, conns int) {
|
|
|
+ for ; conns < c.cfg.NumConns; conns++ {
|
|
|
+ c.connect(addr)
|
|
|
+ }
|
|
|
+ }(addr, numConns)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// Should only be called if c.mu is locked
|
|
|
+func (c *SimplePool) removeConnLocked(conn *Conn) {
|
|
|
+ conn.Close()
|
|
|
+ connPool := c.connPool[conn.addr]
|
|
|
+ if connPool == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ connPool.RemoveNode(conn)
|
|
|
+ if connPool.Size() == 0 {
|
|
|
+ c.hostPool.RemoveNode(connPool)
|
|
|
+ delete(c.connPool, conn.addr)
|
|
|
+ }
|
|
|
+ delete(c.conns, conn)
|
|
|
+}
|
|
|
+
|
|
|
+func (c *SimplePool) removeConn(conn *Conn) {
|
|
|
+ c.mu.Lock()
|
|
|
+ defer c.mu.Unlock()
|
|
|
+ c.removeConnLocked(conn)
|
|
|
+}
|
|
|
+
|
|
|
+//HandleError is called by a Connection object to report to the pool an error has occured.
|
|
|
+//Logic is then executed within the pool to clean up the erroroneous connection and try to
|
|
|
+//top off the pool.
|
|
|
+func (c *SimplePool) HandleError(conn *Conn, err error, closed bool) {
|
|
|
+ if !closed {
|
|
|
+ // ignore all non-fatal errors
|
|
|
+ return
|
|
|
+ }
|
|
|
+ c.removeConn(conn)
|
|
|
+ if !c.quit {
|
|
|
+ go c.fillPool() // top off pool.
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+//Pick selects a connection to be used by the query.
|
|
|
+func (c *SimplePool) Pick(qry *Query) *Conn {
|
|
|
+ //Check if connections are available
|
|
|
+ c.mu.Lock()
|
|
|
+ conns := len(c.conns)
|
|
|
+ c.mu.Unlock()
|
|
|
+
|
|
|
+ if conns == 0 {
|
|
|
+ //try to populate the pool before returning.
|
|
|
+ c.fillPool()
|
|
|
+ }
|
|
|
+
|
|
|
+ return c.hostPool.Pick(qry)
|
|
|
+}
|
|
|
+
|
|
|
+//Size returns the number of connections currently active in the pool
|
|
|
+func (p *SimplePool) Size() int {
|
|
|
+ p.mu.Lock()
|
|
|
+ conns := len(p.conns)
|
|
|
+ p.mu.Unlock()
|
|
|
+ return conns
|
|
|
+}
|
|
|
+
|
|
|
+//Close kills the pool and all associated connections.
|
|
|
+func (c *SimplePool) Close() {
|
|
|
+ c.quitOnce.Do(func() {
|
|
|
+ c.mu.Lock()
|
|
|
+ defer c.mu.Unlock()
|
|
|
+ c.quit = true
|
|
|
+ close(c.quitWait)
|
|
|
+ for conn := range c.conns {
|
|
|
+ c.removeConnLocked(conn)
|
|
|
+ }
|
|
|
+ })
|
|
|
+}
|