|
@@ -1,6 +1,11 @@
|
|
|
package gocql
|
|
package gocql
|
|
|
|
|
|
|
|
import (
|
|
import (
|
|
|
|
|
+ "crypto/tls"
|
|
|
|
|
+ "crypto/x509"
|
|
|
|
|
+ "errors"
|
|
|
|
|
+ "fmt"
|
|
|
|
|
+ "io/ioutil"
|
|
|
"log"
|
|
"log"
|
|
|
"sync"
|
|
"sync"
|
|
|
"time"
|
|
"time"
|
|
@@ -91,7 +96,7 @@ type ConnectionPool interface {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
//NewPoolFunc is the type used by ClusterConfig to create a pool of a specific type.
|
|
//NewPoolFunc is the type used by ClusterConfig to create a pool of a specific type.
|
|
|
-type NewPoolFunc func(*ClusterConfig) ConnectionPool
|
|
|
|
|
|
|
+type NewPoolFunc func(*ClusterConfig) (ConnectionPool, error)
|
|
|
|
|
|
|
|
//SimplePool is the current implementation of the connection pool inside gocql. This
|
|
//SimplePool is the current implementation of the connection pool inside gocql. This
|
|
|
//pool is meant to be a simple default used by gocql so users can get up and running
|
|
//pool is meant to be a simple default used by gocql so users can get up and running
|
|
@@ -115,11 +120,42 @@ type SimplePool struct {
|
|
|
quit bool
|
|
quit bool
|
|
|
quitWait chan bool
|
|
quitWait chan bool
|
|
|
quitOnce sync.Once
|
|
quitOnce sync.Once
|
|
|
|
|
+
|
|
|
|
|
+ tlsConfig *tls.Config
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
|
|
|
|
|
+ certPool := x509.NewCertPool()
|
|
|
|
|
+ // ca cert is optional
|
|
|
|
|
+ if sslOpts.CaPath != "" {
|
|
|
|
|
+ pem, err := ioutil.ReadFile(sslOpts.CaPath)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, fmt.Errorf("connectionpool: unable to open CA certs: %v", err)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if !certPool.AppendCertsFromPEM(pem) {
|
|
|
|
|
+ return nil, errors.New("connectionpool: failed parsing or CA certs")
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ mycert, err := tls.LoadX509KeyPair(sslOpts.CertPath, sslOpts.KeyPath)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, fmt.Errorf("connectionpool: unable to load X509 key pair: %v", err)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ config := &tls.Config{
|
|
|
|
|
+ Certificates: []tls.Certificate{mycert},
|
|
|
|
|
+ RootCAs: certPool,
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ config.InsecureSkipVerify = !sslOpts.EnableHostVerification
|
|
|
|
|
+
|
|
|
|
|
+ return config, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
//NewSimplePool is the function used by gocql to create the simple connection pool.
|
|
//NewSimplePool is the function used by gocql to create the simple connection pool.
|
|
|
//This is the default if no other pool type is specified.
|
|
//This is the default if no other pool type is specified.
|
|
|
-func NewSimplePool(cfg *ClusterConfig) ConnectionPool {
|
|
|
|
|
|
|
+func NewSimplePool(cfg *ClusterConfig) (ConnectionPool, error) {
|
|
|
pool := &SimplePool{
|
|
pool := &SimplePool{
|
|
|
cfg: cfg,
|
|
cfg: cfg,
|
|
|
hostPool: NewRoundRobin(),
|
|
hostPool: NewRoundRobin(),
|
|
@@ -137,6 +173,14 @@ func NewSimplePool(cfg *ClusterConfig) ConnectionPool {
|
|
|
pool.hosts[host] = &HostInfo{Peer: host}
|
|
pool.hosts[host] = &HostInfo{Peer: host}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ if cfg.SslOpts != nil {
|
|
|
|
|
+ config, err := setupTLSConfig(cfg.SslOpts)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+ pool.tlsConfig = config
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
//Walk through connecting to hosts. As soon as one host connects
|
|
//Walk through connecting to hosts. As soon as one host connects
|
|
|
//defer the remaining connections to cluster.fillPool()
|
|
//defer the remaining connections to cluster.fillPool()
|
|
|
for i := 0; i < len(cfg.Hosts); i++ {
|
|
for i := 0; i < len(cfg.Hosts); i++ {
|
|
@@ -149,7 +193,7 @@ func NewSimplePool(cfg *ClusterConfig) ConnectionPool {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- return pool
|
|
|
|
|
|
|
+ return pool, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (c *SimplePool) connect(addr string) error {
|
|
func (c *SimplePool) connect(addr string) error {
|
|
@@ -162,7 +206,7 @@ func (c *SimplePool) connect(addr string) error {
|
|
|
Compressor: c.cfg.Compressor,
|
|
Compressor: c.cfg.Compressor,
|
|
|
Authenticator: c.cfg.Authenticator,
|
|
Authenticator: c.cfg.Authenticator,
|
|
|
Keepalive: c.cfg.SocketKeepalive,
|
|
Keepalive: c.cfg.SocketKeepalive,
|
|
|
- SslOpts: c.cfg.SslOpts,
|
|
|
|
|
|
|
+ TLSConfig: c.tlsConfig,
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
conn, err := Connect(addr, cfg, c)
|
|
conn, err := Connect(addr, cfg, c)
|