Browse Source

Merge pull request #306 from Shopify/nonblocking-open

Make Broker.Open truly non-blocking
Evan Huus 10 years ago
parent
commit
8ae77e0583
2 changed files with 16 additions and 7 deletions
  1. 14 5
      broker.go
  2. 2 2
      errors.go

+ 14 - 5
broker.go

@@ -6,6 +6,7 @@ import (
 	"net"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"time"
 )
 
@@ -61,6 +62,7 @@ type Broker struct {
 	conn          net.Conn
 	connErr       error
 	lock          sync.Mutex
+	opened        int32
 
 	responses chan responsePromise
 	done      chan bool
@@ -78,11 +80,11 @@ func NewBroker(addr string) *Broker {
 	return &Broker{id: -1, addr: addr}
 }
 
-// Open tries to connect to the Broker. It takes the broker lock synchronously, then spawns a goroutine which
-// connects and releases the lock. This means any subsequent operations on the broker will block waiting for
-// the connection to finish. To get the effect of a fully synchronous Open call, follow it by a call to Connected().
-// The only errors Open will return directly are ConfigurationError or AlreadyConnected. If conf is nil, the result of
-// NewBrokerConfig() is used.
+// Open tries to connect to the Broker if it is not already connected or connecting, but does not block
+// waiting for the connection to complete. This means that any subsequent operations on the broker will
+// block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
+// follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
+// AlreadyConnected. If conf is nil, the result of NewBrokerConfig() is used.
 func (b *Broker) Open(conf *BrokerConfig) error {
 	if conf == nil {
 		conf = NewBrokerConfig()
@@ -93,6 +95,10 @@ func (b *Broker) Open(conf *BrokerConfig) error {
 		return err
 	}
 
+	if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
+		return ErrAlreadyConnected
+	}
+
 	b.lock.Lock()
 
 	if b.conn != nil {
@@ -107,6 +113,7 @@ func (b *Broker) Open(conf *BrokerConfig) error {
 		b.conn, b.connErr = net.DialTimeout("tcp", b.addr, conf.DialTimeout)
 		if b.connErr != nil {
 			b.conn = nil
+			atomic.StoreInt32(&b.opened, 0)
 			Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
 			return
 		}
@@ -156,6 +163,8 @@ func (b *Broker) Close() (err error) {
 	b.done = nil
 	b.responses = nil
 
+	atomic.StoreInt32(&b.opened, 0)
+
 	return
 }
 

+ 2 - 2
errors.go

@@ -20,8 +20,8 @@ var ErrIncompleteResponse = errors.New("kafka: Response did not contain all the
 // (meaning one outside of the range [0...numPartitions-1]).
 var ErrInvalidPartition = errors.New("kafka: Partitioner returned an invalid partition index")
 
-// ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected.
-var ErrAlreadyConnected = errors.New("kafka: broker already connected")
+// ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected or connecting.
+var ErrAlreadyConnected = errors.New("kafka: broker connection already initiated")
 
 // ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
 var ErrNotConnected = errors.New("kafka: broker not connected")