Browse Source

Add an option to enable TCP keepalive.

This is only available on go 1.2 onwards. Defaults to being off
and only enabled explicitly.
Chris Bannister 11 years ago
parent
commit
52364c6341
4 changed files with 60 additions and 16 deletions
  1. 19 16
      cluster.go
  2. 7 0
      conn.go
  3. 13 0
      conn_go11.go
  4. 21 0
      conn_go12.go

+ 19 - 16
cluster.go

@@ -18,22 +18,23 @@ import (
 // behavior to fit the most common use cases. Applications that requre a
 // behavior to fit the most common use cases. Applications that requre a
 // different setup must implement their own cluster.
 // different setup must implement their own cluster.
 type ClusterConfig struct {
 type ClusterConfig struct {
-	Hosts          []string      // addresses for the initial connections
-	CQLVersion     string        // CQL version (default: 3.0.0)
-	ProtoVersion   int           // version of the native protocol (default: 2)
-	Timeout        time.Duration // connection timeout (default: 600ms)
-	DefaultPort    int           // default port (default: 9042)
-	Keyspace       string        // initial keyspace (optional)
-	NumConns       int           // number of connections per host (default: 2)
-	NumStreams     int           // number of streams per connection (default: 128)
-	DelayMin       time.Duration // minimum reconnection delay (default: 1s)
-	DelayMax       time.Duration // maximum reconnection delay (default: 10min)
-	StartupMin     int           // wait for StartupMin hosts (default: len(Hosts)/2+1)
-	StartupTimeout time.Duration // amount of to wait for a connection (default: 5s)
-	Consistency    Consistency   // default consistency level (default: Quorum)
-	Compressor     Compressor    // compression algorithm (default: nil)
-	Authenticator  Authenticator // authenticator (default: nil)
-	RetryPolicy    RetryPolicy   // Default retry policy to use for queries(default:0)
+	Hosts           []string      // addresses for the initial connections
+	CQLVersion      string        // CQL version (default: 3.0.0)
+	ProtoVersion    int           // version of the native protocol (default: 2)
+	Timeout         time.Duration // connection timeout (default: 600ms)
+	DefaultPort     int           // default port (default: 9042)
+	Keyspace        string        // initial keyspace (optional)
+	NumConns        int           // number of connections per host (default: 2)
+	NumStreams      int           // number of streams per connection (default: 128)
+	DelayMin        time.Duration // minimum reconnection delay (default: 1s)
+	DelayMax        time.Duration // maximum reconnection delay (default: 10min)
+	StartupMin      int           // wait for StartupMin hosts (default: len(Hosts)/2+1)
+	StartupTimeout  time.Duration // amount of to wait for a connection (default: 5s)
+	Consistency     Consistency   // default consistency level (default: Quorum)
+	Compressor      Compressor    // compression algorithm (default: nil)
+	Authenticator   Authenticator // authenticator (default: nil)
+	RetryPolicy     RetryPolicy   // Default retry policy to use for queries (default: 0)
+	SocketKeepalive time.Duration // The keepalive period to use, enabled if > 0 (default: 0)
 }
 }
 
 
 // NewCluster generates a new config for the default cluster implementation.
 // NewCluster generates a new config for the default cluster implementation.
@@ -119,7 +120,9 @@ func (c *clusterImpl) connect(addr string) {
 		NumStreams:    c.cfg.NumStreams,
 		NumStreams:    c.cfg.NumStreams,
 		Compressor:    c.cfg.Compressor,
 		Compressor:    c.cfg.Compressor,
 		Authenticator: c.cfg.Authenticator,
 		Authenticator: c.cfg.Authenticator,
+		Keepalive:     c.cfg.SocketKeepalive,
 	}
 	}
+
 	delay := c.cfg.DelayMin
 	delay := c.cfg.DelayMin
 	for {
 	for {
 		conn, err := Connect(addr, cfg, c)
 		conn, err := Connect(addr, cfg, c)

+ 7 - 0
conn.go

@@ -59,6 +59,7 @@ type ConnConfig struct {
 	NumStreams    int
 	NumStreams    int
 	Compressor    Compressor
 	Compressor    Compressor
 	Authenticator Authenticator
 	Authenticator Authenticator
+	Keepalive     time.Duration
 }
 }
 
 
 // Conn is a single connection to a Cassandra node. It can be used to execute
 // Conn is a single connection to a Cassandra node. It can be used to execute
@@ -90,6 +91,7 @@ func Connect(addr string, cfg ConnConfig, cluster Cluster) (*Conn, error) {
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
+
 	if cfg.NumStreams <= 0 || cfg.NumStreams > 128 {
 	if cfg.NumStreams <= 0 || cfg.NumStreams > 128 {
 		cfg.NumStreams = 128
 		cfg.NumStreams = 128
 	}
 	}
@@ -109,6 +111,11 @@ func Connect(addr string, cfg ConnConfig, cluster Cluster) (*Conn, error) {
 		compressor: cfg.Compressor,
 		compressor: cfg.Compressor,
 		auth:       cfg.Authenticator,
 		auth:       cfg.Authenticator,
 	}
 	}
+
+	if cfg.Keepalive > 0 {
+		c.setKeepalive(cfg.Keepalive)
+	}
+
 	for i := 0; i < cap(c.uniq); i++ {
 	for i := 0; i < cap(c.uniq); i++ {
 		c.uniq <- uint8(i)
 		c.uniq <- uint8(i)
 	}
 	}

+ 13 - 0
conn_go11.go

@@ -0,0 +1,13 @@
+// +build !go1.2
+
+package gocql
+
+import (
+	"log"
+	"time"
+)
+
+func (c *Conn) setKeepalive(d time.Duration) error {
+	log.Println("WARN: KeepAlive provided but not supported on Go < 1.2")
+	return nil
+}

+ 21 - 0
conn_go12.go

@@ -0,0 +1,21 @@
+// +build go1.2
+
+package gocql
+
+import (
+	"net"
+	"time"
+)
+
+func (c *Conn) setKeepalive(d time.Duration) error {
+	if tc, ok := c.conn.(*net.TCPConn); ok {
+		err := tc.SetKeepAlivePeriod(d)
+		if err != nil {
+			return err
+		}
+
+		return tc.SetKeepAlive(true)
+	}
+
+	return nil
+}