Browse Source

Adjust broker locking order slightly

I was doing an unrelated experiment with a config that generated a warning on
validation, and I ended up flooded with warnings because we now call `Open` on
every broker returned from the Client, and we were revalidating the config every
single time. One thing led to another and I made several other adjustments in
the same vein.

- Move the atomic `AlreadyConnected` check to before the config validation in
  `Broker.Open`. Config validation is expensive and can log, no point in doing
  it if we don't have to.
- Move all the `atomic.StoreInt32` calls to *after* any of their related log
  messages. None of these were real problems, but it removes any possibility of
  out-of-order log messages due to unusual race conditions.
- Remove the double-check for `AlreadyConnected` after we've taken the full
  lock. Initially I just removed the log message since it's not something we
  need to log, but upon closer inspection I'm pretty sure this is dead code;
  anything that sets `opened` to 0 also sets `conn` to nil *first*, and does so
  while holding the lock.
Evan Huus 9 years ago
parent
commit
62e4ae8645
1 changed files with 7 additions and 13 deletions
  1. 7 13
      broker.go

+ 7 - 13
broker.go

@@ -45,6 +45,10 @@ func NewBroker(addr string) *Broker {
 // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
 // AlreadyConnected. If conf is nil, the result of NewConfig() is used.
 func (b *Broker) Open(conf *Config) error {
+	if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
+		return ErrAlreadyConnected
+	}
+
 	if conf == nil {
 		conf = NewConfig()
 	}
@@ -54,18 +58,8 @@ func (b *Broker) Open(conf *Config) error {
 		return err
 	}
 
-	if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
-		return ErrAlreadyConnected
-	}
-
 	b.lock.Lock()
 
-	if b.conn != nil {
-		b.lock.Unlock()
-		Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, ErrAlreadyConnected)
-		return ErrAlreadyConnected
-	}
-
 	go withRecover(func() {
 		defer b.lock.Unlock()
 
@@ -80,9 +74,9 @@ func (b *Broker) Open(conf *Config) error {
 			b.conn, b.connErr = dialer.Dial("tcp", b.addr)
 		}
 		if b.connErr != nil {
+			Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
 			b.conn = nil
 			atomic.StoreInt32(&b.opened, 0)
-			Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
 			return
 		}
 		b.conn = newBufConn(b.conn)
@@ -129,14 +123,14 @@ func (b *Broker) Close() error {
 	b.done = nil
 	b.responses = nil
 
-	atomic.StoreInt32(&b.opened, 0)
-
 	if err == nil {
 		Logger.Printf("Closed connection to broker %s\n", b.addr)
 	} else {
 		Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
 	}
 
+	atomic.StoreInt32(&b.opened, 0)
+
 	return err
 }