瀏覽代碼

Merge pull request #118 from Zariel/socket-keepalive

Add an option to enable TCP keepalive.
Ben Hood 11 年之前
父節點
當前提交
03f7e4fe3e
共有 4 個文件被更改,包括 60 次插入16 次删除
  1. 19 16
      cluster.go
  2. 7 0
      conn.go
  3. 13 0
      conn_go11.go
  4. 21 0
      conn_go12.go

+ 19 - 16
cluster.go

@@ -18,22 +18,23 @@ import (
 // behavior to fit the most common use cases. Applications that requre a
 // different setup must implement their own cluster.
 type ClusterConfig struct {
-	Hosts          []string      // addresses for the initial connections
-	CQLVersion     string        // CQL version (default: 3.0.0)
-	ProtoVersion   int           // version of the native protocol (default: 2)
-	Timeout        time.Duration // connection timeout (default: 600ms)
-	DefaultPort    int           // default port (default: 9042)
-	Keyspace       string        // initial keyspace (optional)
-	NumConns       int           // number of connections per host (default: 2)
-	NumStreams     int           // number of streams per connection (default: 128)
-	DelayMin       time.Duration // minimum reconnection delay (default: 1s)
-	DelayMax       time.Duration // maximum reconnection delay (default: 10min)
-	StartupMin     int           // wait for StartupMin hosts (default: len(Hosts)/2+1)
-	StartupTimeout time.Duration // amount of to wait for a connection (default: 5s)
-	Consistency    Consistency   // default consistency level (default: Quorum)
-	Compressor     Compressor    // compression algorithm (default: nil)
-	Authenticator  Authenticator // authenticator (default: nil)
-	RetryPolicy    RetryPolicy   // Default retry policy to use for queries(default:0)
+	Hosts           []string      // addresses for the initial connections
+	CQLVersion      string        // CQL version (default: 3.0.0)
+	ProtoVersion    int           // version of the native protocol (default: 2)
+	Timeout         time.Duration // connection timeout (default: 600ms)
+	DefaultPort     int           // default port (default: 9042)
+	Keyspace        string        // initial keyspace (optional)
+	NumConns        int           // number of connections per host (default: 2)
+	NumStreams      int           // number of streams per connection (default: 128)
+	DelayMin        time.Duration // minimum reconnection delay (default: 1s)
+	DelayMax        time.Duration // maximum reconnection delay (default: 10min)
+	StartupMin      int           // wait for StartupMin hosts (default: len(Hosts)/2+1)
+	StartupTimeout  time.Duration // amount of to wait for a connection (default: 5s)
+	Consistency     Consistency   // default consistency level (default: Quorum)
+	Compressor      Compressor    // compression algorithm (default: nil)
+	Authenticator   Authenticator // authenticator (default: nil)
+	RetryPolicy     RetryPolicy   // Default retry policy to use for queries (default: 0)
+	SocketKeepalive time.Duration // The keepalive period to use, enabled if > 0 (default: 0)
 }
 
 // NewCluster generates a new config for the default cluster implementation.
@@ -119,7 +120,9 @@ func (c *clusterImpl) connect(addr string) {
 		NumStreams:    c.cfg.NumStreams,
 		Compressor:    c.cfg.Compressor,
 		Authenticator: c.cfg.Authenticator,
+		Keepalive:     c.cfg.SocketKeepalive,
 	}
+
 	delay := c.cfg.DelayMin
 	for {
 		conn, err := Connect(addr, cfg, c)

+ 7 - 0
conn.go

@@ -59,6 +59,7 @@ type ConnConfig struct {
 	NumStreams    int
 	Compressor    Compressor
 	Authenticator Authenticator
+	Keepalive     time.Duration
 }
 
 // Conn is a single connection to a Cassandra node. It can be used to execute
@@ -90,6 +91,7 @@ func Connect(addr string, cfg ConnConfig, cluster Cluster) (*Conn, error) {
 	if err != nil {
 		return nil, err
 	}
+
 	if cfg.NumStreams <= 0 || cfg.NumStreams > 128 {
 		cfg.NumStreams = 128
 	}
@@ -109,6 +111,11 @@ func Connect(addr string, cfg ConnConfig, cluster Cluster) (*Conn, error) {
 		compressor: cfg.Compressor,
 		auth:       cfg.Authenticator,
 	}
+
+	if cfg.Keepalive > 0 {
+		c.setKeepalive(cfg.Keepalive)
+	}
+
 	for i := 0; i < cap(c.uniq); i++ {
 		c.uniq <- uint8(i)
 	}

+ 13 - 0
conn_go11.go

@@ -0,0 +1,13 @@
+// +build !go1.2
+
+package gocql
+
+import (
+	"log"
+	"time"
+)
+
+func (c *Conn) setKeepalive(d time.Duration) error {
+	log.Println("WARN: KeepAlive provided but not supported on Go < 1.2")
+	return nil
+}

+ 21 - 0
conn_go12.go

@@ -0,0 +1,21 @@
+// +build go1.2
+
+package gocql
+
+import (
+	"net"
+	"time"
+)
+
+func (c *Conn) setKeepalive(d time.Duration) error {
+	if tc, ok := c.conn.(*net.TCPConn); ok {
+		err := tc.SetKeepAlivePeriod(d)
+		if err != nil {
+			return err
+		}
+
+		return tc.SetKeepAlive(true)
+	}
+
+	return nil
+}