Prechádzať zdrojové kódy

Match the golang database pattern

Replaces Connect and AsyncConnect with Open and Connected
Evan Huus 12 rokov pred
rodič
commit
934c5688a1
4 zmenil súbory, kde vykonal 41 pridanie a 39 odobranie
  1. 32 34
      broker.go
  2. 2 2
      broker_test.go
  3. 6 2
      client.go
  4. 1 1
      errors.go

+ 32 - 34
broker.go

@@ -28,7 +28,7 @@ type responsePromise struct {
 }
 
 // NewBroker creates and returns a Broker targetting the given host:port address.
-// This does not attempt to actually connect, you have to call Connect() or AsyncConnect() for that.
+// This does not attempt to actually connect, you have to call Open() for that.
 func NewBroker(host string, port int32) *Broker {
 	b := new(Broker)
 	b.id = -1 // don't know it yet
@@ -37,52 +37,50 @@ func NewBroker(host string, port int32) *Broker {
 	return b
 }
 
-func (b *Broker) Connect() error {
+// Open tries to connect to the Broker. It takes the broker lock synchronously, then spawns a goroutine which
+// connects and releases the lock. This means any subsequent operations on the broker will block waiting for
+// the connection to finish. To get the effect of a fully synchronous Open call, follow it by a call to Connected().
+// The only error Open will return directly is AlreadyConnected.
+func (b *Broker) Open() error {
 	b.lock.Lock()
-	defer b.lock.Unlock()
-
-	return b.connect()
-}
 
-// AsyncConnect tries to connect to the Broker in a non-blocking way. Calling `broker.AsyncConnect()` is
-// *NOT* the same as calling `go broker.Connect()` - AsyncConnect takes the broker lock synchronously before
-// launching its goroutine, so that subsequent operations on the broker are guaranteed to block waiting for
-// the connection instead of simply returning NotConnected. This does mean that if someone is already operating
-// on the broker, AsyncConnect may not be truly asynchronous while it waits for the lock.
-func (b *Broker) AsyncConnect() {
-	b.lock.Lock()
+	if b.conn != nil {
+		b.lock.Unlock()
+		return AlreadyConnected
+	}
 
 	go func() {
 		defer b.lock.Unlock()
-		b.connect()
-	}()
 
-}
+		var addr *net.IPAddr
+		addr, b.conn_err = net.ResolveIPAddr("ip", b.host)
+		if b.conn_err != nil {
+			return
+		}
 
-func (b *Broker) connect() error {
-	if b.conn != nil {
-		return AlreadyConnected
-	}
+		b.conn, b.conn_err = net.DialTCP("tcp", nil, &net.TCPAddr{IP: addr.IP, Port: int(b.port)})
+		if b.conn_err != nil {
+			return
+		}
 
-	var addr *net.IPAddr
-	addr, b.conn_err = net.ResolveIPAddr("ip", b.host)
-	if b.conn_err != nil {
-		return b.conn_err
-	}
+		b.done = make(chan bool)
 
-	b.conn, b.conn_err = net.DialTCP("tcp", nil, &net.TCPAddr{IP: addr.IP, Port: int(b.port)})
-	if b.conn_err != nil {
-		return b.conn_err
-	}
+		// permit a few outstanding requests before we block waiting for responses
+		b.responses = make(chan responsePromise, 4)
 
-	b.done = make(chan bool)
+		go b.responseReceiver()
+	}()
 
-	// permit a few outstanding requests before we block waiting for responses
-	b.responses = make(chan responsePromise, 4)
+	return nil
+}
 
-	go b.responseReceiver()
+// Connected returns true if the broker is connected and false otherwise. If the broker is not
+// connected but it had tried to connect, the error from that connection attempt is also returned.
+func (b *Broker) Connected() (bool, error) {
+	b.lock.Lock()
+	defer b.lock.Unlock()
 
-	return nil
+	return b.conn != nil, b.conn_err
 }
 
 func (b *Broker) Close() error {

+ 2 - 2
broker_test.go

@@ -141,7 +141,7 @@ func NewMockBroker(t *testing.T, responses chan []byte) *MockBroker {
 
 func ExampleBroker() error {
 	broker := NewBroker("localhost", 9092)
-	err := broker.Connect()
+	err := broker.Open()
 	if err != nil {
 		return err
 	}
@@ -217,7 +217,7 @@ func TestSimpleBrokerCommunication(t *testing.T) {
 	defer mockBroker.Close()
 
 	broker := NewBroker("localhost", mockBroker.Port())
-	err := broker.Connect()
+	err := broker.Open()
 	if err != nil {
 		t.Fatal(err)
 	}

+ 6 - 2
client.go

@@ -22,7 +22,11 @@ type Client struct {
 // If metadata cannot be retrieved (even if the connection otherwise succeeds) then the client is not created.
 func NewClient(id string, host string, port int32) (client *Client, err error) {
 	tmp := NewBroker(host, port)
-	err = tmp.Connect()
+	err = tmp.Open()
+	if err != nil {
+		return nil, err
+	}
+	_, err = tmp.Connected()
 	if err != nil {
 		return nil, err
 	}
@@ -227,7 +231,7 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 		if client.brokers[broker.ID()] != nil {
 			go client.brokers[broker.ID()].Close()
 		}
-		broker.AsyncConnect()
+		broker.Open()
 		client.brokers[broker.ID()] = broker
 	}
 

+ 1 - 1
errors.go

@@ -17,7 +17,7 @@ var IncompleteResponse = errors.New("kafka: Response did not contain all the exp
 // (meaning one outside of the range [0...numPartitions-1]).
 var InvalidPartition = errors.New("kafka: Partitioner returned an invalid partition index.")
 
-// AlreadyConnected is the error returned when calling Connect() on a Broker that is already connected.
+// AlreadyConnected is the error returned when calling Open() on a Broker that is already connected.
 var AlreadyConnected = errors.New("kafka: broker: already connected")
 
 // NotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.