Browse Source

Merge pull request #408 from lytics/master

Adding TCP keepalives support for the broker's connection
Evan Huus 10 years ago
parent
commit
9b048b0f55
3 changed files with 13 additions and 1 deletions
  1. 1 0
      CHANGELOG.md
  2. 6 1
      broker.go
  3. 6 0
      config.go

+ 1 - 0
CHANGELOG.md

@@ -10,6 +10,7 @@ Improvements:
    ([#396](https://github.com/Shopify/sarama/pull/396)).
  - The consumer produces much more useful logging output when leadership
    changes ([#385](https://github.com/Shopify/sarama/pull/385)).
+ - Added support for tcp keepalives ([#407](https://github.com/Shopify/sarama/issues/407)).
 
 Bug Fixes:
  - The OffsetCommitRequest message now correctly implements all three possible

+ 6 - 1
broker.go

@@ -68,7 +68,12 @@ func (b *Broker) Open(conf *Config) error {
 	go withRecover(func() {
 		defer b.lock.Unlock()
 
-		b.conn, b.connErr = net.DialTimeout("tcp", b.addr, conf.Net.DialTimeout)
+		dialer := net.Dialer{
+			Timeout:   conf.Net.DialTimeout,
+			KeepAlive: conf.Net.KeepAlive,
+		}
+
+		b.conn, b.connErr = dialer.Dial("tcp", b.addr)
 		if b.connErr != nil {
 			b.conn = nil
 			atomic.StoreInt32(&b.opened, 0)

+ 6 - 0
config.go

@@ -12,6 +12,10 @@ type Config struct {
 		DialTimeout  time.Duration // How long to wait for the initial connection to succeed before timing out and returning an error (default 30s).
 		ReadTimeout  time.Duration // How long to wait for a response before timing out and returning an error (default 30s).
 		WriteTimeout time.Duration // How long to wait for a transmit to succeed before timing out and returning an error (default 30s).
+
+		// KeepAlive specifies the keep-alive period for an active network connection.
+		// If zero, keep-alives are disabled. (default is 0: disabled).
+		KeepAlive time.Duration
 	}
 
 	// Metadata is the namespace for metadata management properties used by the Client, and shared by the Producer/Consumer.
@@ -186,6 +190,8 @@ func (c *Config) Validate() error {
 		return ConfigurationError("Invalid Net.ReadTimeout, must be > 0")
 	case c.Net.WriteTimeout <= 0:
 		return ConfigurationError("Invalid Net.WriteTimeout, must be > 0")
+	case c.Net.KeepAlive < 0:
+		return ConfigurationError("Invalid Net.KeepAlive, must be >= 0")
 	}
 
 	// validate the Metadata values