فهرست منبع

Merge pull request #604 from Shopify/broker-locking-tweaks

Adjust broker locking order slightly
Evan Huus 9 سال پیش
والد
کامیت
1663b489d3
1فایلهای تغییر یافته به همراه7 افزوده شده و 13 حذف شده
  1. 7 13
      broker.go

+ 7 - 13
broker.go

@@ -45,6 +45,10 @@ func NewBroker(addr string) *Broker {
 // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
 // AlreadyConnected. If conf is nil, the result of NewConfig() is used.
 func (b *Broker) Open(conf *Config) error {
+	if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
+		return ErrAlreadyConnected
+	}
+
 	if conf == nil {
 		conf = NewConfig()
 	}
@@ -54,18 +58,8 @@ func (b *Broker) Open(conf *Config) error {
 		return err
 	}
 
-	if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
-		return ErrAlreadyConnected
-	}
-
 	b.lock.Lock()
 
-	if b.conn != nil {
-		b.lock.Unlock()
-		Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, ErrAlreadyConnected)
-		return ErrAlreadyConnected
-	}
-
 	go withRecover(func() {
 		defer b.lock.Unlock()
 
@@ -80,9 +74,9 @@ func (b *Broker) Open(conf *Config) error {
 			b.conn, b.connErr = dialer.Dial("tcp", b.addr)
 		}
 		if b.connErr != nil {
+			Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
 			b.conn = nil
 			atomic.StoreInt32(&b.opened, 0)
-			Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
 			return
 		}
 		b.conn = newBufConn(b.conn)
@@ -129,14 +123,14 @@ func (b *Broker) Close() error {
 	b.done = nil
 	b.responses = nil
 
-	atomic.StoreInt32(&b.opened, 0)
-
 	if err == nil {
 		Logger.Printf("Closed connection to broker %s\n", b.addr)
 	} else {
 		Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
 	}
 
+	atomic.StoreInt32(&b.opened, 0)
+
 	return err
 }