|
|
@@ -6,6 +6,7 @@ import (
|
|
|
"sync"
|
|
|
)
|
|
|
|
|
|
+// A single Kafka broker. All operations on this object are entirely concurrency-safe.
|
|
|
type Broker struct {
|
|
|
id int32
|
|
|
host *string
|
|
|
@@ -25,6 +26,8 @@ type responsePromise struct {
|
|
|
errors chan error
|
|
|
}
|
|
|
|
|
|
+// Creates and returns a Broker targetting the given host:port address.
|
|
|
+// This does not attempt to actually connect, you have to call Connect() for that.
|
|
|
func NewBroker(host string, port int32) *Broker {
|
|
|
b := new(Broker)
|
|
|
b.id = -1 // don't know it yet
|
|
|
@@ -33,7 +36,7 @@ func NewBroker(host string, port int32) *Broker {
|
|
|
return b
|
|
|
}
|
|
|
|
|
|
-func (b *Broker) Connect() (err error) {
|
|
|
+func (b *Broker) Connect() error {
|
|
|
b.lock.Lock()
|
|
|
defer b.lock.Unlock()
|
|
|
|
|
|
@@ -67,6 +70,7 @@ func (b *Broker) Close() error {
|
|
|
err := b.conn.Close()
|
|
|
|
|
|
b.conn = nil
|
|
|
+ b.done = nil
|
|
|
b.responses = nil
|
|
|
|
|
|
return err
|