Browse Source

Make Broker.Open truly non-blocking

The intent behind the design was so that the method was always safe to call e.g.
when a lock was held, by making it async, however there were still certain cases
where it could have blocked:
 - if another thread had already called Open, subsequent calls would block until
   that network io completed
 - if the connection was already open *and* the pipeline was full
   (MaxOpenRequests were in flight) then calling Open would block until at least
   one response was received

This adds a simple atomic flag we can use to short-circuit appropriately, such
that Open should now have the following properties
 - it *never* blocks on network IO
 - it guarantees that the broker is connected or connecting when it returns
   (modulo another thread calling Close in the interim)

AFAICT these are the properties it needs for lazy broker connection to be
possible in the client.
Evan Huus 10 years ago
parent
commit
9d71abeed0
2 changed files with 16 additions and 7 deletions
  1. 14 5
      broker.go
  2. 2 2
      errors.go

+ 14 - 5
broker.go

@@ -6,6 +6,7 @@ import (
 	"net"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"time"
 )
 
@@ -61,6 +62,7 @@ type Broker struct {
 	conn          net.Conn
 	connErr       error
 	lock          sync.Mutex
+	opened        int32
 
 	responses chan responsePromise
 	done      chan bool
@@ -78,11 +80,11 @@ func NewBroker(addr string) *Broker {
 	return &Broker{id: -1, addr: addr}
 }
 
-// Open tries to connect to the Broker. It takes the broker lock synchronously, then spawns a goroutine which
-// connects and releases the lock. This means any subsequent operations on the broker will block waiting for
-// the connection to finish. To get the effect of a fully synchronous Open call, follow it by a call to Connected().
-// The only errors Open will return directly are ConfigurationError or AlreadyConnected. If conf is nil, the result of
-// NewBrokerConfig() is used.
+// Open tries to connect to the Broker if it is not already connected or connecting, but does not block
+// waiting for the connection to complete. This means that any subsequent operations on the broker will
+// block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
+// follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
+// AlreadyConnected. If conf is nil, the result of NewBrokerConfig() is used.
 func (b *Broker) Open(conf *BrokerConfig) error {
 	if conf == nil {
 		conf = NewBrokerConfig()
@@ -93,6 +95,10 @@ func (b *Broker) Open(conf *BrokerConfig) error {
 		return err
 	}
 
+	if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
+		return ErrAlreadyConnected
+	}
+
 	b.lock.Lock()
 
 	if b.conn != nil {
@@ -107,6 +113,7 @@ func (b *Broker) Open(conf *BrokerConfig) error {
 		b.conn, b.connErr = net.DialTimeout("tcp", b.addr, conf.DialTimeout)
 		if b.connErr != nil {
 			b.conn = nil
+			atomic.StoreInt32(&b.opened, 0)
 			Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
 			return
 		}
@@ -156,6 +163,8 @@ func (b *Broker) Close() (err error) {
 	b.done = nil
 	b.responses = nil
 
+	atomic.StoreInt32(&b.opened, 0)
+
 	return
 }
 

+ 2 - 2
errors.go

@@ -20,8 +20,8 @@ var ErrIncompleteResponse = errors.New("kafka: Response did not contain all the
 // (meaning one outside of the range [0...numPartitions-1]).
 var ErrInvalidPartition = errors.New("kafka: Partitioner returned an invalid partition index")
 
-// ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected.
-var ErrAlreadyConnected = errors.New("kafka: broker already connected")
+// ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected or connecting.
+var ErrAlreadyConnected = errors.New("kafka: broker connection already initiated")
 
 // ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
 var ErrNotConnected = errors.New("kafka: broker not connected")