Explorar o código

Publish Broker.Connect() but don't connect automatically when decoding.

Evan Huus %!s(int64=12) %!d(string=hai) anos
pai
achega
087b16cf8b
Modificáronse 2 ficheiros con 25 adicións e 26 borrados
  1. 21 26
      broker.go
  2. 4 0
      metadata_cache.go

+ 21 - 26
broker.go

@@ -36,13 +36,33 @@ func NewBroker(host string, port int32) (b *Broker, err error) {
 	b.id = -1 // don't know it yet
 	b.host = &host
 	b.port = port
-	err = b.connect()
+	err = b.Connect()
 	if err != nil {
 		return nil, err
 	}
 	return b, nil
 }
 
+func (b *Broker) Connect() (err error) {
+	addr, err := net.ResolveIPAddr("ip", *b.host)
+	if err != nil {
+		return err
+	}
+
+	b.conn, err = net.DialTCP("tcp", nil, &net.TCPAddr{IP: addr.IP, Port: int(b.port), Zone: addr.Zone})
+	if err != nil {
+		return err
+	}
+
+	b.requests = make(chan requestToSend)
+	b.responses = make(chan responsePromise)
+
+	go b.sendRequestLoop()
+	go b.rcvResponseLoop()
+
+	return nil
+}
+
 func (b *Broker) Close() error {
 	close(b.requests)
 	close(b.responses)
@@ -77,26 +97,6 @@ func (b *Broker) Send(clientID *string, req requestEncoder) (decoder, error) {
 	return response, nil
 }
 
-func (b *Broker) connect() (err error) {
-	addr, err := net.ResolveIPAddr("ip", *b.host)
-	if err != nil {
-		return err
-	}
-
-	b.conn, err = net.DialTCP("tcp", nil, &net.TCPAddr{IP: addr.IP, Port: int(b.port), Zone: addr.Zone})
-	if err != nil {
-		return err
-	}
-
-	b.requests = make(chan requestToSend)
-	b.responses = make(chan responsePromise)
-
-	go b.sendRequestLoop()
-	go b.rcvResponseLoop()
-
-	return nil
-}
-
 func (b *Broker) encode(pe packetEncoder) {
 	pe.putInt32(b.id)
 	pe.putString(b.host)
@@ -122,11 +122,6 @@ func (b *Broker) decode(pd packetDecoder) (err error) {
 		return DecodingError("Broker port > 65536")
 	}
 
-	err = b.connect()
-	if err != nil {
-		return err
-	}
-
 	return nil
 }
 

+ 4 - 0
metadata_cache.go

@@ -98,6 +98,10 @@ func (mc *metadataCache) refreshTopics(topics []*string) error {
 
 	for i := range response.Brokers {
 		broker := &response.Brokers[i]
+		err = broker.Connect()
+		if err != nil {
+			return err
+		}
 		mc.brokers[broker.id] = broker
 	}