فهرست منبع

adding tcp keepalives to the broker's socket connection

eric 10 سال پیش
والد
کامیت
76aa5518b0
4فایلهای تغییر یافته به همراه25 افزوده شده و 2 حذف شده
  1. 1 1
      broker.go
  2. 2 0
      client_test.go
  3. 6 0
      config.go
  4. 16 1
      utils.go

+ 1 - 1
broker.go

@@ -68,7 +68,7 @@ func (b *Broker) Open(conf *Config) error {
 	go withRecover(func() {
 		defer b.lock.Unlock()
 
-		b.conn, b.connErr = net.DialTimeout("tcp", b.addr, conf.Net.DialTimeout)
+		b.conn, b.connErr = DialWithTiming("tcp", b.addr, conf.Net.DialTimeout, conf.Net.KeepAlive)
 		if b.connErr != nil {
 			b.conn = nil
 			atomic.StoreInt32(&b.opened, 0)

+ 2 - 0
client_test.go

@@ -4,6 +4,7 @@ import (
 	"io"
 	"sync"
 	"testing"
+	"time"
 )
 
 func safeClose(t *testing.T, c io.Closer) {
@@ -78,6 +79,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) {
 	seedBroker.Returns(metadataResponse)
 
 	config := NewConfig()
+	config.Net.KeepAlive = 12 * time.Millisecond
 	config.Metadata.Retry.Max = 0
 	client, err := NewClient([]string{seedBroker.Addr()}, config)
 	if err != nil {

+ 6 - 0
config.go

@@ -12,6 +12,11 @@ type Config struct {
 		DialTimeout  time.Duration // How long to wait for the initial connection to succeed before timing out and returning an error (default 30s).
 		ReadTimeout  time.Duration // How long to wait for a response before timing out and returning an error (default 30s).
 		WriteTimeout time.Duration // How long to wait for a transmit to succeed before timing out and returning an error (default 30s).
+
+		// KeepAlive specifies the keep-alive period for an active
+		// network connection.
+		// If zero, keep-alives are not enabled.
+		KeepAlive time.Duration
 	}
 
 	// Metadata is the namespace for metadata management properties used by the Client, and shared by the Producer/Consumer.
@@ -126,6 +131,7 @@ func NewConfig() *Config {
 	c.Net.DialTimeout = 30 * time.Second
 	c.Net.ReadTimeout = 30 * time.Second
 	c.Net.WriteTimeout = 30 * time.Second
+	c.Net.KeepAlive = 0 * time.Second
 
 	c.Metadata.Retry.Max = 3
 	c.Metadata.Retry.Backoff = 250 * time.Millisecond

+ 16 - 1
utils.go

@@ -1,6 +1,10 @@
 package sarama
 
-import "sort"
+import (
+	"net"
+	"sort"
+	"time"
+)
 
 type none struct{}
 
@@ -87,3 +91,14 @@ func (b ByteEncoder) Encode() ([]byte, error) {
 func (b ByteEncoder) Length() int {
 	return len(b)
 }
+
+//DialWithTiming is exactly like net.DialTimeout from the net package, but adds support for tcp keepalives
+//  Some cloud providers, like google compute engine, kill idle connections after a few mins.  Adding the keepalive
+//  keeps the connection open.
+func DialWithTiming(network, address string, timeout time.Duration, keepalive time.Duration) (net.Conn, error) {
+	d := net.Dialer{
+		Timeout:   timeout,
+		KeepAlive: keepalive,
+	}
+	return d.Dial(network, address)
+}