|
|
@@ -8,7 +8,84 @@ import (
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
-//ConnectionPool is the interface gocql expects to be exposed for a connection pool.
|
|
|
+/*ConnectionPool represents the interface gocql will use to work with a collection of connections.
|
|
|
+
|
|
|
+Purpose
|
|
|
+
|
|
|
+The connection pool in gocql opens and closes connections as well as selects an available connection
|
|
|
+for gocql to execute a query against. The pool is also respnsible for handling connection errors that
|
|
|
+are caught by the connection experiencing the error.
|
|
|
+
|
|
|
+A connection pool should make a copy of the variables used from the ClusterConfig provided to the pool
|
|
|
+upon creation. ClusterConfig is a pointer and can be modified after the creation of the pool. This can
|
|
|
+lead to issues with variables being modified outside the expectations of the ConnectionPool type.
|
|
|
+
|
|
|
+Example of Single Connection Pool:
|
|
|
+
|
|
|
+ type SingleConnection struct {
|
|
|
+ conn *Conn
|
|
|
+ cfg *ClusterConfig
|
|
|
+ }
|
|
|
+
|
|
|
+ func NewSingleConnection(cfg *ClusterConfig) ConnectionPool {
|
|
|
+ addr := strings.TrimSpace(cfg.Hosts[0])
|
|
|
+ if strings.Index(addr, ":") < 0 {
|
|
|
+ addr = fmt.Sprintf("%s:%d", addr, cfg.DefaultPort)
|
|
|
+ }
|
|
|
+ connCfg := ConnConfig{
|
|
|
+ ProtoVersion: cfg.ProtoVersion,
|
|
|
+ CQLVersion: cfg.CQLVersion,
|
|
|
+ Timeout: cfg.Timeout,
|
|
|
+ NumStreams: cfg.NumStreams,
|
|
|
+ Compressor: cfg.Compressor,
|
|
|
+ Authenticator: cfg.Authenticator,
|
|
|
+ Keepalive: cfg.SocketKeepalive,
|
|
|
+ }
|
|
|
+ pool := SingleConnection{cfg:cfg}
|
|
|
+ pool.conn = Connect(addr,connCfg,pool)
|
|
|
+ return &pool
|
|
|
+ }
|
|
|
+
|
|
|
+ func (s *SingleConnection) HandleError(conn *Conn, err error, closed bool) {
|
|
|
+ if closed {
|
|
|
+ connCfg := ConnConfig{
|
|
|
+ ProtoVersion: cfg.ProtoVersion,
|
|
|
+ CQLVersion: cfg.CQLVersion,
|
|
|
+ Timeout: cfg.Timeout,
|
|
|
+ NumStreams: cfg.NumStreams,
|
|
|
+ Compressor: cfg.Compressor,
|
|
|
+ Authenticator: cfg.Authenticator,
|
|
|
+ Keepalive: cfg.SocketKeepalive,
|
|
|
+ }
|
|
|
+ s.conn = Connect(conn.Address(),connCfg,s)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ func (s *SingleConnection) Pick(qry *Query) *Conn {
|
|
|
+ if s.conn.isClosed {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ return s.conn
|
|
|
+ }
|
|
|
+
|
|
|
+ func (s *SingleConnection) Size() int {
|
|
|
+ return 1
|
|
|
+ }
|
|
|
+
|
|
|
+ func (s *SingleConnection) Close() {
|
|
|
+ s.conn.Close()
|
|
|
+ }
|
|
|
+
|
|
|
+This is a very simple example of a type that exposes the connection pool interface. To assign
|
|
|
+this type as the connection pool to use you would assign it to the ClusterConfig like so:
|
|
|
+
|
|
|
+ cluster := NewCluster("127.0.0.1")
|
|
|
+ cluster.ConnPoolType = NewSingleConnection
|
|
|
+ ...
|
|
|
+ session, err := cluster.CreateSession()
|
|
|
+
|
|
|
+To see a more complete example of a ConnectionPool implementation please see the SimplePool type.
|
|
|
+*/
|
|
|
type ConnectionPool interface {
|
|
|
Pick(*Query) *Conn
|
|
|
Size() int
|
|
|
@@ -51,10 +128,10 @@ func NewSimplePool(cfg *ClusterConfig) ConnectionPool {
|
|
|
}
|
|
|
//Walk through connecting to hosts. As soon as one host connects
|
|
|
//defer the remaining connections to cluster.fillPool()
|
|
|
- for i := 0; i < len(pool.cfg.Hosts); i++ {
|
|
|
- addr := strings.TrimSpace(pool.cfg.Hosts[i])
|
|
|
+ for i := 0; i < len(cfg.Hosts); i++ {
|
|
|
+ addr := strings.TrimSpace(cfg.Hosts[i])
|
|
|
if strings.Index(addr, ":") < 0 {
|
|
|
- addr = fmt.Sprintf("%s:%d", addr, pool.cfg.DefaultPort)
|
|
|
+ addr = fmt.Sprintf("%s:%d", addr, cfg.DefaultPort)
|
|
|
}
|
|
|
if pool.connect(addr) == nil {
|
|
|
pool.cFillingPool <- 1
|